@mrhalix
Last active December 6, 2025 22:43
Generate ClickHouse Kafka DDLs from PostgreSQL tables
#!/usr/bin/env python3
"""
Generate ClickHouse Kafka ingestion DDLs from Postgres table schemas.
Usage:
export PGHOST=...
export PGPORT=5432
export PGDATABASE=...
export PGUSER=...
export PGPASSWORD=...
python generate_clickhouse_kafka_ddls.py --tables users,orders --schema public \
--mode snapshot \
--kafka-bootstrap redpanda-0.redpanda.redpanda.svc.cluster.local:9093 \
--consumer-prefix clickhouse-consumer
Modes:
snapshot -> assumes JSONEachRow payload that matches column names.
cdc -> Debezium style payload; generates transformation SELECT using JSONExtract*.
Edit the TYPE_MAPPING if needed.
"""
import os
import argparse
import psycopg2
import json
from psycopg2 import sql
TYPE_MAPPING = {
    # postgres_type_lower: (clickhouse_type, nullable_allowed)
    'smallint': ('Int16', True),
    'int2': ('Int16', True),
    'integer': ('Int32', True),
    'int4': ('Int32', True),
    'bigint': ('Int64', True),
    'int8': ('Int64', True),
    'serial': ('Int32', True),
    'bigserial': ('Int64', True),
    'numeric': ('Decimal(38,10)', True),
    'decimal': ('Decimal(38,10)', True),
    'real': ('Float32', True),
    'double precision': ('Float64', True),
    'bool': ('UInt8', True),
    'boolean': ('UInt8', True),
    'text': ('String', True),
    'character varying': ('String', True),
    'varchar': ('String', True),
    'character': ('String', True),
    'char': ('String', True),
    'uuid': ('String', True),
    'json': ('String', True),
    'jsonb': ('String', True),
    'timestamp without time zone': ('DateTime', True),
    'timestamp with time zone': ('DateTime64(3)', True),
    'date': ('Date', True),
    'time without time zone': ('String', True),
    'time with time zone': ('String', True),
    'bytea': ('String', True),
}
CLICKHOUSE_RESERVED = {
    'add', 'after', 'alias', 'all', 'alter', 'and', 'anti', 'any', 'array', 'as', 'asc', 'asof', 'attach',
    'between', 'by', 'case', 'cast', 'check', 'class', 'clear', 'cluster', 'codec', 'collate', 'column', 'comment',
    'constraint', 'create', 'cross', 'cube', 'current', 'database', 'databases', 'date', 'datetime', 'default',
    'delete', 'desc', 'describe', 'detach', 'dictionary', 'dictionaries', 'disk', 'distinct', 'distributed',
    'drop', 'else', 'end', 'engine', 'events', 'exists', 'explain', 'expression', 'extract', 'filesystem',
    'final', 'first', 'flatten', 'flush', 'following', 'for', 'format', 'from', 'full', 'function', 'functions',
    'global', 'grant', 'granularity', 'group', 'having', 'if', 'ilike', 'in', 'index', 'inner', 'insert',
    'interval', 'into', 'is', 'join', 'key', 'kill', 'last', 'left', 'like', 'limit', 'live', 'local', 'materialized',
    'max', 'min', 'modify', 'move', 'mutation', 'nan', 'not', 'null', 'nulls', 'offset', 'on', 'optimize', 'or',
    'order', 'outer', 'outfile', 'over', 'partition', 'populate', 'preceding', 'prewhere', 'primary', 'projection',
    'quota', 'range', 'read', 'reload', 'remove', 'rename', 'replace', 'replica', 'replicated', 'restrict', 'revoke',
    'right', 'role', 'rollup', 'row', 'rows', 'sample', 'select', 'semi', 'settings', 'show', 'source', 'start',
    'stop', 'storage', 'string', 'subpartition', 'system', 'table', 'tables', 'temporary', 'then', 'ties', 'time',
    'to', 'top', 'totals', 'touch', 'truncate', 'ttl', 'type', 'unbounded', 'union', 'update', 'use', 'using',
    'uuid', 'values', 'view', 'volume', 'watch', 'when', 'where', 'window', 'with', 'zookeeper',
}
def escape_identifier(name):
    # Backtick-quote identifiers that collide with ClickHouse keywords.
    if name.lower() in CLICKHOUSE_RESERVED:
        return f"`{name}`"
    return name
def get_columns(conn, schema, table):
    q = sql.SQL("""
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """)
    with conn.cursor() as cur:
        cur.execute(q, [schema, table])
        rows = cur.fetchall()
    columns = []
    for name, dtype, nullable in rows:
        ch_type, _ = TYPE_MAPPING.get(dtype.lower(), ('String', True))
        is_null = (nullable.lower() == 'yes')
        # Decide final CH type
        if is_null:
            ch_full = f"Nullable({ch_type})"
        else:
            ch_full = ch_type
        columns.append({
            'name': name,
            'pg_type': dtype,
            'nullable': is_null,
            'ch_type': ch_full
        })
    return columns
def build_snapshot_triad(table, schema, columns, kafka_bootstrap, topic, consumer_group):
    # Kafka staging table
    cols_def = ",\n    ".join(f"{escape_identifier(c['name'])} {c['ch_type']}" for c in columns)
    staging = f"""
CREATE TABLE IF NOT EXISTS kafka_queue_{table} (
    {cols_def}
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = '{kafka_bootstrap}',
    kafka_topic_list = '{topic}',
    kafka_group_name = '{consumer_group}',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1;
""".strip()
    # Target table (add ingested_at)
    cols_target = ",\n    ".join(f"{escape_identifier(c['name'])} {c['ch_type']}" for c in columns)
    target = f"""
CREATE TABLE IF NOT EXISTS {table} (
    {cols_target},
    ingested_at DateTime DEFAULT now()
) ENGINE = MergeTree
ORDER BY (ingested_at); -- Adjust ORDER BY for production (e.g. primary key or (created_at, id))
""".strip()
    mv_cols = ", ".join(escape_identifier(c['name']) for c in columns)
    mv = f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_ingest_{table}
TO {table} AS
SELECT {mv_cols}, now() AS ingested_at
FROM kafka_queue_{table};
""".strip()
    return staging, target, mv
def build_cdc_triad(table, schema, columns, kafka_bootstrap, topic, consumer_group):
    # Debezium CDC payloads typically look like:
    #   {"before": {...}, "after": {...}, "op": "c", "ts_ms": ...}
    # Read each message into a single raw String column (JSONAsString) and
    # unpack it with JSONExtract* in the materialized view below.
    staging = f"""
CREATE TABLE IF NOT EXISTS kafka_queue_{table}_cdc (
    raw String
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = '{kafka_bootstrap}',
    kafka_topic_list = '{topic}',
    kafka_group_name = '{consumer_group}',
    kafka_format = 'JSONAsString',
    kafka_skip_broken_messages = 1;
""".strip()
    cols_target = ",\n    ".join(f"{escape_identifier(c['name'])} {c['ch_type']}" for c in columns)
    target = f"""
CREATE TABLE IF NOT EXISTS {table} (
    {cols_target},
    op Enum8('c' = 1, 'u' = 2, 'd' = 3) DEFAULT 'c',
    change_ts DateTime64(3) DEFAULT now(),
    ingested_at DateTime DEFAULT now()
) ENGINE = MergeTree
ORDER BY (change_ts, ingested_at);
""".strip()
    # Build extraction of the "after" object fields, picking a JSONExtract* call per type.
    extracts = []
    for c in columns:
        base_type = c['ch_type'].replace("Nullable(", "").replace(")", "")
        col_name = escape_identifier(c['name'])
        if "Int" in base_type or "UInt" in base_type:
            extracts.append(f"JSONExtract(raw, 'after', '{c['name']}', 'Nullable(Int64)') AS {col_name}")
        elif "Float" in base_type or "Decimal" in base_type:
            extracts.append(f"JSONExtract(raw, 'after', '{c['name']}', 'Nullable(Float64)') AS {col_name}")
        elif base_type == "DateTime" or base_type.startswith("DateTime64"):
            extracts.append(f"parseDateTime64BestEffortOrNull(JSONExtractString(raw, 'after', '{c['name']}')) AS {col_name}")
        elif base_type == "Date":
            extracts.append(f"toDateOrNull(JSONExtractString(raw, 'after', '{c['name']}')) AS {col_name}")
        else:
            extracts.append(f"JSONExtractString(raw, 'after', '{c['name']}') AS {col_name}")
    extract_block = ",\n    ".join(extracts)
    # op mapping (Debezium: c=insert, u=update, d=delete)
    mv = f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_ingest_{table}_cdc
TO {table} AS
SELECT
    {extract_block},
    multiIf(JSONExtractString(raw, 'op') = 'c', 'c',
            JSONExtractString(raw, 'op') = 'u', 'u',
            JSONExtractString(raw, 'op') = 'd', 'd', 'c') AS op,
    toDateTime64(JSONExtract(raw, 'ts_ms', 'Int64') / 1000, 3) AS change_ts,
    now() AS ingested_at
FROM kafka_queue_{table}_cdc
WHERE JSONExtractString(raw, 'op') IN ('c', 'u', 'd');
""".strip()
    return staging, target, mv
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--tables", required=True, help="Comma separated table names")
    parser.add_argument("--schema", default="public", help="Postgres schema")
    parser.add_argument("--mode", choices=["snapshot", "cdc"], default="snapshot", help="Ingestion mode")
    parser.add_argument("--kafka-bootstrap", required=True, help="Redpanda/Kafka bootstrap servers")
    parser.add_argument("--consumer-prefix", default="clickhouse-consumer", help="Prefix for consumer group names")
    parser.add_argument("--topic-prefix", default="pg", help="Topic prefix")
    args = parser.parse_args()

    pg_params = {
        'host': os.environ.get("PGHOST"),
        'port': os.environ.get("PGPORT", 5432),
        'dbname': os.environ.get("PGDATABASE"),
        'user': os.environ.get("PGUSER"),
        'password': os.environ.get("PGPASSWORD"),
    }
    missing = [k for k, v in pg_params.items() if not v]
    if missing:
        raise SystemExit(f"Missing Postgres env vars: {missing}")
    conn = psycopg2.connect(**pg_params)

    tables = [t.strip() for t in args.tables.split(",") if t.strip()]
    output = {}
    for table in tables:
        cols = get_columns(conn, args.schema, table)
        topic = f"{args.topic_prefix}.{args.schema}.{table}"
        consumer_group = f"{args.consumer_prefix}-{table}"
        if args.mode == "snapshot":
            staging, target, mv = build_snapshot_triad(table, args.schema, cols, args.kafka_bootstrap, topic, consumer_group)
        else:
            staging, target, mv = build_cdc_triad(table, args.schema, cols, args.kafka_bootstrap, topic, consumer_group)
        output[table] = {
            'topic': topic,
            'consumer_group': consumer_group,
            'staging': staging,
            'target': target,
            'materialized_view': mv
        }

    print("\n-- GENERATED DDL (EXECUTE IN CLICKHOUSE) --\n")
    for table, ddls in output.items():
        print(f"-- Table: {table} (Topic: {ddls['topic']})")
        print(ddls['staging'])
        print()
        print(ddls['target'])
        print()
        print(ddls['materialized_view'])
        print("\n-- --------------------------------------------------\n")


if __name__ == "__main__":
    main()
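
For reference, a snapshot-mode run against a hypothetical users(id bigint NOT NULL, email text) table, with the broker and flag values from the docstring, would print a triad along these lines (table, topic, and consumer group names here are illustrative, not part of the script):

CREATE TABLE IF NOT EXISTS kafka_queue_users (
    id Int64,
    email Nullable(String)
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'redpanda-0.redpanda.redpanda.svc.cluster.local:9093',
    kafka_topic_list = 'pg.public.users',
    kafka_group_name = 'clickhouse-consumer-users',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 1;

CREATE TABLE IF NOT EXISTS users (
    id Int64,
    email Nullable(String),
    ingested_at DateTime DEFAULT now()
) ENGINE = MergeTree
ORDER BY (ingested_at); -- Adjust ORDER BY for production (e.g. primary key or (created_at, id))

CREATE MATERIALIZED VIEW IF NOT EXISTS mv_ingest_users
TO users AS
SELECT id, email, now() AS ingested_at
FROM kafka_queue_users;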