Created
January 28, 2026 09:55
-
-
Save dened/06abb918cef4a8a8101bc699576e34fe to your computer and use it in GitHub Desktop.
Dart Server-sent events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import 'dart:async'; | |
| import 'dart:convert' show Utf8Decoder, JsonDecoder; | |
| import 'dart:math' as math; | |
| import 'package:flutter/foundation.dart' show ValueListenable, ValueNotifier; | |
| import 'package:meta/meta.dart'; | |
| import 'api_client.dart'; | |
| /// A typedef representing a Server-Sent Event (SSE) event. | |
| /// [name] is the name of the event. | |
| /// [payload] is the data associated with the event. | |
| typedef SSEEvent = ({String name, Map<String, Object?> payload}); | |
| /// An enumeration representing the state of the SSE client. | |
| enum SSEClientState { | |
| disconnected('Disconnected'), | |
| connecting('Connecting'), | |
| connected('Connected'), | |
| closed('Closed'); | |
| const SSEClientState(this.representation); | |
| /// A string representation of the SSE client state. | |
| final String representation; | |
| /// Whether the client is disconnected. | |
| bool get isDisconnected => this == .disconnected; | |
| /// Whether the client is connecting. | |
| bool get isConnecting => this == .connecting; | |
| /// Whether the client is connected. | |
| bool get isConnected => this == .connected; | |
| /// Whether the client is closed. | |
| bool get isClosed => this == .closed; | |
| } | |
| /// {@template sse_client} | |
| /// A client for handling Server-Sent Events (SSE) connections. | |
| /// This client manages the connection lifecycle, including connecting, | |
| /// disconnecting, and reconnecting to the SSE endpoint. | |
| /// {@endtemplate} | |
| class SSEClient { | |
| /// Creates an instance of [SSEClient]. | |
| /// | |
| /// {@macro sse_client} | |
| SSEClient({ | |
| required ApiClient api, | |
| Map<String, String>? headers, | |
| Map<String, Object?>? context, | |
| Duration heartbeat = const .new(seconds: 60), | |
| ({Duration min, Duration max}) reconnectInterval = ( | |
| min: const .new(milliseconds: 500), | |
| max: const .new(seconds: 15), | |
| ), | |
| void Function(String name, Map<String, Object?> payload)? onEvent, | |
| void Function(SSEClientState state)? onStateChange, | |
| void Function(Object error, StackTrace stackTrace)? onError, | |
| }) : _api = api.clone(), | |
| _headers = <String, String>{ | |
| ...?headers, | |
| 'Accept': 'text/event-stream', | |
| 'Cache-Control': 'no-cache', | |
| 'Accept-Charset': 'utf-8', | |
| 'Connection': 'keep-alive', | |
| }, | |
| _context = <String, Object?>{ | |
| ...?context, | |
| 'timeout': Duration.zero, // Do not timeout the SSE connection | |
| 'sse': true, // Indicate that this is an SSE request | |
| }, | |
| _heartbeat = heartbeat, | |
| _reconnectInterval = reconnectInterval, | |
| _eventController = .broadcast(), | |
| _state = .new(.disconnected) { | |
| // --- Set up callbacks --- // | |
| // Handle incoming events | |
| _onEvent = (String name, Map<String, Object?> payload) { | |
| _eventCounter += .one; | |
| _lastEventAt = .now(); // Update last heartbeat time on any event | |
| onEvent?.call(name, payload); | |
| _eventController.add((name: name, payload: payload)); | |
| }; | |
| // Handle connection state changes | |
| _onStateChange = (SSEClientState state) { | |
| if (state == _state.value) return; // No state change | |
| _state.value = state; | |
| onStateChange?.call(state); | |
| }; | |
| // Handle errors within the SSE client | |
| _onError = (Object error, StackTrace stackTrace) { | |
| onError?.call(error, stackTrace); | |
| }; | |
| } | |
| /// Creates an instance of [SSEClient] and immediately connects to the SSE endpoint. | |
| /// [path] is the SSE endpoint path to connect to. | |
| /// | |
| /// Use it like this: | |
| /// ```dart | |
| /// SSEClient.connect(api: apiClient, path: '/v1/chats/$chatId/updates') | |
| /// ..events.listen((event) { | |
| /// print('${event.name}: ${event.payload}'); | |
| /// }); | |
| /// ``` | |
| /// | |
| /// {@macro sse_client} | |
| factory SSEClient.connect({ | |
| required ApiClient api, | |
| required String path, | |
| Map<String, String>? headers, | |
| Map<String, Object?>? context, | |
| Duration heartbeat = const .new(seconds: 60), | |
| ({Duration min, Duration max}) reconnectInterval = ( | |
| min: const .new(milliseconds: 500), | |
| max: const .new(seconds: 15), | |
| ), | |
| void Function(String name, Map<String, Object?> payload)? onEvent, | |
| void Function(SSEClientState state)? onStateChange, | |
| void Function(Object error, StackTrace stackTrace)? onError, | |
| }) => .new( | |
| api: api, | |
| headers: headers, | |
| context: context, | |
| heartbeat: heartbeat, | |
| reconnectInterval: reconnectInterval, | |
| onEvent: onEvent, | |
| onStateChange: onStateChange, | |
| onError: onError, | |
| )..reconnect(path); | |
| /// An API client used to make requests. | |
| final ApiClient _api; | |
| /// Headers to include in the SSE request. | |
| final Map<String, String> _headers; | |
| /// Context to include in the SSE request. | |
| final Map<String, Object?> _context; | |
| /// Heartbeat duration for keeping the connection alive. | |
| final Duration _heartbeat; | |
| /// Reconnection interval configuration. | |
| final ({Duration min, Duration max}) _reconnectInterval; | |
| /// Stream controller for broadcasting SSE events. | |
| final StreamController<SSEEvent> _eventController; | |
| /// A counter for number of events received. | |
| BigInt _eventCounter = .zero; | |
| /// The total number of SSE events received. | |
| BigInt get receivedEvents => _eventCounter; | |
| /// The current SSE endpoint path. | |
| String? _currentPath; | |
| /// The current SSE endpoint path. | |
| String? get currentPath => _currentPath; | |
| /// The last event ID received from the server. | |
| /// Used for reconnection to resume from the last received event. | |
| String? _lastEventId; | |
| /// The last event ID received from the server. | |
| /// This can be used to resume the event stream after reconnection. | |
| String? get lastEventId => _lastEventId; | |
| /// The timestamp of the last event received. | |
| DateTime? _lastEventAt; | |
| /// The timestamp of the last event received. | |
| DateTime? get lastEventAt => _lastEventAt; | |
| /// A counter for number of bytes received. | |
| BigInt _bytesReceived = .zero; | |
| /// The total number of bytes received from the SSE connection. | |
| /// This includes all data received over the connection. | |
| BigInt get bytesReceived => _bytesReceived; | |
| /// Stream of SSE events. | |
| /// Clients can listen to this stream to receive events. | |
| /// Each event is represented as a tuple containing the event name and payload. | |
| /// Stream events are broadcasted to all listeners and can be listened to multiple times. | |
| /// Stream provides only data, without errors, and emits done only when the SSE client is closed. | |
| /// | |
| /// Use it like this: | |
| /// ```dart | |
| /// final sub = sseClient.events.listen((event) { | |
| /// print('${event.event}: ${event.payload}'); | |
| /// }); | |
| /// ``` | |
| Stream<SSEEvent> get events => _eventController.stream; | |
| /// Indicates whether the SSE client is closed. | |
| bool get isClosed => _state.value == .closed; | |
| /// Indicates whether the SSE client is connected. | |
| bool get isConnected => _state.value == .connected; | |
| /// Indicates whether the SSE client is connecting. | |
| bool get isConnecting => _state.value == .connecting; | |
| /// Indicates whether the SSE client is disconnected. | |
| bool get isDisconnected => _state.value == .disconnected; | |
| /// Notifier for the current state of the SSE client. | |
| final ValueNotifier<SSEClientState> _state; | |
| /// The current state of the SSE client. | |
| ValueListenable<SSEClientState> get state => _state; | |
| /// Callback invoked when an SSE event is received. | |
| late void Function(String name, Map<String, Object?> payload) _onEvent; | |
| /// Callback invoked when the state of the SSE client changes. | |
| late void Function(SSEClientState state) _onStateChange; | |
| /// Callback invoked when an error occurs in the SSE client. | |
| /// E.g. network errors, socket errors, parsing errors, etc. | |
| late void Function(Object error, StackTrace stackTrace) _onError; | |
| /// A function to tear down the SSE connection. | |
| void Function()? _tearDown; | |
| /// A timer used for reconnection attempts. | |
| Timer? _reconnectTimer; | |
| /// The current number of reconnection attempts. | |
| int _reconnectAttempt = 0; | |
| /// The current number of reconnection attempts. | |
| int get reconnectAttempt => _reconnectAttempt; | |
| /// Disconnects from the SSE endpoint. | |
| /// If the client is already disconnected, this method does nothing. | |
| void disconnect() { | |
| _reconnectTimer?.cancel(); // Cancel any existing reconnection timer | |
| _reconnectTimer = null; | |
| if (isClosed || isDisconnected) return; // Already disconnected or closed | |
| _onStateChange(.disconnected); | |
| _tearDown?.call(); // Call the tear down function to close the connection | |
| _tearDown = null; | |
| } | |
| /// Reconnects to the SSE endpoint. | |
| /// If the client is already connected, it will first disconnect and then reconnect. | |
| /// [path] is the SSE endpoint path to connect to. | |
| /// | |
| /// Use it like this: | |
| /// ```dart | |
| /// sseClient | |
| /// ..reconnect('/v1/chats/$chatId/updates') | |
| /// ..events.listen((event) { | |
| /// print('${event.event}: ${event.payload}'); | |
| /// }); | |
| /// ``` | |
| void reconnect(String path) => runZonedGuarded<void>( | |
| () async { | |
| disconnect(); // If already connected, disconnect first | |
| // Push to stack of tear down functions | |
| // All functions must be synchronous to avoid race conditions | |
| void onClose(void Function() tearDown) { | |
| final fn = _tearDown ?? () {}; | |
| _tearDown = () { | |
| try { | |
| tearDown(); | |
| } on Object catch (e, s) { | |
| _onError.call(e, s); | |
| } | |
| fn(); | |
| }; | |
| } | |
| _onStateChange(.connecting); // Update state to connecting | |
| var disconnected = false; // Flag to indicate if the CURRENT connection is disconnected | |
| var heartbeat = DateTime.now(); // Last heartbeat time | |
| onClose(() { | |
| disconnected = true; // Set disconnected flag on tear down | |
| _onStateChange(.disconnected); | |
| }); | |
| // Exponential backoff reconnection logic | |
| void setUpReconnectTimer() { | |
| disconnect(); // Disconnect existing connection | |
| _reconnectTimer = Timer( | |
| // Calculate next delay using exponential backoff strategy | |
| Backoff.instance.nextDelay( | |
| _reconnectAttempt++, | |
| _reconnectInterval.min.inMilliseconds, | |
| _reconnectInterval.max.inMilliseconds, | |
| ), | |
| () { | |
| if (isConnected || isConnecting) return; // If already connected or connecting, do nothing | |
| reconnect(path); // Attempt to reconnect after the duration | |
| }, | |
| ); | |
| } | |
| // Set up heartbeat timer if configured | |
| if (_heartbeat case final duration when duration >= const .new(seconds: 1)) { | |
| // Set up heartbeat to keep the connection alive, if configured | |
| final timer = Timer.periodic(duration, (timer) { | |
| if (disconnected) return timer.cancel(); // If disconnected, stop the timer | |
| final now = DateTime.now(); | |
| // Check if the last heartbeat was received within the interval | |
| // Add a 50% grace period to account for network delays | |
| if (now.difference(heartbeat) < duration * 1.5) return; | |
| // No heartbeat received within the interval, close the connection | |
| _onError.call(Exception('SSE heartbeat timeout'), .current); | |
| setUpReconnectTimer(); // Set up reconnection timer | |
| }); | |
| onClose(timer.cancel); // Ensure timer is cancelled on close | |
| } | |
| try { | |
| // Make the SSE request with Last-Event-ID header if available | |
| final requestHeaders = <String, String>{..._headers, 'Last-Event-ID': ?_lastEventId}; | |
| final response = await _api | |
| .get(path, headers: requestHeaders, context: _context) | |
| .timeout(const .new(seconds: 30)); | |
| if (disconnected) return; // If closed during the request, stop processing | |
| // Handle response status codes | |
| switch (response.statusCode) { | |
| case > 299: | |
| case < 200: | |
| _onError.call(Exception('SSE connection error: ${response.statusCode}'), .current); | |
| setUpReconnectTimer(); // Set up reconnection timer | |
| return; | |
| } | |
| // Log the successful connection | |
| _onStateChange(.connected); | |
| _reconnectAttempt = 0; // Reset reconnection attempts on successful connection | |
| _lastEventAt = .now(); // Update last event timestamp | |
| // Start listening to the SSE stream and iterate over the stream events | |
| final byteStream = response.stream; | |
| final iterator = StreamIterator<List<int>>(byteStream); | |
| onClose(() => iterator.cancel().ignore()); // Ensure iterator is cancelled on close | |
| String? currentEvent; // Last event type, used to determine the type of data | |
| Object? currentData; // Accumulated data: String for single-line, StringBuffer for multi-line | |
| const utfDecoder = Utf8Decoder(allowMalformed: false); // Decoder for UTF-8 bytes, do not allow malformed bytes | |
| final jsonDecoder = const JsonDecoder().cast<String, Map<String, Object?>>(); // Event data JSON decoder | |
| final unixTimeRegex = RegExp(r'^\d{10,13}$'); // Regex to match unix timestamps (10..13 digits) | |
| final lineBuffer = StringBuffer(); // Buffer for incomplete lines across chunks | |
| // Listen to the stream and parse the events | |
| while (await iterator.moveNext()) { | |
| try { | |
| final bytes = iterator.current; | |
| if (bytes.isEmpty) continue; // Skip empty chunks | |
| _bytesReceived += .from(bytes.length); // Update bytes received counter | |
| // Convert bytes to a string and append to buffer | |
| final chunk = utfDecoder.convert(bytes); | |
| lineBuffer.write(chunk); | |
| // Process complete lines (delimited by \n) | |
| final bufferContent = lineBuffer.toString(); | |
| final lines = bufferContent.split('\n'); | |
| // Keep the last incomplete line in buffer | |
| if (!chunk.endsWith('\n')) { | |
| lineBuffer.clear(); | |
| lineBuffer.write(lines.removeLast()); | |
| } else { | |
| lineBuffer.clear(); | |
| } | |
| // Process each complete line | |
| for (final line in lines) { | |
| if (disconnected) return; // Current connection is closed, stop processing | |
| final trimmedLine = line.trim(); // Trim whitespace | |
| // Empty line marks end of event - dispatch it | |
| if (trimmedLine.isEmpty) { | |
| // Dispatch event if we have event name or data | |
| if (currentEvent != null || currentData != null) { | |
| final eventName = currentEvent ?? 'message'; // Default event name per SSE spec | |
| // Convert currentData to String (handle both String and StringBuffer) | |
| final eventData = switch (currentData) { | |
| String s => s, | |
| StringBuffer sb => sb.toString(), | |
| _ => null, | |
| }; | |
| switch (eventData?.trim()) { | |
| case null: | |
| case '': | |
| case '{}': | |
| case 'null': | |
| // Handle empty payload | |
| _onEvent(eventName, const <String, Object?>{}); | |
| case final data when data.startsWith('{') && data.endsWith('}'): | |
| // Handle JSON object as payload | |
| try { | |
| final json = jsonDecoder.convert(data); | |
| _onEvent(eventName, json); | |
| } on Object catch (e, s) { | |
| _onError.call(e, s); | |
| _onEvent(eventName, <String, Object?>{'data': data}); | |
| } | |
| case final data when unixTimeRegex.hasMatch(data): | |
| // Handle unix timestamps, for events like "ping" and "heartbeat" | |
| _onEvent(eventName, <String, Object?>{ | |
| 'timestamp': switch (int.tryParse(data)) { | |
| //int n => DateTime.fromMillisecondsSinceEpoch(n >= 1e12 ? n : n * 1000, isUtc: true), | |
| int n when data.length <= 10 => DateTime.fromMillisecondsSinceEpoch(n * 1000, isUtc: true), | |
| int n => DateTime.fromMillisecondsSinceEpoch(n, isUtc: true), | |
| null => DateTime.now().toUtc(), | |
| }, | |
| }); | |
| case final data: | |
| // Handle other cases as raw string payload | |
| _onEvent(eventName, <String, Object?>{'data': data}); | |
| } | |
| heartbeat = .now(); // Update last heartbeat time on any event | |
| currentEvent = null; | |
| currentData = null; | |
| } | |
| continue; | |
| } | |
| final pos = trimmedLine.indexOf(': '); | |
| // Lines without ': ' are comments or malformed - ignore per SSE spec | |
| if (pos == -1) continue; | |
| final eventType = trimmedLine.substring(0, pos).trim().toLowerCase(); | |
| final eventValue = trimmedLine.substring(pos + 2); // Don't trim - preserve spaces | |
| switch (eventType) { | |
| case 'event': | |
| // Set the current event type | |
| currentEvent = eventValue; | |
| case 'data': | |
| // Accumulate data (can be multi-line) | |
| // Use lazy StringBuffer only for multi-line data | |
| switch (currentData) { | |
| case null: | |
| // First line - store as String | |
| currentData = eventValue; | |
| case String s: | |
| // Second line - convert to StringBuffer | |
| currentData = StringBuffer(s) | |
| ..write('\n') | |
| ..write(eventValue); | |
| case StringBuffer sb: | |
| // Subsequent lines - append to StringBuffer | |
| sb | |
| ..write('\n') | |
| ..write(eventValue); | |
| } | |
| case 'id': | |
| // Store the last event ID for reconnection | |
| _lastEventId = eventValue.isNotEmpty ? eventValue : null; | |
| case 'retry': | |
| // Retry field - could be used to adjust reconnection delay | |
| // For now, just ignore it | |
| break; | |
| default: | |
| // Unknown field type - ignore it | |
| break; | |
| } | |
| } | |
| } on Object catch (e, s) { | |
| currentEvent = null; | |
| currentData = null; | |
| _onError.call(e, s); | |
| // Disconnect and attempt to reconnect after an error | |
| setUpReconnectTimer(); | |
| return; | |
| } | |
| } | |
| } on Object catch (e, s) { | |
| _onError.call(e, s); | |
| // Disconnect and attempt to reconnect after an error | |
| setUpReconnectTimer(); | |
| return; | |
| } | |
| }, | |
| (error, stackTrace) { | |
| _onError.call(error, stackTrace); | |
| }, | |
| ); | |
| /// Closes the SSE client and releases all resources. | |
| /// After calling this method, the SSE client cannot be used again. | |
| void dispose() { | |
| if (isClosed) return; | |
| disconnect(); // Disconnect if connected | |
| _api.close(); // Close the underlying API client | |
| _onEvent = (_, _) {}; // No-op | |
| _onError = (_, _) {}; // No-op | |
| _eventController.close().ignore(); // Close the event stream | |
| _onStateChange(.closed); // Update state to closed | |
| _onStateChange = (_) {}; // No-op | |
| _state.dispose(); // Dispose the state notifier | |
| } | |
| } | |
| /// Backoff strategy for reconnection. | |
| /// You can use this class to calculate the next delay for reconnection attempts. | |
| /// [Backoff] uses the full jitter technique and a random number generator to | |
| /// calculate the delay. | |
| @internal | |
| extension type const Backoff(math.Random _rnd) { | |
| /// Singleton instance of [Backoff] with a default random number generator. | |
| static final Backoff instance = .new(math.Random()); | |
| /// Full jitter technique. | |
| /// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ | |
| /// step - current step in backoff strategy. | |
| /// minDelay - minimum delay in milliseconds, default is 500ms. | |
| /// maxDelay - maximum delay in milliseconds, default is 15000ms. | |
| Duration nextDelay(int step, [int minDelay = 500, int maxDelay = 15000]) { | |
| if (minDelay >= maxDelay) return .new(milliseconds: maxDelay); | |
| final val = math.min(maxDelay, minDelay * math.pow(2, step.clamp(0, 31))); | |
| final interval = _rnd.nextInt(val.toInt()); | |
| return .new(milliseconds: (minDelay + interval).clamp(0, maxDelay)); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment