@udaylunawat
Created November 28, 2025 19:40
Download YouTube transcripts in bulk from Google Takeout YouTube data (supports both CSV and JSON)
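For context, both inputs come from a Google Takeout export of YouTube data. Below is a minimal sketch of the shapes the loaders in the script actually read; the field names are the ones load_history_json and load_watch_later_csv look up, the example values are hypothetical, and real Takeout files carry additional fields.

# watch-history.json: a JSON array; each entry the script uses looks roughly like
example_history_entry = {
    "header": "YouTube",
    "title": "Watched Example video title",
    "titleUrl": "https://www.youtube.com/watch?v=EXAMPLEID01",
    "subtitles": [{"name": "Example Channel"}],
    "time": "2025-01-01T12:00:00.000Z",
}

# Watch later-videos.csv: read with csv.DictReader; only two columns are used,
# "Video ID" and "Playlist Video Creation Timestamp".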
#!/usr/bin/env python3
import argparse
import json
import csv
import sys

# Raise the CSV field size limit as far as the platform allows; Takeout rows
# and long transcripts can exceed the default limit.
csv_max = sys.maxsize
while True:
    try:
        csv.field_size_limit(csv_max)
        break
    except OverflowError:
        csv_max = int(csv_max / 10)

import os
import time
import requests
from bs4 import BeautifulSoup
import concurrent.futures
from typing import Optional, List, Dict, Any

BASE = "https://youtubetotranscript.com"
def clean_transcript_text(raw: str) -> Optional[str]:
    """Clean raw transcript: remove leading junk lines like 'AI', blank lines, then strip."""
    lines = raw.splitlines()
    cleaned: List[str] = []
    started = False
    for ln in lines:
        s = ln.strip()
        if not started:
            # skip blank or header-junk lines
            if not s or s.upper() == "AI":
                continue
            started = True
        cleaned.append(ln)
    if not cleaned:
        return None
    text = "\n".join(cleaned).strip()
    return text or None
def fetch_transcript_direct(video_id: str, session: Optional[requests.Session] = None) -> Optional[str]:
    url = f"{BASE}/transcript?v={video_id}"
    s = session or requests.Session()
    try:
        resp = s.get(url, timeout=30)
    except Exception:
        return None
    if resp.status_code != 200:
        return None
    html = resp.text
    text = BeautifulSoup(html, "html.parser").get_text(separator="\n")
    if "Failed to get transcript for this video" in text:
        return None
    full = text
    # Keep only the text between the "Pin video" and "Back To Top" page markers.
    marker = "Pin video"
    if marker in full:
        txt = full.split(marker, 1)[1]
    else:
        txt = full
    end_marker = "Back To Top"
    if end_marker in txt:
        txt = txt.split(end_marker, 1)[0]
    lines: List[str] = []
    skip = set(["Transcript", "Copy", "Timestamp OFF", "Timestamp On", "", " ", "\xa0"])
    for line in txt.splitlines():
        l = line.strip()
        if not l or l in skip:
            continue
        if l.startswith("YouTubeToTranscript"):
            continue
        lines.append(line)
    raw = "\n".join(lines).strip()
    if not raw:
        return None
    return clean_transcript_text(raw)
def load_history_json(path: str) -> Dict[str, Dict[str, Any]]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    history_map: Dict[str, Dict[str, Any]] = {}
    for item in data:
        url = item.get("titleUrl")
        if not url:
            continue
        if "v=" in url:
            vid = url.split("v=", 1)[1].split("&")[0]
        else:
            continue
        history_map[vid] = {
            "header": item.get("header"),
            "title": item.get("title"),
            "titleUrl": url,
            "subtitles": item.get("subtitles"),
            "time": item.get("time"),
            "source": "history",
        }
    return history_map
def load_watch_later_csv(path: str) -> Dict[str, Dict[str, Any]]:
    watch_later_map: Dict[str, Dict[str, Any]] = {}
    with open(path, newline='', encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            vid = row.get("Video ID") or row.get("video_id")
            added = row.get("Playlist Video Creation Timestamp") or row.get("added_at")
            if vid:
                watch_later_map[vid] = {
                    "watch_later_added_at": added,
                    "source": "watch_later",
                }
    return watch_later_map
def process_video(meta: Dict[str, Any]) -> Dict[str, Any]:
    vid = meta.get("video_id")
    transcript = fetch_transcript_direct(vid)
    result = {**meta, "transcript": transcript}
    return result
def main():
    parser = argparse.ArgumentParser(description="Bulk fetch YouTube transcripts (history + watch-later), with metadata + resume-safe processing")
    parser.add_argument("--history-json", type=str, help="Path to watch-history JSON file")
    parser.add_argument("--watch-later-csv", type=str, help="Path to watch-later CSV file")
    parser.add_argument("--out-jsonl", type=str, default="history_plus_later.jsonl",
                        help="Output JSONL file (appended incrementally)")
    parser.add_argument("--out-csv", type=str, default="history_plus_later.csv",
                        help="Output CSV file (appended incrementally)")
    parser.add_argument("--max-workers", type=int, default=4,
                        help="Number of threads for parallel fetching")
    parser.add_argument("--pause-after", type=int, default=10,
                        help="Pause after this many transcripts to avoid overload")
    args = parser.parse_args()

    history_map: Dict[str, Dict[str, Any]] = {}
    watch_later_map: Dict[str, Dict[str, Any]] = {}
    if args.history_json and os.path.exists(args.history_json):
        history_map = load_history_json(args.history_json)
    if args.watch_later_csv and os.path.exists(args.watch_later_csv):
        watch_later_map = load_watch_later_csv(args.watch_later_csv)

    combined: Dict[str, Dict[str, Any]] = {}
    for vid, meta in history_map.items():
        combined[vid] = meta.copy()
    for vid, meta in watch_later_map.items():
        if vid in combined:
            continue
        combined[vid] = meta.copy()

    all_vids = list(combined.keys())
    print(f"Found {len(all_vids)} unique video IDs: history={len(history_map)}, watch_later={len(watch_later_map)}")

    processed_ids = set()
    # load processed from JSONL
    if os.path.exists(args.out_jsonl):
        with open(args.out_jsonl, "r", encoding="utf-8") as jf:
            for line in jf:
                try:
                    rec = json.loads(line)
                    vid = rec.get("video_id")
                    if vid:
                        processed_ids.add(vid)
                except Exception:
                    continue
    # also optionally load from existing CSV
    if os.path.exists(args.out_csv):
        with open(args.out_csv, newline='', encoding="utf-8") as cf:
            reader = csv.DictReader(cf)
            for row in reader:
                vid = row.get("video_id")
                if vid:
                    processed_ids.add(vid)

    to_process = [vid for vid in all_vids if vid not in processed_ids]
    print(f"Already processed: {len(processed_ids)}, to process now: {len(to_process)}")
    if not to_process:
        print("Nothing to do. Exiting.")
        return

    jsonl_f = open(args.out_jsonl, "a", encoding="utf-8")
    csv_exists = os.path.exists(args.out_csv)
    csv_f = open(args.out_csv, "a", newline='', encoding="utf-8")
    writer = csv.writer(csv_f)
    if not csv_exists:
        writer.writerow([
            "video_id", "source", "header", "title", "titleUrl",
            "subtitles", "time", "watch_later_added_at", "transcript"
        ])

    count = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as exe:
        futures = {exe.submit(process_video, {**combined[vid], "video_id": vid}): vid for vid in to_process}
        for fut in concurrent.futures.as_completed(futures):
            rec = fut.result()
            vid = rec.get("video_id")
            ok = "OK" if rec.get("transcript") else "NO"
            print(f"[{vid}] → {ok}")
            jsonl_f.write(json.dumps(rec, ensure_ascii=False) + "\n")
            jsonl_f.flush()
            writer.writerow([
                vid,
                rec.get("source"),
                rec.get("header"),
                rec.get("title"),
                rec.get("titleUrl"),
                json.dumps(rec.get("subtitles")) if rec.get("subtitles") else "",
                rec.get("time"),
                rec.get("watch_later_added_at"),
                rec.get("transcript") or ""
            ])
            csv_f.flush()
            count += 1
            if count % args.pause_after == 0:
                print(f"Processed {count} — sleeping briefly...")
                time.sleep(5)

    jsonl_f.close()
    csv_f.close()
    print("Done. Output written to:", args.out_jsonl, args.out_csv)
if __name__ == "__main__":
    main()
"""
python yt_transcript_bulk.py \
--history-json "Takeout/YouTube and YouTube Music/history/watch-history.json" \
--watch-later-csv "Takeout/YouTube and YouTube Music/playlists/Watch later-videos.csv" \
--out-jsonl history_plus_later.jsonl \
--out-csv history_plus_later.csv \
--max-workers 4 \
--pause-after 10
"""