@pamelafox
Created November 11, 2025 22:27
compare_search_indexes.py
"""Compare documents across two Azure AI Search indexes"""
import argparse
import asyncio
import logging
import os
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from typing import Any, cast
from azure.core.credentials_async import AsyncTokenCredential
from azure.identity.aio import AzureDeveloperCliCredential
from azure.search.documents.aio import SearchClient
from Levenshtein import ratio
from load_azd_env import load_azd_env
logger = logging.getLogger("scripts")
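
# Note: Levenshtein's ratio() returns a normalized similarity in [0.0, 1.0],
# where 1.0 means the strings are identical. A rough illustration (values
# are approximate):
#   ratio("hello world", "hello world")  # 1.0
#   ratio("hello world", "hello word")   # ~0.95 (one character deleted)
#   ratio("hello world", "goodbye")      # much lower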

IndexKey = tuple[str | None, str | None]


@dataclass
class IndexComparisonResult:
    """Holds summary data for one index."""

    index_name: str
    total_documents: int
    keys: set[IndexKey]
    documents_by_key: dict[IndexKey, list[dict[str, Any]]] = field(default_factory=dict)
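
# Example key shapes (illustrative values, not from a real index): for a
# paginated PDF the key might look like
#   ("Benefit_Options.pdf", "Benefit_Options.pdf#page=2")
# and either element may be None if the document lacks that field.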

async def collect_index_documents(
    *, endpoint: str, credential: AsyncTokenCredential, index_name: str
) -> IndexComparisonResult:
    """Collect all documents grouped by (sourcefile, sourcepage) pairs for the specified index."""
    keys: set[IndexKey] = set()
    documents_by_key: dict[IndexKey, list[dict[str, Any]]] = {}
    total_documents = 0
    async with SearchClient(endpoint=endpoint, index_name=index_name, credential=credential) as client:
        # An empty search string matches all documents; the async iterator
        # below pages through the full result set.
        results = await client.search(
            search_text="",
            select=["*"],
            include_total_count=True,
        )
        async for doc in results:
            document = cast(Mapping[str, Any], doc)
            total_documents += 1
            sourcefile = document.get("sourcefile")
            sourcepage = document.get("sourcepage")
            key = (sourcefile, sourcepage)
            keys.add(key)
            if key not in documents_by_key:
                documents_by_key[key] = []
            documents_by_key[key].append(dict(document))
    return IndexComparisonResult(
        index_name=index_name, total_documents=total_documents, keys=keys, documents_by_key=documents_by_key
    )
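
# Usage sketch (endpoint and index name are hypothetical):
#   result = await collect_index_documents(
#       endpoint="https://mysearchsvc.search.windows.net",
#       credential=credential,
#       index_name="gptkbindex",
#   )
#   print(result.total_documents, len(result.keys))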

def parse_args() -> argparse.Namespace:
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Compare documents across two Azure AI Search indexes using sourcefile/sourcepage pairs.",
    )
    parser.add_argument("first_index", help="Name of the first search index to compare.")
    parser.add_argument("second_index", help="Name of the second search index to compare.")
    return parser.parse_args()
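
# Example invocation (index names are hypothetical):
#   python compare_search_indexes.py gptkbindex gptkbindex-v2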

def build_endpoint(service_name: str) -> str:
    """Return the full endpoint URL for the Azure AI Search service."""
    return f"https://{service_name}.search.windows.net"

def _match_chunks_by_similarity(
    first_docs: list[dict[str, Any]], second_docs: list[dict[str, Any]]
) -> list[tuple[dict[str, Any], dict[str, Any], float]]:
    """
    Match chunks from two document lists based on content similarity using Levenshtein ratio.

    Returns a list of tuples (doc1, doc2, similarity_score) where each doc1 is matched
    to its best matching doc2 based on content similarity.
    """
    matched_pairs = []
    used_second_indices = set()
    for doc1 in first_docs:
        # Normalize whitespace once per doc1 so formatting-only differences
        # do not affect matching
        normalized1 = " ".join(str(doc1.get("content", "")).split())
        best_match = None
        best_similarity = 0.0
        best_idx = -1
        # Find the best matching chunk from second_docs
        for idx, doc2 in enumerate(second_docs):
            if idx in used_second_indices:
                continue
            normalized2 = " ".join(str(doc2.get("content", "")).split())
            # Calculate similarity ratio (0.0 to 1.0)
            similarity = ratio(normalized1, normalized2)
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = doc2
                best_idx = idx
        if best_match is not None:
            matched_pairs.append((doc1, best_match, best_similarity))
            used_second_indices.add(best_idx)
        else:
            # No candidates left in second_docs; pair with an empty dict
            matched_pairs.append((doc1, {}, 0.0))
    # Add any unmatched docs from second_docs
    for idx, doc2 in enumerate(second_docs):
        if idx not in used_second_indices:
            matched_pairs.append(({}, doc2, 0.0))
    return matched_pairs
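
# Illustration of the greedy matching above (contents are made up): given
#   first_docs  = [{"content": "alpha beta"}, {"content": "gamma"}]
#   second_docs = [{"content": "alpha  beta"}, {"content": "delta"}]
# the first pair matches with similarity 1.0 after whitespace normalization,
# "gamma" then pairs with "delta" (the only remaining candidate) at a low
# ratio, and any leftover chunk on either side is paired with an empty dict
# and similarity 0.0.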

async def compare_indexes(
    *, first_index: str, second_index: str, endpoint: str, credential: AsyncTokenCredential
) -> None:
    """Fetch documents from both indexes and report detailed field differences."""
    first_result, second_result = await asyncio.gather(
        collect_index_documents(endpoint=endpoint, credential=credential, index_name=first_index),
        collect_index_documents(endpoint=endpoint, credential=credential, index_name=second_index),
    )
    missing_from_second = first_result.keys - second_result.keys
    missing_from_first = second_result.keys - first_result.keys
    logger.info(
        "Index '%s': %d docs, %d unique source pairs",
        first_result.index_name,
        first_result.total_documents,
        len(first_result.keys),
    )
    logger.info(
        "Index '%s': %d docs, %d unique source pairs",
        second_result.index_name,
        second_result.total_documents,
        len(second_result.keys),
    )

    def format_missing(pairs: Iterable[IndexKey]) -> str:
        # Sort with None mapped to "" so mixed None/str keys compare cleanly
        return "\n".join(
            f" sourcefile={sourcefile or '<none>'}, sourcepage={sourcepage or '<none>'}"
            for sourcefile, sourcepage in sorted(pairs, key=lambda pair: (pair[0] or "", pair[1] or ""))
        )

    if missing_from_second:
        logger.warning(
            "Pairs present in '%s' but missing in '%s':\n%s",
            first_index,
            second_index,
            format_missing(missing_from_second),
        )
    if missing_from_first:
        logger.warning(
            "Pairs present in '%s' but missing in '%s':\n%s",
            second_index,
            first_index,
            format_missing(missing_from_first),
        )
    # Compare common keys for field differences
    common_keys = first_result.keys & second_result.keys
    differences_found = False  # initialized here so the summary check at the end is safe
    if common_keys:
        logger.info("Comparing %d common source pairs for field differences...", len(common_keys))
        for key in sorted(common_keys, key=lambda pair: (pair[0] or "", pair[1] or "")):
            first_docs = first_result.documents_by_key[key]
            second_docs = second_result.documents_by_key[key]
            if len(first_docs) != len(second_docs):
                differences_found = True
                logger.warning("\n=== MISMATCH for sourcefile=%s, sourcepage=%s ===", key[0], key[1])
                logger.warning(
                    " Document count: %s has %d chunks, %s has %d chunks",
                    first_index,
                    len(first_docs),
                    second_index,
                    len(second_docs),
                )
            # Match chunks by content similarity instead of position
            matched_pairs = _match_chunks_by_similarity(first_docs, second_docs)
            # Compare field sets and values for each matched document pair
            for idx, (doc1, doc2, similarity) in enumerate(matched_pairs):
                # Report and skip if one side of the pair is empty (unmatched)
                if not doc1 or not doc2:
                    differences_found = True
                    logger.warning(
                        "\n=== UNMATCHED CHUNK for sourcefile=%s, sourcepage=%s ===",
                        key[0],
                        key[1],
                    )
                    if not doc1:
                        logger.warning(" Chunk only in %s: ID=%s", second_index, doc2.get("id"))
                    if not doc2:
                        logger.warning(" Chunk only in %s: ID=%s", first_index, doc1.get("id"))
                    continue
                if similarity < 0.8:
                    logger.warning(
                        "\n=== LOW SIMILARITY MATCH for sourcefile=%s, sourcepage=%s (chunk pair %d) ===",
                        key[0],
                        key[1],
                        idx,
                    )
                    logger.warning(" Content similarity: %.2f%%", similarity * 100)
                    logger.warning(" %s ID: %s", first_index, doc1.get("id"))
                    logger.warning(" %s ID: %s", second_index, doc2.get("id"))
                fields1 = set(doc1.keys())
                fields2 = set(doc2.keys())
                missing_fields_in_second = fields1 - fields2
                missing_fields_in_first = fields2 - fields1
                has_field_diff = missing_fields_in_second or missing_fields_in_first
                has_value_diff = False
                value_diffs: list[tuple[str, Any, Any]] = []
                embedding_diffs: list[tuple[str, int | None, int | None]] = []
                # Get common fields first
                common_fields = fields1 & fields2
                # Compare embedding fields separately (dimension only, not values)
                for field_name in sorted(common_fields):
                    if "embedding" in field_name.lower():
                        val1 = doc1[field_name]
                        val2 = doc2[field_name]
                        dim1 = len(val1) if isinstance(val1, list) else None
                        dim2 = len(val2) if isinstance(val2, list) else None
                        if dim1 != dim2:
                            embedding_diffs.append((field_name, dim1, dim2))
                # Compare values for common fields (excluding embeddings and large fields)
                for field_name in sorted(common_fields):
                    # Skip embedding fields and other large binary/array fields
                    if "embedding" in field_name.lower() or field_name.startswith("@search"):
                        continue
                    val1 = doc1[field_name]
                    val2 = doc2[field_name]
                    # Special handling for the images field
                    if field_name == "images":
                        if isinstance(val1, list) and isinstance(val2, list):
                            if len(val1) != len(val2):
                                has_value_diff = True
                                value_diffs.append((field_name, val1, val2))
                            elif len(val1) > 0:
                                # Compare first image's non-embedding fields
                                img1_keys = set(val1[0].keys()) - {"embedding"}
                                img2_keys = set(val2[0].keys()) - {"embedding"}
                                if img1_keys != img2_keys:
                                    has_value_diff = True
                                    value_diffs.append((field_name, val1, val2))
                                # Check image embedding dimensions
                                for img_idx, (img1, img2) in enumerate(zip(val1, val2)):
                                    if "embedding" in img1 and "embedding" in img2:
                                        emb1 = img1["embedding"]
                                        emb2 = img2["embedding"]
                                        dim1 = len(emb1) if isinstance(emb1, list) else None
                                        dim2 = len(emb2) if isinstance(emb2, list) else None
                                        if dim1 != dim2:
                                            embedding_diffs.append((f"images[{img_idx}].embedding", dim1, dim2))
                        elif val1 != val2:
                            has_value_diff = True
                            value_diffs.append((field_name, val1, val2))
                    # Special handling for the content field - normalize whitespace
                    elif field_name == "content":
                        normalized1 = " ".join(str(val1).split()) if val1 else ""
                        normalized2 = " ".join(str(val2).split()) if val2 else ""
                        if normalized1 != normalized2:
                            has_value_diff = True
                            value_diffs.append((field_name, val1, val2))
                    elif val1 != val2:
                        has_value_diff = True
                        value_diffs.append((field_name, val1, val2))
                if has_field_diff or has_value_diff or embedding_diffs:
                    differences_found = True
                    logger.warning(
                        "\n=== DIFFERENCE for sourcefile=%s, sourcepage=%s (chunk %d) ===", key[0], key[1], idx
                    )
                    if missing_fields_in_second:
                        logger.warning(" Fields only in %s: %s", first_index, sorted(missing_fields_in_second))
                    if missing_fields_in_first:
                        logger.warning(" Fields only in %s: %s", second_index, sorted(missing_fields_in_first))
                    if embedding_diffs:
                        for field_name, dim1, dim2 in embedding_diffs:
                            logger.warning(" Embedding field '%s' dimension mismatch:", field_name)
                            logger.warning(" %s: %s dimensions", first_index, dim1)
                            logger.warning(" %s: %s dimensions", second_index, dim2)
                    for field_name, val1, val2 in value_diffs:
                        logger.warning(" Field '%s':", field_name)
                        logger.warning(" %s: %s", first_index, _format_value(val1, field_name))
                        logger.warning(" %s: %s", second_index, _format_value(val2, field_name))
        if not differences_found:
            logger.info("No field differences found for common source pairs.")
    if not missing_from_first and not missing_from_second and not differences_found:
        logger.info("Indexes are identical.")

def _format_value(val: Any, field_name: str | None = None) -> str:
    """Format a field value for logging, truncating if necessary."""
    if val is None:
        return "<none>"
    if isinstance(val, str):
        return val[:200] + "..." if len(val) > 200 else val
    if isinstance(val, list):
        # Special formatting for images field
        if field_name == "images" and len(val) > 0 and isinstance(val[0], dict):
            img_keys = sorted(set(val[0].keys()) - {"embedding"})
            return f"[{len(val)} images with fields: {img_keys}]"
        return f"[{len(val)} items]" if len(val) > 5 else str(val)
    return str(val)
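
# Behavior examples, derived directly from the logic above:
#   _format_value(None)             -> "<none>"
#   _format_value("x" * 300)        -> first 200 chars followed by "..."
#   _format_value(list(range(10)))  -> "[10 items]"
#   _format_value([1, 2, 3])        -> "[1, 2, 3]"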

async def main() -> None:
    """Entry point for asynchronous execution."""
    args = parse_args()
    load_azd_env()
    service_name = os.getenv("AZURE_SEARCH_SERVICE")
    if not service_name:
        raise RuntimeError(
            "AZURE_SEARCH_SERVICE must be set. Run 'azd env get-values' or ensure the azd environment is loaded."
        )
    endpoint = build_endpoint(service_name)
    tenant_id = os.getenv("AZURE_TENANT_ID")
    credential = AzureDeveloperCliCredential(tenant_id=tenant_id) if tenant_id else AzureDeveloperCliCredential()
    try:
        await compare_indexes(
            first_index=args.first_index,
            second_index=args.second_index,
            endpoint=endpoint,
            credential=credential,
        )
    finally:
        await credential.close()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    logger.setLevel(logging.DEBUG)
    asyncio.run(main())
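
# Illustrative output, constructed from the log formats above (index names
# and counts are hypothetical):
#   Index 'gptkbindex': 120 docs, 45 unique source pairs
#   Index 'gptkbindex-v2': 118 docs, 44 unique source pairs
#   Pairs present in 'gptkbindex' but missing in 'gptkbindex-v2':
#    sourcefile=handbook.pdf, sourcepage=handbook.pdf#page=3
#   Comparing 44 common source pairs for field differences...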