Last active
June 5, 2025 03:04
-
-
Save Ananto30/8af841f250e89c07e122e2a838698246 to your computer and use it in GitHub Desktop.
SSE message stream in Go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Example SSE server in Golang. | |
| // $ go run sse.go | |
| // Inspired from https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c | |
| package main | |
| import ( | |
| "encoding/json" | |
| "fmt" | |
| "log" | |
| "net/http" | |
| "github.com/gorilla/mux" | |
| ) | |
// Broker keeps track of the connected SSE clients and carries the events
// that are to be broadcast to them.
type Broker struct {
	// Notifier receives the events that should be sent to every client.
	Notifier chan []byte

	// newClients receives the message channel of each newly connected
	// client; closingClients receives the message channel of each client
	// that has gone away.
	newClients, closingClients chan chan []byte

	// clients is the registry of connected clients, keyed by each client's
	// message channel.
	clients map[chan []byte]bool
}
| func NewServer() (broker *Broker) { | |
| // Instantiate a broker | |
| broker = &Broker{ | |
| Notifier: make(chan []byte, 1), | |
| newClients: make(chan chan []byte), | |
| closingClients: make(chan chan []byte), | |
| clients: make(map[chan []byte]bool), | |
| } | |
| // Set it running - listening and broadcasting events | |
| go broker.listen() | |
| return | |
| } | |
| func (broker *Broker) listen() { | |
| for { | |
| select { | |
| case s := <-broker.newClients: | |
| // A new client has connected. | |
| // Register their message channel | |
| broker.clients[s] = true | |
| log.Printf("Client added. %d registered clients", len(broker.clients)) | |
| case s := <-broker.closingClients: | |
| // A client has dettached and we want to | |
| // stop sending them messages. | |
| delete(broker.clients, s) | |
| log.Printf("Removed client. %d registered clients", len(broker.clients)) | |
| case event := <-broker.Notifier: | |
| // We got a new event from the outside! | |
| // Send event to all connected clients | |
| for clientMessageChan := range broker.clients { | |
| clientMessageChan <- event | |
| } | |
| } | |
| } | |
| } | |
// Message is the JSON document exchanged with clients.
type Message struct {
	// Name is encoded under the "name" JSON key.
	Name string `json:"name"`

	// Message is encoded under the "msg" JSON key.
	Message string `json:"msg"`
}
| func (broker *Broker) Stream(w http.ResponseWriter, r *http.Request) { | |
| // Check if the ResponseWriter supports flushing. | |
| flusher, ok := w.(http.Flusher) | |
| if !ok { | |
| http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) | |
| return | |
| } | |
| // Each connection registers its own message channel with the Broker's connections registry | |
| messageChan := make(chan []byte) | |
| // Signal the broker that we have a new connection | |
| broker.newClients <- messageChan | |
| // Remove this client from the map of connected clients | |
| // when this handler exits. | |
| defer func() { | |
| broker.closingClients <- messageChan | |
| }() | |
| w.Header().Set("Content-Type", "text/event-stream") | |
| w.Header().Set("Cache-Control", "no-cache") | |
| w.Header().Set("Connection", "keep-alive") | |
| w.Header().Set("Access-Control-Allow-Origin", "*") | |
| for { | |
| select { | |
| // Listen to connection close and un-register messageChan | |
| case <-r.Context().Done(): | |
| // remove this client from the map of connected clients | |
| broker.closingClients <- messageChan | |
| return | |
| // Listen for incoming messages from messageChan | |
| case msg := <-messageChan: | |
| // Write to the ResponseWriter | |
| // Server Sent Events compatible | |
| fmt.Fprintf(w, "data: %s\n\n", msg) | |
| // Flush the data immediatly instead of buffering it for later. | |
| flusher.Flush() | |
| } | |
| } | |
| } | |
| func (broker *Broker) BroadcastMessage(w http.ResponseWriter, r *http.Request) { | |
| // Parse the request body | |
| var msg Message | |
| err := json.NewDecoder(r.Body).Decode(&msg) | |
| if err != nil { | |
| http.Error(w, err.Error(), http.StatusBadRequest) | |
| return | |
| } | |
| // Send the message to the broker via Notifier channel | |
| j, err := json.Marshal(msg) | |
| if err != nil { | |
| http.Error(w, err.Error(), http.StatusInternalServerError) | |
| return | |
| } | |
| broker.Notifier <- []byte(j) | |
| w.WriteHeader(http.StatusCreated) | |
| w.Write([]byte("Message sent")) | |
| } | |
| func main() { | |
| broker := NewServer() | |
| router := mux.NewRouter() | |
| router.HandleFunc("/messages", broker.BroadcastMessage).Methods("POST") | |
| router.HandleFunc("/stream", broker.Stream).Methods("GET") | |
| log.Println("Starting server on :8000") | |
| log.Fatal(http.ListenAndServe(":8000", router)) | |
| } | |
| // To test the server, run the following commands in separate terminals: | |
| // Start listening to the stream | |
| // $ curl -N http://localhost:8000/stream | |
| // Send a message | |
| // $ curl -X POST -H "Content-Type: application/json" -d '{"name": "Alice", "msg": "Hello"}' http://localhost:8000/messages |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey thanks for the implementation @Ananto30 its great!
One thing
`http.CloseNotifier` from `notify := w.(http.CloseNotifier).CloseNotify()` in `Broker.Stream()` is deprecated now. Could you update the example to use [Request.Context] instead? For me this is the first result when I search for "gorilla mux sse" on Google, so it would be nice to have this example up to date.
I would add a MR but I don't know how to do it here.
Here is my suggestion how the updated code could look:
Also I have a question. If we return from the `Broker.Stream()` method when the connection is closed and we send the `msgChannel` to `Broker.ClosingClients` manually, does it make sense to also send it in the defer function? Aren't we sending it twice? I think we should either just return when the connection closes and let the defer handle unregistering the client from the broker, or unregister the client manually when the connection closes and not use defer. This is not reflected in my suggestion though, so if you are going to update it, could you also take a look at this?
Have a nice day and thanks for your work!