Gets YouTube transcripts from Google Takeout data and enriches them with metadata using yt-dlp
#!/usr/bin/env python3
import argparse
import json
import csv
import os
import sys
import time
import requests
from bs4 import BeautifulSoup
import concurrent.futures
from typing import Optional, List, Dict, Any

# increase CSV field size limit if needed
csv_max = sys.maxsize
while True:
    try:
        csv.field_size_limit(csv_max)
        break
    except OverflowError:
        csv_max = int(csv_max / 10)

BASE = "https://youtubetotranscript.com"

def clean_transcript_text(raw: str) -> Optional[str]:
    lines = raw.splitlines()
    cleaned: List[str] = []
    started = False
    for ln in lines:
        s = ln.strip()
        if not started:
            if not s or s.upper() == "AI":
                continue
            started = True
        cleaned.append(ln)
    if not cleaned:
        return None
    text = "\n".join(cleaned).strip()
    return text or None

def fetch_transcript_direct(video_id: str, session: Optional[requests.Session] = None) -> Optional[str]:
    url = f"{BASE}/transcript?v={video_id}"
    s = session or requests.Session()
    try:
        resp = s.get(url, timeout=30)
    except Exception:
        return None
    if resp.status_code != 200:
        return None
    html = resp.text
    text = BeautifulSoup(html, "html.parser").get_text(separator="\n")
    if "Failed to get transcript for this video" in text:
        return None
    if "Pin video" in text:
        text = text.split("Pin video", 1)[1]
    if "Back To Top" in text:
        text = text.split("Back To Top", 1)[0]
    lines: List[str] = []
    skip = {"Transcript", "Copy", "Timestamp OFF", "Timestamp On", "", " ", "\xa0"}
    for line in text.splitlines():
        l = line.strip()
        if not l or l in skip:
            continue
        if l.startswith("YouTubeToTranscript"):
            continue
        lines.append(line)
    raw = "\n".join(lines).strip()
    if not raw:
        return None
    return clean_transcript_text(raw)

# import yt-dlp for metadata
from yt_dlp import YoutubeDL


def fetch_metadata(video_id: str) -> Optional[Dict[str, Any]]:
    url = f"https://www.youtube.com/watch?v={video_id}"
    opts = {
        'quiet': True,
        'no_warnings': True,
        'skip_download': True,
        'force_generic_extractor': False,  # use default youtube extractor
    }
    try:
        with YoutubeDL(opts) as ydl:
            info = ydl.extract_info(url, download=False)
            # note: uploader_url is usually provided in the info dict as 'uploader_url',
            # or can be built from 'uploader_id' + the base YouTube URL
            return {
                "title": info.get("title"),
                "uploader": info.get("uploader"),
                "uploader_url": info.get("uploader_url"),
                "upload_date": info.get("upload_date"),
                "view_count": info.get("view_count"),
                "duration": info.get("duration"),
                "tags": info.get("tags"),
                "description": info.get("description"),
            }
    except Exception as e:
        return {"error": str(e)}

def load_history_json(path: str) -> Dict[str, Dict[str, Any]]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    history: Dict[str, Dict[str, Any]] = {}
    for item in data:
        url = item.get("titleUrl")
        if not url:
            continue
        if "v=" in url:
            vid = url.split("v=", 1)[1].split("&")[0]
        else:
            continue
        history[vid] = {
            "history_title": item.get("title"),
            "titleUrl": url,
            "subtitles": item.get("subtitles"),
            "time": item.get("time"),
        }
    return history

def load_watch_later_csv(path: str) -> Dict[str, Dict[str, Any]]:
    watch_later: Dict[str, Dict[str, Any]] = {}
    with open(path, newline='', encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            vid = row.get("Video ID") or row.get("video_id")
            added = row.get("Playlist Video Creation Timestamp") or row.get("added_at")
            if not vid:
                continue
            watch_later[vid] = {
                "watch_later_added_at": added
            }
    return watch_later

def main():
    parser = argparse.ArgumentParser(description="Fetch YouTube transcripts + metadata (channel link, duration, etc.) with resume + optional test run")
    parser.add_argument("--history-json", type=str, help="Path to history JSON file")
    parser.add_argument("--watch-later-csv", type=str, help="Path to watch-later CSV file")
    parser.add_argument("--out-jsonl", type=str, default="full_output.jsonl", help="Output JSONL file (append)")
    parser.add_argument("--out-csv", type=str, default="full_output.csv", help="Output CSV file (append)")
    parser.add_argument("--failed-jsonl", type=str, default="failed.jsonl", help="Log failed video IDs")
    parser.add_argument("--max-workers", type=int, default=4, help="Parallel fetch workers")
    parser.add_argument("--pause-after", type=int, default=10, help="Pause after processing N videos")
    parser.add_argument("--test-count", type=int, default=0, help="If >0: only process first N videos (test run)")
    args = parser.parse_args()

    history = {}
    if args.history_json and os.path.exists(args.history_json):
        history = load_history_json(args.history_json)
    watch = {}
    if args.watch_later_csv and os.path.exists(args.watch_later_csv):
        watch = load_watch_later_csv(args.watch_later_csv)

    # merge both sources; a video already seen in history keeps its history entry
    combined: Dict[str, Dict[str, Any]] = {}
    for vid, m in history.items():
        combined[vid] = {**m, "source": "history"}
    for vid, m in watch.items():
        if vid in combined:
            continue
        combined[vid] = {**m, "source": "watch_later"}
    all_vids = list(combined.keys())
    print(f"Total video IDs: {len(all_vids)} (history: {len(history)}, watch_later: {len(watch)})")
    # load already processed video IDs so reruns resume where they left off
    processed = set()
    if os.path.exists(args.out_jsonl):
        with open(args.out_jsonl, "r", encoding="utf-8") as jf:
            for line in jf:
                try:
                    rec = json.loads(line)
                    vid0 = rec.get("video_id")
                    if vid0:
                        processed.add(vid0)
                except Exception:
                    pass
    to_process = [vid for vid in all_vids if vid not in processed]
    if args.test_count and args.test_count > 0:
        to_process = to_process[: args.test_count]
        print(f"Test run mode: will process only first {len(to_process)} videos.")
    print(f"Already processed: {len(processed)}, to process now: {len(to_process)}")
    if not to_process:
        print("Nothing to process — exiting.")
        return
    jsonl_f = open(args.out_jsonl, "a", encoding="utf-8")
    csv_exists = os.path.exists(args.out_csv)
    csv_f = open(args.out_csv, "a", newline="", encoding="utf-8")
    writer = csv.writer(csv_f)
    if not csv_exists:
        writer.writerow([
            "video_id", "source",
            "history_title", "titleUrl",
            "watch_later_added_at",
            "metadata_title", "uploader", "uploader_url", "upload_date",
            "view_count", "duration", "tags", "description",
            "transcript"
        ])
    failed_f = open(args.failed_jsonl, "a", encoding="utf-8")

    count = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as exe:
        # bind vid as a default argument so each task captures its own video ID
        future_to_vid = {
            exe.submit(lambda v=vid: (v, fetch_transcript_direct(v), fetch_metadata(v))): vid
            for vid in to_process
        }
        for fut in concurrent.futures.as_completed(future_to_vid):
            vid = future_to_vid[fut]
            try:
                vid0, transcript, metadata = fut.result()
            except Exception:
                transcript, metadata = None, None
            base = combined.get(vid, {})
            record: Dict[str, Any] = {
                "video_id": vid,
                "source": base.get("source")
            }
            if base.get("history_title"):
                record["history_title"] = base.get("history_title")
                record["titleUrl"] = base.get("titleUrl")
            if base.get("watch_later_added_at"):
                record["watch_later_added_at"] = base.get("watch_later_added_at")
            if metadata:
                if metadata.get("error"):
                    # metadata fetch failed
                    record["metadata_error"] = metadata.get("error")
                else:
                    record["title"] = metadata.get("title")
                    record["uploader"] = metadata.get("uploader")
                    record["uploader_url"] = metadata.get("uploader_url")
                    record["upload_date"] = metadata.get("upload_date")
                    record["view_count"] = metadata.get("view_count")
                    record["duration"] = metadata.get("duration")
                    record["tags"] = metadata.get("tags")
                    record["description"] = metadata.get("description")
            if transcript:
                record["transcript"] = transcript

            # write JSONL (streaming)
            jsonl_f.write(json.dumps(record, ensure_ascii=False) + "\n")
            jsonl_f.flush()

            # write CSV row
            writer.writerow([
                record.get("video_id", ""),
                record.get("source", ""),
                record.get("history_title", ""),
                record.get("titleUrl", ""),
                record.get("watch_later_added_at", ""),
                record.get("title", ""),
                record.get("uploader", ""),
                record.get("uploader_url", ""),
                record.get("upload_date", ""),
                record.get("view_count", ""),
                record.get("duration", ""),
                ",".join(record.get("tags") or []),
                record.get("description", ""),
                record.get("transcript", "")
            ])
            csv_f.flush()

            # if transcript or metadata missing, log to failed
            if (not transcript) or (not metadata) or metadata.get("error"):
                failed_f.write(json.dumps({
                    "video_id": vid,
                    "transcript_ok": bool(transcript),
                    "metadata_ok": bool(metadata and not metadata.get("error")),
                    "metadata_error": metadata.get("error") if metadata else None
                }, ensure_ascii=False) + "\n")
                failed_f.flush()

            count += 1
            print(f"[{vid}] → transcript: {'OK' if transcript else 'NO'}, metadata: {'OK' if metadata and not metadata.get('error') else 'NO'}")
            if count % args.pause_after == 0:
                print(f"Processed {count} — sleeping briefly …")
                time.sleep(5)

    jsonl_f.close()
    csv_f.close()
    failed_f.close()
    print("Done. Output written to:", args.out_jsonl, args.out_csv, "Failed log:", args.failed_jsonl)


if __name__ == "__main__":
    main()
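
# The loaders above read only a handful of fields from the Google Takeout export.
# Illustrative input shapes (values are hypothetical; real exports carry extra
# fields, which the loaders ignore):
"""
watch-history.json is a JSON array of items such as:

  {
    "title": "Watched Example Video",
    "titleUrl": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
    "subtitles": [{"name": "Example Channel", "url": "https://www.youtube.com/channel/UC..."}],
    "time": "2025-01-01T12:00:00.000Z"
  }

Watch later-videos.csv needs at least these columns:

  Video ID,Playlist Video Creation Timestamp
  dQw4w9WgXcQ,2025-01-01T12:00:00+00:00
"""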
"""
python3 3_yt_full_fetch.py \
--history-json "Takeout/YouTube and YouTube Music/history/watch-history.json" \
--watch-later-csv "Takeout/YouTube and YouTube Music/playlists/Watch later-videos.csv" \
--out-jsonl enriched_watch_later.jsonl \
--out-csv enriched_watch_later.csv \
--failed-jsonl enriched_watch_later_failed.jsonl \
--max-workers 7 \
--pause-after 50 \
--test-count 10
"""
"""
python3 3_yt_full_fetch.py \
--watch-later-csv "Takeout/YouTube and YouTube Music/playlists/Watch later-videos.csv" \
--out-jsonl enriched_watch_later.jsonl \
--out-csv enriched_watch_later.csv \
--failed-jsonl enriched_watch_later_failed.jsonl \
--max-workers 7 \
--pause-after 50 \
--test-count 10
"""
requests>=2.0
beautifulsoup4>=4.0
playwright>=1.50.0
yt-dlp
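# This script itself imports only requests, beautifulsoup4 and yt-dlp; install everything with:
#   pip install -r requirements.txt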