Generate ClickHouse Kafka DDLs from PostgreSQL tables
#!/usr/bin/env python3
"""
Generate ClickHouse Kafka ingestion DDLs from Postgres table schemas.

Requires: psycopg2 (pip install psycopg2-binary)

Usage:
    export PGHOST=...
    export PGPORT=5432
    export PGDATABASE=...
    export PGUSER=...
    export PGPASSWORD=...
    python generate_clickhouse_kafka_ddls.py --tables users,orders --schema public \
        --mode snapshot \
        --kafka-bootstrap redpanda-0.redpanda.redpanda.svc.cluster.local:9093 \
        --consumer-prefix clickhouse-consumer

Modes:
    snapshot -> assumes a JSONEachRow payload whose keys match the column names.
    cdc      -> Debezium-style payload; generates a transformation SELECT using JSONExtract*.

Edit TYPE_MAPPING if needed.
"""
import os
import argparse

import psycopg2
from psycopg2 import sql
TYPE_MAPPING = {
    # postgres_type_lower: (clickhouse_type, nullable_allowed)
    'smallint': ('Int16', True),
    'int2': ('Int16', True),
    'integer': ('Int32', True),
    'int4': ('Int32', True),
    'bigint': ('Int64', True),
    'int8': ('Int64', True),
    'serial': ('Int32', True),
    'bigserial': ('Int64', True),
    'numeric': ('Decimal(38,10)', True),
    'decimal': ('Decimal(38,10)', True),
    'real': ('Float32', True),
    'double precision': ('Float64', True),
    'bool': ('UInt8', True),
    'boolean': ('UInt8', True),
    'text': ('String', True),
    'character varying': ('String', True),
    'varchar': ('String', True),
    'character': ('String', True),
    'char': ('String', True),
    'uuid': ('String', True),
    'json': ('String', True),
    'jsonb': ('String', True),
    'timestamp without time zone': ('DateTime', True),
    'timestamp with time zone': ('DateTime64(3)', True),
    'date': ('Date', True),
    'time without time zone': ('String', True),
    'time with time zone': ('String', True),
    'bytea': ('String', True),
}
CLICKHOUSE_RESERVED = {
    'add', 'after', 'alias', 'all', 'alter', 'and', 'anti', 'any', 'array', 'as', 'asc', 'asof', 'attach',
    'between', 'by', 'case', 'cast', 'check', 'class', 'clear', 'cluster', 'codec', 'collate', 'column', 'comment',
    'constraint', 'create', 'cross', 'cube', 'current', 'database', 'databases', 'date', 'datetime', 'default',
    'delete', 'desc', 'describe', 'detach', 'dictionary', 'dictionaries', 'disk', 'distinct', 'distributed',
    'drop', 'else', 'end', 'engine', 'events', 'exists', 'explain', 'expression', 'extract', 'filesystem',
    'final', 'first', 'flatten', 'flush', 'following', 'for', 'format', 'from', 'full', 'function', 'functions',
    'global', 'grant', 'granularity', 'group', 'having', 'if', 'ilike', 'in', 'index', 'inner', 'insert',
    'interval', 'into', 'is', 'join', 'key', 'kill', 'last', 'left', 'like', 'limit', 'live', 'local', 'materialized',
    'max', 'min', 'modify', 'move', 'mutation', 'nan', 'not', 'null', 'nulls', 'offset', 'on', 'optimize', 'or',
    'order', 'outer', 'outfile', 'over', 'partition', 'populate', 'preceding', 'prewhere', 'primary', 'projection',
    'quota', 'range', 'read', 'reload', 'remove', 'rename', 'replace', 'replica', 'replicated', 'restrict', 'revoke',
    'right', 'role', 'rollup', 'row', 'rows', 'sample', 'select', 'semi', 'settings', 'show', 'source', 'start',
    'stop', 'storage', 'string', 'subpartition', 'system', 'table', 'tables', 'temporary', 'then', 'ties', 'time',
    'to', 'top', 'totals', 'touch', 'truncate', 'ttl', 'type', 'unbounded', 'union', 'update', 'use', 'using',
    'uuid', 'values', 'view', 'volume', 'watch', 'when', 'where', 'window', 'with', 'zookeeper',
}
def escape_identifier(name):
    if name.lower() in CLICKHOUSE_RESERVED:
        return f"`{name}`"
    return name
def get_columns(conn, schema, table):
    q = sql.SQL("""
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """)
    with conn.cursor() as cur:
        cur.execute(q, [schema, table])
        rows = cur.fetchall()
    columns = []
    for name, dtype, nullable in rows:
        # Unmapped Postgres types fall back to String
        ch_type, _ = TYPE_MAPPING.get(dtype.lower(), ('String', True))
        is_null = (nullable.lower() == 'yes')
        # Decide the final ClickHouse type
        if is_null:
            ch_full = f"Nullable({ch_type})"
        else:
            ch_full = ch_type
        columns.append({
            'name': name,
            'pg_type': dtype,
            'nullable': is_null,
            'ch_type': ch_full,
        })
    return columns
def build_snapshot_triad(table, schema, columns, kafka_bootstrap, topic, consumer_group):
    # Kafka staging table
    cols_def = ",\n    ".join(f"{escape_identifier(c['name'])} {c['ch_type']}" for c in columns)
    staging = f"""
CREATE TABLE IF NOT EXISTS kafka_queue_{table} (
    {cols_def}
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = '{kafka_bootstrap}',
    kafka_topic_list = '{topic}',
    kafka_group_name = '{consumer_group}',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1;
""".strip()
    # Target table (add ingested_at)
    cols_target = ",\n    ".join(f"{escape_identifier(c['name'])} {c['ch_type']}" for c in columns)
    target = f"""
CREATE TABLE IF NOT EXISTS {table} (
    {cols_target},
    ingested_at DateTime DEFAULT now()
) ENGINE = MergeTree
ORDER BY (ingested_at);  -- Adjust ORDER BY for production (e.g. primary key or (created_at, id))
""".strip()
    mv_cols = ", ".join(escape_identifier(c['name']) for c in columns)
    mv = f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_ingest_{table}
TO {table} AS
SELECT {mv_cols}, now() AS ingested_at
FROM kafka_queue_{table};
""".strip()
    return staging, target, mv
def build_cdc_triad(table, schema, columns, kafka_bootstrap, topic, consumer_group):
    # Debezium CDC payload typical structure:
    #   {"before": {...}, "after": {...}, "op": "c", "ts_ms": ...}
    # The staging table captures each whole message into a raw String column
    # (format JSONAsString), and the materialized view extracts fields with JSONExtract*.
    staging = f"""
CREATE TABLE IF NOT EXISTS kafka_queue_{table}_cdc (
    raw String
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = '{kafka_bootstrap}',
    kafka_topic_list = '{topic}',
    kafka_group_name = '{consumer_group}',
    kafka_format = 'JSONAsString',
    kafka_skip_broken_messages = 1;
""".strip()
    cols_target = ",\n    ".join(f"{escape_identifier(c['name'])} {c['ch_type']}" for c in columns)
    target = f"""
CREATE TABLE IF NOT EXISTS {table} (
    {cols_target},
    op Enum8('c' = 1, 'u' = 2, 'd' = 3) DEFAULT 'c',
    change_ts DateTime64(3) DEFAULT now(),
    ingested_at DateTime DEFAULT now()
) ENGINE = MergeTree
ORDER BY (change_ts, ingested_at);
""".strip()
    # Build extraction of "after" object fields. Note: JSONExtract* functions take
    # nested keys as separate arguments (raw, 'after', '<col>'), not a dotted path.
    extracts = []
    for c in columns:
        # Choose the extraction expression based on the mapped ClickHouse type
        base_type = c['ch_type'].replace("Nullable(", "").replace(")", "")
        col_name = escape_identifier(c['name'])
        if "Int" in base_type or "UInt" in base_type:
            extracts.append(f"JSONExtract(raw, 'after', '{c['name']}', 'Nullable(Int64)') AS {col_name}")
        elif "Float" in base_type or "Decimal" in base_type:
            extracts.append(f"JSONExtract(raw, 'after', '{c['name']}', 'Nullable(Float64)') AS {col_name}")
        elif base_type == "DateTime" or base_type.startswith("DateTime64"):
            extracts.append(f"parseDateTime64BestEffortOrNull(JSONExtractString(raw, 'after', '{c['name']}')) AS {col_name}")
        elif base_type == "Date":
            extracts.append(f"toDateOrNull(JSONExtractString(raw, 'after', '{c['name']}')) AS {col_name}")
        else:
            extracts.append(f"JSONExtractString(raw, 'after', '{c['name']}') AS {col_name}")
    # op mapping (Debezium: c=insert, u=update, d=delete).
    # Join outside the f-string: backslashes are not allowed inside f-string
    # expressions before Python 3.12.
    extract_block = ",\n    ".join(extracts)
    mv = f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_ingest_{table}_cdc
TO {table} AS
SELECT
    {extract_block},
    multiIf(JSONExtractString(raw, 'op') = 'c', 'c',
            JSONExtractString(raw, 'op') = 'u', 'u',
            JSONExtractString(raw, 'op') = 'd', 'd', 'c') AS op,
    toDateTime64(JSONExtract(raw, 'ts_ms', 'Int64') / 1000, 3) AS change_ts,
    now() AS ingested_at
FROM kafka_queue_{table}_cdc
WHERE JSONExtractString(raw, 'op') IN ('c', 'u', 'd');
""".strip()
    return staging, target, mv
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--tables", required=True, help="Comma-separated table names")
    parser.add_argument("--schema", default="public", help="Postgres schema")
    parser.add_argument("--mode", choices=["snapshot", "cdc"], default="snapshot", help="Ingestion mode")
    parser.add_argument("--kafka-bootstrap", required=True, help="Redpanda/Kafka bootstrap servers")
    parser.add_argument("--consumer-prefix", default="clickhouse-consumer", help="Prefix for consumer group names")
    parser.add_argument("--topic-prefix", default="pg", help="Topic prefix")
    args = parser.parse_args()
    pg_params = {
        'host': os.environ.get("PGHOST"),
        'port': os.environ.get("PGPORT", 5432),
        'dbname': os.environ.get("PGDATABASE"),
        'user': os.environ.get("PGUSER"),
        'password': os.environ.get("PGPASSWORD"),
    }
    missing = [k for k, v in pg_params.items() if not v]
    if missing:
        raise SystemExit(f"Missing Postgres env vars: {missing}")
    conn = psycopg2.connect(**pg_params)
    try:
        tables = [t.strip() for t in args.tables.split(",") if t.strip()]
        output = {}
        for table in tables:
            cols = get_columns(conn, args.schema, table)
            if not cols:
                raise SystemExit(f"No columns found for {args.schema}.{table}")
            topic = f"{args.topic_prefix}.{args.schema}.{table}"
            consumer_group = f"{args.consumer_prefix}-{table}"
            if args.mode == "snapshot":
                staging, target, mv = build_snapshot_triad(table, args.schema, cols, args.kafka_bootstrap, topic, consumer_group)
            else:
                staging, target, mv = build_cdc_triad(table, args.schema, cols, args.kafka_bootstrap, topic, consumer_group)
            output[table] = {
                'topic': topic,
                'consumer_group': consumer_group,
                'staging': staging,
                'target': target,
                'materialized_view': mv,
            }
    finally:
        conn.close()
    print("\n-- GENERATED DDL (EXECUTE IN CLICKHOUSE) --\n")
    for table, ddls in output.items():
        print(f"-- Table: {table} (Topic: {ddls['topic']})")
        print(ddls['staging'])
        print()
        print(ddls['target'])
        print()
        print(ddls['materialized_view'])
        print("\n-- --------------------------------------------------\n")


if __name__ == "__main__":
    main()
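For reference, this is the kind of DDL triad snapshot mode emits for a hypothetical table `events` with a NOT NULL `id bigint` and a nullable `created_at timestamptz` (the table name, topic, and broker address below are illustrative, not part of the script):

-- Table: events (Topic: pg.public.events)
CREATE TABLE IF NOT EXISTS kafka_queue_events (
    id Int64,
    created_at Nullable(DateTime64(3))
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'redpanda-0.redpanda.redpanda.svc.cluster.local:9093',
    kafka_topic_list = 'pg.public.events',
    kafka_group_name = 'clickhouse-consumer-events',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1;

CREATE TABLE IF NOT EXISTS events (
    id Int64,
    created_at Nullable(DateTime64(3)),
    ingested_at DateTime DEFAULT now()
) ENGINE = MergeTree
ORDER BY (ingested_at);  -- Adjust ORDER BY for production (e.g. primary key or (created_at, id))

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_ingest_events
TO events AS
SELECT id, created_at, now() AS ingested_at
FROM kafka_queue_events;

In cdc mode the staging table instead has a single `raw String` column fed via JSONAsString, and the materialized view pulls typed columns out of the Debezium `after` object with JSONExtract*.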