Created
October 1, 2025 13:39
-
-
Save raress96/ce55e5f85e7cc0e114424d6e17c2d7d5 to your computer and use it in GitHub Desktop.
Surflux SSE processing from backend
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Surflux connection settings, read from the environment with
// local-development fallbacks.
const { env } = process;

/** Base URL of the Surflux API; the fallback is used when the variable is unset or empty. */
export const SURFLUX_API_URL = env.SURFLUX_API_URL || 'http://localhost:8000';

/** API key sent to Surflux; the fallback is used when the variable is unset or empty. */
export const SURFLUX_API_KEY = env.SURFLUX_API_KEY || 'test_api_key';
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sqlite3 from 'sqlite3'; | |
| import { promisify } from 'util'; | |
// Open the SQLite database file used by the functions in this module.
const db = new sqlite3.Database('./api/db/demo.db');

// Promise-returning wrappers around the callback-style `run` and `get`
// methods, bound to `db` so they can be awaited as plain functions.
const [dbRun, dbGet] = [db.run, db.get].map((method) => promisify(method.bind(db)));
/**
 * Create the `sse_state` table if it does not exist yet.
 *
 * @returns {Promise<void>} Resolves once the schema statement has run.
 * @throws Rethrows the database error after logging it, so callers do not
 *   keep running against a database that has no schema (this matches the
 *   error handling of the other functions in this module).
 */
export async function initDatabase() {
  try {
    await dbRun(`
      CREATE TABLE IF NOT EXISTS sse_state (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        last_message_id TEXT,
        endpoint TEXT NOT NULL,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `);
    console.log('Database initialized');
  } catch (error) {
    console.error('Error initializing database:', error);
    throw error;
  }
}
/**
 * Load the persisted SSE state row for an endpoint, creating it on first use.
 *
 * @param {string} endpoint - Key identifying the stream whose state is stored.
 * @returns {Promise<object>} The `sse_state` row for `endpoint`.
 * @throws Rethrows any database error after logging it.
 */
export async function getSSEState(endpoint) {
  const selectSql = 'SELECT * FROM sse_state WHERE endpoint = ? LIMIT 1';

  try {
    const existing = await dbGet(selectSql, [endpoint]);
    if (existing) {
      return existing;
    }

    // The table has no UNIQUE constraint on `endpoint`, so the existence
    // check is folded into the INSERT itself: concurrent first calls can
    // then no longer create duplicate rows.
    await dbRun(
      `INSERT INTO sse_state (endpoint, last_message_id)
       SELECT ?, NULL
       WHERE NOT EXISTS (SELECT 1 FROM sse_state WHERE endpoint = ?)`,
      [endpoint, endpoint]
    );

    // Re-read so a freshly created state has the same shape (id, updated_at)
    // as an existing one.
    const created = await dbGet(selectSql, [endpoint]);
    return created ?? { endpoint, last_message_id: null };
  } catch (error) {
    console.error('Error getting SSE state:', error);
    throw error;
  }
}
/**
 * Persist the ID of the last processed message for an endpoint.
 *
 * @param {string} messageId - ID to store as `last_message_id`.
 * @param {string} [endpoint='main'] - Key of the state row to update.
 * @returns {Promise<void>}
 * @throws Rethrows any database error after logging it.
 */
export async function updateLastMessageId(messageId, endpoint = 'main') {
  try {
    await dbRun(
      'UPDATE sse_state SET last_message_id = ?, updated_at = CURRENT_TIMESTAMP WHERE endpoint = ?',
      [messageId, endpoint]
    );

    // An UPDATE on a missing row silently changes nothing, which would lose
    // the ID. Create the row when the endpoint has no state yet; this is a
    // no-op when the UPDATE above already matched a row.
    await dbRun(
      `INSERT INTO sse_state (endpoint, last_message_id)
       SELECT ?, ?
       WHERE NOT EXISTS (SELECT 1 FROM sse_state WHERE endpoint = ?)`,
      [endpoint, messageId, endpoint]
    );

    console.log(`Updated last message ID: ${messageId}`);
  } catch (error) {
    console.error('Error updating last message ID:', error);
    throw error;
  }
}
| export { db }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import express from 'express'; | |
| import cors from 'cors'; | |
| import path from 'path'; | |
| import { fileURLToPath } from 'url'; | |
| import { initDatabase } from './db/index.js'; | |
| import { handleApiStatus, handleSSE } from './sse.js'; | |
// ES modules do not provide __filename/__dirname, so derive them from the
// module URL.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const app = express();
const PORT = process.env.PORT || 8080;
// Directory served as static content; also reported in the startup log.
const STATIC_DIR = path.join(__dirname, '../html');

// Middleware
app.use(cors());
app.use(express.json());
app.use(express.static(STATIC_DIR));

app.get('/api/sse', handleSSE);
app.get('/api/status', handleApiStatus);

/**
 * Prepare the database, then start accepting HTTP connections.
 *
 * The database is initialised before listening so that no request can reach
 * a handler while the `sse_state` table may still be missing.
 */
async function startServer() {
  await initDatabase();

  app.listen(PORT, () => {
    console.log(`🚀 Server running on http://localhost:${PORT}`);
    console.log(`📁 Static files served from: ${STATIC_DIR}`);
    console.log(`🔌 SSE proxy available at: http://localhost:${PORT}/api/sse`);
    console.log(`📊 Status endpoint: http://localhost:${PORT}/api/status`);
  });
}

startServer().catch((error) => {
  console.error('Failed to start server:', error);
  process.exit(1);
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Ensure upstream connection exists | |
| import EventSource from 'eventsource'; | |
| import { getSSEState, updateLastMessageId } from './db/index.js'; | |
| import { SURFLUX_API_KEY, SURFLUX_API_URL } from './constants.js'; | |
/** Key under which this module stores and reads its SSE state. */
export const EVENTS_KEY = 'events';

// SSE connection management
// The single upstream EventSource shared by all clients (null when closed).
let upstreamConnection = null;
// Response objects of the currently connected downstream clients. The Set is
// only mutated, never replaced, so the binding is constant.
const clientConnections = new Set();
/**
 * Close the shared upstream connection if, after `delayMs`, there is still
 * no client connected. Does nothing while at least one client is connected.
 *
 * @param {number} delayMs - Grace period before the idle check runs.
 */
function scheduleIdleUpstreamClose(delayMs) {
  if (clientConnections.size > 0) {
    return;
  }

  setTimeout(() => {
    // Re-check: a new client may have connected during the grace period.
    if (clientConnections.size === 0 && upstreamConnection) {
      console.log('No clients connected, closing upstream connection');
      upstreamConnection.close();
      upstreamConnection = null;
    }
  }, delayMs);
}

/**
 * Forget a client and schedule the idle shutdown of the upstream connection.
 *
 * @param {object} res - The client's response object.
 * @param {number} idleDelayMs - Grace period passed to the idle check.
 */
function detachClient(res, idleDelayMs) {
  clientConnections.delete(res);
  scheduleIdleUpstreamClose(idleDelayMs);
}

/**
 * Express handler that turns the response into an SSE stream and registers
 * it to receive the messages broadcast from the upstream connection.
 *
 * @param {object} req - Incoming request; its `close`/`error` events signal
 *   that the client is gone.
 * @param {object} res - Response kept open as the event stream.
 * @returns {Promise<void>}
 */
export async function handleSSE(req, res) {
  console.log('New SSE client connected');

  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Cache-Control',
  });

  // Add client to connections
  clientConnections.add(res);

  // Register the disconnect handlers BEFORE awaiting anything: a client that
  // goes away while the upstream connection is being established would
  // otherwise never be removed from `clientConnections`.
  req.on('close', () => {
    console.log('SSE client disconnected');
    detachClient(res, 100);
  });

  req.on('error', () => {
    detachClient(res, 1000);
  });

  // Ensure upstream connection
  try {
    await ensureUpstreamConnection();
  } catch (error) {
    console.error('Error establishing upstream connection:', error);
    if (clientConnections.has(res)) {
      res.write(`data: {"type":"error","message":"Failed to connect to upstream"}\n\n`);
    }
  }

  // The client disconnected during the await: do not write to it, and make
  // sure an upstream connection opened in the meantime is not left running
  // without any listener.
  if (!clientConnections.has(res)) {
    scheduleIdleUpstreamClose(100);
    return;
  }

  // Send initial connection message
  res.write(`data: {"type":"connected","message":"SSE proxy connected"}\n\n`);
}
/**
 * Express handler reporting the proxy status as JSON: the upstream
 * connection's readyState (or 'closed' when there is none), the number of
 * connected clients, and the persisted last message ID with its timestamp.
 * Responds with HTTP 500 and the error message if anything fails.
 *
 * @param {object} req - Incoming request (unused).
 * @param {object} res - Response used to send the JSON payload.
 * @returns {Promise<void>}
 */
export async function handleApiStatus(req, res) {
  try {
    const state = await getSSEState(EVENTS_KEY);

    let upstreamState = 'closed';
    if (upstreamConnection) {
      upstreamState = upstreamConnection.readyState;
    }

    const payload = {
      status: 'ok',
      upstreamConnection: upstreamState,
      clientConnections: clientConnections.size,
      lastMessageId: state.last_message_id,
      lastUpdated: state.updated_at,
    };
    res.json(payload);
  } catch (error) {
    const failure = { status: 'error', error: error.message };
    res.status(500).json(failure);
  }
}
// Connection attempt currently in flight, shared by concurrent callers so
// that only one EventSource is ever created at a time.
let pendingUpstreamConnection = null;
// Handle of the scheduled reconnect attempt, so repeated error events do not
// stack up several timers.
let upstreamReconnectTimer = null;

/**
 * Serialise one message in the SSE wire format.
 *
 * Every line of `data` gets its own `data:` field, because a raw newline
 * inside a single field would end the field early. The `id:` field is only
 * written when an ID is present.
 *
 * @param {string} id - Event ID, may be empty.
 * @param {string} data - Message payload, may span several lines.
 * @returns {string} The frame, terminated by a blank line.
 */
function formatSSEFrame(id, data) {
  const fields = String(data ?? '')
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`);
  if (id) {
    fields.unshift(`id: ${id}`);
  }
  return `${fields.join('\n')}\n\n`;
}

/**
 * Write a frame to every connected client, dropping clients whose write throws.
 *
 * @param {string} frame - Already formatted SSE frame.
 */
function broadcastToClients(frame) {
  for (const client of clientConnections) {
    try {
      client.write(frame);
    } catch (error) {
      console.error('Error writing to client:', error);
      clientConnections.delete(client);
    }
  }
}

/**
 * Build the identifier that is logged for a message.
 *
 * NOTE(review): assumes the payload is JSON carrying `tx_hash` and
 * `data.event_index`, as implied by the fields the previous code read;
 * confirm against the upstream API.
 *
 * @param {string} rawData - Raw message payload.
 * @returns {string|null} The identifier, or null when it cannot be derived.
 */
function extractPackageEventId(rawData) {
  try {
    const payload = JSON.parse(rawData);
    const txHash = payload?.tx_hash;
    const eventIndex = payload?.data?.event_index;
    if (txHash == null || eventIndex == null) {
      return null;
    }
    return `${txHash}${eventIndex}`;
  } catch {
    return null;
  }
}

/**
 * Open a new upstream EventSource, resuming after the persisted message ID
 * when there is one, and wire up its event handlers.
 *
 * @returns {Promise<object>} The new EventSource, also stored in
 *   `upstreamConnection`.
 */
async function openUpstreamConnection() {
  if (upstreamConnection) {
    upstreamConnection.close();
    upstreamConnection = null;
  }

  const state = await getSSEState(EVENTS_KEY);

  let sseUrl = `${SURFLUX_API_URL}/events?api-key=${encodeURIComponent(SURFLUX_API_KEY)}`;
  // Add last-id parameter if we have one
  if (state.last_message_id) {
    sseUrl += `&last-id=${encodeURIComponent(state.last_message_id)}`;
  }
  // Never write the API key to the logs.
  const loggableUrl = sseUrl.replace(/api-key=[^&]*/, 'api-key=[redacted]');

  if (state.last_message_id) {
    console.log(`Resuming SSE from message ID: ${state.last_message_id}`);
  } else {
    console.log('Starting fresh SSE connection', loggableUrl);
  }

  const source = new EventSource(sseUrl);
  upstreamConnection = source;

  source.onopen = () => {
    console.log('Upstream SSE connection opened');
  };

  source.onmessage = async (event) => {
    const { data, lastEventId } = event;
    console.log('Received SSE message:', {
      data: typeof data === 'string' ? `${data.substring(0, 100)}...` : data, // Log first 100 chars
      lastEventId,
    });

    // Broadcast to all connected clients
    broadcastToClients(formatSSEFrame(lastEventId, data));

    // Save the last message ID if available
    if (!lastEventId) {
      return;
    }
    console.log('Package Event Id', extractPackageEventId(data));
    try {
      await updateLastMessageId(lastEventId, EVENTS_KEY);
    } catch (error) {
      // A rejection escaping an event handler would be unhandled; keep the
      // stream running and only report the failure.
      console.error('Failed to persist last message ID:', error);
    }
  };

  source.onerror = (error) => {
    console.error('Upstream SSE connection error:', {
      message: error.message,
      status: error.status,
      type: error.type,
      url: loggableUrl,
    });

    // Notify clients of connection error
    broadcastToClients(`data: {"type":"error","message":"Connection lost, reconnecting..."}\n\n`);

    // Reconnect after 5 seconds, with at most one attempt scheduled.
    if (upstreamReconnectTimer) {
      return;
    }
    upstreamReconnectTimer = setTimeout(() => {
      upstreamReconnectTimer = null;
      if (!upstreamConnection || clientConnections.size === 0) {
        return;
      }
      console.log('Attempting to reconnect upstream SSE...');
      ensureUpstreamConnection().catch((reconnectError) => {
        console.error('Upstream SSE reconnect failed:', reconnectError);
      });
    }, 5000);
  };

  return source;
}

/**
 * Return the shared upstream EventSource, opening it when necessary.
 *
 * A connection that is open or still connecting is reused as is; only a
 * closed or missing one is replaced. Concurrent callers share one in-flight
 * attempt, so no second EventSource is created (which would leak the first
 * and deliver every message twice).
 *
 * @returns {Promise<object>} The active EventSource.
 * @throws Propagates errors from loading the persisted SSE state.
 */
async function ensureUpstreamConnection() {
  if (upstreamConnection && upstreamConnection.readyState !== EventSource.CLOSED) {
    return upstreamConnection;
  }

  if (!pendingUpstreamConnection) {
    pendingUpstreamConnection = openUpstreamConnection().finally(() => {
      pendingUpstreamConnection = null;
    });
  }
  return pendingUpstreamConnection;
}
// Graceful shutdown

/**
 * Log the given line, close the upstream connection if there is one, forget
 * all clients and exit the process with status 0.
 *
 * @param {string} logLine - Message printed before shutting down.
 */
function closeConnectionsAndExit(logLine) {
  console.log(logLine);
  if (upstreamConnection) {
    upstreamConnection.close();
  }
  clientConnections.clear();
  process.exit(0);
}

process.on('SIGTERM', () => closeConnectionsAndExit('Received SIGTERM, closing connections...'));
process.on('SIGINT', () => closeConnectionsAndExit('Received SIGINT, closing connections...'));
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "name": "sui-checkpoint-uploader", | |
| "version": "1.0.0", | |
| "description": "Upload Sui mainnet checkpoint files to DigitalOcean Spaces", | |
| "type": "module", | |
| "main": "checkpoints.js", | |
| "scripts": { | |
| "upload": "node -r dotenv/config checkpoints.js" | |
| }, | |
| "keywords": [ | |
| "sui", | |
| "blockchain", | |
| "checkpoints", | |
| "digitalocean", | |
| "spaces", | |
| "s3" | |
| ], | |
| "author": "", | |
| "license": "MIT", | |
| "dependencies": { | |
| "@aws-sdk/client-s3": "^3.478.0", | |
| "@aws-sdk/lib-storage": "^3.478.0" | |
| }, | |
| "devDependencies": { | |
| "dotenv": "^16.3.1" | |
| }, | |
| "engines": { | |
| "node": ">=18.0.0" | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment