Skip to content

Instantly share code, notes, and snippets.

@raress96
Created October 1, 2025 13:39
Show Gist options
  • Select an option

  • Save raress96/ce55e5f85e7cc0e114424d6e17c2d7d5 to your computer and use it in GitHub Desktop.

Select an option

Save raress96/ce55e5f85e7cc0e114424d6e17c2d7d5 to your computer and use it in GitHub Desktop.
Surflux SSE processing from backend
export const SURFLUX_API_URL = process.env.SURFLUX_API_URL || 'http://localhost:8000';
export const SURFLUX_API_KEY = process.env.SURFLUX_API_KEY || 'test_api_key';
import sqlite3 from 'sqlite3';
import { promisify } from 'util';
// Initialize SQLite database
const db = new sqlite3.Database('./api/db/demo.db');
// Promisify database methods
const dbRun = promisify(db.run.bind(db));
const dbGet = promisify(db.get.bind(db));
export async function initDatabase() {
try {
await dbRun(`
CREATE TABLE IF NOT EXISTS sse_state (
id INTEGER PRIMARY KEY AUTOINCREMENT,
last_message_id TEXT,
endpoint TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`);
console.log('Database initialized');
} catch (error) {
console.error('Error initializing database:', error);
}
}
export async function getSSEState(endpoint) {
try {
const result = await dbGet(
'SELECT * FROM sse_state WHERE endpoint = ? LIMIT 1',
[endpoint]
);
if (!result) {
await dbRun(
'INSERT INTO sse_state (endpoint, last_message_id) VALUES (?, ?)',
[endpoint, null]
);
return {
endpoint,
last_message_id: null,
}
}
return result;
} catch (error) {
console.error('Error getting SSE state:', error);
throw error;
}
}
export async function updateLastMessageId(messageId, endpoint = 'main') {
try {
await dbRun(
'UPDATE sse_state SET last_message_id = ?, updated_at = CURRENT_TIMESTAMP WHERE endpoint = ?',
[messageId, endpoint]
);
console.log(`Updated last message ID: ${messageId}`);
} catch (error) {
console.error('Error updating last message ID:', error);
throw error;
}
}
export { db };
import express from 'express';
import cors from 'cors';
import path from 'path';
import { fileURLToPath } from 'url';
import { initDatabase } from './db/index.js';
import { handleApiStatus, handleSSE } from './sse.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const app = express();
const PORT = process.env.PORT || 8080;
// Middleware
app.use(cors());
app.use(express.json());
app.use(express.static(path.join(__dirname, '../html')));
app.get('/api/sse', handleSSE);
app.get('/api/status', handleApiStatus);
app.listen(PORT, async () => {
await initDatabase();
console.log(`🚀 Server running on http://localhost:${PORT}`);
console.log(`📁 Static files served from: ${path.join(__dirname, '..')}`);
console.log(`🔌 SSE proxy available at: http://localhost:${PORT}/api/sse`);
console.log(`📊 Status endpoint: http://localhost:${PORT}/api/status`);
});
// Ensure upstream connection exists
import EventSource from 'eventsource';
import { getSSEState, updateLastMessageId } from './db/index.js';
import { SURFLUX_API_KEY, SURFLUX_API_URL } from './constants.js';
export const EVENTS_KEY = 'events';
// SSE connection management
let upstreamConnection = null;
let clientConnections = new Set();
export async function handleSSE(req, res) {
console.log('New SSE client connected');
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control',
});
// Add client to connections
clientConnections.add(res);
// Ensure upstream connection
try {
await ensureUpstreamConnection();
} catch (error) {
console.error('Error establishing upstream connection:', error);
res.write(`data: {"type":"error","message":"Failed to connect to upstream"}\n\n`);
}
// Send initial connection message
res.write(`data: {"type":"connected","message":"SSE proxy connected"}\n\n`);
// Handle client disconnect
req.on('close', () => {
console.log('SSE client disconnected');
clientConnections.delete(res);
// If no more clients, close upstream connection after delay
if (clientConnections.size === 0) {
setTimeout(() => {
if (clientConnections.size === 0 && upstreamConnection) {
console.log('No clients connected, closing upstream connection');
upstreamConnection.close();
upstreamConnection = null;
}
}, 100);
}
});
req.on('error', () => {
clientConnections.delete(res);
// If no more clients, close upstream connection after delay
if (clientConnections.size === 0) {
setTimeout(() => {
if (clientConnections.size === 0 && upstreamConnection) {
console.log('No clients connected, closing upstream connection');
upstreamConnection.close();
upstreamConnection = null;
}
}, 1000);
}
});
}
export async function handleApiStatus(req, res) {
try {
const state = await getSSEState(EVENTS_KEY);
res.json({
status: 'ok',
upstreamConnection: upstreamConnection ? upstreamConnection.readyState : 'closed',
clientConnections: clientConnections.size,
lastMessageId: state.last_message_id,
lastUpdated: state.updated_at,
});
} catch (error) {
res.status(500).json({
status: 'error',
error: error.message,
});
}
}
async function ensureUpstreamConnection() {
if (upstreamConnection && upstreamConnection.readyState === EventSource.OPEN) {
return upstreamConnection;
}
if (upstreamConnection) {
upstreamConnection.close();
}
const state = await getSSEState(EVENTS_KEY);
let sseUrl = `${SURFLUX_API_URL}/events?api-key=${SURFLUX_API_KEY}`;
// Add last-id parameter if we have one
if (state.last_message_id) {
sseUrl += `&last-id=${encodeURIComponent(state.last_message_id)}`;
console.log(`Resuming SSE from message ID: ${state.last_message_id}`);
} else {
console.log('Starting fresh SSE connection', sseUrl);
}
upstreamConnection = new EventSource(sseUrl);
upstreamConnection.onopen = () => {
console.log('Upstream SSE connection opened');
};
upstreamConnection.onmessage = async (event) => {
console.log('Received SSE message:', {
data: event.data?.substring(0, 100) + '...', // Log first 100 chars
lastEventId: event.lastEventId,
});
// Broadcast to all connected clients
const message = `id: ${event.lastEventId || ''}\ndata: ${event.data}\n\n`;
clientConnections.forEach(client => {
try {
client.write(message);
} catch (error) {
console.error('Error writing to client:', error);
clientConnections.delete(client);
}
});
// Save the last message ID if available
if (event.lastEventId) {
const packageEventId = `${event.data?.tx_hash}${event.data?.data?.event_index}`;
console.log('Package Event Id', packageEventId);
await updateLastMessageId(event.lastEventId, EVENTS_KEY);
}
};
upstreamConnection.onerror = (error) => {
console.error('Upstream SSE connection error:', {
message: error.message,
status: error.status,
type: error.type,
url: sseUrl
});
// Notify clients of connection error
const errorMessage = `data: {"type":"error","message":"Connection lost, reconnecting..."}\n\n`;
clientConnections.forEach(client => {
try {
client.write(errorMessage);
} catch (error) {
clientConnections.delete(client);
}
});
// Reconnect after 5 seconds
setTimeout(() => {
if (upstreamConnection && clientConnections.size > 0) {
console.log('Attempting to reconnect upstream SSE...');
ensureUpstreamConnection();
}
}, 5000);
};
return upstreamConnection;
}
// Graceful shutdown
process.on('SIGTERM', () => {
console.log('Received SIGTERM, closing connections...');
if (upstreamConnection) {
upstreamConnection.close();
}
clientConnections.clear();
process.exit(0);
});
process.on('SIGINT', () => {
console.log('Received SIGINT, closing connections...');
if (upstreamConnection) {
upstreamConnection.close();
}
clientConnections.clear();
process.exit(0);
});
{
"name": "sui-checkpoint-uploader",
"version": "1.0.0",
"description": "Upload Sui mainnet checkpoint files to DigitalOcean Spaces",
"type": "module",
"main": "checkpoints.js",
"scripts": {
"upload": "node -r dotenv/config checkpoints.js"
},
"keywords": [
"sui",
"blockchain",
"checkpoints",
"digitalocean",
"spaces",
"s3"
],
"author": "",
"license": "MIT",
"dependencies": {
"@aws-sdk/client-s3": "^3.478.0",
"@aws-sdk/lib-storage": "^3.478.0"
},
"devDependencies": {
"dotenv": "^16.3.1"
},
"engines": {
"node": ">=18.0.0"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment