@ehzawad
Created October 10, 2025 17:30

Problem Decomposition: Bengali RAG-Based Chatbot for Election Commission

Project Overview

A production-ready Bengali conversational AI system for National Identity Card (NID) and voter registration queries. The system uses semantic search (RAG) with multi-turn form conversations, interruption handling, and state management.

Core Challenge: Build an intelligent chatbot that understands Bengali queries about NID/voter registration, retrieves relevant answers from a knowledge base, and handles complex multi-turn conversations like form filling.

The Tentative Approach

The Mental Model

  1. Understand the domain first - What problem are we solving? Who are the users?
  2. Identify the technical core - What's the hardest technical challenge?
  3. Build incrementally - Start simple, add complexity layer by layer
  4. Test continuously - Validate each piece before moving forward
  5. Think about failure modes - What can go wrong? How do we handle it?

The Architecture at 10,000 Feet

User Input (Bengali text)
    ↓
Text Preprocessing (clean, normalize)
    ↓
State Management (are we in a form? interrupted?)
    ↓
Decision: Multi-turn handler OR RAG search?
    ↓
Response Generation (answer + follow-up questions)
    ↓
State Update (track conversation)
    ↓
JSON Response to Client

PHASE 1: UNDERSTANDING & FOUNDATION

1.1 Problem Domain Understanding

What you're building: A Bengali chatbot for election commission queries.
Why it's complex: Multi-language (Bengali), specialized domain (NID/voting), conversational state.

Problems to Solve:

1.1.1 Understand the Data Structure

  • Problem: Figure out what data you have and how it's organized
  • Files: full_dataset/ec_train.csv (3048 rows) and full_dataset/tag_answer.csv (210 rows)
  • Task: Open CSV files, examine structure
  • Skills: CSV reading, data inspection
  • Verification: Can you describe the relationship between questions, tags, and answers?

How to solve:

import pandas as pd
df_train = pd.read_csv('full_dataset/ec_train.csv')
df_answers = pd.read_csv('full_dataset/tag_answer.csv')
print(df_train.head())
print(df_answers.head())
print(f"Questions: {len(df_train)}, Answer tags: {len(df_answers)}")

Key insight: The architecture uses a two-table design:

  • ec_train.csv: Maps user questions → tags
  • tag_answer.csv: Maps tags → answers
  • This allows many questions to share the same answer (tag-based indirection)
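A minimal sketch of this indirection with plain dictionaries (the strings below are illustrative, not taken from the dataset):

questions_to_tag = {
    "আমার কার্ড হারিয়ে গেছে": "card_lost_and_damaged",
    "এনআইডি কার্ড হারিয়ে ফেলেছি": "card_lost_and_damaged",  # many questions...
}
tag_to_answer = {
    "card_lost_and_damaged": "হারানো কার্ডের জন্য পুনরায় আবেদন করুন...",  # ...share one answer
}

tag = questions_to_tag["আমার কার্ড হারিয়ে গেছে"]
print(tag_to_answer[tag])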

1.1.2 Understand RAG (Retrieval-Augmented Generation)

  • Problem: What is RAG and why do we need it?
  • Concept: Instead of training a language model on all answers, we:
    1. Store questions in a searchable vector database
    2. When user asks a question, find the most similar stored question
    3. Return the answer associated with that question
  • Why: Works better for factual QA, easy to update, no model training needed

Mental model:

User: "আমি NID কার্ড হারিয়েছি" (I lost my NID card)
    ↓
System converts to vector: [0.23, -0.45, 0.67, ...]
    ↓
Search database for similar question vectors
    ↓
Find: "এনআইডি কার্ড হারিয়ে গেলে..." → tag: "card_lost_and_damaged"
    ↓
Lookup tag in tag_answer.csv → return answer

1.1.3 Understand Multi-turn Conversations

  • Problem: What are multi-turn forms and why are they needed?
  • Scenario: User asks about foreign resident registration
    • Bot: "You want to register as NRI. Which country are you in?"
    • User: "UAE"
    • Bot: "Here's the UAE consulate info..."

Complexity: What if user interrupts?

  • Bot: "Which country?"

  • User: "How much does it cost?" (different question!)

  • Bot: "It costs X. Do you want to continue with country selection?"

  • Skills needed: State machines, conversation context tracking

  • Files: e5/multi_turn_state.py (523 lines of state management logic)


1.2 Environment Setup

1.2.1 Set Up Python Virtual Environment

  • Problem: Isolate project dependencies
  • Why: Avoid dependency conflicts with other projects
  • Task: Create venv, activate it
  • Verification: which python shows venv path
python3 -m venv venv
source venv/bin/activate

1.2.2 Understand Dependencies

  • Problem: What external libraries does this need?
  • File: requirements.txt (142 lines)
  • Categories:
    • Web framework: FastAPI, uvicorn (HTTP server)
    • ML/Embeddings: sentence-transformers, transformers, torch
    • Vector search: faiss-cpu (or faiss-gpu)
    • Bengali NLP: bangla-stemmer, bnunicodenormalizer
    • Data: pandas, numpy
    • Logging: loguru

Task: Read requirements.txt and categorize each library by purpose
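If you want to automate that categorization, here is a hedged sketch (the category map is illustrative and far from exhaustive):

CATEGORIES = {
    "fastapi": "web", "uvicorn": "web",
    "sentence-transformers": "ml", "torch": "ml", "transformers": "ml",
    "faiss-cpu": "vector search",
    "bangla-stemmer": "bengali nlp", "bnunicodenormalizer": "bengali nlp",
    "pandas": "data", "numpy": "data",
    "loguru": "logging",
}

with open("requirements.txt", encoding="utf-8") as f:
    for line in f:
        name = line.strip().split("==")[0]
        if name and not name.startswith("#"):
            print(f"{name:30s} -> {CATEGORIES.get(name.lower(), 'other')}")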


1.2.3 Install Core Dependencies (Minimal First)

  • Problem: Install only what you need to get started
  • Strategy: Don't install everything at once (141 packages is overwhelming)
  • Start with: FastAPI, pandas, sentence-transformers
pip install fastapi uvicorn pandas sentence-transformers

Verification: python -c "import fastapi; print('OK')"


PHASE 2: BUILDING THE CORE (Walking Skeleton)

2.1 FastAPI Basics - Hello World

2.1.1 Create Minimal FastAPI Server

  • Problem: Can you run a web server that responds to HTTP requests?
  • Goal: Understand request/response cycle before adding complexity
  • Skills: HTTP basics, FastAPI syntax, Pydantic models

Task: Create test_server.py

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

@app.get("/")
def read_root():
    return {"message": "Server is running"}

class QuestionRequest(BaseModel):
    question: str

@app.post("/ask/")
def ask_question(req: QuestionRequest):
    return {"response": f"You asked: {req.question}"}

Run: uvicorn test_server:app --reload
Test: Open browser to http://localhost:8000
Verification: Can you POST JSON and get a response?

curl -X POST http://localhost:8000/ask/ \
  -H "Content-Type: application/json" \
  -d '{"question":"test"}'

2.1.2 Understand Pydantic Models

  • Problem: How does FastAPI validate incoming JSON?
  • Concept: Pydantic models define the "shape" of data
  • File: e5/e5_main.py:433-436 (RequestBody model)

Task: Extend your model to match the real API

class RequestBody(BaseModel):
    question: str       # User's current question
    messages: str       # JSON string of conversation history
    chat_id: str        # Unique conversation identifier

Why messages is a string: It's JSON-encoded conversation history.
Challenge: You'll need to parse it later: json.loads(messages)


2.2 CSV Data Loading

2.2.1 Load CSV Files into Memory

  • Problem: Read the two CSV files into pandas DataFrames
  • Files: ec_train.csv, tag_answer.csv
  • Skills: pandas basics, file I/O, error handling

Task:

import pandas as pd

df_questions = pd.read_csv('full_dataset/ec_train.csv', encoding='utf-8')
df_answers = pd.read_csv('full_dataset/tag_answer.csv', encoding='utf-8')

print(f"Loaded {len(df_questions)} questions")
print(f"Loaded {len(df_answers)} answer tags")

Verification: No errors, counts match expected values


2.2.2 Handle Bad CSV Data

  • Problem: CSVs might have empty rows, bad encoding, malformed lines
  • Real issue: See e5/e5_main.py:76-101 - they clean CSVs before use
  • Task: Write a function to clean CSV data

Problems in real data:

  • Empty rows (all fields None)
  • Extra whitespace in fields
  • Multiple spaces/newlines in text
  • Empty strings in important fields

Solution:

import re
import pandas as pd

def clean_csv(csv_path, columns_to_clean):
    df = pd.read_csv(csv_path, encoding='utf-8', on_bad_lines='skip')
    original_count = len(df)

    # Remove completely empty rows
    df.dropna(how='all', inplace=True)

    # Clean each specified column
    for col in columns_to_clean:
        if col in df.columns:
            # Convert to string, strip whitespace, collapse multiple spaces
            df[col] = df[col].astype(str).apply(lambda x: re.sub(r'\s+', ' ', x.strip()))
            # Remove rows where this column is empty
            df = df[df[col] != ""]

    print(f"Cleaned: {original_count - len(df)} rows removed")
    return df

Verification: Compare row counts before/after cleaning


2.2.3 Merge Questions with Answers

  • Problem: Join the two tables so each question has its answer
  • SQL equivalent: SELECT * FROM questions JOIN answers ON questions.tag = answers.tag
  • Why: Makes it easier to work with the data

Task:

# Merge on 'tag' column
merged_df = df_questions.merge(df_answers, on="tag", how="left")

# Check for questions without answers
missing = merged_df['answer'].isna().sum()
if missing > 0:
    print(f"WARNING: {missing} questions have no matching answer")
    merged_df = merged_df.dropna(subset=["answer"])

print(f"Final dataset: {len(merged_df)} question-answer pairs")

Key insight: how="left" keeps all questions even if some tags don't have answers.
Verification: Every row should have both question and answer.


2.3 Text Processing - Bengali Specifics

2.3.1 Install Bengali NLP Libraries

  • Problem: Bengali text needs special handling
  • Libraries:
    • bangla-stemmer: Reduce words to root form
    • bnunicodenormalizer: Standardize Unicode representations
pip install bangla-stemmer bnunicodenormalizer

2.3.2 Understand Bengali Text Normalization

  • Problem: Same Bengali word can be written multiple ways in Unicode
  • Example: "হ্যাঁ" (yes) might have different Unicode sequences
  • Why it matters: "হ্যাঁ" and "হ্যাঁ" might look identical but be different strings
  • Solution: Normalize to canonical form

Task:

from bnunicodenormalizer import Normalizer

bnorm = Normalizer(allow_english=True)

text = "হ্যাঁ আমি চাই"
words = text.split()
normalized = []
for word in words:
    result = bnorm(word)
    # the normalizer may return None for words it cannot handle; keep the original word then
    normalized.append(result["normalized"] or word)

print(" ".join(normalized))

Verification: Test with copy-pasted Bengali text from different sources


2.3.3 Clean User Input

  • Problem: Users type messy input (punctuation, extra spaces, mixed case)
  • Goal: Standardize input before processing
  • See: e5/e5_main.py:136-138

Task: Write a text cleaning function

import re

# Pattern to remove: punctuation, special characters
cleaning_pattern = re.compile(r"[-=+,#/\:^.@*\"※~ㆍ!』'|\(\)\[\]`'…》\"\"\'·।?]")

def clean_text(sentence):
    # Remove special characters
    sentence = cleaning_pattern.sub("", sentence)
    # Collapse multiple spaces, convert to lowercase
    return " ".join(sentence.split()).lower()

# Test
user_input = "আমার NID কার্ড হারিয়ে গেছে!!!"
clean = clean_text(user_input)
print(clean)  # Should be cleaner

Verification: Try messy inputs, check output has no punctuation, single spaces


2.4 Embeddings & Vector Search - The Heart of RAG

2.4.1 Understand What Embeddings Are

  • Problem: How do we find similar questions?
  • Bad approach: String matching (exact words only)
  • Good approach: Semantic similarity (meaning-based)

Concept:

  • Convert text to a vector of numbers (embedding)
  • Similar meanings = similar vectors
  • Distance between vectors = semantic similarity

Example:

"I lost my card" → [0.23, -0.45, 0.67, 0.12, ...]
"My card is missing" → [0.25, -0.43, 0.65, 0.11, ...]  (very close!)
"What's the weather?" → [-0.80, 0.34, -0.22, 0.91, ...]  (far away)

2.4.2 Load an Embedding Model

  • Problem: Convert Bengali text to vectors
  • Model: intfloat/multilingual-e5-large-instruct (supports Bengali)
  • Library: sentence-transformers

Task:

from sentence_transformers import SentenceTransformer

# This will download the model (first time only; it is a large download, on the order of gigabytes)
model = SentenceTransformer("intfloat/multilingual-e5-large-instruct")

# Get embedding dimension
dim = model.get_sentence_embedding_dimension()
print(f"Embedding dimension: {dim}")  # Should be 1024

Wait time: First run downloads the model (several minutes, depending on bandwidth).
Verification: Model loads without errors, dimension is 1024.


2.4.3 Generate Embeddings for Questions

  • Problem: Convert all 3048 questions to vectors
  • Challenge: This takes time (batch processing needed)
  • See: e5/e5_main.py:216-236

Task:

# Prepare questions for E5 model (it needs special formatting)
instruction = (
    "You are an expert in matching Bangladeshi NID queries. "
    "Find the most semantically relevant question."
)
prefix = f"Instruct: {instruction}\nquery: "

# Format all questions
questions = df['question'].tolist()
formatted_questions = [f"{prefix}{clean_text(q)}" for q in questions]

# Generate embeddings (batched for speed)
print("Generating embeddings... this may take 1-2 minutes")
embeddings = model.encode(
    formatted_questions,
    show_progress_bar=True,
    convert_to_numpy=True,
    batch_size=32
)

print(f"Generated {len(embeddings)} embeddings of dimension {embeddings.shape[1]}")

Expected: ~2 minutes for 3048 questions on CPU.
Verification: Shape should be (3048, 1024).


2.4.4 Understand FAISS Vector Search

  • Problem: How do we search 3048 vectors quickly?
  • Naive: Compare query to every vector (slow for large datasets)
  • FAISS: Facebook's library for efficient similarity search

Concept:

1. Build an index (one-time setup)
2. Add all vectors to the index
3. Query: "Find k nearest neighbors to this vector"
4. FAISS returns indices and distances

Why it's fast: FAISS scans vectors in highly optimized C++; for larger datasets it also offers approximate nearest neighbor indexes (IVF, HNSW) that trade a little recall for speed. Note that the IndexFlatIP used below is an exact, brute-force index, which is more than fast enough for ~3k vectors.
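As a hedged sketch, this is roughly how you would switch to an approximate IVF index if the dataset grew to hundreds of thousands of questions (random vectors stand in for real embeddings):

import faiss
import numpy as np

d = 1024                                            # embedding dimension
xb = np.random.rand(100_000, d).astype("float32")   # placeholder vectors
faiss.normalize_L2(xb)

nlist = 256                                         # number of coarse clusters
quantizer = faiss.IndexFlatIP(d)
index = faiss.IndexIVFFlat(quantizer, d, nlist, faiss.METRIC_INNER_PRODUCT)
index.train(xb)                                     # IVF indexes need a training pass
index.add(xb)
index.nprobe = 16                                   # clusters probed per query (speed vs recall)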


2.4.5 Build a FAISS Index

  • Problem: Create searchable index from embeddings
  • Index type: IndexFlatIP (Inner Product = cosine similarity)
  • Why normalize: Cosine similarity needs unit vectors

Task:

# pip install faiss-cpu   (or faiss-gpu if you have an NVIDIA GPU)
import faiss
import numpy as np

# Normalize embeddings (required for cosine similarity)
faiss.normalize_L2(embeddings)

# Create index
embedding_dim = embeddings.shape[1]
index = faiss.IndexFlatIP(embedding_dim)  # IP = Inner Product

# Add vectors to index
index.add(embeddings)

print(f"Index contains {index.ntotal} vectors")

Verification: index.ntotal should equal number of questions


2.4.6 Search the Index

  • Problem: Given a user query, find most similar questions
  • Goal: Understand the search process end-to-end

Task:

# User query
query = "আমার কার্ড হারিয়ে গেছে"

# Format like we did for training data
query_formatted = f"{prefix}{clean_text(query)}"

# Generate query embedding
query_embedding = model.encode(query_formatted, convert_to_numpy=True)
query_embedding = query_embedding.reshape(1, -1)  # Shape: (1, 1024)

# Normalize
faiss.normalize_L2(query_embedding)

# Search for top 3 matches
k = 3
scores, indices = index.search(query_embedding, k)

# Print results
print(f"\nQuery: {query}")
print(f"\nTop {k} matches:")
for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
    print(f"{i+1}. Score: {score:.3f}")
    print(f"   Question: {df.iloc[idx]['question']}")
    print(f"   Tag: {df.iloc[idx]['tag']}")
    print(f"   Answer: {df.iloc[idx]['answer'][:100]}...")
    print()

Verification: Top result should be semantically related to the query.
Score range: cosine similarity runs from -1.0 to 1.0; scores near 1.0 mean near-identical meaning, while unrelated queries score much lower.


2.5 Build the RAG System Class

2.5.1 Design the RAGSystem Class

  • Problem: Organize all RAG logic into a reusable class
  • Responsibilities:
    • Load data
    • Build/load index
    • Search for similar questions
    • Return answers
  • See: e5/e5_main.py:150-393

Task: Create class skeleton

class RAGSystem:
    def __init__(self, question_csv, answer_csv, index_path=None):
        # Load model
        # Load CSVs
        # Build or load FAISS index
        pass

    def search(self, query, k=3):
        # Find similar questions
        # Return list of (question, answer, score, tag)
        pass

    def save_index(self, path):
        # Save FAISS index to disk
        pass

    def load_index(self, path):
        # Load FAISS index from disk
        pass

2.5.2 Implement init Method

  • Problem: Initialize the RAG system
  • Decision: Load existing index OR build new one?

Task:

# (assumes `import os` plus the clean_csv helper defined earlier at module level)
def __init__(self, question_csv, answer_csv, index_path=None):
    print("Initializing RAG System...")

    # Load model
    self.model = SentenceTransformer("intfloat/multilingual-e5-large-instruct")
    self.embedding_dim = self.model.get_sentence_embedding_dimension()

    # Load and clean CSVs
    self.questions_df = clean_csv(question_csv, ["question", "tag"])
    self.answers_df = clean_csv(answer_csv, ["tag", "answer"])

    # Merge
    self.df = self.questions_df.merge(self.answers_df, on="tag", how="left")
    self.df = self.df.dropna(subset=["answer"])

    # Tag -> answer mapping for fast lookup
    self.tag_answer_map = dict(zip(self.answers_df["tag"], self.answers_df["answer"]))

    # Handle index
    if index_path and os.path.exists(index_path):
        self.load_index(index_path)
    else:
        self.initialize_embeddings()
        if index_path:
            self.save_index(index_path)

Key decision: Cache the index to avoid re-computing embeddings every time


2.5.3 Implement Search Method

  • Problem: Given query, return top k answers
  • Return format: List of (question, answer, score, tag)

Task:

def search(self, query, k=3):
    # Format query
    instruction = "You are an expert in matching Bangladeshi NID queries..."
    prefix = f"Instruct: {instruction}\nquery: "
    query_formatted = f"{prefix}{clean_text(query)}"

    # Generate embedding
    query_embedding = self.model.encode(
        query_formatted,
        convert_to_numpy=True,
        normalize_embeddings=True
    ).reshape(1, -1)

    # Search
    scores, indices = self.index.search(query_embedding, k)

    # Format results
    results = []
    for score, idx in zip(scores[0], indices[0]):
        if idx < len(self.df):
            row = self.df.iloc[idx]
            tag = row['tag']
            answer = self.tag_answer_map.get(tag, row['answer'])
            results.append((
                row['question'],  # matched question
                answer,           # answer text
                float(score),     # similarity score
                tag               # tag identifier
            ))

    return results

Verification: Test with sample queries, check scores are reasonable


2.5.4 Implement Index Persistence

  • Problem: Save/load index to avoid re-computing
  • Why: Building index takes 1-2 minutes, loading takes 1 second

Task:

def save_index(self, path):
    if self.index is None:
        raise ValueError("No index to save")

    # FAISS can save to disk
    faiss.write_index(self.index, path)
    print(f"Index saved to {path}")

def load_index(self, path):
    if not os.path.exists(path):
        raise FileNotFoundError(f"Index not found: {path}")

    self.index = faiss.read_index(path)
    print(f"Index loaded from {path}")

Usage:

# First run: builds and saves
rag = RAGSystem("ec_train.csv", "tag_answer.csv", "faiss_index.bin")

# Subsequent runs: loads from disk (much faster)
rag = RAGSystem("ec_train.csv", "tag_answer.csv", "faiss_index.bin")

2.6 Integrate RAG with FastAPI

2.6.1 Initialize RAG at Server Startup

  • Problem: Where to create the RAG instance?
  • Bad: Create new RAG for each request (slow!)
  • Good: Create once at module level (shared across requests)

Task: Update your FastAPI server

from fastapi import FastAPI
from pydantic import BaseModel
import json

app = FastAPI()

# Initialize RAG system ONCE (at import time)
print("Loading RAG system...")
rag = RAGSystem(
    "full_dataset/ec_train.csv",
    "full_dataset/tag_answer.csv",
    "faiss_index.bin"
)
print("RAG system ready!")

class RequestBody(BaseModel):
    question: str
    messages: str
    chat_id: str

@app.get("/")
def read_root():
    return {"message": "Welcome to EC Bot API!"}

@app.post("/ec_bot/")
def get_response(body: RequestBody):
    # Search RAG
    results = rag.search(body.question, k=3)

    # Get top result
    top_question, top_answer, top_score, top_tag = results[0]

    return {
        "response": top_answer,
        "score": top_score,
        "tag": top_tag,
        "matched_question": top_question
    }

Test:

uvicorn your_file:app --reload

curl -X POST http://localhost:8000/ec_bot/ \
  -H "Content-Type: application/json" \
  -d '{"question":"আমার কার্ড হারিয়ে গেছে", "messages":"[]", "chat_id":"123"}'

Verification: Should return Bengali answer from dataset


2.6.2 Add Confidence Threshold

  • Problem: What if RAG isn't confident? (low similarity score)
  • Solution: If score < threshold, return "I don't know" response
  • See: e5/e5_main.py:438 (PROBABILITY_THRESHOLD = 0.6)

Task:

import random

PROBABILITY_THRESHOLD = 0.6

FALLBACK_RESPONSES = [
    # "Thank you for your question; please ask again to learn more."
    "আপনার প্রশ্নের জন্য ধন্যবাদ, দয়া করে আরও তথ্য জানতে আবার জিজ্ঞাসা করুন।",
    # "The question is unclear; it would help to ask more specifically."
    "প্রশ্নটি বোঝা যাচ্ছে না, আরও নির্দিষ্টভাবে জিজ্ঞাসা করলে ভালো হবে।",
]

@app.post("/ec_bot/")
def get_response(body: RequestBody):
    results = rag.search(body.question, k=3)
    top_question, top_answer, top_score, top_tag = results[0]

    if top_score < PROBABILITY_THRESHOLD:
        # Low confidence - don't answer
        response = random.choice(FALLBACK_RESPONSES)
        is_relevant = False
    else:
        response = top_answer
        is_relevant = True

    return {
        "response": response,
        "score": top_score,
        "is_relevant": is_relevant,
        "tag": top_tag
    }

Verification: Test with off-topic question, should get fallback


PHASE 3: ADVANCED FEATURES

3.1 Conversation History Management

3.1.1 Understand Message Format

  • Problem: How is conversation history stored?
  • Format: List of message objects

Structure:

messages = [
    {"role": "user", "content": "হ্যালো"},
    {"role": "assistant", "content": "আপনাকে স্বাগতম", "tag": "greetings"},
    {"role": "user", "content": "আমার কার্ড হারিয়েছে"},
    {"role": "assistant", "content": "...", "tag": "card_lost_and_damaged"},
]

Why JSON string: The client sends messages as a JSON string.
Task: Parse and append new messages.


3.1.2 Parse and Update Message History

  • Problem: Update conversation after each exchange
  • See: e5/e5_main.py:516-521

Task:

@app.post("/ec_bot/")
def get_response(body: RequestBody):
    # Parse message history
    messages = json.loads(body.messages)

    # Add current user message
    messages.append({"role": "user", "content": body.question})

    # ... RAG search ...

    # Add assistant response
    messages.append({
        "role": "assistant",
        "content": response,
        "tag": top_tag
    })

    # Convert back to JSON string
    messages_str = json.dumps(messages, ensure_ascii=False)

    return {
        "response": response,
        "messages": messages_str,  # Return updated history
        "tag": top_tag
    }

Why ensure_ascii=False: Preserve Bengali characters in JSON
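A quick demonstration of the difference:

import json

msg = {"role": "user", "content": "হ্যাঁ"}
print(json.dumps(msg))                      # {"role": "user", "content": "\u09b9\u09cd\u09af\u09be\u0981"}
print(json.dumps(msg, ensure_ascii=False))  # {"role": "user", "content": "হ্যাঁ"}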


3.1.3 Implement Message History Truncation

  • Problem: Conversation gets long, context grows unbounded
  • Solution: Keep only last N messages
  • Challenge: Don't break active forms (covered later)
  • See: e5/e5_main.py:746-769

Simple version:

# After adding new messages
if len(messages) >= 8:
    messages = messages[-6:]  # Keep last 6 messages

Why 6? A typical form needs 3-4 exchanges, so this leaves a buffer.


3.2 Special Response Handling

3.2.1 Detect Greeting Tag

  • Problem: Should "আপনার কি আর কোন প্রশ্ন আছে?" ("Do you have any other questions?") be appended?
  • Answer: Not for greetings, goodbyes, or agent_calling
  • See: e5/e5_main.py:626-644

Task: Add response extension logic

response_extension = " আপনার কি আর কোন প্রশ্ন আছে?"

# Don't append for certain tags
if top_tag in ['greetings', 'goodbye', 'agent_calling', 'repeat_again']:
    response_extension = ""

response = top_answer + response_extension

3.2.2 Handle "Repeat Again" Request

  • Problem: User asks to repeat the last answer
  • Solution: Find last assistant message, repeat it
  • See: e5/e5_main.py:680-693

Task:

if top_tag == "repeat_again":
    # Get last assistant content
    assistant_messages = [m for m in messages if m["role"] == "assistant"]
    if len(assistant_messages) >= 2:
        last_content = assistant_messages[-2]["content"]
        last_tag = assistant_messages[-2]["tag"]

        response = "জি ধন্যবাদ। আমি উত্তরটি আবার বলছি। " + last_content
        top_tag = last_tag  # Use original tag, not "repeat_again"

3.2.3 Handle Goodbye

  • Problem: User says goodbye
  • Solution: Confirm and ask for feedback
  • See: e5/e5_main.py:705-714

Task:

if top_tag == "goodbye":
    response = "বিদায়! আপনার দিনটা সুন্দর কাটুক। আপনি কি সিস্টেমটি পছন্দ করেছেন?"
    is_conversation_finished = True

3.3 Bengali Yes/No Detection

3.3.1 Understand the Problem

  • Problem: Detect affirmative/negative responses in Bengali
  • Why needed: Multi-turn forms need yes/no detection
  • Challenge: Bengali has many variations

Examples:

  • Yes: হ্যাঁ, হা, জি, জি হ্যাঁ, আছে, চাই
  • No: না, নাই, নেই, চাই না, লাগবে না

See: flag.py:1-40


3.3.2 Implement Yes Detection

  • Problem: Check if user input means "yes"
  • Strategy: Check compound phrases first, then single words

Task: Create flag.py

def is_yes(user_input):
    yes_list = ["ইয়েস", "জি", "হ্যা", "হা", "হ্যাঁ", "হ্যাম", "ইয়াপ", "হুঁ", "এটা", "হ্াঁ"]
    compound_yes = ["জি হ্যাঁ", "ও হ্যাঁ", "জি বলেন", "হ্যাঁ বলেন"]

    user_input_lower = user_input.strip().lower()

    # Check compound patterns first (more specific)
    if any(pattern in user_input_lower for pattern in compound_yes):
        return True

    # Check individual words (max 4 words to avoid false positives)
    words = user_input_lower.split()
    if any(yes in words for yes in yes_list) and len(words) <= 4:
        return True

    return False

# Test
print(is_yes("জি হ্যাঁ"))  # True
print(is_yes("হ্যাঁ বলেন"))  # True
print(is_yes("না"))  # False

3.3.3 Implement No Detection

  • Problem: Check if user input means "no"
  • Important: Check compound negatives first (they contain affirmative words!)

Task:

def is_no(user_input):
    no_list = ["না", "নানা", "নও", "নোক", "ন", "নো"]
    compound_no = ["না চাই না", "নাহ চাচ্ছি নাহ", "চাই না", "চাচ্ছি না", "লাগবে না"]

    user_input_lower = user_input.strip().lower()

    # Check compound patterns first
    if any(pattern in user_input_lower for pattern in compound_no):
        return True

    words = user_input_lower.split()
    if any(no in words for no in no_list) and len(words) <= 4:
        return True

    return False

Critical: "চাই না" ("don't want") contains both "চাই" (want) and "না" (no).
Solution: Check compound_no BEFORE checking individual words.


3.4 Logging System

3.4.1 Set Up Loguru

  • Problem: Track what's happening for debugging
  • Library: loguru (better than standard logging)
  • See: e5/e5_main.py:50-54

Task:

# pip install loguru
from loguru import logger

logger.add(
    "log_folder/app_{time:YYYY-MM-DD}.log",
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | {message}",
    level="INFO"
)

logger.info("Server starting...")
logger.success("RAG system initialized")
logger.warning("Low confidence score")
logger.error("Failed to process request")

Verification: Check log_folder/ for daily log files


3.4.2 Log Irrelevant Queries

  • Problem: Track questions that got low confidence scores
  • Why: Improve dataset by adding these questions
  • See: e5/e5_main.py:463-473

Task:

from filelock import FileLock
import os
import pandas as pd

def log_irrelevant_query(question, filepath="irrelevant_questions.csv"):
    lock_path = filepath + ".lock"

    # Use file lock to prevent race conditions (multiple requests)
    with FileLock(lock_path):
        if os.path.exists(filepath):
            df = pd.read_csv(filepath)
            # Only add if not already logged
            if question not in df["question"].values:
                df = pd.concat([df, pd.DataFrame([{"question": question}])], ignore_index=True)
                df.to_csv(filepath, index=False)
        else:
            df = pd.DataFrame([{"question": question}])
            df.to_csv(filepath, index=False)

# Use in endpoint
if top_score < PROBABILITY_THRESHOLD:
    log_irrelevant_query(body.question)

Why FileLock: Multiple requests might write simultaneously


3.4.3 Log Successful Matches

  • Problem: Track what questions map to what answers
  • Why: Analytics, quality assurance
  • See: e5/e5_main.py:478-496

Task:

import csv
import os
from filelock import FileLock

def log_mapped_query(user_input, matched_question, tag, answer, score, filepath="mapped_queries.csv"):
    lock_path = filepath + ".lock"
    with FileLock(lock_path):
        file_exists = os.path.exists(filepath)

        with open(filepath, mode='a', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile,
                fieldnames=["user_input", "matched_question", "tag", "answer", "score"])

            if not file_exists:
                writer.writeheader()

            writer.writerow({
                "user_input": user_input,
                "matched_question": matched_question,
                "tag": tag,
                "answer": answer[:100],  # Truncate long answers
                "score": score
            })

# Use in endpoint
if top_score > PROBABILITY_THRESHOLD:
    log_mapped_query(body.question, top_question, top_tag, top_answer, top_score)

PHASE 4: MULTI-TURN FORMS (The Complex Part)

4.1 Understanding State Machines

4.1.1 What is a State Machine?

  • Problem: Track where we are in a multi-step conversation
  • Example: ATM withdrawal
    1. State: IDLE → Insert card → State: CARD_INSERTED
    2. State: CARD_INSERTED → Enter PIN → State: AUTHENTICATED
    3. State: AUTHENTICATED → Select amount → State: DISPENSING
    4. State: DISPENSING → Take cash → State: IDLE

For our chatbot:

State: NONE (no active form)
    ↓ User asks foreign resident question
State: ACTIVE (waiting for country)
    ↓ User provides country
State: COMPLETED (gave consulate info)
    ↓
State: NONE

Complication: What if interrupted?

State: ACTIVE (waiting for country)
    ↓ User asks DIFFERENT question
State: INTERRUPTED (paused form, answered other question)
    ↓ User says "yes, continue"
State: ACTIVE (resume waiting for country)
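As a standalone sketch of these transitions (the real project derives state from the message history, as shown in 4.1.3; the event names here are invented for illustration):

from enum import Enum

class FormState(Enum):
    NONE = "none"
    ACTIVE = "active"
    INTERRUPTED = "interrupted"

TRANSITIONS = {
    (FormState.NONE, "form_question"):        FormState.ACTIVE,       # form starts
    (FormState.ACTIVE, "country_given"):      FormState.NONE,         # form completed
    (FormState.ACTIVE, "other_question"):     FormState.INTERRUPTED,  # interruption
    (FormState.INTERRUPTED, "user_says_yes"): FormState.ACTIVE,       # resume
    (FormState.INTERRUPTED, "user_says_no"):  FormState.NONE,         # cancel
}

def next_state(state, event):
    return TRANSITIONS.get((state, event), state)

state = FormState.NONE
for event in ["form_question", "other_question", "user_says_yes", "country_given"]:
    state = next_state(state, event)
    print(f"{event} -> {state.value}")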

4.1.2 Identify Form Groups

  • Problem: Which questions belong to the same conversation flow?
  • See: e5/multi_turn_state.py:21-41

Task: Define form groups

FORM_GROUPS = {
    "foreign_resident": [
        "foreign_resident_action_after_biometrics_new",
        "foreign_resident_card_picture_done__inquery_done_no_msg_new",
        "foreign_resident_card_registration_process",
        "nid_registration_process_for_bangladeshis_abroad",
        # ... 9 tags total
    ],
    # Add more form groups as needed
}

# Create reverse mapping: tag → group name
TAG_TO_FORM_GROUP = {}
for group_name, tags in FORM_GROUPS.items():
    for tag in tags:
        TAG_TO_FORM_GROUP[tag] = group_name

Why group: All these questions are about foreign resident registration.
Flow: Ask question → Bot appends "Which country?" → User answers → Consulate info.


4.1.3 Detect Form State from Messages

  • Problem: Look at conversation history, determine current state
  • States: "none", "active", "interrupted"
  • See: e5/multi_turn_state.py:105-153

Task:

def get_form_state(messages):
    """
    Returns: (state, form_group, form_tag, original_question)
    """
    if len(messages) < 1:
        return "none", None, None, None

    # Get last assistant message
    assistant_msgs = [m for m in messages if m.get("role") == "assistant"]
    if not assistant_msgs:
        return "none", None, None, None

    last_assistant = assistant_msgs[-1]

    # Check for interrupted state (highest priority)
    if last_assistant.get("form_interrupted"):
        form_tag = last_assistant.get("original_form_tag")
        form_group = TAG_TO_FORM_GROUP.get(form_tag)
        original_q = last_assistant.get("original_question", "")
        return "interrupted", form_group, form_tag, original_q

    # Check for active state
    last_tag = last_assistant.get("tag")
    form_group = TAG_TO_FORM_GROUP.get(last_tag)

    if form_group:
        # Find original question that started this form
        for i, msg in enumerate(messages):
            if msg.get("role") == "assistant" and TAG_TO_FORM_GROUP.get(msg.get("tag")) == form_group:
                if i > 0 and messages[i-1].get("role") == "user":
                    original_q = messages[i-1].get("content", "")
                    return "active", form_group, msg.get("tag"), original_q
                break

    return "none", None, None, None

Logic:

  1. If last message has form_interrupted metadata → INTERRUPTED
  2. If last assistant tag is in a form group → ACTIVE
  3. Otherwise → NONE

4.2 Country Detection

4.2.1 Define Approved Countries

  • Problem: Which countries have Bangladesh consulates?
  • Data: Map Bengali/English variations to canonical names
  • See: e5/multi_turn_state.py:44-72

Task:

APPROVED_COUNTRIES = {
    # Bangla → Canonical
    "সংযুক্ত আরব আমিরাত": "UAE",
    "মালয়েশিয়া": "Malaysia",
    "কুয়েত": "Kuwait",
    "কাতার": "Qatar",
    "যুক্তরাজ্য": "UK",
    "ইংল্যান্ড": "UK",
    # English variations
    "uae": "UAE",
    "united arab emirates": "UAE",
    "dubai": "UAE",
    "malaysia": "Malaysia",
    "uk": "UK",
    "united kingdom": "UK",
    "england": "UK",
    # ... more countries
}

Why multiple entries: The user might say "UAE" or "Dubai" or "সংযুক্ত আরব আমিরাত".
Canonical: Always return the standardized name ("UAE", "Malaysia", etc.).


4.2.2 Implement Country Detection

  • Problem: Find country name in user input
  • Challenge: Case-insensitive, substring matching
  • See: e5/multi_turn_state.py:156-174

Task:

def detect_country(text):
    """Returns canonical country name or None"""
    text_lower = text.lower()

    for country_variant, canonical in APPROVED_COUNTRIES.items():
        if country_variant.lower() in text_lower:
            return canonical

    return None

# Test
print(detect_country("আমি সংযুক্ত আরব আমিরাত থেকে"))  # "UAE"
print(detect_country("I'm in Dubai"))  # "UAE"
print(detect_country("malaysia"))  # "Malaysia"
print(detect_country("france"))  # None (not approved)

4.2.3 Define Consulate Information

  • Problem: What to tell user for each country?
  • Data: Pre-written responses for each approved country
  • See: e5/multi_turn_state.py:75-85

Task:

CONSULATE_INFO = {
    "UAE": "সংযুক্ত আরব আমিরাতে বাংলাদেশ দূতাবাস, আবুধাবিতে অবস্থিত। এছাড়াও কনস্যুলেট জেনারেল অফিস দুবাইতে রয়েছে।",
    "Malaysia": "মালয়েশিয়ায় বাংলাদেশ হাই কমিশন, কুয়ালালামপুরে অবস্থিত।",
    "UK": "যুক্তরাজ্যে বাংলাদেশ হাই কমিশন, লন্ডনে অবস্থিত।",
    # ... more countries
}

def get_consulate_info(canonical_country):
    return CONSULATE_INFO.get(canonical_country, "")

4.3 Form Handler Logic

4.3.1 Handle Active Form State

  • Problem: User is in a form, they respond with country name
  • Expected behavior: Return consulate info, complete form
  • See: e5/multi_turn_state.py:256-358

Task:

def handle_foreign_resident_form(user_input, messages, state, current_rag_tag=None):
    """
    Returns: response dict if form handles it, None otherwise
    """
    if state not in ("active", "interrupted"):
        return None

    # STATE: ACTIVE
    if state == "active":
        detected_country = detect_country(user_input)

        if detected_country:
            # User provided valid country
            consulate_info = CONSULATE_INFO.get(detected_country, "")
            response_text = f"আপনি {detected_country} দেশের কনস্যুলেট জেনারেল অফিসের নাম এবং ঠিকানা হচ্ছে {consulate_info}"

            return {
                "question": f"প্রবাসী নিবন্ধন - {detected_country}",
                "answer": response_text,
                "tag": "foreign_resident_consulate_info",
                "score": 0.96,
                "form_completed": True
            }

        # Check if short input (1-2 words) but not recognized
        word_count = len(user_input.split())
        if word_count <= 2:
            # Likely unsupported country
            return {
                "question": "প্রবাসী নিবন্ধন - অসমর্থিত দেশ",
                "answer": "আপনি যে দেশে বসবাস করছেন, সেখানে কনস্যুলেট অফিস নেই।",
                "tag": "foreign_resident_no_consulate",
                "score": 0.95,
                "form_completed": True
            }

        # Input is >2 words and not a country → interruption
        return None  # Signal interruption to caller

Logic:

  1. If country detected → Give consulate info, mark complete
  2. If 1-2 words but no match → "No consulate in that country"
  3. If >2 words → User asked different question (interruption)

4.3.2 Handle Interrupted Form State

  • Problem: User is in interrupted state, check if they want to resume
  • Expected: "হ্যাঁ" → resume, "না" → cancel
  • See: e5/multi_turn_state.py:289-314

Task:

def is_affirmative(text):
    """Check if text is yes/affirmative"""
    affirmative = ["হ্যাঁ", "হা", "জি", "ঠিক", "আছে", "yes", "yeah"]
    text_lower = text.lower()
    return any(word in text_lower for word in affirmative)

def is_negative(text):
    """Check if text is no/negative"""
    negative = ["না", "নাহ", "নেই", "নাই", "no", "nope"]
    text_lower = text.lower()
    return any(word in text_lower for word in negative)

# In handle_foreign_resident_form:
if state == "interrupted":
    # Check negative FIRST (compound phrases contain affirmative words)
    if is_negative(user_input):
        return None  # Cancel form, proceed with RAG

    elif is_affirmative(user_input):
        # Resume form
        return {
            "question": "প্রবাসী নিবন্ধন - ফর্ম পুনরায় শুরু",
            "answer": "আপনি কোন দেশ থেকে ভোটার নিবন্ধন করতে চাচ্ছেন?",
            "tag": "foreign_resident_card_registration_process",
            "score": 0.97,
            "form_resumed": True
        }

    else:
        # Ambiguous, treat as cancel
        return None

Critical: Check is_negative BEFORE is_affirmative.
Why: "চাই না" ("don't want") contains both "চাই" (want) and "না" (no).


4.4 Integration with Main Pipeline

4.4.1 Add Multi-Turn Check (First Pass)

  • Problem: Before RAG, check if we're in a form
  • Why: Form handler might provide answer directly (country input)
  • See: e5/e5_main.py:535-548

Task: Update endpoint

@app.post("/ec_bot/")
def get_response(body: RequestBody):
    user_input = clean_text(body.question)
    messages = json.loads(body.messages)
    messages.append({"role": "user", "content": body.question})

    # FIRST PASS: Check multi-turn WITHOUT RAG tag
    multi_turn_response = process_multi_turn_query(user_input, messages[:-1], current_rag_tag=None)

    if multi_turn_response:
        # Form handler provided response (country detected or resume)
        final_output = multi_turn_response
    else:
        # No active form, proceed with RAG
        results = rag.search(user_input, k=3)
        final_output = {
            "question": results[0][0],
            "answer": results[0][1],
            "score": results[0][2],
            "tag": results[0][3]
        }

    # ... rest of logic

4.4.2 Add Multi-Turn Check (Second Pass)

  • Problem: RAG might return a tag from same form group
  • Example: User in foreign_resident form, asks another foreign_resident question
  • Solution: Check if RAG tag triggers same-group handling
  • See: e5/e5_main.py:550-556

Task:

if multi_turn_response:
    final_output = multi_turn_response
else:
    results = rag.search(user_input, k=3)
    rag_tag = results[0][3]

    # SECOND PASS: Check if RAG tag should be handled by form
    multi_turn_response_v2 = process_multi_turn_query(
        user_input,
        messages[:-1],
        current_rag_tag=rag_tag
    )

    if multi_turn_response_v2:
        final_output = multi_turn_response_v2
    else:
        final_output = {
            "question": results[0][0],
            "answer": results[0][1],
            "score": results[0][2],
            "tag": rag_tag
        }

Why two passes:

  • First: Catch country input (no RAG needed)
  • Second: Handle same-group question interruptions

4.4.3 Append Follow-up Question

  • Problem: When user first asks form question, append "Which country?"
  • Only if: Not already in this form group
  • See: e5/e5_main.py:600-607

Task:

def should_append_followup(tag, messages):
    """Returns follow-up text to append, or None"""
    form_group = TAG_TO_FORM_GROUP.get(tag)
    if not form_group:
        return None

    # Check if already in this form
    state, active_group, _, _ = get_form_state(messages)
    if state == "active" and active_group == form_group:
        return None  # Already in form, don't re-ask

    # First time in this form
    if form_group == "foreign_resident":
        return " আপনি কোন দেশ থেকে ভোটার নিবন্ধন করতে চাচ্ছেন?"

    return None

# In endpoint, after getting final_output:
came_from_form_handler = final_output.get('form_resumed', False) or final_output.get('form_completed', False)

if not came_from_form_handler:
    followup = should_append_followup(final_output['tag'], messages[:-1])
    if followup:
        final_output['answer'] = final_output['answer'] + followup

Logic: Only append if this is the FIRST message in the form


4.4.4 Detect and Mark Interruptions

  • Problem: User was in form, asks different question
  • Action: Append "Do you want to continue with [original question]?"
  • Metadata: Mark message as interrupted for state tracking
  • See: e5/e5_main.py:610-621, 444-498

Task:

def should_append_interruption_confirm(current_tag, messages):
    """Returns confirmation text to append, or None"""
    state, active_group, form_tag, original_question = get_form_state(messages)

    if state != "active":
        return None  # Not in active form

    # Don't trigger if form just completed
    if current_tag in ['foreign_resident_consulate_info', 'foreign_resident_no_consulate']:
        return None

    # Interruption detected
    display_q = original_question[:50] + "..." if len(original_question) > 50 else original_question
    return f" আপনি কি {display_q} সম্পর্কে জানতে চাচ্ছেন নাহ?"

def mark_message_as_interrupted(message, form_tag, original_question):
    """Add metadata to track interrupted state"""
    message["form_interrupted"] = True
    message["original_form_tag"] = form_tag
    message["original_question"] = original_question
    return message

# In endpoint:
interruption_confirm = None
if not came_from_form_handler:
    interruption_confirm = should_append_interruption_confirm(final_output['tag'], messages[:-1])
    if interruption_confirm:
        final_output['answer'] = final_output['answer'] + interruption_confirm

# Build assistant message
assistant_message = {
    "role": "assistant",
    "content": final_output['answer'],
    "tag": final_output['tag']
}

# Add metadata if interrupted
if interruption_confirm:
    state, group, form_tag, original_q = get_form_state(messages[:-1])
    assistant_message = mark_message_as_interrupted(assistant_message, form_tag, original_q)

messages.append(assistant_message)

4.5 Smart Message Truncation

4.5.1 Preserve Form State During Truncation

  • Problem: Can't blindly truncate if user is in a form
  • Bad: Remove messages, lose form context
  • Good: Keep messages from form start
  • See: e5/e5_main.py:746-769

Task:

if len(messages) >= 8:
    # Check if we have active/interrupted form
    state, form_group, form_tag, original_q = get_form_state(messages)

    if state in ("active", "interrupted"):
        # Find message that started the form
        form_start_idx = None
        for i, msg in enumerate(messages):
            if msg.get("role") == "assistant" and msg.get("tag") == form_tag:
                form_start_idx = max(0, i - 1)  # Include user question before
                break

        if form_start_idx is not None:
            messages = messages[form_start_idx:]
        else:
            messages = messages[-6:]  # Fallback
    else:
        # No active form, safe to truncate
        messages = messages[-6:]

Logic:

  • If in form → Keep from form start
  • If not in form → Keep last 6 messages

PHASE 5: OPTIMIZATION & PRODUCTION

5.1 GPU Acceleration

5.1.1 Understand GPU vs CPU for FAISS

  • Problem: FAISS on CPU is fast, but GPU is faster
  • When it matters: Large datasets (>10k vectors), high request volume
  • Tradeoff: GPU requires NVIDIA GPU, more complex setup

Task: Check if GPU available

import torch
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"GPU name: {torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'N/A'}")

5.1.2 Move FAISS Index to GPU

  • Problem: Transfer index from CPU to GPU memory
  • See: e5/e5_main.py:125-133

Task:

# pip uninstall faiss-cpu
# pip install faiss-gpu

def initialize_faiss_gpu(index_cpu):
    try:
        res = faiss.StandardGpuResources()
        index_gpu = faiss.index_cpu_to_gpu(res, 0, index_cpu)  # 0 = GPU device 0
        print("FAISS index moved to GPU")
        return index_gpu
    except Exception as e:
        print(f"GPU initialization failed: {e}, using CPU")
        return index_cpu

# After building index on CPU:
index = faiss.IndexFlatIP(embedding_dim)
index.add(embeddings)

# Move to GPU if available
index = initialize_faiss_gpu(index)

Performance: GPU can be 5-10x faster for large indexes


5.1.3 Handle GPU for Saving/Loading

  • Problem: Can't save GPU index directly, must convert to CPU first
  • See: e5/e5_main.py:266-295

Task:

def save_index(self, path):
    # Move to CPU for saving
    index_cpu = faiss.index_gpu_to_cpu(self.index)
    faiss.write_index(index_cpu, path)

def load_index(self, path):
    # Load on CPU
    index_cpu = faiss.read_index(path)
    # Move to GPU
    self.index = initialize_faiss_gpu(index_cpu)

5.2 Error Handling & Robustness

5.2.1 Wrap Endpoint in Try-Except

  • Problem: Any error crashes the server
  • Solution: Catch exceptions, return error response
  • See: e5/e5_main.py:828-829

Task:

@app.post("/ec_bot/")
def get_response(body: RequestBody):
    try:
        # ... all the logic ...
        return {
            "response": response,
            "messages": messages_str,
            "tag": tag,
            # ...
        }
    except Exception as e:
        logger.exception(f"Error processing request: {e}")
        return {
            "error": str(e),
            "response": "দুঃখিত, একটি ত্রুটি ঘটেছে। অনুগ্রহ করে আবার চেষ্টা করুন।"
        }

5.2.2 Validate Input Data

  • Problem: Malformed JSON in messages field
  • Solution: Validate before parsing

Task:

@app.post("/ec_bot/")
def get_response(body: RequestBody):
    try:
        # Validate messages is valid JSON
        try:
            messages = json.loads(body.messages)
            if not isinstance(messages, list):
                raise ValueError("messages must be a JSON array")
        except json.JSONDecodeError:
            return {"error": "Invalid JSON in messages field"}

        # Validate question is not empty
        if not body.question.strip():
            return {"error": "Question cannot be empty"}

        # ... proceed with logic ...

5.3 Testing Strategy

5.3.1 Test Basic RAG Search

  • Problem: Verify RAG returns correct answers
  • Method: Hardcode test questions with expected tags

Task: Create test_rag.py

from e5.e5_main import RAGSystem

rag = RAGSystem("full_dataset/ec_train.csv", "full_dataset/tag_answer.csv")

test_cases = [
    ("আমার কার্ড হারিয়ে গেছে", "card_lost_and_damaged"),
    ("নতুন ভোটার নিবন্ধন কিভাবে করবো", "online_new_voter_registration"),
    ("প্রবাসী নিবন্ধন", "foreign_resident"),
]

for query, expected_tag in test_cases:
    results = rag.search(query, k=1)
    actual_tag = results[0][3]
    score = results[0][2]

    print(f"Query: {query}")
    print(f"Expected: {expected_tag}, Got: {actual_tag}, Score: {score:.3f}")
    print(f"Status: {'PASS' if expected_tag in actual_tag else 'FAIL'}")
    print()

5.3.2 Test Multi-Turn Forms

  • Problem: Verify form state transitions
  • Method: Simulate conversation sequences

Task: Create test_forms.py

from e5.multi_turn_state import get_form_state, process_multi_turn_query

# Test 1: Form activation
messages = []
messages.append({"role": "user", "content": "প্রবাসী নিবন্ধন সম্পর্কে জানতে চাই"})
messages.append({
    "role": "assistant",
    "content": "প্রবাসী হিসেবে নিবন্ধনের জন্য... আপনি কোন দেশ থেকে ভোটার নিবন্ধন করতে চাচ্ছেন?",
    "tag": "foreign_resident_card_registration_process"
})

state, group, tag, _ = get_form_state(messages)
assert state == "active"
assert group == "foreign_resident"
print("Test 1 PASS: Form activation")

# Test 2: Country detection
response = process_multi_turn_query("আমি UAE থেকে", messages, None)
assert response is not None
assert response['tag'] == "foreign_resident_consulate_info"
print("Test 2 PASS: Country detection")

# Test 3: Interruption handling
messages.append({"role": "user", "content": "কার্ড হারালে কি করবো?"})
# This should trigger interruption...

5.4 Deployment

5.4.1 Production Server Configuration

  • Problem: Development server (--reload) not for production
  • Solution: Use production ASGI server

Task: Run with production settings

# Install production server
pip install gunicorn

# Run with multiple workers
gunicorn e5.e5_main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --timeout 120 \
    --access-logfile logs/access.log \
    --error-logfile logs/error.log

Workers: Number of concurrent request handlers (typically one per CPU core). Note that each worker loads its own copy of the model and FAISS index, so memory use scales with worker count.


5.4.2 Environment Variables for Configuration

  • Problem: Hardcoded paths, settings in code
  • Solution: Use environment variables
  • See: e5/e5_main.py:413-426

Task:

import os

# Allow override via environment variables
QUESTION_CSV = os.getenv("QUESTION_TAG_CSV_PATH", "full_dataset/ec_train.csv")
ANSWER_CSV = os.getenv("TAG_ANSWER_CSV_PATH", "full_dataset/tag_answer.csv")
INDEX_PATH = os.getenv("FAISS_INDEX_PATH", "faiss_index.bin")
THRESHOLD = float(os.getenv("PROBABILITY_THRESHOLD", "0.6"))

rag = RAGSystem(QUESTION_CSV, ANSWER_CSV, INDEX_PATH)

Usage:

export QUESTION_TAG_CSV_PATH=/path/to/custom/questions.csv
export PROBABILITY_THRESHOLD=0.7
python -m uvicorn e5.e5_main:app

5.4.3 Docker Containerization (Optional)

  • Problem: Deployment environment differences
  • Solution: Package everything in Docker

Task: Create Dockerfile

FROM python:3.10-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Expose port
EXPOSE 8000

# Run server
CMD ["uvicorn", "e5.e5_main:app", "--host", "0.0.0.0", "--port", "8000"]

Build and run:

docker build -t ec-chatbot .
docker run -p 8000:8000 ec-chatbot

SUMMARY: The Problem-Solving Journey

How a Senior Engineer Would Actually Approach This

Week 1: Understanding

  • Read all docs, understand domain (NID/voter registration)
  • Examine data structure (CSV files, tags, questions)
  • Identify core challenge (semantic search in Bengali)
  • Research technologies (RAG, FAISS, sentence-transformers)

Week 2: Proof of Concept

  • Build minimal FastAPI hello world
  • Load CSVs, clean data
  • Get basic RAG working (single question → answer)
  • Verify Bengali text handling

Week 3: Core Features

  • Add conversation history
  • Implement logging
  • Add confidence threshold
  • Handle special cases (greetings, goodbye, repeat)

Week 4: Multi-Turn Forms

  • Design state machine
  • Implement foreign resident form
  • Add interruption handling
  • Test edge cases

Week 5: Production Polish

  • GPU optimization
  • Error handling
  • Performance testing
  • Deployment setup

Key Skills Developed Through This Project

1. Backend Development

  • FastAPI web framework
  • HTTP request/response cycle
  • Pydantic data validation
  • RESTful API design

2. Natural Language Processing

  • Text preprocessing (cleaning, normalization)
  • Embeddings (converting text to vectors)
  • Semantic similarity search
  • Multilingual NLP (Bengali)

3. Vector Databases

  • FAISS index creation and search
  • Embedding storage and retrieval
  • GPU acceleration
  • Index persistence

4. State Management

  • Conversation state tracking
  • State machine design
  • Context preservation
  • Interruption handling

5. Data Engineering

  • CSV processing with pandas
  • Data cleaning and validation
  • Join operations (merge tables)
  • File locking for concurrent writes

6. Production Engineering

  • Logging and monitoring
  • Error handling
  • Performance optimization
  • Deployment strategies

The Atomic Problem Checklist (100+ Micro-Problems)

Data & Files (10 problems)

  1. Read CSV file with pandas
  2. Handle bad CSV encoding
  3. Remove empty rows from DataFrame
  4. Clean whitespace in text columns
  5. Merge two DataFrames on common column
  6. Handle missing values in merged data
  7. Create dictionary from DataFrame columns
  8. Save DataFrame to CSV
  9. Use file locking for concurrent writes
  10. Read/write JSON files

Text Processing (15 problems)

  1. Remove punctuation with regex
  2. Normalize Bengali Unicode
  3. Convert text to lowercase
  4. Collapse multiple spaces
  5. Split text into words
  6. Detect Bengali yes/no responses
  7. Detect country names in text
  8. Format text for embedding model
  9. Handle English in Bengali text
  10. Truncate long text for display
  11. Clean user input
  12. Detect compound phrases
  13. Case-insensitive substring matching
  14. Word boundary detection
  15. Language-specific stemming

Machine Learning (20 problems)

  1. Install sentence-transformers
  2. Load pre-trained embedding model
  3. Check embedding dimension
  4. Generate embedding for single text
  5. Batch encode multiple texts
  6. Normalize embedding vectors
  7. Calculate cosine similarity
  8. Convert embeddings to numpy array
  9. Handle GPU/CPU for embeddings
  10. Format instruction for E5 model
  11. Show progress bar during encoding
  12. Set batch size for encoding
  13. Cache embeddings to disk
  14. Load cached embeddings
  15. Handle embedding dimension mismatch
  16. Understand inner product vs cosine
  17. Normalize L2 vectors
  18. Reshape embedding arrays
  19. Convert between torch and numpy
  20. Handle out-of-memory errors

FAISS Vector Search (15 problems)

  1. Install faiss-cpu or faiss-gpu
  2. Create FAISS index (IndexFlatIP)
  3. Add vectors to index
  4. Search index for nearest neighbors
  5. Interpret search results (scores, indices)
  6. Save FAISS index to disk
  7. Load FAISS index from disk
  8. Move index from CPU to GPU
  9. Move index from GPU to CPU
  10. Check index size (ntotal)
  11. Handle index build failures
  12. Understand approximate vs exact search
  13. Set search parameter k
  14. Normalize vectors before indexing
  15. Handle empty index errors

FastAPI & Web (15 problems)

  1. Create FastAPI app instance
  2. Define GET endpoint
  3. Define POST endpoint
  4. Create Pydantic model for request
  5. Validate incoming JSON
  6. Parse request body
  7. Return JSON response
  8. Handle CORS if needed
  9. Run uvicorn development server
  10. Handle server startup events
  11. Add middleware for logging
  12. Return error responses
  13. Set HTTP status codes
  14. Test endpoints with curl
  15. Use Pydantic for validation errors

State Management (20 problems)

  1. Design state machine states
  2. Detect state from message history
  3. Store metadata in messages
  4. Parse JSON message history
  5. Append messages to history
  6. Convert history to JSON string
  7. Preserve Bengali in JSON (ensure_ascii=False)
  8. Truncate message history
  9. Find specific messages by role
  10. Find specific messages by tag
  11. Check if in active form
  12. Check if form interrupted
  13. Track original form question
  14. Mark message as interrupted
  15. Resume interrupted form
  16. Cancel interrupted form
  17. Detect form completion
  18. Prevent nested forms
  19. Handle form state transitions
  20. Preserve form context during truncation

Logging & Debugging (10 problems)

  1. Install and configure loguru
  2. Log to rotating files
  3. Format log messages
  4. Set log levels (INFO, WARNING, ERROR)
  5. Log exceptions with stack traces
  6. Log irrelevant queries to CSV
  7. Log successful matches to CSV
  8. Add timestamps to logs
  9. Parse log files for analysis
  10. Monitor log file size

Production & Deployment (10 problems)

  1. Use environment variables
  2. Run with production ASGI server
  3. Configure multiple workers
  4. Set request timeout
  5. Handle graceful shutdown
  6. Implement health check endpoint
  7. Monitor server resources
  8. Set up error alerting
  9. Create Docker container
  10. Deploy to cloud platform

Final Thoughts: The Art of Problem Decomposition

This project demonstrates several key problem-solving principles:

1. Separation of Concerns

  • Data layer (CSVs, pandas)
  • Model layer (embeddings, FAISS)
  • Business logic (RAG, forms)
  • API layer (FastAPI)
  • Each layer solves independent problems

2. Incremental Complexity

  • Start simple (hello world)
  • Add features one by one
  • Test each piece before moving on
  • Complex behavior emerges from simple components

3. Abstraction Levels

  • Low: "Read CSV file"
  • Medium: "Build RAG system"
  • High: "Conversational AI chatbot"
  • Senior engineers navigate all levels fluently

4. Failure Planning

  • What if user asks off-topic question? (threshold + fallback)
  • What if user interrupts form? (interruption handling)
  • What if server crashes? (logging, error recovery)
  • What if data is corrupted? (validation, cleaning)

5. The Walking Skeleton Pattern

Build the thinnest possible end-to-end slice first:

User types question → Server returns hardcoded answer
User types question → Server returns random answer from CSV
User types question → Server returns matched answer from CSV
User types question → Server returns RAG answer
User types question → Server returns RAG answer + handles forms

Each iteration is a complete, working system. You're never more than one step away from a working demo.
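For example, the second iteration might look like this (a toy sketch, reusing the dataset path from earlier):

import random
import pandas as pd
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
answers = pd.read_csv("full_dataset/tag_answer.csv")["answer"].dropna().tolist()

class QuestionRequest(BaseModel):
    question: str

@app.post("/ask/")
def ask(req: QuestionRequest):
    # No retrieval yet -- this just proves the end-to-end pipe works
    return {"response": random.choice(answers)}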


This is probably how a great project is built: not by solving one giant problem, but by decomposing it into hundreds of small, tractable problems and systematically solving them one by one.

ehzawad commented Oct 12, 2025

annotated-types==0.7.0
anyio==4.11.0
bnunicodenormalizer==0.1.7
certifi==2025.10.5
charset-normalizer==3.4.3
click==8.3.0
faiss-cpu==1.12.0
fastapi==0.118.3
filelock==3.20.0
fsspec==2025.9.0
h11==0.16.0
hf-xet==1.1.10
huggingface-hub==0.35.3
idna==3.10
Jinja2==3.1.6
joblib==1.5.2
loguru==0.7.3
MarkupSafe==3.0.3
mpmath==1.3.0
networkx==3.5
numpy==2.3.3
packaging==25.0
pandas==2.3.3
pillow==11.3.0
pydantic==2.12.0
pydantic_core==2.41.1
python-dateutil==2.9.0.post0
pytz==2025.2
PyYAML==6.0.3
regex==2025.9.18
requests==2.32.5
safetensors==0.6.2
scikit-learn==1.7.2
scipy==1.16.2
sentence-transformers==5.1.1
setuptools==80.9.0
six==1.17.0
sniffio==1.3.1
starlette==0.48.0
sympy==1.14.0
threadpoolctl==3.6.0
tokenizers==0.22.1
torch==2.8.0
tqdm==4.67.1
transformers==4.57.0
typing-inspection==0.4.2
typing_extensions==4.15.0
tzdata==2025.2
urllib3==2.5.0
uvicorn==0.37.0
