Gets YouTube transcripts from Takeout data and enriches them with metadata using yt-dlp
#!/usr/bin/env python3
import argparse
import json
import csv
import os
import sys
import time
import concurrent.futures
from typing import Optional, List, Dict, Any

import requests
from bs4 import BeautifulSoup
from yt_dlp import YoutubeDL

# Increase the CSV field size limit so long transcripts/descriptions fit in one field.
# sys.maxsize can overflow on some platforms, so shrink until a value is accepted.
csv_max = sys.maxsize
while True:
    try:
        csv.field_size_limit(csv_max)
        break
    except OverflowError:
        csv_max = int(csv_max / 10)
| BASE = "https://youtubetotranscript.com" | |
| def clean_transcript_text(raw: str) -> Optional[str]: | |
| lines = raw.splitlines() | |
| cleaned: List[str] = [] | |
| started = False | |
| for ln in lines: | |
| s = ln.strip() | |
| if not started: | |
| if not s or s.upper() == "AI": | |
| continue | |
| started = True | |
| cleaned.append(ln) | |
| if not cleaned: | |
| return None | |
| text = "\n".join(cleaned).strip() | |
| return text or None | |


def fetch_transcript_direct(video_id: str, session: Optional[requests.Session] = None) -> Optional[str]:
    """Scrape the transcript for one video from youtubetotranscript.com."""
    url = f"{BASE}/transcript?v={video_id}"
    s = session or requests.Session()
    try:
        resp = s.get(url, timeout=30)
    except Exception:
        return None
    if resp.status_code != 200:
        return None
    html = resp.text
    text = BeautifulSoup(html, "html.parser").get_text(separator="\n")
    if "Failed to get transcript for this video" in text:
        return None
    # Keep only the text between the "Pin video" and "Back To Top" page markers.
    if "Pin video" in text:
        text = text.split("Pin video", 1)[1]
    if "Back To Top" in text:
        text = text.split("Back To Top", 1)[0]
    lines: List[str] = []
    skip = {"Transcript", "Copy", "Timestamp OFF", "Timestamp On", "", " ", "\xa0"}
    for line in text.splitlines():
        l = line.strip()
        if not l or l in skip:
            continue
        if l.startswith("YouTubeToTranscript"):
            continue
        lines.append(line)
    raw = "\n".join(lines).strip()
    if not raw:
        return None
    return clean_transcript_text(raw)


def fetch_metadata(video_id: str) -> Optional[Dict[str, Any]]:
    """Fetch video metadata (title, channel link, duration, etc.) via yt-dlp without downloading."""
    url = f"https://www.youtube.com/watch?v={video_id}"
    opts = {
        'quiet': True,
        'no_warnings': True,
        'skip_download': True,
        'force_generic_extractor': False,  # use the default YouTube extractor
    }
    try:
        with YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            # note: the channel link is usually exposed as 'uploader_url' (or can be built from 'uploader_id' + the base YouTube URL)
            return {
                "title": info.get("title"),
                "uploader": info.get("uploader"),
                "uploader_url": info.get("uploader_url"),
                "upload_date": info.get("upload_date"),
                "view_count": info.get("view_count"),
                "duration": info.get("duration"),
                "tags": info.get("tags"),
                "description": info.get("description"),
            }
    except Exception as e:
        return {"error": str(e)}


def load_history_json(path: str) -> Dict[str, Dict[str, Any]]:
    """Parse a Takeout watch-history.json file into {video_id: history fields}."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    history: Dict[str, Dict[str, Any]] = {}
    for item in data:
        url = item.get("titleUrl")
        if not url:
            continue
        if "v=" in url:
            vid = url.split("v=", 1)[1].split("&")[0]
        else:
            continue
        history[vid] = {
            "history_title": item.get("title"),
            "titleUrl": url,
            "subtitles": item.get("subtitles"),
            "time": item.get("time"),
        }
    return history


def load_watch_later_csv(path: str) -> Dict[str, Dict[str, Any]]:
    """Parse a Takeout "Watch later" playlist CSV into {video_id: added-at timestamp}."""
    watch_later: Dict[str, Dict[str, Any]] = {}
    with open(path, newline='', encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            vid = row.get("Video ID") or row.get("video_id")
            added = row.get("Playlist Video Creation Timestamp") or row.get("added_at")
            if not vid:
                continue
            watch_later[vid] = {
                "watch_later_added_at": added
            }
    return watch_later
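
# For reference, a minimal sketch of the Takeout inputs these two loaders expect.
# The field names come from the parsing code above; the example values are
# hypothetical placeholders, not real export data.
#
# watch-history.json is a JSON array of entries shaped roughly like:
#     {
#         "title": "Watched <video title>",
#         "titleUrl": "https://www.youtube.com/watch?v=<video_id>",
#         "subtitles": [...],          # copied through verbatim
#         "time": "2025-01-01T00:00:00.000Z"
#     }
#
# "Watch later-videos.csv" needs at least these columns (others are ignored):
#     Video ID,Playlist Video Creation Timestamp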


def main():
    parser = argparse.ArgumentParser(description="Fetch YouTube transcripts + metadata (channel link, duration, etc.) with resume + optional test run")
    parser.add_argument("--history-json", type=str, help="Path to history JSON file")
    parser.add_argument("--watch-later-csv", type=str, help="Path to watch-later CSV file")
    parser.add_argument("--out-jsonl", type=str, default="full_output.jsonl", help="Output JSONL file (append)")
    parser.add_argument("--out-csv", type=str, default="full_output.csv", help="Output CSV file (append)")
    parser.add_argument("--failed-jsonl", type=str, default="failed.jsonl", help="Log failed video IDs")
    parser.add_argument("--max-workers", type=int, default=4, help="Parallel fetch workers")
    parser.add_argument("--pause-after", type=int, default=10, help="Pause after processing N videos")
    parser.add_argument("--test-count", type=int, default=0, help="If >0: only process first N videos (test run)")
    args = parser.parse_args()

    history = {}
    if args.history_json and os.path.exists(args.history_json):
        history = load_history_json(args.history_json)
    watch = {}
    if args.watch_later_csv and os.path.exists(args.watch_later_csv):
        watch = load_watch_later_csv(args.watch_later_csv)

    # Merge both sources; history entries take precedence when a video appears in both.
    combined: Dict[str, Dict[str, Any]] = {}
    for vid, m in history.items():
        combined[vid] = {**m, "source": "history"}
    for vid, m in watch.items():
        if vid in combined:
            continue
        combined[vid] = {**m, "source": "watch_later"}
    all_vids = list(combined.keys())
    print(f"Total video IDs: {len(all_vids)} (history: {len(history)}, watch_later: {len(watch)})")

    # Load already-processed IDs from a previous run so the script can resume.
    processed = set()
    if os.path.exists(args.out_jsonl):
        with open(args.out_jsonl, "r", encoding="utf-8") as jf:
            for line in jf:
                try:
                    rec = json.loads(line)
                    vid0 = rec.get("video_id")
                    if vid0:
                        processed.add(vid0)
                except json.JSONDecodeError:
                    pass

    to_process = [vid for vid in all_vids if vid not in processed]
    if args.test_count and args.test_count > 0:
        to_process = to_process[:args.test_count]
        print(f"Test run mode: will process only first {len(to_process)} videos.")
    print(f"Already processed: {len(processed)}, to process now: {len(to_process)}")
    if not to_process:
        print("Nothing to process — exiting.")
        return

    jsonl_f = open(args.out_jsonl, "a", encoding="utf-8")
    csv_exists = os.path.exists(args.out_csv)
    csv_f = open(args.out_csv, "a", newline="", encoding="utf-8")
    writer = csv.writer(csv_f)
    if not csv_exists:
        writer.writerow([
            "video_id", "source",
            "history_title", "titleUrl",
            "watch_later_added_at",
            "metadata_title", "uploader", "uploader_url", "upload_date",
            "view_count", "duration", "tags", "description",
            "transcript"
        ])
    failed_f = open(args.failed_jsonl, "a", encoding="utf-8")

    count = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as exe:
        # Bind the loop variable as a default argument so each task keeps its own video ID.
        future_to_vid = {
            exe.submit(lambda v=vid: (v, fetch_transcript_direct(v), fetch_metadata(v))): vid
            for vid in to_process
        }
        for fut in concurrent.futures.as_completed(future_to_vid):
            vid = future_to_vid[fut]
            try:
                vid0, transcript, metadata = fut.result()
            except Exception:
                transcript, metadata = None, None
            base = combined.get(vid, {})
            record: Dict[str, Any] = {
                "video_id": vid,
                "source": base.get("source")
            }
            if base.get("history_title"):
                record["history_title"] = base.get("history_title")
                record["titleUrl"] = base.get("titleUrl")
            if base.get("watch_later_added_at"):
                record["watch_later_added_at"] = base.get("watch_later_added_at")
            if metadata:
                if metadata.get("error"):
                    # metadata fetch failed
                    record["metadata_error"] = metadata.get("error")
                else:
                    record["title"] = metadata.get("title")
                    record["uploader"] = metadata.get("uploader")
                    record["uploader_url"] = metadata.get("uploader_url")
                    record["upload_date"] = metadata.get("upload_date")
                    record["view_count"] = metadata.get("view_count")
                    record["duration"] = metadata.get("duration")
                    record["tags"] = metadata.get("tags")
                    record["description"] = metadata.get("description")
            if transcript:
                record["transcript"] = transcript

            # write JSONL (streaming)
            jsonl_f.write(json.dumps(record, ensure_ascii=False) + "\n")
            jsonl_f.flush()

            # write CSV row
            writer.writerow([
                record.get("video_id", ""),
                record.get("source", ""),
                record.get("history_title", ""),
                record.get("titleUrl", ""),
                record.get("watch_later_added_at", ""),
                record.get("title", ""),
                record.get("uploader", ""),
                record.get("uploader_url", ""),
                record.get("upload_date", ""),
                record.get("view_count", ""),
                record.get("duration", ""),
                ",".join(record.get("tags") or []),
                record.get("description", ""),
                record.get("transcript", "")
            ])
            csv_f.flush()

            # if transcript or metadata is missing, log to the failed file
            if (not transcript) or (not metadata) or metadata.get("error"):
                failed_f.write(json.dumps({
                    "video_id": vid,
                    "transcript_ok": bool(transcript),
                    "metadata_ok": bool(metadata and not metadata.get("error")),
                    "metadata_error": metadata.get("error") if metadata else None
                }, ensure_ascii=False) + "\n")
                failed_f.flush()

            count += 1
            print(f"[{vid}] → transcript: {'OK' if transcript else 'NO'}, metadata: {'OK' if metadata and not metadata.get('error') else 'NO'}")
            if count % args.pause_after == 0:
                print(f"Processed {count} — sleeping briefly …")
                time.sleep(5)

    jsonl_f.close()
    csv_f.close()
    failed_f.close()
    print("Done. Output written to:", args.out_jsonl, args.out_csv, "Failed log:", args.failed_jsonl)


if __name__ == "__main__":
    main()
| """ | |
| python3 3_yt_full_fetch.py \ | |
| --history-json "Takeout/YouTube and YouTube Music/history/watch-history.json" \ | |
| --watch-later-csv "Takeout/YouTube and YouTube Music/playlists/Watch later-videos.csv" \ | |
| --out-jsonl enriched_watch_later.jsonl \ | |
| --out-csv enriched_watch_later.csv \ | |
| --failed-jsonl enriched_watch_later_failed.jsonl \ | |
| --max-workers 7 \ | |
| --pause-after 50 \ | |
| --test-count 10 | |
| """ | |
| """ | |
| python3 3_yt_full_fetch.py \ | |
| --watch-later-csv "Takeout/YouTube and YouTube Music/playlists/Watch later-videos.csv" \ | |
| --out-jsonl enriched_watch_later.jsonl \ | |
| --out-csv enriched_watch_later.csv \ | |
| --failed-jsonl enriched_watch_later_failed.jsonl \ | |
| --max-workers 7 \ | |
| --pause-after 50 \ | |
| --test-count 10 | |
| """ |
| requests>=2.0 | |
| beautifulsoup4>=4.0 | |
| playwright>=1.50.0 | |
| yt-dlp |
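The pins above cover the libraries the script imports (requests, beautifulsoup4, yt-dlp); playwright is listed but not used by this script. Assuming the list is saved as a requirements file named requirements.txt (the gist does not show the filename), it can be installed with:

pip install -r requirements.txt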