From a64a4e154166a25c762ce44ef232636b92ba2be4 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 8 Sep 2018 19:56:54 +0200 Subject: [PATCH] Add events --- cmd/eksterd/memory.go | 15 +++++++++++++++ cmd/eksterd/microsub.go | 32 ++------------------------------ pkg/microsub/protocol.go | 12 ++++++++++++ 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index 90b38a2..ffa6f0d 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -52,6 +52,8 @@ type memoryBackend struct { ticker *time.Ticker quit chan struct{} + + listeners []microsub.EventListener } type channelSetting struct { @@ -738,6 +740,8 @@ func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item mic return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err) } + b.sendMessage(microsub.Message("item added " + item.ID)) + return nil } @@ -808,3 +812,14 @@ func Fetch2(fetchURL string) (*http.Response, error) { cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req) return cachedResp, err } + +func (b *memoryBackend) sendMessage(msg microsub.Message) { + for _, l := range b.listeners { + l.WriteMessage(microsub.Event{msg}) + } +} + +func (b *memoryBackend) AddEventListener(el microsub.EventListener) error { + b.listeners = append(b.listeners, el) + return nil +} diff --git a/cmd/eksterd/microsub.go b/cmd/eksterd/microsub.go index 3937989..1af984b 100644 --- a/cmd/eksterd/microsub.go +++ b/cmd/eksterd/microsub.go @@ -23,7 +23,6 @@ import ( "log" "net/http" "os" - "time" "p83.nl/go/ekster/pkg/microsub" @@ -135,36 +134,9 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } } else if action == "events" { - //w.Header().Add("Content-Type", "text/event-stream") - - c := make(chan string) - go func() { - c <- "test" - time.Sleep(10 * time.Second) - c <- "test" - time.Sleep(10 * time.Second) - c <- "test" - time.Sleep(10 * time.Second) - c <- "test" - time.Sleep(10 * time.Second) - c <- "end" - }() - conn, _, _ := w.(http.Hijacker).Hijack() - fmt.Fprint(conn, "HTTP/1.0 200 OK\r\n") - fmt.Fprint(conn, "Content-Type: text/event-stream\r\n") - fmt.Fprint(conn, "Access-Control-Allow-Origin: *\r\n") - fmt.Fprint(conn, "\r\n") - go func() { - for t := range c { - fmt.Fprint(conn, `event: ping`) - fmt.Fprint(conn, "\r\n") - fmt.Fprintf(conn, `data: %s`, t) - fmt.Fprint(conn, "\r\n") - fmt.Fprint(conn, "\r\n") - } - conn.Close() - }() + cons := newConsumer(conn) + h.Backend.AddEventListener(cons) } else { http.Error(w, fmt.Sprintf("unknown action %s\n", action), 500) return diff --git a/pkg/microsub/protocol.go b/pkg/microsub/protocol.go index 97d4f56..7536768 100644 --- a/pkg/microsub/protocol.go +++ b/pkg/microsub/protocol.go @@ -100,6 +100,16 @@ type Feed struct { Author Card `json:"author,omitempty"` } +type Message string + +type Event struct { + Msg Message +} + +type EventListener interface { + WriteMessage(evt Event) +} + // Microsub is the main protocol that should be implemented by a backend type Microsub interface { ChannelsGetList() ([]Channel, error) @@ -118,4 +128,6 @@ type Microsub interface { Search(query string) ([]Feed, error) PreviewURL(url string) (Timeline, error) + + AddEventListener(el EventListener) error }