WebSocket Upload Progress Guide

This guide explains how to use the new WebSocket-based upload flow for Google Drive uploads in Verispotter, powered by Watermill with PGX for reliable job processing.

Overview

Google Drive uploads are now processed asynchronously using Watermill with PGX to avoid nginx timeouts and provide better reliability. When you submit a Google Drive URL, the server immediately returns a success response and queues the upload job in a PostgreSQL table using the existing PGX connection pool. Background workers process these jobs while sending real-time progress updates via WebSocket.

Key Benefits

  • Persistence: Upload jobs are stored in PostgreSQL, so they survive server restarts
  • PGX Integration: Uses your existing PGX connection pool for consistency
  • Reliability: Built-in retry mechanisms and error handling
  • Scalability: Multiple workers can process jobs concurrently
  • Monitoring: Job status visible in database tables
  • Real-time Updates: WebSocket progress notifications for each file
  • Connection Efficiency: Shares the same database connections as your main application

Architecture

graph LR
    A[Client] -->|1. POST /upload| B[REST API]
    B -->|2. Publish Job| C[Watermill PGX]
    C -->|3. Store Job| D[(PostgreSQL Pool)]
    D -->|4. Consume Job| E[Background Worker]
    E -->|5. Process Upload| F[Google Drive + GCS]
    E -->|6. Send Progress| G[WebSocket Hub]
    G -->|7. Real-time Updates| A

Database Schema (PGX)

The system creates these tables using your existing PGX connection pool:

-- Jobs table (auto-created)
CREATE TABLE watermill_upload_jobs (
  "offset" SERIAL PRIMARY KEY,
  "uuid" UUID NOT NULL,
  "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  "payload" JSONB NOT NULL,
  "metadata" JSONB DEFAULT NULL
);

-- Worker offsets table (auto-created)
CREATE TABLE watermill_offsets_upload_workers (
  "consumer_group" VARCHAR(255) NOT NULL,
  "offset_acked" BIGINT NOT NULL DEFAULT 0,
  "offset_consumed" BIGINT NOT NULL DEFAULT 0,
  "last_processed_transaction_id" BIGINT,
  PRIMARY KEY ("consumer_group")
);

Upload Flow

1. Submit Upload Request

POST /api/events/{event_id}/upload

curl -X POST \
  "https://your-domain.com/api/events/EVENT_ID/upload" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "X-GID-Token: YOUR_GOOGLE_DRIVE_TOKEN" \
  -F "gdrive_url=https://drive.google.com/drive/folders/FOLDER_ID" \
  -F "force_upload=false"

Response (HTTP 202):

{
  "message": "Upload request received and will be processed in background. Connect to WebSocket for progress updates.",
  "data": {
    "websocket_url": "/ws/events/EVENT_ID/upload-progress",
    "message": "Connect to the WebSocket endpoint to receive real-time upload progress updates"
  }
}
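
Because the 202 response already includes the WebSocket path, a client can use it directly instead of hard-coding the URL. A minimal sketch (to run inside an async function; eventId, jwtToken, and formData are placeholders, and it reuses the query-parameter authentication shown in step 3):

// Queue the upload, then connect using the websocket_url from the response
const res = await fetch(`/api/events/${eventId}/upload`, {
  method: 'POST',
  headers: { 'Authorization': `Bearer ${jwtToken}` },
  body: formData // FormData carrying gdrive_url and force_upload
});

if (res.status === 202) {
  const { data } = await res.json();
  // data.websocket_url is a path like /ws/events/EVENT_ID/upload-progress
  const ws = new WebSocket(`wss://your-domain.com${data.websocket_url}?token=${jwtToken}`);
}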

2. Job Processing with Watermill PGX

When you submit an upload request:

  1. Job Publication: The server publishes an upload job message to the watermill_upload_jobs table using PGX
  2. Job Storage: Watermill stores the job in PostgreSQL using your existing connection pool
  3. Worker Processing: Background workers (consumer group: upload_workers) pick up jobs using PGX queries
  4. Progress Updates: Workers send real-time progress via WebSocket as they process each file
  5. Completion: Job is marked as processed when upload completes or fails
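
For reference, the payload column of a queued job presumably looks something like the following. The field names are inferred from the monitoring queries later in this guide; the values are placeholders:

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_id": "EVENT_ID",
  "uploader_id": "UPLOADER_ID",
  "gdrive_url": "https://drive.google.com/drive/folders/FOLDER_ID",
  "force_upload": false
}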

3. Connect to WebSocket for Progress Updates

WebSocket URL: wss://your-domain.com/ws/events/{event_id}/upload-progress

Authentication Options:

Option 1: Authorization Header (Node.js clients)

The browser WebSocket API does not support custom headers, so this option only works from non-browser clients, such as the ws package in Node.js:

const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress', [], {
  headers: {
    'Authorization': 'Bearer YOUR_JWT_TOKEN'
  }
});

Option 2: Query Parameter (Browsers)

Use this option in browsers, where custom headers are unavailable. Note that tokens passed in query strings can show up in server and proxy logs:

const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress?token=YOUR_JWT_TOKEN');
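
If the same client code has to run in both environments, a small helper can pick the right option. This is a sketch, assuming the Node.js side has the ws package installed:

// Node.js: send the JWT as a header; browser: fall back to the query parameter
function connectProgressSocket(eventId, jwtToken) {
  const base = `wss://your-domain.com/ws/events/${eventId}/upload-progress`;

  if (typeof window === 'undefined') {
    // Node.js: the ws package accepts an options object with custom headers
    const WebSocket = require('ws');
    return new WebSocket(base, [], {
      headers: { 'Authorization': `Bearer ${jwtToken}` }
    });
  }

  // Browser: the native WebSocket API cannot set headers
  return new WebSocket(`${base}?token=${encodeURIComponent(jwtToken)}`);
}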

4. Handle Progress Messages

ws.onmessage = function(event) {
  const message = JSON.parse(event.data);
  
  switch(message.type) {
    case 'upload_started':
      console.log('Upload started:', message.data.message);
      console.log('Job ID:', message.data.job_id);
      console.log('Total files:', message.data.total_files);
      break;
      
    case 'upload_progress':
      const progress = message.data;
      console.log(`Processing ${progress.current_file}/${progress.total_files}: ${progress.filename}`);
      console.log(`Status: ${progress.status}`);
      if (progress.status === 'failed') {
        console.error(`Failed: ${progress.error_reason}`);
      }
      break;
      
    case 'upload_completed':
      console.log('Upload completed:', message.data.message);
      console.log('Processed files:', message.data.processed_files);
      console.log('Oversized files:', message.data.oversized_files);
      ws.close();
      break;
      
    case 'upload_failed':
      console.error('Upload failed:', message.data.message);
      console.error('Error:', message.data.error);
      console.error('Job ID:', message.data.job_id);
      ws.close();
      break;
  }
};

Monitoring and Operations (PGX)

View Pending Jobs

SELECT 
  uuid as job_id,
  payload->>'event_id' as event_id,
  payload->>'uploader_id' as uploader_id,
  created_at
FROM watermill_upload_jobs 
WHERE "offset" > (
  SELECT COALESCE(offset_consumed, 0) 
  FROM watermill_offsets_upload_workers 
  WHERE consumer_group = 'upload_workers'
);

View Processing Progress

SELECT 
  consumer_group,
  offset_acked,
  offset_consumed,
  (SELECT COUNT(*) FROM watermill_upload_jobs) as total_jobs
FROM watermill_offsets_upload_workers;

View Job Payload Details

SELECT 
  "offset",
  uuid,
  payload->>'job_id' as job_id,
  payload->>'event_id' as event_id,
  payload->>'uploader_id' as uploader_id,
  payload->>'gdrive_url' as gdrive_url,
  payload->>'force_upload' as force_upload,
  created_at
FROM watermill_upload_jobs
ORDER BY "offset" DESC
LIMIT 10;

Retry Failed Jobs

To retry jobs that were consumed but never acknowledged (for example, after a worker crash), rewind the consumed offset to the last acknowledged offset. Every job past that point is redelivered, so processing should be idempotent:

UPDATE watermill_offsets_upload_workers 
SET offset_consumed = offset_acked 
WHERE consumer_group = 'upload_workers';

Performance Benefits of PGX Integration

  • Connection Reuse: Uses your existing PGX pool instead of creating separate SQL connections
  • Better Performance: PGX is more efficient than database/sql for PostgreSQL
  • Consistent Configuration: Inherits your existing database settings (timeouts, pool size, SSL)
  • Resource Efficiency: No additional connection overhead
  • Transaction Support: Leverages PGX's advanced PostgreSQL features

Security Considerations

  • JWT Authentication: Required for both HTTP upload requests and WebSocket connections
  • Database Security: Uses the same PGX pool and security as your main application
  • Job Isolation: Each job runs in its own context with proper error boundaries
  • Access Control: Users can only view progress for events they have access to
  • Connection Security: Inherits SSL and authentication from your existing PGX configuration

Error Handling & Recovery

  • Server Restart: Jobs persist in PostgreSQL and resume processing when workers restart
  • Network Issues: PGX handles connection failures and retries automatically
  • Worker Failures: Jobs are automatically retried if workers crash during processing
  • Database Failures: Standard PostgreSQL backup/recovery procedures apply
  • File Failures: Individual file failures are reported but don't stop the entire job
  • Connection Pool Health: Benefits from PGX's connection health checking
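
These bullets cover server-side recovery. On the client, a dropped WebSocket does not cancel anything, because the job keeps running in the background, so the browser can simply reconnect. A minimal sketch with exponential backoff, assuming the query-parameter authentication from Option 2:

function connectWithRetry(eventId, jwtToken, onMessage, attempt = 0) {
  const ws = new WebSocket(
    `wss://your-domain.com/ws/events/${eventId}/upload-progress?token=${jwtToken}`
  );

  ws.onmessage = (event) => onMessage(JSON.parse(event.data));

  ws.onclose = (event) => {
    // Reconnect on abnormal closure; the upload job keeps running server-side
    if (!event.wasClean && attempt < 5) {
      const delay = Math.min(1000 * 2 ** attempt, 30000);
      setTimeout(() => connectWithRetry(eventId, jwtToken, onMessage, attempt + 1), delay);
    }
  };

  return ws;
}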

Frontend Implementation Example

class PGXWatermillUploadProgressHandler {
  constructor(eventId, jwtToken) {
    this.eventId = eventId;
    this.jwtToken = jwtToken;
    this.ws = null;
    this.jobId = null;
  }

  async startUpload(gdriveUrl, forceUpload = false) {
    // 1. Submit upload request to queue job via PGX Watermill
    const response = await fetch(`/api/events/${this.eventId}/upload`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.jwtToken}`
      },
      body: this.createFormData(gdriveUrl, forceUpload)
    });

    if (response.status === 202) {
      // 2. Connect to WebSocket for progress updates
      this.connectWebSocket();
    } else {
      throw new Error('Failed to queue upload job via PGX');
    }
  }

  createFormData(gdriveUrl, forceUpload) {
    const formData = new FormData();
    formData.append('gdrive_url', gdriveUrl);
    formData.append('force_upload', forceUpload);
    return formData;
  }

  connectWebSocket() {
    // Browsers cannot set custom headers on WebSocket connections,
    // so this client authenticates with the token query parameter (Option 2)
    const wsUrl = `wss://your-domain.com/ws/events/${this.eventId}/upload-progress?token=${encodeURIComponent(this.jwtToken)}`;
    this.ws = new WebSocket(wsUrl);

    this.ws.onmessage = (event) => this.handleMessage(JSON.parse(event.data));
    this.ws.onerror = (error) => this.handleError(error);
    this.ws.onclose = () => this.handleClose();
  }

  handleError(error) {
    console.error('WebSocket error:', error);
    // Update UI: surface the connection problem, offer a retry
  }

  handleClose() {
    console.log('WebSocket connection closed');
  }

  handleMessage(message) {
    switch(message.type) {
      case 'upload_started':
        this.jobId = message.data.job_id;
        this.onUploadStarted(message.data);
        break;
      case 'upload_progress':
        this.onUploadProgress(message.data);
        break;
      case 'upload_completed':
        this.onUploadCompleted(message.data);
        break;
      case 'upload_failed':
        this.onUploadFailed(message.data);
        break;
    }
  }

  onUploadStarted(data) {
    console.log(`Upload job ${data.job_id} started: ${data.total_files} files to process`);
    // Update UI: show progress bar, display job ID
  }

  onUploadProgress(data) {
    const percentage = (data.current_file / data.total_files) * 100;
    console.log(`Progress: ${percentage.toFixed(1)}% - ${data.message}`);
    // Update UI: update progress bar, show current file
    
    if (data.status === 'failed') {
      console.warn(`File failed: ${data.filename} - ${data.error_reason}`);
      // Update UI: show file error in list
    }
  }

  onUploadCompleted(data) {
    console.log(`Upload job ${this.jobId} completed successfully`);
    // Update UI: show success message, hide progress bar
    this.disconnect();
  }

  onUploadFailed(data) {
    console.error(`Upload job ${data.job_id} failed: ${data.error}`);
    // Update UI: show error message, allow retry
    this.disconnect();
  }

  // Monitor job in database (optional)
  async getJobStatus() {
    if (!this.jobId) return null;
    
    const response = await fetch(`/api/admin/jobs/${this.jobId}`, {
      headers: { 'Authorization': `Bearer ${this.jwtToken}` }
    });
    return response.json();
  }

  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
  }
}

// Usage
const uploader = new PGXWatermillUploadProgressHandler('EVENT_ID', 'JWT_TOKEN');
uploader.startUpload('https://drive.google.com/drive/folders/FOLDER_ID', false);

This PGX-based implementation provides reliable processing of large Google Drive uploads, integrates with your existing PostgreSQL infrastructure, and gives users real-time progress feedback throughout.
