Skip to content

Instantly share code, notes, and snippets.

@madhurprash
Created April 2, 2025 16:15
Show Gist options
  • Select an option

  • Save madhurprash/714858203c33ef9e3d431eebf0871f31 to your computer and use it in GitHub Desktop.

Select an option

Save madhurprash/714858203c33ef9e3d431eebf0871f31 to your computer and use it in GitHub Desktop.
This gist copies the contents of selected folders within an S3 bucket into a `combined/` prefix in the same bucket, using Python's `concurrent.futures` thread pools for parallel copies (the delete step that would make it a true move is left commented out).
import os
import shutil
import concurrent.futures
import boto3
from pathlib import Path
import logging
import time
def move_folders_to_combined(bucket_name, folders_to_move=None, log_level=logging.INFO):
    """
    Copy the specified top-level folders into the 'combined/' prefix of an
    S3 bucket using parallel processing.

    NOTE: despite the name, the original objects are NOT deleted -- the
    delete step is intentionally commented out below, so this performs a
    copy rather than a true move.

    Args:
        bucket_name (str): Name of the S3 bucket.
        folders_to_move (list | None): Folder names to copy into combined/.
            Defaults to ["SSE", "FPA", "Hypershield"].
        log_level (int): Logging level (default: logging.INFO).

    Returns:
        dict: Per-folder summary: {folder: {"success": bool, "objects_moved": int}}.
    """
    # Avoid the mutable-default-argument pitfall; resolve the default here.
    if folders_to_move is None:
        folders_to_move = ["SSE", "FPA", "Hypershield"]

    # Configure logging to both a timestamped file and the console.
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f"s3_move_{time.strftime('%Y%m%d_%H%M%S')}.log"),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger("s3_folder_mover")
    logger.info(f"Starting move operation for folders {folders_to_move} to combined/ in bucket {bucket_name}")
    start_time = time.time()
    s3_client = boto3.client('s3')
    results = {folder: {"success": False, "objects_moved": 0} for folder in folders_to_move}

    def move_folder(folder_name):
        """Copy every object under folder_name/ to combined/folder_name/."""
        folder_start_time = time.time()
        logger.info(f"Processing folder: {folder_name}")
        try:
            # List all objects in the source folder (paginated for >1000 keys).
            paginator = s3_client.get_paginator('list_objects_v2')
            source_prefix = f"{folder_name}/"
            objects_to_move = []
            logger.info(f"Listing objects in {source_prefix}")
            for page in paginator.paginate(Bucket=bucket_name, Prefix=source_prefix):
                # Pages with no matches omit the 'Contents' key entirely.
                for obj in page.get('Contents', []):
                    key = obj['Key']
                    target_key = f"combined/{key}"
                    objects_to_move.append((key, target_key))
            logger.info(f"Found {len(objects_to_move)} objects to move in {folder_name}/")

            # Copy this folder's objects in parallel.
            with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
                def copy_object(source_key, target_key):
                    """Server-side copy of a single object within the bucket."""
                    try:
                        logger.debug(f"Copying {source_key} to {target_key}")
                        s3_client.copy_object(
                            CopySource={'Bucket': bucket_name, 'Key': source_key},
                            Bucket=bucket_name,
                            Key=target_key
                        )
                        return source_key
                    except Exception as e:
                        logger.error(f"Error copying {source_key}: {str(e)}")
                        raise

                # Submit all copy tasks.
                future_to_key = {
                    executor.submit(copy_object, src, tgt): src
                    for src, tgt in objects_to_move
                }

                # Tally successes and failures as the futures complete.
                moved_count = 0
                failed_count = 0
                for future in concurrent.futures.as_completed(future_to_key):
                    source_key = future_to_key[future]
                    try:
                        future.result()
                        moved_count += 1
                        if moved_count % 100 == 0:
                            logger.info(f"Progress for {folder_name}: {moved_count}/{len(objects_to_move)} objects copied")
                    except Exception as e:
                        failed_count += 1
                        logger.error(f"Failed to copy {source_key}: {str(e)}")

            results[folder_name]["objects_moved"] = moved_count
            # BUG FIX: only report success when every object copied cleanly;
            # the original marked success=True even after individual failures.
            results[folder_name]["success"] = failed_count == 0
            folder_end_time = time.time()
            logger.info(f"Completed moving {folder_name}/ with {moved_count} objects in {folder_end_time - folder_start_time:.2f} seconds")
            # Optional: delete original objects after successful copy.
            # Uncomment the following code to make this a true move:
            # logger.info(f"Deleting original objects from {folder_name}/")
            # for key, _ in objects_to_move:
            #     s3_client.delete_object(Bucket=bucket_name, Key=key)
            return failed_count == 0
        except Exception as e:
            logger.error(f"Error moving folder {folder_name}: {str(e)}", exc_info=True)
            return False

    # Process all folders in parallel. BUG FIX: consume the map iterator so
    # that any exception raised inside move_folder surfaces here instead of
    # being silently discarded (executor.map is lazy).
    logger.info(f"Starting parallel processing of {len(folders_to_move)} folders")
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        list(executor.map(move_folder, folders_to_move))

    end_time = time.time()
    total_objects_moved = sum(folder_result["objects_moved"] for folder_result in results.values())
    logger.info(f"Operation completed: Moved {total_objects_moved} objects in {end_time - start_time:.2f} seconds")
    logger.info(f"Summary: {results}")
    return results
# Example usage. Guarded so importing this module does not immediately fire
# an S3 operation against the placeholder bucket name.
if __name__ == "__main__":
    move_folders_to_combined('<your-bucket-name>')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment