-
-
Save dharmab/504db2328cfc1e424a657384d1b4f1e2 to your computer and use it in GitHub Desktop.
Issue 437 reproduction script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| azure-cli==2.0.32 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| from azure.common.client_factory import get_client_from_cli_profile | |
| from azure.mgmt.storage import StorageManagementClient | |
| from azure.storage.blob.blockblobservice import BlockBlobService | |
| import argparse | |
| import random | |
| import string | |
| def borrow_storage_access_key(storage_management_client, resource_group_name, storage_account_name): | |
| # Azure will create two keys for us automatically named 'key1' and 'key2' | |
| storage_access_keys = storage_management_client.storage_accounts.list_keys( | |
| resource_group_name=resource_group_name, | |
| account_name=storage_account_name | |
| ).keys | |
| # Grab any key with write permissions, in case key1 and key2 have been | |
| # deleted | |
| return next((k.value for k in storage_access_keys if k.permissions.value == 'Full')) | |
| def construct_block_blob_service_interface(storage_client, resource_group_name, storage_account_name): | |
| storage_access_key = borrow_storage_access_key( | |
| storage_management_client=storage_client, | |
| resource_group_name=resource_group_name, | |
| storage_account_name=storage_account_name | |
| ) | |
| return BlockBlobService( | |
| account_name=storage_account_name, | |
| account_key=storage_access_key | |
| ) | |
| def random_string(length): | |
| return ''.join(random.choices(string.ascii_lowercase, k=length)) | |
| def upload_file(block_blob_service): | |
| container_name = random_string(length=12) | |
| print(f'Creating storage container "{container_name}"') | |
| block_blob_service.create_container(container_name) | |
| filename = random_string(length=5) + '.txt' | |
| content = random_string(length=256) | |
| print(f'Checking if {filename} exists') | |
| if not block_blob_service.exists(container_name=container_name, blob_name=filename): | |
| print(f'Uploading {filename}') | |
| block_blob_service.create_blob_from_text( | |
| container_name=container_name, | |
| blob_name=filename, | |
| text=content | |
| ) | |
| def parse_args(): | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('--subscription-id', '-s', help='Azure Subscription ID', required=True) | |
| parser.add_argument('--resource-group', '-g', help='Azure Resource Group Name', required=True) | |
| parser.add_argument('--storage_account', '-n', help='Azure Storage Account Name', required=True) | |
| return parser.parse_args() | |
| def main(): | |
| options = parse_args() | |
| storage_client = get_client_from_cli_profile( | |
| StorageManagementClient, | |
| subscription_id=options.subscription_id | |
| ) | |
| blob_service = construct_block_blob_service_interface( | |
| storage_client=storage_client, | |
| resource_group_name=options.resource_group, | |
| storage_account_name=options.storage_account | |
| ) | |
| upload_file(blob_service) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment