Skip to content

Instantly share code, notes, and snippets.

@abhirockzz
Last active December 8, 2025 05:07
Show Gist options
  • Select an option

  • Save abhirockzz/76766da88203edfa6181ab67a9dae8a5 to your computer and use it in GitHub Desktop.

Select an option

Save abhirockzz/76766da88203edfa6181ab67a9dae8a5 to your computer and use it in GitHub Desktop.
Cosmos DB emulator query examples
import json
from azure.cosmos import CosmosClient, PartitionKey, exceptions
# Emulator Configuration
# Connection settings read by get_client() and setup_database() below.
# ENDPOINT points at a service listening on localhost port 8081 over plain HTTP.
ENDPOINT = "http://localhost:8081"
# NOTE(review): presumably the emulator's published default key, not a
# production secret — confirm before reusing this file elsewhere.
KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
# Database and container ids that setup_database() creates or reuses.
DATABASE_NAME = "TestDB"
CONTAINER_NAME = "Families"
def get_client():
    """Build a CosmosClient for the configured ENDPOINT and KEY.

    The client is created with ``connection_verify=False``.

    Returns:
        The newly constructed ``CosmosClient``.
    """
    client = CosmosClient(ENDPOINT, KEY, connection_verify=False)
    return client
def setup_database(client):
    """Return the sample container, creating database and container if needed.

    Uses the SDK's ``*_if_not_exists`` helpers instead of hand-rolled
    "create, catch CosmosResourceExistsError, then fetch" blocks; the end
    state is the same whether or not the resources already exist.

    Args:
        client: Cosmos client used to reach the database account.

    Returns:
        A container client for ``CONTAINER_NAME`` inside ``DATABASE_NAME``.
    """
    database = client.create_database_if_not_exists(id=DATABASE_NAME)
    # The container is partitioned on the document id ("/id").
    return database.create_container_if_not_exists(
        id=CONTAINER_NAME, partition_key=PartitionKey(path="/id")
    )
def load_sample_data(container):
    """Reset the container and load the sample family documents.

    Every document currently returned by the container is deleted, then the
    three sample families are upserted, so repeated runs start from the
    same data set.

    Args:
        container: Container client exposing ``query_items``,
            ``delete_item`` and ``upsert_item``.
    """
    # Clear existing data. Only the id is needed for a delete (it is passed
    # as both the item and the partition key value), so project just that
    # field instead of pulling back whole documents. The ids are collected
    # up front so the query has finished before any delete is issued.
    stale_ids = [
        doc["id"]
        for doc in container.query_items(
            query="SELECT c.id FROM c", enable_cross_partition_query=True
        )
    ]
    for doc_id in stale_ids:
        container.delete_item(item=doc_id, partition_key=doc_id)

    # Sample families (same as shown in the blog)
    families = [
        {
            "id": "AndersenFamily",
            "lastName": "Andersen",
            "parents": [
                {"firstName": "Thomas", "relationship": "father"},
                {"firstName": "Mary Kay", "relationship": "mother"},
            ],
            "children": [
                {
                    "firstName": "Henriette",
                    "grade": 5,
                    "pets": [{"name": "Fluffy", "type": "Rabbit"}],
                }
            ],
            "tags": ["seattle", "active", "family-friendly"],
            "address": {"state": "WA", "city": "Seattle"},
        },
        {
            "id": "WakefieldFamily",
            "lastName": "Wakefield",
            "parents": [
                {"firstName": "Robin", "relationship": "mother"},
                {"firstName": "Ben", "relationship": "father"},
            ],
            "children": [
                {
                    "firstName": "Jesse",
                    "grade": 8,
                    "pets": [
                        {"name": "Goofy", "type": "Dog"},
                        {"name": "Shadow", "type": "Horse"},
                    ],
                },
                {"firstName": "Lisa", "grade": 1, "pets": []},
            ],
            "tags": ["newyork", "urban"],
            "address": {"state": "NY", "city": "New York"},
        },
        {
            "id": "MillerFamily",
            "lastName": "Miller",
            "parents": [{"firstName": "David", "relationship": "father"}],
            "children": [
                {
                    "firstName": "Emma",
                    "grade": 6,
                    "pets": [{"name": "Whiskers", "type": "Cat"}],
                }
            ],
            "tags": ["boston", "academic"],
            "address": {"state": "MA", "city": "Boston"},
        },
    ]
    for family in families:
        container.upsert_item(family)
def run_query(container, query, title):
    """Run *query* against *container* and print a titled report.

    Prints a banner with the title, the query text, the number of rows
    returned, and then each row as indented JSON.

    Args:
        container: Container client exposing ``query_items``.
        query: Query text to execute (cross-partition enabled).
        title: Heading shown in the banner.
    """
    print(f"\n{'='*70}\n{title}\n{'='*70}")
    print(f"Query: {query}\n")

    # Collect all rows first so the count can be printed before the rows.
    rows = [
        row
        for row in container.query_items(
            query=query, enable_cross_partition_query=True
        )
    ]
    print(f"Results: {len(rows)} row(s)\n")

    for rendered in (json.dumps(row, indent=2) for row in rows):
        print(rendered)
def main():
    """Prepare the sample container and run each JOIN example in order."""
    container = setup_database(get_client())
    load_sample_data(container)

    # (title, query) pairs, executed top to bottom.
    join_examples = (
        (
            "Basic JOIN",
            "SELECT f.id, f.lastName, c.firstName, c.grade FROM f JOIN c IN f.children",
        ),
        (
            "JOIN with Filter",
            "SELECT f.id, f.lastName, c.firstName, c.grade FROM f JOIN c IN f.children WHERE c.grade >= 6",
        ),
        (
            "Nested JOIN",
            "SELECT f.id, c.firstName AS child, p.name AS pet, p.type FROM f JOIN c IN f.children JOIN p IN c.pets",
        ),
        (
            "Cross Product JOIN",
            "SELECT f.id, c.firstName AS child, t AS tag FROM f JOIN c IN f.children JOIN t IN f.tags",
        ),
        (
            "Primitive Array JOIN",
            "SELECT f.id, f.lastName, t AS tag FROM f JOIN t IN f.tags",
        ),
        (
            "Self-JOIN",
            "SELECT f.id, t1 AS tag1, t2 AS tag2 FROM f JOIN t1 IN f.tags JOIN t2 IN f.tags WHERE t1 < t2",
        ),
        (
            "Complex Filter",
            "SELECT f.id, f.address.state, c.firstName, p.name AS pet FROM f JOIN c IN f.children JOIN p IN c.pets WHERE f.address.state = 'WA' AND p.type = 'Rabbit'",
        ),
    )
    for title, query in join_examples:
        run_query(container, query, title)


if __name__ == "__main__":
    main()
import json
from azure.cosmos import CosmosClient, PartitionKey, exceptions
# Emulator Configuration
# Connection settings read by get_client() and setup_database() below.
# ENDPOINT points at a service listening on localhost port 8081 over plain HTTP.
ENDPOINT = "http://localhost:8081"
# NOTE(review): presumably the emulator's published default key, not a
# production secret — confirm before reusing this file elsewhere.
KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
# Database and container ids that setup_database() creates or reuses.
DATABASE_NAME = "TestDB"
CONTAINER_NAME = "Employees"
def get_client():
    """Return a CosmosClient bound to ENDPOINT and authenticated with KEY.

    ``connection_verify`` is passed as ``False``.
    """
    client_options = {"connection_verify": False}
    return CosmosClient(ENDPOINT, KEY, **client_options)
def setup_database(client):
    """Create the database and container, reusing them when they already exist.

    A status line is printed for each resource saying whether it was
    created by this call or was already present.

    Args:
        client: Cosmos client used to reach the database account.

    Returns:
        A container client for ``CONTAINER_NAME`` inside ``DATABASE_NAME``.
    """
    # Database: attempt creation; a "resource exists" error means reuse it.
    try:
        db = client.create_database(id=DATABASE_NAME)
    except exceptions.CosmosResourceExistsError:
        db = client.get_database_client(DATABASE_NAME)
        print(f"ℹ️ Database '{DATABASE_NAME}' already exists.")
    else:
        print(f"✅ Database '{DATABASE_NAME}' created.")

    # Container: same pattern, partitioned on the document id ("/id").
    try:
        employees_container = db.create_container(
            id=CONTAINER_NAME, partition_key=PartitionKey(path="/id")
        )
    except exceptions.CosmosResourceExistsError:
        employees_container = db.get_container_client(CONTAINER_NAME)
        print(f"ℹ️ Container '{CONTAINER_NAME}' already exists.")
    else:
        print(f"✅ Container '{CONTAINER_NAME}' created.")

    return employees_container
def load_sample_data(container):
    """Reset the container and load the sample employee documents.

    Every document currently returned by the container is deleted, then the
    three sample employees are upserted; a line is printed per insert.

    Args:
        container: Container client exposing ``query_items``,
            ``delete_item`` and ``upsert_item``.
    """
    # Clear existing data. Only the id is needed for a delete (it is passed
    # as both the item and the partition key value), so project just that
    # field instead of pulling back whole documents. The ids are collected
    # up front so the query has finished before any delete is issued.
    stale_ids = [
        doc["id"]
        for doc in container.query_items(
            query="SELECT c.id FROM c", enable_cross_partition_query=True
        )
    ]
    for doc_id in stale_ids:
        container.delete_item(item=doc_id, partition_key=doc_id)

    # NOTE(review): the "[email protected]" values look like redacted
    # placeholders rather than real addresses — confirm the intended data.
    employees = [
        {
            "id": "1",
            "firstName": "Alice",
            "email": "[email protected]",
            "tags": ["senior", "manager", "remote"],
            "scores": [85, 92, 78, 95],
        },
        {
            "id": "2",
            "firstName": "Bob",
            "email": "[email protected]",
            "tags": ["junior", "developer"],
            "scores": [72, 88, 65, 91],
        },
        {
            "id": "3",
            "firstName": "Charlie",
            "email": "[email protected]",
            "tags": ["senior", "architect", "onsite"],
            "scores": [95, 87, 92, 89],
        },
    ]
    for employee in employees:
        container.upsert_item(employee)
        print(f"📝 Inserted employee: {employee['firstName']}")
def run_query(container, query, title):
    """Run *query* against *container* and print a titled report.

    Prints a banner with the title, the query text and the row count, then
    either the whole result list as indented JSON or a "no results" line.

    Args:
        container: Container client exposing ``query_items``.
        query: Query text to execute (cross-partition enabled).
        title: Heading shown in the banner.
    """
    print(f"\n{'='*70}\n{title}\n{'='*70}")
    print(f"Query: {query}\n")

    rows = [
        row
        for row in container.query_items(
            query=query, enable_cross_partition_query=True
        )
    ]
    print(f"Results: {len(rows)} row(s)\n")

    # Guard clause for the empty case; otherwise dump the full list at once.
    if not rows:
        print("No results found.")
        return
    print(json.dumps(rows, indent=2))
def main():
    """Prepare the sample container and run the string and array examples."""
    container = setup_database(get_client())
    load_sample_data(container)

    # Each section is (heading, ((title, query), ...)); sections and their
    # examples run in the order listed.
    sections = (
        (
            "STRING OPERATIONS",
            (
                (
                    "CONCAT - Build formatted contact lists",
                    "SELECT CONCAT(e.firstName, ' - ', e.email) as contact FROM e",
                ),
                (
                    "LENGTH - Filter records by text length",
                    "SELECT e.firstName FROM e WHERE LENGTH(e.firstName) > 5",
                ),
            ),
        ),
        (
            "ARRAY OPERATIONS",
            (
                (
                    "ARRAY_LENGTH - Count elements in nested arrays",
                    "SELECT e.firstName, ARRAY_LENGTH(e.tags) as skillCount FROM e",
                ),
                (
                    "Array Indexing - Query specific array positions",
                    "SELECT e.firstName FROM e WHERE e.scores[0] > 85",
                ),
            ),
        ),
    )
    for heading, examples in sections:
        print("\n" + "=" * 40)
        print(heading)
        for title, query in examples:
            run_query(container, query, title)


if __name__ == "__main__":
    main()
import json
from azure.cosmos import CosmosClient, PartitionKey, exceptions
# Emulator Configuration
# Connection settings read by get_client() and setup_database() below.
# ENDPOINT points at a service listening on localhost port 8081 over plain HTTP.
ENDPOINT = "http://localhost:8081"
# NOTE(review): presumably the emulator's published default key, not a
# production secret — confirm before reusing this file elsewhere.
KEY = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="
# Database and container ids that setup_database() creates or reuses.
DATABASE_NAME = "TestDB"
CONTAINER_NAME = "Users"
def get_client():
    """Construct the CosmosClient used by this script.

    Connects to ENDPOINT with KEY and passes ``connection_verify=False``.
    """
    cosmos = CosmosClient(
        ENDPOINT,
        KEY,
        connection_verify=False,
    )
    return cosmos
def setup_database(client):
    """Return the sample container, creating database and container if needed.

    Uses the SDK's ``*_if_not_exists`` helpers instead of hand-rolled
    "create, catch CosmosResourceExistsError, then fetch" blocks; the end
    state is the same whether or not the resources already exist.

    Args:
        client: Cosmos client used to reach the database account.

    Returns:
        A container client for ``CONTAINER_NAME`` inside ``DATABASE_NAME``.
    """
    database = client.create_database_if_not_exists(id=DATABASE_NAME)
    # The container is partitioned on the document id ("/id").
    return database.create_container_if_not_exists(
        id=CONTAINER_NAME, partition_key=PartitionKey(path="/id")
    )
def load_sample_data(container):
    """Reset the container and load the sample user-profile documents.

    Every document currently returned by the container is deleted, then the
    two sample users are upserted. The second user deliberately has no
    ``profile.contact`` object, unlike the first.

    Args:
        container: Container client exposing ``query_items``,
            ``delete_item`` and ``upsert_item``.
    """
    # Clear existing data. Only the id is needed for a delete (it is passed
    # as both the item and the partition key value), so project just that
    # field instead of pulling back whole documents. The ids are collected
    # up front so the query has finished before any delete is issued.
    stale_ids = [
        doc["id"]
        for doc in container.query_items(
            query="SELECT c.id FROM c", enable_cross_partition_query=True
        )
    ]
    for doc_id in stale_ids:
        container.delete_item(item=doc_id, partition_key=doc_id)

    # NOTE(review): the "[email protected]" value looks like a redacted
    # placeholder rather than a real address — confirm the intended data.
    users = [
        {
            "id": "1",
            "username": "user1",
            "profile": {
                "fullName": "John Doe",
                "age": 30,
                "contact": {"email": "[email protected]", "phone": "555-0101"},
            },
            "settings": {"theme": "dark", "notifications": True},
        },
        {
            "id": "2",
            "username": "user2",
            "profile": {"fullName": "Jane Smith", "age": 25},
            "settings": {"theme": "light", "notifications": False},
        },
    ]
    for user in users:
        container.upsert_item(user)
def run_query(container, query, title):
    """Run *query* against *container* and print a titled report.

    Prints a banner with the title, the query text, the number of rows
    returned, and then each row as indented JSON.

    Args:
        container: Container client exposing ``query_items``.
        query: Query text to execute (cross-partition enabled).
        title: Heading shown in the banner.
    """
    print(f"\n{'='*70}\n{title}\n{'='*70}")
    print(f"Query: {query}\n")

    # Materialize the rows so the count can be reported before the rows.
    pager = container.query_items(query=query, enable_cross_partition_query=True)
    rows = list(pager)
    print(f"Results: {len(rows)} row(s)\n")

    for row in rows:
        as_json = json.dumps(row, indent=2)
        print(as_json)
def main():
    """Prepare the sample container and run the nested-property examples."""
    container = setup_database(get_client())
    load_sample_data(container)

    # (title, query) pairs, executed top to bottom.
    nested_examples = (
        (
            "Selecting Nested Properties",
            "SELECT c.username, c.profile.fullName FROM c WHERE IS_DEFINED(c.profile)",
        ),
        (
            "Checking Deeply Nested Object Existence",
            "SELECT c.id, c.username FROM c WHERE IS_DEFINED(c.profile.contact)",
        ),
        (
            "Handling Missing Properties",
            "SELECT c.id, c.profile.contact.phone FROM c",
        ),
    )
    for title, query in nested_examples:
        run_query(container, query, title)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment