Created
January 15, 2025 13:37
-
-
Save poundifdef/6f9e6028add195362760b7518d4aed80 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| from typing import Any, List, Dict | |
| class Snowflake: | |
| def generate(self): | |
| # Replace with your implementation of Snowflake ID generator | |
| from random import randint | |
| return randint(1, 1 << 63) # Example implementation | |
| class JSONData: | |
| def __init__(self, table: str, json_data: str): | |
| self.table = table | |
| self.json = json_data | |
| class MultiTableFlattener: | |
| def __init__(self): | |
| self.snowflake = Snowflake() | |
| def flatten_json(self, table: str, data: Any, parent_table: str = "", parent_id: int = None) -> List[JSONData]: | |
| output = [] | |
| if isinstance(data, dict): # If the data is an object | |
| oid = self.snowflake.generate() | |
| # rc = {"id": oid} | |
| rc = {table + "_id": oid} | |
| if parent_table: | |
| rc[f"{parent_table}_id"] = parent_id | |
| for key, value in data.items(): | |
| if isinstance(value, (dict, list)): # Nested data | |
| try: | |
| nested_data = self.flatten_json( | |
| table + "_" + key, value, table, oid) | |
| # nested_data = self.flatten_json(key, value, table, oid) | |
| output.extend(nested_data) | |
| except Exception as e: | |
| raise e | |
| else: | |
| rc[key] = value | |
| try: | |
| j = json.dumps(rc) | |
| output.append(JSONData(table, j)) | |
| except json.JSONDecodeError as e: | |
| raise e | |
| elif isinstance(data, list): # If the data is an array | |
| for item in data: | |
| try: | |
| nested_data = self.flatten_json( | |
| table, item, parent_table, parent_id) | |
| # nested_data = self.flatten_json(parent_table + "_" + table, item, parent_table, parent_id) | |
| output.extend(nested_data) | |
| except Exception as e: | |
| raise e | |
| else: # If the data is a primitive value | |
| oid = self.snowflake.generate() | |
| rc = {"id": oid, table: data} | |
| if parent_table: | |
| rc[f"{parent_table}_id"] = parent_id | |
| try: | |
| j = json.dumps(rc) | |
| output.append(JSONData(table, j)) | |
| except json.JSONDecodeError as e: | |
| raise e | |
| return output | |
| d = [{ | |
| "name": "John Doe", | |
| "age": 30, | |
| "address": { | |
| "street": "123 Main St", | |
| "city": "Anytown" | |
| }, | |
| "hobbies": [ | |
| {"name": "Reading", "type": "Indoor"}, | |
| {"name": "Cycling", "type": "Outdoor", | |
| } | |
| ], | |
| "siblings": [{"name": "Jane", "relation": "Sister"}, {"name": "James", "relation": "Brother"}], | |
| "foods": [{"name":"Apple"}, {"name":"Banana"}, {"name":"Cherry"}] | |
| }, | |
| { | |
| "name": "Picard", | |
| "age": 30, | |
| "address": { | |
| "street": "123 Main St", | |
| "city": "Anytown" | |
| }, | |
| "hobbies": [ | |
| {"name": "Reading", "type": "Books"}, | |
| {"name": "Cycling", "type": "Outdoor"}, | |
| ], | |
| "siblings": [{"name": "A", "relation": "B"}, ], | |
| "foods": [{"name":"Tea"}, {"name":"Hot"}] | |
| } | |
| ] | |
| cols = [ | |
| { | |
| "header": "Name", | |
| "path": ["txn", "name"], | |
| "join": "rows" | |
| }, | |
| { | |
| "header": "Sibling Name", | |
| "path": ["txn", "txn_siblings", "name"], | |
| "join": "rows" | |
| }, | |
| { | |
| "header": "Sibling Relation", | |
| "path": ["txn", "txn_siblings", "relation"], | |
| "join": "rows" | |
| }, | |
| { | |
| "header": "Reading", | |
| "path": ["txn", "txn_hobbies", "type"], | |
| "join": "cols", | |
| "col": ["name", "Reading"] | |
| }, | |
| { | |
| "header": "Foods", | |
| "path": ["txn", "txn_foods", "name"], | |
| "join": "concat" | |
| }, | |
| ] | |
| # "nested": {"scalar": "bar", "list": [1,2], "obj": {"hello":"world"}} | |
| f = MultiTableFlattener() | |
| rc = f.flatten_json("txn", d) | |
| for r in rc: | |
| print(r.table, r.json) | |
| available_fields = [ | |
| ] | |
| import sqlite3 | |
| import json | |
| def generate_sql_query(columns): | |
| """ | |
| Generates a SQL query based on column specifications that handle different types of joins | |
| and aggregations, with dynamic GROUP BY clause. | |
| Args: | |
| columns: List of dictionaries containing column specifications | |
| Returns: | |
| str: Generated SQL query | |
| """ | |
| # Track tables we need to join | |
| tables = set() | |
| select_parts = [] | |
| join_clauses = set() # Using set to avoid duplicate joins | |
| group_by_parts = [] | |
| # Helper to generate table alias | |
| def get_alias(path): | |
| return path[-2] | |
| return '_'.join(path[:-1]) # Exclude the column name from alias | |
| # Helper to quote identifiers that contain spaces | |
| def quote_identifier(identifier): | |
| return f'"{identifier}"' if ' ' in identifier else identifier | |
| # Process each column | |
| for col in columns: | |
| path = col['path'] | |
| alias = get_alias(path) | |
| column_name = path[-1] # Last element is always the column name | |
| # Add all tables we encounter to the set (except last element which is column name) | |
| for i in range(len(path) - 1): # -1 because last element is column name | |
| if i > 0: # Skip the base table (txn) | |
| # parent = '_'.join(path[:i]) | |
| parent = path[i-1] | |
| current_table = path[i] | |
| # Generate join clause | |
| join_clause = f"LEFT JOIN {current_table} ON {parent}.txn_id = {alias}.txn_id" | |
| # join_clause = f"LEFT JOIN {current_table} AS {alias} ON {parent}.txn_id = {alias}.txn_id" | |
| join_clauses.add(join_clause) | |
| # Handle different join types | |
| if col['join'] == 'rows': | |
| # For row-based joins, just select the column | |
| column_reference = f"{alias}.{column_name}" | |
| select_parts.append(f"{column_reference} as {quote_identifier(col['header'])}") | |
| # Add to GROUP BY if it's a 'rows' type | |
| group_by_parts.append(column_reference) | |
| elif col['join'] == 'cols': | |
| # For column-based joins, use CASE statement | |
| condition = f"{alias}.{col['col'][0]} = '{col['col'][1]}'" | |
| select_parts.append( | |
| f"MAX(CASE WHEN {condition} THEN {alias}.{column_name} END) as {quote_identifier(col['header'])}" | |
| ) | |
| elif col['join'] == 'concat': | |
| # For concatenation, use GROUP_CONCAT in a subquery | |
| subquery = f"(SELECT GROUP_CONCAT({alias}.{column_name}) FROM {alias} WHERE {alias}.txn_id = txn.txn_id) as {quote_identifier(col['header'])}" | |
| select_parts.append(subquery) | |
| # Construct the final query | |
| base_table = "txn" | |
| query = f""" | |
| SELECT {', '.join(select_parts)} | |
| FROM {base_table} | |
| {' '.join(sorted(join_clauses))}""" # Sort joins for consistent output | |
| # Add GROUP BY clause only if we have grouping columns | |
| if group_by_parts: | |
| query += f"\nGROUP BY {', '.join(group_by_parts)}" | |
| return query.strip() | |
| sql = generate_sql_query(cols) | |
| print(sql) | |
| def setup_sqlite_database(data_rows, db_path=":memory:"): | |
| """ | |
| Creates SQLite tables and populates them with data based on input rows. | |
| Args: | |
| data_rows (list): List of strings, each representing a table row with JSON data | |
| db_path (str): Path to SQLite database file, defaults to in-memory database | |
| Returns: | |
| sqlite3.Connection: Database connection object | |
| """ | |
| # Connect to database | |
| conn = sqlite3.connect(db_path) | |
| cursor = conn.cursor() | |
| # Track table schemas | |
| table_schemas = {} | |
| # First pass: determine schema for each table | |
| for row in data_rows: | |
| # Parse table name and data | |
| # table_name, json_data = row.split(" ", 1) | |
| table_name = row.table | |
| json_data = row.json | |
| data = json.loads(json_data) | |
| if table_name not in table_schemas: | |
| # Create schema based on first occurrence of table | |
| columns = [] | |
| for key, value in data.items(): | |
| # Determine SQLite type based on Python type | |
| sql_type = "TEXT" # default type | |
| if isinstance(value, int): | |
| sql_type = "INTEGER" | |
| elif isinstance(value, float): | |
| sql_type = "REAL" | |
| columns.append((key, sql_type)) | |
| table_schemas[table_name] = columns | |
| # Create tables | |
| for table_name, columns in table_schemas.items(): | |
| # Create CREATE TABLE statement | |
| column_defs = [f"{col[0]} {col[1]}" for col in columns] | |
| create_stmt = f""" | |
| CREATE TABLE IF NOT EXISTS {table_name} ( | |
| {', '.join(column_defs)} | |
| ) | |
| """ | |
| cursor.execute(create_stmt) | |
| # Create index on txn_id if it exists in table | |
| if any(col[0] == 'txn_id' for col in columns): | |
| index_stmt = f""" | |
| CREATE INDEX IF NOT EXISTS idx_{table_name}_txn_id | |
| ON {table_name}(txn_id) | |
| """ | |
| cursor.execute(index_stmt) | |
| # Second pass: insert data | |
| for row in data_rows: | |
| # table_name, json_data = row.split(" ", 1) | |
| table_name = row.table | |
| json_data = row.json | |
| data = json.loads(json_data) | |
| # Create INSERT statement | |
| columns = list(data.keys()) | |
| placeholders = ','.join(['?' for _ in columns]) | |
| insert_stmt = f""" | |
| INSERT INTO {table_name} ({','.join(columns)}) | |
| VALUES ({placeholders}) | |
| """ | |
| # Execute insert with values | |
| values = [data[col] for col in columns] | |
| cursor.execute(insert_stmt, values) | |
| # Commit changes and return connection | |
| conn.commit() | |
| return conn | |
| # Create database | |
| conn = setup_sqlite_database(rc) | |
| # Verify data | |
| cursor = conn.cursor() | |
| cursor.execute(sql) | |
| print(cursor.fetchall()) | |
| # for table in ['txn', 'txn_address', 'txn_hobbies', 'txn_siblings', 'txn_foods']: | |
| # print(f"\nContents of {table}:") | |
| # cursor.execute(f"SELECT * FROM {table}") | |
| # print(cursor.fetchall()) | |
| # return conn |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment