Skip to content

Instantly share code, notes, and snippets.

@Codycody31
Created October 29, 2025 18:12
Show Gist options
  • Select an option

  • Save Codycody31/842571928cb057eb45b134492c0b1c86 to your computer and use it in GitHub Desktop.

Select an option

Save Codycody31/842571928cb057eb45b134492c0b1c86 to your computer and use it in GitHub Desktop.
Take the outputed mp3, webp, and json file from yt-dl and merge into mp3 with best support for Navidrome
import argparse
import json
import sys
from pathlib import Path
from typing import Dict, Any, Optional, List
from mutagen.id3 import (
ID3, ID3NoHeaderError, APIC, TIT2, TPE1, TPE2, TALB, TCON, TDRC, COMM,
WXXX, TXXX
)
# Pillow is optional; we detect it at runtime.
try:
from PIL import Image
PIL_OK = True
except Exception:
PIL_OK = False
def load_json(path: Path) -> Dict[str, Any]:
try:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"[warn] Failed to parse JSON {path}: {e}")
return {}
def parse_date(d: Optional[str]) -> Optional[str]:
#Return an ID3-friendly date string from various inputs.
#Accepts 'YYYY', 'YYYY-MM-DD', 'YYYYMMDD', or int seconds; returns 'YYYY-MM-DD' or 'YYYY'.
if not d:
return None
try:
# yt-dlp style 'upload_date': 'YYYYMMDD'
if isinstance(d, str) and len(d) == 8 and d.isdigit():
return f"{d[0:4]}-{d[4:6]}-{d[6:8]}"
# already ISO-ish
if isinstance(d, str) and (len(d) == 4 or "-" in d):
return d
# integer seconds since epoch (rare in these JSONs)
if isinstance(d, (int, float)):
import datetime as _dt
return _dt.datetime.utcfromtimestamp(d).strftime("%Y-%m-%d")
except Exception:
pass
return None
def coalesce(*vals) -> Optional[str]:
for v in vals:
if isinstance(v, str) and v.strip():
return v.strip()
return None
def list_to_str(items: Optional[List[str]], limit: int = 6) -> Optional[str]:
if not items:
return None
# Keep first few to avoid overly long tags
slim = [str(x) for x in items if str(x).strip()]
if not slim:
return None
if len(slim) > limit:
slim = slim[:limit]
return ", ".join(slim)
def add_or_update_text(frame_cls, key, value, tags: ID3):
if value is None:
return
tags.add(frame_cls(encoding=3, text=value))
def ensure_id3(mp3_path: Path) -> ID3:
try:
return ID3(mp3_path)
except ID3NoHeaderError:
tags = ID3()
tags.save(mp3_path) # create header
return tags
def embed_apic(tags: ID3, mime: str, data: bytes, desc: str, pic_type: int = 3):
#Add an APIC frame (type 3 = front cover)
try:
tags.add(APIC(encoding=3, mime=mime, type=pic_type, desc=desc, data=data))
except Exception as e:
# In rare cases, upgrade tag to v2.3/v2.4 and retry
tags.update_to_v23()
tags.add(APIC(encoding=3, mime=mime, type=pic_type, desc=desc, data=data))
def process_file(mp3_path: Path, args) -> None:
stem = mp3_path.stem
json_path = mp3_path.with_suffix(".json")
webp_path = mp3_path.with_suffix(".webp")
meta = load_json(json_path) if json_path.exists() else {}
# Extract fields (try several aliases)
title = coalesce(meta.get("title"), stem)
# artist: prefer explicit 'artist', else 'channel'/'uploader'
artist = coalesce(meta.get("artist"), meta.get("channel"), meta.get("uploader"))
album_artist = None
if args.prefer_channel_as_artist:
album_artist = coalesce(meta.get("channel"), meta.get("uploader"), artist)
else:
album_artist = coalesce(artist, meta.get("channel"), meta.get("uploader"))
album = coalesce(meta.get("album"), meta.get("playlist"), args.default_album)
# genres / categories / tags
genre = list_to_str(meta.get("categories"))
tags_list = meta.get("tags") if isinstance(meta.get("tags"), list) else None
tags_str = list_to_str(tags_list, limit=12)
# date
date = parse_date(meta.get("upload_date") or meta.get("release_date") or meta.get("release_timestamp"))
description = meta.get("description")
# URLs / provenance
source_url = coalesce(meta.get("webpage_url"), meta.get("original_url"), meta.get("url"))
channel_id = meta.get("channel_id")
channel_url = meta.get("channel_url")
source_id = meta.get("id")
tags = ensure_id3(mp3_path)
# Core text frames
if title: tags["TIT2"] = TIT2(encoding=3, text=title)
if artist: tags["TPE1"] = TPE1(encoding=3, text=artist)
if album: tags["TALB"] = TALB(encoding=3, text=album)
if album_artist: tags["TPE2"] = TPE2(encoding=3, text=album_artist)
if genre: tags["TCON"] = TCON(encoding=3, text=genre)
if date: tags["TDRC"] = TDRC(encoding=3, text=date)
if description:
# 'eng' language code; You can change if needed
tags["COMM"] = COMM(encoding=3, lang="eng", desc="description", text=description)
# Helpful provenance
if source_url:
tags.add(WXXX(encoding=3, desc="Source", url=source_url))
if channel_url:
tags.add(WXXX(encoding=3, desc="Channel", url=channel_url))
if source_id:
tags.add(TXXX(encoding=3, desc="SOURCE_ID", text=source_id))
if channel_id:
tags.add(TXXX(encoding=3, desc="CHANNEL_ID", text=channel_id))
if tags_str:
tags.add(TXXX(encoding=3, desc="TAGS", text=tags_str))
# Encoder info (handy for troubleshooting)
tags.add(TXXX(encoding=3, desc="TAGGED_BY", text="embed_webp_to_mp3.py"))
# Cover art
if webp_path.exists():
try:
webp_bytes = webp_path.read_bytes()
if not args.no_webp:
embed_apic(tags, "image/webp", webp_bytes, "Cover (WEBP)", pic_type=3)
if not args.no_jpeg_fallback:
if PIL_OK:
try:
from io import BytesIO
with Image.open(webp_path) as im:
if im.mode in ("RGBA", "LA"):
bg = Image.new("RGB", im.size, (0, 0, 0))
bg.paste(im, mask=im.split()[-1])
im = bg
else:
im = im.convert("RGB")
buf = BytesIO()
# quality ~85 is a good tradeoff
im.save(buf, format="JPEG", quality=85, optimize=True)
jpeg_bytes = buf.getvalue()
# Put JPEG first so clients that pick the first APIC see it
embed_apic(tags, "image/jpeg", jpeg_bytes, "Cover (JPEG fallback)", pic_type=3)
except Exception as e:
print(f"[warn] JPEG fallback failed for {webp_path.name}: {e}")
else:
print("[info] Pillow not available; skipping JPEG fallback.")
except Exception as e:
print(f"[warn] Failed to embed cover art for {mp3_path.name}: {e}")
else:
if args.require_cover:
print(f"[skip] No matching WEBP for {mp3_path.name} and --require-cover set.")
return
# Save tags (mutagen auto-selects v2.3 default; safe for Navidrome/clients)
tags.save(mp3_path, v2_version=3)
print(f"[ok] Tagged: {mp3_path.name}")
def main():
ap = argparse.ArgumentParser(description="Embed matching WEBP cover and JSON metadata into MP3 files.")
ap.add_argument("folder", help="Folder to scan", nargs="?", default=".")
ap.add_argument("--default-album", default="Singles", help="Album name to use if JSON has none (default: %(default)s)")
ap.add_argument("--prefer-channel-as-artist", action="store_true",
help="If set, prefer channel/uploader as Artist/Album Artist when available.")
ap.add_argument("--no-jpeg-fallback", action="store_true",
help="Do not embed JPEG fallback even if Pillow is available.")
ap.add_argument("--no-webp", action="store_true",
help="Do not embed WEBP (embed only JPEG fallback).")
ap.add_argument("--require-cover", action="store_true",
help="Skip files without a matching .webp")
args = ap.parse_args()
base = Path(args.folder)
if not base.exists():
print(f"[error] Folder not found: {base}", file=sys.stderr)
sys.exit(1)
mp3s = sorted(base.glob("*.mp3"))
if not mp3s:
# also search recursively if none at top level
mp3s = sorted(base.rglob("*.mp3"))
if not mp3s:
print("[info] No MP3s found.")
return
for mp3 in mp3s:
try:
process_file(mp3, args)
except Exception as e:
print(f"[warn] Failed to process {mp3}: {e}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment