#!/usr/bin/env python3
"""
Create an Azure AI Search vector index and ingest a directory of PDFs.

Requires:
    pip install azure-search-documents pypdf openai python-dotenv

Env vars:
    AZURE_SEARCH_ENDPOINT
    AZURE_SEARCH_API_KEY
    AZURE_SEARCH_INDEX
    AZURE_OPENAI_ENDPOINT
    AZURE_OPENAI_API_KEY
    AZURE_OPENAI_EMBED_DEPLOYMENT
    PDF_DIR
"""
import os
import uuid
from typing import Iterable, List, Dict

from dotenv import load_dotenv
from pypdf import PdfReader

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchField,
    SimpleField,
    SearchableField,
    SearchFieldDataType,
    VectorSearch,
    HnswAlgorithmConfiguration,  # concrete HNSW config (azure-search-documents >= 11.4)
    VectorSearchProfile,
    CorsOptions,
)

# OpenAI SDK (talks to Azure OpenAI via a custom base_url)
from openai import OpenAI
# ---------- config & helpers ----------
load_dotenv()

SEARCH_ENDPOINT = os.environ["AZURE_SEARCH_ENDPOINT"]
SEARCH_KEY = os.environ["AZURE_SEARCH_API_KEY"]
INDEX_NAME = os.environ.get("AZURE_SEARCH_INDEX", "pdf-vector-index")

AOAI_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
AOAI_KEY = os.environ["AZURE_OPENAI_API_KEY"]
AOAI_EMBED_DEPLOYMENT = os.environ["AZURE_OPENAI_EMBED_DEPLOYMENT"]

PDF_DIR = os.environ.get("PDF_DIR", "./pdfs")

# Chunking and batching preferences
CHUNK_SIZE = 1200        # characters per chunk
CHUNK_OVERLAP = 200      # overlap between chunks
UPLOAD_BATCH_SIZE = 100  # docs per upload batch
EMBED_BATCH_SIZE = 64    # inputs per embeddings call

# Embedding dimensions depend on the deployed model (e.g. text-embedding-3-small vs. -large).
# main() probes one embedding at startup and sizes the index to match, so you can swap
# models without code edits.
# ---------- text utils ----------
def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    text = (text or "").strip()
    if not text:
        return []
    chunks = []
    start = 0
    while start < len(text):
        end = min(len(text), start + size)
        chunks.append(text[start:end])
        if end == len(text):
            break
        start = end - overlap
        if start < 0:
            start = 0
    return chunks

def extract_pdf_text(path: str) -> str:
    reader = PdfReader(path)
    parts = []
    for page in reader.pages:
        try:
            parts.append(page.extract_text() or "")
        except Exception:
            parts.append("")  # skip extraction errors but continue
    return "\n".join(parts).strip()

# ---------- embeddings ----------
def get_aoai_client() -> OpenAI:
    # The OpenAI SDK talks to Azure OpenAI when pointed at the resource's /openai/v1 base URL.
    # Ref: https://learn.microsoft.com/en-us/azure/ai-foundry/openai/how-to/embeddings
    return OpenAI(
        api_key=AOAI_KEY,
        base_url=f"{AOAI_ENDPOINT.rstrip('/')}/openai/v1",
    )


def embed_texts(client: OpenAI, inputs: List[str]) -> List[List[float]]:
    # Batch for efficiency (and token limits)
    embeddings: List[List[float]] = []
    for i in range(0, len(inputs), EMBED_BATCH_SIZE):
        batch = inputs[i:i + EMBED_BATCH_SIZE]
        resp = client.embeddings.create(
            model=AOAI_EMBED_DEPLOYMENT,
            input=batch,
        )
        for d in resp.data:
            embeddings.append(d.embedding)
    return embeddings
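
# Optional: a hedged retry wrapper around embed_texts, not wired into main(). Azure OpenAI
# throttles bursts with HTTP 429; this sketch backs off and retries a few times. The attempt
# count and sleep schedule are illustrative assumptions, not part of the original gist.
def embed_texts_with_retry(client: OpenAI, inputs: List[str], attempts: int = 4) -> List[List[float]]:
    import time
    from openai import RateLimitError  # raised by the OpenAI SDK on HTTP 429

    for attempt in range(attempts):
        try:
            return embed_texts(client, inputs)
        except RateLimitError:
            if attempt == attempts - 1:
                raise
            time.sleep(2 ** attempt)  # 1s, 2s, 4s, ... between retries
    return []  # unreachable; keeps type checkers happy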

# ---------- Azure AI Search index management ----------
def ensure_index(index_client: SearchIndexClient, embed_dim: int):
    """
    Create (or recreate) an index with:
      - content: full text
      - contentVector: vector field (Collection(Edm.Single)) with an HNSW profile
      - file_path, page, chunk_id: metadata
    """
    # Vector search config (HNSW) + profile binding
    # Ref: https://learn.microsoft.com/en-us/azure/search/vector-search-how-to-create-index
    vector_search = VectorSearch(
        algorithms=[HnswAlgorithmConfiguration(name="hnsw-config")],
        profiles=[VectorSearchProfile(name="my-hnsw-profile", algorithm_configuration_name="hnsw-config")],
    )
    fields = [
        SimpleField(name="id", type=SearchFieldDataType.String, key=True, filterable=True),
        SearchableField(name="content", type=SearchFieldDataType.String, analyzer_name="en.lucene"),
        SimpleField(name="file_path", type=SearchFieldDataType.String, filterable=True, facetable=True),
        SimpleField(name="page", type=SearchFieldDataType.Int32, filterable=True),
        SimpleField(name="chunk_id", type=SearchFieldDataType.Int32, filterable=True),
        # Vector field: dimensions must match the embedding model, hence the runtime probe in main()
        SearchField(
            name="contentVector",
            type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
            searchable=True,
            hidden=True,  # vectors are rarely wanted back in results; set False to return them
            vector_search_dimensions=embed_dim,
            vector_search_profile_name="my-hnsw-profile",
        ),
    ]
    cors = CorsOptions(allowed_origins=["*"], max_age_in_seconds=300)
    index = SearchIndex(name=INDEX_NAME, fields=fields, vector_search=vector_search, cors_options=cors)

    # Drop any existing index and recreate it so the schema and vector dimensions always match.
    try:
        index_client.get_index(INDEX_NAME)
        index_client.delete_index(INDEX_NAME)
    except Exception:
        pass  # index does not exist yet
    index_client.create_index(index)
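
# For reference, each uploaded document ends up shaped like the dict below (the values are
# illustrative placeholders and the embedding vector is elided):
#
#   {
#       "id": "0b9c2e4c-...-uuid",
#       "content": "first ~1200 characters of a chunk ...",
#       "contentVector": [0.0123, -0.0456, ...],
#       "file_path": "reports/2024/annual.pdf",
#       "page": -1,
#       "chunk_id": 0,
#   }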

# ---------- ingestion ----------
def yield_pdf_chunks(pdf_dir: str) -> Iterable[Dict]:
    for root, _, files in os.walk(pdf_dir):
        for fname in files:
            if not fname.lower().endswith(".pdf"):
                continue
            fpath = os.path.join(root, fname)
            text = extract_pdf_text(fpath)
            if not text:
                continue
            # Here we chunk the whole document's text; per-page chunking gives better
            # traceability (see the sketch after this function).
            chunks = chunk_text(text, CHUNK_SIZE, CHUNK_OVERLAP)
            for i, chunk in enumerate(chunks):
                yield {
                    "file_path": os.path.relpath(fpath, pdf_dir),
                    "page": -1,  # unknown when chunking whole-document text
                    "chunk_id": i,
                    "content": chunk,
                }
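
# A hedged per-page variant, not called above: chunk page by page so each record carries a
# real page number for traceability. Uses the same chunking settings; the function name and
# record shape are illustrative, not part of the original script.
def yield_pdf_chunks_per_page(pdf_dir: str) -> Iterable[Dict]:
    for root, _, files in os.walk(pdf_dir):
        for fname in files:
            if not fname.lower().endswith(".pdf"):
                continue
            fpath = os.path.join(root, fname)
            reader = PdfReader(fpath)
            for page_no, page in enumerate(reader.pages, start=1):
                try:
                    page_text = page.extract_text() or ""
                except Exception:
                    continue  # skip pages that fail to extract
                for i, chunk in enumerate(chunk_text(page_text, CHUNK_SIZE, CHUNK_OVERLAP)):
                    yield {
                        "file_path": os.path.relpath(fpath, pdf_dir),
                        "page": page_no,
                        "chunk_id": i,
                        "content": chunk,
                    }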

def main():
    # Clients
    search_index_client = SearchIndexClient(SEARCH_ENDPOINT, AzureKeyCredential(SEARCH_KEY))
    search_client = SearchClient(SEARCH_ENDPOINT, INDEX_NAME, AzureKeyCredential(SEARCH_KEY))
    aoai = get_aoai_client()

    # Peek one embedding to discover the dimension (so the index matches the model)
    dim_probe = aoai.embeddings.create(model=AOAI_EMBED_DEPLOYMENT, input=["dimension probe"]).data[0].embedding
    embed_dim = len(dim_probe)

    # Ensure the index exists with the correct dimensions/profile
    ensure_index(search_index_client, embed_dim)

    # Collect chunks -> embed -> upload
    buffer_docs: List[Dict] = []
    buffer_texts: List[str] = []

    def flush_buffers():
        if not buffer_docs:
            return
        vectors = embed_texts(aoai, buffer_texts)
        for doc, vec in zip(buffer_docs, vectors):
            doc["id"] = str(uuid.uuid4())
            doc["contentVector"] = vec
        # Upload in sub-batches to respect payload size limits
        for i in range(0, len(buffer_docs), UPLOAD_BATCH_SIZE):
            batch = buffer_docs[i:i + UPLOAD_BATCH_SIZE]
            result = search_client.upload_documents(batch)
            # Optional: simple success check
            failed = [r for r in result if not r.succeeded]
            if failed:
                print(f"[WARN] {len(failed)} docs failed to upload; first error: {failed[0].error_message}")
        buffer_docs.clear()
        buffer_texts.clear()

    count = 0
    for rec in yield_pdf_chunks(PDF_DIR):
        buffer_docs.append(rec)
        buffer_texts.append(rec["content"])
        count += 1
        if len(buffer_docs) >= EMBED_BATCH_SIZE:
            flush_buffers()
    flush_buffers()

    print(f"Done. Ingested {count} chunks into index '{INDEX_NAME}'.")

if __name__ == "__main__":
    main()