Created
October 10, 2025 16:10
-
-
Save ebridges/26e26af2596e9f458f25c00d076ba5f6 to your computer and use it in GitHub Desktop.
ingest code books to kb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Bedrock Knowledge Base Ingestion Agent | |
| This agent syncs building code data from the PostgreSQL database to AWS Bedrock Knowledge Base (YSASCPLSVW). | |
| It handles metadata transformation, content formatting, and maintains consistency between systems. | |
| """ | |
| import json | |
| import logging | |
| import boto3 | |
| from typing import Dict, List, Optional | |
| from dataclasses import dataclass | |
| import re | |
| from sqlalchemy import create_engine, text | |
| from sqlalchemy.orm import sessionmaker | |
@dataclass
class BedrockMetadata:
    """Standardized metadata structure matching the existing KB format.

    Field values come straight from database rows (see
    BedrockKBIngestionAgent.create_metadata); to_dict() renders them into the
    JSON sidecar document uploaded next to each content file.
    """

    book_code: str
    book_title: str
    chapter_number: str
    chapter_title: str
    section_number: str
    section_title: str
    subsection_number: str
    subsection_title: str
    provider: str
    jurisdiction: str
    admin_area: str
    year: str
    depth: int = 1
    section_uid: str = ""

    def to_dict(self) -> Dict:
        """Convert to a dictionary matching the existing KB metadata format."""

        def clean_string(value: str) -> str:
            """Normalize text to plain ASCII for Bedrock metadata fields."""
            if not value:
                return ""
            # Map common typographic characters to ASCII equivalents.
            # Bug fix: the original literal curly-quote keys were mojibake
            # (e.g. ''': "'"), which is a syntax error; unicode escapes keep
            # the source itself ASCII-safe.
            replacements = {
                '\u2014': '-', '\u2013': '-',                 # em/en dashes
                '\u2018': "'", '\u2019': "'",                 # curly single quotes
                '\u201c': '"', '\u201d': '"',                 # curly double quotes
                '\u00e9': 'e', '\u00e1': 'a', '\u00ed': 'i',  # accented lowercase
                '\u00f3': 'o', '\u00fa': 'u', '\u00f1': 'n',
                '\u00c9': 'E', '\u00c1': 'A', '\u00cd': 'I',  # accented uppercase
                '\u00d3': 'O', '\u00da': 'U', '\u00d1': 'N',
            }
            # Single C-level pass instead of chained .replace() calls.
            cleaned = value.translate(str.maketrans(replacements))
            # Drop anything still outside ASCII rather than risk ingestion errors.
            return cleaned.encode('ascii', 'ignore').decode('ascii').strip()

        # Build subsection path (holds the most specific number available).
        subsection_path = []
        section_num = self.subsection_number or self.section_number
        if section_num:
            subsection_path.append(section_num)

        # Source identifier built from a cleaned chapter slug.
        chapter_slug = clean_string(self.chapter_title or "unknown").lower()
        chapter_slug = chapter_slug.replace(' ', '-').replace(':', '').replace('(', '').replace(')', '')
        source = f"{self.provider}_{self.book_code}_{chapter_slug}"

        # Primary title: prefer section, then subsection, then chapter.
        title = clean_string(self.section_title or self.subsection_title or self.chapter_title or "")

        # Chapter filename (matches existing pattern: slug.html).
        chapter_filename = f"{chapter_slug}.html" if chapter_slug != "unknown" else ""

        # Section URL (matches existing pattern).
        section_anchor = f"#{self.book_code}_{self.chapter_number or 'Ch'}_{section_num}" if section_num else ""
        url = f"https://codes.iccsafe.org/content/{self.book_code}/{chapter_slug}{section_anchor}"

        return {
            "metadataAttributes": {
                "code_book": self.book_code,
                "section": section_num,
                "subsection_path": subsection_path,
                "title": title,
                "depth": self.depth,
                "source": source,
                "chapter_filename": chapter_filename,
                "chapter_title": clean_string(self.chapter_title or ""),
                "url": url,
                "jurisdiction": self.admin_area or "I-CODES",
                "discipline": "Building",  # Default; no per-row value is available here
                "admin_area": self.admin_area or "I-CODES",
                "year": str(self.year) if self.year else "2024",
                # Bug fix: this was hard-coded to "" even though the dataclass
                # already carries the book title from the database.
                "book_title": clean_string(self.book_title or ""),
                "book_url": f"https://codes.iccsafe.org/content/{self.book_code}"
            }
        }
class BedrockKBIngestionAgent:
    """Agent for ingesting building codes from the DB to S3 in Bedrock KB format.

    For each content-bearing section of a building-code book, writes a
    plain-text document plus a ``.metadata.json`` sidecar to S3 under
    ``{s3_prefix}/{PROVIDER}/{BOOK_CODE}/{chapter_slug}/``, matching the
    layout the knowledge base already indexes.
    """

    def __init__(self, s3_bucket: str = "com.greenlite.engineering-machine-learning-dev", db_url: Optional[str] = None):
        """Initialize the S3 client and the database session factory.

        Args:
            s3_bucket: Destination S3 bucket.
            db_url: SQLAlchemy URL; falls back to the DATABASE_URL environment
                variable, then to a local PostgreSQL default.
        """
        self.s3_bucket = s3_bucket
        self.s3_prefix = "codebooks3"
        self.logger = logging.getLogger(__name__)
        # Initialize AWS S3 client only (no Bedrock client is needed here).
        self.s3_client = boto3.client('s3')
        # Initialize database connection; the two original branches both
        # ended in create_engine(db_url), so resolve the URL first.
        if not db_url:
            import os
            db_url = os.getenv('DATABASE_URL', 'postgresql://localhost:5432/building_codes')
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def get_s3_info(self) -> Dict:
        """Return the bucket/prefix this agent writes to."""
        return {
            'bucket': self.s3_bucket,
            'prefix': self.s3_prefix,
            'full_prefix': f's3://{self.s3_bucket}/{self.s3_prefix}/'
        }

    def get_db_books_not_in_kb(self) -> List[Dict]:
        """List candidate books with per-book section and content counts.

        NOTE(review): despite the name, this does not consult the KB -- it
        returns every ICC/NFPA/TDLR book in the database; callers decide
        what to sync.
        """
        with self.Session() as session:
            query = text("""
                SELECT DISTINCT
                    bcb.id as book_id,
                    bcb.book_code as book_code,
                    bcb.book_title as book_title,
                    bcb.book_provider as provider,
                    bcb.book_admin_area as admin_area,
                    bcb.book_year as year,
                    COUNT(bc.id) as section_count,
                    COUNT(CASE WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != '' THEN 1 END) as content_count
                FROM building_code_book bcb
                LEFT JOIN building_code bc ON bc.book_id = bcb.id
                WHERE bcb.book_provider IN ('ICC', 'NFPA', 'TDLR')
                GROUP BY bcb.id, bcb.book_code, bcb.book_title, bcb.book_provider, bcb.book_admin_area, bcb.book_year
                ORDER BY bcb.book_provider, bcb.book_code
            """)
            result = session.execute(query)
            return [dict(row._mapping) for row in result]

    def get_book_hierarchy_with_content(self, book_id: str) -> List[Dict]:
        """Get the hierarchical structure with content for a specific book.

        Rows with text_content of NULL, '' or the literal string 'NaN' (a
        pandas-export artifact) are reported with has_content = false.
        """
        with self.Session() as session:
            # Simple query without recursion for now - get all sections with content
            query = text("""
                SELECT
                    bc.id,
                    bc.section_uid,
                    bc.section_number,
                    bc.section_title,
                    bc.chapter_number,
                    bc.chapter_title,
                    CASE
                        WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
                        THEN bc.text_content
                        ELSE NULL
                    END as text_content,
                    bc.text_content_html,
                    bc.content_id,
                    bc.parent_content_id,
                    bc.depth,
                    bc.sort_order,
                    bc.is_chapter,
                    bc.is_leaf,
                    bc.path,
                    bcb.book_code,
                    bcb.book_title,
                    bcb.book_provider as provider,
                    bcb.book_admin_area as admin_area,
                    bcb.book_year as year,
                    CASE
                        WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
                        THEN true
                        ELSE false
                    END as has_content
                FROM building_code bc
                JOIN building_code_book bcb ON bc.book_id = bcb.id
                WHERE bc.book_id = :book_id
                ORDER BY bc.sort_order, bc.chapter_number, bc.section_number
            """)
            result = session.execute(query, {"book_id": book_id})
            return [dict(row._mapping) for row in result]

    def create_metadata(self, section: Dict, chapter_info: Optional[Dict] = None) -> "BedrockMetadata":
        """Create standardized metadata from a database section row.

        Args:
            section: One row dict from get_book_hierarchy_with_content().
            chapter_info: Unused; kept for interface compatibility.
        """
        # Use the actual database fields directly.
        chapter_number = section.get('chapter_number', '')
        chapter_title = section.get('chapter_title', '')
        section_number = section.get('section_number', '')
        section_title = section.get('section_title', '')
        # Depth decides whether this row is treated as a subsection.
        depth = section.get('depth', 0)
        if depth > 2:  # Deeper levels treated as subsections
            subsection_number = section_number
            subsection_title = section_title
            section_number = ''
            section_title = ''
        else:
            subsection_number = ''
            subsection_title = ''
        return BedrockMetadata(
            book_code=section.get('book_code', ''),
            book_title=section.get('book_title', ''),
            chapter_number=self._clean_section_number(chapter_number),
            chapter_title=chapter_title,
            section_number=self._clean_section_number(section_number),
            section_title=section_title,
            subsection_number=self._clean_section_number(subsection_number),
            subsection_title=subsection_title,
            provider=section.get('provider', ''),
            jurisdiction=section.get('jurisdiction', ''),
            admin_area=section.get('admin_area', ''),
            year=str(section.get('year', '')),
            depth=section.get('depth', 1),
            section_uid=section.get('section_uid', '')
        )

    def _get_chapter_info(self, section: Dict) -> Dict:
        """Get chapter information for a section by traversing hierarchy.

        NOTE(review): rows produced by get_book_hierarchy_with_content carry
        'section_title'/'chapter_title', not 'title' -- confirm the expected
        input shape before relying on this (currently unused) helper.
        """
        if section.get('is_chapter'):
            return {
                'number': self._clean_section_number(section.get('section_number', '')),
                'title': section.get('title', '')
            }
        # If not a chapter, we'd need to query the parent hierarchy.
        # For now, return empty to keep it simple.
        return {'number': '', 'title': ''}

    def _clean_section_number(self, section_number: str) -> str:
        """Collapse internal whitespace and trim a section number."""
        if not section_number:
            return ''
        return re.sub(r'\s+', ' ', str(section_number)).strip()

    def format_content_for_kb(self, section: Dict) -> str:
        """Format a section row as the plain-text document to ingest."""
        content_parts = []
        # Title heading, if any level provides one.
        section_title = section.get('section_title') or section.get('chapter_title', '')
        if section_title:
            content_parts.append(f"# {section_title}")
        # Section/chapter number context line.
        if section.get('section_number'):
            content_parts.append(f"Section: {section['section_number']}")
        elif section.get('chapter_number'):
            content_parts.append(f"Chapter: {section['chapter_number']}")
        # Main body from the text_content field.
        content = section.get('text_content', '').strip() if section.get('text_content') else ''
        if content:
            content_parts.append(content)
        else:
            content_parts.append("No content available for this section.")
        return '\n\n'.join(content_parts)

    def create_s3_key(self, metadata: "BedrockMetadata", section: Dict) -> str:
        """Create the S3 key for a section document.

        Pattern: codebooks3/{PROVIDER}/{BOOK_CODE}/chapter_{slug}/section_{number}.txt
        """
        provider = metadata.provider.upper()
        book_code = metadata.book_code.replace(' ', '_').replace('/', '_')
        chapter_slug = self._create_slug(metadata.chapter_title, metadata.chapter_number, "chapter")
        depth = section.get('depth', 0)
        section_id = str(section['id'])
        # Always create section files (no separate chapter files, to match
        # the existing KB layout).
        if depth <= 2:  # Section level (depth 1-2)
            filename = f"section_{metadata.section_number}".replace(' ', '_').replace('.', '_')
            if filename == "section_":  # no section number available
                filename = f"section_{section_id}"
        else:  # Subsection level (depth > 2)
            filename = f"subsection_{metadata.subsection_number}".replace(' ', '_').replace('.', '_')
            if filename == "subsection_":
                filename = f"subsection_{section_id}"
        # Bug fix: the computed filename was previously discarded and a
        # literal placeholder written instead, so every section in a chapter
        # collided on a single S3 key.
        return f"{self.s3_prefix}/{provider}/{book_code}/{chapter_slug}/{filename}.txt"

    def _create_slug(self, title: str, number: str, prefix: str) -> str:
        """Create a URL-safe slug from a title, falling back to the number."""
        if title and title.strip():
            # Strip punctuation, then collapse runs of spaces/hyphens.
            slug = re.sub(r'[^\w\s-]', '', title).strip()
            slug = re.sub(r'[-\s]+', '_', slug).lower()
            return f"{prefix}_{slug}"
        elif number and number.strip():
            clean_number = number.replace(' ', '_').replace('.', '_')
            return f"{prefix}_{clean_number}"
        else:
            return f"{prefix}_unknown"

    def upload_to_s3(self, key: str, content: str, metadata: "BedrockMetadata") -> bool:
        """Upload the content file and its metadata sidecar to S3.

        Returns:
            True if both objects were written, False on any failure
            (logged, never raised -- callers count failures).
        """
        try:
            # Content file.
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=key,
                Body=content.encode('utf-8'),
                ContentType='text/plain'
            )
            # Metadata sidecar, named per the Bedrock KB convention.
            metadata_key = f"{key}.metadata.json"
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=metadata_key,
                Body=json.dumps(metadata.to_dict(), indent=2).encode('utf-8'),
                ContentType='application/json'
            )
            self.logger.info(f"Successfully uploaded: s3://{self.s3_bucket}/{key}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload {key}: {e}")
            return False

    def ingest_book(self, book_id: str, dry_run: bool = True) -> Dict:
        """Ingest a single book to S3 in KB format.

        Args:
            book_id: Primary key of the building_code_book row.
            dry_run: When True (default), log intended uploads without writing.

        Returns:
            Per-book result dict with uploaded/failed/skipped counts,
            collected errors, and the s3:// paths touched.
        """
        self.logger.info(f"Starting S3 ingestion for book {book_id} (dry_run={dry_run})")
        sections = self.get_book_hierarchy_with_content(book_id)
        if not sections:
            return {"status": "no_content", "message": "No sections found"}
        # Keep only sections that carry text (drops chapter-only records).
        content_sections = [s for s in sections if s.get('has_content')]
        if not content_sections:
            return {"status": "no_content", "message": "No sections with content found"}
        results = {
            "book_id": book_id,
            "book_code": content_sections[0].get('book_code', 'unknown'),
            "total_sections": len(content_sections),
            "uploaded": 0,
            "failed": 0,
            "skipped": 0,
            "errors": [],
            "s3_paths": []
        }
        for section in content_sections:
            try:
                # Already filtered above; guard kept as a cheap invariant check.
                if not section.get('has_content'):
                    results["skipped"] += 1
                    continue
                metadata = self.create_metadata(section)
                content = self.format_content_for_kb(section)
                s3_key = self.create_s3_key(metadata, section)
                if dry_run:
                    self.logger.info(f"DRY RUN - Would upload: {s3_key}")
                    results["uploaded"] += 1
                    results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
                else:
                    if self.upload_to_s3(s3_key, content, metadata):
                        results["uploaded"] += 1
                        results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
                    else:
                        results["failed"] += 1
            except Exception as e:
                # One bad section must not abort the whole book.
                self.logger.error(f"Error processing section {section['id']}: {e}")
                results["failed"] += 1
                results["errors"].append(str(e))
        return results

    def sync_all_missing_books(self, dry_run: bool = True, providers: Optional[List[str]] = None) -> Dict:
        """Sync all candidate books, optionally filtered by provider.

        Books with zero content-bearing sections are skipped with a warning.
        """
        missing_books = self.get_db_books_not_in_kb()
        if providers:
            missing_books = [book for book in missing_books if book['provider'] in providers]
        self.logger.info(f"Found {len(missing_books)} books to sync")
        overall_results = {
            "total_books": len(missing_books),
            "processed": 0,
            "successful": 0,
            "failed": 0,
            "book_results": []
        }
        for book in missing_books:
            if book['content_count'] == 0:
                self.logger.warning(f"Skipping book {book['book_code']} - no content available")
                continue
            try:
                result = self.ingest_book(book['book_id'], dry_run=dry_run)
                result['book_info'] = book
                overall_results['book_results'].append(result)
                overall_results['processed'] += 1
                # A book counts as successful if anything was (or would be) uploaded.
                if result.get('uploaded', 0) > 0:
                    overall_results['successful'] += 1
                else:
                    overall_results['failed'] += 1
            except Exception as e:
                self.logger.error(f"Failed to process book {book['book_code']}: {e}")
                overall_results['failed'] += 1
        return overall_results
def main():
    """CLI entry point for the S3 ingestion agent.

    Dry-run is the default; pass --execute to actually write to S3.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Building Code S3 Ingestion Agent for Bedrock KB')
    parser.add_argument('--book-id', help='Specific book ID to ingest')
    parser.add_argument('--provider', action='append', help='Provider filter (ICC, NFPA, TDLR)')
    parser.add_argument('--dry-run', action='store_true', default=True,
                        help='Perform dry run without uploading (default: True)')
    parser.add_argument('--execute', action='store_true', help='Actually upload to S3 (overrides dry-run)')
    parser.add_argument('--s3-bucket', default='com.greenlite.engineering-machine-learning-dev', help='S3 bucket name')
    parser.add_argument('--db-url', help='Database URL (uses environment variable if not provided)')
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Uploads happen only when --execute is passed explicitly.
    dry_run = args.dry_run and not args.execute

    agent = BedrockKBIngestionAgent(s3_bucket=args.s3_bucket, db_url=args.db_url)
    if args.book_id:
        result = agent.ingest_book(args.book_id, dry_run=dry_run)
    else:
        result = agent.sync_all_missing_books(dry_run=dry_run, providers=args.provider)
    # Bug fix: result['book_results'][i]['book_info'] holds raw DB values
    # (UUIDs, dates, Decimals) that json cannot serialize; default=str
    # keeps the CLI from crashing on them.
    print(json.dumps(result, indent=2, default=str))


if __name__ == '__main__':
    main()
| """ | |
| Bedrock Knowledge Base Ingestion Agent | |
| This agent syncs building code data from the PostgreSQL database to AWS Bedrock Knowledge Base (YSASCPLSVW). | |
| It handles metadata transformation, content formatting, and maintains consistency between systems. | |
| """ | |
| import json | |
| import logging | |
| import boto3 | |
| from typing import Dict, List, Optional | |
| from dataclasses import dataclass | |
| import re | |
| from sqlalchemy import create_engine, text | |
| from sqlalchemy.orm import sessionmaker | |
@dataclass
class BedrockMetadata:
    """Standardized metadata structure matching the existing KB format.

    Field values come straight from database rows (see
    BedrockKBIngestionAgent.create_metadata); to_dict() renders them into the
    JSON sidecar document uploaded next to each content file.
    """

    book_code: str
    book_title: str
    chapter_number: str
    chapter_title: str
    section_number: str
    section_title: str
    subsection_number: str
    subsection_title: str
    provider: str
    jurisdiction: str
    admin_area: str
    year: str
    depth: int = 1
    section_uid: str = ""

    def to_dict(self) -> Dict:
        """Convert to a dictionary matching the existing KB metadata format."""

        def clean_string(value: str) -> str:
            """Normalize text to plain ASCII for Bedrock metadata fields."""
            if not value:
                return ""
            # Map common typographic characters to ASCII equivalents.
            # Bug fix: the original literal curly-quote keys were mojibake
            # (e.g. ''': "'"), which is a syntax error; unicode escapes keep
            # the source itself ASCII-safe.
            replacements = {
                '\u2014': '-', '\u2013': '-',                 # em/en dashes
                '\u2018': "'", '\u2019': "'",                 # curly single quotes
                '\u201c': '"', '\u201d': '"',                 # curly double quotes
                '\u00e9': 'e', '\u00e1': 'a', '\u00ed': 'i',  # accented lowercase
                '\u00f3': 'o', '\u00fa': 'u', '\u00f1': 'n',
                '\u00c9': 'E', '\u00c1': 'A', '\u00cd': 'I',  # accented uppercase
                '\u00d3': 'O', '\u00da': 'U', '\u00d1': 'N',
            }
            # Single C-level pass instead of chained .replace() calls.
            cleaned = value.translate(str.maketrans(replacements))
            # Drop anything still outside ASCII rather than risk ingestion errors.
            return cleaned.encode('ascii', 'ignore').decode('ascii').strip()

        # Build subsection path (holds the most specific number available).
        subsection_path = []
        section_num = self.subsection_number or self.section_number
        if section_num:
            subsection_path.append(section_num)

        # Source identifier built from a cleaned chapter slug.
        chapter_slug = clean_string(self.chapter_title or "unknown").lower()
        chapter_slug = chapter_slug.replace(' ', '-').replace(':', '').replace('(', '').replace(')', '')
        source = f"{self.provider}_{self.book_code}_{chapter_slug}"

        # Primary title: prefer section, then subsection, then chapter.
        title = clean_string(self.section_title or self.subsection_title or self.chapter_title or "")

        # Chapter filename (matches existing pattern: slug.html).
        chapter_filename = f"{chapter_slug}.html" if chapter_slug != "unknown" else ""

        # Section URL (matches existing pattern).
        section_anchor = f"#{self.book_code}_{self.chapter_number or 'Ch'}_{section_num}" if section_num else ""
        url = f"https://codes.iccsafe.org/content/{self.book_code}/{chapter_slug}{section_anchor}"

        return {
            "metadataAttributes": {
                "code_book": self.book_code,
                "section": section_num,
                "subsection_path": subsection_path,
                "title": title,
                "depth": self.depth,
                "source": source,
                "chapter_filename": chapter_filename,
                "chapter_title": clean_string(self.chapter_title or ""),
                "url": url,
                "jurisdiction": self.admin_area or "I-CODES",
                "discipline": "Building",  # Default; no per-row value is available here
                "admin_area": self.admin_area or "I-CODES",
                "year": str(self.year) if self.year else "2024",
                # Bug fix: this was hard-coded to "" even though the dataclass
                # already carries the book title from the database.
                "book_title": clean_string(self.book_title or ""),
                "book_url": f"https://codes.iccsafe.org/content/{self.book_code}"
            }
        }
class BedrockKBIngestionAgent:
    """Agent for ingesting building codes from the DB to S3 in Bedrock KB format.

    For each content-bearing section of a building-code book, writes a
    plain-text document plus a ``.metadata.json`` sidecar to S3 under
    ``{s3_prefix}/{PROVIDER}/{BOOK_CODE}/{chapter_slug}/``, matching the
    layout the knowledge base already indexes.
    """

    def __init__(self, s3_bucket: str = "com.greenlite.engineering-machine-learning-dev", db_url: Optional[str] = None):
        """Initialize the S3 client and the database session factory.

        Args:
            s3_bucket: Destination S3 bucket.
            db_url: SQLAlchemy URL; falls back to the DATABASE_URL environment
                variable, then to a local PostgreSQL default.
        """
        self.s3_bucket = s3_bucket
        self.s3_prefix = "codebooks3"
        self.logger = logging.getLogger(__name__)
        # Initialize AWS S3 client only (no Bedrock client is needed here).
        self.s3_client = boto3.client('s3')
        # Initialize database connection; the two original branches both
        # ended in create_engine(db_url), so resolve the URL first.
        if not db_url:
            import os
            db_url = os.getenv('DATABASE_URL', 'postgresql://localhost:5432/building_codes')
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def get_s3_info(self) -> Dict:
        """Return the bucket/prefix this agent writes to."""
        return {
            'bucket': self.s3_bucket,
            'prefix': self.s3_prefix,
            'full_prefix': f's3://{self.s3_bucket}/{self.s3_prefix}/'
        }

    def get_db_books_not_in_kb(self) -> List[Dict]:
        """List candidate books with per-book section and content counts.

        NOTE(review): despite the name, this does not consult the KB -- it
        returns every ICC/NFPA/TDLR book in the database; callers decide
        what to sync.
        """
        with self.Session() as session:
            query = text("""
                SELECT DISTINCT
                    bcb.id as book_id,
                    bcb.book_code as book_code,
                    bcb.book_title as book_title,
                    bcb.book_provider as provider,
                    bcb.book_admin_area as admin_area,
                    bcb.book_year as year,
                    COUNT(bc.id) as section_count,
                    COUNT(CASE WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != '' THEN 1 END) as content_count
                FROM building_code_book bcb
                LEFT JOIN building_code bc ON bc.book_id = bcb.id
                WHERE bcb.book_provider IN ('ICC', 'NFPA', 'TDLR')
                GROUP BY bcb.id, bcb.book_code, bcb.book_title, bcb.book_provider, bcb.book_admin_area, bcb.book_year
                ORDER BY bcb.book_provider, bcb.book_code
            """)
            result = session.execute(query)
            return [dict(row._mapping) for row in result]

    def get_book_hierarchy_with_content(self, book_id: str) -> List[Dict]:
        """Get the hierarchical structure with content for a specific book.

        Rows with text_content of NULL, '' or the literal string 'NaN' (a
        pandas-export artifact) are reported with has_content = false.
        """
        with self.Session() as session:
            # Simple query without recursion for now - get all sections with content
            query = text("""
                SELECT
                    bc.id,
                    bc.section_uid,
                    bc.section_number,
                    bc.section_title,
                    bc.chapter_number,
                    bc.chapter_title,
                    CASE
                        WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
                        THEN bc.text_content
                        ELSE NULL
                    END as text_content,
                    bc.text_content_html,
                    bc.content_id,
                    bc.parent_content_id,
                    bc.depth,
                    bc.sort_order,
                    bc.is_chapter,
                    bc.is_leaf,
                    bc.path,
                    bcb.book_code,
                    bcb.book_title,
                    bcb.book_provider as provider,
                    bcb.book_admin_area as admin_area,
                    bcb.book_year as year,
                    CASE
                        WHEN bc.text_content IS NOT NULL AND bc.text_content != 'NaN' AND bc.text_content != ''
                        THEN true
                        ELSE false
                    END as has_content
                FROM building_code bc
                JOIN building_code_book bcb ON bc.book_id = bcb.id
                WHERE bc.book_id = :book_id
                ORDER BY bc.sort_order, bc.chapter_number, bc.section_number
            """)
            result = session.execute(query, {"book_id": book_id})
            return [dict(row._mapping) for row in result]

    def create_metadata(self, section: Dict, chapter_info: Optional[Dict] = None) -> "BedrockMetadata":
        """Create standardized metadata from a database section row.

        Args:
            section: One row dict from get_book_hierarchy_with_content().
            chapter_info: Unused; kept for interface compatibility.
        """
        # Use the actual database fields directly.
        chapter_number = section.get('chapter_number', '')
        chapter_title = section.get('chapter_title', '')
        section_number = section.get('section_number', '')
        section_title = section.get('section_title', '')
        # Depth decides whether this row is treated as a subsection.
        depth = section.get('depth', 0)
        if depth > 2:  # Deeper levels treated as subsections
            subsection_number = section_number
            subsection_title = section_title
            section_number = ''
            section_title = ''
        else:
            subsection_number = ''
            subsection_title = ''
        return BedrockMetadata(
            book_code=section.get('book_code', ''),
            book_title=section.get('book_title', ''),
            chapter_number=self._clean_section_number(chapter_number),
            chapter_title=chapter_title,
            section_number=self._clean_section_number(section_number),
            section_title=section_title,
            subsection_number=self._clean_section_number(subsection_number),
            subsection_title=subsection_title,
            provider=section.get('provider', ''),
            jurisdiction=section.get('jurisdiction', ''),
            admin_area=section.get('admin_area', ''),
            year=str(section.get('year', '')),
            depth=section.get('depth', 1),
            section_uid=section.get('section_uid', '')
        )

    def _get_chapter_info(self, section: Dict) -> Dict:
        """Get chapter information for a section by traversing hierarchy.

        NOTE(review): rows produced by get_book_hierarchy_with_content carry
        'section_title'/'chapter_title', not 'title' -- confirm the expected
        input shape before relying on this (currently unused) helper.
        """
        if section.get('is_chapter'):
            return {
                'number': self._clean_section_number(section.get('section_number', '')),
                'title': section.get('title', '')
            }
        # If not a chapter, we'd need to query the parent hierarchy.
        # For now, return empty to keep it simple.
        return {'number': '', 'title': ''}

    def _clean_section_number(self, section_number: str) -> str:
        """Collapse internal whitespace and trim a section number."""
        if not section_number:
            return ''
        return re.sub(r'\s+', ' ', str(section_number)).strip()

    def format_content_for_kb(self, section: Dict) -> str:
        """Format a section row as the plain-text document to ingest."""
        content_parts = []
        # Title heading, if any level provides one.
        section_title = section.get('section_title') or section.get('chapter_title', '')
        if section_title:
            content_parts.append(f"# {section_title}")
        # Section/chapter number context line.
        if section.get('section_number'):
            content_parts.append(f"Section: {section['section_number']}")
        elif section.get('chapter_number'):
            content_parts.append(f"Chapter: {section['chapter_number']}")
        # Main body from the text_content field.
        content = section.get('text_content', '').strip() if section.get('text_content') else ''
        if content:
            content_parts.append(content)
        else:
            content_parts.append("No content available for this section.")
        return '\n\n'.join(content_parts)

    def create_s3_key(self, metadata: "BedrockMetadata", section: Dict) -> str:
        """Create the S3 key for a section document.

        Pattern: codebooks3/{PROVIDER}/{BOOK_CODE}/chapter_{slug}/section_{number}.txt
        """
        provider = metadata.provider.upper()
        book_code = metadata.book_code.replace(' ', '_').replace('/', '_')
        chapter_slug = self._create_slug(metadata.chapter_title, metadata.chapter_number, "chapter")
        depth = section.get('depth', 0)
        section_id = str(section['id'])
        # Always create section files (no separate chapter files, to match
        # the existing KB layout).
        if depth <= 2:  # Section level (depth 1-2)
            filename = f"section_{metadata.section_number}".replace(' ', '_').replace('.', '_')
            if filename == "section_":  # no section number available
                filename = f"section_{section_id}"
        else:  # Subsection level (depth > 2)
            filename = f"subsection_{metadata.subsection_number}".replace(' ', '_').replace('.', '_')
            if filename == "subsection_":
                filename = f"subsection_{section_id}"
        # Bug fix: the computed filename was previously discarded and a
        # literal placeholder written instead, so every section in a chapter
        # collided on a single S3 key.
        return f"{self.s3_prefix}/{provider}/{book_code}/{chapter_slug}/{filename}.txt"

    def _create_slug(self, title: str, number: str, prefix: str) -> str:
        """Create a URL-safe slug from a title, falling back to the number."""
        if title and title.strip():
            # Strip punctuation, then collapse runs of spaces/hyphens.
            slug = re.sub(r'[^\w\s-]', '', title).strip()
            slug = re.sub(r'[-\s]+', '_', slug).lower()
            return f"{prefix}_{slug}"
        elif number and number.strip():
            clean_number = number.replace(' ', '_').replace('.', '_')
            return f"{prefix}_{clean_number}"
        else:
            return f"{prefix}_unknown"

    def upload_to_s3(self, key: str, content: str, metadata: "BedrockMetadata") -> bool:
        """Upload the content file and its metadata sidecar to S3.

        Returns:
            True if both objects were written, False on any failure
            (logged, never raised -- callers count failures).
        """
        try:
            # Content file.
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=key,
                Body=content.encode('utf-8'),
                ContentType='text/plain'
            )
            # Metadata sidecar, named per the Bedrock KB convention.
            metadata_key = f"{key}.metadata.json"
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=metadata_key,
                Body=json.dumps(metadata.to_dict(), indent=2).encode('utf-8'),
                ContentType='application/json'
            )
            self.logger.info(f"Successfully uploaded: s3://{self.s3_bucket}/{key}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to upload {key}: {e}")
            return False

    def ingest_book(self, book_id: str, dry_run: bool = True) -> Dict:
        """Ingest a single book to S3 in KB format.

        Args:
            book_id: Primary key of the building_code_book row.
            dry_run: When True (default), log intended uploads without writing.

        Returns:
            Per-book result dict with uploaded/failed/skipped counts,
            collected errors, and the s3:// paths touched.
        """
        self.logger.info(f"Starting S3 ingestion for book {book_id} (dry_run={dry_run})")
        sections = self.get_book_hierarchy_with_content(book_id)
        if not sections:
            return {"status": "no_content", "message": "No sections found"}
        # Keep only sections that carry text (drops chapter-only records).
        content_sections = [s for s in sections if s.get('has_content')]
        if not content_sections:
            return {"status": "no_content", "message": "No sections with content found"}
        results = {
            "book_id": book_id,
            "book_code": content_sections[0].get('book_code', 'unknown'),
            "total_sections": len(content_sections),
            "uploaded": 0,
            "failed": 0,
            "skipped": 0,
            "errors": [],
            "s3_paths": []
        }
        for section in content_sections:
            try:
                # Already filtered above; guard kept as a cheap invariant check.
                if not section.get('has_content'):
                    results["skipped"] += 1
                    continue
                metadata = self.create_metadata(section)
                content = self.format_content_for_kb(section)
                s3_key = self.create_s3_key(metadata, section)
                if dry_run:
                    self.logger.info(f"DRY RUN - Would upload: {s3_key}")
                    results["uploaded"] += 1
                    results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
                else:
                    if self.upload_to_s3(s3_key, content, metadata):
                        results["uploaded"] += 1
                        results["s3_paths"].append(f"s3://{self.s3_bucket}/{s3_key}")
                    else:
                        results["failed"] += 1
            except Exception as e:
                # One bad section must not abort the whole book.
                self.logger.error(f"Error processing section {section['id']}: {e}")
                results["failed"] += 1
                results["errors"].append(str(e))
        return results

    def sync_all_missing_books(self, dry_run: bool = True, providers: Optional[List[str]] = None) -> Dict:
        """Sync all candidate books, optionally filtered by provider.

        Books with zero content-bearing sections are skipped with a warning.
        """
        missing_books = self.get_db_books_not_in_kb()
        if providers:
            missing_books = [book for book in missing_books if book['provider'] in providers]
        self.logger.info(f"Found {len(missing_books)} books to sync")
        overall_results = {
            "total_books": len(missing_books),
            "processed": 0,
            "successful": 0,
            "failed": 0,
            "book_results": []
        }
        for book in missing_books:
            if book['content_count'] == 0:
                self.logger.warning(f"Skipping book {book['book_code']} - no content available")
                continue
            try:
                result = self.ingest_book(book['book_id'], dry_run=dry_run)
                result['book_info'] = book
                overall_results['book_results'].append(result)
                overall_results['processed'] += 1
                # A book counts as successful if anything was (or would be) uploaded.
                if result.get('uploaded', 0) > 0:
                    overall_results['successful'] += 1
                else:
                    overall_results['failed'] += 1
            except Exception as e:
                self.logger.error(f"Failed to process book {book['book_code']}: {e}")
                overall_results['failed'] += 1
        return overall_results
def main():
    """CLI entry point for the S3 ingestion agent.

    Dry-run is the default; pass --execute to actually write to S3.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Building Code S3 Ingestion Agent for Bedrock KB')
    parser.add_argument('--book-id', help='Specific book ID to ingest')
    parser.add_argument('--provider', action='append', help='Provider filter (ICC, NFPA, TDLR)')
    parser.add_argument('--dry-run', action='store_true', default=True,
                        help='Perform dry run without uploading (default: True)')
    parser.add_argument('--execute', action='store_true', help='Actually upload to S3 (overrides dry-run)')
    parser.add_argument('--s3-bucket', default='com.greenlite.engineering-machine-learning-dev', help='S3 bucket name')
    parser.add_argument('--db-url', help='Database URL (uses environment variable if not provided)')
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Uploads happen only when --execute is passed explicitly.
    dry_run = args.dry_run and not args.execute

    agent = BedrockKBIngestionAgent(s3_bucket=args.s3_bucket, db_url=args.db_url)
    if args.book_id:
        result = agent.ingest_book(args.book_id, dry_run=dry_run)
    else:
        result = agent.sync_all_missing_books(dry_run=dry_run, providers=args.provider)
    # Bug fix: result['book_results'][i]['book_info'] holds raw DB values
    # (UUIDs, dates, Decimals) that json cannot serialize; default=str
    # keeps the CLI from crashing on them.
    print(json.dumps(result, indent=2, default=str))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment