Created
November 27, 2025 18:04
-
-
Save Plasmoxy/a7427a57d19e1df4df7857e7a61c9945 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Backup your photos with this simple script! | |
| import shutil | |
| from pathlib import Path | |
| from datetime import datetime | |
| import exifread | |
| import os | |
| from tqdm import tqdm | |
| import json | |
class PhotoBackup:
    """Backup photos and videos into date-named folders.

    Media files found under ``source_dir`` are copied into
    ``target_dir/YYYY-MM-DD/`` subfolders named after each file's capture
    date.  Photos use their EXIF date when available; videos (and photos
    without usable EXIF) fall back to the file's modification time.
    Optional inclusive ``date_from`` / ``date_until`` filters restrict
    which files are imported.
    """

    # Videos rarely carry EXIF, so their capture date is taken from mtime.
    # (Single source of truth: previously this set was duplicated inside
    # get_capture_date.)
    VIDEO_EXTENSIONS = {
        '.mp4', '.mov', '.avi', '.mkv', '.m4v', '.3gp',
        '.mts', '.m2ts', '.wmv', '.flv',
    }

    # Photo formats; EXIF is attempted first for these.
    PHOTO_EXTENSIONS = {
        '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif',
        '.raw', '.cr2', '.nef', '.arw', '.dng', '.rw2', '.orf',
        '.heic', '.heif', '.webp',
    }

    def __init__(self, source_dir, target_dir, date_from=None, date_until=None):
        """
        Args:
            source_dir: Directory scanned recursively for media files.
            target_dir: Root directory that receives the dated subfolders.
            date_from: Optional ``datetime.date`` lower bound (inclusive).
            date_until: Optional ``datetime.date`` upper bound (inclusive).
        """
        self.source_dir = Path(source_dir)
        self.target_dir = Path(target_dir)
        self.date_from = date_from
        self.date_until = date_until
        # Running counters reported by print_summary() at the end.
        self.stats = {
            'total_found': 0,
            'copied': 0,
            'skipped': 0,
            'filtered': 0,
            'errors': 0
        }
        # tqdm progress bars; created lazily in backup().
        self.current_file_pbar = None
        self.overall_pbar = None

    def get_media_extensions(self):
        """Return the set of recognised photo and video file extensions."""
        return self.PHOTO_EXTENSIONS | self.VIDEO_EXTENSIONS

    def get_capture_date_from_exif(self, file_path):
        """
        Extract the capture date from EXIF data without fully loading the image.

        Works for most photo formats.

        Args:
            file_path: Path of the image file.

        Returns:
            datetime object, or None when no usable EXIF date exists.
        """
        try:
            with open(file_path, 'rb') as f:
                tags = exifread.process_file(f, stop_tag='DateTimeOriginal', details=False)
            # Try the date tags in order of reliability.
            date_tags = ['EXIF DateTimeOriginal', 'EXIF DateTimeDigitized', 'Image DateTime']
            for tag in date_tags:
                if tag in tags:
                    date_str = str(tags[tag])
                    # EXIF format: "2024:01:15 14:30:45"
                    try:
                        return datetime.strptime(date_str, '%Y:%m:%d %H:%M:%S')
                    except ValueError:
                        continue
            return None
        except Exception:
            # EXIF reading failed (common for videos and corrupt files).
            return None

    def get_capture_date(self, file_path):
        """
        Get the capture date from EXIF, falling back to file modification time.

        Videos skip the EXIF attempt entirely, as they typically carry none.

        Args:
            file_path: Path of the media file.

        Returns:
            datetime object (never None).
        """
        if file_path.suffix.lower() in self.VIDEO_EXTENSIONS:
            # For videos, use the file modification time directly.
            return datetime.fromtimestamp(os.path.getmtime(file_path))
        # For photos, try EXIF first.
        exif_date = self.get_capture_date_from_exif(file_path)
        if exif_date:
            return exif_date
        # No EXIF date found -- fall back to modification time.
        return datetime.fromtimestamp(os.path.getmtime(file_path))

    def is_within_date_range(self, capture_date):
        """
        Check if a capture date is within the configured date range.

        Args:
            capture_date: datetime object.

        Returns:
            True if within range (both bounds inclusive), False otherwise.
        """
        # No filters configured -> everything passes.
        if self.date_from is None and self.date_until is None:
            return True
        if self.date_from is not None and capture_date.date() < self.date_from:
            return False
        if self.date_until is not None and capture_date.date() > self.date_until:
            return False
        return True

    def find_all_media_files(self):
        """
        Recursively find all media files in the source directory.

        Also records the count in ``self.stats['total_found']``.

        Returns:
            List of Path objects.
        """
        media_extensions = self.get_media_extensions()
        files = []
        print(f"Scanning {self.source_dir} for media files...")
        for file_path in self.source_dir.rglob('*'):
            if file_path.is_file() and file_path.suffix.lower() in media_extensions:
                files.append(file_path)
        self.stats['total_found'] = len(files)
        print(f"Found {len(files)} media files\n")
        return files

    def get_destination_path(self, source_path, capture_date):
        """
        Generate the destination path based on the capture date.

        Name collisions get a ``_1``, ``_2``, ... suffix unless the existing
        file looks identical (same size, close mtime), in which case the
        source should be skipped.

        Args:
            source_path: Source file path.
            capture_date: datetime object.

        Returns:
            Path object for the destination, or None if the file should be
            skipped because an identical copy already exists.
        """
        # Folder name format: YYYY-MM-DD
        date_folder = capture_date.strftime('%Y-%m-%d')
        dest_subdir = self.target_dir / date_folder
        # Keep the original filename.
        dest_path = dest_subdir / source_path.name
        counter = 1
        original_stem = source_path.stem
        while dest_path.exists():
            if self.files_are_same(source_path, dest_path):
                return None  # Skip identical file
            # Add a counter to the filename and retry.
            dest_path = dest_subdir / f"{original_stem}_{counter}{source_path.suffix}"
            counter += 1
        return dest_path

    def files_are_same(self, file1, file2):
        """
        Quick heuristic comparison by size and modification time.

        Note: this does not compare contents; equal size plus mtimes within
        2 seconds of each other count as "the same".

        Returns:
            True if the files appear to be the same.
        """
        stat1 = file1.stat()
        stat2 = file2.stat()
        if stat1.st_size != stat2.st_size:
            return False
        # mtime within 2 seconds tolerance (FAT filesystems store mtimes
        # with 2-second resolution).
        return abs(stat1.st_mtime - stat2.st_mtime) < 2

    def copy_with_progress(self, source, destination):
        """
        Copy a file while updating the per-file progress bar.

        On failure the partially written destination file is removed and the
        exception re-raised.

        Args:
            source: Source file path.
            destination: Destination file path.
        """
        file_size = source.stat().st_size
        # Reset and re-label the per-file progress bar for this copy.
        self.current_file_pbar.reset(total=file_size)
        self.current_file_pbar.set_description_str(
            f"Current: {source.name[:45]:45}"
        )
        # Ensure the dated destination folder exists.
        destination.parent.mkdir(parents=True, exist_ok=True)
        try:
            with open(source, 'rb') as fsrc:
                with open(destination, 'wb') as fdst:
                    self._copyfileobj_with_progress(fsrc, fdst, file_size)
            # Preserve timestamps/permissions so dedup-by-mtime keeps working.
            shutil.copystat(source, destination)
        except Exception:
            # Clean up the partial file so a retry starts fresh.
            if destination.exists():
                destination.unlink()
            raise

    def _copyfileobj_with_progress(self, fsrc, fdst, total_size):
        """
        Copy an open file object in chunks, updating the per-file bar.

        Args:
            fsrc: Source file object opened in binary mode.
            fdst: Destination file object opened in binary mode.
            total_size: Total byte count (kept for interface stability;
                the bar total is set by the caller).
        """
        chunk_size = 1024 * 1024  # 1MB chunks
        while True:
            chunk = fsrc.read(chunk_size)
            if not chunk:
                break
            fdst.write(chunk)
            self.current_file_pbar.update(len(chunk))

    def backup(self):
        """
        Run the main backup process with progress tracking.

        Scans, filters, copies, and finally prints a summary.  Per-file
        errors are counted and reported but do not abort the run.
        """
        print(f"{'='*70}")
        print(f"Photo/Video Backup Script")
        print(f"{'='*70}")
        print(f"Source: {self.source_dir}")
        print(f"Target: {self.target_dir}")
        if self.date_from or self.date_until:
            print(f"Date filter:")
            if self.date_from:
                print(f"  From: {self.date_from}")
            if self.date_until:
                print(f"  Until: {self.date_until}")
        print(f"{'='*70}\n")
        media_files = self.find_all_media_files()
        if not media_files:
            print("No media files found!")
            return
        # Two stacked progress bars: overall file count and current file bytes.
        self.overall_pbar = tqdm(
            total=len(media_files),
            desc="Overall ",
            position=0,
            leave=True,
            unit='file',
            colour='green',
            bar_format='{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]'
        )
        self.current_file_pbar = tqdm(
            total=0,
            desc="Current ",
            position=1,
            leave=True,
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
            colour='blue',
            bar_format='{desc}|{bar}| {n_fmt}/{total_fmt} [{rate_fmt}]'
        )
        try:
            for media_file in media_files:
                try:
                    capture_date = self.get_capture_date(media_file)
                    # Date filter: count and skip without copying.
                    if not self.is_within_date_range(capture_date):
                        self.stats['filtered'] += 1
                        self.overall_pbar.set_postfix_str(
                            f"⏭️  Filtered: {media_file.name[:40]} ({capture_date.date()})"
                        )
                        self.overall_pbar.update(1)
                        continue
                    dest_path = self.get_destination_path(media_file, capture_date)
                    if dest_path is None:
                        # An identical file already exists at the destination.
                        self.stats['skipped'] += 1
                        self.overall_pbar.set_postfix_str(
                            f"⏭️  Skipped: {media_file.name[:40]}"
                        )
                    else:
                        self.copy_with_progress(media_file, dest_path)
                        self.stats['copied'] += 1
                        self.overall_pbar.set_postfix_str(
                            f"✅ Copied to: {dest_path.parent.name}/"
                        )
                except Exception as e:
                    self.stats['errors'] += 1
                    # Show the specific file and error in the progress bar...
                    error_msg = f"❌ Error in '{media_file.name}': {str(e)}"
                    self.overall_pbar.set_postfix_str(error_msg[:70])
                    # ...and also print for a permanent record.
                    print(f"\nError processing file: {media_file}")
                    print(f"  Error details: {str(e)}\n")
                self.overall_pbar.update(1)
        finally:
            self.overall_pbar.close()
            self.current_file_pbar.close()
        self.print_summary()

    def print_summary(self):
        """Print backup summary statistics."""
        print(f"\n{'='*70}")
        print(f"Backup Complete!")
        print(f"{'='*70}")
        print(f"Total files found:  {self.stats['total_found']}")
        print(f"Files copied:       {self.stats['copied']}")
        print(f"Files skipped:      {self.stats['skipped']}")
        if self.date_from or self.date_until:
            print(f"Files filtered:     {self.stats['filtered']}")
        print(f"Errors:             {self.stats['errors']}")
        print(f"{'='*70}\n")
# CLI interface
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(description='Backup photos and videos from SD card')
    parser.add_argument('source', help='Source directory name (relative to /Volumes/LUMIX/DCIM)')
    parser.add_argument('target', help='Target directory name (relative to /Volumes/CHLEBIG/Memories)')
    parser.add_argument('--from', dest='date_from',
                        help='Start date (YYYY-MM-DD) - only import files from this date onwards')
    parser.add_argument('--until', dest='date_until', help='End date (YYYY-MM-DD) - only import files up to this date')
    args = parser.parse_args()

    # Optionally load source/target roots from a config file; fall back to
    # treating the CLI arguments as full paths when no config is present.
    config_file = Path("import_config.json")
    SOURCE_ROOT = None
    TARGET_ROOT = None
    if config_file.exists():
        try:
            with open(config_file, 'r') as f:
                config = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Unreadable/invalid config: behave as if no config exists.
            pass
        else:
            # NOTE: only honour non-empty entries.  Path("") == Path("."),
            # which is truthy, so a plain truthiness check on Path objects
            # cannot detect a missing key.
            if config.get("sourceRoot"):
                SOURCE_ROOT = Path(config["sourceRoot"])
            if config.get("targetRoot"):
                TARGET_ROOT = Path(config["targetRoot"])

    # Build source and target directories.
    SOURCE_DIR = SOURCE_ROOT / args.source if SOURCE_ROOT else Path(args.source)
    TARGET_DIR = TARGET_ROOT / args.target if TARGET_ROOT else Path(args.target)

    # Validate that the source exists before doing any work.
    if not SOURCE_DIR.exists():
        print(f"Error: Source directory does not exist: {SOURCE_DIR}")
        sys.exit(1)

    # Parse and validate the optional date-filter arguments.
    date_from = None
    date_until = None
    if args.date_from:
        try:
            date_from = datetime.strptime(args.date_from, '%Y-%m-%d').date()
        except ValueError:
            print(f"Error: Invalid --from date format. Use YYYY-MM-DD")
            sys.exit(1)
    if args.date_until:
        try:
            date_until = datetime.strptime(args.date_until, '%Y-%m-%d').date()
        except ValueError:
            print(f"Error: Invalid --until date format. Use YYYY-MM-DD")
            sys.exit(1)
    if date_from and date_until and date_from > date_until:
        print(f"Error: --from date must be before or equal to --until date")
        sys.exit(1)

    backup = PhotoBackup(SOURCE_DIR, TARGET_DIR, date_from, date_until)
    backup.backup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment