2021-11-20 21:26:39 +00:00
|
|
|
/*
|
|
|
|
* Ekster is a microsub server
|
|
|
|
* Copyright (c) 2021 The Ekster authors
|
|
|
|
*
|
|
|
|
* This program is free software: you can redistribute it and/or modify
|
|
|
|
* it under the terms of the GNU General Public License as published by
|
|
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
|
|
* (at your option) any later version.
|
|
|
|
*
|
|
|
|
* This program is distributed in the hope that it will be useful,
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
* GNU General Public License for more details.
|
|
|
|
*
|
|
|
|
* You should have received a copy of the GNU General Public License
|
|
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
2019-03-23 19:42:13 +00:00
|
|
|
package timeline
|
2019-03-07 20:20:22 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
|
|
"p83.nl/go/ekster/pkg/microsub"
|
|
|
|
)
|
|
|
|
|
|
|
|
type redisSortedSetTimeline struct {
|
|
|
|
channel string
|
2019-03-23 17:23:56 +00:00
|
|
|
pool *redis.Pool
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* REDIS SORTED SETS TIMELINE
|
|
|
|
*/
|
|
|
|
func (timeline *redisSortedSetTimeline) Init() error {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
|
2019-03-23 17:23:56 +00:00
|
|
|
conn := timeline.pool.Get()
|
2019-03-07 20:20:22 +00:00
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
items := []microsub.Item{}
|
|
|
|
|
|
|
|
channel := timeline.channel
|
|
|
|
|
|
|
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
|
|
|
|
|
|
afterScore := "-inf"
|
|
|
|
if len(after) != 0 {
|
|
|
|
afterScore = "(" + after
|
|
|
|
}
|
|
|
|
beforeScore := "+inf"
|
|
|
|
if len(before) != 0 {
|
|
|
|
beforeScore = "(" + before
|
|
|
|
}
|
|
|
|
|
|
|
|
var itemJSONs [][]byte
|
|
|
|
|
|
|
|
itemScores, err := redis.Strings(
|
|
|
|
conn.Do(
|
|
|
|
"ZRANGEBYSCORE",
|
|
|
|
zchannelKey,
|
|
|
|
afterScore,
|
|
|
|
beforeScore,
|
|
|
|
"LIMIT",
|
|
|
|
0,
|
|
|
|
20,
|
|
|
|
"WITHSCORES",
|
|
|
|
),
|
|
|
|
)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return microsub.Timeline{
|
|
|
|
Paging: microsub.Pagination{},
|
|
|
|
Items: items,
|
|
|
|
}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(itemScores) >= 2 {
|
|
|
|
before = itemScores[1]
|
|
|
|
after = itemScores[len(itemScores)-1]
|
|
|
|
} else {
|
|
|
|
before = ""
|
|
|
|
after = ""
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := 0; i < len(itemScores); i += 2 {
|
|
|
|
itemID := itemScores[i]
|
|
|
|
itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
|
|
|
|
if err != nil {
|
|
|
|
log.Println(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
itemJSONs = append(itemJSONs, itemJSON)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, obj := range itemJSONs {
|
|
|
|
item := microsub.Item{}
|
|
|
|
err := json.Unmarshal(obj, &item)
|
|
|
|
if err != nil {
|
|
|
|
// FIXME: what should we do if one of the items doen't unmarshal?
|
|
|
|
log.Println(err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
item.Read = false
|
|
|
|
items = append(items, item)
|
|
|
|
}
|
|
|
|
paging := microsub.Pagination{
|
|
|
|
After: after,
|
|
|
|
Before: before,
|
|
|
|
}
|
|
|
|
|
|
|
|
return microsub.Timeline{
|
|
|
|
Paging: paging,
|
|
|
|
Items: items,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2019-08-17 14:55:59 +00:00
|
|
|
func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) (bool, error) {
|
2019-03-23 17:23:56 +00:00
|
|
|
conn := timeline.pool.Get()
|
2019-03-07 20:20:22 +00:00
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
channel := timeline.channel
|
|
|
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
|
|
|
|
|
|
if item.Published == "" {
|
|
|
|
item.Published = time.Now().Format(time.RFC3339)
|
|
|
|
}
|
|
|
|
|
2021-10-20 19:37:10 +00:00
|
|
|
// Fix date when it almost matches with RFC3339, except the colon in the timezone
|
|
|
|
format := "2006-01-02T15:04:05Z0700"
|
|
|
|
if parsedDate, err := time.Parse(format, item.Published); err == nil {
|
|
|
|
item.Published = parsedDate.Format(time.RFC3339)
|
|
|
|
}
|
|
|
|
|
2019-03-07 20:20:22 +00:00
|
|
|
data, err := json.Marshal(item)
|
|
|
|
if err != nil {
|
2019-08-17 14:55:59 +00:00
|
|
|
return false, fmt.Errorf("couldn't marshal item for redis: %s", err)
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
forRedis := redisItem{
|
|
|
|
ID: item.ID,
|
|
|
|
Published: item.Published,
|
|
|
|
Read: item.Read,
|
|
|
|
Data: data,
|
|
|
|
}
|
|
|
|
|
|
|
|
itemKey := fmt.Sprintf("item:%s", item.ID)
|
|
|
|
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
|
|
|
if err != nil {
|
2019-08-17 14:55:59 +00:00
|
|
|
return false, fmt.Errorf("writing failed for item to redis: %v", err)
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
|
|
|
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
|
|
|
if err != nil {
|
2019-08-17 14:55:59 +00:00
|
|
|
return false, err
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if isRead {
|
2019-08-17 14:55:59 +00:00
|
|
|
return false, nil
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
score, err := time.Parse(time.RFC3339, item.Published)
|
|
|
|
if err != nil {
|
2019-08-17 14:55:59 +00:00
|
|
|
return false, fmt.Errorf("can't parse %s as time", item.Published)
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
2019-08-17 17:59:19 +00:00
|
|
|
n, err := redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
2019-03-07 20:20:22 +00:00
|
|
|
if err != nil {
|
2019-08-17 14:55:59 +00:00
|
|
|
return false, fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
2019-08-17 17:59:19 +00:00
|
|
|
return n == 1, nil
|
2019-03-07 20:20:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (timeline *redisSortedSetTimeline) Count() (int, error) {
|
2019-03-23 17:23:56 +00:00
|
|
|
conn := timeline.pool.Get()
|
2019-03-07 20:20:22 +00:00
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
channel := timeline.channel
|
|
|
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
|
|
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
|
|
|
if err != nil {
|
|
|
|
return -1, fmt.Errorf("while updating channel unread count for %s: %s", channel, err)
|
|
|
|
}
|
|
|
|
return unread, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
|
2019-03-23 17:23:56 +00:00
|
|
|
conn := timeline.pool.Get()
|
2019-03-07 20:20:22 +00:00
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
channel := timeline.channel
|
|
|
|
|
|
|
|
itemUIDs := []string{}
|
|
|
|
for _, uid := range uids {
|
|
|
|
itemUIDs = append(itemUIDs, "item:"+uid)
|
|
|
|
}
|
|
|
|
|
|
|
|
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
|
|
|
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
|
|
|
|
|
|
|
if _, err := conn.Do("SADD", args...); err != nil {
|
|
|
|
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
|
|
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
|
|
|
|
|
|
|
if _, err := conn.Do("ZREM", args...); err != nil {
|
|
|
|
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
|
|
|
|
panic("implement me")
|
|
|
|
}
|