This commit is contained in:
parent
70f5fb82f9
commit
a72f3ce493
|
@ -12,7 +12,6 @@ repos:
|
|||
- repo: https://github.com/dnephin/pre-commit-golang
|
||||
rev: master
|
||||
hooks:
|
||||
- id: go-vet
|
||||
- id: go-fmt
|
||||
- id: go-lint
|
||||
- id: go-unit-tests
|
||||
|
|
27
cmd/eksterd/null.go
Normal file
27
cmd/eksterd/null.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package main
|
||||
|
||||
import "p83.nl/go/ekster/pkg/microsub"
|
||||
|
||||
type nullTimeline struct {
|
||||
channel string
|
||||
}
|
||||
|
||||
func (timeline *nullTimeline) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *nullTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
return microsub.Timeline{Items: []microsub.Item{}}, nil
|
||||
}
|
||||
|
||||
func (timeline *nullTimeline) AddItem(item microsub.Item) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *nullTimeline) Count() (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (timeline *nullTimeline) MarkRead(uids []string) error {
|
||||
return nil
|
||||
}
|
203
cmd/eksterd/redisset.go
Normal file
203
cmd/eksterd/redisset.go
Normal file
|
@ -0,0 +1,203 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
)
|
||||
|
||||
type redisSortedSetTimeline struct {
|
||||
channel string
|
||||
}
|
||||
|
||||
/*
|
||||
* REDIS SORTED SETS TIMELINE
|
||||
*/
|
||||
func (timeline *redisSortedSetTimeline) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
items := []microsub.Item{}
|
||||
|
||||
channel := timeline.channel
|
||||
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
|
||||
afterScore := "-inf"
|
||||
if len(after) != 0 {
|
||||
afterScore = "(" + after
|
||||
}
|
||||
beforeScore := "+inf"
|
||||
if len(before) != 0 {
|
||||
beforeScore = "(" + before
|
||||
}
|
||||
|
||||
var itemJSONs [][]byte
|
||||
|
||||
itemScores, err := redis.Strings(
|
||||
conn.Do(
|
||||
"ZRANGEBYSCORE",
|
||||
zchannelKey,
|
||||
afterScore,
|
||||
beforeScore,
|
||||
"LIMIT",
|
||||
0,
|
||||
20,
|
||||
"WITHSCORES",
|
||||
),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return microsub.Timeline{
|
||||
Paging: microsub.Pagination{},
|
||||
Items: items,
|
||||
}, err
|
||||
}
|
||||
|
||||
if len(itemScores) >= 2 {
|
||||
before = itemScores[1]
|
||||
after = itemScores[len(itemScores)-1]
|
||||
} else {
|
||||
before = ""
|
||||
after = ""
|
||||
}
|
||||
|
||||
for i := 0; i < len(itemScores); i += 2 {
|
||||
itemID := itemScores[i]
|
||||
itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
itemJSONs = append(itemJSONs, itemJSON)
|
||||
}
|
||||
|
||||
for _, obj := range itemJSONs {
|
||||
item := microsub.Item{}
|
||||
err := json.Unmarshal(obj, &item)
|
||||
if err != nil {
|
||||
// FIXME: what should we do if one of the items doen't unmarshal?
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
item.Read = false
|
||||
items = append(items, item)
|
||||
}
|
||||
paging := microsub.Pagination{
|
||||
After: after,
|
||||
Before: before,
|
||||
}
|
||||
|
||||
return microsub.Timeline{
|
||||
Paging: paging,
|
||||
Items: items,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
channel := timeline.channel
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
|
||||
if item.Published == "" {
|
||||
item.Published = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't marshall item for redis: %s", err)
|
||||
}
|
||||
|
||||
forRedis := redisItem{
|
||||
ID: item.ID,
|
||||
Published: item.Published,
|
||||
Read: item.Read,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
itemKey := fmt.Sprintf("item:%s", item.ID)
|
||||
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
||||
if err != nil {
|
||||
return fmt.Errorf("writing failed for item to redis: %v", err)
|
||||
}
|
||||
|
||||
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isRead {
|
||||
return nil
|
||||
}
|
||||
|
||||
score, err := time.Parse(time.RFC3339, item.Published)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't parse %s as time", item.Published)
|
||||
}
|
||||
|
||||
_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
||||
if err != nil {
|
||||
return fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
|
||||
}
|
||||
|
||||
// FIXME: send message to events...
|
||||
// b.sendMessage(microsub.Message("item added " + item.ID))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) Count() (int, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
channel := timeline.channel
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("while updating channel unread count for %s: %s", channel, err)
|
||||
}
|
||||
return unread, nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
channel := timeline.channel
|
||||
|
||||
itemUIDs := []string{}
|
||||
for _, uid := range uids {
|
||||
itemUIDs = append(itemUIDs, "item:"+uid)
|
||||
}
|
||||
|
||||
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
||||
|
||||
if _, err := conn.Do("SADD", args...); err != nil {
|
||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||
}
|
||||
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
||||
|
||||
if _, err := conn.Do("ZREM", args...); err != nil {
|
||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
|
||||
panic("implement me")
|
||||
}
|
107
cmd/eksterd/redisstreams.go
Normal file
107
cmd/eksterd/redisstreams.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
)
|
||||
|
||||
type redisStreamTimeline struct {
|
||||
channel, channelKey string
|
||||
}
|
||||
|
||||
/*
|
||||
* REDIS STREAMS TIMELINE
|
||||
*/
|
||||
func (timeline *redisStreamTimeline) Init() error {
|
||||
timeline.channelKey = fmt.Sprintf("stream:%s", timeline.channel)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if before == "" {
|
||||
before = "-"
|
||||
}
|
||||
|
||||
if after == "" {
|
||||
after = "+"
|
||||
}
|
||||
|
||||
results, err := redis.Values(conn.Do("XREVRANGE", redis.Args{}.Add(timeline.channelKey, after, before, "COUNT", "20")...))
|
||||
if err != nil {
|
||||
return microsub.Timeline{}, err
|
||||
}
|
||||
|
||||
var forRedis redisItem
|
||||
|
||||
var items []microsub.Item
|
||||
for _, result := range results {
|
||||
if value, ok := result.([]interface{}); ok {
|
||||
id, ok2 := value[0].([]uint8)
|
||||
|
||||
if item, ok3 := value[1].([]interface{}); ok3 {
|
||||
err = redis.ScanStruct(item, &forRedis)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
item := forRedis.Item()
|
||||
if ok2 {
|
||||
item.ID = string(id)
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return microsub.Timeline{
|
||||
Items: items,
|
||||
Paging: microsub.Pagination{
|
||||
After: items[len(items)-1].ID,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if item.Published == "" {
|
||||
item.Published = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
log.Printf("error while creating item for redis: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data)
|
||||
|
||||
_, err = redis.String(conn.Do("XADD", args...))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) Count() (int, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return redis.Int(conn.Do("XLEN", timeline.channelKey))
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) MarkRead(uids []string) error {
|
||||
// panic("implement me")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) MarkUnread(uids []string) error {
|
||||
// panic("implement me")
|
||||
return nil
|
||||
}
|
|
@ -1,36 +1,22 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
)
|
||||
|
||||
// TimelineBackend specifies the interface for Timeline. It supports everything that is needed
|
||||
// for Ekster to implement the channel protocol for Microsub
|
||||
type TimelineBackend interface {
|
||||
Init() error
|
||||
|
||||
Items(before, after string) (microsub.Timeline, error)
|
||||
AddItem(item microsub.Item) error
|
||||
Count() (int, error)
|
||||
|
||||
AddItem(item microsub.Item) error
|
||||
MarkRead(uids []string) error
|
||||
|
||||
// Not used at the moment
|
||||
// MarkUnread(uids []string) error
|
||||
}
|
||||
|
||||
type redisSortedSetTimeline struct {
|
||||
channel string
|
||||
}
|
||||
|
||||
type redisStreamTimeline struct {
|
||||
channel, channelKey string
|
||||
}
|
||||
|
||||
func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
|
||||
// TODO: fetch timeline type from channel
|
||||
timelineType := "sorted-set"
|
||||
|
@ -53,285 +39,13 @@ func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
|
|||
}
|
||||
return timeline
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
* REDIS SORTED SETS TIMELINE
|
||||
*/
|
||||
func (timeline *redisSortedSetTimeline) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
items := []microsub.Item{}
|
||||
|
||||
channel := timeline.channel
|
||||
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
|
||||
afterScore := "-inf"
|
||||
if len(after) != 0 {
|
||||
afterScore = "(" + after
|
||||
}
|
||||
beforeScore := "+inf"
|
||||
if len(before) != 0 {
|
||||
beforeScore = "(" + before
|
||||
}
|
||||
|
||||
var itemJSONs [][]byte
|
||||
|
||||
itemScores, err := redis.Strings(
|
||||
conn.Do(
|
||||
"ZRANGEBYSCORE",
|
||||
zchannelKey,
|
||||
afterScore,
|
||||
beforeScore,
|
||||
"LIMIT",
|
||||
0,
|
||||
20,
|
||||
"WITHSCORES",
|
||||
),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return microsub.Timeline{
|
||||
Paging: microsub.Pagination{},
|
||||
Items: items,
|
||||
}, err
|
||||
}
|
||||
|
||||
if len(itemScores) >= 2 {
|
||||
before = itemScores[1]
|
||||
after = itemScores[len(itemScores)-1]
|
||||
} else {
|
||||
before = ""
|
||||
after = ""
|
||||
}
|
||||
|
||||
for i := 0; i < len(itemScores); i += 2 {
|
||||
itemID := itemScores[i]
|
||||
itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
|
||||
if timelineType == "null" {
|
||||
timeline := &nullTimeline{channel: channel}
|
||||
err := timeline.Init()
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
itemJSONs = append(itemJSONs, itemJSON)
|
||||
return timeline
|
||||
}
|
||||
|
||||
for _, obj := range itemJSONs {
|
||||
item := microsub.Item{}
|
||||
err := json.Unmarshal(obj, &item)
|
||||
if err != nil {
|
||||
// FIXME: what should we do if one of the items doen't unmarshal?
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
item.Read = false
|
||||
items = append(items, item)
|
||||
}
|
||||
paging := microsub.Pagination{
|
||||
After: after,
|
||||
Before: before,
|
||||
}
|
||||
|
||||
return microsub.Timeline{
|
||||
Paging: paging,
|
||||
Items: items,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
channel := timeline.channel
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
|
||||
if item.Published == "" {
|
||||
item.Published = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't marshall item for redis: %s", err)
|
||||
}
|
||||
|
||||
forRedis := redisItem{
|
||||
ID: item.ID,
|
||||
Published: item.Published,
|
||||
Read: item.Read,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
itemKey := fmt.Sprintf("item:%s", item.ID)
|
||||
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
||||
if err != nil {
|
||||
return fmt.Errorf("writing failed for item to redis: %v", err)
|
||||
}
|
||||
|
||||
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if isRead {
|
||||
return nil
|
||||
}
|
||||
|
||||
score, err := time.Parse(time.RFC3339, item.Published)
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't parse %s as time", item.Published)
|
||||
}
|
||||
|
||||
_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
||||
if err != nil {
|
||||
return fmt.Errorf("zadding failed item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
|
||||
}
|
||||
|
||||
// FIXME: send message to events...
|
||||
// b.sendMessage(microsub.Message("item added " + item.ID))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) Count() (int, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
channel := timeline.channel
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
||||
if err != nil {
|
||||
return -1, fmt.Errorf("while updating channel unread count for %s: %s", channel, err)
|
||||
}
|
||||
return unread, nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
channel := timeline.channel
|
||||
|
||||
itemUIDs := []string{}
|
||||
for _, uid := range uids {
|
||||
itemUIDs = append(itemUIDs, "item:"+uid)
|
||||
}
|
||||
|
||||
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
||||
|
||||
if _, err := conn.Do("SADD", args...); err != nil {
|
||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||
}
|
||||
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
||||
|
||||
if _, err := conn.Do("ZREM", args...); err != nil {
|
||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
/*
|
||||
* REDIS STREAMS TIMELINE
|
||||
*/
|
||||
func (timeline *redisStreamTimeline) Init() error {
|
||||
timeline.channelKey = fmt.Sprintf("stream:%s", timeline.channel)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if before == "" {
|
||||
before = "-"
|
||||
}
|
||||
|
||||
if after == "" {
|
||||
after = "+"
|
||||
}
|
||||
|
||||
results, err := redis.Values(conn.Do("XREVRANGE", redis.Args{}.Add(timeline.channelKey, after, before, "COUNT", "20")...))
|
||||
if err != nil {
|
||||
return microsub.Timeline{}, err
|
||||
}
|
||||
|
||||
var forRedis redisItem
|
||||
|
||||
var items []microsub.Item
|
||||
for _, result := range results {
|
||||
if value, ok := result.([]interface{}); ok {
|
||||
id, ok2 := value[0].([]uint8)
|
||||
|
||||
if item, ok3 := value[1].([]interface{}); ok3 {
|
||||
err = redis.ScanStruct(item, &forRedis)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
item := forRedis.Item()
|
||||
if ok2 {
|
||||
item.ID = string(id)
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return microsub.Timeline{
|
||||
Items: items,
|
||||
Paging: microsub.Pagination{
|
||||
After: items[len(items)-1].ID,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if item.Published == "" {
|
||||
item.Published = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
log.Printf("error while creating item for redis: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data)
|
||||
|
||||
_, err = redis.String(conn.Do("XADD", args...))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) Count() (int, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return redis.Int(conn.Do("XLEN", timeline.channelKey))
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) MarkRead(uids []string) error {
|
||||
// panic("implement me")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) MarkUnread(uids []string) error {
|
||||
// panic("implement me")
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user