Improve event handling
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Peter Stuifzand 2019-03-24 16:39:07 +01:00
parent 15d7c69c30
commit 2971f45d7d
Signed by: peter
GPG Key ID: 374322D56E5209E8
2 changed files with 60 additions and 36 deletions

View File

@ -10,7 +10,6 @@ import (
"net/url" "net/url"
"strings" "strings"
"github.com/pkg/errors"
"p83.nl/go/ekster/pkg/microsub" "p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/sse" "p83.nl/go/ekster/pkg/sse"
) )
@ -355,14 +354,31 @@ func (c *Client) MarkRead(channel string, uids []string) error {
// Events open an event channel to the server. // Events open an event channel to the server.
func (c *Client) Events() (chan sse.Message, error) { func (c *Client) Events() (chan sse.Message, error) {
res, err := c.microsubGetRequest("events", nil)
if err != nil { ch := make(chan sse.Message)
return nil, err
} errorCounter := 0
ch, err := sse.Reader(res.Body) go func() {
if err != nil { for {
return nil, errors.Wrap(err, "could not create reader") res, err := c.microsubGetRequest("events", nil)
} if err != nil {
log.Printf("could not request events: %s", err)
errorCounter++
if errorCounter > 5 {
break
}
continue
}
err = sse.Reader(res.Body, ch)
if err != nil {
log.Printf("could not create reader: %s", err)
break
}
}
close(ch)
}()
return ch, nil return ch, nil
} }

View File

@ -99,6 +99,10 @@ func StartConnection(broker *Broker) (MessageChan, error) {
return messageChan, nil return messageChan, nil
} }
type welcomeMessage struct {
Version string `json:"version"`
}
// WriteMessages writes SSE formatted messages to the writer // WriteMessages writes SSE formatted messages to the writer
func WriteMessages(w http.ResponseWriter, messageChan chan Message) error { func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
// Make sure that the writer supports flushing. // Make sure that the writer supports flushing.
@ -113,11 +117,19 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
_, err := fmt.Fprintf(w, "event: welcome\r\n") _, err := fmt.Fprintf(w, "event: started\r\n")
if err != nil { if err != nil {
return err return err
} }
_, err = fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n")
var welcomeMsg welcomeMessage
welcomeMsg.Version = "1.0.0"
encoded, err := json.Marshal(&welcomeMsg)
if err != nil {
return errors.Wrap(err, "could not encode welcome message")
}
_, err = fmt.Fprintf(w, "data: %s", encoded)
if err != nil { if err != nil {
return err return err
} }
@ -148,32 +160,28 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
} }
// Reader returns a channel that contains parsed SSE messages. // Reader returns a channel that contains parsed SSE messages.
func Reader(body io.ReadCloser) (MessageChan, error) { func Reader(body io.ReadCloser, ch MessageChan) error {
ch := make(MessageChan)
r := bufio.NewScanner(body) r := bufio.NewScanner(body)
var msg Message var msg Message
go func() {
for r.Scan() {
line := r.Text()
if line == "" {
ch <- msg
msg = Message{}
continue
}
if strings.HasPrefix(line, "event: ") {
line = line[len("event: "):]
msg.Event = line
}
if strings.HasPrefix(line, "data: ") {
line = line[len("data: "):]
msg.Data = line
}
}
if err := r.Err(); err != nil {
log.Printf("could not scanner lines from sse events: %+v", err)
}
}()
return ch, nil for r.Scan() {
line := r.Text()
if line == "" {
ch <- msg
msg = Message{}
continue
}
if strings.HasPrefix(line, "event: ") {
line = line[len("event: "):]
msg.Event = line
}
if strings.HasPrefix(line, "data: ") {
line = line[len("data: "):]
msg.Data = line
}
}
if err := r.Err(); err != nil {
return errors.Wrap(err, "could not scan lines from sse events: %+v")
}
return nil
} }