From 1e00d32aedb8c41cd5141eaa6ec6122f99dedbe5 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sun, 24 Mar 2019 15:15:55 +0100 Subject: [PATCH] Split of StartConnection code from microsub server --- cmd/eksterd/memory.go | 6 +- pkg/server/events.go | 74 ---------------------- pkg/server/microsub.go | 65 +++---------------- pkg/sse/events.go | 140 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 150 insertions(+), 135 deletions(-) delete mode 100644 pkg/server/events.go create mode 100644 pkg/sse/events.go diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index 0987d05..df3ef1a 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -18,7 +18,7 @@ import ( "p83.nl/go/ekster/pkg/auth" "p83.nl/go/ekster/pkg/fetch" "p83.nl/go/ekster/pkg/microsub" - "p83.nl/go/ekster/pkg/server" + "p83.nl/go/ekster/pkg/sse" "p83.nl/go/ekster/pkg/timeline" "p83.nl/go/ekster/pkg/util" @@ -45,7 +45,7 @@ type memoryBackend struct { ticker *time.Ticker quit chan struct{} - broker *server.Broker + broker *sse.Broker pool *redis.Pool } @@ -636,7 +636,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error err := timelineBackend.AddItem(item) // Sent message to Server-Sent-Events - b.broker.Notifier <- server.Message{Event: "new item", Object: item} + b.broker.Notifier <- sse.Message{Event: "new item", Object: item} return err } diff --git a/pkg/server/events.go b/pkg/server/events.go deleted file mode 100644 index b8d5d47..0000000 --- a/pkg/server/events.go +++ /dev/null @@ -1,74 +0,0 @@ -package server - -import ( - "log" -) - -// A MessageChan is a channel of channels -// Each connection sends a channel of bytes to a global MessageChan -// The main broker listen() loop listens on new connections on MessageChan -// New event messages are broadcast to all registered connection channels -type MessageChan chan Message - -type Message struct { - Event string - Object interface{} -} - -// A Broker holds open client connections, -// listens for incoming events on its Notifier channel -// and broadcast event data to all registered connections -type Broker struct { - // Events are pushed to this channel by the main UDP daemon - Notifier chan Message - - // New client connections - newClients chan MessageChan - - // Closed client connections - closingClients chan MessageChan - - // Client connections registry - clients map[MessageChan]bool -} - -// Listen on different channels and act accordingly -func (broker *Broker) listen() { - for { - select { - case s := <-broker.newClients: - // A new client has connected. - // Register their message channel - broker.clients[s] = true - log.Printf("Client added. %d registered clients", len(broker.clients)) - case s := <-broker.closingClients: - // A client has detached and we want to - // stop sending them messages. - delete(broker.clients, s) - log.Printf("Removed client. %d registered clients", len(broker.clients)) - case event := <-broker.Notifier: - // We got a new event from the outside! - // Send event to all connected clients - for clientMessageChan := range broker.clients { - clientMessageChan <- event - } - } - } - -} - -// Broker factory -func NewServer() (broker *Broker) { - // Instantiate a broker - broker = &Broker{ - Notifier: make(chan Message, 1), - newClients: make(chan MessageChan), - closingClients: make(chan MessageChan), - clients: make(map[MessageChan]bool), - } - - // Set it running - listening and broadcasting events - go broker.listen() - - return -} diff --git a/pkg/server/microsub.go b/pkg/server/microsub.go index d35e56d..5694a96 100644 --- a/pkg/server/microsub.go +++ b/pkg/server/microsub.go @@ -7,11 +7,11 @@ package server import ( "encoding/json" "fmt" - "log" "net/http" "regexp" "p83.nl/go/ekster/pkg/microsub" + "p83.nl/go/ekster/pkg/sse" ) var ( @@ -25,7 +25,7 @@ const ( type microsubHandler struct { backend microsub.Microsub - Broker *Broker + Broker *sse.Broker } func respondJSON(w http.ResponseWriter, value interface{}) { @@ -41,8 +41,8 @@ func respondJSON(w http.ResponseWriter, value interface{}) { // NewMicrosubHandler is the main entry point for the microsub server // It returns a handler for HTTP and a broker that will send events. -func NewMicrosubHandler(backend microsub.Microsub) (http.Handler, *Broker) { - broker := NewServer() +func NewMicrosubHandler(backend microsub.Microsub) (http.Handler, *sse.Broker) { + broker := sse.NewBroker() return µsubHandler{backend, broker}, broker } @@ -99,60 +99,9 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { "items": following, }) } else if action == "events" { - // Make sure that the writer supports flushing. - // - flusher, ok := w.(http.Flusher) - - if !ok { - http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) - return - } - - // Set the headers related to event streaming. - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Access-Control-Allow-Origin", "*") - - fmt.Fprintf(w, "event: welcome\r\n") - fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n") - - flusher.Flush() - - // Each connection registers its own message channel with the Broker's connections registry - messageChan := make(MessageChan) - - // Signal the broker that we have a new connection - h.Broker.newClients <- messageChan - - // Remove this client from the map of connected clients - // when this handler exits. - defer func() { - h.Broker.closingClients <- messageChan - }() - - // Listen to connection close and un-register messageChan - notify := w.(http.CloseNotifier).CloseNotify() - - go func() { - <-notify - h.Broker.closingClients <- messageChan - }() - - // block waiting or messages broadcast on this connection's messageChan - for { - // Write to the ResponseWriter, Server Sent Events compatible - message := <-messageChan - output, err := json.Marshal(message.Object) - if err != nil { - log.Println(err) - continue - } - fmt.Fprintf(w, "event: %s\n", message.Event) - fmt.Fprintf(w, "data: %s\n\n", output) - - // Flush the data immediately instead of buffering it for later. - flusher.Flush() + err := sse.StartConnection(h.Broker, w) + if err != nil { + http.Error(w, "could not start sse connection", 500) } } else { http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400) diff --git a/pkg/sse/events.go b/pkg/sse/events.go new file mode 100644 index 0000000..aa17eab --- /dev/null +++ b/pkg/sse/events.go @@ -0,0 +1,140 @@ +package sse + +import ( + "encoding/json" + "fmt" + "log" + "net/http" +) + +// A MessageChan is a channel of channels +// Each connection sends a channel of bytes to a global MessageChan +// The main broker listen() loop listens on new connections on MessageChan +// New event messages are broadcast to all registered connection channels +type MessageChan chan Message + +// Message is a message. +type Message struct { + Event string + Object interface{} +} + +// Broker holds open client connections, +// listens for incoming events on its Notifier channel +// and broadcast event data to all registered connections +type Broker struct { + // Events are pushed to this channel by the main UDP daemon + Notifier chan Message + + // New client connections + newClients chan MessageChan + + // Closed client connections + closingClients chan MessageChan + + // Client connections registry + clients map[MessageChan]bool +} + +// Listen on different channels and act accordingly +func (broker *Broker) listen() { + for { + select { + case s := <-broker.newClients: + // A new client has connected. + // Register their message channel + broker.clients[s] = true + log.Printf("Client added. %d registered clients", len(broker.clients)) + case s := <-broker.closingClients: + // A client has detached and we want to + // stop sending them messages. + delete(broker.clients, s) + log.Printf("Removed client. %d registered clients", len(broker.clients)) + case event := <-broker.Notifier: + // We got a new event from the outside! + // Send event to all connected clients + for clientMessageChan := range broker.clients { + clientMessageChan <- event + } + } + } + +} + +// NewBroker creates a Broker. +func NewBroker() (broker *Broker) { + // Instantiate a broker + broker = &Broker{ + Notifier: make(chan Message, 1), + newClients: make(chan MessageChan), + closingClients: make(chan MessageChan), + clients: make(map[MessageChan]bool), + } + + // Set it running - listening and broadcasting events + go broker.listen() + + return +} + +// StartConnection starts a SSE connection, based on an existing HTTP connection. +func StartConnection(broker *Broker, w http.ResponseWriter) error { + // Make sure that the writer supports flushing. + flusher, ok := w.(http.Flusher) + if !ok { + return fmt.Errorf("streaming unsupported") + } + + // Set the headers related to event streaming. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + _, err := fmt.Fprintf(w, "event: welcome\r\n") + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n") + if err != nil { + return err + } + + flusher.Flush() + + // Each connection registers its own message channel with the Broker's connections registry + messageChan := make(MessageChan) + + // Signal the broker that we have a new connection + broker.newClients <- messageChan + + // Remove this client from the map of connected clients + // when this handler exits. + defer func() { + broker.closingClients <- messageChan + }() + + // Listen to connection close and un-register messageChan + notify := w.(http.CloseNotifier).CloseNotify() + + go func() { + <-notify + broker.closingClients <- messageChan + }() + + // block waiting or messages broadcast on this connection's messageChan + for { + // Write to the ResponseWriter, Server Sent Events compatible + message := <-messageChan + output, err := json.Marshal(message.Object) + if err != nil { + log.Println(err) + continue + } + fmt.Fprintf(w, "event: %s\n", message.Event) + fmt.Fprintf(w, "data: %s\n\n", output) + + // Flush the data immediately instead of buffering it for later. + flusher.Flush() + } +}