Added boolean to show when item was added to channel
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Peter Stuifzand 2019-08-17 16:55:59 +02:00
parent c7a1e65b50
commit d2fc2ecd3c
Signed by: peter
GPG Key ID: 374322D56E5209E8
6 changed files with 22 additions and 20 deletions

View File

@ -639,10 +639,12 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error { func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
timelineBackend := b.getTimeline(channel) timelineBackend := b.getTimeline(channel)
err := timelineBackend.AddItem(item) added, err := timelineBackend.AddItem(item)
// Sent message to Server-Sent-Events // Sent message to Server-Sent-Events
b.broker.Notifier <- sse.Message{Event: "new item", Object: item} if added {
b.broker.Notifier <- sse.Message{Event: "new item", Object: item}
}
return err return err
} }
@ -661,6 +663,9 @@ func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
defer b.save() defer b.save()
c.Unread = microsub.Unread{Type: microsub.UnreadCount, UnreadCount: unread} c.Unread = microsub.Unread{Type: microsub.UnreadCount, UnreadCount: unread}
// Sent message to Server-Sent-Events
b.broker.Notifier <- sse.Message{Event: "new item in channel", Object: c}
b.lock.Lock() b.lock.Lock()
b.Channels[channel] = c b.Channels[channel] = c
b.lock.Unlock() b.lock.Unlock()

View File

@ -129,7 +129,7 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
return err return err
} }
_, err = fmt.Fprintf(w, "data: %s", encoded) _, err = fmt.Fprintf(w, "data: %s\r\n\r\n", encoded)
if err != nil { if err != nil {
return err return err
} }

View File

@ -14,8 +14,8 @@ func (timeline *nullTimeline) Items(before, after string) (microsub.Timeline, er
return microsub.Timeline{Items: []microsub.Item{}}, nil return microsub.Timeline{Items: []microsub.Item{}}, nil
} }
func (timeline *nullTimeline) AddItem(item microsub.Item) error { func (timeline *nullTimeline) AddItem(item microsub.Item) (bool, error) {
return nil return false, nil
} }
func (timeline *nullTimeline) Count() (int, error) { func (timeline *nullTimeline) Count() (int, error) {

View File

@ -103,7 +103,7 @@ func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Ti
}, nil }, nil
} }
func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) (bool, error) {
conn := timeline.pool.Get() conn := timeline.pool.Get()
defer conn.Close() defer conn.Close()
@ -116,7 +116,7 @@ func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
data, err := json.Marshal(item) data, err := json.Marshal(item)
if err != nil { if err != nil {
return fmt.Errorf("couldn't marshall item for redis: %s", err) return false, fmt.Errorf("couldn't marshal item for redis: %s", err)
} }
forRedis := redisItem{ forRedis := redisItem{
@ -129,33 +129,30 @@ func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
itemKey := fmt.Sprintf("item:%s", item.ID) itemKey := fmt.Sprintf("item:%s", item.ID)
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
if err != nil { if err != nil {
return fmt.Errorf("writing failed for item to redis: %v", err) return false, fmt.Errorf("writing failed for item to redis: %v", err)
} }
readChannelKey := fmt.Sprintf("channel:%s:read", channel) readChannelKey := fmt.Sprintf("channel:%s:read", channel)
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey)) isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
if err != nil { if err != nil {
return err return false, err
} }
if isRead { if isRead {
return nil return false, nil
} }
score, err := time.Parse(time.RFC3339, item.Published) score, err := time.Parse(time.RFC3339, item.Published)
if err != nil { if err != nil {
return fmt.Errorf("can't parse %s as time", item.Published) return false, fmt.Errorf("can't parse %s as time", item.Published)
} }
_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) _, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
if err != nil { if err != nil {
return fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err) return false, fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
} }
// FIXME: send message to events... return true, nil
// b.sendMessage(microsub.Message("item added " + item.ID))
return nil
} }
func (timeline *redisSortedSetTimeline) Count() (int, error) { func (timeline *redisSortedSetTimeline) Count() (int, error) {

View File

@ -70,7 +70,7 @@ func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timel
}, nil }, nil
} }
func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error { func (timeline *redisStreamTimeline) AddItem(item microsub.Item) (bool, error) {
conn := timeline.pool.Get() conn := timeline.pool.Get()
defer conn.Close() defer conn.Close()
@ -81,14 +81,14 @@ func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
data, err := json.Marshal(item) data, err := json.Marshal(item)
if err != nil { if err != nil {
log.Printf("error while creating item for redis: %v\n", err) log.Printf("error while creating item for redis: %v\n", err)
return err return false, err
} }
args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data) args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data)
_, err = redis.String(conn.Do("XADD", args...)) _, err = redis.String(conn.Do("XADD", args...))
return err return err == nil, err
} }
func (timeline *redisStreamTimeline) Count() (int, error) { func (timeline *redisStreamTimeline) Count() (int, error) {

View File

@ -19,7 +19,7 @@ type Backend interface {
Items(before, after string) (microsub.Timeline, error) Items(before, after string) (microsub.Timeline, error)
Count() (int, error) Count() (int, error)
AddItem(item microsub.Item) error AddItem(item microsub.Item) (bool, error)
MarkRead(uids []string) error MarkRead(uids []string) error
// Not used at the moment // Not used at the moment