Created April 2, 2025 16:15
This gist moves folders within the same S3 bucket into a combined/ prefix, copying objects in parallel with Python's concurrent.futures thread pools.
import concurrent.futures
import boto3
import logging
import time


def move_folders_to_combined(bucket_name, folders_to_move=["SSE", "FPA", "Hypershield"], log_level=logging.INFO):
    """
    Move specified folders to the 'combined' folder in an S3 bucket using parallel processing.

    Args:
        bucket_name (str): Name of the S3 bucket
        folders_to_move (list): List of folder names to move to combined
        log_level (int): Logging level (default: logging.INFO)

    Returns:
        dict: Summary of results including success status and moved object counts
    """
    # Configure logging to both a timestamped log file and the console
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f"s3_move_{time.strftime('%Y%m%d_%H%M%S')}.log"),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger("s3_folder_mover")
    logger.info(f"Starting move operation for folders {folders_to_move} to combined/ in bucket {bucket_name}")

    start_time = time.time()
    s3_client = boto3.client('s3')
    results = {folder: {"success": False, "objects_moved": 0} for folder in folders_to_move}

    def move_folder(folder_name):
        """Helper function to move a single folder to combined"""
        folder_start_time = time.time()
        logger.info(f"Processing folder: {folder_name}")
        try:
            # List all objects under the source prefix
            paginator = s3_client.get_paginator('list_objects_v2')
            source_prefix = f"{folder_name}/"
            objects_to_move = []
            logger.info(f"Listing objects in {source_prefix}")
            for page in paginator.paginate(Bucket=bucket_name, Prefix=source_prefix):
                if 'Contents' in page:
                    for obj in page['Contents']:
                        key = obj['Key']
                        target_key = f"combined/{key}"
                        objects_to_move.append((key, target_key))
            logger.info(f"Found {len(objects_to_move)} objects to move in {folder_name}/")

            # Use ThreadPoolExecutor to copy objects in parallel
            with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
                def copy_object(source_key, target_key):
                    try:
                        logger.debug(f"Copying {source_key} to {target_key}")
                        s3_client.copy_object(
                            CopySource={'Bucket': bucket_name, 'Key': source_key},
                            Bucket=bucket_name,
                            Key=target_key
                        )
                        return source_key
                    except Exception as e:
                        logger.error(f"Error copying {source_key}: {str(e)}")
                        raise

                # Submit all copy tasks
                future_to_key = {
                    executor.submit(copy_object, src, tgt): src
                    for src, tgt in objects_to_move
                }

                # Process results as they complete
                moved_count = 0
                for future in concurrent.futures.as_completed(future_to_key):
                    source_key = future_to_key[future]
                    try:
                        future.result()
                        moved_count += 1
                        if moved_count % 100 == 0:
                            logger.info(f"Progress for {folder_name}: {moved_count}/{len(objects_to_move)} objects copied")
                    except Exception as e:
                        logger.error(f"Failed to copy {source_key}: {str(e)}")

            results[folder_name]["objects_moved"] = moved_count
            results[folder_name]["success"] = True
            folder_end_time = time.time()
            logger.info(f"Completed moving {folder_name}/ with {moved_count} objects in {folder_end_time - folder_start_time:.2f} seconds")

            # Optional: Delete original objects after successful copy
            # Uncomment the following code if you want to delete the original objects
            # logger.info(f"Deleting original objects from {folder_name}/")
            # for key, _ in objects_to_move:
            #     s3_client.delete_object(Bucket=bucket_name, Key=key)

            return True
        except Exception as e:
            logger.error(f"Error moving folder {folder_name}: {str(e)}", exc_info=True)
            return False

    # Move all folders in parallel (one worker thread per folder)
    logger.info(f"Starting parallel processing of {len(folders_to_move)} folders")
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Consume the map iterator so all folder tasks finish before timing ends
        list(executor.map(move_folder, folders_to_move))

    end_time = time.time()
    total_objects_moved = sum(folder_result["objects_moved"] for folder_result in results.values())
    logger.info(f"Operation completed: Moved {total_objects_moved} objects in {end_time - start_time:.2f} seconds")
    logger.info(f"Summary: {results}")
    return results


# Example usage:
move_folders_to_combined('<your-bucket-name>')
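Because the delete step above is commented out, the script copies objects into combined/ rather than truly moving them. If you do want to remove the source objects after a successful copy, a minimal sketch is below; it uses the batched delete_objects call (up to 1,000 keys per request) instead of per-key delete_object, and assumes bucket_name and the (source_key, target_key) list have the same meaning as inside move_folder. It is not part of the gist itself.

import boto3

def delete_source_objects(bucket_name, objects_to_move, batch_size=1000):
    """Sketch: delete the original keys after a successful copy."""
    s3_client = boto3.client('s3')
    source_keys = [key for key, _ in objects_to_move]
    # delete_objects accepts at most 1,000 keys per request, so send batches
    for i in range(0, len(source_keys), batch_size):
        batch = source_keys[i:i + batch_size]
        response = s3_client.delete_objects(
            Bucket=bucket_name,
            Delete={'Objects': [{'Key': key} for key in batch], 'Quiet': True}
        )
        # Per-key failures, if any, come back in the 'Errors' list
        for error in response.get('Errors', []):
            print(f"Failed to delete {error['Key']}: {error['Message']}")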