Skip to content

Instantly share code, notes, and snippets.

@deepaks7n
Last active May 22, 2025 21:34
Show Gist options
  • Select an option

  • Save deepaks7n/9dc24fc4ce78fdb4b3c3e189dc9ea65b to your computer and use it in GitHub Desktop.

Select an option

Save deepaks7n/9dc24fc4ce78fdb4b3c3e189dc9ea65b to your computer and use it in GitHub Desktop.

PDF Invoice Parser System - Low Level Design (LLD)

1. System Components Architecture

1.1 Component Interaction Diagram

classDiagram
    class InvoiceParser {
        <<interface>>
        +parse(document: PDFDocument) ParseResult
        +validate(result: ParseResult) bool
        +get_confidence() float
    }
    
    class BaseParser {
        <<abstract>>
        #logger: Logger
        #config: ParserConfig
        +parse(document: PDFDocument) ParseResult
        +validate(result: ParseResult) bool
        +extract_metadata(document: PDFDocument) dict
    }
    
    class TextParser {
        -text_extractor: TextExtractor
        +parse(document: PDFDocument) ParseResult
        +extract_tables(document: PDFDocument) list
    }
    
    class OCRParser {
        -ocr_engine: OCREngine
        -image_processor: ImageProcessor
        +parse(document: PDFDocument) ParseResult
        +preprocess_image(image: Image) Image
    }
    
    class AIParser {
        -llm_client: LLMClient
        -prompt_manager: PromptManager
        +parse(document: PDFDocument) ParseResult
        +generate_prompt(vendor: Vendor) str
    }
    
    class ParserStrategy {
        -parsers: dict~str, InvoiceParser~
        -vendor_detector: VendorDetector
        +select_parser(document: PDFDocument) InvoiceParser
        +register_parser(vendor_id: str, parser: InvoiceParser)
    }
    
    class VendorDetector {
        -text_matcher: TextMatcher
        -ai_classifier: AIClassifier
        -cache: Cache
        +detect_vendor(document: PDFDocument) Vendor
        +update_vendor_patterns(vendor: Vendor, patterns: list)
    }
    
    InvoiceParser <|-- BaseParser
    BaseParser <|-- TextParser
    BaseParser <|-- OCRParser
    BaseParser <|-- AIParser
    ParserStrategy --> InvoiceParser
    ParserStrategy --> VendorDetector
Loading

1.2 Parser Factory Pattern

classDiagram
    class ParserFactory {
        <<abstract>>
        +create_parser(config: ParserConfig) InvoiceParser
    }
    
    class TextParserFactory {
        +create_parser(config: ParserConfig) TextParser
    }
    
    class OCRParserFactory {
        +create_parser(config: ParserConfig) OCRParser
    }
    
    class AIParserFactory {
        +create_parser(config: ParserConfig) AIParser
    }
    
    class ParserRegistry {
        -factories: dict~str, ParserFactory~
        +register_factory(type: str, factory: ParserFactory)
        +create_parser(type: str, config: ParserConfig) InvoiceParser
    }
    
    ParserFactory <|-- TextParserFactory
    ParserFactory <|-- OCRParserFactory
    ParserFactory <|-- AIParserFactory
    ParserRegistry --> ParserFactory
Loading

2. Data Models

2.1 Core Data Structures

classDiagram
    class PDFDocument {
        +document_id: str
        +file_path: str
        +file_size: int
        +page_count: int
        +metadata: dict
        +created_at: datetime
        +get_page(page_num: int) PDFPage
        +get_text() str
    }
    
    class PDFPage {
        +page_number: int
        +width: float
        +height: float
        +text_content: str
        +images: list~Image~
        +tables: list~Table~
        +get_text_blocks() list~TextBlock~
    }
    
    class ParseResult {
        +job_id: str
        +vendor_id: str
        +invoice_number: str
        +invoice_date: datetime
        +total_amount: Decimal
        +line_items: list~LineItem~
        +extracted_fields: dict
        +confidence_scores: dict
        +parser_used: str
        +processing_time: float
    }
    
    class LineItem {
        +description: str
        +quantity: float
        +unit_price: Decimal
        +total_price: Decimal
        +tax_rate: float
        +metadata: dict
    }
    
    class Vendor {
        +vendor_id: str
        +vendor_name: str
        +parser_strategy: str
        +field_mappings: dict
        +validation_rules: list~Rule~
        +confidence_threshold: float
    }
    
    PDFDocument "1" --> "*" PDFPage
    ParseResult "1" --> "*" LineItem
    ParseResult --> Vendor
Loading

2.2 Configuration Models

classDiagram
    class ParserConfig {
        +parser_type: str
        +timeout_seconds: int
        +retry_count: int
        +confidence_threshold: float
        +custom_settings: dict
    }
    
    class TextParserConfig {
        +extraction_library: str
        +table_detection: bool
        +regex_patterns: dict
    }
    
    class OCRConfig {
        +engine: str
        +language: str
        +dpi: int
        +preprocessing_steps: list
    }
    
    class AIParserConfig {
        +model: str
        +temperature: float
        +max_tokens: int
        +prompt_template: str
    }
    
    ParserConfig <|-- TextParserConfig
    ParserConfig <|-- OCRConfig
    ParserConfig <|-- AIParserConfig
Loading

3. Service Layer Architecture

3.1 Service Components

flowchart TB
    subgraph "API Layer"
        A[FastAPI Application]
        B[Request Validator]
        C[Response Formatter]
    end
    
    subgraph "Business Logic Layer"
        D[Invoice Service]
        E[Vendor Service]
        F[Parser Service]
    end
    
    subgraph "Data Access Layer"
        G[Document Repository]
        H[Vendor Repository]
        I[Result Repository]
    end
    
    subgraph "External Services"
        J[S3 Service]
        K[Queue Service]
        L[Cache Service]
    end
    
    A --> B
    B --> D
    D --> F
    D --> E
    E --> H
    F --> G
    G --> J
    D --> K
    E --> L
    F --> I
    C --> A
Loading

3.2 Lambda Function Architecture

classDiagram
    class LambdaHandler {
        <<abstract>>
        +handle(event: dict, context: dict) dict
        #validate_input(event: dict) bool
        #process(data: dict) dict
        #format_response(result: dict) dict
    }
    
    class VendorDetectionHandler {
        -vendor_detector: VendorDetector
        +handle(event: dict, context: dict) dict
    }
    
    class ParsingHandler {
        -parser_strategy: ParserStrategy
        +handle(event: dict, context: dict) dict
    }
    
    class ResultProcessorHandler {
        -result_validator: ResultValidator
        -storage_service: StorageService
        +handle(event: dict, context: dict) dict
    }
    
    LambdaHandler <|-- VendorDetectionHandler
    LambdaHandler <|-- ParsingHandler
    LambdaHandler <|-- ResultProcessorHandler
Loading

4. Processing Pipeline

4.1 Queue Processing Flow

stateDiagram-v2
    [*] --> Queued: Invoice Uploaded
    Queued --> Processing: Job Dequeued
    Processing --> VendorDetection: Start Processing
    
    VendorDetection --> ParserSelection: Vendor Identified
    VendorDetection --> AIClassification: Vendor Unknown
    AIClassification --> ParserSelection: Vendor Classified
    
    ParserSelection --> TextParsing: Digital PDF
    ParserSelection --> OCRProcessing: Scanned PDF
    ParserSelection --> AIParsing: Complex Format
    
    TextParsing --> Validation: Extraction Complete
    OCRProcessing --> Validation: OCR Complete
    AIParsing --> Validation: AI Processing Complete
    
    Validation --> Success: Validation Passed
    Validation --> Retry: Validation Failed
    Retry --> AIParsing: Retry with AI
    Retry --> Failed: Max Retries Exceeded
    
    Success --> [*]: Result Stored
    Failed --> ManualReview: Send to Queue
    ManualReview --> [*]: Manual Processing
Loading

4.2 Error Handling Flow

flowchart TD
    A[Processing Error] --> B{Error Type}
    
    B -->|Timeout| C[Increase Timeout]
    B -->|Parse Error| D[Fallback Parser]
    B -->|Validation Error| E[Retry with AI]
    B -->|System Error| F[Dead Letter Queue]
    
    C --> G{Retry?}
    D --> G
    E --> G
    
    G -->|Yes| H[Requeue Job]
    G -->|No| I[Log Error]
    
    F --> I
    I --> J[Send Alert]
    J --> K[Manual Review]
Loading

5. Implementation Details

5.1 Text Extraction Implementation

# Core Text Parser Implementation
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import pdfplumber
from dataclasses import dataclass
from decimal import Decimal

@dataclass
class ExtractionResult:
    text: str
    tables: List[List[List[str]]]
    metadata: Dict[str, any]
    confidence: float

class TextExtractor(ABC):
    @abstractmethod
    def extract(self, pdf_path: str) -> ExtractionResult:
        pass

class PDFPlumberExtractor(TextExtractor):
    def __init__(self, config: Dict[str, any]):
        self.config = config
        
    def extract(self, pdf_path: str) -> ExtractionResult:
        text = ""
        tables = []
        
        with pdfplumber.open(pdf_path) as pdf:
            for page in pdf.pages:
                # Extract text
                page_text = page.extract_text() or ""
                text += page_text + "\n"
                
                # Extract tables
                page_tables = page.extract_tables() or []
                tables.extend(page_tables)
        
        return ExtractionResult(
            text=text,
            tables=tables,
            metadata={"page_count": len(pdf.pages)},
            confidence=self._calculate_confidence(text)
        )
    
    def _calculate_confidence(self, text: str) -> float:
        # Implement confidence scoring based on text quality
        if not text:
            return 0.0
        # Check for common invoice patterns
        patterns = ['invoice', 'total', 'amount', 'date']
        matches = sum(1 for p in patterns if p.lower() in text.lower())
        return min(matches / len(patterns), 1.0)

5.2 Vendor Detection Implementation

# Vendor Detection System
class VendorDetector:
    def __init__(self, cache_service, ai_classifier):
        self.cache = cache_service
        self.ai_classifier = ai_classifier
        self.vendor_patterns = self._load_vendor_patterns()
    
    def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
        # Try cache first
        cached_vendor = self._check_cache(document)
        if cached_vendor:
            return cached_vendor
        
        # Extract identifying text
        text = document.get_text()[:1000]  # First 1000 chars
        
        # Try pattern matching
        vendor = self._match_patterns(text)
        if vendor:
            self._update_cache(document, vendor)
            return vendor
        
        # Fallback to AI classification
        vendor = self.ai_classifier.classify(text)
        self._update_cache(document, vendor)
        return vendor
    
    def _match_patterns(self, text: str) -> Optional[Vendor]:
        for vendor_id, patterns in self.vendor_patterns.items():
            for pattern in patterns:
                if re.search(pattern, text, re.IGNORECASE):
                    return self._get_vendor(vendor_id)
        return None

5.3 Parser Strategy Implementation

# Strategy Pattern Implementation
class ParserStrategy:
    def __init__(self, parser_registry: ParserRegistry):
        self.parsers = {}
        self.parser_registry = parser_registry
        
    def select_parser(self, 
                     document: PDFDocument, 
                     vendor: Vendor) -> InvoiceParser:
        # Get vendor-specific configuration
        parser_config = vendor.parser_config
        
        # Check document quality
        quality = self._assess_document_quality(document)
        
        # Select appropriate parser
        if quality.is_digital and quality.text_quality > 0.8:
            parser_type = "text"
        elif quality.is_scanned and quality.ocr_suitable:
            parser_type = "ocr"
        else:
            parser_type = "ai"
        
        # Override with vendor preference if specified
        if vendor.preferred_parser:
            parser_type = vendor.preferred_parser
        
        # Get or create parser
        parser_key = f"{vendor.vendor_id}_{parser_type}"
        if parser_key not in self.parsers:
            self.parsers[parser_key] = self.parser_registry.create_parser(
                parser_type, 
                parser_config
            )
        
        return self.parsers[parser_key]

5.4 AI Parser Implementation

# AI-Powered Parser
class AIParser(BaseParser):
    def __init__(self, llm_client, prompt_manager):
        super().__init__()
        self.llm_client = llm_client
        self.prompt_manager = prompt_manager
    
    def parse(self, document: PDFDocument) -> ParseResult:
        # Extract text for context
        text = document.get_text()
        
        # Get vendor-specific prompt
        prompt = self.prompt_manager.get_prompt(
            "invoice_extraction",
            context={"text": text[:4000]}  # Token limit
        )
        
        # Call LLM
        response = self.llm_client.complete(
            prompt=prompt,
            temperature=0.1,
            response_format={"type": "json_object"}
        )
        
        # Parse and validate response
        extracted_data = json.loads(response)
        return self._build_parse_result(extracted_data, document)
    
    def _build_parse_result(self, 
                           data: Dict, 
                           document: PDFDocument) -> ParseResult:
        return ParseResult(
            job_id=document.document_id,
            vendor_id=data.get("vendor_id"),
            invoice_number=data.get("invoice_number"),
            invoice_date=self._parse_date(data.get("invoice_date")),
            total_amount=Decimal(str(data.get("total_amount", 0))),
            line_items=self._parse_line_items(data.get("line_items", [])),
            extracted_fields=data,
            confidence_scores=self._calculate_confidences(data),
            parser_used="ai",
            processing_time=0.0  # Set by wrapper
        )

6. Testing Architecture

6.1 Test Strategy Diagram

graph TD
    subgraph "Test Levels"
        A[Unit Tests]
        B[Integration Tests]
        C[End-to-End Tests]
        D[Performance Tests]
    end
    
    subgraph "Test Types"
        E[Parser Tests]
        F[Vendor Detection Tests]
        G[API Tests]
        H[Load Tests]
    end
    
    subgraph "Test Data"
        I[Mock PDFs]
        J[Real Vendor Samples]
        K[Edge Cases]
        L[Performance Datasets]
    end
    
    A --> E
    A --> F
    B --> G
    C --> G
    D --> H
    
    I --> A
    J --> B
    K --> C
    L --> D
Loading

6.2 Test Framework Structure

# Test Framework Design
class ParserTestCase:
    def __init__(self, vendor_id: str, test_file: str):
        self.vendor_id = vendor_id
        self.test_file = test_file
        self.expected_result = self._load_expected_result()
    
    def run(self, parser: InvoiceParser) -> TestResult:
        # Parse document
        document = PDFDocument.from_file(self.test_file)
        actual_result = parser.parse(document)
        
        # Compare results
        return self._compare_results(
            expected=self.expected_result,
            actual=actual_result
        )
    
    def _compare_results(self, expected: ParseResult, actual: ParseResult) -> TestResult:
        differences = []
        
        # Compare core fields
        for field in ['invoice_number', 'invoice_date', 'total_amount']:
            if getattr(expected, field) != getattr(actual, field):
                differences.append(f"{field}: expected {getattr(expected, field)}, got {getattr(actual, field)}")
        
        # Compare line items
        if len(expected.line_items) != len(actual.line_items):
            differences.append(f"Line items count mismatch")
        
        return TestResult(
            passed=len(differences) == 0,
            differences=differences,
            confidence=actual.confidence_scores
        )

class VendorTestSuite:
    def __init__(self, vendor_id: str):
        self.vendor_id = vendor_id
        self.test_cases = self._load_test_cases()
        
    def run_all(self) -> TestSuiteResult:
        results = []
        for test_case in self.test_cases:
            parser = self._get_parser_for_vendor()
            result = test_case.run(parser)
            results.append(result)
        
        return TestSuiteResult(
            vendor_id=self.vendor_id,
            total_tests=len(results),
            passed=sum(1 for r in results if r.passed),
            failed=sum(1 for r in results if not r.passed),
            results=results
        )

7. Performance Optimization

7.1 Caching Implementation

# Multi-level Caching Strategy
class CacheManager:
    def __init__(self):
        self.memory_cache = {}  # Lambda memory cache
        self.redis_client = self._init_redis()
        self.cache_ttl = {
            'vendor_mapping': 3600,  # 1 hour
            'parser_config': 1800,   # 30 minutes
            'parse_result': 86400    # 24 hours
        }
    
    async def get_vendor(self, document_hash: str) -> Optional[Vendor]:
        # Check memory cache first
        if document_hash in self.memory_cache:
            return self.memory_cache[document_hash]
        
        # Check Redis
        cached = await self.redis_client.get(f"vendor:{document_hash}")
        if cached:
            vendor = Vendor.from_json(cached)
            self.memory_cache[document_hash] = vendor
            return vendor
        
        return None
    
    async def set_vendor(self, document_hash: str, vendor: Vendor):
        # Update both caches
        self.memory_cache[document_hash] = vendor
        await self.redis_client.setex(
            f"vendor:{document_hash}",
            self.cache_ttl['vendor_mapping'],
            vendor.to_json()
        )

# Connection Pooling for Lambda
class ConnectionPool:
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init_connections()
        return cls._instance
    
    def _init_connections(self):
        self.s3_client = boto3.client('s3')
        self.dynamodb = boto3.resource('dynamodb')
        self.redis_pool = redis.ConnectionPool(
            host=os.getenv('REDIS_HOST'),
            port=6379,
            decode_responses=True
        )

7.2 Lambda Optimization

# Cold Start Optimization
import json
import os
from functools import lru_cache

# Initialize outside handler for reuse
connection_pool = ConnectionPool()
parser_registry = ParserRegistry()

# Pre-load common models
@lru_cache(maxsize=None)
def get_parser(parser_type: str) -> InvoiceParser:
    return parser_registry.create_parser(parser_type, get_default_config())

def lambda_handler(event, context):
    # Warm connection check
    if event.get('warm'):
        return {'statusCode': 200, 'body': 'warm'}
    
    # Process actual request
    try:
        # Reuse connections
        s3 = connection_pool.s3_client
        
        # Parse request
        body = json.loads(event['body'])
        document_id = body['document_id']
        
        # Process invoice
        result = process_invoice(document_id)
        
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

8. Container Configuration

8.1 Dockerfile Structure

# Multi-stage build for optimal size
FROM python:3.11-slim as builder

# Install build dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    tesseract-ocr \
    poppler-utils \
    && rm -rf /var/lib/apt/lists/*

# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Runtime stage
FROM python:3.11-slim

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
    tesseract-ocr \
    tesseract-ocr-eng \
    poppler-utils \
    && rm -rf /var/lib/apt/lists/*

# Copy virtual environment
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"

# Copy application code
WORKDIR /app
COPY src/ ./src/
COPY config/ ./config/

# Lambda runtime interface
RUN pip install awslambdaric
ENTRYPOINT [ "python", "-m", "awslambdaric" ]
CMD [ "src.handlers.parsing_handler.lambda_handler" ]

8.2 Docker Compose for Local Development

version: '3.8'

services:
  parser-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AWS_REGION=us-east-1
      - REDIS_HOST=redis
      - ENVIRONMENT=development
    volumes:
      - ./src:/app/src
      - ./test_data:/app/test_data
    depends_on:
      - redis
      - localstack

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"
    environment:
      - SERVICES=s3,sqs,dynamodb
      - DEFAULT_REGION=us-east-1
    volumes:
      - "./init-aws.sh:/etc/localstack/init/ready.d/init-aws.sh"

  test-runner:
    build:
      context: .
      target: builder
    command: pytest -v
    volumes:
      - ./src:/app/src
      - ./tests:/app/tests
      - ./test_data:/app/test_data

9. API Design

9.1 REST API Endpoints

openapi: 3.0.0
info:
  title: PDF Invoice Parser API
  version: 1.0.0

paths:
  /api/v1/invoices/upload:
    post:
      summary: Upload invoice for parsing
      requestBody:
        content:
          multipart/form-data:
            schema:
              type: object
              properties:
                file:
                  type: string
                  format: binary
                vendor_id:
                  type: string
                  required: false
      responses:
        '200':
          content:
            application/json:
              schema:
                type: object
                properties:
                  job_id:
                    type: string
                  status:
                    type: string
                  estimated_time:
                    type: integer

  /api/v1/invoices/{job_id}/status:
    get:
      summary: Get parsing job status
      parameters:
        - name: job_id
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/JobStatus'

  /api/v1/invoices/{job_id}/result:
    get:
      summary: Get parsed invoice data
      parameters:
        - name: job_id
          in: path
          required: true
          schema:
            type: string
      responses:
        '200':
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/ParseResult'

  /api/v1/vendors:
    get:
      summary: List all vendors
    post:
      summary: Register new vendor

  /api/v1/vendors/{vendor_id}/parser-config:
    get:
      summary: Get vendor parser configuration
    put:
      summary: Update vendor parser configuration

9.2 Event-Driven Architecture

flowchart LR
    subgraph "Event Sources"
        A[S3 Upload Event]
        B[API Gateway Event]
        C[SQS Message]
    end
    
    subgraph "Event Router"
        D[EventBridge]
    end
    
    subgraph "Event Handlers"
        E[Parse Request Handler]
        F[Vendor Detection Handler]
        G[Result Processor Handler]
    end
    
    subgraph "Event Targets"
        H[SQS Parse Queue]
        I[SNS Notification]
        J[DynamoDB Stream]
    end
    
    A --> D
    B --> D
    C --> D
    
    D --> E
    D --> F
    D --> G
    
    E --> H
    F --> H
    G --> I
    G --> J
Loading

10. Security Implementation

10.1 Security Architecture

flowchart TD
    subgraph "Security Layers"
        A[API Authentication]
        B[Request Validation]
        C[Data Encryption]
        D[Access Control]
    end
    
    subgraph "Authentication Methods"
        E[API Keys]
        F[JWT Tokens]
        G[IAM Roles]
    end
    
    subgraph "Encryption"
        H[TLS 1.3]
        I[S3 Encryption]
        J[KMS Keys]
    end
    
    subgraph "Access Control"
        K[Resource Policies]
        L[VPC Endpoints]
        M[Security Groups]
    end
    
    A --> E
    A --> F
    A --> G
    
    C --> H
    C --> I
    C --> J
    
    D --> K
    D --> L
    D --> M
Loading

10.2 Security Implementation Code

# API Security Middleware
from fastapi import Security, HTTPException, status
from fastapi.security import APIKeyHeader
import jwt

class SecurityMiddleware:
    def __init__(self):
        self.api_key_header = APIKeyHeader(name="X-API-Key")
        self.jwt_secret = os.getenv("JWT_SECRET")
    
    async def verify_api_key(self, api_key: str = Security(api_key_header)):
        if not self._is_valid_api_key(api_key):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Invalid API Key"
            )
        return api_key
    
    async def verify_jwt(self, token: str):
        try:
            payload = jwt.decode(
                token, 
                self.jwt_secret, 
                algorithms=["HS256"]
            )
            return payload
        except jwt.InvalidTokenError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token"
            )

# Data Encryption
class EncryptionService:
    def __init__(self):
        self.kms_client = boto3.client('kms')
        self.key_id = os.getenv("KMS_KEY_ID")
    
    def encrypt_sensitive_data(self, data: dict) -> dict:
        sensitive_fields = ['invoice_number', 'tax_id', 'account_number']
        encrypted_data = data.copy()
        
        for field in sensitive_fields:
            if field in data:
                encrypted_data[field] = self._encrypt_field(data[field])
        
        return encrypted_data
    
    def _encrypt_field(self, value: str) -> str:
        response = self.kms_client.encrypt(
            KeyId=self.key_id,
            Plaintext=value
        )
        return base64.b64encode(response['CiphertextBlob']).decode()

11. Monitoring and Logging

11.1 Observability Stack

flowchart TB
    subgraph "Application Metrics"
        A[Custom Metrics]
        B[Performance Metrics]
        C[Business Metrics]
    end
    
    subgraph "CloudWatch"
        D[Metrics Collection]
        E[Log Aggregation]
        F[Alarms]
    end
    
    subgraph "X-Ray"
        G[Distributed Tracing]
        H[Service Map]
        I[Performance Analysis]
    end
    
    subgraph "Dashboards"
        J[Operations Dashboard]
        K[Business Dashboard]
        L[Cost Dashboard]
    end
    
    A --> D
    B --> D
    C --> D
    
    D --> F
    E --> F
    
    G --> H
    H --> I
    
    D --> J
    E --> J
    I --> K
    D --> L
Loading

11.2 Logging Implementation

# Structured Logging
import structlog
from aws_xray_sdk.core import xray_recorder

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

class InstrumentedParser:
    def __init__(self, parser: InvoiceParser):
        self.parser = parser
        self.logger = structlog.get_logger()
    
    @xray_recorder.capture('parse_invoice')
    def parse(self, document: PDFDocument) -> ParseResult:
        subsegment = xray_recorder.current_subsegment()
        
        self.logger.info(
            "Starting invoice parsing",
            document_id=document.document_id,
            parser_type=self.parser.__class__.__name__,
            page_count=document.page_count
        )
        
        start_time = time.time()
        
        try:
            result = self.parser.parse(document)
            
            # Add metrics
            subsegment.put_metadata('vendor_id', result.vendor_id)
            subsegment.put_metadata('confidence', result.confidence_scores)
            
            self.logger.info(
                "Invoice parsed successfully",
                document_id=document.document_id,
                vendor_id=result.vendor_id,
                processing_time=time.time() - start_time,
                confidence=result.confidence_scores
            )
            
            # Emit CloudWatch metric
            self._emit_metric(
                'InvoicesParsed',
                1,
                vendor_id=result.vendor_id
            )
            
            return result
            
        except Exception as e:
            self.logger.error(
                "Invoice parsing failed",
                document_id=document.document_id,
                error=str(e),
                processing_time=time.time() - start_time
            )
            
            # Emit error metric
            self._emit_metric(
                'InvoicesParsingErrors',
                1,
                error_type=type(e).__name__
            )
            
            raise

12. Deployment Configuration

12.1 Infrastructure as Code (Terraform)

# Lambda Functions
resource "aws_lambda_function" "parser_functions" {
  for_each = var.parser_types
  
  function_name = "invoice-parser-${each.key}"
  role         = aws_iam_role.lambda_role.arn
  handler      = "src.handlers.${each.key}_handler.lambda_handler"
  runtime      = "python3.11"
  timeout      = 300
  memory_size  = each.value.memory_size
  
  environment {
    variables = {
      ENVIRONMENT = var.environment
      REDIS_HOST  = aws_elasticache_cluster.redis.cache_nodes[0].address
      S3_BUCKET   = aws_s3_bucket.documents.id
    }
  }
  
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda_sg.id]
  }
}

# SQS Queues
resource "aws_sqs_queue" "parsing_queue" {
  name                      = "invoice-parsing-queue"
  visibility_timeout_seconds = 360
  message_retention_seconds  = 1209600  # 14 days
  max_message_size          = 262144
  
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })
}

# DynamoDB Tables
resource "aws_dynamodb_table" "vendors" {
  name           = "invoice-parser-vendors"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "vendor_id"
  
  attribute {
    name = "vendor_id"
    type = "S"
  }
  
  global_secondary_index {
    name            = "vendor_name_index"
    hash_key        = "vendor_name"
    projection_type = "ALL"
  }
}

12.2 CI/CD Pipeline (GitHub Actions)

name: Deploy Invoice Parser

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt
      
      - name: Run tests
        run: |
          pytest tests/ -v --cov=src --cov-report=xml
      
      - name: Upload coverage
        uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1
      
      - name: Build and push Docker image
        run: |
          aws ecr get-login-password | docker login --username AWS --password-stdin $ECR_REGISTRY
          docker build -t $ECR_REGISTRY/invoice-parser:$GITHUB_SHA .
          docker push $ECR_REGISTRY/invoice-parser:$GITHUB_SHA

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to Lambda
        run: |
          aws lambda update-function-code \
            --function-name invoice-parser-main \
            --image-uri $ECR_REGISTRY/invoice-parser:$GITHUB_SHA

13. Performance Benchmarks

13.1 Expected Performance Metrics

Metric Target Measurement Method
Average Processing Time < 5 seconds CloudWatch Metrics
P99 Processing Time < 30 seconds X-Ray Traces
Throughput 1000 invoices/hour Custom Metrics
Error Rate < 0.5% CloudWatch Alarms
Cold Start Time < 2 seconds Lambda Insights
Memory Usage < 80% allocated CloudWatch Metrics

13.2 Optimization Strategies

  1. Lambda Memory Optimization

    • Text Parser: 512 MB
    • OCR Parser: 1024 MB
    • AI Parser: 2048 MB
  2. Concurrency Limits

    • Reserved Concurrency: 100 per parser type
    • Provisioned Concurrency: 10 for high-volume vendors
  3. Caching Strategy

    • Vendor mappings: 1 hour TTL
    • Parser configs: 30 minutes TTL
    • ML model weights: Persistent in Lambda memory

14. Cost Optimization

14.1 Cost Breakdown

pie title "Estimated Monthly Cost Distribution"
    "Lambda Compute" : 30
    "S3 Storage" : 15
    "DynamoDB" : 10
    "API Gateway" : 5
    "Data Transfer" : 5
    "CloudWatch" : 5
    "AI/ML APIs" : 25
    "Other Services" : 5
Loading

14.2 Cost Optimization Strategies

  1. S3 Lifecycle Policies

    • Move processed invoices to Glacier after 90 days
    • Delete temporary files after 7 days
  2. Lambda Optimization

    • Use ARM-based Graviton2 processors
    • Implement request batching for AI parsing
  3. DynamoDB Optimization

    • Use on-demand pricing for variable workloads
    • Implement item expiration for temporary data

15. Disaster Recovery

15.1 Backup and Recovery Strategy

flowchart LR
    subgraph "Primary Region"
        A[Lambda Functions]
        B[DynamoDB]
        C[S3 Bucket]
    end
    
    subgraph "Backup Region"
        D[Lambda Functions]
        E[DynamoDB Global Table]
        F[S3 Cross-Region Replication]
    end
    
    subgraph "Recovery Process"
        G[Health Check Failure]
        H[Failover Trigger]
        I[DNS Update]
    end
    
    B --> E
    C --> F
    G --> H
    H --> I
    I --> D
Loading

16. Future Enhancements

  1. Machine Learning Pipeline

    • Automated model training for new vendors
    • Continuous improvement based on corrections
  2. Advanced Features

    • Multi-language support
    • Handwritten invoice support
    • Real-time collaboration for corrections
  3. Integration Capabilities

    • ERP system connectors
    • Accounting software APIs
    • Webhook notifications

17. Conclusion

This Low-Level Design provides a comprehensive blueprint for implementing a scalable PDF invoice parsing system. The architecture supports:

  • 1000+ vendor-specific parsers
  • Multiple parsing strategies (text, OCR, AI)
  • Serverless deployment with auto-scaling
  • Comprehensive testing framework
  • Production-ready monitoring and security

The modular design allows for easy extension and maintenance while ensuring high performance and reliability.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment