Last active
December 5, 2025 18:13
-
-
Save iklobato/13ff11dc8d7f5d76ea1a3a03c44de1ec to your computer and use it in GitHub Desktop.
asdasd
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| The translator takes a MongoDB query string like "db.user.find({age:{$gte:21}})" from the command line. | |
| The QueryParser fixes the string by adding quotes around keys and operators, then runs it with a fake database | |
| object that records the collection name, conditions, and which columns to return. The SqlTranslator then walks | |
| through the query: it checks for $or or $and at the top, and for each field it looks for operators like $gte or $in. | |
| When it finds an operator, it asks the OperationFactory for the right operation class (like GteOperation or InOperation); | |
| the factory makes one instance per operator and reuses it. Each operation class has an execute method that | |
| handles its own operator: comparison operators format the value and build something like "age >= 21", logical | |
| operators handle lists and join them with OR or AND, and $in formats multiple values into an IN clause. | |
| The translator combines all conditions with AND, builds the column list from the projection, and fills in | |
| a SQL SELECT template to produce the final SQL query string. | |
| """ | |
| import re | |
| import sys | |
| from abc import ABC, abstractmethod | |
| from enum import Enum | |
class MongoOperations(Enum):
    """Supported MongoDB operators paired with their SQL spelling.

    Each member's value is a ``(mongo_symbol, sql_symbol)`` tuple.
    """

    OR = ('$or', 'OR')
    AND = ('$and', 'AND')
    LT = ('$lt', '<')
    LTE = ('$lte', '<=')
    GT = ('$gt', '>')
    GTE = ('$gte', '>=')
    NE = ('$ne', '!=')
    IN = ('$in', 'IN')

    @property
    def mongo(self):
        """Return the MongoDB operator symbol (first tuple element)."""
        mongo_symbol, _ = self.value
        return mongo_symbol

    @property
    def sql(self):
        """Return the SQL operator or keyword (second tuple element)."""
        _, sql_symbol = self.value
        return sql_symbol
class SqlQueryTemplate(Enum):
    """SQL statement templates filled in with ``str.format``."""

    # Placeholders: columns, table, where_clause.
    SELECT = (
        'SELECT {columns} '
        'FROM {table} '
        'WHERE {where_clause}'
    )
class BaseOperation(ABC):
    """Abstract base for one MongoDB operator's translation to SQL.

    Subclasses set ``SYMBOL`` to a ``(mongo_symbol, sql_symbol)`` tuple and
    implement :meth:`execute`.
    """

    SYMBOL: tuple

    @staticmethod
    def format_value(value):
        """Render *value* as a SQL literal.

        Numbers (``int``/``float``, which also covers ``bool`` because it is
        an ``int`` subclass) are rendered with ``str``. Anything else is
        rendered as a single-quoted string literal.

        Args:
            value: The Python value taken from the parsed query.

        Returns:
            The SQL literal as a string.
        """
        if isinstance(value, (int, float)):
            return str(value)
        # Double embedded single quotes so a value such as O'Brien cannot
        # terminate the literal early (broken SQL / injection).
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"

    @abstractmethod
    def execute(self, key, value, context):
        """Build the SQL fragment for this operator.

        Args:
            key: Field name the operator applies to (``None`` for logical
                operators called from the translator's top level).
            value: The operator's operand from the query.
            context: The translator, used by logical operators to recurse.

        Returns:
            The SQL condition as a string.
        """
class OrOperation(BaseOperation):
    """Translate ``$or``: a list of sub-queries joined with ``OR``."""

    SYMBOL = MongoOperations.OR.value

    def execute(self, key, value, context):
        """Return the parenthesised sub-query clauses joined with ``OR``.

        ``key`` is not used. Each list element is translated through
        ``context._build_where_clause``. An empty list yields ``"1=1"``.

        Raises:
            ValueError: If *value* is not a list.
        """
        if not isinstance(value, list):
            raise ValueError(f"{MongoOperations.OR.mongo} expects a list, got {type(value)}")
        branches = [f"({context._build_where_clause(branch)})" for branch in value]
        if not branches:
            return "1=1"
        separator = f" {MongoOperations.OR.sql} "
        return separator.join(branches)
class AndOperation(BaseOperation):
    """Translate ``$and``: a list of sub-queries joined with ``AND``."""

    SYMBOL = MongoOperations.AND.value

    def execute(self, key, value, context):
        """Return the parenthesised sub-query clauses joined with ``AND``.

        ``key`` is not used. Each list element is translated through
        ``context._build_where_clause``. An empty list yields ``"1=1"``.

        Raises:
            ValueError: If *value* is not a list.
        """
        if not isinstance(value, list):
            raise ValueError(f"{MongoOperations.AND.mongo} expects a list, got {type(value)}")
        branches = [f"({context._build_where_clause(branch)})" for branch in value]
        if not branches:
            return "1=1"
        separator = f" {MongoOperations.AND.sql} "
        return separator.join(branches)
class LtOperation(BaseOperation):
    """Translate ``$lt`` into a ``key < value`` comparison."""

    SYMBOL = MongoOperations.LT.value

    def execute(self, key, value, context):
        """Return ``"<key> < <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.LT.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class LteOperation(BaseOperation):
    """Translate ``$lte`` into a ``key <= value`` comparison."""

    SYMBOL = MongoOperations.LTE.value

    def execute(self, key, value, context):
        """Return ``"<key> <= <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.LTE.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class GtOperation(BaseOperation):
    """Translate ``$gt`` into a ``key > value`` comparison."""

    SYMBOL = MongoOperations.GT.value

    def execute(self, key, value, context):
        """Return ``"<key> > <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.GT.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class GteOperation(BaseOperation):
    """Translate ``$gte`` into a ``key >= value`` comparison."""

    SYMBOL = MongoOperations.GTE.value

    def execute(self, key, value, context):
        """Return ``"<key> >= <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.GTE.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class NeOperation(BaseOperation):
    """Translate ``$ne`` into a ``key != value`` comparison."""

    SYMBOL = MongoOperations.NE.value

    def execute(self, key, value, context):
        """Return ``"<key> != <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.NE.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class InOperation(BaseOperation):
    """Translate ``$in`` into a SQL ``IN (...)`` membership test."""

    SYMBOL = MongoOperations.IN.value

    def execute(self, key, value, context):
        """Return ``"<key> IN (<literal>, ...)"`` for the listed values.

        Args:
            key: Field name being tested.
            value: List of candidate values.
            context: Unused.

        Returns:
            The ``IN`` clause, or ``"1=0"`` when the list is empty.

        Raises:
            ValueError: If *value* is not a list.
        """
        if not isinstance(value, list):
            raise ValueError(f"{MongoOperations.IN.mongo} expects a list, got {type(value)}")
        if not value:
            # "key IN ()" is a syntax error in most SQL dialects; an empty
            # candidate list can never match, so emit an always-false test.
            return "1=0"
        values_str = ", ".join(BaseOperation.format_value(item) for item in value)
        return f"{key} {MongoOperations.IN.sql} ({values_str})"
class OperationFactory:
    """Hand out one shared operation instance per MongoDB operator symbol."""

    # Cache of already-created operation instances, keyed by Mongo symbol.
    _instances = {}

    # Mongo symbol -> operation class. SYMBOL[0] is the Mongo symbol of the
    # (mongo, sql) tuple each operation class stores.
    _operation_classes = {
        op_cls.SYMBOL[0]: op_cls
        for op_cls in (
            OrOperation,
            AndOperation,
            LtOperation,
            LteOperation,
            GtOperation,
            GteOperation,
            NeOperation,
            InOperation,
        )
    }

    @classmethod
    def get_operation(cls, mongo_symbol):
        """Return the cached operation for *mongo_symbol*.

        The instance is created on first request and reused afterwards.
        Returns ``None`` when the symbol has no registered operation class.
        """
        cached = cls._instances.get(mongo_symbol)
        if cached is not None:
            return cached
        operation_class = cls._operation_classes.get(mongo_symbol)
        if operation_class is None:
            return None
        created = operation_class()
        cls._instances[mongo_symbol] = created
        return created
class QueryParser:
    """Turn a MongoDB shell query string into (collection, query, projection)."""

    # Alternative 1 (group 1) matches a complete single- or double-quoted
    # string literal so its contents are skipped. Alternative 2 matches a
    # bare key: "{" or "," (group 2), an optional "$" plus an identifier
    # (group 3), then a colon.
    _KEY_OR_STRING = re.compile(
        r"""('(?:[^'\\]|\\.)*'|"(?:[^"\\]|\\.)*")"""
        r"""|([{,]\s*)(\$?[a-zA-Z_][a-zA-Z0-9_]*)\s*:"""
    )

    @staticmethod
    def normalize_mongo_syntax(command):
        """Quote bare keys and operators so *command* is valid Python.

        ``{age:{$gte:21}}`` becomes ``{'age':{'$gte':21}}``. Text inside
        string literals is left untouched, so a value such as ``'a,b:c'`` is
        not mistaken for a key.

        Args:
            command: The raw query string.

        Returns:
            The query string with every bare key wrapped in single quotes.
        """
        def quote_key(match):
            if match.group(1) is not None:
                # A string literal: return it exactly as written.
                return match.group(0)
            return f"{match.group(2)}'{match.group(3)}':"

        return QueryParser._KEY_OR_STRING.sub(quote_key, command)

    @classmethod
    def parse(cls, query_string):
        """Execute the normalized query against a recording ``MockDb``.

        Args:
            query_string: A query such as ``db.user.find({age:{$gte:21}})``.

        Returns:
            The tuple stored in ``db._parsed_query`` by the executed query.

        Raises:
            ValueError: If executing the query did not record a parsed query.
        """
        normalized = cls.normalize_mongo_syntax(query_string)
        db = MockDb()
        namespace = {'db': db}
        # SECURITY NOTE(review): exec() runs the query string as arbitrary
        # Python code. Only pass trusted input; replacing this with a real
        # parser is required before exposing it to untrusted callers.
        exec(normalized, namespace)
        if getattr(db, '_parsed_query', None) is None:
            raise ValueError("Query must call db.collection.find()")
        return db._parsed_query
class SqlTranslator:
    """Translate a MongoDB ``find`` query string into a SQL SELECT string."""

    def __init__(self):
        self._current_collection = None
        self._factory = OperationFactory()

    def translate(self, query_string):
        """Return the SQL SELECT statement for *query_string*.

        An empty query yields the where clause ``1=1``; an empty projection
        selects ``*``.
        """
        collection_name, query_dict, projection_dict = QueryParser.parse(query_string)
        where_clause = self._build_where_clause(query_dict) if query_dict else "1=1"
        columns = self._build_columns(projection_dict) if projection_dict else "*"
        return SqlQueryTemplate.SELECT.value.format(
            columns=columns,
            table=collection_name,
            where_clause=where_clause,
        )

    def _build_columns(self, projection):
        """Return the comma-separated fields whose projection value is 1.

        Falls back to ``*`` when no field is selected.
        """
        columns = [key for key, value in projection.items() if value == 1]
        return ", ".join(columns) if columns else "*"

    def _build_where_clause(self, query):
        """Return the SQL condition for one query document.

        Every top-level entry contributes a condition and all of them are
        joined with ``AND``. ``$or``/``$and`` entries are delegated to their
        operation class; sibling entries are kept instead of being dropped.

        Args:
            query: Dict mapping field names or logical operators to operands.

        Returns:
            The combined condition, or ``"1=1"`` for an empty document.
        """
        logical_symbols = (MongoOperations.OR.mongo, MongoOperations.AND.mongo)
        conditions = []
        for key, value in query.items():
            if key in logical_symbols:
                operation = self._factory.get_operation(key)
                clause = operation.execute(None, value, self)
                if len(query) > 1:
                    # Parenthesise so the group keeps its meaning once it is
                    # AND-ed with sibling conditions.
                    clause = f"({clause})"
                conditions.append(clause)
            else:
                conditions.append(self._build_single_condition(key, value))
        return " AND ".join(conditions) if conditions else "1=1"

    def _build_single_condition(self, key, value):
        """Return the SQL condition for one field.

        A non-empty dict value is treated as a set of operators; each one is
        translated and the results are joined with ``AND`` (so
        ``{$gte: 21, $lt: 65}`` keeps both bounds). Any other value is an
        equality test.

        Raises:
            ValueError: If the dict contains an operator with no registered
                operation.
        """
        if isinstance(value, dict) and value:
            clauses = []
            for operator, op_value in value.items():
                operation = self._factory.get_operation(operator)
                if operation is None:
                    # Failing loudly beats silently dropping a constraint.
                    raise ValueError(
                        f"Unsupported operator {operator!r} for field {key!r}"
                    )
                clauses.append(operation.execute(key, op_value, self))
            return " AND ".join(clauses)
        formatted_value = BaseOperation.format_value(value)
        return f"{key} = {formatted_value}"
class MockDb:
    """Stand-in for the Mongo ``db`` object that records the parsed query."""

    def __init__(self):
        # Filled in by MockCollection.find(); None until find() is called.
        self._parsed_query = None

    def __getattr__(self, collection_name):
        """Treat any attribute not found normally as a collection name."""
        collection = MockCollection(self, collection_name)
        return collection
class MockCollection:
    """Stand-in for a Mongo collection; ``find`` records its arguments."""

    def __init__(self, db, collection_name):
        self._collection_name = collection_name
        self._db = db

    def find(self, query=None, projection=None):
        """Store ``(collection_name, query, projection)`` on the owning db.

        Falsy ``query``/``projection`` values are replaced by an empty dict.
        Returns ``None``.
        """
        conditions = query if query else {}
        fields = projection if projection else {}
        self._db._parsed_query = (self._collection_name, conditions, fields)
if __name__ == "__main__":
    # Command-line entry point: the first argument is the MongoDB query
    # string; the translated SQL is printed to stdout.
    cli_args = sys.argv[1:]
    if not cli_args:
        print("Usage: python app2.py \"db.user.find({name:'julio'})\"")
        sys.exit(1)
    print(SqlTranslator().translate(cli_args[0]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment