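"""Collect detailed information about every repository in a GitHub organization.

For each repository the script gathers: basic details, languages, README content,
the last 5 commits, total commit count, contributors, the file tree, and
security/automation status (GitHub Actions, Dependabot, code scanning, secret
scanning, open alert counts, and open Dependabot pull requests).

Progress is written to github_detailed_data.json after every few repositories,
so the script can be stopped and resumed without re-processing finished repos.

Usage:
    python <this script> [--test]

The --test flag limits the run to the first MAX_TEST_REPOS repositories and caps
pagination on the more expensive endpoints.

A GITHUB_TOKEN must be available in the environment (or in a .env file loaded
via python-dotenv). Note: the Dependabot, code scanning, and secret scanning
alert endpoints generally require a token with sufficient permissions
(e.g. repo/security_events scopes); without them those counts stay at 0.
"""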
import base64
import json
import os
import re
import sys
import time

import requests
from dotenv import load_dotenv

load_dotenv()

# Replace with your organization name; the personal access token is read from the environment
org_name = "sunsecrn"
token = os.getenv("GITHUB_TOKEN")

# Headers for authentication
headers = {"Authorization": f"Bearer {token}"}

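# The token is loaded from the environment; with python-dotenv a local .env file
# containing a line like the following is picked up automatically:
#
#   GITHUB_TOKEN=<your personal access token>
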
# File to save progress
PROGRESS_FILE = "github_detailed_data.json"

# Test mode parameter
TEST_MODE = "--test" in sys.argv
MAX_TEST_REPOS = 5

def load_progress():
    if os.path.exists(PROGRESS_FILE):
        try:
            with open(PROGRESS_FILE, 'r') as f:
                data = json.load(f)
            print(f"Loaded progress: {len(data.get('processed_repos', []))} repositories already processed")
            return data
        except Exception as e:
            print(f"Error loading progress file: {e}")
    return {"processed_repos": [], "all_repos": [], "repos_data": {}}

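# The progress file written below has roughly this shape (keys grounded in
# save_progress/process_repository; values abbreviated):
#
#   {
#     "processed_repos": ["repo-a", ...],
#     "all_repos": [<raw repo objects from the org listing>, ...],
#     "repos_data": {"repo-a": {"repo_id": ..., "languages": {...}, ...}, ...}
#   }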
def save_progress(processed_repos, all_repos, repos_data):
    data = {
        "processed_repos": processed_repos,
        "all_repos": all_repos,
        "repos_data": repos_data
    }
    try:
        with open(PROGRESS_FILE, 'w') as f:
            json.dump(data, f, indent=2)
        print(f"Progress saved: {len(processed_repos)} repositories processed")
    except Exception as e:
        print(f"Error saving progress: {e}")

def handle_rate_limit(response):
    if response.status_code == 403 and "X-RateLimit-Remaining" in response.headers:
        remaining = int(response.headers["X-RateLimit-Remaining"])
        if remaining == 0:
            reset_time = int(response.headers["X-RateLimit-Reset"])
            sleep_time = reset_time - int(time.time())
            if sleep_time > 0:
                print(f"Rate limit reached. Sleeping for {sleep_time} seconds...")
                time.sleep(sleep_time)

def make_request_with_retry(url, max_retries=3, base_delay=1, request_name="API"):
    print(f"  Making {request_name} request: {url}")
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=30)
            handle_rate_limit(response)
            print(f"  {request_name} request successful (status: {response.status_code})")
            return response
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                print(f"  Connection error (attempt {attempt + 1}/{max_retries}): {e}")
                print(f"  Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print(f"  Failed after {max_retries} attempts: {e}")
                raise
        except Exception as e:
            print(f"  Unexpected error: {e}")
            raise
    return None

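# Same retry/backoff behaviour as make_request_with_retry (delays of
# base_delay * 2**attempt), but issues HEAD requests. It is used below to read
# pagination info from the Link header without downloading response bodies.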
def make_head_request_with_retry(url, max_retries=3, base_delay=1, request_name="HEAD"):
    print(f"  Making {request_name} request: {url}")
    for attempt in range(max_retries):
        try:
            response = requests.head(url, headers=headers, timeout=30)
            handle_rate_limit(response)
            print(f"  {request_name} request successful (status: {response.status_code})")
            return response
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                print(f"  Connection error (attempt {attempt + 1}/{max_retries}): {e}")
                print(f"  Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print(f"  Failed after {max_retries} attempts: {e}")
                raise
        except Exception as e:
            print(f"  Unexpected error: {e}")
            raise
    return None

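# Fetches every page of a list endpoint by requesting per_page items at a time
# and stopping on an empty or short page. 100 is the maximum per_page value the
# GitHub REST API generally allows; max_pages caps expensive calls in test mode.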
def fetch_all_pages(url, per_page=100, max_pages=None):
    results = []
    page = 1

    while True:
        # Stop if we've reached the max pages limit (for test mode)
        if max_pages and page > max_pages:
            print(f"  Reached max pages limit ({max_pages})")
            break

        separator = "&" if "?" in url else "?"
        paginated_url = f"{url}{separator}per_page={per_page}&page={page}"

        print(f"  Fetching page {page}...")
        response = make_request_with_retry(paginated_url, request_name=f"Page {page}")

        if response is None or response.status_code != 200:
            # Note: a Response object is falsy for 4xx/5xx statuses, so compare
            # against None explicitly to make sure the error is reported
            if response is not None:
                print(f"  Error: {response.status_code} - {response.text}")
            break

        page_data = response.json()

        if not page_data:
            print(f"  No more data on page {page}")
            break

        results.extend(page_data)
        print(f"  Fetched {len(page_data)} items from page {page}")

        if len(page_data) < per_page:
            print(f"  Last page reached (got {len(page_data)} items, expected {per_page})")
            break

        page += 1
        time.sleep(0.5)

    return results

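# Generic item counter: mirrors the commit-count logic further below but works
# for any paginated list endpoint. Note that it is not called anywhere in the
# current flow; get_commit_count re-implements the same approach for commits.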
def get_count_from_link_header(url):
    """Get total count from Link header using HEAD request"""
    # Ask for 100-item pages so the Link header's "last" page number lines up
    # with the 100-per-page arithmetic below
    separator = "&" if "?" in url else "?"
    response = make_head_request_with_retry(f"{url}{separator}per_page=100", request_name="Count Check")

    if response is None or response.status_code != 200:
        return 0

    # Try to get count from Link header
    link_header = response.headers.get("Link", "")
    if 'rel="last"' in link_header:
        # Extract page number from the "last" link
        match = re.search(r'page=(\d+)>; rel="last"', link_header)
        if match:
            last_page = int(match.group(1))
            # Get the last page to count actual items (use proper separator)
            last_page_url = f"{url}{separator}per_page=100&page={last_page}"
            last_response = make_request_with_retry(last_page_url, request_name="Last Page Count")
            if last_response and last_response.status_code == 200:
                last_page_data = last_response.json()
                total_count = (last_page - 1) * 100 + len(last_page_data)
                print(f"  Calculated total count: {total_count} (pages: {last_page}, last page items: {len(last_page_data)})")
                return total_count

    # Fallback: if no Link header, try to get count from first page
    try:
        first_page_url = f"{url}{separator}per_page=1"
        first_response = make_request_with_retry(first_page_url, request_name="First Page Count")
        if first_response and first_response.status_code == 200:
            first_page_data = first_response.json()
            if first_page_data:
                print("  Found at least 1 item (no pagination info)")
                return 1
    except Exception as e:
        print(f"  Error in fallback count: {e}")

    return 0

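# The three alert endpoints below typically answer with 403/404 when the
# corresponding feature is disabled on the repository or the token lacks the
# required permission; in that case fetch_all_pages returns an empty list and
# the counts simply stay at 0.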
def get_security_alerts(repo_name):
    """Get open security alert counts for the different alert types"""
    print(f"  Getting security alerts for: {repo_name}")

    alerts = {
        "dependabot": 0,
        "code_scanning": 0,
        "secret_scanning": 0
    }

    # Dependabot alerts - fetch all and count the open ones
    try:
        dependabot_url = f"https://api.github.com/repos/{org_name}/{repo_name}/dependabot/alerts"
        dependabot_data = fetch_all_pages(dependabot_url, max_pages=3 if TEST_MODE else None)
        alerts["dependabot"] = sum(1 for alert in dependabot_data if alert.get("state") == "open")
        print(f"  Dependabot alerts (open): {alerts['dependabot']} out of {len(dependabot_data)} total")
    except Exception as e:
        print(f"  Error getting Dependabot alerts: {e}")

    # Code scanning alerts - fetch all and count the open ones
    try:
        code_scanning_url = f"https://api.github.com/repos/{org_name}/{repo_name}/code-scanning/alerts"
        code_scanning_data = fetch_all_pages(code_scanning_url, max_pages=3 if TEST_MODE else None)
        alerts["code_scanning"] = sum(1 for alert in code_scanning_data if alert.get("state") == "open")
        print(f"  Code scanning alerts (open): {alerts['code_scanning']} out of {len(code_scanning_data)} total")
    except Exception as e:
        print(f"  Error getting code scanning alerts: {e}")

    # Secret scanning alerts - fetch all and count the open ones
    try:
        secret_scanning_url = f"https://api.github.com/repos/{org_name}/{repo_name}/secret-scanning/alerts"
        secret_scanning_data = fetch_all_pages(secret_scanning_url, max_pages=3 if TEST_MODE else None)
        alerts["secret_scanning"] = sum(1 for alert in secret_scanning_data if alert.get("state") == "open")
        print(f"  Secret scanning alerts (open): {alerts['secret_scanning']} out of {len(secret_scanning_data)} total")
    except Exception as e:
        print(f"  Error getting secret scanning alerts: {e}")

    return alerts

def get_dependabot_prs(repo_name):
    """Get count of open pull requests created by Dependabot"""
    print(f"  Getting Dependabot PRs for: {repo_name}")

    try:
        url = f"https://api.github.com/repos/{org_name}/{repo_name}/pulls?state=open&per_page=100"
        pulls_data = fetch_all_pages(url, max_pages=3 if TEST_MODE else None)

        dependabot_count = sum(1 for pull in pulls_data if pull.get("user", {}).get("login") == "dependabot[bot]")
        print(f"  Dependabot open PRs: {dependabot_count}")
        return dependabot_count
    except Exception as e:
        print(f"  Error getting Dependabot PRs: {e}")
        return 0

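# Enablement flags are read from the repository's "security_and_analysis" block
# returned by the repo details call. Depending on the plan and the token's
# permissions some of these sub-fields may be missing, in which case the
# corresponding flag simply stays False.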
def get_security_and_automation_status(repo_name, repo_details):
    """Get security and automation tool status"""
    print(f"  Getting security and automation status for: {repo_name}")

    security_data = {
        "github_actions_enabled": False,
        "dependabot_enabled": False,
        "code_scanning_enabled": False,
        "secret_scanning_enabled": False,
        "alerts": {
            "dependabot": 0,
            "code_scanning": 0,
            "secret_scanning": 0
        },
        "dependabot_open_prs": 0
    }

    # Check GitHub Actions status
    try:
        actions_url = f"https://api.github.com/repos/{org_name}/{repo_name}/actions/permissions"
        response = make_request_with_retry(actions_url, request_name="GitHub Actions Check")
        if response is not None and response.status_code == 200:
            # The permissions endpoint reports whether Actions is enabled for the repo
            security_data["github_actions_enabled"] = bool(response.json().get("enabled", False))
        print(f"  GitHub Actions enabled: {security_data['github_actions_enabled']}")
    except Exception as e:
        print(f"  Error checking GitHub Actions: {e}")

    # Get security and analysis status from repo details
    if repo_details and "security_and_analysis" in repo_details:
        security_analysis = repo_details["security_and_analysis"] or {}

        # Dependabot security updates status
        dependabot_info = security_analysis.get("dependabot_security_updates", {})
        security_data["dependabot_enabled"] = dependabot_info.get("status") == "enabled"
        print(f"  Dependabot enabled: {security_data['dependabot_enabled']}")

        # Code scanning status (reported via the advanced_security field)
        advanced_security = security_analysis.get("advanced_security", {})
        security_data["code_scanning_enabled"] = advanced_security.get("status") == "enabled"
        print(f"  Code scanning enabled: {security_data['code_scanning_enabled']}")

        # Secret scanning status
        secret_scanning = security_analysis.get("secret_scanning", {})
        security_data["secret_scanning_enabled"] = secret_scanning.get("status") == "enabled"
        print(f"  Secret scanning enabled: {security_data['secret_scanning_enabled']}")

    # Get security alerts
    security_data["alerts"] = get_security_alerts(repo_name)

    # Get Dependabot PRs
    security_data["dependabot_open_prs"] = get_dependabot_prs(repo_name)

    return security_data

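# Looks for a README by trying a handful of common filenames via the contents
# API. (GitHub also exposes a dedicated /repos/{owner}/{repo}/readme endpoint
# that resolves the preferred README regardless of filename; the explicit list
# below is kept to preserve the original behaviour.)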
def get_readme_content(repo_name):
    """Get README content if it exists"""
    print(f"  Checking for README in: {repo_name}")

    # Try different README file names
    readme_files = ["README.md", "README.txt", "readme.md", "readme.txt"]

    for readme_file in readme_files:
        url = f"https://api.github.com/repos/{org_name}/{repo_name}/contents/{readme_file}"
        response = make_request_with_retry(url, request_name=f"README Check ({readme_file})")

        if response and response.status_code == 200:
            try:
                content_data = response.json()

                # Check if it's a file and has content
                if content_data.get("type") == "file" and content_data.get("content"):
                    # Decode base64 content
                    content_bytes = base64.b64decode(content_data["content"])
                    content_string = content_bytes.decode('utf-8', errors='ignore')

                    print(f"  Found README: {readme_file} ({len(content_string)} characters)")
                    return {
                        "filename": readme_file,
                        "content": content_string,
                        "size": content_data.get("size", 0),
                        "sha": content_data.get("sha", "")
                    }
            except Exception as e:
                print(f"  Error decoding README content: {e}")
                continue

    print("  No README file found")
    return None

def get_repo_details(repo_name):
    """Get detailed repository information"""
    print(f"  Getting repository details for: {repo_name}")
    url = f"https://api.github.com/repos/{org_name}/{repo_name}"
    response = make_request_with_retry(url, request_name="Repo Details")

    if response is None or response.status_code != 200:
        return None

    data = response.json()
    repo_info = {
        "repo_id": data["id"],
        "repo_name": data["name"],
        "url": data["html_url"],
        "description": data["description"],
        "is_private": data["private"],
        "is_archived": data["archived"],
        "created_at": data["created_at"],
        "pushed_at": data["pushed_at"]
    }
    print(f"  Repository details: {repo_info['repo_name']} (ID: {repo_info['repo_id']})")
    return repo_info, data  # Return both the processed summary and the raw API payload

def get_repo_languages(repo_name):
    """Get repository languages"""
    print(f"  Getting languages for: {repo_name}")
    url = f"https://api.github.com/repos/{org_name}/{repo_name}/languages"
    response = make_request_with_retry(url, request_name="Languages")

    if response is None or response.status_code != 200:
        return {}

    languages = response.json()
    print(f"  Languages found: {list(languages.keys()) if languages else 'None'}")
    return languages

def get_last_commits(repo_name, count=5):
    """Get the last N commits"""
    print(f"  Getting last {count} commits for: {repo_name}")
    url = f"https://api.github.com/repos/{org_name}/{repo_name}/commits?per_page={count}"
    response = make_request_with_retry(url, request_name="Last Commits")

    if response is None or response.status_code != 200:
        return []

    commits_data = response.json()
    commits = []

    for commit in commits_data:
        commit_info = {
            "commit_sha": commit["sha"],
            "author_login": commit["author"]["login"] if commit["author"] else None,
            "author_email": commit["commit"]["author"]["email"],
            "commit_at": commit["commit"]["author"]["date"],
            "message": commit["commit"]["message"]
        }
        commits.append(commit_info)

    print(f"  Found {len(commits)} commits")
    return commits

def get_commit_count(repo_name):
    """Get total commit count from the Link pagination header of a HEAD request"""
    print(f"  Getting commit count for: {repo_name}")
    url = f"https://api.github.com/repos/{org_name}/{repo_name}/commits"
    # Request 100-item pages so the "last" page number in the Link header matches
    # the 100-per-page arithmetic below
    response = make_head_request_with_retry(f"{url}?per_page=100", request_name="Commit Count")

    if response is None or response.status_code != 200:
        print(f"  Could not access commits endpoint (status: {response.status_code if response else 'None'})")
        return 0

    # Try to get count from Link header
    link_header = response.headers.get("Link", "")
    if 'rel="last"' in link_header:
        # Extract page number from the "last" link
        match = re.search(r'page=(\d+)>; rel="last"', link_header)
        if match:
            last_page = int(match.group(1))
            # Get the last page to count actual commits
            last_page_url = f"{url}?per_page=100&page={last_page}"
            last_response = make_request_with_retry(last_page_url, request_name="Last Page Count")
            if last_response and last_response.status_code == 200:
                last_page_data = last_response.json()
                total_count = (last_page - 1) * 100 + len(last_page_data)
                print(f"  Total commits: {total_count} (pages: {last_page}, last page commits: {len(last_page_data)})")
                return total_count

    # Fallback: if no Link header, try to get count from first page
    try:
        first_page_url = f"{url}?per_page=1"
        first_response = make_request_with_retry(first_page_url, request_name="First Page Count")
        if first_response and first_response.status_code == 200:
            first_page_data = first_response.json()
            if first_page_data:
                print("  Found at least 1 commit (no pagination info)")
                return 1
    except Exception as e:
        print(f"  Error in fallback commit count: {e}")

    print("  Could not determine commit count")
    return 0

def get_contributors(repo_name):
    """Get repository contributors"""
    print(f"  Getting contributors for: {repo_name}")
    url = f"https://api.github.com/repos/{org_name}/{repo_name}/contributors"
    contributors_data = fetch_all_pages(url, max_pages=3 if TEST_MODE else None)  # Limit pages in test mode

    contributors = []
    for contributor in contributors_data:
        contributor_info = {
            "user_id": contributor["id"],
            "login": contributor["login"],
            "account_type": contributor["type"],
            "contributions": contributor["contributions"]
        }
        contributors.append(contributor_info)

    print(f"  Found {len(contributors)} contributors")
    return contributors

def get_file_tree(repo_name):
    """Get repository file tree"""
    print(f"  Getting file tree for: {repo_name}")
    # First get the default branch
    repo_url = f"https://api.github.com/repos/{org_name}/{repo_name}"
    response = make_request_with_retry(repo_url, request_name="Default Branch")

    if response is None or response.status_code != 200:
        return []

    repo_data = response.json()
    default_branch = repo_data["default_branch"]

    # Get the tree for the default branch (recursively)
    tree_url = f"https://api.github.com/repos/{org_name}/{repo_name}/git/trees/{default_branch}?recursive=1"
    response = make_request_with_retry(tree_url, request_name="File Tree")

    if response is None or response.status_code != 200:
        return []

    tree_data = response.json()
    file_tree = []

    for item in tree_data.get("tree", []):
        file_info = {
            "path": item["path"],
            "type": item["type"],
            "size": item.get("size", 0)
        }
        file_tree.append(file_info)

    print(f"  Found {len(file_tree)} files/directories")
    return file_tree

def process_repository(repo_name):
    """Process a single repository and return detailed data"""
    print(f"\nProcessing repository: {repo_name}")

    repo_data = {}

    # 1. Get repository details (including security_and_analysis data)
    details_result = get_repo_details(repo_name)
    if details_result:
        repo_details, raw_repo_data = details_result
        repo_data.update(repo_details)
    else:
        raw_repo_data = None
    time.sleep(0.5)

    # 2. Get languages and determine the main language (most bytes)
    languages = get_repo_languages(repo_name)
    repo_data["languages"] = languages
    if languages:
        repo_data["main_language"] = max(languages.items(), key=lambda x: x[1])[0]
    else:
        repo_data["main_language"] = None
    time.sleep(0.5)

    # 3. Get README content
    repo_data["readme"] = get_readme_content(repo_name)
    time.sleep(0.5)

    # 4. Get last 5 commits
    repo_data["last_5_commits"] = get_last_commits(repo_name, 5)
    time.sleep(0.5)

    # 5. Get commit count
    repo_data["commit_count"] = get_commit_count(repo_name)
    time.sleep(0.5)

    # 6. Get contributors
    repo_data["contributors"] = get_contributors(repo_name)
    time.sleep(0.5)

    # 7. Get file tree
    repo_data["file_tree"] = get_file_tree(repo_name)
    time.sleep(0.5)

    # 8. Get security and automation data
    repo_data["security_and_automation"] = get_security_and_automation_status(repo_name, raw_repo_data)
    time.sleep(0.5)

    print(f"Completed processing {repo_name}")
    return repo_data

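# Main script flow: load any previous progress, fetch the organization's
# repository list (cached in the progress file), process each repository with
# process_repository(), save progress every 5 repos and on errors, then print
# summary statistics. Each repos_data entry ends up with keys such as:
# repo_id, repo_name, url, description, is_private, is_archived, created_at,
# pushed_at, languages, main_language, readme, last_5_commits, commit_count,
# contributors, file_tree, security_and_automation.
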
# Load existing progress
progress_data = load_progress()
processed_repos = set(progress_data.get("processed_repos", []))
all_repos = progress_data.get("all_repos", [])
repos_data = progress_data.get("repos_data", {})

# Step 1: Get all repositories in the organization (only if not already loaded)
if not all_repos:
    print(f"Fetching repositories from organization: {org_name}")
    repos_url = f"https://api.github.com/orgs/{org_name}/repos"

    # In test mode, limit the initial fetch to just enough repositories
    if TEST_MODE:
        print(f"TEST MODE: Limiting initial fetch to {MAX_TEST_REPOS} repositories")
        all_repos = fetch_all_pages(repos_url, max_pages=1)  # Just get the first page
        all_repos = all_repos[:MAX_TEST_REPOS]  # Keep only the first MAX_TEST_REPOS
    else:
        all_repos = fetch_all_pages(repos_url)

    print(f"Total repositories to process: {len(all_repos)}")
else:
    print(f"Using cached repository list: {len(all_repos)} repositories")

# Limit repositories for test mode (if we have more than needed)
if TEST_MODE and len(all_repos) > MAX_TEST_REPOS:
    all_repos = all_repos[:MAX_TEST_REPOS]
    print(f"TEST MODE: Limited to {len(all_repos)} repositories")

# Step 2: Process each repository
repo_count = 0
total_repos = len(all_repos)

for repo in all_repos:
    repo_name = repo['name']

    # Skip if already processed
    if repo_name in processed_repos:
        repo_count += 1
        print(f"Skipping already processed repository {repo_count}/{total_repos}: {repo_name}")
        continue

    repo_count += 1
    print(f"\nProcessing repository {repo_count}/{total_repos}: {repo_name}")

    try:
        # Process repository
        repo_data = process_repository(repo_name)

        # Store the data
        repos_data[repo_name] = repo_data

        # Mark as processed
        processed_repos.add(repo_name)

        # Save progress every 5 repositories
        if repo_count % 5 == 0:
            save_progress(list(processed_repos), all_repos, repos_data)
            print(f"Progress saved for {repo_name}")

    except Exception as e:
        print(f"Error processing repository {repo_name}: {e}")
        # Save progress even on error
        save_progress(list(processed_repos), all_repos, repos_data)
        continue

# Final save
save_progress(list(processed_repos), all_repos, repos_data)

# Step 3: Generate summary statistics
print(f"\n{'='*80}")
print("REPOSITORY ANALYSIS SUMMARY")
print(f"{'='*80}")

# Language statistics
language_stats = {}
total_repos_with_languages = 0
total_repos_with_readme = 0

# Security statistics
security_stats = {
    "github_actions_enabled": 0,
    "dependabot_enabled": 0,
    "code_scanning_enabled": 0,
    "secret_scanning_enabled": 0,
    "total_dependabot_alerts": 0,
    "total_code_scanning_alerts": 0,
    "total_secret_scanning_alerts": 0,
    "total_dependabot_prs": 0
}

for repo_name, repo_data in repos_data.items():
    languages = repo_data.get("languages", {})
    if languages:
        total_repos_with_languages += 1
        for lang, bytes_count in languages.items():
            if lang in language_stats:
                language_stats[lang]["bytes"] += bytes_count
                language_stats[lang]["repos"] += 1
            else:
                language_stats[lang] = {"bytes": bytes_count, "repos": 1}

    # Count repos with README
    if repo_data.get("readme"):
        total_repos_with_readme += 1

    # Security statistics
    security_data = repo_data.get("security_and_automation", {})
    if security_data.get("github_actions_enabled"):
        security_stats["github_actions_enabled"] += 1
    if security_data.get("dependabot_enabled"):
        security_stats["dependabot_enabled"] += 1
    if security_data.get("code_scanning_enabled"):
        security_stats["code_scanning_enabled"] += 1
    if security_data.get("secret_scanning_enabled"):
        security_stats["secret_scanning_enabled"] += 1

    alerts = security_data.get("alerts", {})
    security_stats["total_dependabot_alerts"] += alerts.get("dependabot", 0)
    security_stats["total_code_scanning_alerts"] += alerts.get("code_scanning", 0)
    security_stats["total_secret_scanning_alerts"] += alerts.get("secret_scanning", 0)
    security_stats["total_dependabot_prs"] += security_data.get("dependabot_open_prs", 0)

# Sort languages by total bytes
sorted_languages = sorted(language_stats.items(), key=lambda x: x[1]["bytes"], reverse=True)

print("\nLANGUAGE STATISTICS:")
print(f"{'Language':<20} {'Total Bytes':<15} {'Repos Count':<12} {'Avg Bytes/Repo':<15}")
print(f"{'-'*20} {'-'*15} {'-'*12} {'-'*15}")

for lang, stats in sorted_languages[:20]:  # Top 20 languages
    avg_bytes = stats["bytes"] // stats["repos"]
    print(f"{lang:<20} {stats['bytes']:<15,} {stats['repos']:<12} {avg_bytes:<15,}")

print("\nSECURITY AND AUTOMATION STATISTICS:")
print(f"{'Tool':<25} {'Enabled Repos':<15} {'Percentage':<12}")
print(f"{'-'*25} {'-'*15} {'-'*12}")

total_repos = len(processed_repos)
if total_repos > 0:
    print(f"{'GitHub Actions':<25} {security_stats['github_actions_enabled']:<15} {(security_stats['github_actions_enabled']/total_repos)*100:<12.1f}%")
    print(f"{'Dependabot':<25} {security_stats['dependabot_enabled']:<15} {(security_stats['dependabot_enabled']/total_repos)*100:<12.1f}%")
    print(f"{'Code Scanning':<25} {security_stats['code_scanning_enabled']:<15} {(security_stats['code_scanning_enabled']/total_repos)*100:<12.1f}%")
    print(f"{'Secret Scanning':<25} {security_stats['secret_scanning_enabled']:<15} {(security_stats['secret_scanning_enabled']/total_repos)*100:<12.1f}%")

print("\nSECURITY ALERTS SUMMARY:")
print(f"{'Alert Type':<20} {'Total Count':<15}")
print(f"{'-'*20} {'-'*15}")
print(f"{'Dependabot':<20} {security_stats['total_dependabot_alerts']:<15}")
print(f"{'Code Scanning':<20} {security_stats['total_code_scanning_alerts']:<15}")
print(f"{'Secret Scanning':<20} {security_stats['total_secret_scanning_alerts']:<15}")
print(f"{'Dependabot PRs':<20} {security_stats['total_dependabot_prs']:<15}")

print("\nSUMMARY:")
print(f"Total repositories processed: {len(processed_repos)}")
print(f"Repositories with language data: {total_repos_with_languages}")
print(f"Repositories with README: {total_repos_with_readme}")
print(f"Total unique languages found: {len(language_stats)}")
print(f"Data saved to: {PROGRESS_FILE}")

if TEST_MODE:
    print(f"\nTEST MODE: Only processed {len(all_repos)} repositories")
    print("Run without --test flag to process all repositories")