This guide explains how to use the new WebSocket-based upload flow for Google Drive uploads in Verispotter, powered by Watermill with PGX for reliable job processing.
Google Drive uploads are now processed asynchronously using Watermill with PGX to avoid nginx timeouts and provide better reliability. When you submit a Google Drive URL, the server immediately returns a success response and queues the upload job in a PostgreSQL table using the existing PGX connection pool. Background workers process these jobs while sending real-time progress updates via WebSocket.
- Persistence: Upload jobs survive server restarts - stored in PostgreSQL
- PGX Integration: Uses your existing PGX connection pool for consistency
- Reliability: Built-in retry mechanisms and error handling
- Scalability: Multiple workers can process jobs concurrently
- Monitoring: Job status visible in database tables
- Real-time Updates: WebSocket progress notifications for each file
- Connection Efficiency: Shares the same database connections as your main application
```mermaid
graph LR
    A[Client] -->|1. POST /upload| B[REST API]
    B -->|2. Publish Job| C[Watermill PGX]
    C -->|3. Store Job| D[(PostgreSQL Pool)]
    D -->|4. Consume Job| E[Background Worker]
    E -->|5. Process Upload| F[Google Drive + GCS]
    E -->|6. Send Progress| G[WebSocket Hub]
    G -->|7. Real-time Updates| A
```
The system creates these tables using your existing PGX connection pool:
```sql
-- Jobs table (auto-created)
CREATE TABLE watermill_upload_jobs (
    "offset" SERIAL PRIMARY KEY,
    "uuid" UUID NOT NULL,
    "created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "payload" JSONB NOT NULL,
    "metadata" JSONB DEFAULT NULL
);

-- Worker offsets table (auto-created)
CREATE TABLE watermill_offsets_upload_workers (
    "consumer_group" VARCHAR(255) NOT NULL,
    "offset_acked" BIGINT NOT NULL DEFAULT 0,
    "offset_consumed" BIGINT NOT NULL DEFAULT 0,
    "last_processed_transaction_id" BIGINT,
    PRIMARY KEY ("consumer_group")
);
```

POST /api/events/{event_id}/upload
```bash
curl -X POST \
  "https://your-domain.com/api/events/EVENT_ID/upload" \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  -H "X-GID-Token: YOUR_GOOGLE_DRIVE_TOKEN" \
  -F "gdrive_url=https://drive.google.com/drive/folders/FOLDER_ID" \
  -F "force_upload=false"
```

Response (HTTP 202):
```json
{
  "message": "Upload request received and will be processed in background. Connect to WebSocket for progress updates.",
  "data": {
    "websocket_url": "/ws/events/EVENT_ID/upload-progress",
    "message": "Connect to the WebSocket endpoint to receive real-time upload progress updates"
  }
}
```
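For orientation, here is a minimal sketch of the publish side of this endpoint in Go. The `upload_jobs` topic name, the route wiring, and the `UploadJob` shape (field names taken from the payload keys in the monitoring queries below) are assumptions, not the actual Verispotter handler; authentication middleware and input validation are omitted:

```go
package api

import (
	"encoding/json"
	"net/http"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
)

// UploadJob mirrors the payload keys shown in the monitoring queries below.
type UploadJob struct {
	JobID       string `json:"job_id"`
	EventID     string `json:"event_id"`
	UploaderID  string `json:"uploader_id"`
	GDriveURL   string `json:"gdrive_url"`
	ForceUpload bool   `json:"force_upload"`
}

func handleUpload(publisher message.Publisher) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		job := UploadJob{
			JobID:       watermill.NewUUID(),
			EventID:     r.PathValue("event_id"), // route parameter (Go 1.22+ mux)
			UploaderID:  "",                      // taken from the validated JWT in the real handler
			GDriveURL:   r.FormValue("gdrive_url"),
			ForceUpload: r.FormValue("force_upload") == "true",
		}
		payload, err := json.Marshal(job)
		if err != nil {
			http.Error(w, "invalid job", http.StatusInternalServerError)
			return
		}

		// Publishing inserts a row into watermill_upload_jobs via the
		// Watermill SQL publisher backed by the shared PGX pool.
		if err := publisher.Publish("upload_jobs", message.NewMessage(job.JobID, payload)); err != nil {
			http.Error(w, "failed to queue upload job", http.StatusInternalServerError)
			return
		}

		// Return immediately; progress is delivered over the WebSocket.
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		json.NewEncoder(w).Encode(map[string]any{
			"message": "Upload request received and will be processed in background. Connect to WebSocket for progress updates.",
			"data": map[string]string{
				"websocket_url": "/ws/events/" + job.EventID + "/upload-progress",
			},
		})
	}
}
```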
When you submit an upload request:

- Job Publication: The server publishes an upload job message to the `watermill_upload_jobs` table using PGX
- Job Storage: Watermill stores the job in PostgreSQL using your existing connection pool
- Worker Processing: Background workers (consumer group: `upload_workers`) pick up jobs using PGX queries
- Progress Updates: Workers send real-time progress via WebSocket as they process each file
- Completion: The job is marked as processed when the upload completes or fails
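On the consume side, here is a minimal sketch of the worker loop. `message.Subscriber`, `Ack`, and `Nack` are Watermill's real primitives; the `upload_jobs` topic name and the `processUpload` helper are assumptions standing in for Verispotter's worker code:

```go
package worker

import (
	"context"
	"encoding/json"
	"log"

	"github.com/ThreeDotsLabs/watermill/message"
)

func runWorker(ctx context.Context, subscriber message.Subscriber) error {
	// The SQL subscriber reads rows from watermill_upload_jobs and tracks
	// its position in watermill_offsets_upload_workers.
	messages, err := subscriber.Subscribe(ctx, "upload_jobs")
	if err != nil {
		return err
	}
	for msg := range messages {
		var job struct {
			JobID     string `json:"job_id"`
			EventID   string `json:"event_id"`
			GDriveURL string `json:"gdrive_url"`
		}
		if err := json.Unmarshal(msg.Payload, &job); err != nil {
			log.Printf("dropping malformed job %s: %v", msg.UUID, err)
			msg.Ack() // retrying a malformed payload will not help
			continue
		}
		if err := processUpload(ctx, job.EventID, job.GDriveURL); err != nil {
			msg.Nack() // transient failure: the job is redelivered
			continue
		}
		msg.Ack() // advances offset_acked for consumer group upload_workers
	}
	return nil
}

// processUpload stands in for the Drive download, GCS upload, and
// per-file WebSocket progress updates.
func processUpload(ctx context.Context, eventID, gdriveURL string) error {
	return nil
}
```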
WebSocket URL: wss://your-domain.com/ws/events/{event_id}/upload-progress
Option 1: Authorization Header (Recommended)
```javascript
// Note: custom headers work in Node.js clients (e.g. the 'ws' package);
// browser WebSocket cannot set headers, so use Option 2 there.
const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress', [], {
  headers: {
    'Authorization': 'Bearer YOUR_JWT_TOKEN'
  }
});
```

Option 2: Query Parameter
```javascript
const ws = new WebSocket('wss://your-domain.com/ws/events/EVENT_ID/upload-progress?token=YOUR_JWT_TOKEN');
```

Handle progress messages on the socket:

```javascript
ws.onmessage = function(event) {
  const message = JSON.parse(event.data);
  switch (message.type) {
    case 'upload_started':
      console.log('Upload started:', message.data.message);
      console.log('Job ID:', message.data.job_id);
      console.log('Total files:', message.data.total_files);
      break;
    case 'upload_progress':
      const progress = message.data;
      console.log(`Processing ${progress.current_file}/${progress.total_files}: ${progress.filename}`);
      console.log(`Status: ${progress.status}`);
      if (progress.status === 'failed') {
        console.error(`Failed: ${progress.error_reason}`);
      }
      break;
    case 'upload_completed':
      console.log('Upload completed:', message.data.message);
      console.log('Processed files:', message.data.processed_files);
      console.log('Oversized files:', message.data.oversized_files);
      ws.close();
      break;
    case 'upload_failed':
      console.error('Upload failed:', message.data.message);
      console.error('Error:', message.data.error);
      console.error('Job ID:', message.data.job_id);
      ws.close();
      break;
  }
};
```

Job state can also be inspected directly in PostgreSQL:
```sql
-- Jobs that are queued but not yet consumed by the workers
SELECT
  uuid AS job_id,
  payload->>'event_id' AS event_id,
  payload->>'uploader_id' AS uploader_id,
  created_at
FROM watermill_upload_jobs
WHERE "offset" > (
  SELECT COALESCE(offset_consumed, 0)
  FROM watermill_offsets_upload_workers
  WHERE consumer_group = 'upload_workers'
);

-- Worker (consumer group) status
SELECT
  consumer_group,
  offset_acked,
  offset_consumed,
  (SELECT COUNT(*) FROM watermill_upload_jobs) AS total_jobs
FROM watermill_offsets_upload_workers;

-- Ten most recent jobs
SELECT
  "offset",
  uuid,
  payload->>'job_id' AS job_id,
  payload->>'event_id' AS event_id,
  payload->>'uploader_id' AS uploader_id,
  payload->>'gdrive_url' AS gdrive_url,
  payload->>'force_upload' AS force_upload,
  created_at
FROM watermill_upload_jobs
ORDER BY "offset" DESC
LIMIT 10;
```

To retry failed jobs, reset the consumed offset (a Go sketch of running this update through the shared pgx pool follows the list below):

```sql
UPDATE watermill_offsets_upload_workers
SET offset_consumed = offset_acked
WHERE consumer_group = 'upload_workers';
```

Routing Watermill through PGX brings several advantages:

- Connection Reuse: Uses your existing PGX pool instead of creating separate SQL connections
- Better Performance: PGX is more efficient than database/sql for PostgreSQL
- Consistent Configuration: Inherits your existing database settings (timeouts, pool size, SSL)
- Resource Efficiency: No additional connection overhead
- Transaction Support: Leverages PGX's advanced PostgreSQL features
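As referenced above, the offset reset can also be issued from application code through the same pgx pool. A minimal sketch; the package and function names are illustrative:

```go
package admin

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// requeueUnackedJobs resets the consumed offset so that jobs which were
// consumed but never acknowledged are redelivered to the workers.
func requeueUnackedJobs(ctx context.Context, pool *pgxpool.Pool) (int64, error) {
	tag, err := pool.Exec(ctx, `
		UPDATE watermill_offsets_upload_workers
		SET offset_consumed = offset_acked
		WHERE consumer_group = 'upload_workers'`)
	if err != nil {
		return 0, err
	}
	return tag.RowsAffected(), nil // number of consumer groups reset
}
```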
Security considerations:

- JWT Authentication: Required for both HTTP upload requests and WebSocket connections
- Database Security: Uses the same PGX pool and security as your main application
- Job Isolation: Each job runs in its own context with proper error boundaries
- Access Control: Users can only view progress for events they have access to
- Connection Security: Inherits SSL and authentication from your existing PGX configuration
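To illustrate the two authentication options, here is a standard-library sketch of how a WebSocket handler might extract the token from either the header or the query string. This is an assumption about the mechanism, not Verispotter's actual middleware; JWT validation and the event access check are omitted:

```go
package ws

import (
	"net/http"
	"strings"
)

// tokenFromRequest accepts either auth form documented above:
// Option 1: "Authorization: Bearer <token>" header
// Option 2: "?token=<token>" query parameter
func tokenFromRequest(r *http.Request) string {
	if h := r.Header.Get("Authorization"); strings.HasPrefix(h, "Bearer ") {
		return strings.TrimPrefix(h, "Bearer ")
	}
	return r.URL.Query().Get("token")
}
```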
Failure handling:

- Server Restart: Jobs persist in PostgreSQL and resume processing when workers restart
- Network Issues: PGX handles connection failures and retries automatically
- Worker Failures: Jobs are automatically retried if workers crash during processing
- Database Failures: Standard PostgreSQL backup/recovery procedures apply
- File Failures: Individual file failures are reported but don't stop the entire job
- Connection Pool Health: Benefits from PGX's connection health checking
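To make the per-file isolation concrete, here is a sketch of a processing loop that reports a failed file without aborting the whole job. The `Progress` fields mirror the `upload_progress` messages handled by the client code below; the `send` and `transfer` callbacks are illustrative stand-ins for the WebSocket hub and the Drive-to-GCS copy:

```go
package worker

import "context"

// Progress mirrors the data object of an upload_progress WebSocket message.
type Progress struct {
	CurrentFile int    `json:"current_file"`
	TotalFiles  int    `json:"total_files"`
	Filename    string `json:"filename"`
	Status      string `json:"status"` // "completed" or "failed"
	ErrorReason string `json:"error_reason,omitempty"`
}

// processFiles transfers each file independently: a failure is reported
// over the hub but does not stop the remaining files.
func processFiles(ctx context.Context, files []string,
	send func(Progress), transfer func(context.Context, string) error) {
	for i, name := range files {
		p := Progress{CurrentFile: i + 1, TotalFiles: len(files), Filename: name, Status: "completed"}
		if err := transfer(ctx, name); err != nil {
			p.Status = "failed"
			p.ErrorReason = err.Error()
		}
		send(p)
	}
}
```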
A complete client-side integration example:

```javascript
class PGXWatermillUploadProgressHandler {
  constructor(eventId, jwtToken) {
    this.eventId = eventId;
    this.jwtToken = jwtToken;
    this.ws = null;
    this.jobId = null;
  }

  async startUpload(gdriveUrl, forceUpload = false) {
    // 1. Submit upload request to queue job via PGX Watermill
    const response = await fetch(`/api/events/${this.eventId}/upload`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.jwtToken}`
      },
      body: this.createFormData(gdriveUrl, forceUpload)
    });
    if (response.status === 202) {
      // 2. Connect to WebSocket for progress updates
      this.connectWebSocket();
    } else {
      throw new Error('Failed to queue upload job via PGX');
    }
  }

  createFormData(gdriveUrl, forceUpload) {
    const formData = new FormData();
    formData.append('gdrive_url', gdriveUrl);
    formData.append('force_upload', forceUpload);
    return formData;
  }

  connectWebSocket() {
    // Custom headers require a Node.js WebSocket client (e.g. the 'ws'
    // package); in browsers, pass the token as a query parameter instead.
    const wsUrl = `wss://your-domain.com/ws/events/${this.eventId}/upload-progress`;
    this.ws = new WebSocket(wsUrl, [], {
      headers: { 'Authorization': `Bearer ${this.jwtToken}` }
    });
    this.ws.onmessage = (event) => this.handleMessage(JSON.parse(event.data));
    this.ws.onerror = (error) => this.handleError(error);
    this.ws.onclose = () => this.handleClose();
  }

  handleMessage(message) {
    switch (message.type) {
      case 'upload_started':
        this.jobId = message.data.job_id;
        this.onUploadStarted(message.data);
        break;
      case 'upload_progress':
        this.onUploadProgress(message.data);
        break;
      case 'upload_completed':
        this.onUploadCompleted(message.data);
        break;
      case 'upload_failed':
        this.onUploadFailed(message.data);
        break;
    }
  }

  handleError(error) {
    console.error('WebSocket error:', error);
  }

  handleClose() {
    this.ws = null;
  }

  onUploadStarted(data) {
    console.log(`Upload job ${data.job_id} started: ${data.total_files} files to process`);
    // Update UI: show progress bar, display job ID
  }

  onUploadProgress(data) {
    const percentage = (data.current_file / data.total_files) * 100;
    console.log(`Progress: ${percentage.toFixed(1)}% - ${data.message}`);
    // Update UI: update progress bar, show current file
    if (data.status === 'failed') {
      console.warn(`File failed: ${data.filename} - ${data.error_reason}`);
      // Update UI: show file error in list
    }
  }

  onUploadCompleted(data) {
    console.log(`Upload job ${this.jobId} completed successfully`);
    // Update UI: show success message, hide progress bar
    this.disconnect();
  }

  onUploadFailed(data) {
    console.error(`Upload job ${data.job_id} failed: ${data.error}`);
    // Update UI: show error message, allow retry
    this.disconnect();
  }

  // Monitor job in database (optional)
  async getJobStatus() {
    if (!this.jobId) return null;
    const response = await fetch(`/api/admin/jobs/${this.jobId}`, {
      headers: { 'Authorization': `Bearer ${this.jwtToken}` }
    });
    return response.json();
  }

  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.ws = null;
    }
  }
}

// Usage
const uploader = new PGXWatermillUploadProgressHandler('EVENT_ID', 'JWT_TOKEN');
uploader.startUpload('https://drive.google.com/drive/folders/FOLDER_ID', false);
```

This PGX-based implementation provides enterprise-grade reliability for handling large Google Drive uploads while integrating tightly with your existing PostgreSQL infrastructure and delivering real-time progress feedback to users.