Skip to content

Instantly share code, notes, and snippets.

@samwho
Created August 7, 2025 11:39
Show Gist options
  • Select an option

  • Save samwho/dd55b9165e27375eaa15b2684d5c6744 to your computer and use it in GitHub Desktop.

Select an option

Save samwho/dd55b9165e27375eaa15b2684d5c6744 to your computer and use it in GitHub Desktop.
#!/usr/bin/env uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
# "requests",
# "watchdog",
# ]
# ///
import argparse
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, Set
import requests
from watchdog.events import FileSystemEvent, FileSystemEventHandler
from watchdog.observers import Observer
# ANSI color codes
class Colors:
RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[0;34m"
MAGENTA = "\033[0;35m"
CYAN = "\033[0;36m"
NC = "\033[0m" # No Color
BOLD = "\033[1m"
# Color helper functions
def red(text):
print(f"{Colors.RED}{text}{Colors.NC}")
def green(text):
print(f"{Colors.GREEN}{text}{Colors.NC}")
def yellow(text):
print(f"{Colors.YELLOW}{text}{Colors.NC}")
def blue(text):
print(f"{Colors.BLUE}{text}{Colors.NC}")
def cyan(text):
print(f"{Colors.CYAN}{text}{Colors.NC}")
@dataclass
class FileMonitor:
"""Tracks the state of a file being monitored"""
path: str
first_seen: float
last_size: int
stable_count: int = 0
class VoiceMemosEventHandler(FileSystemEventHandler):
"""Handles file system events for Voice Memos"""
def __init__(self, watcher):
self.watcher = watcher
def on_created(self, event: FileSystemEvent):
if not event.is_directory and event.src_path.endswith(".m4a"):
self.watcher.add_file_to_monitor(event.src_path)
def on_modified(self, event: FileSystemEvent):
if not event.is_directory and event.src_path.endswith(".m4a"):
self.watcher.add_file_to_monitor(event.src_path)
def on_moved(self, event: FileSystemEvent):
if not event.is_directory and event.dest_path.endswith(".m4a"):
self.watcher.add_file_to_monitor(event.dest_path)
class VoiceMemosWatcher:
def __init__(
self,
endpoint: str,
api_key: str = "",
polling_interval: float = 5.0,
stability_checks: int = 3,
):
self.watch_dir = (
Path.home()
/ "Library/Group Containers/group.com.apple.VoiceMemos.shared/Recordings"
)
self.endpoint = endpoint
self.api_key = api_key
self.polling_interval = polling_interval
self.stability_checks = stability_checks
self.processed_files: Set[str] = set()
self.monitoring: Dict[str, FileMonitor] = {}
# Track startup time to ignore existing files
self.startup_time = time.time()
# File system observer
self.observer = Observer()
self.event_handler = VoiceMemosEventHandler(self)
def add_file_to_monitor(self, filepath: str):
"""Add a file to the monitoring queue"""
path = Path(filepath)
# Skip if already processed
if filepath in self.processed_files:
return
# Skip if doesn't exist
if not path.exists():
return
# Skip files that existed before the watcher started
try:
if path.stat().st_mtime < self.startup_time:
return
except OSError:
return
# Add to monitoring if not already there
if filepath not in self.monitoring:
self.monitoring[filepath] = FileMonitor(
path=filepath,
first_seen=time.time(),
last_size=self.get_file_size(path),
)
filename = path.name
blue(
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Detected: {filename}"
)
def get_file_size(self, filepath: Path) -> int:
"""Get file size in bytes"""
try:
return filepath.stat().st_size
except OSError:
return 0
def format_size(self, size_bytes: int) -> str:
"""Format size in human-readable format"""
size_mb = size_bytes / (1024 * 1024)
return f"{size_mb:.2f}MB"
def upload_file(self, filepath: str) -> bool:
"""Upload a file to the endpoint"""
path = Path(filepath)
filename = path.name
yellow(f" 🛜 Uploading: {filename}")
if not path.exists():
red(" ✗ File no longer exists")
return False
filesize = self.get_file_size(path)
print(f" 📊 File size: {self.format_size(filesize)}")
try:
print(f" → Uploading to endpoint: {self.endpoint}")
# Read file directly (requires Full Disk Access)
with open(path, "rb") as f:
files = {
"file": (filename, f, "audio/x-m4a"),
"filename": (None, filename),
"timestamp": (None, datetime.utcnow().isoformat() + "Z"),
"filesize": (None, str(filesize)),
}
headers = {}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
# Make the request
response = requests.post(
self.endpoint,
files=files,
headers=headers,
)
if response.status_code >= 200 and response.status_code < 300:
green(f" ✓ Upload successful (HTTP {response.status_code})")
if response.text:
print(f" Response: {response.text}")
return True
else:
red(f" ✗ Upload failed (HTTP {response.status_code})")
if response.text:
red(f" Error: {response.text}")
return False
except requests.exceptions.ConnectionError:
red(" ✗ Connection failed - is the server running?")
return False
except requests.exceptions.Timeout:
red(" ✗ Upload timeout")
return False
except PermissionError:
red(" ✗ Permission denied - grant Full Disk Access to this program")
red(" System Settings > Privacy & Security > Full Disk Access")
return False
except Exception as e:
red(f" ✗ Upload error: {e}")
return False
def process_monitoring_queue(self):
"""Process all files being monitored - called from main loop"""
current_time = time.time()
files_to_remove = []
for filepath, monitor in list(self.monitoring.items()):
path = Path(filepath)
# Check if file still exists
if not path.exists():
red(f" ✗ File disappeared: {path.name}")
files_to_remove.append(filepath)
continue
current_size = self.get_file_size(path)
wait_time = int(current_time - monitor.first_seen)
# Check if size is stable
if current_size == monitor.last_size:
monitor.stable_count += 1
if monitor.stable_count == 1:
print(f"\n ⏳ {path.name}: File appears stable, verifying...")
elif monitor.stable_count >= self.stability_checks:
# File is stable, upload it
green(f" ✓ {path.name}: Recording completed (waited {wait_time}s)")
if self.upload_file(filepath):
self.processed_files.add(filepath)
files_to_remove.append(filepath)
else:
# Size changed, reset stability counter
if monitor.stable_count > 0:
print(
f" ⏳ {path.name}: File size changed, continuing to monitor..."
)
else:
print(
f"\r ⏳ {path.name}: Recording in progress... {wait_time}s ({self.format_size(current_size)})",
end="",
flush=True,
)
monitor.stable_count = 0
monitor.last_size = current_size
# Timeout after 5 minutes
if wait_time > 300:
yellow(f"\n ⚠ {path.name}: Timeout waiting for recording")
files_to_remove.append(filepath)
# Clean up completed or timed out files
for filepath in files_to_remove:
del self.monitoring[filepath]
def watch(self):
"""Start watching for new files"""
green("Starting Voice Memos watcher...")
yellow(f"Watching: {self.watch_dir}")
yellow(f"Endpoint: {self.endpoint}")
cyan("Note: Only monitoring files created after startup")
print()
if not self.watch_dir.exists():
red(f"Error: Watch directory does not exist: {self.watch_dir}")
print(
"Make sure you have granted Full Disk Access to your terminal application."
)
sys.exit(1)
# Test read access
try:
list(self.watch_dir.iterdir())
except PermissionError:
red("Error: Cannot read watch directory")
red("Grant Full Disk Access to your terminal application:")
red(" System Settings > Privacy & Security > Full Disk Access")
sys.exit(1)
green("Now watching for new files...")
blue(
f"ℹ️ Files will be uploaded after {self.stability_checks} seconds of no size changes"
)
print()
# Start the observer
self.observer.schedule(self.event_handler, str(self.watch_dir), recursive=False)
self.observer.start()
try:
# Main loop - process monitoring queue
while True:
self.process_monitoring_queue()
time.sleep(self.polling_interval)
except KeyboardInterrupt:
yellow("\nShutting down watcher...")
self.observer.stop()
self.observer.join()
green("Watcher stopped.")
def main():
parser = argparse.ArgumentParser(
description="Watch Voice Memos for new recordings and upload them"
)
parser.add_argument(
"-e",
"--endpoint",
default="http://127.0.0.1:8080/upload",
help="Upload endpoint URL (default: http://127.0.0.1:8080/upload)",
)
parser.add_argument(
"-k", "--api-key", default="", help="API key for authentication"
)
parser.add_argument(
"-i",
"--interval",
type=float,
default=1.0,
help="Polling interval in seconds (default: 1.0)",
)
parser.add_argument(
"-s",
"--stability-checks",
type=int,
default=3,
help="Number of polling intervals to wait before considering a file stable (default: 3)",
)
args = parser.parse_args()
watcher = VoiceMemosWatcher(
endpoint=args.endpoint,
api_key=args.api_key,
polling_interval=args.interval,
stability_checks=args.stability_checks,
)
watcher.watch()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment