@btbytes
Created September 12, 2025 19:24
#!/usr/bin/env python3
"""
Create an Azure AI Search vector index and ingest a directory of PDFs.
Requires:
pip install azure-search-documents pypdf openai python-dotenv
Env vars:
AZURE_SEARCH_ENDPOINT
AZURE_SEARCH_API_KEY
AZURE_SEARCH_INDEX
AZURE_OPENAI_ENDPOINT
AZURE_OPENAI_API_KEY
AZURE_OPENAI_EMBED_DEPLOYMENT
PDF_DIR
"""
import os
import uuid
from typing import Iterable, List, Dict

from dotenv import load_dotenv
from pypdf import PdfReader

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchField,
    SimpleField,
    SearchableField,
    SearchFieldDataType,
    VectorSearch,
    HnswAlgorithmConfiguration,
    VectorSearchProfile,
    CorsOptions,
)

# OpenAI SDK, pointed at Azure OpenAI via base_url (see get_aoai_client below)
from openai import OpenAI

# ---------- config & helpers ----------
load_dotenv()

SEARCH_ENDPOINT = os.environ["AZURE_SEARCH_ENDPOINT"]
SEARCH_KEY = os.environ["AZURE_SEARCH_API_KEY"]
INDEX_NAME = os.environ.get("AZURE_SEARCH_INDEX", "pdf-vector-index")
AOAI_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
AOAI_KEY = os.environ["AZURE_OPENAI_API_KEY"]
AOAI_EMBED_DEPLOYMENT = os.environ["AZURE_OPENAI_EMBED_DEPLOYMENT"]
PDF_DIR = os.environ.get("PDF_DIR", "./pdfs")

# Chunking and batching preferences
CHUNK_SIZE = 1200        # characters per chunk
CHUNK_OVERLAP = 200      # overlap between consecutive chunks
UPLOAD_BATCH_SIZE = 100  # docs per upload batch
EMBED_BATCH_SIZE = 64    # inputs per embeddings call

# The embedding dimension is detected from the first embeddings call (see
# main), so you can swap embedding models without code edits.

# ---------- text utils ----------
def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split text into fixed-size character chunks, each overlapping the last."""
    text = (text or "").strip()
    if not text:
        return []
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + size)
        chunks.append(text[start:end])
        if end == len(text):
            break
        start = max(end - overlap, 0)
    return chunks
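
# Worked example of the arithmetic above: with size=1200 and overlap=200, a
# 3000-character string produces chunks covering [0:1200], [1000:2200], and
# [2000:3000]; each new chunk restarts 200 characters before the previous end.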

def extract_pdf_text(path: str) -> str:
    """Concatenate the text of every page; tolerate per-page extraction errors."""
    reader = PdfReader(path)
    parts = []
    for page in reader.pages:
        try:
            parts.append(page.extract_text() or "")
        except Exception:
            parts.append("")  # skip extraction errors but continue
    return "\n".join(parts).strip()

# ---------- embeddings ----------
def get_aoai_client() -> OpenAI:
    # The standard OpenAI SDK talks to Azure OpenAI when base_url points at the
    # resource's v1-compatible endpoint and api_key is the Azure key.
    # Ref: https://learn.microsoft.com/en-us/azure/ai-foundry/openai/how-to/embeddings
    return OpenAI(
        api_key=AOAI_KEY,
        base_url=f"{AOAI_ENDPOINT.rstrip('/')}/openai/v1",
    )
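
# An equivalent alternative is the dedicated AzureOpenAI class from the same
# openai package; the api_version string below is illustrative:
#   from openai import AzureOpenAI
#   AzureOpenAI(api_key=AOAI_KEY, azure_endpoint=AOAI_ENDPOINT, api_version="2024-02-01")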

def embed_texts(client: OpenAI, inputs: List[str]) -> List[List[float]]:
    """Embed inputs in batches, for efficiency and to stay under token limits."""
    embeddings: List[List[float]] = []
    for i in range(0, len(inputs), EMBED_BATCH_SIZE):
        batch = inputs[i:i + EMBED_BATCH_SIZE]
        resp = client.embeddings.create(
            model=AOAI_EMBED_DEPLOYMENT,
            input=batch,
        )
        embeddings.extend(d.embedding for d in resp.data)
    return embeddings

# ---------- Azure AI Search index management ----------
def ensure_index(index_client: SearchIndexClient, embed_dim: int):
    """
    Create (or recreate) an index with:
      - content: full text
      - contentVector: vector field (Collection(Edm.Single)) with an HNSW profile
      - file_path, page, chunk_id: metadata
    """
    # Vector search config (HNSW) + profile binding.
    # Ref: https://learn.microsoft.com/en-us/azure/search/vector-search-how-to-create-index
    vector_search = VectorSearch(
        algorithms=[HnswAlgorithmConfiguration(name="hnsw-config")],
        profiles=[VectorSearchProfile(name="my-hnsw-profile", algorithm_configuration_name="hnsw-config")],
    )
    fields = [
        SimpleField(name="id", type=SearchFieldDataType.String, key=True, filterable=True),
        SearchableField(name="content", type=SearchFieldDataType.String, analyzer_name="en.lucene"),
        SimpleField(name="file_path", type=SearchFieldDataType.String, filterable=True, facetable=True),
        SimpleField(name="page", type=SearchFieldDataType.Int32, filterable=True),
        SimpleField(name="chunk_id", type=SearchFieldDataType.Int32, filterable=True),
        # Vector field:
        SearchField(
            name="contentVector",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            hidden=True,  # don't return vectors with results; set False if you need them back
            vector_search_dimensions=embed_dim,
            vector_search_profile_name="my-hnsw-profile",
        ),
    ]
    cors = CorsOptions(allowed_origins=["*"], max_age_in_seconds=300)
    index = SearchIndex(name=INDEX_NAME, fields=fields, vector_search=vector_search, cors_options=cors)

    # Recreating from scratch is the simplest way to apply schema changes,
    # but note that deleting the index also wipes previously ingested docs.
    try:
        index_client.delete_index(INDEX_NAME)
    except Exception:
        pass
    index_client.create_index(index)

# ---------- ingestion ----------
def yield_pdf_chunks(pdf_dir: str) -> Iterable[Dict]:
    """Walk pdf_dir recursively and yield one record per text chunk."""
    for root, _, files in os.walk(pdf_dir):
        for fname in files:
            if not fname.lower().endswith(".pdf"):
                continue
            fpath = os.path.join(root, fname)
            text = extract_pdf_text(fpath)
            if not text:
                continue
            # We chunk the whole document's text here; per-page chunking is
            # better for traceability (see the sketch after this function).
            chunks = chunk_text(text, CHUNK_SIZE, CHUNK_OVERLAP)
            for i, chunk in enumerate(chunks):
                yield {
                    "file_path": os.path.relpath(fpath, pdf_dir),
                    "page": -1,  # unknown when chunking whole-document text
                    "chunk_id": i,
                    "content": chunk,
                }
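
# Per-page variant (a sketch; "pno" is a hypothetical name) that records real
# page numbers instead of -1:
#   for pno, page in enumerate(PdfReader(fpath).pages):
#       for i, ch in enumerate(chunk_text(page.extract_text() or "")):
#           yield {"file_path": ..., "page": pno, "chunk_id": i, "content": ch}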

def main():
    # Clients
    search_index_client = SearchIndexClient(SEARCH_ENDPOINT, AzureKeyCredential(SEARCH_KEY))
    search_client = SearchClient(SEARCH_ENDPOINT, INDEX_NAME, AzureKeyCredential(SEARCH_KEY))
    aoai = get_aoai_client()

    # Probe one embedding to discover its dimension, so the index matches the model
    dim_probe = aoai.embeddings.create(model=AOAI_EMBED_DEPLOYMENT, input=["dimension probe"]).data[0].embedding
    embed_dim = len(dim_probe)

    # Ensure the index exists with the correct dimension/profile
    ensure_index(search_index_client, embed_dim)

    # Collect chunks -> embed -> upload
    buffer_docs: List[Dict] = []
    buffer_texts: List[str] = []

    def flush_buffers():
        if not buffer_docs:
            return
        vectors = embed_texts(aoai, buffer_texts)
        for doc, vec in zip(buffer_docs, vectors):
            doc["id"] = str(uuid.uuid4())  # random keys are fine: the index is recreated each run
            doc["contentVector"] = vec
        # Upload in sub-batches to respect payload size limits
        for i in range(0, len(buffer_docs), UPLOAD_BATCH_SIZE):
            batch = buffer_docs[i:i + UPLOAD_BATCH_SIZE]
            result = search_client.upload_documents(batch)
            failed = [r for r in result if not r.succeeded]
            if failed:
                print(f"[WARN] {len(failed)} docs failed to upload; first error: {failed[0].error_message}")
        buffer_docs.clear()
        buffer_texts.clear()

    count = 0
    for rec in yield_pdf_chunks(PDF_DIR):
        buffer_docs.append(rec)
        buffer_texts.append(rec["content"])
        count += 1
        if len(buffer_docs) >= EMBED_BATCH_SIZE:
            flush_buffers()
    flush_buffers()

    print(f"Done. Ingested {count} chunks into index '{INDEX_NAME}'.")


if __name__ == "__main__":
    main()
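
To sanity-check the result, run a pure vector query against the new index. A minimal sketch, assuming the same env vars as above; the question text and k value are illustrative:

# query_example.py
import os
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
from openai import OpenAI

search = SearchClient(
    os.environ["AZURE_SEARCH_ENDPOINT"],
    os.environ.get("AZURE_SEARCH_INDEX", "pdf-vector-index"),
    AzureKeyCredential(os.environ["AZURE_SEARCH_API_KEY"]),
)
aoai = OpenAI(
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    base_url=os.environ["AZURE_OPENAI_ENDPOINT"].rstrip("/") + "/openai/v1",
)

question = "an example question about your PDFs"  # illustrative
vec = aoai.embeddings.create(
    model=os.environ["AZURE_OPENAI_EMBED_DEPLOYMENT"], input=[question]
).data[0].embedding

results = search.search(
    search_text=None,  # pure vector query; pass the question here for hybrid search
    vector_queries=[VectorizedQuery(vector=vec, k_nearest_neighbors=5, fields="contentVector")],
    select=["file_path", "chunk_id", "content"],
)
for r in results:
    print(r["file_path"], r["chunk_id"], r["content"][:120])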