Skip to content

Instantly share code, notes, and snippets.

@dat-boris
Created July 30, 2025 04:11
Show Gist options
  • Select an option

  • Save dat-boris/e20c1fb088b200fdeb348100c08f6fc5 to your computer and use it in GitHub Desktop.

Select an option

Save dat-boris/e20c1fb088b200fdeb348100c08f6fc5 to your computer and use it in GitHub Desktop.
Script for exporting Langfuse annotation queue comments for open coding analysis
#!/usr/bin/env python3
"""Given a queue ID, export the list of traces which have been annotated to a
CSV file.

Usage:
    ./export_langfuse_comments.py <queueId> <output-csv-file>

The CSV file contains the following columns:
- query
- response
- comments
- manual-score
"""
import argparse
import base64
import csv
import json
import os
import sys
from typing import Dict, List, Optional

import requests
from requests.auth import HTTPBasicAuth
# Constants
# API credentials, read from the environment once at import time; get_auth()
# raises ValueError if either one is unset or empty.
LANGFUSE_PUBLIC_KEY = os.getenv("LANGFUSE_PUBLIC_KEY")
LANGFUSE_SECRET_KEY = os.getenv("LANGFUSE_SECRET_KEY")
# Base URL that every request path in this script is appended to.
LANGFUSE_BASE_URL = "https://us.cloud.langfuse.com/api/public"
# Project name that verify_project_access() looks for in the /projects response.
PROJECT_NAME = "khan-academy-dev"
# Page size sent as the "limit" query parameter when listing queue items.
ITEMS_PER_PAGE = 100
def get_auth() -> HTTPBasicAuth:
    """Build the basic-auth credentials used for every Langfuse API request.

    Returns:
        HTTPBasicAuth carrying the public key as username and the secret key
        as password.

    Raises:
        ValueError: If either key is missing (or empty) in the environment.
    """
    if LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY:
        return HTTPBasicAuth(LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY)
    raise ValueError("LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables must be set")
def verify_project_access(timeout: float = 30.0) -> bool:
    """Verify authentication and project access.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, requests would block indefinitely on a stalled
            connection.

    Returns:
        bool: True if the PROJECT_NAME project is listed for these
        credentials, False otherwise (including on any request failure or an
        unparseable response).

    Raises:
        ValueError: If the API keys are not configured (raised by get_auth()).
    """
    auth = get_auth()
    try:
        response = requests.get(
            f"{LANGFUSE_BASE_URL}/projects",
            auth=auth,
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()
    # ValueError covers a non-JSON body on requests versions whose JSON decode
    # error does not derive from RequestException.
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error verifying project access: {e}")
        return False

    # Check whether the expected project is among the accessible ones.
    return any(
        project.get("name") == PROJECT_NAME for project in data.get("data", [])
    )
def get_queue_items(
    queue_id: str, status: str = "COMPLETED", timeout: float = 30.0
) -> List[Dict]:
    """Get all items from an annotation queue, following pagination.

    Args:
        queue_id: ID of the annotation queue to read.
        status: Item status to filter on; sent as the "status" query parameter.
        timeout: Seconds to wait for each page request, so a stalled
            connection cannot hang the export forever.

    Returns:
        The concatenated "data" entries of every page.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout, or
            a non-2xx response.
        ValueError: If the API keys are not configured (raised by get_auth()).
    """
    items: List[Dict] = []
    auth = get_auth()
    page = 1
    while True:
        response = requests.get(
            f"{LANGFUSE_BASE_URL}/annotation-queues/{queue_id}/items",
            auth=auth,
            headers={"Content-Type": "application/json"},
            params={"status": status, "page": page, "limit": ITEMS_PER_PAGE},
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()
        items.extend(data["data"])
        # The response metadata reports how many pages exist in total.
        if page >= data["meta"]["totalPages"]:
            break
        page += 1
    return items
def get_trace_details(trace_id: str, timeout: float = 30.0) -> Dict:
    """Get details for a specific trace.

    Args:
        trace_id: ID of the trace to fetch.
        timeout: Seconds to wait for the server, so a stalled connection
            cannot hang the export forever.

    Returns:
        The decoded JSON body of the trace endpoint.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout, or
            a non-2xx response.
        ValueError: If the API keys are not configured (raised by get_auth()).
    """
    response = requests.get(
        f"{LANGFUSE_BASE_URL}/traces/{trace_id}",
        auth=get_auth(),
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def get_comments(object_id: str, timeout: float = 30.0) -> List[Dict]:
    """Get all comments attached to a specific trace, following pagination.

    Args:
        object_id: ID of the trace whose comments are requested (the request
            always uses objectType "TRACE").
        timeout: Seconds to wait for each page request, so a stalled
            connection cannot hang the export forever.

    Returns:
        The concatenated "data" entries of every page of comments.

    Raises:
        requests.exceptions.RequestException: On network failure, timeout, or
            a non-2xx response.
        ValueError: If the API keys are not configured (raised by get_auth()).
    """
    comments: List[Dict] = []
    auth = get_auth()
    page = 1
    while True:
        response = requests.get(
            f"{LANGFUSE_BASE_URL}/comments",
            auth=auth,
            headers={"Content-Type": "application/json"},
            params={
                "objectId": object_id,
                "objectType": "TRACE",
                "page": page,
                "limit": ITEMS_PER_PAGE,
            },
            timeout=timeout,
        )
        response.raise_for_status()
        payload = response.json()
        comments.extend(payload["data"])
        # Reading only the first page would silently drop comments on heavily
        # annotated traces. If no pagination metadata is returned, treat the
        # response as a single page.
        meta = payload.get("meta") or {}
        if page >= meta.get("totalPages", 1):
            break
        page += 1
    return comments
def get_manual_score(scores: Optional[List[Dict]]) -> Optional[float]:
    """Extract the manual (annotation) score from a trace's scores list.

    Args:
        scores: Score dicts as found under a trace's "scores" key. None or an
            empty list is treated as "no scores".

    Returns:
        The "value" of the first score whose "source" is "ANNOTATION" and
        whose "dataType" is "NUMERIC", or None if there is no such score.
    """
    # `or []` tolerates an explicit null from the API; .get() tolerates score
    # entries that lack one of the keys instead of aborting the whole export.
    for score in scores or []:
        if score.get("source") == "ANNOTATION" and score.get("dataType") == "NUMERIC":
            return score.get("value")
    return None
def get_last_content(input: object) -> str:
    """Return the 'content' of the last message in a trace input.

    Args:
        input: Either a dict with a 'messages' list, or a JSON string that
            decodes to such a dict. (The parameter name shadows the builtin
            but is kept so existing callers keep working.)

    Returns:
        The 'content' value of the last entry in 'messages', or "" when the
        input is not valid JSON, has no non-empty 'messages' list, or the
        last message has no 'content' key.
    """
    payload = input
    if isinstance(payload, str):
        # Decode the documented JSON-string form before inspecting it.
        try:
            payload = json.loads(payload)
        except ValueError:
            return ""
    if not isinstance(payload, dict):
        return ""

    messages = payload.get("messages")
    if not isinstance(messages, list) or not messages:
        return ""

    last_msg = messages[-1]
    if isinstance(last_msg, dict) and "content" in last_msg:
        return last_msg["content"]
    return ""
def main():
    """Export the completed items of an annotation queue to a CSV file.

    Reads the queue ID and output path from the command line, verifies that
    the credentials can see the expected project (exiting with status 1 if
    not), then writes one CSV row per queue item with the columns
    query, response, comments and manual-score.
    """
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="Export Langfuse annotations to CSV")
    parser.add_argument("queue_id", help="Langfuse queue ID")
    parser.add_argument("output_file", help="Output CSV file path")
    args = parser.parse_args()

    # Verify project access first
    if not verify_project_access():
        print(f"Error: Could not access {PROJECT_NAME} project. Please check your credentials.")
        sys.exit(1)

    # Get all completed items from the queue
    queue_items = get_queue_items(args.queue_id)

    # Prepare CSV file
    print(f"Writing output to: {args.output_file}")
    # Explicit UTF-8 so non-ASCII text in queries or comments is written
    # correctly regardless of the platform's default encoding.
    with open(args.output_file, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(
            csvfile, fieldnames=["query", "response", "comments", "manual-score"]
        )
        writer.writeheader()

        for item in queue_items:
            trace_id = item["objectId"]

            # Each item costs two extra requests: the trace and its comments.
            trace = get_trace_details(trace_id)
            comments = get_comments(trace_id)
            comment_texts = [comment["content"] for comment in comments]

            # Extract query and response from trace
            query = get_last_content(trace.get("input", ""))
            response = trace.get("output", "")

            # Get manual score
            manual_score = get_manual_score(trace.get("scores", []))

            writer.writerow({
                "query": query,
                "response": response,
                "comments": "; ".join(comment_texts),
                # Compare against None so a legitimate score of 0 is kept.
                "manual-score": manual_score if manual_score is not None else "",
            })


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment