Download YouTube transcripts in bulk from Google Takeout YouTube data (supports both CSV and JSON inputs).
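The script expects the standard Takeout exports: the watch-history JSON and/or the "Watch later" playlist CSV. It depends on two third-party packages, requests and beautifulsoup4 (installable with pip install requests beautifulsoup4); everything else is from the standard library. A full command-line example is included as a docstring at the end of the file.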
#!/usr/bin/env python3
import argparse
import json
import csv
import sys

# Raise the CSV field size limit as far as the platform allows
# (transcript fields can be very long); back off on OverflowError.
csv_max = sys.maxsize
while True:
    try:
        csv.field_size_limit(csv_max)
        break
    except OverflowError:
        csv_max = int(csv_max / 10)

import os
import time
import requests
from bs4 import BeautifulSoup
import concurrent.futures
from typing import Optional, List, Dict, Any

BASE = "https://youtubetotranscript.com"

def clean_transcript_text(raw: str) -> Optional[str]:
    """Clean raw transcript: remove leading junk lines like 'AI', blank lines, then strip."""
    lines = raw.splitlines()
    cleaned: List[str] = []
    started = False
    for ln in lines:
        s = ln.strip()
        if not started:
            # skip blank or header-junk lines
            if not s or s.upper() == "AI":
                continue
            started = True
        cleaned.append(ln)
    if not cleaned:
        return None
    text = "\n".join(cleaned).strip()
    return text or None

def fetch_transcript_direct(video_id: str, session: Optional[requests.Session] = None) -> Optional[str]:
    """Fetch one video's transcript from youtubetotranscript.com; return None if unavailable."""
    url = f"{BASE}/transcript?v={video_id}"
    s = session or requests.Session()
    try:
        resp = s.get(url, timeout=30)
    except Exception:
        return None
    if resp.status_code != 200:
        return None
    html = resp.text
    text = BeautifulSoup(html, "html.parser").get_text(separator="\n")
    if "Failed to get transcript for this video" in text:
        return None
    full = text
    # The transcript body sits between the "Pin video" and "Back To Top" page markers.
    marker = "Pin video"
    if marker in full:
        txt = full.split(marker, 1)[1]
    else:
        txt = full
    end_marker = "Back To Top"
    if end_marker in txt:
        txt = txt.split(end_marker, 1)[0]
    lines: List[str] = []
    skip = set(["Transcript", "Copy", "Timestamp OFF", "Timestamp On", "", " ", "\xa0"])
    for line in txt.splitlines():
        l = line.strip()
        if not l or l in skip:
            continue
        if l.startswith("YouTubeToTranscript"):
            continue
        lines.append(line)
    raw = "\n".join(lines).strip()
    if not raw:
        return None
    return clean_transcript_text(raw)

def load_history_json(path: str) -> Dict[str, Dict[str, Any]]:
    """Map video ID -> metadata from a Takeout watch-history.json export."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    history_map: Dict[str, Dict[str, Any]] = {}
    for item in data:
        url = item.get("titleUrl")
        if not url:
            continue
        if "v=" in url:
            vid = url.split("v=", 1)[1].split("&")[0]
        else:
            continue
        history_map[vid] = {
            "header": item.get("header"),
            "title": item.get("title"),
            "titleUrl": url,
            "subtitles": item.get("subtitles"),
            "time": item.get("time"),
            "source": "history",
        }
    return history_map

def load_watch_later_csv(path: str) -> Dict[str, Dict[str, Any]]:
    """Map video ID -> metadata from a Takeout 'Watch later' playlist CSV export."""
    watch_later_map: Dict[str, Dict[str, Any]] = {}
    with open(path, newline='', encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            vid = row.get("Video ID") or row.get("video_id")
            added = row.get("Playlist Video Creation Timestamp") or row.get("added_at")
            if vid:
                watch_later_map[vid] = {
                    "watch_later_added_at": added,
                    "source": "watch_later",
                }
    return watch_later_map

def process_video(meta: Dict[str, Any]) -> Dict[str, Any]:
    vid = meta.get("video_id")
    transcript = fetch_transcript_direct(vid)
    result = {**meta, "transcript": transcript}
    return result

def main():
    parser = argparse.ArgumentParser(description="Bulk fetch YouTube transcripts (history + watch-later), with metadata + resume-safe processing")
    parser.add_argument("--history-json", type=str, help="Path to watch-history JSON file")
    parser.add_argument("--watch-later-csv", type=str, help="Path to watch-later CSV file")
    parser.add_argument("--out-jsonl", type=str, default="history_plus_later.jsonl",
                        help="Output JSONL file (appended incrementally)")
    parser.add_argument("--out-csv", type=str, default="history_plus_later.csv",
                        help="Output CSV file (appended incrementally)")
    parser.add_argument("--max-workers", type=int, default=4,
                        help="Number of threads for parallel fetching")
    parser.add_argument("--pause-after", type=int, default=10,
                        help="Pause after this many transcripts to avoid overload")
    args = parser.parse_args()

    history_map: Dict[str, Dict[str, Any]] = {}
    watch_later_map: Dict[str, Dict[str, Any]] = {}
    if args.history_json and os.path.exists(args.history_json):
        history_map = load_history_json(args.history_json)
    if args.watch_later_csv and os.path.exists(args.watch_later_csv):
        watch_later_map = load_watch_later_csv(args.watch_later_csv)

    # Merge both sources; history metadata wins when a video appears in both.
    combined: Dict[str, Dict[str, Any]] = {}
    for vid, meta in history_map.items():
        combined[vid] = meta.copy()
    for vid, meta in watch_later_map.items():
        if vid in combined:
            continue
        combined[vid] = meta.copy()

    all_vids = list(combined.keys())
    print(f"Found {len(all_vids)} unique video IDs: history={len(history_map)}, watch_later={len(watch_later_map)}")

    # Resume support: skip any video ID already present in the existing outputs.
    processed_ids = set()
    # load processed from JSONL
    if os.path.exists(args.out_jsonl):
        with open(args.out_jsonl, "r", encoding="utf-8") as jf:
            for line in jf:
                try:
                    rec = json.loads(line)
                    vid = rec.get("video_id")
                    if vid:
                        processed_ids.add(vid)
                except Exception:
                    continue
    # also optionally load from existing CSV
    if os.path.exists(args.out_csv):
        with open(args.out_csv, newline='', encoding="utf-8") as cf:
            reader = csv.DictReader(cf)
            for row in reader:
                vid = row.get("video_id")
                if vid:
                    processed_ids.add(vid)

    to_process = [vid for vid in all_vids if vid not in processed_ids]
    print(f"Already processed: {len(processed_ids)}, to process now: {len(to_process)}")
    if not to_process:
        print("Nothing to do. Exiting.")
        return

    jsonl_f = open(args.out_jsonl, "a", encoding="utf-8")
    csv_exists = os.path.exists(args.out_csv)
    csv_f = open(args.out_csv, "a", newline='', encoding="utf-8")
    writer = csv.writer(csv_f)
    if not csv_exists:
        writer.writerow([
            "video_id", "source", "header", "title", "titleUrl",
            "subtitles", "time", "watch_later_added_at", "transcript"
        ])

    count = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as exe:
        futures = {exe.submit(process_video, {**combined[vid], "video_id": vid}): vid for vid in to_process}
        for fut in concurrent.futures.as_completed(futures):
            rec = fut.result()
            vid = rec.get("video_id")
            ok = "OK" if rec.get("transcript") else "NO"
            print(f"[{vid}] → {ok}")
            jsonl_f.write(json.dumps(rec, ensure_ascii=False) + "\n")
            jsonl_f.flush()
            writer.writerow([
                vid,
                rec.get("source"),
                rec.get("header"),
                rec.get("title"),
                rec.get("titleUrl"),
                json.dumps(rec.get("subtitles")) if rec.get("subtitles") else "",
                rec.get("time"),
                rec.get("watch_later_added_at"),
                rec.get("transcript") or ""
            ])
            csv_f.flush()
            count += 1
            # Note: this only pauses the result-handling loop; already-submitted workers keep fetching.
            if count % args.pause_after == 0:
                print(f"Processed {count} — sleeping briefly...")
                time.sleep(5)
    jsonl_f.close()
    csv_f.close()
    print("Done. Output written to:", args.out_jsonl, args.out_csv)


if __name__ == "__main__":
    main()

"""
Example usage:

python yt_transcript_bulk.py \
    --history-json "Takeout/YouTube and YouTube Music/history/watch-history.json" \
    --watch-later-csv "Takeout/YouTube and YouTube Music/playlists/Watch later-videos.csv" \
    --out-jsonl history_plus_later.jsonl \
    --out-csv history_plus_later.csv \
    --max-workers 4 \
    --pause-after 10
"""
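After a run, one quick sanity check is to read the JSONL output back and count how many records actually ended up with a transcript. A minimal sketch, assuming the default output name history_plus_later.jsonl written by the script above:

import json

total = 0
have_transcript = 0
with open("history_plus_later.jsonl", encoding="utf-8") as f:
    for line in f:
        rec = json.loads(line)       # one JSON record per line, as written by the script
        total += 1
        if rec.get("transcript"):    # None or empty string means the fetch failed
            have_transcript += 1

print(f"{have_transcript}/{total} records have a transcript")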