parent
bd05576f1c
commit
a64a4e1541
|
@ -52,6 +52,8 @@ type memoryBackend struct {
|
|||
|
||||
ticker *time.Ticker
|
||||
quit chan struct{}
|
||||
|
||||
listeners []microsub.EventListener
|
||||
}
|
||||
|
||||
type channelSetting struct {
|
||||
|
@ -738,6 +740,8 @@ func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item mic
|
|||
return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
|
||||
}
|
||||
|
||||
b.sendMessage(microsub.Message("item added " + item.ID))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -808,3 +812,14 @@ func Fetch2(fetchURL string) (*http.Response, error) {
|
|||
cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req)
|
||||
return cachedResp, err
|
||||
}
|
||||
|
||||
func (b *memoryBackend) sendMessage(msg microsub.Message) {
|
||||
for _, l := range b.listeners {
|
||||
l.WriteMessage(microsub.Event{msg})
|
||||
}
|
||||
}
|
||||
|
||||
func (b *memoryBackend) AddEventListener(el microsub.EventListener) error {
|
||||
b.listeners = append(b.listeners, el)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
|
||||
|
@ -135,36 +134,9 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
} else if action == "events" {
|
||||
//w.Header().Add("Content-Type", "text/event-stream")
|
||||
|
||||
c := make(chan string)
|
||||
go func() {
|
||||
c <- "test"
|
||||
time.Sleep(10 * time.Second)
|
||||
c <- "test"
|
||||
time.Sleep(10 * time.Second)
|
||||
c <- "test"
|
||||
time.Sleep(10 * time.Second)
|
||||
c <- "test"
|
||||
time.Sleep(10 * time.Second)
|
||||
c <- "end"
|
||||
}()
|
||||
|
||||
conn, _, _ := w.(http.Hijacker).Hijack()
|
||||
fmt.Fprint(conn, "HTTP/1.0 200 OK\r\n")
|
||||
fmt.Fprint(conn, "Content-Type: text/event-stream\r\n")
|
||||
fmt.Fprint(conn, "Access-Control-Allow-Origin: *\r\n")
|
||||
fmt.Fprint(conn, "\r\n")
|
||||
go func() {
|
||||
for t := range c {
|
||||
fmt.Fprint(conn, `event: ping`)
|
||||
fmt.Fprint(conn, "\r\n")
|
||||
fmt.Fprintf(conn, `data: %s`, t)
|
||||
fmt.Fprint(conn, "\r\n")
|
||||
fmt.Fprint(conn, "\r\n")
|
||||
}
|
||||
conn.Close()
|
||||
}()
|
||||
cons := newConsumer(conn)
|
||||
h.Backend.AddEventListener(cons)
|
||||
} else {
|
||||
http.Error(w, fmt.Sprintf("unknown action %s\n", action), 500)
|
||||
return
|
||||
|
|
|
@ -100,6 +100,16 @@ type Feed struct {
|
|||
Author Card `json:"author,omitempty"`
|
||||
}
|
||||
|
||||
type Message string
|
||||
|
||||
type Event struct {
|
||||
Msg Message
|
||||
}
|
||||
|
||||
type EventListener interface {
|
||||
WriteMessage(evt Event)
|
||||
}
|
||||
|
||||
// Microsub is the main protocol that should be implemented by a backend
|
||||
type Microsub interface {
|
||||
ChannelsGetList() ([]Channel, error)
|
||||
|
@ -118,4 +128,6 @@ type Microsub interface {
|
|||
|
||||
Search(query string) ([]Feed, error)
|
||||
PreviewURL(url string) (Timeline, error)
|
||||
|
||||
AddEventListener(el EventListener) error
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user