Split of StartConnection code from microsub server
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Peter Stuifzand 2019-03-24 15:15:55 +01:00
parent 0f9e3043ef
commit 1e00d32aed
Signed by: peter
GPG Key ID: 374322D56E5209E8
4 changed files with 150 additions and 135 deletions

View File

@ -18,7 +18,7 @@ import (
"p83.nl/go/ekster/pkg/auth" "p83.nl/go/ekster/pkg/auth"
"p83.nl/go/ekster/pkg/fetch" "p83.nl/go/ekster/pkg/fetch"
"p83.nl/go/ekster/pkg/microsub" "p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/server" "p83.nl/go/ekster/pkg/sse"
"p83.nl/go/ekster/pkg/timeline" "p83.nl/go/ekster/pkg/timeline"
"p83.nl/go/ekster/pkg/util" "p83.nl/go/ekster/pkg/util"
@ -45,7 +45,7 @@ type memoryBackend struct {
ticker *time.Ticker ticker *time.Ticker
quit chan struct{} quit chan struct{}
broker *server.Broker broker *sse.Broker
pool *redis.Pool pool *redis.Pool
} }
@ -636,7 +636,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error
err := timelineBackend.AddItem(item) err := timelineBackend.AddItem(item)
// Sent message to Server-Sent-Events // Sent message to Server-Sent-Events
b.broker.Notifier <- server.Message{Event: "new item", Object: item} b.broker.Notifier <- sse.Message{Event: "new item", Object: item}
return err return err
} }

View File

@ -1,74 +0,0 @@
package server
import (
"log"
)
// A MessageChan is a channel of channels
// Each connection sends a channel of bytes to a global MessageChan
// The main broker listen() loop listens on new connections on MessageChan
// New event messages are broadcast to all registered connection channels
type MessageChan chan Message
type Message struct {
Event string
Object interface{}
}
// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections
type Broker struct {
// Events are pushed to this channel by the main UDP daemon
Notifier chan Message
// New client connections
newClients chan MessageChan
// Closed client connections
closingClients chan MessageChan
// Client connections registry
clients map[MessageChan]bool
}
// Listen on different channels and act accordingly
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has detached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
clientMessageChan <- event
}
}
}
}
// Broker factory
func NewServer() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan Message, 1),
newClients: make(chan MessageChan),
closingClients: make(chan MessageChan),
clients: make(map[MessageChan]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}

View File

@ -7,11 +7,11 @@ package server
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"net/http" "net/http"
"regexp" "regexp"
"p83.nl/go/ekster/pkg/microsub" "p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/sse"
) )
var ( var (
@ -25,7 +25,7 @@ const (
type microsubHandler struct { type microsubHandler struct {
backend microsub.Microsub backend microsub.Microsub
Broker *Broker Broker *sse.Broker
} }
func respondJSON(w http.ResponseWriter, value interface{}) { func respondJSON(w http.ResponseWriter, value interface{}) {
@ -41,8 +41,8 @@ func respondJSON(w http.ResponseWriter, value interface{}) {
// NewMicrosubHandler is the main entry point for the microsub server // NewMicrosubHandler is the main entry point for the microsub server
// It returns a handler for HTTP and a broker that will send events. // It returns a handler for HTTP and a broker that will send events.
func NewMicrosubHandler(backend microsub.Microsub) (http.Handler, *Broker) { func NewMicrosubHandler(backend microsub.Microsub) (http.Handler, *sse.Broker) {
broker := NewServer() broker := sse.NewBroker()
return &microsubHandler{backend, broker}, broker return &microsubHandler{backend, broker}, broker
} }
@ -99,60 +99,9 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"items": following, "items": following,
}) })
} else if action == "events" { } else if action == "events" {
// Make sure that the writer supports flushing. err := sse.StartConnection(h.Broker, w)
// if err != nil {
flusher, ok := w.(http.Flusher) http.Error(w, "could not start sse connection", 500)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
fmt.Fprintf(w, "event: welcome\r\n")
fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n")
flusher.Flush()
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(MessageChan)
// Signal the broker that we have a new connection
h.Broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
h.Broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
h.Broker.closingClients <- messageChan
}()
// block waiting or messages broadcast on this connection's messageChan
for {
// Write to the ResponseWriter, Server Sent Events compatible
message := <-messageChan
output, err := json.Marshal(message.Object)
if err != nil {
log.Println(err)
continue
}
fmt.Fprintf(w, "event: %s\n", message.Event)
fmt.Fprintf(w, "data: %s\n\n", output)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
} }
} else { } else {
http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400) http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400)

140
pkg/sse/events.go Normal file
View File

@ -0,0 +1,140 @@
package sse
import (
"encoding/json"
"fmt"
"log"
"net/http"
)
// A MessageChan is a channel of channels
// Each connection sends a channel of bytes to a global MessageChan
// The main broker listen() loop listens on new connections on MessageChan
// New event messages are broadcast to all registered connection channels
type MessageChan chan Message
// Message is a message.
type Message struct {
Event string
Object interface{}
}
// Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections
type Broker struct {
// Events are pushed to this channel by the main UDP daemon
Notifier chan Message
// New client connections
newClients chan MessageChan
// Closed client connections
closingClients chan MessageChan
// Client connections registry
clients map[MessageChan]bool
}
// Listen on different channels and act accordingly
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has detached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
clientMessageChan <- event
}
}
}
}
// NewBroker creates a Broker.
func NewBroker() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan Message, 1),
newClients: make(chan MessageChan),
closingClients: make(chan MessageChan),
clients: make(map[MessageChan]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}
// StartConnection starts a SSE connection, based on an existing HTTP connection.
func StartConnection(broker *Broker, w http.ResponseWriter) error {
// Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("streaming unsupported")
}
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
_, err := fmt.Fprintf(w, "event: welcome\r\n")
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n")
if err != nil {
return err
}
flusher.Flush()
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(MessageChan)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
broker.closingClients <- messageChan
}()
// block waiting or messages broadcast on this connection's messageChan
for {
// Write to the ResponseWriter, Server Sent Events compatible
message := <-messageChan
output, err := json.Marshal(message.Object)
if err != nil {
log.Println(err)
continue
}
fmt.Fprintf(w, "event: %s\n", message.Event)
fmt.Fprintf(w, "data: %s\n\n", output)
// Flush the data immediately instead of buffering it for later.
flusher.Flush()
}
}