// Package sse implements a small broker that fans Message values out to
// HTTP clients as Server-Sent Events.
package sse

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// MessageChan is a channel of Message values.
// Each client connection owns one MessageChan and hands it to the Broker
// through the broker's newClients channel; the broker's listen loop then
// sends every broadcast Message on each registered MessageChan.
type MessageChan chan Message

// Message is a single event to deliver to clients.
// StartConnection writes Event as the SSE "event:" field and the JSON
// encoding of Object as the SSE "data:" field.
type Message struct {
	Event  string
	Object interface{}
}

// Broker holds the registry of open client connections, receives events on
// its Notifier channel and broadcasts each of them to every registered
// connection.
type Broker struct {
	// Notifier receives the events to broadcast. NewBroker creates it with a
	// buffer of one message.
	// NOTE(review): the original comment says events are pushed here by the
	// main UDP daemon; that producer is not visible in this file.
	Notifier chan Message

	// newClients receives the message channel of each newly connected client.
	newClients chan MessageChan

	// closingClients receives the message channel of each client that should
	// no longer be sent messages.
	closingClients chan MessageChan

	// clients is the registry of connected clients, keyed by message channel.
	// It is only read and written by the listen loop.
	clients map[MessageChan]bool
}

// listen is the broker's event loop. It never returns: on every iteration it
// waits for one of three things and acts on it.
func (broker *Broker) listen() {
	for {
		select {
		case s := <-broker.newClients:
			// A new client has connected: register its message channel.
			broker.clients[s] = true
			log.Printf("Client added. %d registered clients", len(broker.clients))
		case s := <-broker.closingClients:
			// A client has detached: stop sending it messages. Deleting a
			// channel that is not (or no longer) registered is a no-op.
			delete(broker.clients, s)
			log.Printf("Removed client. %d registered clients", len(broker.clients))
		case event := <-broker.Notifier:
			// A new event arrived: send it to every registered client, in
			// map-iteration (unspecified) order. Each send blocks until that
			// client receives, and no registration or removal is processed
			// until the whole broadcast has finished.
			for clientMessageChan := range broker.clients {
				clientMessageChan <- event
			}
		}
	}
}

// NewBroker creates a Broker with all of its channels and its client registry
// initialised, starts its listen loop in a new goroutine and returns it.
// Nothing in this file stops that goroutine.
func NewBroker() (broker *Broker) {
	// Only Notifier is buffered (one message); the registration channels are
	// unbuffered, so registering or unregistering waits for the listen loop.
	broker = &Broker{
		Notifier:       make(chan Message, 1),
		newClients:     make(chan MessageChan),
		closingClients: make(chan MessageChan),
		clients:        make(map[MessageChan]bool),
	}
	// Set it running - listening and broadcasting events.
	go broker.listen()
	return
}

// StartConnection starts an SSE connection on top of an existing HTTP connection.
func StartConnection(broker *Broker, w http.ResponseWriter) error { // Make sure that the writer supports flushing. flusher, ok := w.(http.Flusher) if !ok { return fmt.Errorf("streaming unsupported") } // Set the headers related to event streaming. w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") _, err := fmt.Fprintf(w, "event: welcome\r\n") if err != nil { return err } _, err = fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n") if err != nil { return err } flusher.Flush() // Each connection registers its own message channel with the Broker's connections registry messageChan := make(MessageChan) // Signal the broker that we have a new connection broker.newClients <- messageChan // Remove this client from the map of connected clients // when this handler exits. defer func() { broker.closingClients <- messageChan }() // Listen to connection close and un-register messageChan notify := w.(http.CloseNotifier).CloseNotify() go func() { <-notify broker.closingClients <- messageChan }() // block waiting or messages broadcast on this connection's messageChan for { // Write to the ResponseWriter, Server Sent Events compatible message := <-messageChan output, err := json.Marshal(message.Object) if err != nil { log.Println(err) continue } fmt.Fprintf(w, "event: %s\n", message.Event) fmt.Fprintf(w, "data: %s\n\n", output) // Flush the data immediately instead of buffering it for later. flusher.Flush() } }