Skip to content

Instantly share code, notes, and snippets.

@dened
Created January 28, 2026 09:55
Show Gist options
  • Select an option

  • Save dened/06abb918cef4a8a8101bc699576e34fe to your computer and use it in GitHub Desktop.

Select an option

Save dened/06abb918cef4a8a8101bc699576e34fe to your computer and use it in GitHub Desktop.
Dart Server-sent events
import 'dart:async';
import 'dart:convert' show Utf8Decoder, JsonDecoder;
import 'dart:math' as math;
import 'package:flutter/foundation.dart' show ValueListenable, ValueNotifier;
import 'package:meta/meta.dart';
import 'api_client.dart';
/// A single Server-Sent Event (SSE) as emitted on [SSEClient.events].
///
/// [name] is the name of the event.
/// [payload] is the data associated with the event.
typedef SSEEvent = ({String name, Map<String, Object?> payload});
/// Connection lifecycle states of the SSE client.
///
/// Every value carries a human-readable [representation].
enum SSEClientState {
  disconnected('Disconnected'),
  connecting('Connecting'),
  connected('Connected'),
  closed('Closed');

  const SSEClientState(this.representation);

  /// Human-readable label of this state.
  final String representation;

  /// Whether this state is [SSEClientState.disconnected].
  bool get isDisconnected => identical(this, SSEClientState.disconnected);

  /// Whether this state is [SSEClientState.connecting].
  bool get isConnecting => identical(this, SSEClientState.connecting);

  /// Whether this state is [SSEClientState.connected].
  bool get isConnected => identical(this, SSEClientState.connected);

  /// Whether this state is [SSEClientState.closed].
  bool get isClosed => identical(this, SSEClientState.closed);
}
/// {@template sse_client}
/// A client for handling Server-Sent Events (SSE) connections.
///
/// This client manages the connection lifecycle, including connecting,
/// disconnecting, and reconnecting to the SSE endpoint. Reconnection delays
/// are computed by [Backoff], and the last received event ID is sent back to
/// the server in the `Last-Event-ID` request header.
/// {@endtemplate}
class SSEClient {
  /// Creates an instance of [SSEClient].
  ///
  /// The client works on `api.clone()`; that clone is closed by [dispose].
  ///
  /// [headers] are extra request headers. The SSE specific headers (`Accept`,
  /// `Cache-Control`, `Accept-Charset`, `Connection`) always override entries
  /// with the same name.
  /// [context] is passed with every request; the `timeout` and `sse` keys are
  /// always overridden.
  /// [heartbeat] is the expected maximum silence between events: when no event
  /// was dispatched for at least 1.5x this duration the connection is dropped
  /// and re-established. Values below one second disable the check.
  /// [reconnectInterval] bounds the backoff delay between reconnect attempts.
  /// [onEvent], [onStateChange] and [onError] are optional listeners invoked
  /// in addition to the [events] stream and the [state] notifier.
  ///
  /// {@macro sse_client}
  SSEClient({
    required ApiClient api,
    Map<String, String>? headers,
    Map<String, Object?>? context,
    Duration heartbeat = const .new(seconds: 60),
    ({Duration min, Duration max}) reconnectInterval = (
      min: const .new(milliseconds: 500),
      max: const .new(seconds: 15),
    ),
    void Function(String name, Map<String, Object?> payload)? onEvent,
    void Function(SSEClientState state)? onStateChange,
    void Function(Object error, StackTrace stackTrace)? onError,
  }) : _api = api.clone(),
       _headers = <String, String>{
         ...?headers,
         'Accept': 'text/event-stream',
         'Cache-Control': 'no-cache',
         'Accept-Charset': 'utf-8',
         'Connection': 'keep-alive',
       },
       _context = <String, Object?>{
         ...?context,
         'timeout': Duration.zero, // Do not timeout the SSE connection
         'sse': true, // Indicate that this is an SSE request
       },
       _heartbeat = heartbeat,
       _reconnectInterval = reconnectInterval,
       _eventController = .broadcast(),
       _state = .new(.disconnected) {
    // --- Set up callbacks --- //

    // Handle incoming events
    _onEvent = (String name, Map<String, Object?> payload) {
      _eventCounter += .one;
      _lastEventAt = .now(); // Update last event time on any event
      onEvent?.call(name, payload);
      _eventController.add((name: name, payload: payload));
    };

    // Handle connection state changes
    _onStateChange = (SSEClientState state) {
      if (state == _state.value) return; // No state change
      _state.value = state;
      onStateChange?.call(state);
    };

    // Handle errors within the SSE client
    _onError = (Object error, StackTrace stackTrace) {
      onError?.call(error, stackTrace);
    };
  }

  /// Creates an instance of [SSEClient] and immediately connects to the SSE endpoint.
  /// [path] is the SSE endpoint path to connect to.
  ///
  /// Use it like this:
  /// ```dart
  /// SSEClient.connect(api: apiClient, path: '/v1/chats/$chatId/updates')
  ///   ..events.listen((event) {
  ///     print('${event.name}: ${event.payload}');
  ///   });
  /// ```
  ///
  /// {@macro sse_client}
  factory SSEClient.connect({
    required ApiClient api,
    required String path,
    Map<String, String>? headers,
    Map<String, Object?>? context,
    Duration heartbeat = const .new(seconds: 60),
    ({Duration min, Duration max}) reconnectInterval = (
      min: const .new(milliseconds: 500),
      max: const .new(seconds: 15),
    ),
    void Function(String name, Map<String, Object?> payload)? onEvent,
    void Function(SSEClientState state)? onStateChange,
    void Function(Object error, StackTrace stackTrace)? onError,
  }) => .new(
    api: api,
    headers: headers,
    context: context,
    heartbeat: heartbeat,
    reconnectInterval: reconnectInterval,
    onEvent: onEvent,
    onStateChange: onStateChange,
    onError: onError,
  )..reconnect(path);

  /// Matches unix timestamps (10..13 digits) sent as bare event data.
  static final RegExp _unixTimePattern = RegExp(r'^\d{10,13}$');

  /// Decodes event data that looks like a JSON object.
  static final _jsonObjectDecoder = const JsonDecoder().cast<String, Map<String, Object?>>();

  /// An API client used to make requests.
  final ApiClient _api;

  /// Headers to include in the SSE request.
  final Map<String, String> _headers;

  /// Context to include in the SSE request.
  final Map<String, Object?> _context;

  /// Maximum expected silence between events; below one second disables the check.
  final Duration _heartbeat;

  /// Reconnection interval configuration.
  final ({Duration min, Duration max}) _reconnectInterval;

  /// Stream controller for broadcasting SSE events.
  final StreamController<SSEEvent> _eventController;

  /// A counter for number of events received.
  BigInt _eventCounter = .zero;

  /// The total number of SSE events received.
  BigInt get receivedEvents => _eventCounter;

  /// The current SSE endpoint path.
  String? _currentPath;

  /// The SSE endpoint path passed to the most recent [reconnect] call,
  /// or `null` if [reconnect] was never called.
  String? get currentPath => _currentPath;

  /// The last event ID received from the server.
  /// Used for reconnection to resume from the last received event.
  String? _lastEventId;

  /// The last event ID received from the server.
  /// This can be used to resume the event stream after reconnection.
  String? get lastEventId => _lastEventId;

  /// The timestamp of the last event received.
  DateTime? _lastEventAt;

  /// The timestamp of the last event received
  /// (also refreshed when a connection is established).
  DateTime? get lastEventAt => _lastEventAt;

  /// A counter for number of bytes received.
  BigInt _bytesReceived = .zero;

  /// The total number of bytes received from the SSE connection.
  /// This includes all data received over the connection.
  BigInt get bytesReceived => _bytesReceived;

  /// Stream of SSE events.
  /// Clients can listen to this stream to receive events.
  /// Each event is a record containing the event name and payload.
  /// Stream events are broadcasted to all listeners and can be listened to multiple times.
  /// Stream provides only data, without errors, and emits done only when the SSE client is closed.
  ///
  /// Use it like this:
  /// ```dart
  /// final sub = sseClient.events.listen((event) {
  ///   print('${event.name}: ${event.payload}');
  /// });
  /// ```
  Stream<SSEEvent> get events => _eventController.stream;

  /// Indicates whether the SSE client is closed.
  bool get isClosed => _state.value == .closed;

  /// Indicates whether the SSE client is connected.
  bool get isConnected => _state.value == .connected;

  /// Indicates whether the SSE client is connecting.
  bool get isConnecting => _state.value == .connecting;

  /// Indicates whether the SSE client is disconnected.
  bool get isDisconnected => _state.value == .disconnected;

  /// Notifier for the current state of the SSE client.
  final ValueNotifier<SSEClientState> _state;

  /// The current state of the SSE client.
  ValueListenable<SSEClientState> get state => _state;

  /// Callback invoked when an SSE event is received.
  late void Function(String name, Map<String, Object?> payload) _onEvent;

  /// Callback invoked when the state of the SSE client changes.
  late void Function(SSEClientState state) _onStateChange;

  /// Callback invoked when an error occurs in the SSE client.
  /// E.g. network errors, socket errors, parsing errors, etc.
  late void Function(Object error, StackTrace stackTrace) _onError;

  /// A function to tear down the SSE connection.
  void Function()? _tearDown;

  /// A timer used for reconnection attempts.
  Timer? _reconnectTimer;

  /// The current number of reconnection attempts.
  int _reconnectAttempt = 0;

  /// The current number of reconnection attempts.
  /// Reset to zero once a connection is established.
  int get reconnectAttempt => _reconnectAttempt;

  /// Disconnects from the SSE endpoint.
  ///
  /// Always cancels a pending reconnection timer. If the client is already
  /// disconnected or closed, nothing else happens.
  void disconnect() {
    _reconnectTimer?.cancel(); // Cancel any existing reconnection timer
    _reconnectTimer = null;
    if (isClosed || isDisconnected) return; // Already disconnected or closed
    _onStateChange(.disconnected);
    _tearDown?.call(); // Call the tear down function to close the connection
    _tearDown = null;
  }

  /// Reconnects to the SSE endpoint.
  ///
  /// If the client is already connected, it will first disconnect and then reconnect.
  /// [path] is the SSE endpoint path to connect to.
  /// Does nothing once the client has been disposed.
  ///
  /// Failures (request errors, non-2xx status codes, heartbeat timeouts,
  /// parsing errors) are reported through `onError` and followed by an
  /// automatic reconnect after a backoff delay. The same happens, without an
  /// error, when the server closes the stream.
  ///
  /// Use it like this:
  /// ```dart
  /// sseClient
  ///   ..reconnect('/v1/chats/$chatId/updates')
  ///   ..events.listen((event) {
  ///     print('${event.name}: ${event.payload}');
  ///   });
  /// ```
  void reconnect(String path) => runZonedGuarded<void>(
    () async {
      if (isClosed) return; // A disposed client must never open new connections
      disconnect(); // If already connected, disconnect first
      _currentPath = path; // Expose the endpoint through [currentPath]

      // Push to stack of tear down functions (last registered runs first).
      // All functions must be synchronous to avoid race conditions.
      void onClose(void Function() tearDown) {
        final previous = _tearDown;
        _tearDown = () {
          try {
            tearDown();
          } on Object catch (e, s) {
            _onError.call(e, s);
          }
          previous?.call();
        };
      }

      _onStateChange(.connecting); // Update state to connecting
      var disconnected = false; // Whether the CURRENT connection was torn down
      var heartbeat = DateTime.now(); // Last heartbeat time
      onClose(() {
        disconnected = true; // Set disconnected flag on tear down
        _onStateChange(.disconnected);
      });

      // Exponential backoff reconnection logic
      void setUpReconnectTimer() {
        // A connection that was already torn down (explicit disconnect,
        // dispose, or superseded by a newer reconnect call) must not touch
        // the client's current connection or schedule anything.
        if (disconnected || isClosed) return;
        disconnect(); // Disconnect existing connection
        _reconnectTimer = Timer(
          // Calculate next delay using exponential backoff strategy
          Backoff.instance.nextDelay(
            _reconnectAttempt++,
            _reconnectInterval.min.inMilliseconds,
            _reconnectInterval.max.inMilliseconds,
          ),
          () {
            if (isConnected || isConnecting) return; // Already (re)connecting, do nothing
            reconnect(path); // Attempt to reconnect after the delay
          },
        );
      }

      // Set up heartbeat timer if configured
      if (_heartbeat case final duration when duration >= const .new(seconds: 1)) {
        final timer = Timer.periodic(duration, (timer) {
          if (disconnected) return timer.cancel(); // If disconnected, stop the timer
          final now = DateTime.now();
          // Check if the last heartbeat was received within the interval
          // Add a 50% grace period to account for network delays
          if (now.difference(heartbeat) < duration * 1.5) return;
          // No heartbeat received within the interval, close the connection
          _onError.call(Exception('SSE heartbeat timeout'), .current);
          setUpReconnectTimer(); // Set up reconnection timer
        });
        onClose(timer.cancel); // Ensure timer is cancelled on close
      }

      try {
        // Make the SSE request with Last-Event-ID header if available
        final requestHeaders = <String, String>{..._headers, 'Last-Event-ID': ?_lastEventId};
        final response = await _api
            .get(path, headers: requestHeaders, context: _context)
            .timeout(const .new(seconds: 30));
        if (disconnected) return; // If closed during the request, stop processing

        // Handle response status codes
        switch (response.statusCode) {
          case > 299:
          case < 200:
            _onError.call(Exception('SSE connection error: ${response.statusCode}'), .current);
            setUpReconnectTimer(); // Set up reconnection timer
            return;
        }

        _onStateChange(.connected);
        _reconnectAttempt = 0; // Reset reconnection attempts on successful connection
        _lastEventAt = .now(); // Update last event timestamp

        // Decode the byte stream as ONE chunked UTF-8 conversion so that
        // multi-byte characters split across network chunks are decoded
        // correctly. Malformed input is reported as a stream error.
        const utfDecoder = Utf8Decoder(allowMalformed: false);
        final Stream<List<int>> byteStream = response.stream;
        final textStream = byteStream
            .map<List<int>>((bytes) {
              _bytesReceived += .from(bytes.length); // Update bytes received counter
              return bytes;
            })
            .transform<String>(utfDecoder);
        final iterator = StreamIterator<String>(textStream);
        onClose(() => iterator.cancel().ignore()); // Ensure iterator is cancelled on close

        String? currentEvent; // Event name of the event being assembled
        Object? currentData; // Accumulated data: String for single-line, StringBuffer for multi-line
        var pending = ''; // Incomplete trailing line carried over between chunks

        // Listen to the stream and parse the events
        while (await iterator.moveNext()) {
          try {
            final chunk = iterator.current;
            if (chunk.isEmpty) continue; // Skip empty chunks

            // Split into lines. The last segment is always the (possibly
            // empty) incomplete remainder and must NOT be processed yet,
            // otherwise a chunk ending in '\n' would be seen as a blank line
            // and dispatch a half-received event.
            final lines = (pending + chunk).split('\n');
            pending = lines.removeLast();

            // Process each complete line
            for (final line in lines) {
              if (disconnected) return; // Current connection is closed, stop processing
              final trimmedLine = line.trim(); // Also drops the '\r' of CRLF line endings

              // Empty line marks end of event - dispatch it
              if (trimmedLine.isEmpty) {
                if (currentEvent != null || currentData != null) {
                  _dispatch(
                    currentEvent ?? 'message', // Default event name per SSE spec
                    switch (currentData) {
                      String s => s,
                      StringBuffer sb => sb.toString(),
                      _ => null,
                    },
                  );
                  heartbeat = .now(); // Update last heartbeat time on any event
                  currentEvent = null;
                  currentData = null;
                }
                continue;
              }

              // Split "field:value" at the FIRST colon; a single space after
              // the colon is optional and not part of the value (SSE spec).
              final colon = trimmedLine.indexOf(':');
              if (colon == 0) continue; // Lines starting with ':' are comments
              final String fieldName;
              final String eventValue;
              if (colon == -1) {
                // No colon: the whole line is the field name, value is empty
                fieldName = trimmedLine;
                eventValue = '';
              } else {
                fieldName = trimmedLine.substring(0, colon);
                final rest = trimmedLine.substring(colon + 1);
                eventValue = rest.startsWith(' ') ? rest.substring(1) : rest;
              }

              switch (fieldName.trim().toLowerCase()) {
                case 'event':
                  // Set the current event type
                  currentEvent = eventValue;
                case 'data':
                  // Accumulate data (can be multi-line)
                  // Use lazy StringBuffer only for multi-line data
                  switch (currentData) {
                    case null:
                      // First line - store as String
                      currentData = eventValue;
                    case String s:
                      // Second line - convert to StringBuffer
                      currentData = StringBuffer(s)
                        ..write('\n')
                        ..write(eventValue);
                    case StringBuffer sb:
                      // Subsequent lines - append to StringBuffer
                      sb
                        ..write('\n')
                        ..write(eventValue);
                  }
                case 'id':
                  // Store the last event ID for reconnection
                  _lastEventId = eventValue.isNotEmpty ? eventValue : null;
                case 'retry':
                  // Retry field - could be used to adjust reconnection delay
                  // For now, just ignore it
                  break;
                default:
                  // Unknown field type - ignore it
                  break;
              }
            }
          } on Object catch (e, s) {
            currentEvent = null;
            currentData = null;
            _onError.call(e, s);
            // Disconnect and attempt to reconnect after an error
            setUpReconnectTimer();
            return;
          }
        }

        // The stream ended. If that was caused by our own tear down this is a
        // no-op; otherwise the server closed the connection, so leave the
        // "connected" state and re-establish the connection.
        setUpReconnectTimer();
      } on Object catch (e, s) {
        _onError.call(e, s);
        // Disconnect and attempt to reconnect after an error
        setUpReconnectTimer();
        return;
      }
    },
    (error, stackTrace) {
      _onError.call(error, stackTrace);
    },
  );

  /// Converts the raw [data] of a completed event into a payload map and
  /// delivers it under [name].
  ///
  /// * empty data, `{}` and `null` become an empty map;
  /// * data shaped like a JSON object is decoded; if decoding fails the error
  ///   is reported and the raw text is delivered as `{'data': text}`;
  /// * a 10..13 digit number is delivered as `{'timestamp': DateTime}` in UTC
  ///   (10 digits are seconds, longer values are milliseconds);
  /// * anything else is delivered as `{'data': text}`.
  void _dispatch(String name, String? data) {
    switch (data?.trim()) {
      case null:
      case '':
      case '{}':
      case 'null':
        // Handle empty payload
        _onEvent(name, const <String, Object?>{});
      case final String text when text.startsWith('{') && text.endsWith('}'):
        // Handle JSON object as payload. Only the decoding is guarded, so an
        // exception thrown by a listener cannot deliver the event twice.
        Map<String, Object?> payload;
        try {
          payload = _jsonObjectDecoder.convert(text);
        } on Object catch (e, s) {
          _onError.call(e, s);
          payload = <String, Object?>{'data': text};
        }
        _onEvent(name, payload);
      case final String text when _unixTimePattern.hasMatch(text):
        // Handle unix timestamps, for events like "ping" and "heartbeat"
        _onEvent(name, <String, Object?>{
          'timestamp': switch (int.tryParse(text)) {
            int n when text.length <= 10 => DateTime.fromMillisecondsSinceEpoch(n * 1000, isUtc: true),
            int n => DateTime.fromMillisecondsSinceEpoch(n, isUtc: true),
            null => DateTime.now().toUtc(),
          },
        });
      case final String text:
        // Handle other cases as raw string payload
        _onEvent(name, <String, Object?>{'data': text});
    }
  }

  /// Closes the SSE client and releases all resources.
  /// After calling this method, the SSE client cannot be used again.
  void dispose() {
    if (isClosed) return;
    disconnect(); // Disconnect if connected
    _api.close(); // Close the underlying API client
    _onEvent = (_, _) {}; // No-op
    _onError = (_, _) {}; // No-op
    _eventController.close().ignore(); // Close the event stream
    _onStateChange(.closed); // Update state to closed
    _onStateChange = (_) {}; // No-op
    _state.dispose(); // Dispose the state notifier
  }
}
/// Backoff strategy for reconnection.
///
/// You can use this class to calculate the next delay for reconnection attempts.
/// [Backoff] uses the full jitter technique and a random number generator to
/// calculate the delay.
@internal
extension type const Backoff(math.Random _rnd) {
  /// Singleton instance of [Backoff] with a default random number generator.
  static final Backoff instance = .new(math.Random());

  /// Largest upper bound accepted by [math.Random.nextInt] (2^32).
  static const int _maxRandomBound = 4294967296;

  /// Returns the delay to wait before reconnection attempt number [step].
  ///
  /// Full jitter technique.
  /// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
  ///
  /// [step] - current step in backoff strategy (clamped to `0..31`).
  /// [minDelay] - minimum delay in milliseconds, default is 500ms.
  /// [maxDelay] - maximum delay in milliseconds, default is 15000ms.
  ///
  /// The result is `minDelay` plus a random value below
  /// `min(maxDelay, minDelay * 2^step)`, capped at [maxDelay].
  /// If [minDelay] is not less than [maxDelay], [maxDelay] is returned.
  /// A non-positive [minDelay] yields [Duration.zero] instead of throwing.
  Duration nextDelay(int step, [int minDelay = 500, int maxDelay = 15000]) {
    if (minDelay >= maxDelay) return .new(milliseconds: maxDelay);
    final window = math.min(maxDelay, minDelay * math.pow(2, step.clamp(0, 31))).toInt();
    // Random.nextInt requires a strictly positive bound; a zero or negative
    // window (minDelay <= 0) leaves nothing to randomize.
    if (window <= 0) return .new(milliseconds: math.max(0, minDelay));
    // Random.nextInt also rejects bounds above 2^32.
    final jitter = _rnd.nextInt(math.min(window, _maxRandomBound));
    return .new(milliseconds: (minDelay + jitter).clamp(0, maxDelay));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment