@ebridges · Created October 10, 2025
ingest code books to kb
"""
Bedrock Knowledge Base Ingestion Agent
This agent syncs building code data from the PostgreSQL database to AWS Bedrock Knowledge Base (YSASCPLSVW).
It handles metadata transformation, content formatting, and maintains consistency between systems.
"""
import json
import logging
import os
import re
from dataclasses import dataclass
from typing import Dict, List, Optional

import boto3
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


@dataclass
class BedrockMetadata:
    """Standardized metadata structure matching existing KB format"""

    book_code: str
    book_title: str
    chapter_number: str
    chapter_title: str
    section_number: str
    section_title: str
    subsection_number: str
    subsection_title: str
    provider: str
    jurisdiction: str
    admin_area: str
    year: str
    depth: int = 1
    section_uid: str = ""

    def to_dict(self) -> Dict:
        """Convert to a dictionary matching the existing KB metadata format"""

        # Helper to clean strings for Bedrock (ASCII only, no special chars)
        def clean_string(value: str) -> str:
            if not value:
                return ""
            # Replace common non-ASCII characters with ASCII equivalents
            replacements = {
                '—': '-', '–': '-',  # dashes
                '‘': "'", '’': "'", '“': '"', '”': '"',  # curly quotes
                'é': 'e', 'á': 'a', 'í': 'i', 'ó': 'o', 'ú': 'u', 'ñ': 'n',  # accents
                'É': 'E', 'Á': 'A', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'Ñ': 'N'
            }
            cleaned = value
            for old, new in replacements.items():
                cleaned = cleaned.replace(old, new)
            # Drop any remaining non-ASCII characters
            return cleaned.encode('ascii', 'ignore').decode('ascii').strip()

        # Build the subsection path (always contains the most specific section number)
        subsection_path = []
        section_num = self.subsection_number or self.section_number
        if section_num:
            subsection_path.append(section_num)

        # Create the source identifier from a cleaned chapter slug
        chapter_slug = clean_string(self.chapter_title or "unknown").lower()
        chapter_slug = chapter_slug.replace(' ', '-').replace(':', '').replace('(', '').replace(')', '')
        source = f"{self.provider}_{self.book_code}_{chapter_slug}"

        # Primary title (cleaned)
        title = clean_string(self.section_title or self.subsection_title or self.chapter_title or "")

        # Chapter filename (matches the existing pattern: slug.html)
        chapter_filename = f"{chapter_slug}.html" if chapter_slug != "unknown" else ""

        # Section URL (matches the existing pattern)
        section_anchor = f"#{self.book_code}_{self.chapter_number or 'Ch'}_{section_num}" if section_num else ""
        url = f"https://codes.iccsafe.org/content/{self.book_code}/{chapter_slug}{section_anchor}"

        return {
            "metadataAttributes": {
                "code_book": self.book_code,
                "section": section_num,
                "subsection_path": subsection_path,
                "title": title,
                "depth": self.depth,
                "source": source,
                "chapter_filename": chapter_filename,
                "chapter_title": clean_string(self.chapter_title or ""),
                "url": url,
                "jurisdiction": self.admin_area or "I-CODES",
                "discipline": "Building",  # Default, will be overridden from DB
                "admin_area": self.admin_area or "I-CODES",
                "year": str(self.year) if self.year else "2024",
                "book_title": "",  # Will be filled from DB
                "book_url": f"https://codes.iccsafe.org/content/{self.book_code}"
            }
        }
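
# Illustrative sketch of the sidecar document to_dict() produces; the field
# values below are invented for a hypothetical IBC section at depth 2:
#
# {
#   "metadataAttributes": {
#     "code_book": "IBC2024",
#     "section": "1010.1",
#     "subsection_path": ["1010.1"],
#     "title": "Doors",
#     "depth": 2,
#     "source": "ICC_IBC2024_means-of-egress",
#     "chapter_filename": "means-of-egress.html",
#     "chapter_title": "Means of Egress",
#     "url": "https://codes.iccsafe.org/content/IBC2024/means-of-egress#IBC2024_10_1010.1",
#     "jurisdiction": "I-CODES",
#     "discipline": "Building",
#     "admin_area": "I-CODES",
#     "year": "2024",
#     "book_title": "",
#     "book_url": "https://codes.iccsafe.org/content/IBC2024"
#   }
# }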


class BedrockKBIngestionAgent:
    """Agent for ingesting building codes from the DB to S3 in Bedrock KB format"""

    def __init__(self, s3_bucket: str = "com.greenlite.engineering-machine-learning-dev", db_url: Optional[str] = None):
        self.s3_bucket = s3_bucket
        self.s3_prefix = "codebooks3"
        self.logger = logging.getLogger(__name__)

        # Initialize the AWS S3 client only (no Bedrock client is needed here)
        self.s3_client = boto3.client('s3')

        # Initialize the database connection, falling back to the environment
        # variable and then a local default
        if not db_url:
            db_url = os.getenv('DATABASE_URL', 'postgresql://localhost:5432/building_codes')
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def get_s3_info(self) -> Dict:
        """Get S3 bucket information"""
        return {
            'bucket': self.s3_bucket,
            'prefix': self.s3_prefix,
            'full_prefix': f's3://{self.s3_bucket}/{self.s3_prefix}/'
        }

    def get_db_books_not_in_kb(self) -> List[Dict]:
        """Identify books in the database that are missing from the knowledge base"""
        with self.Session() as session:
            query = text("""
                SELECT DISTINCT
                    bcb.id as book_id,
                    bcb.book_code as book_code,
                    bcb.book_title as book_title,
                    bcb.book_provider as provider,
                    bcb.book_admin_area as admin_area,
                    bcb.book_year as year,
                    COUNT(bc.id) as section_count,
                    COUNT(CASE WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != '' THEN 1 END) as content_count
                FROM building_code_book bcb
                LEFT JOIN building_code bc ON bc.book_id = bcb.id
                WHERE bcb.book_provider IN ('ICC', 'NFPA', 'TDLR')
                GROUP BY bcb.id, bcb.book_code, bcb.book_title, bcb.book_provider, bcb.book_admin_area, bcb.book_year
                ORDER BY bcb.book_provider, bcb.book_code
            """)
            result = session.execute(query)
            return [dict(row._mapping) for row in result]

    def get_book_hierarchy_with_content(self, book_id: str) -> List[Dict]:
        """Get the hierarchical structure with content for a specific book"""
        with self.Session() as session:
            # Simple non-recursive query for now: all sections with content
            query = text("""
                SELECT
                    bc.id,
                    bc.section_uid,
                    bc.section_number,
                    bc.section_title,
                    bc.chapter_number,
                    bc.chapter_title,
                    CASE
                        WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
                        THEN bc.text_content
                        ELSE NULL
                    END as text_content,
                    bc.text_content_html,
                    bc.content_id,
                    bc.parent_content_id,
                    bc.depth,
                    bc.sort_order,
                    bc.is_chapter,
                    bc.is_leaf,
                    bc.path,
                    bcb.book_code,
                    bcb.book_title,
                    bcb.book_provider as provider,
                    bcb.book_admin_area as admin_area,
                    bcb.book_year as year,
                    CASE
                        WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
                        THEN true
                        ELSE false
                    END as has_content
                FROM building_code bc
                JOIN building_code_book bcb ON bc.book_id = bcb.id
                WHERE bc.book_id = :book_id
                ORDER BY bc.sort_order, bc.chapter_number, bc.section_number
            """)
            result = session.execute(query, {"book_id": book_id})
            return [dict(row._mapping) for row in result]

    def create_metadata(self, section: Dict, chapter_info: Optional[Dict] = None) -> BedrockMetadata:
        """Create standardized metadata from a database section row"""
        # Use the actual database fields directly
        chapter_number = section.get('chapter_number', '')
        chapter_title = section.get('chapter_title', '')
        section_number = section.get('section_number', '')
        section_title = section.get('section_title', '')

        # Use depth to decide whether this row should be treated as a subsection
        depth = section.get('depth', 0)
        if depth > 2:  # Deeper levels are treated as subsections
            subsection_number = section_number
            subsection_title = section_title
            section_number = ''
            section_title = ''
        else:
            subsection_number = ''
            subsection_title = ''

        return BedrockMetadata(
            book_code=section.get('book_code', ''),
            book_title=section.get('book_title', ''),
            chapter_number=self._clean_section_number(chapter_number),
            chapter_title=chapter_title,
            section_number=self._clean_section_number(section_number),
            section_title=section_title,
            subsection_number=self._clean_section_number(subsection_number),
            subsection_title=subsection_title,
            provider=section.get('provider', ''),
            jurisdiction=section.get('jurisdiction', ''),
            admin_area=section.get('admin_area', ''),
            year=str(section.get('year', '')),
            depth=section.get('depth', 1),
            section_uid=section.get('section_uid', '')
        )

    def _get_chapter_info(self, section: Dict) -> Dict:
        """Get chapter information for a section by traversing the hierarchy"""
        if section.get('is_chapter'):
            return {
                'number': self._clean_section_number(section.get('section_number', '')),
                'title': section.get('title', '')
            }
        # If this is not a chapter we would need to query the parent
        # hierarchy; return empty values for now to keep it simple
        return {'number': '', 'title': ''}

    def _clean_section_number(self, section_number: str) -> str:
        """Clean and standardize section numbers"""
        if not section_number:
            return ''
        # Collapse extra whitespace and normalize
        return re.sub(r'\s+', ' ', str(section_number)).strip()

    def format_content_for_kb(self, section: Dict) -> str:
        """Format section content for knowledge base ingestion"""
        content_parts = []

        # Add the title if available
        section_title = section.get('section_title') or section.get('chapter_title', '')
        if section_title:
            content_parts.append(f"# {section_title}")

        # Add section number context
        if section.get('section_number'):
            content_parts.append(f"Section: {section['section_number']}")
        elif section.get('chapter_number'):
            content_parts.append(f"Chapter: {section['chapter_number']}")

        # Add the main content from the text_content field
        content = (section.get('text_content') or '').strip()
        if content:
            content_parts.append(content)
        else:
            content_parts.append("No content available for this section.")

        return '\n\n'.join(content_parts)
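
    # For a hypothetical row (the title, number, and body text below are
    # invented), format_content_for_kb() would emit:
    #
    #   # Doors
    #
    #   Section: 1010.1
    #
    #   Doors serving a means of egress shall comply with ...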

    def create_s3_key(self, metadata: BedrockMetadata, section: Dict) -> str:
        """Create an S3 key following the KB structure with proper hierarchy"""
        # Pattern: codebooks3/{PROVIDER}/{BOOK_CODE}/chapter_{slug}/section_{number}.txt
        provider = metadata.provider.upper()
        book_code = metadata.book_code.replace(' ', '_').replace('/', '_')

        # Create the chapter slug
        chapter_slug = self._create_slug(metadata.chapter_title, metadata.chapter_number, "chapter")

        # Determine the file type and identifier based on depth
        depth = section.get('depth', 0)
        section_id = str(section['id'])

        # Always create section files (no separate chapter files, matching the existing KB pattern)
        if depth <= 2:  # Section level (depth 1-2)
            filename = f"section_{metadata.section_number}".replace(' ', '_').replace('.', '_')
            if filename == "section_":  # No section number; fall back to the row id
                filename = f"section_{section_id}"
        else:  # Subsection level (depth > 2)
            filename = f"subsection_{metadata.subsection_number}".replace(' ', '_').replace('.', '_')
            if filename == "subsection_":
                filename = f"subsection_{section_id}"

        return f"{self.s3_prefix}/{provider}/{book_code}/{chapter_slug}/{filename}.txt"

    def _create_slug(self, title: str, number: str, prefix: str) -> str:
        """Create a URL-safe slug from a title or number"""
        if title and title.strip():
            # Clean the title for use as a slug
            slug = re.sub(r'[^\w\s-]', '', title).strip()
            slug = re.sub(r'[-\s]+', '_', slug).lower()
            return f"{prefix}_{slug}"
        elif number and number.strip():
            # Fall back to the number if there is no title
            clean_number = number.replace(' ', '_').replace('.', '_')
            return f"{prefix}_{clean_number}"
        else:
            # Last-resort fallback
            return f"{prefix}_unknown"

    def upload_to_s3(self, key: str, content: str, metadata: BedrockMetadata) -> bool:
        """Upload content and its metadata sidecar to S3"""
        try:
            # Upload the content file
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=key,
                Body=content.encode('utf-8'),
                ContentType='text/plain'
            )

            # Upload the metadata file
            metadata_key = f"{key}.metadata.json"
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=metadata_key,
                Body=json.dumps(metadata.to_dict(), indent=2).encode('utf-8'),
                ContentType='application/json'
            )

            self.logger.info(f"Successfully uploaded: s3://{self.s3_bucket}/{key}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload {key}: {e}")
            return False
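
    # Note: Bedrock knowledge bases backed by an S3 data source read
    # per-document metadata from a sidecar object named
    # "<document key>.metadata.json", which is why upload_to_s3() writes the
    # two objects above as a pair.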

    def ingest_book(self, book_id: str, dry_run: bool = True) -> Dict:
        """Ingest a single book to S3 in KB format"""
        self.logger.info(f"Starting S3 ingestion for book {book_id} (dry_run={dry_run})")

        # Get the book hierarchy with content
        sections = self.get_book_hierarchy_with_content(book_id)
        if not sections:
            return {"status": "no_content", "message": "No sections found"}

        # Keep only sections that have content (excludes chapter-only records)
        content_sections = [s for s in sections if s.get('has_content')]
        if not content_sections:
            return {"status": "no_content", "message": "No sections with content found"}

        results = {
            "book_id": book_id,
            "book_code": content_sections[0].get('book_code', 'unknown'),
            "total_sections": len(content_sections),
            "uploaded": 0,
            "failed": 0,
            "skipped": 0,
            "errors": [],
            "s3_paths": []
        }

        for section in content_sections:
            try:
                # Skip if no content (already filtered above, but double-check)
                if not section.get('has_content'):
                    results["skipped"] += 1
                    continue

                # Create metadata, format the content, and build the S3 key
                metadata = self.create_metadata(section)
                content = self.format_content_for_kb(section)
                s3_key = self.create_s3_key(metadata, section)

                if dry_run:
                    self.logger.info(f"DRY RUN - Would upload: {s3_key}")
                    results["uploaded"] += 1
                    results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
                else:
                    if self.upload_to_s3(s3_key, content, metadata):
                        results["uploaded"] += 1
                        results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
                    else:
                        results["failed"] += 1
            except Exception as e:
                self.logger.error(f"Error processing section {section['id']}: {e}")
                results["failed"] += 1
                results["errors"].append(str(e))

        return results

    def sync_all_missing_books(self, dry_run: bool = True, providers: Optional[List[str]] = None) -> Dict:
        """Sync all books that are missing from the knowledge base"""
        missing_books = self.get_db_books_not_in_kb()
        if providers:
            missing_books = [book for book in missing_books if book['provider'] in providers]
        self.logger.info(f"Found {len(missing_books)} books to sync")

        overall_results = {
            "total_books": len(missing_books),
            "processed": 0,
            "successful": 0,
            "failed": 0,
            "book_results": []
        }

        for book in missing_books:
            if book['content_count'] == 0:
                self.logger.warning(f"Skipping book {book['book_code']} - no content available")
                continue
            try:
                result = self.ingest_book(book['book_id'], dry_run=dry_run)
                result['book_info'] = book
                overall_results['book_results'].append(result)
                overall_results['processed'] += 1
                if result.get('uploaded', 0) > 0:
                    overall_results['successful'] += 1
                else:
                    overall_results['failed'] += 1
            except Exception as e:
                self.logger.error(f"Failed to process book {book['book_code']}: {e}")
                overall_results['failed'] += 1

        return overall_results


def main():
    """CLI entry point for the S3 ingestion agent"""
    import argparse

    parser = argparse.ArgumentParser(description='Building Code S3 Ingestion Agent for Bedrock KB')
    parser.add_argument('--book-id', help='Specific book ID to ingest')
    parser.add_argument('--provider', action='append', help='Provider filter (ICC, NFPA, TDLR); repeatable')
    parser.add_argument('--dry-run', action='store_true', default=True, help='Perform a dry run without uploading (default: True)')
    parser.add_argument('--execute', action='store_true', help='Actually upload to S3 (overrides --dry-run)')
    parser.add_argument('--s3-bucket', default='com.greenlite.engineering-machine-learning-dev', help='S3 bucket name')
    parser.add_argument('--db-url', help='Database URL (falls back to the DATABASE_URL environment variable)')
    args = parser.parse_args()

    # Set up logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Determine whether this is a dry run (--execute wins over --dry-run)
    dry_run = args.dry_run and not args.execute

    agent = BedrockKBIngestionAgent(s3_bucket=args.s3_bucket, db_url=args.db_url)

    if args.book_id:
        result = agent.ingest_book(args.book_id, dry_run=dry_run)
    else:
        result = agent.sync_all_missing_books(dry_run=dry_run, providers=args.provider)
    print(json.dumps(result, indent=2))


if __name__ == '__main__':
    main()
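
# Example invocations (the script name and book id are hypothetical):
#   python bedrock_kb_ingest.py --provider ICC              # dry run over all ICC books
#   python bedrock_kb_ingest.py --book-id 42 --execute      # upload a single book for real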
"""
Bedrock Knowledge Base Ingestion Agent
This agent syncs building code data from the PostgreSQL database to AWS Bedrock Knowledge Base (YSASCPLSVW).
It handles metadata transformation, content formatting, and maintains consistency between systems.
"""
import json
import logging
import boto3
from typing import Dict, List, Optional
from dataclasses import dataclass
import re
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
@dataclass
class BedrockMetadata:
"""Standardized metadata structure matching existing KB format"""
book_code: str
book_title: str
chapter_number: str
chapter_title: str
section_number: str
section_title: str
subsection_number: str
subsection_title: str
provider: str
jurisdiction: str
admin_area: str
year: str
depth: int = 1
section_uid: str = ""
def to_dict(self) -> Dict:
"""Convert to dictionary matching existing KB metadata format"""
# Helper function to clean strings for Bedrock (ASCII only, no special chars)
def clean_string(value: str) -> str:
if not value:
return ""
# Replace common non-ASCII characters
replacements = {
'—': '-', '–': '-', # dashes
''': "'", ''': "'", '"': '"', '"': '"', # quotes
'é': 'e', 'á': 'a', 'í': 'i', 'ó': 'o', 'ú': 'u', 'ñ': 'n', # accents
'É': 'E', 'Á': 'A', 'Í': 'I', 'Ó': 'O', 'Ú': 'U', 'Ñ': 'N'
}
cleaned = value
for old, new in replacements.items():
cleaned = cleaned.replace(old, new)
# Remove any remaining non-ASCII
return cleaned.encode('ascii', 'ignore').decode('ascii').strip()
# Build subsection path (always contains section number)
subsection_path = []
section_num = self.subsection_number or self.section_number
if section_num:
subsection_path.append(section_num)
# Create source identifier (clean chapter slug)
chapter_slug = clean_string(self.chapter_title or "unknown").lower()
chapter_slug = chapter_slug.replace(' ', '-').replace(':', '').replace('(', '').replace(')', '')
source = f"{self.provider}_{self.book_code}_{chapter_slug}"
# Primary title (clean)
title = clean_string(self.section_title or self.subsection_title or self.chapter_title or "")
# Chapter filename (matches existing pattern: slug.html)
chapter_filename = f"{chapter_slug}.html" if chapter_slug != "unknown" else ""
# Section URL (matches existing pattern)
section_anchor = f"#{self.book_code}_{self.chapter_number or 'Ch'}_{section_num}" if section_num else ""
url = f"https://codes.iccsafe.org/content/{self.book_code}/{chapter_slug}{section_anchor}"
return {
"metadataAttributes": {
"code_book": self.book_code,
"section": section_num,
"subsection_path": subsection_path,
"title": title,
"depth": self.depth,
"source": source,
"chapter_filename": chapter_filename,
"chapter_title": clean_string(self.chapter_title or ""),
"url": url,
"jurisdiction": self.admin_area or "I-CODES",
"discipline": "Building", # Default, will be overridden from DB
"admin_area": self.admin_area or "I-CODES",
"year": str(self.year) if self.year else "2024",
"book_title": "", # Will be filled from DB
"book_url": f"https://codes.iccsafe.org/content/{self.book_code}"
}
}
class BedrockKBIngestionAgent:
"""Agent for ingesting building codes from DB to S3 in Bedrock KB format"""
def __init__(self, s3_bucket: str = "com.greenlite.engineering-machine-learning-dev", db_url: Optional[str] = None):
self.s3_bucket = s3_bucket
self.s3_prefix = "codebooks3"
self.logger = logging.getLogger(__name__)
# Initialize AWS S3 client only
self.s3_client = boto3.client('s3')
# Initialize database connection
if db_url:
self.engine = create_engine(db_url)
else:
# Use environment variable or default
import os
db_url = os.getenv('DATABASE_URL', 'postgresql://localhost:5432/building_codes')
self.engine = create_engine(db_url)
self.Session = sessionmaker(bind=self.engine)
def get_s3_info(self) -> Dict:
"""Get S3 bucket information"""
return {
'bucket': self.s3_bucket,
'prefix': self.s3_prefix,
'full_prefix': f's3://{self.s3_bucket}/{self.s3_prefix}/'
}
def get_db_books_not_in_kb(self) -> List[Dict]:
"""Identify books in database that are missing from knowledge base"""
with self.Session() as session:
query = text("""
SELECT DISTINCT
bcb.id as book_id,
bcb.book_code as book_code,
bcb.book_title as book_title,
bcb.book_provider as provider,
bcb.book_admin_area as admin_area,
bcb.book_year as year,
COUNT(bc.id) as section_count,
COUNT(CASE WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != '' THEN 1 END) as content_count
FROM building_code_book bcb
LEFT JOIN building_code bc ON bc.book_id = bcb.id
WHERE bcb.book_provider IN ('ICC', 'NFPA', 'TDLR')
GROUP BY bcb.id, bcb.book_code, bcb.book_title, bcb.book_provider, bcb.book_admin_area, bcb.book_year
ORDER BY bcb.book_provider, bcb.book_code
""")
result = session.execute(query)
return [dict(row._mapping) for row in result]
def get_book_hierarchy_with_content(self, book_id: str) -> List[Dict]:
"""Get hierarchical structure with content for a specific book"""
with self.Session() as session:
# Simple query without recursion for now - get all sections with content
query = text("""
SELECT
bc.id,
bc.section_uid,
bc.section_number,
bc.section_title,
bc.chapter_number,
bc.chapter_title,
CASE
WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
THEN bc.text_content
ELSE NULL
END as text_content,
bc.text_content_html,
bc.content_id,
bc.parent_content_id,
bc.depth,
bc.sort_order,
bc.is_chapter,
bc.is_leaf,
bc.path,
bcb.book_code,
bcb.book_title,
bcb.book_provider as provider,
bcb.book_admin_area as admin_area,
bcb.book_year as year,
CASE
WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
THEN true
ELSE false
END as has_content
FROM building_code bc
JOIN building_code_book bcb ON bc.book_id = bcb.id
WHERE bc.book_id = :book_id
ORDER BY bc.sort_order, bc.chapter_number, bc.section_number
""")
result = session.execute(query, {"book_id": book_id})
return [dict(row._mapping) for row in result]
def create_metadata(self, section: Dict, chapter_info: Dict = None) -> BedrockMetadata:
"""Create standardized metadata from database section"""
# Use the actual database fields directly
chapter_number = section.get('chapter_number', '')
chapter_title = section.get('chapter_title', '')
section_number = section.get('section_number', '')
section_title = section.get('section_title', '')
# For subsections, we'll use depth to determine if this should be treated as a subsection
depth = section.get('depth', 0)
if depth > 2: # Deeper levels treated as subsections
subsection_number = section_number
subsection_title = section_title
section_number = ''
section_title = ''
else:
subsection_number = ''
subsection_title = ''
return BedrockMetadata(
book_code=section.get('book_code', ''),
book_title=section.get('book_title', ''),
chapter_number=self._clean_section_number(chapter_number),
chapter_title=chapter_title,
section_number=self._clean_section_number(section_number),
section_title=section_title,
subsection_number=self._clean_section_number(subsection_number),
subsection_title=subsection_title,
provider=section.get('provider', ''),
jurisdiction=section.get('jurisdiction', ''),
admin_area=section.get('admin_area', ''),
year=str(section.get('year', '')),
depth=section.get('depth', 1),
section_uid=section.get('section_uid', '')
)
def _get_chapter_info(self, section: Dict) -> Dict:
"""Get chapter information for a section by traversing hierarchy"""
if section.get('is_chapter'):
return {
'number': self._clean_section_number(section.get('section_number', '')),
'title': section.get('title', '')
}
# If not a chapter, we'd need to query parent hierarchy
# For now, return empty to keep it simple
return {'number': '', 'title': ''}
def _clean_section_number(self, section_number: str) -> str:
"""Clean and standardize section numbers"""
if not section_number:
return ''
# Remove extra whitespace and normalize
cleaned = re.sub(r'\s+', ' ', str(section_number)).strip()
return cleaned
def format_content_for_kb(self, section: Dict) -> str:
"""Format section content for knowledge base ingestion"""
content_parts = []
# Add title if available
section_title = section.get('section_title') or section.get('chapter_title', '')
if section_title:
content_parts.append(f"# {section_title}")
# Add section number context
if section.get('section_number'):
content_parts.append(f"Section: {section['section_number']}")
elif section.get('chapter_number'):
content_parts.append(f"Chapter: {section['chapter_number']}")
# Add main content - use text_content field
content = section.get('text_content', '').strip() if section.get('text_content') else ''
if content:
content_parts.append(content)
else:
content_parts.append("No content available for this section.")
return '\n\n'.join(content_parts)
def create_s3_key(self, metadata: BedrockMetadata, section: Dict) -> str:
"""Create S3 key following KB structure with proper hierarchy"""
# Pattern: codebooks3/{PROVIDER}/{BOOK_CODE}/chapter_{slug}/section_{number}.txt
provider = metadata.provider.upper()
book_code = metadata.book_code.replace(' ', '_').replace('/', '_')
# Create chapter slug
chapter_slug = self._create_slug(metadata.chapter_title, metadata.chapter_number, "chapter")
# Determine file type and identifier based on depth
depth = section.get('depth', 0)
section_id = str(section['id'])
# Always create section files (no separate chapter files to match existing KB pattern)
if depth <= 2: # Section level (depth 1-2)
filename = f"section_{metadata.section_number}".replace(' ', '_').replace('.', '_')
if not filename or filename == "section_":
filename = f"section_{section_id}"
else: # Subsection level (depth > 2)
filename = f"subsection_{metadata.subsection_number}".replace(' ', '_').replace('.', '_')
if not filename or filename == "subsection_":
filename = f"subsection_{section_id}"
return f"{self.s3_prefix}/{provider}/{book_code}/{chapter_slug}/{filename}.txt"
def _create_slug(self, title: str, number: str, prefix: str) -> str:
"""Create URL-safe slug from title or number"""
if title and title.strip():
# Clean title for slug
slug = re.sub(r'[^\w\s-]', '', title).strip()
slug = re.sub(r'[-\s]+', '_', slug).lower()
return f"{prefix}_{slug}"
elif number and number.strip():
# Use number if no title
clean_number = number.replace(' ', '_').replace('.', '_')
return f"{prefix}_{clean_number}"
else:
# Fallback
return f"{prefix}_unknown"
def upload_to_s3(self, key: str, content: str, metadata: BedrockMetadata) -> bool:
"""Upload content and metadata to S3"""
try:
# Upload content file
self.s3_client.put_object(
Bucket=self.s3_bucket,
Key=key,
Body=content.encode('utf-8'),
ContentType='text/plain'
)
# Upload metadata file
metadata_key = f"{key}.metadata.json"
self.s3_client.put_object(
Bucket=self.s3_bucket,
Key=metadata_key,
Body=json.dumps(metadata.to_dict(), indent=2).encode('utf-8'),
ContentType='application/json'
)
self.logger.info(f"Successfully uploaded: s3://{self.s3_bucket}/{key}")
return True
except Exception as e:
self.logger.error(f"Failed to upload {key}: {e}")
return False
def ingest_book(self, book_id: str, dry_run: bool = True) -> Dict:
"""Ingest a single book to S3 in KB format"""
self.logger.info(f"Starting S3 ingestion for book {book_id} (dry_run={dry_run})")
# Get book hierarchy with content
sections = self.get_book_hierarchy_with_content(book_id)
if not sections:
return {"status": "no_content", "message": "No sections found"}
# Filter sections that have content (exclude chapter-only records)
content_sections = [s for s in sections if s.get('has_content')]
if not content_sections:
return {"status": "no_content", "message": "No sections with content found"}
results = {
"book_id": book_id,
"book_code": content_sections[0].get('book_code', 'unknown'),
"total_sections": len(content_sections),
"uploaded": 0,
"failed": 0,
"skipped": 0,
"errors": [],
"s3_paths": []
}
for section in content_sections:
try:
# Skip if no content (we already filtered, but double-check)
if not section.get('has_content'):
results["skipped"] += 1
continue
# Create metadata
metadata = self.create_metadata(section)
# Format content
content = self.format_content_for_kb(section)
# Create S3 key
s3_key = self.create_s3_key(metadata, section)
if dry_run:
self.logger.info(f"DRY RUN - Would upload: {s3_key}")
results["uploaded"] += 1
results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
else:
# Upload to S3
if self.upload_to_s3(s3_key, content, metadata):
results["uploaded"] += 1
results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
else:
results["failed"] += 1
except Exception as e:
self.logger.error(f"Error processing section {section['id']}: {e}")
results["failed"] += 1
results["errors"].append(str(e))
return results
def sync_all_missing_books(self, dry_run: bool = True, providers: Optional[List[str]] = None) -> Dict:
"""Sync all books that are missing from the knowledge base"""
missing_books = self.get_db_books_not_in_kb()
if providers:
missing_books = [book for book in missing_books if book['provider'] in providers]
self.logger.info(f"Found {len(missing_books)} books to sync")
overall_results = {
"total_books": len(missing_books),
"processed": 0,
"successful": 0,
"failed": 0,
"book_results": []
}
for book in missing_books:
if book['content_count'] == 0:
self.logger.warning(f"Skipping book {book['book_code']} - no content available")
continue
try:
result = self.ingest_book(book['book_id'], dry_run=dry_run)
result['book_info'] = book
overall_results['book_results'].append(result)
overall_results['processed'] += 1
if result.get('uploaded', 0) > 0:
overall_results['successful'] += 1
else:
overall_results['failed'] += 1
except Exception as e:
self.logger.error(f"Failed to process book {book['book_code']}: {e}")
overall_results['failed'] += 1
return overall_results
def main():
"""CLI entry point for the S3 ingestion agent"""
import argparse
parser = argparse.ArgumentParser(description='Building Code S3 Ingestion Agent for Bedrock KB')
parser.add_argument('--book-id', help='Specific book ID to ingest')
parser.add_argument('--provider', action='append', help='Provider filter (ICC, NFPA, TDLR)')
parser.add_argument('--dry-run', action='store_true', default=True, help='Perform dry run without uploading (default: True)')
parser.add_argument('--execute', action='store_true', help='Actually upload to S3 (overrides dry-run)')
parser.add_argument('--s3-bucket', default='com.greenlite.engineering-machine-learning-dev', help='S3 bucket name')
parser.add_argument('--db-url', help='Database URL (uses environment variable if not provided)')
args = parser.parse_args()
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Determine if this is a dry run
dry_run = args.dry_run and not args.execute
agent = BedrockKBIngestionAgent(s3_bucket=args.s3_bucket, db_url=args.db_url)
if args.book_id:
result = agent.ingest_book(args.book_id, dry_run=dry_run)
print(json.dumps(result, indent=2))
else:
result = agent.sync_all_missing_books(dry_run=dry_run, providers=args.provider)
print(json.dumps(result, indent=2))
if __name__ == '__main__':
main()