ekster/pkg/sse/events.go

215 lines
5.5 KiB
Go
Raw Normal View History

/*
* Ekster is a microsub server
* Copyright (c) 2021 The Ekster authors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package sse
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
2019-08-18 09:52:03 +00:00
"time"
"github.com/pkg/errors"
)
// A MessageChan is a channel of channels
// Each connection sends a channel of bytes to a global MessageChan
// The main broker listen() loop listens on new connections on MessageChan
// New event messages are broadcast to all registered connection channels
type MessageChan chan Message
// Message is a message.
type Message struct {
Event string
Data string
Object interface{}
}
2019-08-18 09:52:03 +00:00
type pingMessage struct {
PingCount int `json:"ping"`
}
// Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections
type Broker struct {
// Events are pushed to this channel by the main UDP daemon
Notifier chan Message
// New client connections
newClients chan MessageChan
// Closed client connections
closingClients chan MessageChan
// Client connections registry
clients map[MessageChan]bool
}
// Listen on different channels and act accordingly
func (broker *Broker) listen() {
2019-08-18 09:52:03 +00:00
ticker := time.NewTicker(10 * time.Second)
pingCount := 0
for {
select {
2019-08-18 09:52:03 +00:00
case <-ticker.C:
broker.Notifier <- Message{
Event: "ping",
Object: pingMessage{PingCount: pingCount},
}
pingCount++
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has detached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
clientMessageChan <- event
}
}
}
}
// NewBroker creates a Broker.
func NewBroker() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan Message, 1),
newClients: make(chan MessageChan),
closingClients: make(chan MessageChan),
clients: make(map[MessageChan]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}
// CloseClient closes the client channel
func (broker *Broker) CloseClient(ch MessageChan) {
broker.closingClients <- ch
}
// StartConnection starts a SSE connection, based on an existing HTTP connection.
func StartConnection(broker *Broker) (MessageChan, error) {
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(MessageChan)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
return messageChan, nil
}
2019-03-24 15:39:07 +00:00
type welcomeMessage struct {
Version string `json:"version"`
}
// WriteMessages writes SSE formatted messages to the writer
func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
// Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
return fmt.Errorf("streaming unsupported")
}
// Set the headers related to event streaming.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
2019-03-24 15:39:07 +00:00
var welcomeMsg welcomeMessage
welcomeMsg.Version = "1.0.0"
encoded, err := json.Marshal(&welcomeMsg)
if err != nil {
return errors.Wrap(err, "could not encode welcome message")
}
2019-08-17 20:13:41 +00:00
messageID := 1
2019-03-24 15:41:13 +00:00
2019-08-17 20:13:41 +00:00
_, err = fmt.Fprintf(w, "event: started\r\nid: %d\r\ndata: %s\r\n\r\n", messageID, encoded)
if err != nil {
return err
}
2019-08-17 20:13:41 +00:00
messageID++
flusher.Flush()
// block waiting or messages broadcast on this connection's messageChan
for message := range messageChan {
// Write to the ResponseWriter, Server Sent Events compatible
output, err := json.Marshal(message.Object)
if err != nil {
return errors.Wrap(err, "could not marshal message data")
}
2019-08-17 20:13:41 +00:00
_, err = fmt.Fprintf(w, "event: %s\r\nid: %d\r\ndata: %s\r\n\r\n", message.Event, messageID, output)
if err != nil {
2019-08-17 20:13:41 +00:00
return errors.Wrap(err, "could not write message")
}
2019-08-17 20:13:41 +00:00
messageID++
flusher.Flush()
}
return nil
}
// Reader returns a channel that contains parsed SSE messages.
2019-03-24 15:39:07 +00:00
func Reader(body io.ReadCloser, ch MessageChan) error {
r := bufio.NewScanner(body)
var msg Message
2019-03-24 15:39:07 +00:00
for r.Scan() {
line := r.Text()
if line == "" {
ch <- msg
msg = Message{}
continue
}
2019-03-24 15:39:07 +00:00
if strings.HasPrefix(line, "event: ") {
line = line[len("event: "):]
msg.Event = line
}
2019-03-24 15:39:07 +00:00
if strings.HasPrefix(line, "data: ") {
line = line[len("data: "):]
msg.Data = line
}
}
if err := r.Err(); err != nil {
2019-03-24 15:41:13 +00:00
return errors.Wrap(err, "could not scan lines from sse events")
2019-03-24 15:39:07 +00:00
}
return nil
}