diff --git a/pkg/client/requests.go b/pkg/client/requests.go index 1204f44..743c92d 100644 --- a/pkg/client/requests.go +++ b/pkg/client/requests.go @@ -10,7 +10,6 @@ import ( "net/url" "strings" - "github.com/pkg/errors" "p83.nl/go/ekster/pkg/microsub" "p83.nl/go/ekster/pkg/sse" ) @@ -355,14 +354,31 @@ func (c *Client) MarkRead(channel string, uids []string) error { // Events open an event channel to the server. func (c *Client) Events() (chan sse.Message, error) { - res, err := c.microsubGetRequest("events", nil) - if err != nil { - return nil, err - } - ch, err := sse.Reader(res.Body) - if err != nil { - return nil, errors.Wrap(err, "could not create reader") - } + + ch := make(chan sse.Message) + + errorCounter := 0 + go func() { + for { + res, err := c.microsubGetRequest("events", nil) + if err != nil { + log.Printf("could not request events: %s", err) + errorCounter++ + if errorCounter > 5 { + break + } + continue + } + + err = sse.Reader(res.Body, ch) + if err != nil { + log.Printf("could not create reader: %s", err) + break + } + } + + close(ch) + }() return ch, nil } diff --git a/pkg/sse/events.go b/pkg/sse/events.go index a14bdc8..cd654d5 100644 --- a/pkg/sse/events.go +++ b/pkg/sse/events.go @@ -99,6 +99,10 @@ func StartConnection(broker *Broker) (MessageChan, error) { return messageChan, nil } +type welcomeMessage struct { + Version string `json:"version"` +} + // WriteMessages writes SSE formatted messages to the writer func WriteMessages(w http.ResponseWriter, messageChan chan Message) error { // Make sure that the writer supports flushing. @@ -113,11 +117,19 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error { w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") - _, err := fmt.Fprintf(w, "event: welcome\r\n") + _, err := fmt.Fprintf(w, "event: started\r\n") if err != nil { return err } - _, err = fmt.Fprintf(w, "data: {\"key\":\"hello world\"}\r\n\r\n") + + var welcomeMsg welcomeMessage + welcomeMsg.Version = "1.0.0" + encoded, err := json.Marshal(&welcomeMsg) + if err != nil { + return errors.Wrap(err, "could not encode welcome message") + } + + _, err = fmt.Fprintf(w, "data: %s", encoded) if err != nil { return err } @@ -148,32 +160,28 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error { } // Reader returns a channel that contains parsed SSE messages. -func Reader(body io.ReadCloser) (MessageChan, error) { - ch := make(MessageChan) - +func Reader(body io.ReadCloser, ch MessageChan) error { r := bufio.NewScanner(body) var msg Message - go func() { - for r.Scan() { - line := r.Text() - if line == "" { - ch <- msg - msg = Message{} - continue - } - if strings.HasPrefix(line, "event: ") { - line = line[len("event: "):] - msg.Event = line - } - if strings.HasPrefix(line, "data: ") { - line = line[len("data: "):] - msg.Data = line - } - } - if err := r.Err(); err != nil { - log.Printf("could not scanner lines from sse events: %+v", err) - } - }() - return ch, nil + for r.Scan() { + line := r.Text() + if line == "" { + ch <- msg + msg = Message{} + continue + } + if strings.HasPrefix(line, "event: ") { + line = line[len("event: "):] + msg.Event = line + } + if strings.HasPrefix(line, "data: ") { + line = line[len("data: "):] + msg.Data = line + } + } + if err := r.Err(); err != nil { + return errors.Wrap(err, "could not scan lines from sse events: %+v") + } + return nil }