From 723b5b24d56a6b2572c947a701e912c4efd743a4 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Tue, 25 Dec 2018 13:47:20 +0100 Subject: [PATCH] Cleanup of interface - Rename GetItems -> Items - Add Count method - Move add item method to timeline.go - Make getTimeline method of memorybackend --- cmd/eksterd/memory.go | 60 ++++---------------------- cmd/eksterd/timeline.go | 93 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 90 insertions(+), 63 deletions(-) diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index cb6b898..2afcd7d 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -313,9 +313,9 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim return microsub.Timeline{Items: []microsub.Item{}}, err } - timelineBackend := GetTimeline("sorted-set", channel) + timelineBackend := b.getTimeline(channel) - return timelineBackend.GetItems(before, after) + return timelineBackend.Items(before, after) } func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) { @@ -635,54 +635,8 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool { } func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error { - zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) - - if item.Published == "" { - item.Published = time.Now().Format(time.RFC3339) - } - - data, err := json.Marshal(item) - if err != nil { - log.Printf("error while creating item for redis: %v\n", err) - return err - } - - forRedis := redisItem{ - ID: item.ID, - Published: item.Published, - Read: item.Read, - Data: data, - } - - itemKey := fmt.Sprintf("item:%s", item.ID) - _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) - if err != nil { - return fmt.Errorf("error while writing item for redis: %v", err) - } - - readChannelKey := fmt.Sprintf("channel:%s:read", channel) - isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey)) - if err != nil { - return err - } - - if isRead { - return nil - } - - score, err := time.Parse(time.RFC3339, item.Published) - if err != nil { - return fmt.Errorf("error can't parse %s as time", item.Published) - } - - _, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) - if err != nil { - return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err) - } - - b.sendMessage(microsub.Message("item added " + item.ID)) - - return nil + timelineBackend := b.getTimeline(channel) + return timelineBackend.AddItem(item) } func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error { @@ -691,10 +645,10 @@ func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string b.lock.RUnlock() if exists { - zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) - unread, err := redis.Int(conn.Do("ZCARD", zchannelKey)) + timeline := b.getTimeline(channel) + unread, err := timeline.Count() if err != nil { - return fmt.Errorf("error: while updating channel unread count for %s: %s", channel, err) + return err } defer b.save() c.Unread = unread diff --git a/cmd/eksterd/timeline.go b/cmd/eksterd/timeline.go index 8454672..617391e 100644 --- a/cmd/eksterd/timeline.go +++ b/cmd/eksterd/timeline.go @@ -4,14 +4,16 @@ import ( "encoding/json" "fmt" "log" + "time" "github.com/gomodule/redigo/redis" "p83.nl/go/ekster/pkg/microsub" ) type TimelineBackend interface { - GetItems(before, after string) (microsub.Timeline, error) + Items(before, after string) (microsub.Timeline, error) AddItem(item microsub.Item) error + Count() (int, error) MarkRead(uid string) error MarkUnread(uid string) error @@ -25,7 +27,9 @@ type redisStreamTimeline struct { channel string } -func GetTimeline(timelineType, channel string) TimelineBackend { +func (b *memoryBackend) getTimeline(channel string) TimelineBackend { + // TODO: fetch timeline type from channel + timelineType := "sorted-set" if timelineType == "sorted-set" { return &redisSortedSetTimeline{channel} } @@ -38,7 +42,7 @@ func GetTimeline(timelineType, channel string) TimelineBackend { /* * REDIS SORTED SETS TIMELINE */ -func (timeline *redisSortedSetTimeline) GetItems(before, after string) (microsub.Timeline, error) { +func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) { conn := pool.Get() defer conn.Close() @@ -119,22 +123,87 @@ func (timeline *redisSortedSetTimeline) GetItems(before, after string) (microsub }, nil } -func (*redisSortedSetTimeline) AddItem(item microsub.Item) error { +func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { + conn := pool.Get() + defer conn.Close() + + channel := timeline.channel + zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) + + if item.Published == "" { + item.Published = time.Now().Format(time.RFC3339) + } + + data, err := json.Marshal(item) + if err != nil { + log.Printf("error while creating item for redis: %v\n", err) + return err + } + + forRedis := redisItem{ + ID: item.ID, + Published: item.Published, + Read: item.Read, + Data: data, + } + + itemKey := fmt.Sprintf("item:%s", item.ID) + _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) + if err != nil { + return fmt.Errorf("error while writing item for redis: %v", err) + } + + readChannelKey := fmt.Sprintf("channel:%s:read", channel) + isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey)) + if err != nil { + return err + } + + if isRead { + return nil + } + + score, err := time.Parse(time.RFC3339, item.Published) + if err != nil { + return fmt.Errorf("error can't parse %s as time", item.Published) + } + + _, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) + if err != nil { + return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err) + } + + // FIXME: send message to events... + // b.sendMessage(microsub.Message("item added " + item.ID)) + + return nil +} + +func (timeline *redisSortedSetTimeline) Count() (int, error) { + conn := pool.Get() + defer conn.Close() + + channel := timeline.channel + zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) + unread, err := redis.Int(conn.Do("ZCARD", zchannelKey)) + if err != nil { + return -1, fmt.Errorf("while updating channel unread count for %s: %s", channel, err) + } + return unread, nil +} + +func (timeline *redisSortedSetTimeline) MarkRead(uid string) error { panic("implement me") } -func (*redisSortedSetTimeline) MarkRead(uid string) error { - panic("implement me") -} - -func (*redisSortedSetTimeline) MarkUnread(uid string) error { +func (timeline *redisSortedSetTimeline) MarkUnread(uid string) error { panic("implement me") } /* * REDIS STREAMS TIMELINE */ -func (*redisStreamTimeline) GetItems(before, after string) (microsub.Timeline, error) { +func (*redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) { panic("implement me") } @@ -142,6 +211,10 @@ func (*redisStreamTimeline) AddItem(item microsub.Item) error { panic("implement me") } +func (*redisStreamTimeline) Count() (int, error) { + return 0, nil +} + func (*redisStreamTimeline) MarkRead(uid string) error { panic("implement me") }