From 83956b4d9fdfed39b43ea350aa1f7d641a8af2ba Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Mon, 18 Feb 2019 21:37:19 +0100 Subject: [PATCH] Use server-sent-events implementation from thoughtbot New endpoint action "action=events" opens an SSE channel. Implementation based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go --- cmd/eksterd/main.go | 4 +- cmd/eksterd/memory.go | 21 ++++----- pkg/microsub/protocol.go | 2 - pkg/server/events.go | 93 +++++++++++++++++++++++++--------------- pkg/server/microsub.go | 59 ++++++++++++++++++++++--- 5 files changed, 123 insertions(+), 56 deletions(-) diff --git a/cmd/eksterd/main.go b/cmd/eksterd/main.go index c595961..8268efa 100644 --- a/cmd/eksterd/main.go +++ b/cmd/eksterd/main.go @@ -116,11 +116,13 @@ func NewApp(options AppOptions) *App { Backend: app.backend, }) - handler := server.NewMicrosubHandler(app.backend) + handler, broker := server.NewMicrosubHandler(app.backend) if options.AuthEnabled { handler = WithAuth(handler, app.backend) } + app.backend.broker = broker + http.Handle("/microsub", handler) http.Handle("/incoming/", &incomingHandler{ diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index 8e934b5..e778ccc 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -35,6 +35,7 @@ import ( "p83.nl/go/ekster/pkg/auth" "p83.nl/go/ekster/pkg/fetch" "p83.nl/go/ekster/pkg/microsub" + "p83.nl/go/ekster/pkg/server" "p83.nl/go/ekster/pkg/util" "github.com/gomodule/redigo/redis" @@ -59,7 +60,7 @@ type memoryBackend struct { ticker *time.Ticker quit chan struct{} - listeners []microsub.EventListener + broker *server.Broker } type channelSetting struct { @@ -646,7 +647,12 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool { func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error { timelineBackend := b.getTimeline(channel) - return timelineBackend.AddItem(item) + err := timelineBackend.AddItem(item) + + // Sent message to Server-Sent-Events + b.broker.Notifier <- server.Message{Event: "new item", Object: item} + + return err } func (b *memoryBackend) updateChannelUnreadCount(channel string) error { @@ -717,17 +723,6 @@ func Fetch2(fetchURL string) (*http.Response, error) { return cachedResp, err } -func (b *memoryBackend) sendMessage(msg microsub.Message) { - for _, l := range b.listeners { - l.WriteMessage(microsub.Event{Msg: msg}) - } -} - -func (b *memoryBackend) AddEventListener(el microsub.EventListener) error { - b.listeners = append(b.listeners, el) - return nil -} - func (b *memoryBackend) createChannel(name string) microsub.Channel { uid := fmt.Sprintf("%012d", b.NextUid) channel := microsub.Channel{ diff --git a/pkg/microsub/protocol.go b/pkg/microsub/protocol.go index e379f8b..f62ec18 100644 --- a/pkg/microsub/protocol.go +++ b/pkg/microsub/protocol.go @@ -145,8 +145,6 @@ type Microsub interface { Search(query string) ([]Feed, error) PreviewURL(url string) (Timeline, error) - - AddEventListener(el EventListener) error } func (unread Unread) MarshalJSON() ([]byte, error) { diff --git a/pkg/server/events.go b/pkg/server/events.go index ad846d4..b8d5d47 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -1,51 +1,74 @@ package server import ( - "encoding/json" - "fmt" - "net" - "time" - - "p83.nl/go/ekster/pkg/microsub" + "log" ) -type Consumer struct { - conn net.Conn - output chan microsub.Message +// A MessageChan is a channel of channels +// Each connection sends a channel of bytes to a global MessageChan +// The main broker listen() loop listens on new connections on MessageChan +// New event messages are broadcast to all registered connection channels +type MessageChan chan Message + +type Message struct { + Event string + Object interface{} } -func newConsumer(conn net.Conn) *Consumer { - cons := &Consumer{conn, make(chan microsub.Message)} +// A Broker holds open client connections, +// listens for incoming events on its Notifier channel +// and broadcast event data to all registered connections +type Broker struct { + // Events are pushed to this channel by the main UDP daemon + Notifier chan Message - fmt.Fprint(conn, "HTTP/1.0 200 OK\r\n") - fmt.Fprint(conn, "Content-Type: text/event-stream\r\n") - fmt.Fprint(conn, "Access-Control-Allow-Origin: *\r\n") - fmt.Fprint(conn, "\r\n") + // New client connections + newClients chan MessageChan - go func() { - ticker := time.NewTicker(10 * time.Second).C - for { - select { - case <-ticker: - fmt.Fprint(conn, `event: ping`) - fmt.Fprint(conn, "\r\n") - fmt.Fprint(conn, "\r\n") + // Closed client connections + closingClients chan MessageChan - case msg := <-cons.output: - fmt.Fprint(conn, `event: message`) - fmt.Fprint(conn, "\r\n") - fmt.Fprint(conn, `data:`) - json.NewEncoder(conn).Encode(msg) - fmt.Fprint(conn, "\r\n") - fmt.Fprint(conn, "\r\n") + // Client connections registry + clients map[MessageChan]bool +} + +// Listen on different channels and act accordingly +func (broker *Broker) listen() { + for { + select { + case s := <-broker.newClients: + // A new client has connected. + // Register their message channel + broker.clients[s] = true + log.Printf("Client added. %d registered clients", len(broker.clients)) + case s := <-broker.closingClients: + // A client has detached and we want to + // stop sending them messages. + delete(broker.clients, s) + log.Printf("Removed client. %d registered clients", len(broker.clients)) + case event := <-broker.Notifier: + // We got a new event from the outside! + // Send event to all connected clients + for clientMessageChan := range broker.clients { + clientMessageChan <- event } } - conn.Close() - }() + } - return cons } -func (cons *Consumer) WriteMessage(evt microsub.Event) { - cons.output <- evt.Msg +// Broker factory +func NewServer() (broker *Broker) { + // Instantiate a broker + broker = &Broker{ + Notifier: make(chan Message, 1), + newClients: make(chan MessageChan), + closingClients: make(chan MessageChan), + clients: make(map[MessageChan]bool), + } + + // Set it running - listening and broadcasting events + go broker.listen() + + return } diff --git a/pkg/server/microsub.go b/pkg/server/microsub.go index 72a7d49..f8673fd 100644 --- a/pkg/server/microsub.go +++ b/pkg/server/microsub.go @@ -37,6 +37,7 @@ const ( type microsubHandler struct { backend microsub.Microsub + Broker *Broker } func respondJSON(w http.ResponseWriter, value interface{}) { @@ -50,8 +51,9 @@ func respondJSON(w http.ResponseWriter, value interface{}) { } } -func NewMicrosubHandler(backend microsub.Microsub) http.Handler { - return µsubHandler{backend} +func NewMicrosubHandler(backend microsub.Microsub) (http.Handler, *Broker) { + broker := NewServer() + return µsubHandler{backend, broker}, broker } func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -106,9 +108,56 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { "items": following, }) } else if action == "events" { - conn, _, _ := w.(http.Hijacker).Hijack() - cons := newConsumer(conn) - h.backend.AddEventListener(cons) + // Make sure that the writer supports flushing. + // + flusher, ok := w.(http.Flusher) + + if !ok { + http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) + return + } + + // Set the headers related to event streaming. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // Each connection registers its own message channel with the Broker's connections registry + messageChan := make(MessageChan) + + // Signal the broker that we have a new connection + h.Broker.newClients <- messageChan + + // Remove this client from the map of connected clients + // when this handler exits. + defer func() { + h.Broker.closingClients <- messageChan + }() + + // Listen to connection close and un-register messageChan + notify := w.(http.CloseNotifier).CloseNotify() + + go func() { + <-notify + h.Broker.closingClients <- messageChan + }() + + // block waiting or messages broadcast on this connection's messageChan + for { + // Write to the ResponseWriter, Server Sent Events compatible + message := <-messageChan + output, err := json.Marshal(message.Object) + if err != nil { + log.Println(err) + continue + } + fmt.Fprintf(w, "event: %s\n", message.Event) + fmt.Fprintf(w, "data: %s\n\n", output) + + // Flush the data immediately instead of buffering it for later. + flusher.Flush() + } } else { http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400) return