Created
October 25, 2025 09:29
-
-
Save thedatadavis/38331377b1910c85435d922d3dcbde74 to your computer and use it in GitHub Desktop.
Converts jsonl to sqlite for Webgrab
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import sqlite3 | |
| import sys | |
| import os | |
# Keys that must be present in every JSON record for it to be imported.
_REQUIRED_FIELDS = ('url', 'batchId', 'retrieved_at')


def _create_schema(cursor):
    """Create the ``pages`` table and its ``batch_id`` index if they are missing."""
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS pages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT UNIQUE NOT NULL,
            batch_id TEXT NOT NULL, -- Added batch identifier (UUID as TEXT)
            site TEXT,
            params TEXT,
            html_content TEXT,
            retrieved_at TEXT NOT NULL
        )
    ''')
    # Index for faster lookups of all pages belonging to one batch.
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_batch_id ON pages (batch_id)
    ''')


def _as_sql_text(value):
    """Return *value* in a form sqlite3 can bind to a TEXT column.

    JSON objects and arrays (``dict`` / ``list``) are not bindable by sqlite3,
    so they are serialized back to JSON text. Any other value is returned
    unchanged.
    """
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return value


def _import_line(conn, cursor, line, line_no):
    """Parse one non-blank JSONL line and store it; return True if a row was written.

    Every failure is reported on stdout and yields False, so a single bad line
    never aborts the whole import.
    """
    record = None
    try:
        record = json.loads(line)
        if not isinstance(record, dict):
            # Valid JSON, but a scalar/array/null cannot carry the page fields.
            print(f"Skipping line {line_no}: Expected a JSON object, got {type(record).__name__}.")
            return False
        if any(field not in record for field in _REQUIRED_FIELDS):
            print(f"Skipping line {line_no}: Missing essential field (url, batchId, or retrieved_at).")
            return False

        # INSERT OR REPLACE: a later record for the same URL replaces the earlier row.
        cursor.execute('''
            INSERT OR REPLACE INTO pages (url, batch_id, site, params, html_content, retrieved_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            record.get('url'),
            record.get('batchId'),
            record.get('site'),
            _as_sql_text(record.get('params')),
            record.get('html_content'),
            record.get('retrieved_at'),
        ))
        # Commit per row so a later failure cannot discard rows already stored.
        conn.commit()
        return True
    except json.JSONDecodeError as e:
        print(f"\nSkipping invalid JSON on line {line_no}: {e}")
    except sqlite3.Error as e:
        print(f"\nDatabase error on line {line_no} for URL {record.get('url', 'N/A') if record else 'N/A'}: {e}")
        conn.rollback()  # Discard the failed row's transaction
    except Exception as e:
        # Deliberate best-effort boundary: report and keep importing the rest.
        print(f"\nUnexpected error on line {line_no}: {e}")
        conn.rollback()
    return False


def import_jsonl_to_sqlite(jsonl_file_path, db_file_path):
    """
    Imports page data from a Webgrab JSON Lines file into an SQLite database.

    The ``pages`` table (and an index on ``batch_id``) is created when missing.
    Each non-blank input line must be a JSON object containing at least
    ``url``, ``batchId`` and ``retrieved_at``; ``site``, ``params`` and
    ``html_content`` are optional. Records are keyed by URL, so a record whose
    URL already exists replaces the stored row. A ``params`` value that is a
    JSON object or array is stored as JSON text.

    Progress, per-line problems and a final summary are printed to stdout.
    Errors are reported and counted rather than raised.

    Args:
        jsonl_file_path: Path of the UTF-8 JSON Lines file to read.
        db_file_path: Path of the SQLite database file to create or update.

    Returns:
        None.
    """
    # Bound before anything can fail, so every handler and the summary can use them.
    processed_count = 0
    error_count = 0
    conn = None
    try:
        conn = sqlite3.connect(db_file_path)
        cursor = conn.cursor()
        _create_schema(cursor)
        conn.commit()

        print(f"Importing from {jsonl_file_path} to {db_file_path}...")

        try:
            # An empty file is allowed: the schema has still been set up.
            if os.path.getsize(jsonl_file_path) == 0:
                print("Warning: Input file is empty.")

            with open(jsonl_file_path, 'r', encoding='utf-8') as f:
                for line_no, raw_line in enumerate(f, start=1):
                    line = raw_line.strip()
                    if line:  # Blank lines are neither processed nor errors
                        if _import_line(conn, cursor, line, line_no):
                            processed_count += 1
                        else:
                            error_count += 1
                    # Progress indicator for large files
                    if line_no % 100 == 0:
                        print(f"Processed {line_no} lines...", end='\r')
        except FileNotFoundError:
            print(f"Error: Input file not found at {jsonl_file_path}")
            error_count += 1
        except (OSError, UnicodeDecodeError) as e:
            # Unreadable file or bytes that are not valid UTF-8.
            print(f"Error reading input file {jsonl_file_path}: {e}")
            error_count += 1

    except sqlite3.Error as e:
        print(f"Failed to connect to or initialize database {db_file_path}: {e}")
        error_count += 1
    except Exception as e:
        print(f"An unexpected error occurred during setup: {e}")
        error_count += 1
    finally:
        if conn is not None:
            conn.close()
            print("\nDatabase connection closed.")
        else:
            print("\nDatabase connection was not established.")

    print("\nImport finished.")
    print(f"Successfully processed (inserted/updated): {processed_count} records.")
    print(f"Errors encountered (incl. skipped lines): {error_count}.")
if __name__ == "__main__":
    # Script entry point: expects exactly two positional arguments,
    # the input JSONL file followed by the output SQLite database file.
    if len(sys.argv) == 3:
        jsonl_path, db_path = sys.argv[1:]
        import_jsonl_to_sqlite(jsonl_path, db_path)
    else:
        # Wrong argument count: show usage and exit with a failure status.
        print("Usage: python python_importer.py <input_jsonl_file> <output_sqlite_db_file>")
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment