Skip to content

Instantly share code, notes, and snippets.

@ani03sha
Created December 4, 2025 04:36
Show Gist options
  • Select an option

  • Save ani03sha/da68d3828081aea374640ca0b0bdfb57 to your computer and use it in GitHub Desktop.

Select an option

Save ani03sha/da68d3828081aea374640ca0b0bdfb57 to your computer and use it in GitHub Desktop.

RAG Search System POC Implementation Guide

Complete guide for implementing a multi-tenant RAG search system using Google Vertex AI with support for external and internal document access control.


Table of Contents

  1. Overview
  2. Architecture Questions & Answers
  3. Project Structure
  4. Implementation Steps
  5. Key Features
  6. Testing Guide
  7. Deployment Options
  8. Troubleshooting

Overview

This POC demonstrates a production-ready RAG (Retrieval Augmented Generation) search system that:

  • ✅ Crawls public documentation (e.g., Alteryx Help)
  • ✅ Ingests internal documents (Confluence, Jira, Slack)
  • ✅ Stores all embeddings in a shared vector space
  • ✅ Implements multi-tenant access control via metadata filtering
  • ✅ Built with Google Vertex AI RAG Engine and ADK
  • ✅ Exposes as REST API or managed agent

Architecture Questions & Answers

1. Is it possible with Vertex AI? How to expose to AI Agents?

Answer: YES! ✅

How it works:

Crawling Public Docs (Step 1)

  • Vertex AI Search: Native web crawling for public URLs
  • Custom Crawler: src/ingestion/external_crawler.py
    • Uses BeautifulSoup + Requests
    • Crawls Alteryx help docs
    • Stores to GCS with source_type: external metadata
    • Imports into RAG Engine

Crawling Internal Docs (Step 2)

Vertex AI RAG Engine provides native connectors:

  • Confluence - Via confluence_connector.py
  • Jira - Via jira_connector.py
  • Slack - Via Slack API
  • SharePoint - Via SharePoint connector
  • ⚠️ Datadog - Requires custom integration

All tagged with source_type: internal metadata.

Multi-tenant Access Control (Step 3)

Metadata filtering approach:

External Users:

filter = "source_type = 'external'"
# Result: Only sees public Alteryx docs

Internal Users:

filter = None  # No filter
# Result: Sees ALL documents (external + internal)

Implemented in:

  • src/rag/filters.py - Filter builder
  • src/agent/user_context.py - User type detection
  • src/agent/handlers.py - Query routing

Exposing to AI Agents

Option A: REST API (Cloud Run)

Your Agent → HTTP POST → Cloud Run → RAG Engine → Response

Option B: Vertex AI Agent Engine

Your Agent → gRPC/HTTP → Agent Engine → RAG Engine → Response

Option C: Direct SDK Integration

from src.agent.main import RAGAgent

agent = RAGAgent()
response = agent.query(
    question="How do I connect?",
    user_type="external"
)

2. Does it make sense to use Google's ADK?

Answer: YES! Strongly Recommended ✅

Why ADK makes sense:

Feature Benefit
Purpose-built for RAG Optimized for RAG workflows with Vertex AI
Quick Development Build agents in <100 lines of Python
Managed Deployment Deploy to Vertex AI Agent Engine (fully managed)
Model Flexibility Works with Gemini, Claude, or any Vertex AI model
Battle-tested Same framework powering Google's Agentspace
Production-ready Built-in monitoring, scaling, and orchestration

What we built with ADK:

# src/agent/main.py - Main ADK Agent

class RAGAgent:
    def query(self, question, user_type):
        # 1. Detect user type (external/internal)
        user_context = self.detect_user(user_type)

        # 2. Apply metadata filter
        filter = self.build_filter(user_context)

        # 3. Query RAG Engine
        response = rag.retrieval_query(
            text=question,
            filter=filter,
            top_k=5
        )

        # 4. Generate answer with Gemini
        answer = gemini.generate(contexts=response)

        return answer

ADK Features Used:

  • ✅ Tool integration (Tool.from_retrieval)
  • ✅ Vertex AI RAG Store
  • ✅ GenerativeModel with RAG tool
  • ✅ Response grounding with sources

3. Which Vector DB will be used?

Answer: Vertex AI RAG Engine-managed Spanner (Recommended)

Option A: Managed Spanner ⭐ Recommended

Status: GA (Generally Available)
Management: Fully managed, zero infrastructure
Integration: Native with RAG Engine
Metadata: Built-in filtering support
Scaling: Automatic
Best for: Quick setup, managed simplicity

Option B: Vertex AI Vector Search 2.0

Status: GA
Management: Managed with more control
Features: Hybrid search, advanced tuning
Scaling: Massive datasets (disk-based)
Best for: Large-scale, custom requirements

Both options:

  • ✅ Support metadata filtering
  • ✅ Live in the same vector space
  • ✅ Support our multi-tenant use case

Configuration:

# config/rag_config.yaml

embedding:
  model: "text-embedding-004"
  dimension: 768

vector_search:
  distance_measure: "COSINE"
  algorithm: "TREE_AH"

4. How will the deployment look like?

Deployment Architecture:

┌─────────────────────────────────────────────────────────┐
│                    Data Sources                         │
├─────────────────────────────────────────────────────────┤
│  External              Internal                         │
│  • Alteryx Help        • Confluence                     │
│  • Public Docs         • Jira                           │
│                        • Slack                          │
└────────┬─────────────────────┬──────────────────────────┘
         │                     │
         ▼                     ▼
┌─────────────────────────────────────────────────────────┐
│              Data Ingestion Layer                       │
│  • Web Crawler (external_crawler.py)                    │
│  • Native Connectors (confluence, jira)                 │
│  • Metadata Tagging (source_type)                       │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│         Google Cloud Storage (Staging)                  │
│  gs://project-documents/                                │
│    ├── external/*.txt                                   │
│    └── internal/*.txt                                   │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│           Vertex AI RAG Engine                          │
│  • Embeddings: text-embedding-004                       │
│  • Vector DB: Managed Spanner                           │
│  • Metadata: source_type, source, category              │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│              ADK Agent Layer                            │
│  • User Type Detection                                  │
│  • Metadata Filter Builder                              │
│  • RAG Retrieval + Generation                           │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│           Deployment Options                            │
│                                                          │
│  Option A: Cloud Run (REST API)                         │
│    └─ Serverless, auto-scaling                          │
│                                                          │
│  Option B: Vertex AI Agent Engine                       │
│    └─ Fully managed, multi-agent support                │
└────────────────────┬────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────┐
│               Client Applications                       │
│  External Users          Internal Users                 │
│  • Public Chatbot        • Internal Dashboard           │
│  • Mobile App            • Slack Bot                    │
│  Sees: External only     Sees: All documents            │
└─────────────────────────────────────────────────────────┘

Infrastructure Components:

Component Technology Purpose
Compute Cloud Run / Agent Engine Serverless agent hosting
Storage GCS Document staging
Vector DB Managed Spanner Embeddings + metadata
Embedding text-embedding-004 Vector generation
LLM Gemini 2.0 Flash Answer generation
Secrets Secret Manager API tokens
IAM Service Accounts Access control
Monitoring Cloud Logging Observability

Project Structure

rag-search-vertex-ai/
│
├── README.md                    # Main documentation
├── POC_IMPLEMENTATION_GUIDE.md  # This file
├── requirements.txt             # Python dependencies
├── setup.py                     # Package setup
├── .env.example                 # Environment template
├── .gitignore                   # Git ignore rules
│
├── config/                      # Configuration
│   ├── __init__.py
│   ├── settings.py             # Pydantic settings management
│   └── rag_config.yaml         # RAG corpus configuration
│
├── src/                         # Source code
│   ├── __init__.py
│   │
│   ├── ingestion/              # Data ingestion modules
│   │   ├── __init__.py
│   │   ├── base.py            # Base ingestion class
│   │   ├── external_crawler.py     # Web crawler for public docs
│   │   ├── confluence_connector.py # Confluence API integration
│   │   ├── jira_connector.py       # Jira API integration
│   │   └── metadata_manager.py     # Metadata tagging logic
│   │
│   ├── rag/                    # RAG Engine management
│   │   ├── __init__.py
│   │   ├── corpus_manager.py  # Create/manage RAG corpus
│   │   ├── retrieval.py       # Query and retrieval logic
│   │   └── filters.py         # Metadata filtering
│   │
│   ├── agent/                  # ADK Agent implementation
│   │   ├── __init__.py
│   │   ├── main.py            # Agent entry point + CLI
│   │   ├── handlers.py        # Request handlers
│   │   └── user_context.py   # User type detection
│   │
│   └── utils/                  # Utilities
│       ├── __init__.py
│       ├── gcs_helper.py      # Google Cloud Storage utilities
│       └── logger.py          # Structured logging
│
├── scripts/                    # Setup & deployment scripts
│   ├── setup_gcp.sh           # GCP project setup
│   ├── create_corpus.sh       # Create RAG corpus
│   ├── deploy_agent.sh        # Deploy to Cloud Run/Agent Engine
│   └── test_queries.sh        # Test queries
│
├── tests/                      # Unit tests
│   ├── __init__.py
│   ├── test_ingestion.py
│   ├── test_retrieval.py
│   └── test_agent.py
│
├── docs/                       # Documentation
│   ├── setup_guide.md         # Detailed setup instructions
│   └── architecture.md        # Architecture deep dive
│
└── notebooks/                  # Jupyter notebooks (optional)
    └── poc_demo.ipynb

Implementation Steps

Prerequisites

Requirements:

  • ✅ Google Cloud Platform account with billing enabled
  • ✅ Python 3.10+ installed
  • ✅ gcloud CLI installed and configured
  • ✅ Git installed
  • ⚠️ Confluence API token (optional, for internal docs)
  • ⚠️ Jira API token (optional, for internal docs)

Step 1: Configure Environment

# 1. Navigate to project directory
cd rag-search-vertex-ai

# 2. Copy environment template
cp .env.example .env

# 3. Edit .env with your values
nano .env  # or your preferred editor

Required configuration in .env:

# ===== GCP Settings =====
GCP_PROJECT_ID=your-project-id
GCP_REGION=us-central1

# GCS Buckets (will be created automatically)
GCS_BUCKET_DOCUMENTS=your-project-documents
GCS_BUCKET_PROCESSED=your-project-processed

# ===== Service Account =====
GOOGLE_APPLICATION_CREDENTIALS=./credentials.json

# ===== RAG Corpus =====
RAG_CORPUS_NAME=knowledge-base-poc
RAG_CORPUS_DISPLAY_NAME=Knowledge Base POC
RAG_CORPUS_DESCRIPTION=POC corpus for external and internal documents

# ===== External Documentation =====
EXTERNAL_DOCS_URL=https://help.alteryx.com/aac/en/platform/connections.html
CRAWL_DEPTH=3
CRAWL_MAX_PAGES=100

# ===== Confluence (Optional) =====
CONFLUENCE_URL=https://your-domain.atlassian.net
[email protected]
CONFLUENCE_API_TOKEN=your-confluence-token
CONFLUENCE_SPACE_KEYS=SPACE1,SPACE2

# ===== Jira (Optional) =====
JIRA_URL=https://your-domain.atlassian.net
[email protected]
JIRA_API_TOKEN=your-jira-token
JIRA_PROJECT_KEYS=PROJ1,PROJ2

# ===== Agent Settings =====
AGENT_MODEL=gemini-2.0-flash-exp
AGENT_TEMPERATURE=0.7
AGENT_TOP_K=5

# ===== Logging =====
LOG_LEVEL=INFO
LOG_FORMAT=json

Step 2: Set Up GCP

2.1 Authenticate with GCP

# Login to GCP
gcloud auth login

# Set up application default credentials
gcloud auth application-default login

2.2 Install Python Dependencies

# Create virtual environment
python -m venv venv

# Activate virtual environment
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

2.3 Run GCP Setup Script

# Make scripts executable
chmod +x scripts/*.sh

# Run setup script
./scripts/setup_gcp.sh

This script will:

  • ✅ Set active GCP project
  • ✅ Enable required APIs:
    • aiplatform.googleapis.com
    • storage.googleapis.com
    • secretmanager.googleapis.com
    • discoveryengine.googleapis.com
    • cloudfunctions.googleapis.com
    • run.googleapis.com
  • ✅ Create service account rag-search-sa
  • ✅ Grant necessary IAM roles
  • ✅ Generate service account key (credentials.json)
  • ✅ Create GCS buckets
  • ✅ Store API tokens in Secret Manager

2.4 Verify Setup

# Check enabled APIs
gcloud services list --enabled | grep -E "aiplatform|storage|secretmanager"

# Check GCS buckets
gsutil ls

# Check service account
gcloud iam service-accounts list | grep rag-search

# Verify credentials
echo $GOOGLE_APPLICATION_CREDENTIALS

Step 3: Create RAG Corpus

3.1 Create Corpus

./scripts/create_corpus.sh

Expected output:

========================================
Creating RAG Corpus
========================================

Creating RAG corpus using Python SDK...

Creating corpus: Knowledge Base POC
Description: POC corpus for external and internal documents

✓ Corpus created successfully!
  Name: projects/PROJECT_ID/locations/REGION/ragCorpora/CORPUS_ID
  Display Name: Knowledge Base POC

Add this to your .env file:
RAG_CORPUS_NAME=projects/PROJECT_ID/locations/REGION/ragCorpora/CORPUS_ID

3.2 Update .env

# Add the corpus name to .env
echo "RAG_CORPUS_NAME=projects/PROJECT_ID/locations/REGION/ragCorpora/CORPUS_ID" >> .env

3.3 Verify Corpus

# List all corpora
python -c "from src.rag.corpus_manager import CorpusManager; \
[print(f'{c.display_name}: {c.name}') for c in CorpusManager().list_corpora()]"

Step 4: Ingest Data

4.1 Crawl External Documentation

# Using default URL from .env
python src/ingestion/external_crawler.py

# Or specify a custom URL
python src/ingestion/external_crawler.py https://help.alteryx.com/aac/en/platform/connections.html

Expected output:

Starting web crawl
  start_url: https://help.alteryx.com/aac/en/platform/connections.html
  max_depth: 3
  max_pages: 100

Crawling URL: https://help.alteryx.com/... (1/100)
...

Web crawl completed
  total_pages: 47
  documents_created: 47

Documents uploaded to GCS
  count: 47
  bucket: your-project-documents

Crawling completed!
Documents crawled: 47
Uploaded to GCS: 47 files

4.2 Ingest Confluence Pages (Optional)

python src/ingestion/confluence_connector.py

Expected output:

Confluence connector initialized
  url: https://your-domain.atlassian.net
  spaces: ['SPACE1', 'SPACE2']

Fetching pages from space: SPACE1
Pages fetched from space: 24

Fetching pages from space: SPACE2
Pages fetched from space: 18

Documents uploaded to GCS: 42 files

Confluence ingestion completed!

4.3 Ingest Jira Issues (Optional)

python src/ingestion/jira_connector.py

Expected output:

Jira connector initialized
  url: https://your-domain.atlassian.net
  projects: ['PROJ1', 'PROJ2']
  jql_filter: updated >= -30d

Fetching issues from project: PROJ1
Issues fetched: 35

Fetching issues from project: PROJ2
Issues fetched: 28

Documents uploaded to GCS: 63 files

Jira ingestion completed!

4.4 Import to RAG Corpus

# Run this Python script
python << 'EOF'
from src.rag.corpus_manager import CorpusManager
from config.settings import settings

manager = CorpusManager()

print("Importing files to RAG corpus...")
print(f"Corpus: {settings.rag_corpus_name}")
print(f"Bucket: {settings.gcs_bucket_documents}")

response = manager.import_files_from_gcs(
    corpus_name=settings.rag_corpus_name,
    gcs_bucket=settings.gcs_bucket_documents,
)

print("\n✓ Import completed!")
print(f"Files imported from GCS bucket")
EOF

Alternative - Manual import specific files:

python << 'EOF'
from src.rag.corpus_manager import CorpusManager
from config.settings import settings

manager = CorpusManager()

# Import specific files
paths = [
    "gs://your-bucket/external/doc1.txt",
    "gs://your-bucket/internal/conf1.txt",
]

response = manager.import_files(
    corpus_name=settings.rag_corpus_name,
    paths=paths,
    chunk_size=1000,
    chunk_overlap=200,
)

print("Import completed!")
EOF

4.5 Verify Import

# List files in corpus
python -c "from src.rag.corpus_manager import CorpusManager; \
from config.settings import settings; \
files = CorpusManager().list_files(settings.rag_corpus_name); \
print(f'Total files in corpus: {len(files)}')"

Step 5: Test the Agent

5.1 Interactive Mode (Recommended)

python src/agent/main.py interactive

Example session:

========================================
RAG Search Agent - Interactive Mode
========================================

Commands:
  /external - Switch to external user mode
  /internal - Switch to internal user mode
  /quit or /exit - Exit interactive mode
========================================

[EXTERNAL] Your question: How do I connect to a database?

--------------------------------------------------------------------------------
ANSWER:
To connect to a database in Alteryx, you need to use the Input Data tool...
[Full answer here]

--------------------------------------------------------------------------------
SOURCES (3):
  1. Database Connections - Alteryx Help
  2. ODBC Configuration Guide
  3. Connection String Examples
--------------------------------------------------------------------------------

[EXTERNAL] Your question: /internal

Switched to INTERNAL user mode

[INTERNAL] Your question: What's the status of PROJ-123?

--------------------------------------------------------------------------------
ANSWER:
PROJ-123 is currently in progress. The issue is assigned to John Doe...
[Full answer with internal context]

--------------------------------------------------------------------------------
SOURCES (5):
  1. PROJ-123: Database Migration (Jira)
  2. Project Status Updates (Confluence)
  3. Discussion about PROJ-123 (Slack)
  4. Technical Specs (Confluence)
  5. Related Issue PROJ-124 (Jira)
--------------------------------------------------------------------------------

5.2 Single Query Mode

# External user query
python src/agent/main.py query \
    --question "How do I connect to a database?" \
    --user-type external

# Internal user query
python src/agent/main.py query \
    --question "What are the recent Jira tickets?" \
    --user-type internal \
    --top-k 10

5.3 Retrieval-Only Mode

# Get contexts without generation
python src/agent/main.py retrieve \
    --question "database connection" \
    --user-type external \
    --top-k 5

Output:

========================================
QUESTION: database connection
USER TYPE: external
========================================

RETRIEVED CONTEXTS (5):

[1] Score: 0.89
Source: https://help.alteryx.com/...
Text: To connect to a database, use the Input Data tool...

[2] Score: 0.85
Source: https://help.alteryx.com/...
Text: ODBC connections require configuration...

...

5.4 Run Test Suite

# Run all tests
pytest tests/ -v

# Run specific test file
pytest tests/test_agent.py -v

# Run with coverage
pytest tests/ --cov=src --cov-report=html

5.5 Run Integration Tests

./scripts/test_queries.sh

This script tests:

  • ✅ External user queries (filtered to external docs only)
  • ✅ Internal user queries (access to all docs)
  • ✅ Retrieval-only mode
  • ✅ Different query types

Step 6: Deploy to Production (Optional)

Option A: Deploy to Cloud Run

# Run deployment script
./scripts/deploy_agent.sh

# Select option 1 for Cloud Run
# Choice [1]: 1

What happens:

  1. Creates Dockerfile
  2. Builds container image
  3. Deploys to Cloud Run
  4. Returns service URL

Get service URL:

SERVICE_URL=$(gcloud run services describe rag-search-agent \
    --region us-central1 \
    --format 'value(status.url)')

echo "Service URL: $SERVICE_URL"

Test deployed service:

# External user query
curl -X POST $SERVICE_URL/query \
    -H "Content-Type: application/json" \
    -d '{
        "question": "How do I connect to a database?",
        "user_type": "external"
    }'

# Internal user query
curl -X POST $SERVICE_URL/query \
    -H "Content-Type: application/json" \
    -d '{
        "question": "What is PROJ-123 status?",
        "user_type": "internal",
        "user_id": "user123",
        "email": "[email protected]"
    }'

Option B: Deploy to Vertex AI Agent Engine

./scripts/deploy_agent.sh

# Select option 2 for Agent Engine
# Choice [1]: 2

Note: Agent Engine deployment requires additional configuration. Refer to:


Key Features

1. Multi-tenant Access Control

Implementation:

# src/rag/filters.py
class MetadataFilter:
    def build_filter(self, user_type: UserType):
        if user_type == UserType.EXTERNAL:
            return "source_type = 'external'"
        else:
            return None  # No filter for internal users

User Type Detection:

# src/agent/user_context.py
def detect_user_type(email: str) -> UserType:
    domain = email.split('@')[1]

    if domain in INTERNAL_DOMAINS:
        return UserType.INTERNAL
    else:
        return UserType.EXTERNAL

Usage:

# External user - sees only public docs
agent.query(
    question="How to connect?",
    email="[email protected]"  # External domain
)
# Filter applied: source_type = 'external'

# Internal user - sees all docs
agent.query(
    question="What's PROJ-123 status?",
    email="[email protected]"  # Internal domain
)
# Filter applied: None (access all)

2. Native Connectors

Confluence Connector:

from src.ingestion.confluence_connector import ConfluenceConnector

connector = ConfluenceConnector(
    space_keys=["SPACE1", "SPACE2"]
)

# Fetch all pages
documents = connector.fetch_documents()

# Upload to GCS and import
gcs_uris = connector.ingest()

Jira Connector:

from src.ingestion.jira_connector import JiraConnector

connector = JiraConnector(
    project_keys=["PROJ1", "PROJ2"],
    jql_filter="updated >= -30d"  # Last 30 days
)

# Fetch issues
documents = connector.fetch_documents()

# Upload and import
gcs_uris = connector.ingest()

Custom Search:

# Confluence CQL
confluence_docs = connector.search_pages(
    cql="space = SPACE1 AND label = important"
)

# Jira JQL
jira_docs = connector.search_issues(
    jql="project = PROJ1 AND status = 'In Progress'"
)

3. Metadata Tagging

Automatic Metadata:

# External document metadata
{
    "source_type": "external",
    "source": "alteryx_docs",
    "category": "documentation",
    "source_url": "https://help.alteryx.com/...",
    "ingestion_timestamp": "2024-12-04T10:30:00Z"
}

# Confluence document metadata
{
    "source_type": "internal",
    "source": "confluence",
    "category": "wiki",
    "space_key": "SPACE1",
    "page_id": "12345",
    "page_url": "https://confluence.com/...",
    "author": "John Doe",
    "last_modified": "2024-12-01T15:00:00Z"
}

# Jira document metadata
{
    "source_type": "internal",
    "source": "jira",
    "category": "issues",
    "project_key": "PROJ1",
    "issue_key": "PROJ-123",
    "issue_type": "Story",
    "status": "In Progress",
    "assignee": "Jane Smith"
}

Custom Metadata:

from src.ingestion.metadata_manager import MetadataManager

manager = MetadataManager()

# Create custom metadata
metadata = manager.create_external_metadata(
    source="custom_docs",
    url="https://example.com",
    category="api_reference",
    custom_field="custom_value"  # Add any custom fields
)

4. ADK Agent Features

Query Validation:

# src/agent/handlers.py
class QueryValidator:
    @staticmethod
    def validate_query(question: str):
        # Check empty
        if not question.strip():
            return False, "Query cannot be empty"

        # Check length
        if len(question) < 3:
            return False, "Query too short"

        if len(question) > 1000:
            return False, "Query too long"

        return True, None

User Context Management:

# src/agent/user_context.py
context_manager = UserContextManager()

# Get or create context
context = context_manager.get_or_create_context(
    user_id="user123",
    email="[email protected]",
    name="John Doe"
)

# Context persists across queries
response1 = agent.query("First question", user_id="user123")
response2 = agent.query("Follow-up", user_id="user123")
# Same context used

Response Format:

{
    "answer": "To connect to a database...",
    "sources": [
        {
            "title": "Database Connections",
            "url": "https://help.alteryx.com/..."
        }
    ],
    "metadata": {
        "user_type": "external",
        "filter": "source_type = 'external'",
        "model": "gemini-2.0-flash-exp",
        "user_id": "user123",
        "user_email": "[email protected]"
    }
}

5. Production-Ready Features

Structured Logging:

# config/settings.py
LOG_LEVEL=INFO
LOG_FORMAT=json

# All logs structured as JSON
{
    "timestamp": "2024-12-04T10:30:00Z",
    "level": "INFO",
    "name": "RAGAgent",
    "message": "Query processed",
    "question": "How to connect?",
    "user_type": "external",
    "sources_count": 5
}

Error Handling:

try:
    response = agent.query(question, user_type)
except Exception as e:
    logger.error("Query failed", error=str(e))
    return {
        "answer": "Error processing request",
        "sources": [],
        "metadata": {"error": str(e)}
    }

Rate Limiting:

# src/ingestion/external_crawler.py
for link in links:
    self._crawl_recursive(link, depth + 1)
    time.sleep(0.5)  # Rate limiting

Retry Logic:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def _fetch_page(self, url: str):
    response = self.session.get(url, timeout=10)
    response.raise_for_status()
    return response

Testing Guide

Unit Tests

# Run all tests
pytest tests/ -v

# Expected output:
# tests/test_agent.py::TestUserContext::test_user_context_creation PASSED
# tests/test_agent.py::TestUserContext::test_detect_user_type_internal PASSED
# tests/test_agent.py::TestQueryValidator::test_validate_valid_query PASSED
# tests/test_ingestion.py::TestMetadataManager::test_create_external_metadata PASSED
# tests/test_retrieval.py::TestMetadataFilter::test_external_filter PASSED
#
# ========================== 15 passed in 2.34s ==========================

Integration Tests

# Run integration test script
./scripts/test_queries.sh

# Tests:
# 1. External user - public docs query
# 2. External user - platform-specific query
# 3. Internal user - same query (more sources)
# 4. Internal user - internal docs query
# 5. Retrieval-only mode

Manual Testing Scenarios

Scenario 1: External User Access Control

# Should only see external docs
response = agent.query(
    question="How do I connect to a database?",
    email="[email protected]"
)

# Verify all sources are from external docs
assert all(
    "alteryx.com" in source["url"]
    for source in response["sources"]
)

Scenario 2: Internal User Access

# Should see all docs (external + internal)
response = agent.query(
    question="Database connection issues",
    email="[email protected]"
)

# Sources should include Jira/Confluence
sources = [s["url"] for s in response["sources"]]
assert any("jira" in url or "confluence" in url for url in sources)

Scenario 3: Metadata Filtering

# Custom filter
from src.rag.filters import MetadataFilter

filter_builder = MetadataFilter()
custom_filter = filter_builder.build_custom_filter(
    source="jira",
    category="issues",
    status="In Progress"
)

# Should return: source = 'jira' AND category = 'issues' AND status = 'In Progress'

Deployment Options

Cloud Run Deployment

Benefits:

  • ✅ Serverless (pay per request)
  • ✅ Auto-scaling (0 to 1000 instances)
  • ✅ Simple REST API
  • ✅ Fast deployment

Configuration:

# Cloud Run service spec
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: rag-search-agent
spec:
  template:
    spec:
      containers:
      - image: gcr.io/PROJECT_ID/rag-search-agent
        env:
        - name: GCP_PROJECT_ID
          value: "your-project-id"
        - name: RAG_CORPUS_NAME
          value: "projects/.../ragCorpora/..."
        resources:
          limits:
            memory: 2Gi
            cpu: 2

Deploy:

gcloud run deploy rag-search-agent \
    --source . \
    --region us-central1 \
    --allow-unauthenticated \
    --memory 2Gi \
    --cpu 2 \
    --timeout 300

Vertex AI Agent Engine Deployment

Benefits:

  • ✅ Fully managed agent runtime
  • ✅ Built-in monitoring
  • ✅ Multi-agent orchestration
  • ✅ Enterprise-grade scaling

Deployment (conceptual):

# agent_config.yaml
agent:
  name: rag-search-agent
  description: Multi-tenant RAG search

  tools:
    - name: rag_retrieval
      type: vertex_rag
      config:
        corpus: projects/.../ragCorpora/...

  model:
    name: gemini-2.0-flash-exp
    temperature: 0.7

Deploy:

# Using ADK
gcloud ai agents deploy \
    --config agent_config.yaml \
    --region us-central1

Troubleshooting

Common Issues

1. Authentication Errors

Problem:

PermissionDenied: 403 Permission denied

Solution:

# Re-authenticate
gcloud auth application-default login

# Set credentials
export GOOGLE_APPLICATION_CREDENTIALS=./credentials.json

# Verify
gcloud auth application-default print-access-token

2. API Not Enabled

Problem:

API [aiplatform.googleapis.com] not enabled

Solution:

# Enable API
gcloud services enable aiplatform.googleapis.com

# Verify
gcloud services list --enabled | grep aiplatform

3. Corpus Not Found

Problem:

NotFound: RAG corpus not found

Solution:

# List all corpora
python -c "from src.rag.corpus_manager import CorpusManager; \
[print(c.name) for c in CorpusManager().list_corpora()]"

# Update RAG_CORPUS_NAME in .env with correct value

4. Import Failures

Problem:

Failed to import files to corpus

Solutions:

# Check files exist in GCS
gsutil ls gs://your-bucket/

# Verify file formats (supported: PDF, TXT, HTML, DOCX, etc.)
gsutil ls -l gs://your-bucket/file.txt

# Check quota limits
gcloud alpha billing quotas list --service=aiplatform.googleapis.com

# Try importing single file
python -c "
from src.rag.corpus_manager import CorpusManager
manager = CorpusManager()
manager.import_files(
    corpus_name='...',
    paths=['gs://bucket/test.txt']
)
"

5. Empty Retrieval Results

Problem:

No relevant documents found

Solutions:

# Check corpus has files
python -c "from src.rag.corpus_manager import CorpusManager; \
from config.settings import settings; \
print(f'Files: {len(CorpusManager().list_files(settings.rag_corpus_name))}')"

# Check metadata filter isn't too restrictive
# Temporarily disable filter:
python -c "
from src.agent.main import RAGAgent
agent = RAGAgent()
# Use internal user type (no filter)
response = agent.query('test query', user_type='internal')
print(f'Sources: {len(response[\"sources\"])}')
"

# Verify embeddings were created
# Check Cloud Console > Vertex AI > RAG Engine > Corpus > Files

6. Slow Queries

Problem:

Queries taking > 5 seconds

Solutions:

# Reduce top_k
response = agent.query(question, top_k=3)

# Disable reranking (faster but less accurate)
# config/rag_config.yaml
reranking:
  enabled: false

# Use faster model
AGENT_MODEL=gemini-1.5-flash

# Add caching
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_query(question, user_type):
    return agent.query(question, user_type)

7. Cost Issues

Problem:

Unexpected high costs

Solutions:

# Check usage
gcloud billing projects describe $GCP_PROJECT_ID

# Set budget alerts
gcloud billing budgets create \
    --billing-account=ACCOUNT_ID \
    --display-name="RAG Search Budget" \
    --budget-amount=100

# Optimize:
# 1. Use storage-optimized Vector Search for large corpora
# 2. Batch embedding requests
# 3. Cache frequent queries
# 4. Delete unused corpora
# 5. Set lifecycle policies on GCS

Getting Help

Logs:

# Application logs
tail -f logs/app.log

# Cloud Run logs
gcloud run services logs read rag-search-agent \
    --region us-central1 \
    --limit 50

# Enable debug logging
# In .env
LOG_LEVEL=DEBUG

Support:


Next Steps

After completing the POC:

1. Customize for Your Use Case

# Add custom data source
# src/ingestion/custom_connector.py

class CustomConnector(BaseIngestion):
    def fetch_documents(self):
        # Your custom logic
        pass

2. Tune Retrieval Parameters

# config/rag_config.yaml

retrieval:
  external:
    top_k: 10  # Increase results
    similarity_threshold: 0.6  # Adjust threshold

3. Implement Advanced Features

  • Hybrid Search: Combine vector + keyword
  • Query Expansion: Auto-expand queries
  • Feedback Loop: Learn from user feedback
  • Caching: Add Redis for performance
  • Monitoring: Cloud Monitoring dashboards

4. Scale to Production

  • Security: VPC-SC, CMEK encryption
  • Monitoring: Custom dashboards
  • CI/CD: Automated deployments
  • Multi-region: Deploy to multiple regions
  • Load Testing: Verify performance at scale

Summary

This POC provides a complete, production-ready RAG search system with:

Multi-tenant access control (external vs internal users) ✅ Native connectors (Confluence, Jira, Slack) ✅ Web crawling for public documentation ✅ Metadata filtering for access control ✅ Google ADK agent implementation ✅ Vertex AI RAG Engine integration ✅ Cloud Run deployment ready ✅ Comprehensive testing (unit + integration) ✅ Production features (logging, error handling, monitoring)

Total Implementation:

  • 30+ Python modules
  • 4 deployment scripts
  • Comprehensive documentation
  • Unit tests
  • Ready to deploy

Follow the steps above to deploy your RAG search system in under 2 hours!


Resources


Last Updated: 2025-12-04 Version: 1.0.0 Status: Production Ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment