Skip to content

Instantly share code, notes, and snippets.

@R4wm
Last active August 19, 2025 22:24
Show Gist options
  • Select an option

  • Save R4wm/66416abac5dfd2e27ebcf387263cde58 to your computer and use it in GitHub Desktop.

Select an option

Save R4wm/66416abac5dfd2e27ebcf387263cde58 to your computer and use it in GitHub Desktop.
media-processor process flow

  Moderation Process Flow

  ┌─────────────┐     ┌──────────────────┐     ┌─────────────────┐
  │   User      │────▶│    API Server    │────▶│   PostgreSQL    │
  │   Upload    │ ①   │  (FastAPI)       │ ②   │   Database      │
  └─────────────┘     └──────────────────┘     └─────────────────┘
                               │ ③
                               ▼
                      ┌──────────────────┐
                      │   RabbitMQ       │
                      │   Exchange       │
                      └──────────────────┘
                               │ ④
                               ▼
  ┌─────────────────┐  ┌──────────────────┐  ┌─────────────────┐
  │ Media Processor │◀─│   Message Queue  │─▶│ Moderation      │
  │   Service       │  │   (RabbitMQ)     │  │ Service         │
  └─────────────────┘  └──────────────────┘  └─────────────────┘
           │ ⑤                                          │ ⑥
           ▼                                            ▼
  ┌─────────────────┐                        ┌─────────────────┐
  │   S3 Storage    │                        │   Hive AI API   │
  │  (Variants)     │                        │  (Moderation)   │
  └─────────────────┘                        └─────────────────┘
                                                       │ ⑦
                                                       ▼
                                            ┌─────────────────┐
                                            │   Database      │
                                            │   Update        │
                                            └─────────────────┘

  Brief Summary of the Moderation Flow

  Asynchronous Microservice Architecture with Queue-Based Processing:

  1. Upload & Queue: User uploads media → API stores metadata → RabbitMQ message published
  2. Processing Pipeline: Media Processor consumes queue → creates variants → publishes completion event
  3. Moderation Service: Consumes completion event → calls Hive AI → determines status (approved/rejected/flagged)
  4. Status Update: Results stored in database → user notified via WebSocket

  Key Components:

  - RabbitMQ serves as the message broker enabling:
    - Asynchronous processing (non-blocking uploads)
    - Service decoupling (API, Media Processor, Moderation Service)
    - Retry logic and dead letter queues for error handling
    - Scalable worker pools for concurrent processing
  - Hive AI Integration (services/hive_moderation_client.py):
    - Multi-model content analysis (visual, text, audio)
    - Configurable thresholds: Safe (<0.3), Unsafe (>0.7), Review needed (0.3-0.7)
    - Mock service support for development/testing
  - Queue Structure:
    - media.uploaded → Media processing
    - media.process.complete → Moderation trigger
    - media.moderation.complete → Final status update
    - Dead letter queues for failed jobs

  Status Flow: pending → processing → approved/rejected/flagged

  This architecture allows the system to handle media uploads efficiently without blocking the user interface while ensuring all content is properly moderated before becoming publicly
  visible.

> API stores metadata to database ?

● Let me check how the API handles metadata storage during the upload process.

● Search(pattern: "upload_media", glob: "services/*.py", output_mode: "content")
  ⎿  Found 2 lines (ctrl+r to expand)

● Read(models/media.py)
  ⎿  Read 202 lines (ctrl+r to expand)

● Search(pattern: "upload_media", path: "services", output_mode: "content")
  ⎿  Found 23 lines (ctrl+r to expand)

● Read(services/media_service.py)
  ⎿  Read 50 lines (ctrl+r to expand)

● Read(services/media_service.py)
  ⎿  Read 80 lines (ctrl+r to expand)

● Read(services/media_service.py)
  ⎿  Read 50 lines (ctrl+r to expand)

● Yes, you're correct! Let me clarify the exact flow:

  The API does store metadata to the database immediately upon upload:

  From /home/rmintz/github/lucid/lucid-api/services/media_service.py:209:

  # Save metadata to database
  db_media = self.repository.create(media_data)

  The metadata stored includes:
  - user_id, filename, original_filename
  - content_type, file_size, s3_key
  - media_type, visibility, description
  - status: PENDING (initial status)
  - created_at timestamp

  Complete Flow Correction:

  1. Upload: User uploads → File goes to S3 → Metadata immediately saved to database with status: PENDING
  2. Processing Trigger: API calls Media Processor service (HTTP client, not RabbitMQ currently)
  3. Processing: Media Processor creates variants → Updates database
  4. Moderation: Hive AI called → Database updated with moderation results

  RabbitMQ's Current Role:
  Based on the code, RabbitMQ infrastructure exists but the current implementation uses HTTP service calls rather than queue-based messaging. The queue service is ready for future
  microservice decomposition when they move from the current "modular monolith" to separate services.

  So yes - metadata goes to database immediately on upload, not after processing.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment