Created
October 29, 2025 18:12
-
-
Save Codycody31/842571928cb057eb45b134492c0b1c86 to your computer and use it in GitHub Desktop.
Take the MP3, WebP, and JSON files output by yt-dlp and merge them into a single MP3 with the best support for Navidrome
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import argparse | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional, List | |
| from mutagen.id3 import ( | |
| ID3, ID3NoHeaderError, APIC, TIT2, TPE1, TPE2, TALB, TCON, TDRC, COMM, | |
| WXXX, TXXX | |
| ) | |
| # Pillow is optional; we detect it at runtime. | |
| try: | |
| from PIL import Image | |
| PIL_OK = True | |
| except Exception: | |
| PIL_OK = False | |
def load_json(path: Path) -> Dict[str, Any]:
    """Read a JSON metadata file and return its top-level object.

    This is best-effort: a failure to read or parse the file is reported
    with a ``[warn]`` line and an empty dict is returned instead of raising.
    The same happens when the document parses but its top level is not a
    JSON object (for example a list or ``null``), so the result always
    supports ``.get``.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed mapping, or ``{}`` when the file is unreadable, invalid,
        or not a JSON object.
    """
    try:
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:  # deliberately broad: metadata is optional
        print(f"[warn] Failed to parse JSON {path}: {e}")
        return {}
    if not isinstance(data, dict):
        # Valid JSON, but callers look fields up by key.
        print(f"[warn] Ignoring JSON {path}: top-level value is not an object")
        return {}
    return data
def parse_date(d: Any) -> Optional[str]:
    """Return an ID3-friendly date string built from several input shapes.

    Accepted inputs:
      * ``'YYYYMMDD'`` (yt-dlp style ``upload_date``) -> ``'YYYY-MM-DD'``
      * a 4-character string or any string containing ``-`` -> returned
        as-is (after stripping surrounding whitespace)
      * an int/float number of seconds since the Unix epoch -> the UTC
        calendar date as ``'YYYY-MM-DD'``

    Args:
        d: The raw date value; may be ``None`` or empty.

    Returns:
        The normalised date string, or ``None`` when the value is empty,
        of an unsupported type/shape, or a timestamp outside the range the
        platform can convert.
    """
    # bool is a subclass of int; never treat True/False as a timestamp.
    if isinstance(d, bool) or not d:
        return None

    if isinstance(d, str):
        text = d.strip()
        if len(text) == 8 and text.isdigit():
            return f"{text[:4]}-{text[4:6]}-{text[6:]}"
        # already ISO-ish
        if len(text) == 4 or "-" in text:
            return text
        return None

    if isinstance(d, (int, float)):
        import datetime as _dt

        try:
            # Timezone-aware conversion; utcfromtimestamp() is deprecated.
            moment = _dt.datetime.fromtimestamp(d, tz=_dt.timezone.utc)
        except (OverflowError, OSError, ValueError):
            return None
        return moment.strftime("%Y-%m-%d")

    return None
def coalesce(*vals) -> Optional[str]:
    """Return the first argument that is a non-blank string, stripped.

    Non-string arguments and strings that are empty or whitespace-only are
    skipped. Returns ``None`` when no argument qualifies.
    """
    stripped = (candidate.strip() for candidate in vals if isinstance(candidate, str))
    return next((text for text in stripped if text), None)
def list_to_str(items: Optional[List[str]], limit: int = 6) -> Optional[str]:
    """Join the first *limit* non-blank entries of *items* with ``', '``.

    Each entry is converted with ``str()`` and stripped; ``None`` entries
    and entries that are blank after stripping are dropped. A bare string
    is treated as a single entry rather than iterated character by
    character.

    Args:
        items: The values to join; may be ``None`` or empty.
        limit: Maximum number of entries kept, to avoid overly long tags.

    Returns:
        The joined string, or ``None`` when there is nothing to join.
    """
    if not items:
        return None
    if isinstance(items, str):
        # e.g. a single category given as a plain string
        items = [items]
    cleaned = [
        text
        for text in (str(entry).strip() for entry in items if entry is not None)
        if text
    ]
    if not cleaned:
        return None
    return ", ".join(cleaned[:limit])
def add_or_update_text(frame_cls, key, value, tags: ID3):
    """Add a text frame built from *value* to *tags*, unless *value* is None.

    The frame is created as ``frame_cls(encoding=3, text=value)`` and handed
    to ``tags.add``. The *key* parameter is accepted but not used by this
    function.
    """
    if value is not None:
        new_frame = frame_cls(encoding=3, text=value)
        tags.add(new_frame)
def ensure_id3(mp3_path: Path) -> ID3:
    """Return the ID3 tag for *mp3_path*, creating a header if there is none.

    When loading raises ``ID3NoHeaderError``, an empty ``ID3`` object is
    created, saved to *mp3_path* so that the file gains a header, and
    returned.
    """
    try:
        existing = ID3(mp3_path)
    except ID3NoHeaderError:
        fresh = ID3()
        # Write the empty tag out so the file has an ID3 header.
        fresh.save(mp3_path)
        return fresh
    return existing
def embed_apic(tags: ID3, mime: str, data: bytes, desc: str, pic_type: int = 3):
    """Add an APIC (attached picture) frame to *tags*.

    Args:
        tags: The tag object that receives the frame.
        mime: MIME type of the image, e.g. ``"image/jpeg"``.
        data: Raw image bytes.
        desc: Description stored with the picture.
        pic_type: APIC picture type; 3 means front cover.

    If building or adding the frame raises, ``tags.update_to_v23()`` is
    called and the operation is attempted once more; a failure on the
    second attempt propagates to the caller.
    """

    def _attach() -> None:
        picture = APIC(encoding=3, mime=mime, type=pic_type, desc=desc, data=data)
        tags.add(picture)

    try:
        _attach()
    except Exception:
        # In rare cases, convert the tag and retry.
        tags.update_to_v23()
        _attach()
def _webp_to_jpeg_bytes(webp_path: Path) -> bytes:
    """Re-encode the image at *webp_path* as JPEG and return the bytes.

    Requires Pillow. Images in mode RGBA or LA are pasted onto a black RGB
    background using their last band as the mask; any other mode is
    converted to RGB. The JPEG is written with quality 85 and
    ``optimize=True``.
    """
    from io import BytesIO

    with Image.open(webp_path) as im:
        if im.mode in ("RGBA", "LA"):
            rgb = Image.new("RGB", im.size, (0, 0, 0))
            rgb.paste(im, mask=im.split()[-1])
        else:
            rgb = im.convert("RGB")
        buf = BytesIO()
        # quality ~85 is a good tradeoff
        rgb.save(buf, format="JPEG", quality=85, optimize=True)
    return buf.getvalue()


def process_file(mp3_path: Path, args) -> None:
    """Embed sidecar JSON metadata and WEBP cover art into one MP3 file.

    The sidecar files are looked up next to the MP3 by replacing its suffix
    with ``.json`` and ``.webp``. Metadata fields are written as ID3 frames,
    the cover is embedded as a WEBP picture and/or a JPEG re-encode, and the
    tag is saved as ID3v2.3.

    Args:
        mp3_path: The MP3 file to tag (modified in place).
        args: Parsed command-line options. Attributes read here:
            ``prefer_channel_as_artist``, ``default_album``, ``no_webp``,
            ``no_jpeg_fallback`` and ``require_cover``.

    Returns:
        None. Progress is reported with ``[ok]``/``[skip]``/``[warn]``/
        ``[info]`` lines on stdout. Cover-art failures are reported and do
        not prevent the text tags from being saved; other errors propagate.
    """
    stem = mp3_path.stem
    json_path = mp3_path.with_suffix(".json")
    webp_path = mp3_path.with_suffix(".webp")
    has_cover = webp_path.exists()

    # Decide on skipping before touching the file: ensure_id3() may write a
    # header, which must not happen for a file we are going to skip.
    if not has_cover and args.require_cover:
        print(f"[skip] No matching WEBP for {mp3_path.name} and --require-cover set.")
        return

    meta = load_json(json_path) if json_path.exists() else {}
    if not isinstance(meta, dict):
        # Fields are looked up by key below; ignore any other JSON shape.
        meta = {}

    # Extract fields (try several aliases)
    title = coalesce(meta.get("title"), stem)
    # artist: prefer explicit 'artist', else 'channel'/'uploader'
    artist = coalesce(meta.get("artist"), meta.get("channel"), meta.get("uploader"))
    if args.prefer_channel_as_artist:
        album_artist = coalesce(meta.get("channel"), meta.get("uploader"), artist)
    else:
        album_artist = coalesce(artist, meta.get("channel"), meta.get("uploader"))
    album = coalesce(meta.get("album"), meta.get("playlist"), args.default_album)

    # genres / categories / tags
    genre = list_to_str(meta.get("categories"))
    tags_list = meta.get("tags") if isinstance(meta.get("tags"), list) else None
    tags_str = list_to_str(tags_list, limit=12)

    # date
    date = parse_date(
        meta.get("upload_date") or meta.get("release_date") or meta.get("release_timestamp")
    )
    description = coalesce(meta.get("description"))

    # URLs / provenance
    source_url = coalesce(meta.get("webpage_url"), meta.get("original_url"), meta.get("url"))
    channel_url = coalesce(meta.get("channel_url"))
    channel_id = meta.get("channel_id")
    source_id = meta.get("id")

    tags = ensure_id3(mp3_path)

    # Core text frames. tags.add() files each frame under the frame's own
    # hash key, so a re-run replaces the earlier frame.
    core_frames = (
        (TIT2, title),
        (TPE1, artist),
        (TALB, album),
        (TPE2, album_artist),
        (TCON, genre),
        (TDRC, date),
    )
    for frame_cls, text in core_frames:
        if text:
            tags.add(frame_cls(encoding=3, text=text))

    if description:
        # 'eng' language code; change if needed. Added via tags.add() rather
        # than tags["COMM"] so the frame is keyed by description/language and
        # re-running does not leave two copies of the comment.
        tags.add(COMM(encoding=3, lang="eng", desc="description", text=description))

    # Helpful provenance
    if source_url:
        tags.add(WXXX(encoding=3, desc="Source", url=source_url))
    if channel_url:
        tags.add(WXXX(encoding=3, desc="Channel", url=channel_url))
    if source_id:
        # str(): text frames need strings; JSON may carry a number here.
        tags.add(TXXX(encoding=3, desc="SOURCE_ID", text=str(source_id)))
    if channel_id:
        tags.add(TXXX(encoding=3, desc="CHANNEL_ID", text=str(channel_id)))
    if tags_str:
        tags.add(TXXX(encoding=3, desc="TAGS", text=tags_str))

    # Encoder info (handy for troubleshooting)
    tags.add(TXXX(encoding=3, desc="TAGGED_BY", text="embed_webp_to_mp3.py"))

    # Cover art
    if has_cover:
        try:
            if not args.no_webp:
                embed_apic(tags, "image/webp", webp_path.read_bytes(), "Cover (WEBP)", pic_type=3)
            if not args.no_jpeg_fallback:
                if PIL_OK:
                    try:
                        jpeg_bytes = _webp_to_jpeg_bytes(webp_path)
                        # NOTE(review): the JPEG is added after the WEBP
                        # frame; confirm whether frame order matters for the
                        # target clients.
                        embed_apic(tags, "image/jpeg", jpeg_bytes, "Cover (JPEG fallback)", pic_type=3)
                    except Exception as e:
                        print(f"[warn] JPEG fallback failed for {webp_path.name}: {e}")
                else:
                    print("[info] Pillow not available; skipping JPEG fallback.")
        except Exception as e:
            print(f"[warn] Failed to embed cover art for {mp3_path.name}: {e}")

    # Save as ID3v2.3, requested explicitly via v2_version=3. Per the mutagen
    # documentation, frames must first be converted with update_to_v23()
    # (e.g. TDRC has no v2.3 equivalent of the same name).
    tags.update_to_v23()
    tags.save(mp3_path, v2_version=3)
    print(f"[ok] Tagged: {mp3_path.name}")
def main():
    """Command-line entry point: tag every MP3 found in a folder.

    Parses the command-line options, collects ``*.mp3`` files directly in
    the given folder (falling back to a recursive search when the top level
    has none) and runs ``process_file`` on each in sorted order. A failure
    on one file is reported as a warning and does not stop the others.

    Exits with status 1 when the given path is not an existing directory.
    """
    ap = argparse.ArgumentParser(description="Embed matching WEBP cover and JSON metadata into MP3 files.")
    ap.add_argument("folder", help="Folder to scan", nargs="?", default=".")
    ap.add_argument("--default-album", default="Singles", help="Album name to use if JSON has none (default: %(default)s)")
    ap.add_argument("--prefer-channel-as-artist", action="store_true",
                    help="If set, prefer channel/uploader as Artist/Album Artist when available.")
    ap.add_argument("--no-jpeg-fallback", action="store_true",
                    help="Do not embed JPEG fallback even if Pillow is available.")
    ap.add_argument("--no-webp", action="store_true",
                    help="Do not embed WEBP (embed only JPEG fallback).")
    ap.add_argument("--require-cover", action="store_true",
                    help="Skip files without a matching .webp")
    args = ap.parse_args()

    base = Path(args.folder)
    # is_dir() rather than exists(): a regular file is not a folder to scan.
    if not base.is_dir():
        print(f"[error] Folder not found: {base}", file=sys.stderr)
        sys.exit(1)

    # Search recursively only if there is nothing at the top level.
    mp3s = sorted(base.glob("*.mp3")) or sorted(base.rglob("*.mp3"))
    if not mp3s:
        print("[info] No MP3s found.")
        return

    for mp3 in mp3s:
        try:
            process_file(mp3, args)
        except Exception as e:
            # Keep going: one bad file must not abort the whole batch.
            print(f"[warn] Failed to process {mp3}: {e}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment