Skip to content

Instantly share code, notes, and snippets.

@dharmab
Created May 15, 2018 16:04
Show Gist options
  • Select an option

  • Save dharmab/504db2328cfc1e424a657384d1b4f1e2 to your computer and use it in GitHub Desktop.

Select an option

Save dharmab/504db2328cfc1e424a657384d1b4f1e2 to your computer and use it in GitHub Desktop.
Issue 437 reproduction script
azure-cli==2.0.32
#!/usr/bin/env python3
from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob.blockblobservice import BlockBlobService
import argparse
import random
import string
def borrow_storage_access_key(storage_management_client, resource_group_name, storage_account_name):
    """Return the value of the first storage account access key with 'Full' permissions.

    Args:
        storage_management_client: Client exposing
            ``storage_accounts.list_keys(resource_group_name=..., account_name=...)``.
        resource_group_name: Name of the resource group containing the account.
        storage_account_name: Name of the storage account whose keys are listed.

    Returns:
        The ``value`` attribute of the first listed key whose
        ``permissions.value`` equals ``'Full'``.

    Raises:
        LookupError: If none of the listed keys has 'Full' permissions.
    """
    # Azure will create two keys for us automatically named 'key1' and 'key2'
    storage_access_keys = storage_management_client.storage_accounts.list_keys(
        resource_group_name=resource_group_name,
        account_name=storage_account_name,
    ).keys

    # Grab any key with write permissions, in case key1 and key2 have been
    # deleted
    for key in storage_access_keys:
        if key.permissions.value == 'Full':
            return key.value

    # A bare next() on an exhausted generator would surface here as an
    # unexplained StopIteration; fail with a message that names the account.
    raise LookupError(
        f'No access key with Full permissions found for storage account '
        f'"{storage_account_name}" in resource group "{resource_group_name}"'
    )
def construct_block_blob_service_interface(storage_client, resource_group_name, storage_account_name):
    """Build a BlockBlobService for a storage account using a borrowed access key.

    Args:
        storage_client: Storage management client passed on to
            ``borrow_storage_access_key`` to look up the account's keys.
        resource_group_name: Name of the resource group containing the account.
        storage_account_name: Name of the storage account to connect to.

    Returns:
        A ``BlockBlobService`` constructed with the account name and the
        access key returned by ``borrow_storage_access_key``.
    """
    access_key = borrow_storage_access_key(
        storage_client,
        resource_group_name,
        storage_account_name,
    )
    service = BlockBlobService(account_name=storage_account_name, account_key=access_key)
    return service
def random_string(length):
    """Return a string of ``length`` randomly chosen lowercase ASCII letters."""
    alphabet = string.ascii_lowercase
    letters = random.choices(alphabet, k=length)
    return ''.join(letters)
def upload_file(block_blob_service):
    """Create a randomly named container and upload a random text blob to it.

    A 12-letter container name, a 5-letter ``.txt`` blob name and 256 letters
    of content are generated with ``random_string``. The blob is uploaded only
    when ``exists`` reports that it is not already present.

    Args:
        block_blob_service: Object providing ``create_container``, ``exists``
            and ``create_blob_from_text``.
    """
    container_name = random_string(length=12)
    print(f'Creating storage container "{container_name}"')
    block_blob_service.create_container(container_name)

    filename = f'{random_string(length=5)}.txt'
    payload = random_string(length=256)

    print(f'Checking if {filename} exists')
    already_present = block_blob_service.exists(
        container_name=container_name,
        blob_name=filename,
    )
    if already_present:
        return

    print(f'Uploading {filename}')
    block_blob_service.create_blob_from_text(
        container_name=container_name,
        blob_name=filename,
        text=payload,
    )
def parse_args():
    """Parse the script's command-line options from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with the attributes ``subscription_id``,
        ``resource_group`` and ``storage_account``. All three options are
        required; argparse exits with a usage error if any is missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--subscription-id', '-s', help='Azure Subscription ID', required=True)
    parser.add_argument('--resource-group', '-g', help='Azure Resource Group Name', required=True)
    # The hyphenated spelling matches the sibling options. The original
    # underscore spelling is kept as an alias so existing invocations keep
    # working. argparse derives dest from the first long option, so the
    # attribute is still ``storage_account``.
    parser.add_argument(
        '--storage-account', '--storage_account', '-n',
        help='Azure Storage Account Name',
        required=True,
    )
    return parser.parse_args()
def main():
    """Run the reproduction: parse options, build the clients, upload a blob.

    Obtains a ``StorageManagementClient`` through
    ``get_client_from_cli_profile`` for the requested subscription, uses it to
    construct a blob service for the requested storage account, and hands
    that service to ``upload_file``.
    """
    args = parse_args()
    management_client = get_client_from_cli_profile(
        StorageManagementClient,
        subscription_id=args.subscription_id,
    )
    upload_file(
        construct_block_blob_service_interface(
            storage_client=management_client,
            resource_group_name=args.resource_group,
            storage_account_name=args.storage_account,
        )
    )
# Run the reproduction only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment