First try at paging with ZSETS

This commit is contained in:
Peter Stuifzand 2018-04-09 21:52:55 +02:00
parent eeeb9f3b3f
commit 6ae8062dc6
2 changed files with 75 additions and 8 deletions

View File

@ -359,6 +359,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
// send to redis // send to redis
channelKey := fmt.Sprintf("channel:%s:posts", channel) channelKey := fmt.Sprintf("channel:%s:posts", channel)
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
data, err := json.Marshal(item) data, err := json.Marshal(item)
if err != nil { if err != nil {
@ -385,6 +386,18 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err) log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
return return
} }
score, err := time.Parse(time.RFC3339, item.Published)
if err != nil {
log.Printf("error can't parse %s as time\n", item.Published)
}
added, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
if err != nil {
log.Printf("error while zadding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
return
}
if added > 0 { if added > 0 {
log.Printf("Adding item to channel %s\n", channel) log.Printf("Adding item to channel %s\n", channel)
log.Println(item) log.Println(item)

View File

@ -362,9 +362,40 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time
items := []microsub.Item{} items := []microsub.Item{}
channelKey := fmt.Sprintf("channel:%s:posts", channel) zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
//channelKey := fmt.Sprintf("channel:%s:posts", channel)
//itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA"))
// if err != nil {
// log.Println(err)
// return microsub.Timeline{
// Paging: microsub.Pagination{},
// Items: items,
// }
// }
if len(after) == 0 {
after = "-inf"
}
if len(before) == 0 {
before = "+inf"
}
itemJSONs := [][]byte{}
itemScores, err := redis.Strings(
conn.Do(
"ZRANGEBYSCORE",
zchannelKey,
after,
before,
"LIMIT",
0,
20,
"WITHSCORES",
),
)
itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA"))
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return microsub.Timeline{ return microsub.Timeline{
@ -373,18 +404,41 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time
} }
} }
for _, obj := range itemJsons { if len(itemScores) >= 2 {
item := microsub.Item{} before = itemScores[1]
json.Unmarshal(obj, &item) after = itemScores[len(itemScores)-1]
item.Read = b.checkRead(channel, item.ID) } else {
if item.Read { before = ""
after = ""
}
for i := 0; i < len(itemScores); i += 2 {
itemID := itemScores[i]
itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
if err != nil {
log.Println(err)
continue continue
} }
itemJSONs = append(itemJSONs, itemJSON)
}
for _, obj := range itemJSONs {
item := microsub.Item{}
err := json.Unmarshal(obj, &item)
if err != nil {
log.Println(err)
continue
}
item.Read = b.checkRead(channel, item.ID)
items = append(items, item) items = append(items, item)
} }
paging := microsub.Pagination{
After: after,
Before: before,
}
return microsub.Timeline{ return microsub.Timeline{
Paging: microsub.Pagination{}, Paging: paging,
Items: items, Items: items,
} }
} }