Skip to content

Instantly share code, notes, and snippets.

@iklobato
Last active December 5, 2025 18:13
Show Gist options
  • Select an option

  • Save iklobato/13ff11dc8d7f5d76ea1a3a03c44de1ec to your computer and use it in GitHub Desktop.

Select an option

Save iklobato/13ff11dc8d7f5d76ea1a3a03c44de1ec to your computer and use it in GitHub Desktop.
asdasd
"""
The translator takes a MongoDB query string like "db.user.find({age:{$gte:21}})" from the command line.
The QueryParser fixes the string by adding quotes around keys and operators, then runs it with a fake database
object that records the collection name, conditions, and which columns to return. The SqlTranslator then walks
through the query: it checks for $or or $and at the top, and for each field it looks for operators like $gte or $in.
When it finds an operator, it asks the OperationFactory for the right operation class (like GteOperation or InOperation);
the factory makes one instance per operator and reuses it. Each operation class has an execute method that
handles its own operator: comparison operators format the value and build something like "age >= 21", logical
operators handle lists and join them with OR or AND, and $in formats multiple values into an IN clause.
The translator combines all conditions with AND, builds the column list from the projection, and fills in
a SQL SELECT template to produce the final SQL query string.
"""
import re
import sys
from abc import ABC, abstractmethod
from enum import Enum
class MongoOperations(Enum):
    """Supported MongoDB operators paired with their SQL spelling.

    Each member's value is a ``(mongo_symbol, sql_symbol)`` tuple.
    """

    OR = ('$or', 'OR')
    AND = ('$and', 'AND')
    LT = ('$lt', '<')
    LTE = ('$lte', '<=')
    GT = ('$gt', '>')
    GTE = ('$gte', '>=')
    NE = ('$ne', '!=')
    IN = ('$in', 'IN')

    @property
    def mongo(self):
        """Return the MongoDB symbol (first element of the value tuple)."""
        mongo_symbol, _ = self.value
        return mongo_symbol

    @property
    def sql(self):
        """Return the SQL symbol (second element of the value tuple)."""
        _, sql_symbol = self.value
        return sql_symbol
class SqlQueryTemplate(Enum):
    """SQL statement skeletons meant to be filled in with ``str.format``."""

    # Placeholders: columns, table, where_clause.
    SELECT = "SELECT {columns} FROM {table} WHERE {where_clause}"
class BaseOperation(ABC):
    """Abstract base class for a single Mongo-operator-to-SQL translation."""

    # Concrete operations set this to a (mongo_symbol, sql_symbol) pair.
    SYMBOL: tuple

    @staticmethod
    def format_value(value):
        """Render a Python value as a SQL literal.

        Args:
            value: The value taken from the parsed query.

        Returns:
            ``str(value)`` for ints and floats; otherwise the value's text
            wrapped in single quotes, with embedded single quotes doubled.
        """
        if isinstance(value, (int, float)):
            return str(value)
        # A lone "'" inside the literal would terminate it early and corrupt
        # (or inject into) the statement; SQL escapes it by doubling.
        escaped = f"{value}".replace("'", "''")
        return f"'{escaped}'"

    @abstractmethod
    def execute(self, key, value, context):
        """Build the SQL fragment for this operator.

        Args:
            key: Field name the operator applies to.
            value: Operand supplied for the operator.
            context: Object the operation may call back into.

        Returns:
            The SQL condition as a string.
        """
class OrOperation(BaseOperation):
    """Translate a Mongo ``$or`` list into OR-joined SQL conditions."""

    SYMBOL = MongoOperations.OR.value

    def execute(self, key, value, context):
        """Return the sub-queries in ``value`` joined with OR.

        ``key`` is not used. Each element of ``value`` is passed to
        ``context._build_where_clause`` and wrapped in parentheses. An empty
        list yields ``"1=1"``.

        Raises:
            ValueError: If ``value`` is not a list.
        """
        if not isinstance(value, list):
            raise ValueError(f"{MongoOperations.OR.mongo} expects a list, got {type(value)}")
        if not value:
            return "1=1"
        separator = f" {MongoOperations.OR.sql} "
        return separator.join(
            f"({context._build_where_clause(sub_query)})" for sub_query in value
        )
class AndOperation(BaseOperation):
    """Translate a Mongo ``$and`` list into AND-joined SQL conditions."""

    SYMBOL = MongoOperations.AND.value

    def execute(self, key, value, context):
        """Return the sub-queries in ``value`` joined with AND.

        ``key`` is not used. Each element of ``value`` is passed to
        ``context._build_where_clause`` and wrapped in parentheses. An empty
        list yields ``"1=1"``.

        Raises:
            ValueError: If ``value`` is not a list.
        """
        if not isinstance(value, list):
            raise ValueError(f"{MongoOperations.AND.mongo} expects a list, got {type(value)}")
        if not value:
            return "1=1"
        wrapped = [f"({context._build_where_clause(sub_query)})" for sub_query in value]
        return f" {MongoOperations.AND.sql} ".join(wrapped)
class LtOperation(BaseOperation):
    """Render a Mongo ``$lt`` comparison with the SQL ``<`` operator."""

    SYMBOL = MongoOperations.LT.value

    def execute(self, key, value, context):
        """Return ``"<key> < <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.LT.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class LteOperation(BaseOperation):
    """Render a Mongo ``$lte`` comparison with the SQL ``<=`` operator."""

    SYMBOL = MongoOperations.LTE.value

    def execute(self, key, value, context):
        """Return ``"<key> <= <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.LTE.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class GtOperation(BaseOperation):
    """Render a Mongo ``$gt`` comparison with the SQL ``>`` operator."""

    SYMBOL = MongoOperations.GT.value

    def execute(self, key, value, context):
        """Return ``"<key> > <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.GT.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class GteOperation(BaseOperation):
    """Render a Mongo ``$gte`` comparison with the SQL ``>=`` operator."""

    SYMBOL = MongoOperations.GTE.value

    def execute(self, key, value, context):
        """Return ``"<key> >= <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.GTE.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class NeOperation(BaseOperation):
    """Render a Mongo ``$ne`` comparison with the SQL ``!=`` operator."""

    SYMBOL = MongoOperations.NE.value

    def execute(self, key, value, context):
        """Return ``"<key> != <literal>"``; ``context`` is not used."""
        sql_operator = MongoOperations.NE.sql
        return f"{key} {sql_operator} {BaseOperation.format_value(value)}"
class InOperation(BaseOperation):
    """Translate a Mongo ``$in`` list into a SQL ``IN (...)`` condition."""

    SYMBOL = MongoOperations.IN.value

    def execute(self, key, value, context):
        """Return ``"<key> IN (<v1>, <v2>, ...)"`` for the values in ``value``.

        Args:
            key: Field name being tested.
            value: List of candidate values; each is rendered with
                ``BaseOperation.format_value``.
            context: Not used.

        Returns:
            The ``IN`` condition, or the always-false ``"1=0"`` when the
            list is empty.

        Raises:
            ValueError: If ``value`` is not a list.
        """
        if not isinstance(value, list):
            raise ValueError(f"{MongoOperations.IN.mongo} expects a list, got {type(value)}")
        if not value:
            # "key IN ()" is a syntax error in most SQL dialects, and an
            # empty $in can match nothing, so emit an always-false condition.
            return "1=0"
        values_sql = ", ".join(BaseOperation.format_value(item) for item in value)
        return f"{key} {MongoOperations.IN.sql} ({values_sql})"
class OperationFactory:
    """Hand out one shared operation instance per Mongo operator symbol."""

    # Operation objects created so far, keyed by Mongo symbol.
    _instances = {}

    # Mongo symbol -> operation class; the key is the first SYMBOL element.
    _operation_classes = {
        operation_class.SYMBOL[0]: operation_class
        for operation_class in (
            OrOperation,
            AndOperation,
            LtOperation,
            LteOperation,
            GtOperation,
            GteOperation,
            NeOperation,
            InOperation,
        )
    }

    @classmethod
    def get_operation(cls, mongo_symbol):
        """Return the operation for ``mongo_symbol``, or ``None`` if unknown.

        The instance is created on first request and reused afterwards.
        """
        if mongo_symbol in cls._instances:
            return cls._instances[mongo_symbol]
        operation_class = cls._operation_classes.get(mongo_symbol)
        if operation_class is None:
            return None
        instance = cls._instances[mongo_symbol] = operation_class()
        return instance
class QueryParser:
    """Parse a Mongo shell ``db.<collection>.find(...)`` string."""

    # Quoted string literals (with backslash escapes); matched first so that
    # text inside a string value is never mistaken for a key.
    _SINGLE_QUOTED = r"'(?:\\.|[^'\\])*'"
    _DOUBLE_QUOTED = r'"(?:\\.|[^"\\])*"'
    # A bare key, optionally "$"-prefixed, following "{" or "," and preceding ":".
    _BARE_KEY = r"([{,]\s*)(\$?[a-zA-Z_][a-zA-Z0-9_]*)\s*:"
    _TOKEN_PATTERN = re.compile(f"({_SINGLE_QUOTED}|{_DOUBLE_QUOTED})|{_BARE_KEY}")

    @staticmethod
    def normalize_mongo_syntax(command):
        """Quote bare keys and operators so the text is a valid Python dict.

        Args:
            command: Query text such as ``db.user.find({age:{$gte:21}})``.

        Returns:
            The text with every unquoted key wrapped in single quotes.
            Existing string literals are left untouched.
        """
        def quote_key(match):
            if match.group(1) is not None:
                # A complete string literal: return it unchanged.
                return match.group(0)
            return f"{match.group(2)}'{match.group(3)}':"

        return QueryParser._TOKEN_PATTERN.sub(quote_key, command)

    @classmethod
    def parse(cls, query_string):
        """Evaluate the query against a recording ``db`` object.

        Args:
            query_string: Mongo shell query text.

        Returns:
            The ``(collection_name, query, projection)`` tuple recorded by
            ``find``.

        Raises:
            ValueError: If the text never calls ``db.<collection>.find()``.
        """
        normalized = cls.normalize_mongo_syntax(query_string)
        db = MockDb()
        # SECURITY: exec() runs the query text as Python code. Emptying
        # __builtins__ removes direct access to open/__import__ and friends,
        # but this is NOT a sandbox - do not feed untrusted input here.
        # "true"/"false" let Mongo-style boolean literals evaluate.
        namespace = {'__builtins__': {}, 'db': db, 'true': True, 'false': False}
        exec(normalized, namespace)
        if db._parsed_query is None:
            raise ValueError("Query must call db.collection.find()")
        return db._parsed_query
class SqlTranslator:
    """Translate a Mongo ``find`` query string into a SQL SELECT statement."""

    def __init__(self):
        self._current_collection = None
        self._factory = OperationFactory()

    def translate(self, query_string):
        """Return the SQL SELECT statement for ``query_string``.

        Args:
            query_string: Mongo shell text such as
                ``db.user.find({age:{$gte:21}})``.

        Returns:
            The filled-in SELECT template. An empty query gives
            ``WHERE 1=1`` and an empty projection gives ``*``.
        """
        collection_name, query_dict, projection_dict = QueryParser.parse(query_string)
        where_clause = self._build_where_clause(query_dict) if query_dict else "1=1"
        columns = self._build_columns(projection_dict) if projection_dict else "*"
        return SqlQueryTemplate.SELECT.value.format(
            columns=columns,
            table=collection_name,
            where_clause=where_clause,
        )

    def _build_columns(self, projection):
        """Return the comma-separated keys whose projection value equals 1.

        Falls back to ``"*"`` when no key is selected.
        """
        columns = [key for key, value in projection.items() if value == 1]
        return ", ".join(columns) if columns else "*"

    def _build_where_clause(self, query):
        """Build the WHERE condition for one query document.

        Every key contributes a condition and all of them are joined with
        AND. Top-level ``$or`` / ``$and`` keys are handled by their operation
        classes and no longer cause sibling keys to be dropped.

        Args:
            query: Dict mapping field names or logical operators to operands.

        Returns:
            The SQL condition, or ``"1=1"`` for an empty document.
        """
        or_symbol = MongoOperations.OR.mongo
        logical_symbols = (or_symbol, MongoOperations.AND.mongo)

        # Each entry is (sql_text, is_disjunction).
        tagged_conditions = []
        for key, value in query.items():
            if key in logical_symbols:
                operation = self._factory.get_operation(key)
                sql_text = operation.execute(None, value, self)
                tagged_conditions.append((sql_text, key == or_symbol))
            else:
                sql_text = self._build_single_condition(key, value)
                tagged_conditions.append((sql_text, False))

        if not tagged_conditions:
            return "1=1"
        if len(tagged_conditions) == 1:
            return tagged_conditions[0][0]
        # OR binds looser than AND, so an OR group must be parenthesized
        # before it is combined with other conditions.
        return " AND ".join(
            f"({sql_text})" if is_disjunction else sql_text
            for sql_text, is_disjunction in tagged_conditions
        )

    def _build_single_condition(self, key, value):
        """Build the condition for one field.

        A dict containing ``$``-prefixed keys is treated as an operator
        document: every operator is applied and the results are joined with
        AND. Any other value becomes an equality test.

        Args:
            key: Field name.
            value: Literal value or operator document.

        Returns:
            The SQL condition for the field.

        Raises:
            ValueError: If an operator document contains an unsupported key.
        """
        is_operator_document = isinstance(value, dict) and any(
            isinstance(name, str) and name.startswith("$") for name in value
        )
        if is_operator_document:
            conditions = []
            for operator, op_value in value.items():
                operation = self._factory.get_operation(operator)
                if operation is None:
                    raise ValueError(
                        f"Unsupported operator {operator!r} for field {key!r}"
                    )
                conditions.append(operation.execute(key, op_value, self))
            return " AND ".join(conditions)
        formatted_value = BaseOperation.format_value(value)
        return f"{key} = {formatted_value}"
class MockDb:
    """Stand-in for the Mongo shell ``db`` object that records a ``find`` call."""

    def __init__(self):
        # Stays None until MockCollection.find stores the parsed query here.
        self._parsed_query = None

    def __getattr__(self, collection_name):
        """Treat any attribute not found normally as a collection name."""
        collection = MockCollection(self, collection_name)
        return collection
class MockCollection:
    """Collection proxy whose ``find`` records its arguments on the owning db."""

    def __init__(self, db, collection_name):
        self._collection_name = collection_name
        self._db = db

    def find(self, query=None, projection=None):
        """Store ``(collection_name, query, projection)`` on the db object.

        Falsy ``query`` or ``projection`` values are replaced by an empty
        dict. Always returns ``None``.
        """
        recorded_query = query if query else {}
        recorded_projection = projection if projection else {}
        self._db._parsed_query = (
            self._collection_name,
            recorded_query,
            recorded_projection,
        )
        return None
if __name__ == "__main__":
    # The first positional argument is the Mongo query text; extras are ignored.
    cli_arguments = sys.argv[1:]
    if not cli_arguments:
        print("Usage: python app2.py \"db.user.find({name:'julio'})\"")
        sys.exit(1)
    print(SqlTranslator().translate(cli_arguments[0]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment