Skip to content

Instantly share code, notes, and snippets.

@hoseinit
Created January 26, 2026 15:47
Show Gist options
  • Select an option

  • Save hoseinit/90907ca29127786f8d33d6f7aa717248 to your computer and use it in GitHub Desktop.

Select an option

Save hoseinit/90907ca29127786f8d33d6f7aa717248 to your computer and use it in GitHub Desktop.
A script that finds the rows present in a source table but missing from a destination table, and generates a CQL file to import the differences.
#!/usr/bin/env python3
"""
Cassandra Data Import Tool - Offline CQL Generator
- Reads local CSV dumps for source and target tables
- Finds rows present in source but missing in target
- Writes a CQL script with INSERT statements (optionally IF NOT EXISTS)
- No direct Cassandra connection required
Run:
python3 generate_missing_inserts.py
Then apply:
cqlsh -f D:/temp/missing_rows_inserts.cql
"""
import csv
import sys
from typing import Set, Tuple, List, Dict, Any
# =============================================================================
# CONFIGURATION
# =============================================================================
# Source and destination keyspaces/tables.
# NOTE(review): these are intentionally blank placeholders in the checked-in
# version — fill them in before running.
SOURCE_KEYSPACE = ''
SOURCE_TABLE = ''
DEST_KEYSPACE = ''
DEST_TABLE = ''
# File paths (local CSV dumps). Also blank placeholders — set real paths.
SOURCE_DUMP_FILE = '.csv' # Full export from source table
TARGET_DUMP_FILE = '.csv' # Full export from target table
MISSING_REPORT_FILE = '.csv' # CSV of rows to insert
CQL_OUTPUT_FILE = '.cql' # Generated CQL script
# Column mappings: dest_column -> source_column.
# Used both for renaming columns and for deriving the source-side key columns
# from DEST_PRIMARY_KEY in main().
COLUMN_MAPPING = {
    'id': 'Id',
}
# Primary key columns in the destination table (in order).
# Placeholder entries — must match keys of COLUMN_MAPPING.
DEST_PRIMARY_KEY = ['', '']
# Import behavior
SKIP_EXISTING = True # If True, generates IF NOT EXISTS (disables batching)
BATCH_SIZE = 50 # Statements per BEGIN BATCH … APPLY BATCH
DRY_RUN = True # If True, only reports, no CQL file output
# =============================================================================
# HELPERS
# =============================================================================
def load_csv_keys(file_path: str, key_columns: List[str]) -> Set[Tuple]:
    """Load the set of primary-key tuples from a CSV dump.

    Each key is a tuple of the stripped string values of *key_columns*,
    in order. Missing columns contribute '' to the tuple.

    Exits the process on read errors (script-style error handling,
    consistent with the rest of this tool).
    """
    keys: Set[Tuple] = set()
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                # BUG FIX: csv.DictReader fills short rows with None
                # (restval), so row.get(col, '') can still yield None and
                # .strip() would raise AttributeError. Coalesce to '' first.
                key = tuple((row.get(col) or '').strip() for col in key_columns)
                keys.add(key)
        print(f"✓ Loaded {len(keys)} keys from {file_path}")
        return keys
    except FileNotFoundError:
        print(f"✗ Error: File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error reading {file_path}: {e}")
        sys.exit(1)
def load_csv_rows(file_path: str) -> List[Dict]:
    """Read every record of *file_path* as a dict keyed by the CSV header.

    Exits the process on read errors, matching this tool's script-style
    error handling.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            all_rows = [record for record in csv.DictReader(handle)]
        print(f"✓ Loaded {len(all_rows)} rows from {file_path}")
        return all_rows
    except FileNotFoundError:
        print(f"✗ Error: File not found: {file_path}")
        sys.exit(1)
    except Exception as e:
        print(f"✗ Error reading {file_path}: {e}")
        sys.exit(1)
def identify_missing_rows(source_rows: List[Dict], target_keys: Set[Tuple],
                          source_key_cols: List[str]) -> List[Dict]:
    """Return the source rows whose key tuple is absent from *target_keys*.

    The key is built the same way as in load_csv_keys so the two sides
    compare like-for-like (stripped strings, '' for missing values).
    """
    missing = []
    for row in source_rows:
        # BUG FIX: coalesce None to '' before .strip() — csv.DictReader
        # produces None for fields missing from a short row, and
        # row.get(col, '') does not protect against an explicit None value.
        key = tuple((row.get(col) or '').strip() for col in source_key_cols)
        if key not in target_keys:
            missing.append(row)
    print(f"✓ Found {len(missing)} missing rows to import")
    return missing
def map_row_columns(source_row: Dict, column_mapping: Dict) -> Dict:
    """Translate one source row into destination-column form.

    Empty/missing values become None; columns listed in the int/float
    placeholder sets are converted (falling back to None on bad input);
    everything else is kept as a stripped string.
    """
    # Placeholder column sets — checked in this order, so a name present in
    # both is treated as an int column (same as the original list checks).
    int_columns = ('',)
    float_columns = ('', '')

    result: Dict = {}
    for target_col, origin_col in column_mapping.items():
        raw = source_row.get(origin_col, '')
        if raw is None or raw == '':
            converted = None
        elif target_col in int_columns:
            try:
                converted = int(raw)
            except (ValueError, TypeError):
                converted = None
        elif target_col in float_columns:
            try:
                converted = float(raw)
            except (ValueError, TypeError):
                converted = None
        else:
            converted = str(raw).strip()
        result[target_col] = converted
    return result
def cql_literal(value: Any) -> str:
    """Render a Python value as a CQL literal.

    None -> null, bool -> true/false, str -> single-quoted with embedded
    quotes doubled, anything else -> str(value).
    """
    if value is None:
        return 'null'
    # bool before str: the two types are disjoint, so checking bool first
    # is behaviorally identical to the original ordering.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)
def write_cql_script(mapped_rows: List[Dict], dest_columns: List[str],
                     output_file: str, batch_size: int, skip_existing: bool):
    """Emit an executable CQL script inserting *mapped_rows*.

    When skip_existing is True, every INSERT carries IF NOT EXISTS and
    batching is disabled (conditional statements cannot span partitions);
    otherwise rows are grouped into BEGIN BATCH ... APPLY BATCH chunks of
    *batch_size*. Exits the process on write errors.
    """
    if not mapped_rows:
        print("No rows to write into CQL script.")
        return

    column_list = ', '.join(f'"{col}"' for col in dest_columns)
    suffix = " IF NOT EXISTS" if skip_existing else ""
    use_batching = not skip_existing  # IF NOT EXISTS forbids multi-partition batches

    def render(row: Dict) -> str:
        # One INSERT statement (with trailing newline) for a mapped row.
        values = ', '.join(cql_literal(row[col]) for col in dest_columns)
        return (f"INSERT INTO {DEST_KEYSPACE}.{DEST_TABLE} ({column_list}) "
                f"VALUES ({values}){suffix};\n")

    try:
        with open(output_file, 'w', encoding='utf-8', newline='\n') as out:
            out.write(f"-- Generated CQL insert script for {DEST_KEYSPACE}.{DEST_TABLE}\n")
            out.write(f"-- Rows: {len(mapped_rows)} | Batch size: {batch_size if use_batching else 'N/A (using IF NOT EXISTS)'} | IF NOT EXISTS: {skip_existing}\n")
            if skip_existing:
                out.write(f"-- NOTE: Batching disabled because conditional statements (IF NOT EXISTS) cannot span multiple partitions\n\n")
            else:
                out.write("\n")

            if use_batching:
                # Chunk the rows; single-statement chunks skip the batch wrapper.
                for start in range(0, len(mapped_rows), batch_size):
                    chunk = mapped_rows[start:start + batch_size]
                    wrap = len(chunk) > 1
                    if wrap:
                        out.write("BEGIN BATCH\n")
                    for row in chunk:
                        out.write(render(row))
                    if wrap:
                        out.write("APPLY BATCH;\n")
                    out.write("\n")
            else:
                # One standalone conditional INSERT per row.
                for row in mapped_rows:
                    out.write(render(row))
        print(f"✓ Wrote CQL script with {len(mapped_rows)} rows to {output_file}")
    except Exception as e:
        print(f"✗ Error writing CQL script: {e}")
        sys.exit(1)
def save_missing_report(missing_rows: List[Dict], output_file: str):
    """Write *missing_rows* to a CSV report at *output_file*.

    The header comes from the first row's keys. Does nothing (beyond a
    message) when there are no rows; exits the process on write errors.
    """
    if not missing_rows:
        print("No missing rows to report.")
        return
    header = missing_rows[0].keys()
    try:
        with open(output_file, 'w', newline='', encoding='utf-8') as handle:
            report = csv.DictWriter(handle, fieldnames=header)
            report.writeheader()
            report.writerows(missing_rows)
        print(f"✓ Saved missing rows report to {output_file}")
    except Exception as e:
        print(f"✗ Error saving report: {e}")
        sys.exit(1)
# =============================================================================
# MAIN
# =============================================================================
def main():
    """Drive the compare-and-generate workflow end to end.

    Steps: load both CSV dumps, diff source keys against target keys,
    write the missing-rows CSV report, and (unless DRY_RUN) emit the
    CQL insert script.
    """
    banner = "=" * 70
    rule = "-" * 70

    print(banner)
    print("Cassandra Data Import Tool - Offline CQL Generator")
    print(banner)
    print()

    print("Step 1: Loading data dumps...")
    print(rule)
    # Translate dest PK names into the source dump's column names.
    source_key_cols = [COLUMN_MAPPING[pk] for pk in DEST_PRIMARY_KEY]
    source_rows = load_csv_rows(SOURCE_DUMP_FILE)
    target_keys = load_csv_keys(TARGET_DUMP_FILE, DEST_PRIMARY_KEY)
    print()

    print("Step 2: Identifying missing rows...")
    print(rule)
    missing_rows = identify_missing_rows(source_rows, target_keys, source_key_cols)
    print()
    if not missing_rows:
        print("✓ No missing rows found. Source and target are in sync!")
        return

    print("Step 3: Saving missing rows report...")
    print(rule)
    save_missing_report(missing_rows, MISSING_REPORT_FILE)
    print()

    if DRY_RUN:
        print("DRY RUN MODE - No CQL file generated. Set DRY_RUN = False to emit CQL.")
        return

    print("Step 4: Generating CQL script...")
    print(rule)
    mapped_rows = [map_row_columns(row, COLUMN_MAPPING) for row in missing_rows]
    dest_columns = list(COLUMN_MAPPING.keys())
    write_cql_script(mapped_rows, dest_columns, CQL_OUTPUT_FILE, BATCH_SIZE, SKIP_EXISTING)
    print()

    print(banner)
    print("All done!")
    print(banner)
# Script entry point: run main() and convert any failure into a non-zero
# exit status so shell callers can detect it.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C: report the abort and exit non-zero.
        print("\n\n✗ Interrupted by user. Exiting...")
        sys.exit(1)
    except Exception as e:
        # Last-resort handler: show the full traceback, then fail.
        print(f"\n\n✗ Unexpected error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment