Skip to content

Instantly share code, notes, and snippets.

@poundifdef
Created January 15, 2025 13:37
Show Gist options
  • Select an option

  • Save poundifdef/6f9e6028add195362760b7518d4aed80 to your computer and use it in GitHub Desktop.

Select an option

Save poundifdef/6f9e6028add195362760b7518d4aed80 to your computer and use it in GitHub Desktop.
import json
from typing import Any, Dict, List, Optional
class Snowflake:
    """Placeholder Snowflake-style unique ID generator."""

    def generate(self) -> int:
        """Return a pseudo-random positive 63-bit integer.

        Stand-in for a real Snowflake implementation. The upper bound is
        (1 << 63) - 1 (not 1 << 63) so every ID fits in a signed 64-bit
        column -- SQLite INTEGERs are signed 64-bit, and 2**63 itself
        would overflow.
        """
        from random import randint
        return randint(1, (1 << 63) - 1)
class JSONData:
    """One flattened row: a destination table name plus its JSON payload."""

    def __init__(self, table: str, json_data: str):
        # `json` holds the serialized row object (a JSON string),
        # not a parsed dict.
        self.json = json_data
        self.table = table
class MultiTableFlattener:
    """Flattens nested JSON into per-table rows for relational storage.

    Each dict becomes one row in a table named after its path (parent and
    child names joined with "_"); child rows carry a foreign-key column
    named "<parent_table>_id" pointing at the parent's synthetic ID.
    """

    def __init__(self):
        # ID generator that assigns a synthetic primary key to every row.
        self.snowflake = Snowflake()

    def flatten_json(self, table: str, data: Any, parent_table: str = "",
                     parent_id: Optional[int] = None) -> List[JSONData]:
        """Recursively flatten *data* into a list of JSONData rows.

        Args:
            table: Table name for this value.
            data: JSON-compatible value (dict, list, or primitive).
            parent_table: Name of the enclosing table, "" at the top level.
            parent_id: Synthetic ID of the enclosing row, if any.

        Returns:
            JSONData rows; nested child rows precede their parent's row.

        Raises:
            TypeError: From json.dumps if a value is not JSON-serializable.
        """
        output: List[JSONData] = []
        if isinstance(data, dict):
            # One row per object; nested values go to table "<table>_<key>".
            oid = self.snowflake.generate()
            row: Dict[str, Any] = {table + "_id": oid}
            if parent_table:
                row[f"{parent_table}_id"] = parent_id
            for key, value in data.items():
                if isinstance(value, (dict, list)):
                    output.extend(
                        self.flatten_json(table + "_" + key, value, table, oid))
                else:
                    row[key] = value
            output.append(JSONData(table, json.dumps(row)))
        elif isinstance(data, list):
            # Lists get no row of their own; each element is flattened with
            # the same table/parent context.
            for item in data:
                output.extend(
                    self.flatten_json(table, item, parent_table, parent_id))
        else:
            # Primitive value: emit a single-column row.
            oid = self.snowflake.generate()
            row = {"id": oid, table: data}
            if parent_table:
                row[f"{parent_table}_id"] = parent_id
            output.append(JSONData(table, json.dumps(row)))
        return output
# Sample nested documents used to exercise the flattener below.
d = [
    {
        "name": "John Doe",
        "age": 30,
        "address": {"street": "123 Main St", "city": "Anytown"},
        "hobbies": [
            {"name": "Reading", "type": "Indoor"},
            {"name": "Cycling", "type": "Outdoor"},
        ],
        "siblings": [
            {"name": "Jane", "relation": "Sister"},
            {"name": "James", "relation": "Brother"},
        ],
        "foods": [{"name": "Apple"}, {"name": "Banana"}, {"name": "Cherry"}],
    },
    {
        "name": "Picard",
        "age": 30,
        "address": {"street": "123 Main St", "city": "Anytown"},
        "hobbies": [
            {"name": "Reading", "type": "Books"},
            {"name": "Cycling", "type": "Outdoor"},
        ],
        "siblings": [{"name": "A", "relation": "B"}],
        "foods": [{"name": "Tea"}, {"name": "Hot"}],
    },
]
# Column specifications for the report query:
#   header - output column label
#   path   - table path ending in the column name
#   join   - "rows" (plain column), "cols" (pivot on a matching row),
#            or "concat" (GROUP_CONCAT of child values)
#   col    - for "cols" only: [match_column, match_value]
cols = [
    {"header": "Name", "path": ["txn", "name"], "join": "rows"},
    {"header": "Sibling Name", "path": ["txn", "txn_siblings", "name"], "join": "rows"},
    {"header": "Sibling Relation", "path": ["txn", "txn_siblings", "relation"], "join": "rows"},
    {
        "header": "Reading",
        "path": ["txn", "txn_hobbies", "type"],
        "join": "cols",
        "col": ["name", "Reading"],
    },
    {"header": "Foods", "path": ["txn", "txn_foods", "name"], "join": "concat"},
]
# Flatten the sample documents into relational rows and show them.
flattener = MultiTableFlattener()
rc = flattener.flatten_json("txn", d)
for row in rc:
    print(row.table, row.json)

# Placeholder for a future list of queryable fields.
available_fields = []
import sqlite3
import json
def generate_sql_query(columns):
    """Build a SELECT statement from a list of column specifications.

    Each spec is a dict with:
        header: Output column label (quoted if it contains a space).
        path:   Table path ending in the column name, e.g.
                ["txn", "txn_siblings", "name"]. The first element is the
                base table; the second-to-last names the table the column
                lives in.
        join:   "rows"   - plain column, also added to GROUP BY;
                "cols"   - pivot via MAX(CASE WHEN ...), requires "col";
                "concat" - GROUP_CONCAT correlated subquery.
        col:    For "cols" only: [match_column, match_value].

    Returns:
        str: The generated SQL query.

    Note:
        Spec values are interpolated directly into the SQL, so the specs
        must come from trusted configuration, never from user input.
    """
    select_parts = []
    join_clauses = set()  # set -> no duplicate joins
    group_by_parts = []

    def get_alias(path):
        # The table a column belongs to is the element just before the
        # column name (the last element).
        return path[-2]

    def quote_identifier(identifier):
        # Quote output labels that contain spaces.
        return f'"{identifier}"' if ' ' in identifier else identifier

    for col in columns:
        path = col['path']
        alias = get_alias(path)
        column_name = path[-1]  # last element is always the column name

        # Emit a LEFT JOIN for every intermediate table on the path
        # (index 0 is the base table and needs no join).
        for i in range(1, len(path) - 1):
            parent = path[i - 1]
            current_table = path[i]
            join_clauses.add(
                f"LEFT JOIN {current_table} ON {parent}.txn_id = {alias}.txn_id"
            )

        header = quote_identifier(col['header'])
        if col['join'] == 'rows':
            # Plain column; grouped so aggregates in other columns work.
            column_reference = f"{alias}.{column_name}"
            select_parts.append(f"{column_reference} as {header}")
            group_by_parts.append(column_reference)
        elif col['join'] == 'cols':
            # Pivot: take the value from the row whose match column equals
            # the match value.
            condition = f"{alias}.{col['col'][0]} = '{col['col'][1]}'"
            select_parts.append(
                f"MAX(CASE WHEN {condition} THEN {alias}.{column_name} END) as {header}"
            )
        elif col['join'] == 'concat':
            # Aggregate all child values into one comma-separated cell.
            select_parts.append(
                f"(SELECT GROUP_CONCAT({alias}.{column_name}) FROM {alias} "
                f"WHERE {alias}.txn_id = txn.txn_id) as {header}"
            )

    # Assemble the query piece by piece so no blank lines appear when a
    # section (joins, GROUP BY) is absent.
    query = f"SELECT {', '.join(select_parts)}\nFROM txn"
    if join_clauses:
        # Sorted for deterministic output.
        query += "\n" + " ".join(sorted(join_clauses))
    if group_by_parts:
        query += f"\nGROUP BY {', '.join(group_by_parts)}"
    return query
# Generate the reporting SQL from the column specs and show it.
sql = generate_sql_query(cols)
print(sql)
def setup_sqlite_database(data_rows, db_path=":memory:"):
    """Create SQLite tables from flattened JSON rows and load the data.

    Args:
        data_rows: Sequence of row objects exposing ``table`` (destination
            table name) and ``json`` (a JSON object string), e.g. JSONData.
            Iterated twice, so it must be a sequence, not a one-shot
            iterator.
        db_path: Path to the SQLite database file; defaults to in-memory.

    Returns:
        sqlite3.Connection: Open connection with all rows committed.

    Note:
        Table and column names are interpolated directly into the SQL, so
        the rows must come from a trusted source; cell values themselves
        are always bound as parameters.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # First pass: derive each table's schema from the union of keys across
    # ALL of its rows -- using only the first row would miss columns that
    # appear later and make those INSERTs fail.
    table_schemas = {}
    for row in data_rows:
        data = json.loads(row.json)
        schema = table_schemas.setdefault(row.table, {})
        for key, value in data.items():
            if key not in schema:
                # Choose the SQLite type from the first value seen for
                # this key; anything non-numeric defaults to TEXT.
                if isinstance(value, int):
                    schema[key] = "INTEGER"
                elif isinstance(value, float):
                    schema[key] = "REAL"
                else:
                    schema[key] = "TEXT"

    # Create the tables, plus an index on txn_id where present since the
    # generated report queries join on it.
    for table_name, schema in table_schemas.items():
        column_defs = ", ".join(
            f"{name} {sql_type}" for name, sql_type in schema.items()
        )
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_defs})")
        if "txn_id" in schema:
            cursor.execute(
                f"CREATE INDEX IF NOT EXISTS idx_{table_name}_txn_id "
                f"ON {table_name}(txn_id)"
            )

    # Second pass: insert each row using only the columns it carries;
    # columns absent from a row are left NULL.
    for row in data_rows:
        data = json.loads(row.json)
        columns = list(data.keys())
        placeholders = ",".join("?" for _ in columns)
        cursor.execute(
            f"INSERT INTO {row.table} ({','.join(columns)}) VALUES ({placeholders})",
            [data[col] for col in columns],
        )

    conn.commit()
    return conn
# Create database from the flattened rows produced above.
conn = setup_sqlite_database(rc)
# Verify data: run the generated report query and show the result rows.
cursor = conn.cursor()
cursor.execute(sql)
print(cursor.fetchall())
# Debug helper (disabled): dump the contents of each generated table.
# for table in ['txn', 'txn_address', 'txn_hobbies', 'txn_siblings', 'txn_foods']:
#     print(f"\nContents of {table}:")
#     cursor.execute(f"SELECT * FROM {table}")
#     print(cursor.fetchall())
# return conn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment