diff --git a/cmd/eksterd/fetch.go b/cmd/eksterd/fetch.go index bd84174..5b92729 100644 --- a/cmd/eksterd/fetch.go +++ b/cmd/eksterd/fetch.go @@ -342,6 +342,9 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader) } func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error { + conn := pool.Get() + defer conn.Close() + items, err := b.feedItems(fetchURL, contentType, body) if err != nil { return err @@ -349,27 +352,23 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo for _, item := range items { item.Read = false - b.channelAddItem(channel, item) + err = b.channelAddItem(conn, channel, item) + log.Printf("ERROR: %s\n", err) } + err = b.updateChannelUnreadCount(conn, channel) + log.Printf("error: while updating channel unread count for %s: %s\n", channel, err) + return nil } // Fetch3 fills stuff func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) { log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL) - - resp, err := Fetch2(fetchURL) - if err != nil { - return nil, err - } - return resp, nil + return Fetch2(fetchURL) } -func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { - conn := pool.Get() - defer conn.Close() - +func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error { // send to redis channelKey := fmt.Sprintf("channel:%s:posts", channel) zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) @@ -381,7 +380,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { data, err := json.Marshal(item) if err != nil { log.Printf("error while creating item for redis: %v\n", err) - return + return err } forRedis := redisItem{ @@ -397,46 +396,56 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) if err != nil { log.Printf("error while writing item for redis: %v\n", err) - return + return err } readChannelKey := fmt.Sprintf("channel:%s:read", channel) isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey)) + if err != nil { + return err + } if isRead { - return + return nil } added, err := redis.Int64(conn.Do("SADD", channelKey, itemKey)) if err != nil { log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err) - return + return err } score, err := time.Parse(time.RFC3339, item.Published) if err != nil { log.Printf("error can't parse %s as time\n", item.Published) + return err } added, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) if err != nil { log.Printf("error while zadding item %s to channel %s for redis: %v\n", itemKey, channelKey, err) - return + return err } if added > 0 { - log.Printf("Adding item to channel %s\n", channel) + log.Printf("Added item to channel %s\n", channel) log.Println(item) - - if c, e := b.Channels[channel]; e { - unread, err := redis.Int(conn.Do("ZCARD", zchannelKey)) - if err != nil { - log.Printf("error while getting length of channel %s: %v\n", channelKey, err) - } - c.Unread = unread - b.Channels[channel] = c - } } + + return nil +} + +func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error { + if c, e := b.Channels[channel]; e { + zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) + unread, err := redis.Int(conn.Do("ZCARD", zchannelKey)) + if err != nil { + return err + } + c.Unread = unread + b.Channels[channel] = c + } + return nil } type redisItem struct { diff --git a/cmd/eksterd/micropub.go b/cmd/eksterd/micropub.go index 9f95d43..597b602 100644 --- a/cmd/eksterd/micropub.go +++ b/cmd/eksterd/micropub.go @@ -73,7 +73,8 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { item.Read = false id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id")) item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id)))) - h.Backend.channelAddItem(channel, item) + h.Backend.channelAddItem(conn, channel, item) + h.Backend.updateChannelUnreadCount(conn, channel) } w.Header().Set("Content-Type", "application/json")