Use server-sent-events implementation from thoughtbot
All checks were successful
the build was successful

New endpoint action "action=events" opens an SSE channel.

Implementation based on https://thoughtbot.com/blog/writing-a-server-sent-events-server-in-go
This commit is contained in:
Peter Stuifzand 2019-02-18 21:37:19 +01:00
parent 4ebe8f43fe
commit 83956b4d9f
Signed by: peter
GPG Key ID: 374322D56E5209E8
5 changed files with 123 additions and 56 deletions

View File

@ -116,11 +116,13 @@ func NewApp(options AppOptions) *App {
Backend: app.backend,
})
handler := server.NewMicrosubHandler(app.backend)
handler, broker := server.NewMicrosubHandler(app.backend)
if options.AuthEnabled {
handler = WithAuth(handler, app.backend)
}
app.backend.broker = broker
http.Handle("/microsub", handler)
http.Handle("/incoming/", &incomingHandler{

View File

@ -35,6 +35,7 @@ import (
"p83.nl/go/ekster/pkg/auth"
"p83.nl/go/ekster/pkg/fetch"
"p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/server"
"p83.nl/go/ekster/pkg/util"
"github.com/gomodule/redigo/redis"
@ -59,7 +60,7 @@ type memoryBackend struct {
ticker *time.Ticker
quit chan struct{}
listeners []microsub.EventListener
broker *server.Broker
}
type channelSetting struct {
@ -646,7 +647,12 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
timelineBackend := b.getTimeline(channel)
return timelineBackend.AddItem(item)
err := timelineBackend.AddItem(item)
// Sent message to Server-Sent-Events
b.broker.Notifier <- server.Message{Event: "new item", Object: item}
return err
}
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
@ -717,17 +723,6 @@ func Fetch2(fetchURL string) (*http.Response, error) {
return cachedResp, err
}
func (b *memoryBackend) sendMessage(msg microsub.Message) {
for _, l := range b.listeners {
l.WriteMessage(microsub.Event{Msg: msg})
}
}
func (b *memoryBackend) AddEventListener(el microsub.EventListener) error {
b.listeners = append(b.listeners, el)
return nil
}
func (b *memoryBackend) createChannel(name string) microsub.Channel {
uid := fmt.Sprintf("%012d", b.NextUid)
channel := microsub.Channel{

View File

@ -145,8 +145,6 @@ type Microsub interface {
Search(query string) ([]Feed, error)
PreviewURL(url string) (Timeline, error)
AddEventListener(el EventListener) error
}
func (unread Unread) MarshalJSON() ([]byte, error) {

View File

@ -1,51 +1,74 @@
package server
import (
"encoding/json"
"fmt"
"net"
"time"
"p83.nl/go/ekster/pkg/microsub"
"log"
)
type Consumer struct {
conn net.Conn
output chan microsub.Message
// A MessageChan is a channel of channels
// Each connection sends a channel of bytes to a global MessageChan
// The main broker listen() loop listens on new connections on MessageChan
// New event messages are broadcast to all registered connection channels
type MessageChan chan Message
type Message struct {
Event string
Object interface{}
}
func newConsumer(conn net.Conn) *Consumer {
cons := &Consumer{conn, make(chan microsub.Message)}
// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections
type Broker struct {
// Events are pushed to this channel by the main UDP daemon
Notifier chan Message
fmt.Fprint(conn, "HTTP/1.0 200 OK\r\n")
fmt.Fprint(conn, "Content-Type: text/event-stream\r\n")
fmt.Fprint(conn, "Access-Control-Allow-Origin: *\r\n")
fmt.Fprint(conn, "\r\n")
// New client connections
newClients chan MessageChan
go func() {
ticker := time.NewTicker(10 * time.Second).C
for {
select {
case <-ticker:
fmt.Fprint(conn, `event: ping`)
fmt.Fprint(conn, "\r\n")
fmt.Fprint(conn, "\r\n")
// Closed client connections
closingClients chan MessageChan
case msg := <-cons.output:
fmt.Fprint(conn, `event: message`)
fmt.Fprint(conn, "\r\n")
fmt.Fprint(conn, `data:`)
json.NewEncoder(conn).Encode(msg)
fmt.Fprint(conn, "\r\n")
fmt.Fprint(conn, "\r\n")
// Client connections registry
clients map[MessageChan]bool
}
// Listen on different channels and act accordingly
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has detached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
clientMessageChan <- event
}
}
conn.Close()
}()
}
return cons
}
func (cons *Consumer) WriteMessage(evt microsub.Event) {
cons.output <- evt.Msg
// Broker factory
func NewServer() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan Message, 1),
newClients: make(chan MessageChan),
closingClients: make(chan MessageChan),
clients: make(map[MessageChan]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}

View File

@ -37,6 +37,7 @@ const (
type microsubHandler struct {
backend microsub.Microsub
Broker *Broker
}
func respondJSON(w http.ResponseWriter, value interface{}) {
@ -50,8 +51,9 @@ func respondJSON(w http.ResponseWriter, value interface{}) {
}
}
func NewMicrosubHandler(backend microsub.Microsub) http.Handler {
return &microsubHandler{backend}
func NewMicrosubHandler(backend microsub.Microsub) (http.Handler, *Broker) {
broker := NewServer()
return &microsubHandler{backend, broker}, broker
}
func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -106,9 +108,56 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"items": following,
})
} else if action == "events" {
conn, _, _ := w.(http.Hijacker).Hijack()
cons := newConsumer(conn)
h.backend.AddEventListener(cons)
// Make sure that the writer supports flushing.
//
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(MessageChan)
// Signal the broker that we have a new connection
h.Broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
h.Broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
h.Broker.closingClients <- messageChan
}()
// block waiting or messages broadcast on this connection's messageChan
for {
// Write to the ResponseWriter, Server Sent Events compatible
message := <-messageChan
output, err := json.Marshal(message.Object)
if err != nil {
log.Println(err)
continue
}
fmt.Fprintf(w, "event: %s\n", message.Event)
fmt.Fprintf(w, "data: %s\n\n", output)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
}
} else {
http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400)
return