From d2fc2ecd3c4a64ed03e22cbbc84fa289c8bc6694 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 17 Aug 2019 16:55:59 +0200 Subject: [PATCH] Added boolean to show when item was added to channel --- cmd/eksterd/memory.go | 9 +++++++-- pkg/sse/events.go | 2 +- pkg/timeline/null.go | 4 ++-- pkg/timeline/redisset.go | 19 ++++++++----------- pkg/timeline/redisstreams.go | 6 +++--- pkg/timeline/timeline.go | 2 +- 6 files changed, 22 insertions(+), 20 deletions(-) diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index b5db83d..0ec1033 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -639,10 +639,12 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool { func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error { timelineBackend := b.getTimeline(channel) - err := timelineBackend.AddItem(item) + added, err := timelineBackend.AddItem(item) // Sent message to Server-Sent-Events - b.broker.Notifier <- sse.Message{Event: "new item", Object: item} + if added { + b.broker.Notifier <- sse.Message{Event: "new item", Object: item} + } return err } @@ -661,6 +663,9 @@ func (b *memoryBackend) updateChannelUnreadCount(channel string) error { defer b.save() c.Unread = microsub.Unread{Type: microsub.UnreadCount, UnreadCount: unread} + // Sent message to Server-Sent-Events + b.broker.Notifier <- sse.Message{Event: "new item in channel", Object: c} + b.lock.Lock() b.Channels[channel] = c b.lock.Unlock() diff --git a/pkg/sse/events.go b/pkg/sse/events.go index a4c5faa..136804c 100644 --- a/pkg/sse/events.go +++ b/pkg/sse/events.go @@ -129,7 +129,7 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error { return err } - _, err = fmt.Fprintf(w, "data: %s", encoded) + _, err = fmt.Fprintf(w, "data: %s\r\n\r\n", encoded) if err != nil { return err } diff --git a/pkg/timeline/null.go b/pkg/timeline/null.go index 409158c..f9f4fdd 100644 --- a/pkg/timeline/null.go +++ b/pkg/timeline/null.go @@ -14,8 +14,8 @@ func (timeline *nullTimeline) Items(before, after string) (microsub.Timeline, er return microsub.Timeline{Items: []microsub.Item{}}, nil } -func (timeline *nullTimeline) AddItem(item microsub.Item) error { - return nil +func (timeline *nullTimeline) AddItem(item microsub.Item) (bool, error) { + return false, nil } func (timeline *nullTimeline) Count() (int, error) { diff --git a/pkg/timeline/redisset.go b/pkg/timeline/redisset.go index c947226..9fb7e65 100644 --- a/pkg/timeline/redisset.go +++ b/pkg/timeline/redisset.go @@ -103,7 +103,7 @@ func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Ti }, nil } -func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { +func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) (bool, error) { conn := timeline.pool.Get() defer conn.Close() @@ -116,7 +116,7 @@ func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { data, err := json.Marshal(item) if err != nil { - return fmt.Errorf("couldn't marshall item for redis: %s", err) + return false, fmt.Errorf("couldn't marshal item for redis: %s", err) } forRedis := redisItem{ @@ -129,33 +129,30 @@ func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { itemKey := fmt.Sprintf("item:%s", item.ID) _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) if err != nil { - return fmt.Errorf("writing failed for item to redis: %v", err) + return false, fmt.Errorf("writing failed for item to redis: %v", err) } readChannelKey := fmt.Sprintf("channel:%s:read", channel) isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey)) if err != nil { - return err + return false, err } if isRead { - return nil + return false, nil } score, err := time.Parse(time.RFC3339, item.Published) if err != nil { - return fmt.Errorf("can't parse %s as time", item.Published) + return false, fmt.Errorf("can't parse %s as time", item.Published) } _, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) if err != nil { - return fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err) + return false, fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err) } - // FIXME: send message to events... - // b.sendMessage(microsub.Message("item added " + item.ID)) - - return nil + return true, nil } func (timeline *redisSortedSetTimeline) Count() (int, error) { diff --git a/pkg/timeline/redisstreams.go b/pkg/timeline/redisstreams.go index 523ef21..ce47051 100644 --- a/pkg/timeline/redisstreams.go +++ b/pkg/timeline/redisstreams.go @@ -70,7 +70,7 @@ func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timel }, nil } -func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error { +func (timeline *redisStreamTimeline) AddItem(item microsub.Item) (bool, error) { conn := timeline.pool.Get() defer conn.Close() @@ -81,14 +81,14 @@ func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error { data, err := json.Marshal(item) if err != nil { log.Printf("error while creating item for redis: %v\n", err) - return err + return false, err } args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data) _, err = redis.String(conn.Do("XADD", args...)) - return err + return err == nil, err } func (timeline *redisStreamTimeline) Count() (int, error) { diff --git a/pkg/timeline/timeline.go b/pkg/timeline/timeline.go index 202f8ad..04c261b 100644 --- a/pkg/timeline/timeline.go +++ b/pkg/timeline/timeline.go @@ -19,7 +19,7 @@ type Backend interface { Items(before, after string) (microsub.Timeline, error) Count() (int, error) - AddItem(item microsub.Item) error + AddItem(item microsub.Item) (bool, error) MarkRead(uids []string) error // Not used at the moment