```mermaid
classDiagram
class InvoiceParser {
<<interface>>
+parse(document: PDFDocument) ParseResult
+validate(result: ParseResult) bool
+get_confidence() float
}
class BaseParser {
<<abstract>>
#logger: Logger
#config: ParserConfig
+parse(document: PDFDocument) ParseResult
+validate(result: ParseResult) bool
+extract_metadata(document: PDFDocument) dict
}
class TextParser {
-text_extractor: TextExtractor
+parse(document: PDFDocument) ParseResult
+extract_tables(document: PDFDocument) list
}
class OCRParser {
-ocr_engine: OCREngine
-image_processor: ImageProcessor
+parse(document: PDFDocument) ParseResult
+preprocess_image(image: Image) Image
}
class AIParser {
-llm_client: LLMClient
-prompt_manager: PromptManager
+parse(document: PDFDocument) ParseResult
+generate_prompt(vendor: Vendor) str
}
class ParserStrategy {
-parsers: dict~str, InvoiceParser~
-vendor_detector: VendorDetector
+select_parser(document: PDFDocument) InvoiceParser
+register_parser(vendor_id: str, parser: InvoiceParser)
}
class VendorDetector {
-text_matcher: TextMatcher
-ai_classifier: AIClassifier
-cache: Cache
+detect_vendor(document: PDFDocument) Vendor
+update_vendor_patterns(vendor: Vendor, patterns: list)
}
InvoiceParser <|-- BaseParser
BaseParser <|-- TextParser
BaseParser <|-- OCRParser
BaseParser <|-- AIParser
ParserStrategy --> InvoiceParser
ParserStrategy --> VendorDetector
```
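
As a reference point, here is a minimal Python sketch of the `InvoiceParser` contract from the diagram; `PDFDocument` and `ParseResult` are assumed to be the data models defined later in this document:

```python
# Minimal sketch of the InvoiceParser contract shown in the diagram above.
# PDFDocument and ParseResult are assumed to match the data-model section.
from abc import ABC, abstractmethod

class InvoiceParser(ABC):
    @abstractmethod
    def parse(self, document: "PDFDocument") -> "ParseResult": ...

    @abstractmethod
    def validate(self, result: "ParseResult") -> bool: ...

    @abstractmethod
    def get_confidence(self) -> float: ...
```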

```mermaid
classDiagram
class ParserFactory {
<<abstract>>
+create_parser(config: ParserConfig) InvoiceParser
}
class TextParserFactory {
+create_parser(config: ParserConfig) TextParser
}
class OCRParserFactory {
+create_parser(config: ParserConfig) OCRParser
}
class AIParserFactory {
+create_parser(config: ParserConfig) AIParser
}
class ParserRegistry {
-factories: dict~str, ParserFactory~
+register_factory(type: str, factory: ParserFactory)
+create_parser(type: str, config: ParserConfig) InvoiceParser
}
ParserFactory <|-- TextParserFactory
ParserFactory <|-- OCRParserFactory
ParserFactory <|-- AIParserFactory
ParserRegistry --> ParserFactory
```
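
A possible realization of the factory/registry pair, sketched under the assumption that unknown parser types should fail fast (the `ValueError` is illustrative, not part of the design):

```python
# Sketch of the factory/registry pair from the diagram above.
from abc import ABC, abstractmethod
from typing import Dict

class ParserFactory(ABC):
    @abstractmethod
    def create_parser(self, config: "ParserConfig") -> "InvoiceParser": ...

class ParserRegistry:
    def __init__(self) -> None:
        self._factories: Dict[str, ParserFactory] = {}

    def register_factory(self, parser_type: str, factory: ParserFactory) -> None:
        self._factories[parser_type] = factory

    def create_parser(self, parser_type: str, config: "ParserConfig") -> "InvoiceParser":
        try:
            return self._factories[parser_type].create_parser(config)
        except KeyError:
            # Fail fast on unregistered types (assumed policy)
            raise ValueError(f"no factory registered for parser type {parser_type!r}")
```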

```mermaid
classDiagram
class PDFDocument {
+document_id: str
+file_path: str
+file_size: int
+page_count: int
+metadata: dict
+created_at: datetime
+get_page(page_num: int) PDFPage
+get_text() str
}
class PDFPage {
+page_number: int
+width: float
+height: float
+text_content: str
+images: list~Image~
+tables: list~Table~
+get_text_blocks() list~TextBlock~
}
class ParseResult {
+job_id: str
+vendor_id: str
+invoice_number: str
+invoice_date: datetime
+total_amount: Decimal
+line_items: list~LineItem~
+extracted_fields: dict
+confidence_scores: dict
+parser_used: str
+processing_time: float
}
class LineItem {
+description: str
+quantity: float
+unit_price: Decimal
+total_price: Decimal
+tax_rate: float
+metadata: dict
}
class Vendor {
+vendor_id: str
+vendor_name: str
+parser_strategy: str
+field_mappings: dict
+validation_rules: list~Rule~
+confidence_threshold: float
}
PDFDocument "1" --> "*" PDFPage
ParseResult "1" --> "*" LineItem
ParseResult --> Vendor
```
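
For concreteness, the two result models could be plain dataclasses; the field names follow the diagram, while the defaults are assumptions:

```python
# Illustrative dataclass versions of the result models in the diagram above.
from dataclasses import dataclass, field
from datetime import datetime
from decimal import Decimal
from typing import Dict, List

@dataclass
class LineItem:
    description: str
    quantity: float
    unit_price: Decimal
    total_price: Decimal
    tax_rate: float = 0.0
    metadata: Dict = field(default_factory=dict)

@dataclass
class ParseResult:
    job_id: str
    vendor_id: str
    invoice_number: str
    invoice_date: datetime
    total_amount: Decimal
    line_items: List[LineItem] = field(default_factory=list)
    extracted_fields: Dict = field(default_factory=dict)
    confidence_scores: Dict = field(default_factory=dict)
    parser_used: str = ""
    processing_time: float = 0.0
```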

```mermaid
classDiagram
class ParserConfig {
+parser_type: str
+timeout_seconds: int
+retry_count: int
+confidence_threshold: float
+custom_settings: dict
}
class TextParserConfig {
+extraction_library: str
+table_detection: bool
+regex_patterns: dict
}
class OCRConfig {
+engine: str
+language: str
+dpi: int
+preprocessing_steps: list
}
class AIParserConfig {
+model: str
+temperature: float
+max_tokens: int
+prompt_template: str
}
ParserConfig <|-- TextParserConfig
ParserConfig <|-- OCRConfig
ParserConfig <|-- AIParserConfig
```

```mermaid
flowchart TB
subgraph "API Layer"
A[FastAPI Application]
B[Request Validator]
C[Response Formatter]
end
subgraph "Business Logic Layer"
D[Invoice Service]
E[Vendor Service]
F[Parser Service]
end
subgraph "Data Access Layer"
G[Document Repository]
H[Vendor Repository]
I[Result Repository]
end
subgraph "External Services"
J[S3 Service]
K[Queue Service]
L[Cache Service]
end
A --> B
B --> D
D --> F
D --> E
E --> H
F --> G
G --> J
D --> K
E --> L
F --> I
C --> A
```
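
One way the API layer might delegate to the business-logic layer, sketched with FastAPI; the `InvoiceService` stub and its `submit()` method are illustrative stand-ins for the services in the diagram:

```python
# Hypothetical wiring of the API layer to the business-logic layer.
import uuid

from fastapi import FastAPI, UploadFile

app = FastAPI()

class InvoiceService:
    """Stub standing in for the business-logic layer in the diagram."""
    async def submit(self, data: bytes, vendor_id: str | None = None) -> dict:
        # Real code would persist the file via the document repository
        # and enqueue a parsing job via the queue service.
        return {"job_id": str(uuid.uuid4()), "status": "queued"}

invoice_service = InvoiceService()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(file: UploadFile, vendor_id: str | None = None):
    return await invoice_service.submit(await file.read(), vendor_id=vendor_id)
```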

```mermaid
classDiagram
class LambdaHandler {
<<abstract>>
+handle(event: dict, context: dict) dict
#validate_input(event: dict) bool
#process(data: dict) dict
#format_response(result: dict) dict
}
class VendorDetectionHandler {
-vendor_detector: VendorDetector
+handle(event: dict, context: dict) dict
}
class ParsingHandler {
-parser_strategy: ParserStrategy
+handle(event: dict, context: dict) dict
}
class ResultProcessorHandler {
-result_validator: ResultValidator
-storage_service: StorageService
+handle(event: dict, context: dict) dict
}
LambdaHandler <|-- VendorDetectionHandler
LambdaHandler <|-- ParsingHandler
LambdaHandler <|-- ResultProcessorHandler
```
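
The handler hierarchy is a textbook Template Method: `handle()` fixes the flow while subclasses override `process()`. A minimal sketch, with illustrative defaults:

```python
# Sketch of the template-method base class from the diagram above.
import json
from abc import ABC, abstractmethod

class LambdaHandler(ABC):
    def handle(self, event: dict, context) -> dict:
        if not self.validate_input(event):
            return self.format_response({"error": "invalid input"})
        return self.format_response(self.process(event))

    def validate_input(self, event: dict) -> bool:
        return isinstance(event, dict)  # illustrative default check

    @abstractmethod
    def process(self, data: dict) -> dict: ...

    def format_response(self, result: dict) -> dict:
        return {"statusCode": 200, "body": json.dumps(result)}
```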

```mermaid
stateDiagram-v2
[*] --> Queued: Invoice Uploaded
Queued --> Processing: Job Dequeued
Processing --> VendorDetection: Start Processing
VendorDetection --> ParserSelection: Vendor Identified
VendorDetection --> AIClassification: Vendor Unknown
AIClassification --> ParserSelection: Vendor Classified
ParserSelection --> TextParsing: Digital PDF
ParserSelection --> OCRProcessing: Scanned PDF
ParserSelection --> AIParsing: Complex Format
TextParsing --> Validation: Extraction Complete
OCRProcessing --> Validation: OCR Complete
AIParsing --> Validation: AI Processing Complete
Validation --> Success: Validation Passed
Validation --> Retry: Validation Failed
Retry --> AIParsing: Retry with AI
Retry --> Failed: Max Retries Exceeded
Success --> [*]: Result Stored
Failed --> ManualReview: Send to Queue
ManualReview --> [*]: Manual Processing
```
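
The lifecycle above can be enforced in code as an explicit transition table; the state names and terminal states below are direct translations of the diagram:

```python
# The job lifecycle as an explicit transition table, mirroring the
# state diagram above; useful for validating status updates.
from enum import Enum

class JobState(str, Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    VENDOR_DETECTION = "vendor_detection"
    AI_CLASSIFICATION = "ai_classification"
    PARSER_SELECTION = "parser_selection"
    TEXT_PARSING = "text_parsing"
    OCR_PROCESSING = "ocr_processing"
    AI_PARSING = "ai_parsing"
    VALIDATION = "validation"
    RETRY = "retry"
    SUCCESS = "success"
    FAILED = "failed"
    MANUAL_REVIEW = "manual_review"

# Success and ManualReview are terminal, so they have no outgoing edges.
ALLOWED = {
    JobState.QUEUED: {JobState.PROCESSING},
    JobState.PROCESSING: {JobState.VENDOR_DETECTION},
    JobState.VENDOR_DETECTION: {JobState.PARSER_SELECTION, JobState.AI_CLASSIFICATION},
    JobState.AI_CLASSIFICATION: {JobState.PARSER_SELECTION},
    JobState.PARSER_SELECTION: {JobState.TEXT_PARSING, JobState.OCR_PROCESSING, JobState.AI_PARSING},
    JobState.TEXT_PARSING: {JobState.VALIDATION},
    JobState.OCR_PROCESSING: {JobState.VALIDATION},
    JobState.AI_PARSING: {JobState.VALIDATION},
    JobState.VALIDATION: {JobState.SUCCESS, JobState.RETRY},
    JobState.RETRY: {JobState.AI_PARSING, JobState.FAILED},
    JobState.FAILED: {JobState.MANUAL_REVIEW},
}

def transition(current: JobState, target: JobState) -> JobState:
    if target not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {target}")
    return target
```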

```mermaid
flowchart TD
A[Processing Error] --> B{Error Type}
B -->|Timeout| C[Increase Timeout]
B -->|Parse Error| D[Fallback Parser]
B -->|Validation Error| E[Retry with AI]
B -->|System Error| F[Dead Letter Queue]
C --> G{Retry?}
D --> G
E --> G
G -->|Yes| H[Requeue Job]
G -->|No| I[Log Error]
F --> I
I --> J[Send Alert]
J --> K[Manual Review]
```
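
A sketch of the routing decision in this flowchart; the exception classes are stand-ins, and the retry budget of 3 mirrors the SQS redrive policy later in this document:

```python
# Sketch of the error-routing decision from the flowchart above.
def route_error(error: Exception, attempt: int, max_retries: int = 3) -> str:
    if isinstance(error, TimeoutError):
        action = "requeue_with_longer_timeout"
    elif isinstance(error, ValueError):        # stand-in for a ParseError
        action = "requeue_with_fallback_parser"
    elif isinstance(error, AssertionError):    # stand-in for a ValidationError
        action = "requeue_with_ai_parser"
    else:
        return "dead_letter_queue"             # system errors bypass retries
    if attempt < max_retries:
        return action
    return "log_and_alert"                     # max retries exceeded
```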

```python
# Core Text Parser Implementation
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
import pdfplumber
from dataclasses import dataclass
from decimal import Decimal
@dataclass
class ExtractionResult:
text: str
tables: List[List[List[str]]]
    metadata: Dict[str, Any]
confidence: float
class TextExtractor(ABC):
@abstractmethod
def extract(self, pdf_path: str) -> ExtractionResult:
pass
class PDFPlumberExtractor(TextExtractor):
    def __init__(self, config: Dict[str, Any]):
self.config = config
def extract(self, pdf_path: str) -> ExtractionResult:
text = ""
tables = []
with pdfplumber.open(pdf_path) as pdf:
for page in pdf.pages:
# Extract text
page_text = page.extract_text() or ""
text += page_text + "\n"
# Extract tables
page_tables = page.extract_tables() or []
tables.extend(page_tables)
return ExtractionResult(
text=text,
tables=tables,
metadata={"page_count": len(pdf.pages)},
confidence=self._calculate_confidence(text)
)
def _calculate_confidence(self, text: str) -> float:
# Implement confidence scoring based on text quality
if not text:
return 0.0
# Check for common invoice patterns
patterns = ['invoice', 'total', 'amount', 'date']
matches = sum(1 for p in patterns if p.lower() in text.lower())
        return min(matches / len(patterns), 1.0)
```

```python
# Vendor Detection System
import re
from typing import Optional
class VendorDetector:
def __init__(self, cache_service, ai_classifier):
self.cache = cache_service
self.ai_classifier = ai_classifier
self.vendor_patterns = self._load_vendor_patterns()
def detect_vendor(self, document: PDFDocument) -> Optional[Vendor]:
# Try cache first
cached_vendor = self._check_cache(document)
if cached_vendor:
return cached_vendor
# Extract identifying text
text = document.get_text()[:1000] # First 1000 chars
# Try pattern matching
vendor = self._match_patterns(text)
if vendor:
self._update_cache(document, vendor)
return vendor
# Fallback to AI classification
vendor = self.ai_classifier.classify(text)
self._update_cache(document, vendor)
return vendor
def _match_patterns(self, text: str) -> Optional[Vendor]:
for vendor_id, patterns in self.vendor_patterns.items():
for pattern in patterns:
if re.search(pattern, text, re.IGNORECASE):
return self._get_vendor(vendor_id)
        return None
```

```python
# Strategy Pattern Implementation
class ParserStrategy:
def __init__(self, parser_registry: ParserRegistry):
self.parsers = {}
self.parser_registry = parser_registry
def select_parser(self,
document: PDFDocument,
vendor: Vendor) -> InvoiceParser:
# Get vendor-specific configuration
parser_config = vendor.parser_config
# Check document quality
quality = self._assess_document_quality(document)
# Select appropriate parser
if quality.is_digital and quality.text_quality > 0.8:
parser_type = "text"
elif quality.is_scanned and quality.ocr_suitable:
parser_type = "ocr"
else:
parser_type = "ai"
# Override with vendor preference if specified
if vendor.preferred_parser:
parser_type = vendor.preferred_parser
# Get or create parser
parser_key = f"{vendor.vendor_id}_{parser_type}"
if parser_key not in self.parsers:
self.parsers[parser_key] = self.parser_registry.create_parser(
parser_type,
parser_config
)
        return self.parsers[parser_key]
```

```python
# AI-Powered Parser
import json
from decimal import Decimal
class AIParser(BaseParser):
def __init__(self, llm_client, prompt_manager):
super().__init__()
self.llm_client = llm_client
self.prompt_manager = prompt_manager
def parse(self, document: PDFDocument) -> ParseResult:
# Extract text for context
text = document.get_text()
# Get vendor-specific prompt
prompt = self.prompt_manager.get_prompt(
"invoice_extraction",
context={"text": text[:4000]} # Token limit
)
# Call LLM
response = self.llm_client.complete(
prompt=prompt,
temperature=0.1,
response_format={"type": "json_object"}
)
# Parse and validate response
extracted_data = json.loads(response)
return self._build_parse_result(extracted_data, document)
def _build_parse_result(self,
data: Dict,
document: PDFDocument) -> ParseResult:
return ParseResult(
job_id=document.document_id,
vendor_id=data.get("vendor_id"),
invoice_number=data.get("invoice_number"),
invoice_date=self._parse_date(data.get("invoice_date")),
total_amount=Decimal(str(data.get("total_amount", 0))),
line_items=self._parse_line_items(data.get("line_items", [])),
extracted_fields=data,
confidence_scores=self._calculate_confidences(data),
parser_used="ai",
processing_time=0.0 # Set by wrapper
        )
```

```mermaid
graph TD
subgraph "Test Levels"
A[Unit Tests]
B[Integration Tests]
C[End-to-End Tests]
D[Performance Tests]
end
subgraph "Test Types"
E[Parser Tests]
F[Vendor Detection Tests]
G[API Tests]
H[Load Tests]
end
subgraph "Test Data"
I[Mock PDFs]
J[Real Vendor Samples]
K[Edge Cases]
L[Performance Datasets]
end
A --> E
A --> F
B --> G
C --> G
D --> H
I --> A
J --> B
K --> C
L --> D
```

```python
# Test Framework Design
class ParserTestCase:
def __init__(self, vendor_id: str, test_file: str):
self.vendor_id = vendor_id
self.test_file = test_file
self.expected_result = self._load_expected_result()
def run(self, parser: InvoiceParser) -> TestResult:
# Parse document
document = PDFDocument.from_file(self.test_file)
actual_result = parser.parse(document)
# Compare results
return self._compare_results(
expected=self.expected_result,
actual=actual_result
)
def _compare_results(self, expected: ParseResult, actual: ParseResult) -> TestResult:
differences = []
# Compare core fields
for field in ['invoice_number', 'invoice_date', 'total_amount']:
if getattr(expected, field) != getattr(actual, field):
differences.append(f"{field}: expected {getattr(expected, field)}, got {getattr(actual, field)}")
# Compare line items
if len(expected.line_items) != len(actual.line_items):
            differences.append("Line item count mismatch")
return TestResult(
passed=len(differences) == 0,
differences=differences,
confidence=actual.confidence_scores
)
class VendorTestSuite:
def __init__(self, vendor_id: str):
self.vendor_id = vendor_id
self.test_cases = self._load_test_cases()
def run_all(self) -> TestSuiteResult:
results = []
for test_case in self.test_cases:
parser = self._get_parser_for_vendor()
result = test_case.run(parser)
results.append(result)
return TestSuiteResult(
vendor_id=self.vendor_id,
total_tests=len(results),
passed=sum(1 for r in results if r.passed),
failed=sum(1 for r in results if not r.passed),
results=results
        )
```

```python
# Multi-level Caching Strategy
import os
from typing import Optional

import boto3
import redis
class CacheManager:
def __init__(self):
self.memory_cache = {} # Lambda memory cache
self.redis_client = self._init_redis()
self.cache_ttl = {
'vendor_mapping': 3600, # 1 hour
'parser_config': 1800, # 30 minutes
'parse_result': 86400 # 24 hours
}
async def get_vendor(self, document_hash: str) -> Optional[Vendor]:
# Check memory cache first
if document_hash in self.memory_cache:
return self.memory_cache[document_hash]
# Check Redis
cached = await self.redis_client.get(f"vendor:{document_hash}")
if cached:
vendor = Vendor.from_json(cached)
self.memory_cache[document_hash] = vendor
return vendor
return None
async def set_vendor(self, document_hash: str, vendor: Vendor):
# Update both caches
self.memory_cache[document_hash] = vendor
await self.redis_client.setex(
f"vendor:{document_hash}",
self.cache_ttl['vendor_mapping'],
vendor.to_json()
)
# Connection Pooling for Lambda
class ConnectionPool:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init_connections()
return cls._instance
def _init_connections(self):
self.s3_client = boto3.client('s3')
self.dynamodb = boto3.resource('dynamodb')
self.redis_pool = redis.ConnectionPool(
host=os.getenv('REDIS_HOST'),
port=6379,
decode_responses=True
        )
```

```python
# Cold Start Optimization
import json
import os
from functools import lru_cache
# Initialize outside handler for reuse
connection_pool = ConnectionPool()
parser_registry = ParserRegistry()
# Pre-load common models
@lru_cache(maxsize=None)
def get_parser(parser_type: str) -> InvoiceParser:
return parser_registry.create_parser(parser_type, get_default_config())
def lambda_handler(event, context):
# Warm connection check
if event.get('warm'):
return {'statusCode': 200, 'body': 'warm'}
# Process actual request
try:
# Reuse connections
s3 = connection_pool.s3_client
# Parse request
body = json.loads(event['body'])
document_id = body['document_id']
# Process invoice
result = process_invoice(document_id)
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
        }
```

```dockerfile
# Multi-stage build for optimal size
FROM python:3.11-slim as builder
# Install build dependencies
RUN apt-get update && apt-get install -y \
gcc \
g++ \
tesseract-ocr \
poppler-utils \
&& rm -rf /var/lib/apt/lists/*
# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Runtime stage
FROM python:3.11-slim
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
tesseract-ocr \
tesseract-ocr-eng \
poppler-utils \
&& rm -rf /var/lib/apt/lists/*
# Copy virtual environment
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Copy application code
WORKDIR /app
COPY src/ ./src/
COPY config/ ./config/
# Lambda runtime interface
RUN pip install awslambdaric
ENTRYPOINT [ "python", "-m", "awslambdaric" ]
CMD [ "src.handlers.parsing_handler.lambda_handler" ]
```

```yaml
version: '3.8'
services:
parser-api:
build: .
ports:
- "8000:8000"
environment:
- AWS_REGION=us-east-1
- REDIS_HOST=redis
- ENVIRONMENT=development
volumes:
- ./src:/app/src
- ./test_data:/app/test_data
depends_on:
- redis
- localstack
redis:
image: redis:7-alpine
ports:
- "6379:6379"
localstack:
image: localstack/localstack
ports:
- "4566:4566"
environment:
- SERVICES=s3,sqs,dynamodb
- DEFAULT_REGION=us-east-1
volumes:
- "./init-aws.sh:/etc/localstack/init/ready.d/init-aws.sh"
test-runner:
build:
context: .
target: builder
command: pytest -v
volumes:
- ./src:/app/src
- ./tests:/app/tests
      - ./test_data:/app/test_data
```

```yaml
openapi: 3.0.0
info:
title: PDF Invoice Parser API
version: 1.0.0
paths:
/api/v1/invoices/upload:
post:
summary: Upload invoice for parsing
requestBody:
content:
multipart/form-data:
schema:
type: object
properties:
file:
type: string
format: binary
vendor_id:
type: string
                  description: Optional vendor identifier hint
responses:
        '200':
          description: Upload accepted and parsing job created
content:
application/json:
schema:
type: object
properties:
job_id:
type: string
status:
type: string
estimated_time:
type: integer
/api/v1/invoices/{job_id}/status:
get:
summary: Get parsing job status
parameters:
- name: job_id
in: path
required: true
schema:
type: string
responses:
        '200':
          description: Current job status
content:
application/json:
schema:
$ref: '#/components/schemas/JobStatus'
/api/v1/invoices/{job_id}/result:
get:
summary: Get parsed invoice data
parameters:
- name: job_id
in: path
required: true
schema:
type: string
responses:
        '200':
          description: Parsed invoice data
content:
application/json:
schema:
$ref: '#/components/schemas/ParseResult'
/api/v1/vendors:
get:
summary: List all vendors
post:
summary: Register new vendor
/api/v1/vendors/{vendor_id}/parser-config:
get:
summary: Get vendor parser configuration
put:
      summary: Update vendor parser configuration
```
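
A hypothetical client walkthrough of these endpoints using `requests`; the host, vendor ID, and the `"completed"` status value are assumptions:

```python
# Illustrative client flow: upload, poll status, fetch the result.
import time

import requests

BASE = "https://api.example.com"  # placeholder host

with open("invoice.pdf", "rb") as f:
    resp = requests.post(
        f"{BASE}/api/v1/invoices/upload",
        files={"file": f},
        data={"vendor_id": "acme-corp"},  # optional per the schema
    )
job_id = resp.json()["job_id"]

# Poll until the job finishes, then fetch the parsed invoice
while requests.get(f"{BASE}/api/v1/invoices/{job_id}/status").json().get("status") != "completed":
    time.sleep(2)
result = requests.get(f"{BASE}/api/v1/invoices/{job_id}/result").json()
```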
subgraph "Event Sources"
A[S3 Upload Event]
B[API Gateway Event]
C[SQS Message]
end
subgraph "Event Router"
D[EventBridge]
end
subgraph "Event Handlers"
E[Parse Request Handler]
F[Vendor Detection Handler]
G[Result Processor Handler]
end
subgraph "Event Targets"
H[SQS Parse Queue]
I[SNS Notification]
J[DynamoDB Stream]
end
A --> D
B --> D
C --> D
D --> E
D --> F
D --> G
E --> H
F --> H
G --> I
G --> J
```

```mermaid
flowchart TD
subgraph "Security Layers"
A[API Authentication]
B[Request Validation]
C[Data Encryption]
D[Access Control]
end
subgraph "Authentication Methods"
E[API Keys]
F[JWT Tokens]
G[IAM Roles]
end
subgraph "Encryption"
H[TLS 1.3]
I[S3 Encryption]
J[KMS Keys]
end
subgraph "Access Control"
K[Resource Policies]
L[VPC Endpoints]
M[Security Groups]
end
A --> E
A --> F
A --> G
C --> H
C --> I
C --> J
D --> K
D --> L
D --> M
```

```python
# API Security Middleware
import base64
import os

import boto3
import jwt
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader
# The Security() dependency must reference an object that exists at function
# definition time, so the header scheme lives at module level.
api_key_header = APIKeyHeader(name="X-API-Key")

class SecurityMiddleware:
    def __init__(self):
        self.jwt_secret = os.getenv("JWT_SECRET")

    async def verify_api_key(self, api_key: str = Security(api_key_header)):
if not self._is_valid_api_key(api_key):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API Key"
)
return api_key
async def verify_jwt(self, token: str):
try:
payload = jwt.decode(
token,
self.jwt_secret,
algorithms=["HS256"]
)
return payload
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
# Data Encryption
class EncryptionService:
def __init__(self):
self.kms_client = boto3.client('kms')
self.key_id = os.getenv("KMS_KEY_ID")
def encrypt_sensitive_data(self, data: dict) -> dict:
sensitive_fields = ['invoice_number', 'tax_id', 'account_number']
encrypted_data = data.copy()
for field in sensitive_fields:
if field in data:
encrypted_data[field] = self._encrypt_field(data[field])
return encrypted_data
def _encrypt_field(self, value: str) -> str:
response = self.kms_client.encrypt(
KeyId=self.key_id,
Plaintext=value
)
        return base64.b64encode(response['CiphertextBlob']).decode()
```

```mermaid
flowchart TB
subgraph "Application Metrics"
A[Custom Metrics]
B[Performance Metrics]
C[Business Metrics]
end
subgraph "CloudWatch"
D[Metrics Collection]
E[Log Aggregation]
F[Alarms]
end
subgraph "X-Ray"
G[Distributed Tracing]
H[Service Map]
I[Performance Analysis]
end
subgraph "Dashboards"
J[Operations Dashboard]
K[Business Dashboard]
L[Cost Dashboard]
end
A --> D
B --> D
C --> D
D --> F
E --> F
G --> H
H --> I
D --> J
E --> J
I --> K
D --> L
```

```python
# Structured Logging
import time

import structlog
from aws_xray_sdk.core import xray_recorder
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
class InstrumentedParser:
def __init__(self, parser: InvoiceParser):
self.parser = parser
self.logger = structlog.get_logger()
@xray_recorder.capture('parse_invoice')
def parse(self, document: PDFDocument) -> ParseResult:
subsegment = xray_recorder.current_subsegment()
self.logger.info(
"Starting invoice parsing",
document_id=document.document_id,
parser_type=self.parser.__class__.__name__,
page_count=document.page_count
)
start_time = time.time()
try:
result = self.parser.parse(document)
            # Attach trace metadata to the X-Ray subsegment
subsegment.put_metadata('vendor_id', result.vendor_id)
subsegment.put_metadata('confidence', result.confidence_scores)
self.logger.info(
"Invoice parsed successfully",
document_id=document.document_id,
vendor_id=result.vendor_id,
processing_time=time.time() - start_time,
confidence=result.confidence_scores
)
# Emit CloudWatch metric
self._emit_metric(
'InvoicesParsed',
1,
vendor_id=result.vendor_id
)
return result
except Exception as e:
self.logger.error(
"Invoice parsing failed",
document_id=document.document_id,
error=str(e),
processing_time=time.time() - start_time
)
# Emit error metric
self._emit_metric(
'InvoicesParsingErrors',
1,
error_type=type(e).__name__
)
            raise
```
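
`_emit_metric` is called above but never defined; one plausible shape is a module-level helper built on CloudWatch `put_metric_data`, to which the class could delegate (the namespace is an assumed value):

```python
# Possible implementation of the _emit_metric helper referenced above.
import boto3

_cloudwatch = boto3.client("cloudwatch")

def emit_metric(name: str, value: float = 1.0, **dimensions: str) -> None:
    _cloudwatch.put_metric_data(
        Namespace="InvoiceParser",  # assumed namespace
        MetricData=[{
            "MetricName": name,
            "Value": value,
            "Unit": "Count",
            "Dimensions": [
                {"Name": key, "Value": str(val)} for key, val in dimensions.items()
            ],
        }],
    )
```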
resource "aws_lambda_function" "parser_functions" {
for_each = var.parser_types
function_name = "invoice-parser-${each.key}"
role = aws_iam_role.lambda_role.arn
handler = "src.handlers.${each.key}_handler.lambda_handler"
runtime = "python3.11"
timeout = 300
memory_size = each.value.memory_size
environment {
variables = {
ENVIRONMENT = var.environment
REDIS_HOST = aws_elasticache_cluster.redis.cache_nodes[0].address
S3_BUCKET = aws_s3_bucket.documents.id
}
}
vpc_config {
subnet_ids = var.private_subnet_ids
security_group_ids = [aws_security_group.lambda_sg.id]
}
}
# SQS Queues
resource "aws_sqs_queue" "parsing_queue" {
name = "invoice-parsing-queue"
visibility_timeout_seconds = 360
message_retention_seconds = 1209600 # 14 days
max_message_size = 262144
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = 3
})
}
# DynamoDB Tables
resource "aws_dynamodb_table" "vendors" {
name = "invoice-parser-vendors"
billing_mode = "PAY_PER_REQUEST"
hash_key = "vendor_id"
  attribute {
    name = "vendor_id"
    type = "S"
  }

  # Keys used by a GSI must also be declared as attributes
  attribute {
    name = "vendor_name"
    type = "S"
  }
global_secondary_index {
name = "vendor_name_index"
hash_key = "vendor_name"
projection_type = "ALL"
}
}
```

```yaml
name: Deploy Invoice Parser
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Run tests
run: |
pytest tests/ -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
build:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
      - name: Build and push Docker image
        # Assumes ECR_REGISTRY is supplied via the job environment
run: |
aws ecr get-login-password | docker login --username AWS --password-stdin $ECR_REGISTRY
docker build -t $ECR_REGISTRY/invoice-parser:$GITHUB_SHA .
docker push $ECR_REGISTRY/invoice-parser:$GITHUB_SHA
deploy:
needs: build
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Deploy to Lambda
run: |
aws lambda update-function-code \
--function-name invoice-parser-main \
            --image-uri $ECR_REGISTRY/invoice-parser:$GITHUB_SHA
```

| Metric | Target | Measurement Method |
|---|---|---|
| Average Processing Time | < 5 seconds | CloudWatch Metrics |
| P99 Processing Time | < 30 seconds | X-Ray Traces |
| Throughput | 1000 invoices/hour | Custom Metrics |
| Error Rate | < 0.5% | CloudWatch Alarms |
| Cold Start Time | < 2 seconds | Lambda Insights |
| Memory Usage | < 80% allocated | CloudWatch Metrics |

- **Lambda Memory Optimization**
  - Text Parser: 512 MB
  - OCR Parser: 1024 MB
  - AI Parser: 2048 MB
- **Concurrency Limits**
  - Reserved Concurrency: 100 per parser type
  - Provisioned Concurrency: 10 for high-volume vendors
- **Caching Strategy**
  - Vendor mappings: 1 hour TTL
  - Parser configs: 30 minutes TTL
  - ML model weights: Persistent in Lambda memory

```mermaid
pie title "Estimated Monthly Cost Distribution"
"Lambda Compute" : 30
"S3 Storage" : 15
"DynamoDB" : 10
"API Gateway" : 5
"Data Transfer" : 5
"CloudWatch" : 5
"AI/ML APIs" : 25
"Other Services" : 5

- **S3 Lifecycle Policies** (see the boto3 sketch after this list)
  - Move processed invoices to Glacier after 90 days
  - Delete temporary files after 7 days
- **Lambda Optimization**
  - Use ARM-based Graviton2 processors
  - Implement request batching for AI parsing
- **DynamoDB Optimization**
  - Use on-demand pricing for variable workloads
  - Implement item expiration for temporary data
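
A hedged boto3 sketch of the storage settings above; the bucket, table, prefix, and TTL attribute names are placeholders:

```python
# Sketch of the S3 lifecycle rules and DynamoDB item expiration above.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="invoice-documents",  # placeholder bucket
    LifecycleConfiguration={
        "Rules": [
            {   # Move processed invoices to Glacier after 90 days
                "ID": "archive-processed",
                "Filter": {"Prefix": "processed/"},
                "Status": "Enabled",
                "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
            },
            {   # Delete temporary files after 7 days
                "ID": "purge-temp",
                "Filter": {"Prefix": "tmp/"},
                "Status": "Enabled",
                "Expiration": {"Days": 7},
            },
        ]
    },
)

# Item expiration for temporary data
dynamodb = boto3.client("dynamodb")
dynamodb.update_time_to_live(
    TableName="invoice-parse-results",  # placeholder table
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"},
)
```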

```mermaid
flowchart LR
subgraph "Primary Region"
A[Lambda Functions]
B[DynamoDB]
C[S3 Bucket]
end
subgraph "Backup Region"
D[Lambda Functions]
E[DynamoDB Global Table]
F[S3 Cross-Region Replication]
end
subgraph "Recovery Process"
G[Health Check Failure]
H[Failover Trigger]
I[DNS Update]
end
B --> E
C --> F
G --> H
H --> I
I --> D
```

- **Machine Learning Pipeline**
  - Automated model training for new vendors
  - Continuous improvement based on corrections
- **Advanced Features**
  - Multi-language support
  - Handwritten invoice support
  - Real-time collaboration for corrections
- **Integration Capabilities**
  - ERP system connectors
  - Accounting software APIs
  - Webhook notifications
This Low-Level Design provides a comprehensive blueprint for implementing a scalable PDF invoice parsing system. The architecture supports:
- 1000+ vendor-specific parsers
- Multiple parsing strategies (text, OCR, AI)
- Serverless deployment with auto-scaling
- Comprehensive testing framework
- Production-ready monitoring and security
The modular design allows for easy extension and maintenance while ensuring high performance and reliability.