Created
August 7, 2025 11:39
-
-
Save samwho/dd55b9165e27375eaa15b2684d5c6744 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env uv run --script | |
| # /// script | |
| # requires-python = ">=3.9" | |
| # dependencies = [ | |
| # "requests", | |
| # "watchdog", | |
| # ] | |
| # /// | |
| import argparse | |
| import sys | |
| import time | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, Set | |
| import requests | |
| from watchdog.events import FileSystemEvent, FileSystemEventHandler | |
| from watchdog.observers import Observer | |
| # ANSI color codes | |
class Colors:
    """ANSI escape sequences used to colorize terminal output.

    Each attribute is a raw escape string. Write a color attribute before
    the text and ``NC`` after it to restore the terminal's default style.
    """

    # Regular-intensity foreground colors.
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    # Yellow carries the bold attribute ("1;") in addition to the color.
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    MAGENTA = "\033[0;35m"
    CYAN = "\033[0;36m"

    # "No Color": resets every attribute set by the codes above.
    NC = "\033[0m"

    # Bold attribute on its own, without changing the color.
    BOLD = "\033[1m"
| # Color helper functions | |
def red(text):
    """Print ``text`` to stdout in red, then reset the terminal color."""
    colored = "{}{}{}".format(Colors.RED, text, Colors.NC)
    print(colored)
def green(text):
    """Print ``text`` to stdout in green, then reset the terminal color."""
    colored = "{}{}{}".format(Colors.GREEN, text, Colors.NC)
    print(colored)
def yellow(text):
    """Print ``text`` to stdout in bold yellow, then reset the terminal color."""
    colored = "{}{}{}".format(Colors.YELLOW, text, Colors.NC)
    print(colored)
def blue(text):
    """Print ``text`` to stdout in blue, then reset the terminal color."""
    colored = "{}{}{}".format(Colors.BLUE, text, Colors.NC)
    print(colored)
def cyan(text):
    """Print ``text`` to stdout in cyan, then reset the terminal color."""
    colored = "{}{}{}".format(Colors.CYAN, text, Colors.NC)
    print(colored)
@dataclass
class FileMonitor:
    """Bookkeeping record for one file that is waiting to become stable.

    The watcher keeps one of these per queued recording and updates it on
    every poll to decide when the file has stopped growing.
    """

    # Path of the watched file, as a string.
    path: str
    # ``time.time()`` value taken when the file was first queued.
    first_seen: float
    # File size in bytes observed at the most recent check.
    last_size: int
    # Number of consecutive checks in which the size did not change.
    stable_count: int = 0
class VoiceMemosEventHandler(FileSystemEventHandler):
    """Relay ``.m4a`` file events from watchdog to the owning watcher.

    Created, modified and moved events are all treated the same way: if
    the affected path is a file ending in ``.m4a`` it is handed to
    ``watcher.add_file_to_monitor``.
    """

    def __init__(self, watcher):
        # Object exposing ``add_file_to_monitor(path)``.
        self.watcher = watcher

    def _queue_if_recording(self, event: FileSystemEvent, path_attr: str):
        """Queue the path stored in ``event.<path_attr>`` if it is an .m4a file."""
        if event.is_directory:
            return
        # Read the attribute only after the directory check, as the
        # individual handlers always did.
        candidate = getattr(event, path_attr)
        if candidate.endswith(".m4a"):
            self.watcher.add_file_to_monitor(candidate)

    def on_created(self, event: FileSystemEvent):
        self._queue_if_recording(event, "src_path")

    def on_modified(self, event: FileSystemEvent):
        self._queue_if_recording(event, "src_path")

    def on_moved(self, event: FileSystemEvent):
        # For a move, the interesting path is where the file ended up.
        self._queue_if_recording(event, "dest_path")
class VoiceMemosWatcher:
    """Watch the macOS Voice Memos recordings folder and upload new recordings.

    File system events add ``.m4a`` files to a monitoring queue. The main
    loop polls every queued file; once its size has been unchanged for
    ``stability_checks`` consecutive polls the recording is considered
    finished and is POSTed to ``endpoint`` as multipart form data.

    Args:
        endpoint: URL that receives the upload.
        api_key: Optional token sent as ``Authorization: Bearer <api_key>``.
        polling_interval: Seconds to sleep between polls of the queue.
        stability_checks: Consecutive unchanged-size polls required before
            a file is uploaded.
        upload_timeout: Value passed as ``timeout`` to ``requests.post`` so
            a stalled server cannot block the polling loop forever.
    """

    # A file queued for longer than this many seconds is dropped from the
    # queue (a later file system event for it will queue it again).
    MAX_WAIT_SECONDS = 300

    def __init__(
        self,
        endpoint: str,
        api_key: str = "",
        polling_interval: float = 5.0,
        stability_checks: int = 3,
        upload_timeout: float = 300.0,
    ):
        self.watch_dir = (
            Path.home()
            / "Library/Group Containers/group.com.apple.VoiceMemos.shared/Recordings"
        )
        self.endpoint = endpoint
        self.api_key = api_key
        self.polling_interval = polling_interval
        self.stability_checks = stability_checks
        self.upload_timeout = upload_timeout

        # Paths that were uploaded successfully; never queued again.
        self.processed_files: Set[str] = set()
        # Paths currently waiting to become stable, keyed by path string.
        self.monitoring: Dict[str, FileMonitor] = {}

        # Track startup time to ignore files that already existed.
        self.startup_time = time.time()

        # File system observer and the handler that feeds the queue.
        self.observer = Observer()
        self.event_handler = VoiceMemosEventHandler(self)

    def add_file_to_monitor(self, filepath: str):
        """Queue ``filepath`` for stability monitoring if it is a new file.

        The call is a no-op when the file was already uploaded, is already
        queued, cannot be stat'ed, or was last modified before the watcher
        started.
        """
        # Cheap membership checks first: modified events arrive repeatedly
        # while a recording grows, and most of them hit a queued file.
        if filepath in self.processed_files or filepath in self.monitoring:
            return

        path = Path(filepath)
        try:
            info = path.stat()
        except OSError:
            # Missing or unreadable; nothing to monitor.
            return

        # Skip files that existed before the watcher started.
        if info.st_mtime < self.startup_time:
            return

        self.monitoring[filepath] = FileMonitor(
            path=filepath,
            first_seen=time.time(),
            last_size=info.st_size,
        )
        filename = path.name
        blue(
            f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Detected: {filename}"
        )

    def get_file_size(self, filepath: Path) -> int:
        """Return the size of ``filepath`` in bytes, or 0 if it cannot be stat'ed."""
        try:
            return filepath.stat().st_size
        except OSError:
            return 0

    def format_size(self, size_bytes: int) -> str:
        """Format a byte count as mebibytes with two decimals, e.g. ``1.50MB``."""
        size_mb = size_bytes / (1024 * 1024)
        return f"{size_mb:.2f}MB"

    def upload_file(self, filepath: str) -> bool:
        """POST ``filepath`` to the endpoint as multipart form data.

        The form carries the file itself plus ``filename``, ``timestamp``
        (current UTC time, ISO 8601 with a ``Z`` suffix) and ``filesize``.

        Returns:
            True when the server answered with a 2xx status, False on any
            other status or on any error (all errors are reported on
            stdout rather than raised).
        """
        # Local import: the module header only imports ``datetime``.
        from datetime import timezone

        path = Path(filepath)
        filename = path.name
        yellow(f" 🛜 Uploading: {filename}")

        if not path.exists():
            red(" ✗ File no longer exists")
            return False

        filesize = self.get_file_size(path)
        print(f" 📊 File size: {self.format_size(filesize)}")

        headers = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # Aware "now" converted back to naive so the string keeps the
        # original "<iso>Z" shape without the deprecated utcnow().
        timestamp = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        )

        try:
            print(f" → Uploading to endpoint: {self.endpoint}")
            # Read file directly (requires Full Disk Access)
            with open(path, "rb") as f:
                files = {
                    "file": (filename, f, "audio/x-m4a"),
                    "filename": (None, filename),
                    "timestamp": (None, timestamp),
                    "filesize": (None, str(filesize)),
                }
                # Without a timeout a stalled server would block forever.
                response = requests.post(
                    self.endpoint,
                    files=files,
                    headers=headers,
                    timeout=self.upload_timeout,
                )
        except requests.exceptions.ConnectionError:
            red(" ✗ Connection failed - is the server running?")
            return False
        except requests.exceptions.Timeout:
            red(" ✗ Upload timeout")
            return False
        except PermissionError:
            red(" ✗ Permission denied - grant Full Disk Access to this program")
            red(" System Settings > Privacy & Security > Full Disk Access")
            return False
        except Exception as e:
            # Best-effort daemon: report and keep the watcher alive.
            red(f" ✗ Upload error: {e}")
            return False

        if 200 <= response.status_code < 300:
            green(f" ✓ Upload successful (HTTP {response.status_code})")
            if response.text:
                print(f" Response: {response.text}")
            return True

        red(f" ✗ Upload failed (HTTP {response.status_code})")
        if response.text:
            red(f" Error: {response.text}")
        return False

    def process_monitoring_queue(self):
        """Poll every queued file once; upload the stable ones.

        A file leaves the queue when it disappeared, when an upload was
        attempted (successful or not), or when it has been queued for more
        than ``MAX_WAIT_SECONDS``. Only successful uploads are remembered
        in ``processed_files``; a failed upload is not retried unless a
        later file system event queues the file again.
        """
        finished = []

        # Iterate over a snapshot: entries may be added while we work.
        for filepath, monitor in list(self.monitoring.items()):
            path = Path(filepath)

            if not path.exists():
                red(f" ✗ File disappeared: {path.name}")
                finished.append(filepath)
                continue

            current_size = self.get_file_size(path)
            wait_time = int(time.time() - monitor.first_seen)

            if current_size == monitor.last_size:
                monitor.stable_count += 1
                if monitor.stable_count == 1:
                    # Leading newline ends the in-place progress line.
                    print(f"\n ⏳ {path.name}: File appears stable, verifying...")
                if monitor.stable_count >= self.stability_checks:
                    green(f" ✓ {path.name}: Recording completed (waited {wait_time}s)")
                    if self.upload_file(filepath):
                        self.processed_files.add(filepath)
                    finished.append(filepath)
                    # Done with this file; skip the timeout check so it is
                    # not reported (and removed) a second time.
                    continue
            else:
                # Size changed, reset stability counter
                if monitor.stable_count > 0:
                    print(
                        f" ⏳ {path.name}: File size changed, continuing to monitor..."
                    )
                else:
                    print(
                        f"\r ⏳ {path.name}: Recording in progress... {wait_time}s ({self.format_size(current_size)})",
                        end="",
                        flush=True,
                    )
                monitor.stable_count = 0
                monitor.last_size = current_size

            if wait_time > self.MAX_WAIT_SECONDS:
                yellow(f"\n ⚠ {path.name}: Timeout waiting for recording")
                finished.append(filepath)

        # pop() instead of del: tolerate an entry that is already gone.
        for filepath in finished:
            self.monitoring.pop(filepath, None)

    def watch(self):
        """Run the watcher until interrupted with Ctrl-C.

        Exits the process with status 1 if the recordings directory is
        missing or unreadable. The observer thread is always stopped when
        the polling loop ends, whatever the reason.
        """
        green("Starting Voice Memos watcher...")
        yellow(f"Watching: {self.watch_dir}")
        yellow(f"Endpoint: {self.endpoint}")
        cyan("Note: Only monitoring files created after startup")
        print()

        if not self.watch_dir.exists():
            red(f"Error: Watch directory does not exist: {self.watch_dir}")
            print(
                "Make sure you have granted Full Disk Access to your terminal application."
            )
            sys.exit(1)

        # Test read access
        try:
            list(self.watch_dir.iterdir())
        except PermissionError:
            red("Error: Cannot read watch directory")
            red("Grant Full Disk Access to your terminal application:")
            red(" System Settings > Privacy & Security > Full Disk Access")
            sys.exit(1)

        green("Now watching for new files...")
        # The real wait is the number of checks times the poll interval.
        stable_seconds = self.stability_checks * self.polling_interval
        blue(
            f"ℹ️ Files will be uploaded after {stable_seconds:g} seconds of no size changes"
        )
        print()

        self.observer.schedule(self.event_handler, str(self.watch_dir), recursive=False)
        self.observer.start()

        try:
            # Main loop - process monitoring queue
            while True:
                self.process_monitoring_queue()
                time.sleep(self.polling_interval)
        except KeyboardInterrupt:
            yellow("\nShutting down watcher...")
        finally:
            self.observer.stop()
            self.observer.join()
        green("Watcher stopped.")
def main(argv=None):
    """Parse command-line options and run the Voice Memos watcher.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. Leave as
            None for normal command-line use.

    Invalid option values (non-positive or non-finite interval, fewer than
    one stability check) make argparse print an error and exit with
    status 2 before the watcher is created.
    """

    def positive_seconds(raw):
        # ValueError from float() is turned into a usage error by argparse.
        number = float(raw)
        # Rejects 0, negatives, NaN and infinity: time.sleep() needs a
        # finite, non-negative value and 0 would spin the CPU.
        if not 0 < number < float("inf"):
            raise argparse.ArgumentTypeError(
                f"must be a positive number of seconds, got {raw!r}"
            )
        return number

    def at_least_one(raw):
        number = int(raw)
        if number < 1:
            raise argparse.ArgumentTypeError(f"must be at least 1, got {raw!r}")
        return number

    parser = argparse.ArgumentParser(
        description="Watch Voice Memos for new recordings and upload them"
    )
    parser.add_argument(
        "-e",
        "--endpoint",
        default="http://127.0.0.1:8080/upload",
        help="Upload endpoint URL (default: http://127.0.0.1:8080/upload)",
    )
    parser.add_argument(
        "-k", "--api-key", default="", help="API key for authentication"
    )
    parser.add_argument(
        "-i",
        "--interval",
        type=positive_seconds,
        default=1.0,
        help="Polling interval in seconds (default: 1.0)",
    )
    parser.add_argument(
        "-s",
        "--stability-checks",
        type=at_least_one,
        default=3,
        help="Number of polling intervals to wait before considering a file stable (default: 3)",
    )
    args = parser.parse_args(argv)

    watcher = VoiceMemosWatcher(
        endpoint=args.endpoint,
        api_key=args.api_key,
        polling_interval=args.interval,
        stability_checks=args.stability_checks,
    )
    watcher.watch()
# Start the watcher only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment