Created
January 26, 2026 15:47
-
-
Save hoseinit/90907ca29127786f8d33d6f7aa717248 to your computer and use it in GitHub Desktop.
A script that finds the rows present in a source table but missing from a destination table, and generates a CQL file to import the differences.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Cassandra Data Import Tool - Offline CQL Generator

- Reads local CSV dumps for source and target tables
- Finds rows present in source but missing in target
- Writes a CQL script with INSERT statements (optionally IF NOT EXISTS)
- No direct Cassandra connection required

Run:
    python3 generate_missing_inserts.py
Then apply:
    cqlsh -f D:/temp/missing_rows_inserts.cql
"""
import csv
import sys
from typing import Set, Tuple, List, Dict, Any

# =============================================================================
# CONFIGURATION
# =============================================================================
# NOTE(review): all values below are blank placeholders — fill them in for a
# real run (keyspace/table names, file paths, column mapping, PK columns).

# Source and destination keyspaces/tables
SOURCE_KEYSPACE = ''
SOURCE_TABLE = ''
DEST_KEYSPACE = ''
DEST_TABLE = ''

# File paths (local CSV dumps)
SOURCE_DUMP_FILE = '.csv'     # Full export from source table
TARGET_DUMP_FILE = '.csv'     # Full export from target table
MISSING_REPORT_FILE = '.csv'  # CSV of rows to insert
CQL_OUTPUT_FILE = '.cql'      # Generated CQL script

# Column mappings: dest_column -> source_column
COLUMN_MAPPING = {
    'id': 'Id',
}

# Primary key columns in the destination table (in order)
DEST_PRIMARY_KEY = ['', '']

# Import behavior
SKIP_EXISTING = True  # If True, generates IF NOT EXISTS (disables batching)
BATCH_SIZE = 50       # Statements per BEGIN BATCH ... APPLY BATCH
DRY_RUN = True        # If True, only reports, no CQL file output

# =============================================================================
# HELPERS
# =============================================================================
def load_csv_keys(file_path: str, key_columns: List[str]) -> Set[Tuple]:
    """Load the distinct key tuples present in a CSV dump.

    Args:
        file_path: Path to a CSV file with a header row.
        key_columns: Header names combined (in order) into each key tuple.

    Returns:
        Set of key tuples, each value stripped of surrounding whitespace.
        A column missing from a row contributes '' instead of failing.

    Exits the process with status 1 on any read error.
    """
    keys: Set[Tuple] = set()
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                # DictReader fills short rows with None (restval), which
                # would crash .strip(); `or ''` normalizes that to ''.
                key = tuple((row.get(col) or '').strip() for col in key_columns)
                keys.add(key)
        print(f"✓ Loaded {len(keys)} keys from {file_path}")
        return keys
    except FileNotFoundError:
        print(f"✗ Error: File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error reading {file_path}: {e}")
        sys.exit(1)
def load_csv_rows(file_path: str) -> List[Dict]:
    """Read an entire CSV dump into a list of header-keyed dicts.

    Exits the process with status 1 if the file is missing or unreadable.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            loaded = [record for record in csv.DictReader(handle)]
        print(f"✓ Loaded {len(loaded)} rows from {file_path}")
        return loaded
    except FileNotFoundError:
        print(f"✗ Error: File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error reading {file_path}: {e}")
        sys.exit(1)
def identify_missing_rows(source_rows: List[Dict], target_keys: Set[Tuple],
                          source_key_cols: List[str]) -> List[Dict]:
    """Return the source rows whose key tuple is absent from target_keys.

    Args:
        source_rows: Rows from the source dump (as dicts).
        target_keys: Key tuples already present in the target table.
        source_key_cols: Source column names forming the key, in order.

    Returns:
        The subset of source_rows not found in target_keys, in input order.
    """
    missing: List[Dict] = []
    for row in source_rows:
        # `or ''` guards against None values (DictReader's restval for short
        # rows), keeping key construction consistent with load_csv_keys.
        key = tuple((row.get(col) or '').strip() for col in source_key_cols)
        if key not in target_keys:
            missing.append(row)
    print(f"✓ Found {len(missing)} missing rows to import")
    return missing
def map_row_columns(source_row: Dict, column_mapping: Dict) -> Dict:
    """Translate one source row into destination-column form.

    For each dest -> source pair in `column_mapping`: blank or absent values
    become None, configured int/float destination columns are coerced (None
    on conversion failure), and everything else becomes a stripped string.

    NOTE(review): the int/float column name lists below are placeholders
    ('') — fill them in alongside COLUMN_MAPPING before a real run.
    """
    result: Dict = {}
    for target_col, origin_col in column_mapping.items():
        raw = source_row.get(origin_col, '')
        if raw is None or raw == '':
            result[target_col] = None
            continue
        if target_col in ['']:
            # Integer destination columns.
            try:
                result[target_col] = int(raw)
            except (ValueError, TypeError):
                result[target_col] = None
            continue
        if target_col in ['', '']:
            # Float destination columns.
            try:
                result[target_col] = float(raw)
            except (ValueError, TypeError):
                result[target_col] = None
            continue
        result[target_col] = str(raw).strip()
    return result
def cql_literal(value: Any) -> str:
    """Render a Python value as a CQL literal.

    None -> null, bool -> true/false, str -> single-quoted with embedded
    quotes doubled, anything else -> its str() form.
    """
    if value is None:
        return 'null'
    # bool is checked before the generic fall-through so True/False come out
    # lowercase rather than Python-cased. (bool is never a str, so checking
    # it here is equivalent to the str-first order.)
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)
def write_cql_script(mapped_rows: List[Dict], dest_columns: List[str],
                     output_file: str, batch_size: int, skip_existing: bool):
    """Emit the generated INSERT script for the destination table.

    With skip_existing=True every statement carries IF NOT EXISTS and is
    written individually (conditional statements cannot span partitions in a
    batch); otherwise statements are grouped into logged batches of
    `batch_size`. Exits the process with status 1 on write failure.
    """
    if not mapped_rows:
        print("No rows to write into CQL script.")
        return

    column_list = ', '.join(f'"{col}"' for col in dest_columns)
    suffix = " IF NOT EXISTS" if skip_existing else ""
    # Conditional (IF NOT EXISTS) statements are never batched.
    use_batching = not skip_existing

    def render(row: Dict) -> str:
        # One INSERT statement for a single mapped row.
        values = ', '.join(cql_literal(row[col]) for col in dest_columns)
        return (f"INSERT INTO {DEST_KEYSPACE}.{DEST_TABLE} ({column_list}) "
                f"VALUES ({values}){suffix};\n")

    try:
        with open(output_file, 'w', encoding='utf-8', newline='\n') as f:
            f.write(f"-- Generated CQL insert script for {DEST_KEYSPACE}.{DEST_TABLE}\n")
            f.write(f"-- Rows: {len(mapped_rows)} | Batch size: {batch_size if use_batching else 'N/A (using IF NOT EXISTS)'} | IF NOT EXISTS: {skip_existing}\n")
            if skip_existing:
                f.write(f"-- NOTE: Batching disabled because conditional statements (IF NOT EXISTS) cannot span multiple partitions\n\n")
            else:
                f.write("\n")

            if not use_batching:
                # Individual inserts (when using IF NOT EXISTS).
                for row in mapped_rows:
                    f.write(render(row))
            else:
                # Grouped into BEGIN BATCH ... APPLY BATCH blocks.
                for start in range(0, len(mapped_rows), batch_size):
                    chunk = mapped_rows[start:start + batch_size]
                    if len(chunk) > 1:
                        f.write("BEGIN BATCH\n")
                    for row in chunk:
                        f.write(render(row))
                    if len(chunk) > 1:
                        f.write("APPLY BATCH;\n")
                    f.write("\n")
        print(f"✓ Wrote CQL script with {len(mapped_rows)} rows to {output_file}")
    except Exception as e:
        print(f"✗ Error writing CQL script: {e}")
        sys.exit(1)
def save_missing_report(missing_rows: List[Dict], output_file: str):
    """Write the missing rows (in source-dump schema) to a CSV report.

    The header order comes from the first row's keys; does nothing when the
    list is empty. Exits the process with status 1 on write failure.
    """
    if not missing_rows:
        print("No missing rows to report.")
        return
    try:
        header = list(missing_rows[0].keys())
        with open(output_file, 'w', newline='', encoding='utf-8') as out:
            writer = csv.DictWriter(out, fieldnames=header)
            writer.writeheader()
            for row in missing_rows:
                writer.writerow(row)
        print(f"✓ Saved missing rows report to {output_file}")
    except Exception as e:
        print(f"✗ Error saving report: {e}")
        sys.exit(1)
# =============================================================================
# MAIN
# =============================================================================
def main():
    """Run the offline diff pipeline.

    Steps: load both CSV dumps, compute the rows missing from the target,
    save a CSV report, and (unless DRY_RUN) emit the CQL insert script.
    Exits with status 1 on incomplete configuration or file errors (the
    loader/writer helpers call sys.exit themselves).
    """
    print("=" * 70)
    print("Cassandra Data Import Tool - Offline CQL Generator")
    print("=" * 70)
    print()

    print("Step 1: Loading data dumps...")
    print("-" * 70)
    # Fail fast with a clear message when configuration is incomplete: every
    # destination PK column needs an entry in COLUMN_MAPPING, otherwise the
    # lookup below would die with an uninformative bare KeyError (as it does
    # with the shipped placeholder config).
    unmapped = [pk for pk in DEST_PRIMARY_KEY if pk not in COLUMN_MAPPING]
    if unmapped:
        print(f"✗ Error: DEST_PRIMARY_KEY columns have no COLUMN_MAPPING entry: {unmapped}")
        sys.exit(1)
    source_key_cols = [COLUMN_MAPPING[pk] for pk in DEST_PRIMARY_KEY]
    source_rows = load_csv_rows(SOURCE_DUMP_FILE)
    target_keys = load_csv_keys(TARGET_DUMP_FILE, DEST_PRIMARY_KEY)
    print()

    print("Step 2: Identifying missing rows...")
    print("-" * 70)
    missing_rows = identify_missing_rows(source_rows, target_keys, source_key_cols)
    print()

    if not missing_rows:
        print("✓ No missing rows found. Source and target are in sync!")
        return

    print("Step 3: Saving missing rows report...")
    print("-" * 70)
    save_missing_report(missing_rows, MISSING_REPORT_FILE)
    print()

    if DRY_RUN:
        print("DRY RUN MODE - No CQL file generated. Set DRY_RUN = False to emit CQL.")
        return

    print("Step 4: Generating CQL script...")
    print("-" * 70)
    mapped_rows = [map_row_columns(row, COLUMN_MAPPING) for row in missing_rows]
    dest_columns = list(COLUMN_MAPPING.keys())
    write_cql_script(mapped_rows, dest_columns, CQL_OUTPUT_FILE, BATCH_SIZE, SKIP_EXISTING)
    print()

    print("=" * 70)
    print("All done!")
    print("=" * 70)
if __name__ == '__main__':
    # Top-level guard: run the tool and convert any failure into a non-zero
    # exit status instead of letting the interpreter crash silently.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: acknowledge and exit with failure status.
        print("\n\n✗ Interrupted by user. Exiting...")
        sys.exit(1)
    except Exception as e:
        # Unexpected errors: print a one-line summary, then the full
        # traceback for debugging, then exit non-zero.
        print(f"\n\n✗ Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment