Code used to create the https://huggingface.co/datasets/pcmoritz/cpython_dataset dataset
# Given a list of github issues in a 'cpython_issues.jsonl' file,
# extract the base_commit, issue body and title. Note: This script
# assumes that the https://github.com/python/cpython repo is
# checked out in the current working directory.
import json
import subprocess

EXCLUDE_MERGE_COMMITS = {
    "30de46c201254fadecccc766c544e7db6b07df6c",
}

def read_jsonl(file_path):
    data = []
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            data.append(json.loads(line.strip()))
    return data

cpython_issues = read_jsonl('cpython_issues.jsonl')

num_skipped_issues = 0

def extract_pr_number(issue_body: str) -> str | None:
    header = "### Linked PRs"
    header_pos = issue_body.find(header)
    if header_pos == -1:
        return None
    remaining_text = issue_body[header_pos:]
    lines = remaining_text.split('\n')
    for line in lines:
        line = line.strip()
        prefix = "* gh-"
        if line.startswith(prefix):
            return line[len(prefix):].strip()
    return None
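# Example (synthetic issue body, shaped like the "* gh-<number>" bullets
# under a "### Linked PRs" heading that this function expects):
#   extract_pr_number("Some description\n\n### Linked PRs\n* gh-12345\n")
#   returns "12345"; a body without the heading returns None.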
def get_commits(pr_number: str) -> dict[str, str] | None:
    "Find merge commit of the PR and the base commit (the parent commit of the merge commit)."
    print("pr", pr_number)
    result = subprocess.run([
        "git", "log", "--oneline", "--no-abbrev-commit", f'--grep={pr_number}', "main"
    ], capture_output=True, check=True, text=True)
    merge_commit = result.stdout.split(" ")[0]
    if merge_commit in EXCLUDE_MERGE_COMMITS or merge_commit == "":
        return None
    parent_result = subprocess.run([
        'git', 'rev-parse', f'{merge_commit}^1'
    ], capture_output=True, check=True, text=True)
    parent_commit = parent_result.stdout.strip()
    if parent_commit == "":
        return None
    return {"merge_commit": merge_commit, "base_commit": parent_commit}
result = []

for issue in cpython_issues:
    if not issue["body"]:
        num_skipped_issues += 1
        print(f"!!! Skipping issue {issue['title']} since it is missing a body")
        continue
    pr_number = extract_pr_number(issue["body"])
    if not pr_number:
        num_skipped_issues += 1
        print(f"!!! Skipping issue {issue['title']} since it is missing an associated PR")
        continue
    commits = get_commits(pr_number)
    if not commits:
        num_skipped_issues += 1
        print(f"!!! Skipping issue {issue['title']} since base commit of pr {pr_number} cannot be determined")
        continue
    problem_statement = "# " + issue["title"] + "\n\n" + issue["body"]
    result.append({
        "repo": "python/cpython",
        "instance_id": f"python__cpython-{pr_number}",
        "problem_statement": problem_statement,
        **commits,
    })

print("Number of skipped issues:", num_skipped_issues)

# Write dataset to jsonl file
with open('cpython_dataset.jsonl', 'w') as f:
    for record in result:
        f.write(json.dumps(record) + '\n')
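A minimal sketch for spot-checking the resulting file, assuming the record layout produced by the script above (the field names come directly from the dict it writes):

# Spot-check the generated dataset file.
import json

with open('cpython_dataset.jsonl', 'r', encoding='utf-8') as f:
    records = [json.loads(line) for line in f]

print("number of records:", len(records))
example = records[0]
print(example["instance_id"])               # e.g. "python__cpython-<pr_number>"
print(example["base_commit"])               # parent of the PR's merge commit
print(example["problem_statement"][:200])   # "# <title>\n\n<body>"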
# This script fetches the completed issues from the https://github.com/python/cpython
# repository and writes them to a file 'cpython_issues.jsonl'. Note that if you run it
# yourself, you probably want to adapt the num_pages argument below.
import json
import os
import requests

def list_issues(owner: str, repo: str, num_pages: int = 1):
    current_page = 0
    per_page = 100
    url = f"https://api.github.com/repos/{owner}/{repo}/issues"
    params = {
        "state": "closed",
        "per_page": per_page,
        "sort": "created",
        "direction": "desc",
    }
    headers = {}
    if token := os.getenv('GITHUB_TOKEN'):
        headers["Authorization"] = f"Bearer {token}"
    while current_page < num_pages:
        response = requests.get(url, params=params, headers=headers)
        if response.status_code != 200:
            print(f"Error: {response.status_code} - {response.text}")
            break
        page_issues = response.json()
        if not page_issues:  # No more issues
            break
        print("number of issues before filtering:", len(page_issues))
        # Filter out PRs (the issues endpoint also returns pull requests)
        page_issues = [issue for issue in page_issues if 'pull_request' not in issue]
        # Only keep issues that have been completed
        page_issues = [issue for issue in page_issues if issue['state_reason'] == "completed"]
        print("number of issues after filtering:", len(page_issues))
        yield page_issues
        current_page += 1
        # Get next page URL from Link header
        link_header = response.headers.get('Link', '')
        next_url = None
        if 'rel="next"' in link_header:
            # Parse the Link header to find next URL
            links = link_header.split(',')
            for link in links:
                if 'rel="next"' in link:
                    next_url = link.split(';')[0].strip('<> ')
                    break
        if not next_url:  # No more pages
            break
        # Update URL for next iteration and clear params (URL already contains them)
        url = next_url
        params = {}
# Write issues to jsonl file
with open('cpython_issues.jsonl', 'w') as f:
    for issues in list_issues("python", "cpython", num_pages=100000):
        for issue in issues:
            f.write(json.dumps(issue) + '\n')
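Each page is one API request, and unauthenticated GitHub API requests are limited to 60 per hour (versus 5,000 when authenticated), so setting GITHUB_TOKEN is effectively required for a deep crawl. A small sketch for checking the remaining request budget via GitHub's /rate_limit endpoint, which does not itself count against the limit:

# Check the remaining API request budget before starting a long crawl.
import os
import requests

headers = {}
if token := os.getenv('GITHUB_TOKEN'):
    headers["Authorization"] = f"Bearer {token}"
# The "core" bucket covers the issues endpoint used above.
resp = requests.get("https://api.github.com/rate_limit", headers=headers)
print(resp.json()["resources"]["core"])  # {'limit': ..., 'remaining': ..., 'reset': ...}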