diff --git a/cmd/server/fetch.go b/cmd/server/fetch.go index 3725632..b1b8c48 100644 --- a/cmd/server/fetch.go +++ b/cmd/server/fetch.go @@ -359,6 +359,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { // send to redis channelKey := fmt.Sprintf("channel:%s:posts", channel) + zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) data, err := json.Marshal(item) if err != nil { @@ -385,6 +386,18 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err) return } + + score, err := time.Parse(time.RFC3339, item.Published) + if err != nil { + log.Printf("error can't parse %s as time\n", item.Published) + } + + added, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) + if err != nil { + log.Printf("error while zadding item %s to channel %s for redis: %v\n", itemKey, channelKey, err) + return + } + if added > 0 { log.Printf("Adding item to channel %s\n", channel) log.Println(item) diff --git a/cmd/server/memory.go b/cmd/server/memory.go index d6f702b..7c84e2b 100644 --- a/cmd/server/memory.go +++ b/cmd/server/memory.go @@ -362,9 +362,40 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time items := []microsub.Item{} - channelKey := fmt.Sprintf("channel:%s:posts", channel) + zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) + //channelKey := fmt.Sprintf("channel:%s:posts", channel) + + //itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA")) + // if err != nil { + // log.Println(err) + // return microsub.Timeline{ + // Paging: microsub.Pagination{}, + // Items: items, + // } + // } + + if len(after) == 0 { + after = "-inf" + } + if len(before) == 0 { + before = "+inf" + } + + itemJSONs := [][]byte{} + + itemScores, err := redis.Strings( + conn.Do( + "ZRANGEBYSCORE", + zchannelKey, + after, + before, + "LIMIT", + 0, + 20, + "WITHSCORES", + ), + ) - itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA")) if err != nil { log.Println(err) return microsub.Timeline{ @@ -373,18 +404,41 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time } } - for _, obj := range itemJsons { - item := microsub.Item{} - json.Unmarshal(obj, &item) - item.Read = b.checkRead(channel, item.ID) - if item.Read { + if len(itemScores) >= 2 { + before = itemScores[1] + after = itemScores[len(itemScores)-1] + } else { + before = "" + after = "" + } + + for i := 0; i < len(itemScores); i += 2 { + itemID := itemScores[i] + itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data")) + if err != nil { + log.Println(err) continue } + itemJSONs = append(itemJSONs, itemJSON) + } + + for _, obj := range itemJSONs { + item := microsub.Item{} + err := json.Unmarshal(obj, &item) + if err != nil { + log.Println(err) + continue + } + item.Read = b.checkRead(channel, item.ID) items = append(items, item) } + paging := microsub.Pagination{ + After: after, + Before: before, + } return microsub.Timeline{ - Paging: microsub.Pagination{}, + Paging: paging, Items: items, } }