Compare commits
5 Commits
a2f76c6071
...
1e4847cf35
| Author | SHA1 | Date | |
|---|---|---|---|
| 1e4847cf35 | |||
| 22cbf01566 | |||
| 723b5b24d5 | |||
| 67c8b03467 | |||
| 56d9981a4a |
|
|
@ -32,8 +32,6 @@ import (
|
||||||
var authHeaderRegex = regexp.MustCompile("^Bearer (.+)$")
|
var authHeaderRegex = regexp.MustCompile("^Bearer (.+)$")
|
||||||
|
|
||||||
func (b *memoryBackend) cachedCheckAuthToken(conn redis.Conn, header string, r *auth.TokenResponse) bool {
|
func (b *memoryBackend) cachedCheckAuthToken(conn redis.Conn, header string, r *auth.TokenResponse) bool {
|
||||||
log.Println("Cached checking Auth Token")
|
|
||||||
|
|
||||||
tokens := authHeaderRegex.FindStringSubmatch(header)
|
tokens := authHeaderRegex.FindStringSubmatch(header)
|
||||||
|
|
||||||
if len(tokens) != 2 {
|
if len(tokens) != 2 {
|
||||||
|
|
@ -54,8 +52,10 @@ func (b *memoryBackend) cachedCheckAuthToken(conn redis.Conn, header string, r *
|
||||||
|
|
||||||
authorized = b.checkAuthToken(header, r)
|
authorized = b.checkAuthToken(header, r)
|
||||||
if authorized {
|
if authorized {
|
||||||
fmt.Printf("Token response: %#v\n", r)
|
err = setCachedTokenResponseValue(conn, key, r)
|
||||||
setCachedTokenResponseValue(conn, key, r)
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -78,6 +78,12 @@ type redisItem struct {
|
||||||
Data []byte
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ri *redisItem) Item() microsub.Item {
|
||||||
|
var item microsub.Item
|
||||||
|
_ = json.Unmarshal(ri.Data, &item)
|
||||||
|
return item
|
||||||
|
}
|
||||||
|
|
||||||
type fetch2 struct{}
|
type fetch2 struct{}
|
||||||
|
|
||||||
func (f *fetch2) Fetch(url string) (*http.Response, error) {
|
func (f *fetch2) Fetch(url string) (*http.Response, error) {
|
||||||
|
|
@ -288,11 +294,19 @@ func (b *memoryBackend) run() {
|
||||||
for _, feedURL := range feeds[uid] {
|
for _, feedURL := range feeds[uid] {
|
||||||
resp, err := b.Fetch3(uid, feedURL)
|
resp, err := b.Fetch3(uid, feedURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
_ = b.channelAddItem("notifications", microsub.Item{
|
||||||
|
Type: "entry",
|
||||||
|
Name: fmt.Sprintf("Error while Fetch3 of %s: %v", feedURL, err),
|
||||||
|
Content: µsub.Content{
|
||||||
|
Text: "error while fetching feed",
|
||||||
|
},
|
||||||
|
UID: time.Now().String(),
|
||||||
|
})
|
||||||
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
_ = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
||||||
b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
_ = resp.Body.Close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -305,89 +319,17 @@ func (b *memoryBackend) run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) {
|
func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) {
|
||||||
conn := pool.Get()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
items := []microsub.Item{}
|
|
||||||
|
|
||||||
log.Printf("TimelineGet %s\n", channel)
|
log.Printf("TimelineGet %s\n", channel)
|
||||||
feeds, err := b.FollowGetList(channel)
|
|
||||||
|
// Check if feed exists
|
||||||
|
_, err := b.FollowGetList(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return microsub.Timeline{Items: items}, err
|
return microsub.Timeline{Items: []microsub.Item{}}, err
|
||||||
}
|
|
||||||
log.Println(feeds)
|
|
||||||
|
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
||||||
|
|
||||||
afterScore := "-inf"
|
|
||||||
if len(after) != 0 {
|
|
||||||
afterScore = "(" + after
|
|
||||||
}
|
|
||||||
beforeScore := "+inf"
|
|
||||||
if len(before) != 0 {
|
|
||||||
beforeScore = "(" + before
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var itemJSONs [][]byte
|
timelineBackend := b.getTimeline(channel)
|
||||||
|
|
||||||
itemScores, err := redis.Strings(
|
return timelineBackend.Items(before, after)
|
||||||
conn.Do(
|
|
||||||
"ZRANGEBYSCORE",
|
|
||||||
zchannelKey,
|
|
||||||
afterScore,
|
|
||||||
beforeScore,
|
|
||||||
"LIMIT",
|
|
||||||
0,
|
|
||||||
20,
|
|
||||||
"WITHSCORES",
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return microsub.Timeline{
|
|
||||||
Paging: microsub.Pagination{},
|
|
||||||
Items: items,
|
|
||||||
}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(itemScores) >= 2 {
|
|
||||||
before = itemScores[1]
|
|
||||||
after = itemScores[len(itemScores)-1]
|
|
||||||
} else {
|
|
||||||
before = ""
|
|
||||||
after = ""
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(itemScores); i += 2 {
|
|
||||||
itemID := itemScores[i]
|
|
||||||
itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
itemJSONs = append(itemJSONs, itemJSON)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, obj := range itemJSONs {
|
|
||||||
item := microsub.Item{}
|
|
||||||
err := json.Unmarshal(obj, &item)
|
|
||||||
if err != nil {
|
|
||||||
// FIXME: what should we do if one of the items doen't unmarshal?
|
|
||||||
log.Println(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
item.Read = false
|
|
||||||
items = append(items, item)
|
|
||||||
}
|
|
||||||
paging := microsub.Pagination{
|
|
||||||
After: after,
|
|
||||||
Before: before,
|
|
||||||
}
|
|
||||||
|
|
||||||
return microsub.Timeline{
|
|
||||||
Paging: paging,
|
|
||||||
Items: items,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
|
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
|
||||||
|
|
@ -402,6 +344,14 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
||||||
|
|
||||||
resp, err := b.Fetch3(uid, feed.URL)
|
resp, err := b.Fetch3(uid, feed.URL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
_ = b.channelAddItem("notifications", microsub.Item{
|
||||||
|
Type: "entry",
|
||||||
|
Name: fmt.Sprintf("Error while Fetch3 of %s: %v", feed.URL, err),
|
||||||
|
Content: µsub.Content{
|
||||||
|
Text: "error while fetching feed",
|
||||||
|
},
|
||||||
|
UID: time.Now().String(),
|
||||||
|
})
|
||||||
return feed, err
|
return feed, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
@ -410,9 +360,9 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
||||||
b.Feeds[uid] = append(b.Feeds[uid], feed)
|
b.Feeds[uid] = append(b.Feeds[uid], feed)
|
||||||
b.lock.Unlock()
|
b.lock.Unlock()
|
||||||
|
|
||||||
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
_ = b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||||
|
|
||||||
b.CreateFeed(url, uid)
|
_, _ = b.CreateFeed(url, uid)
|
||||||
|
|
||||||
return feed, nil
|
return feed, nil
|
||||||
}
|
}
|
||||||
|
|
@ -557,34 +507,17 @@ func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
||||||
conn := pool.Get()
|
timeline := b.getTimeline(channel)
|
||||||
defer conn.Close()
|
err := timeline.MarkRead(uids)
|
||||||
|
|
||||||
itemUIDs := []string{}
|
|
||||||
for _, uid := range uids {
|
|
||||||
itemUIDs = append(itemUIDs, "item:"+uid)
|
|
||||||
}
|
|
||||||
|
|
||||||
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
|
||||||
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
|
||||||
|
|
||||||
if _, err := conn.Do("SADD", args...); err != nil {
|
|
||||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
||||||
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
|
||||||
|
|
||||||
if _, err := conn.Do("ZREM", args...); err != nil {
|
|
||||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := b.updateChannelUnreadCount(conn, channel)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Marking read success for %s %v\n", channel, itemUIDs)
|
err = b.updateChannelUnreadCount(channel)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -600,13 +533,13 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
|
||||||
|
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
item.Read = false
|
item.Read = false
|
||||||
err = b.channelAddItemWithMatcher(conn, channel, item)
|
err = b.channelAddItemWithMatcher(channel, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("ERROR: %s\n", err)
|
log.Printf("ERROR: %s\n", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = b.updateChannelUnreadCount(conn, channel)
|
err = b.updateChannelUnreadCount(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -620,7 +553,7 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error)
|
||||||
return Fetch2(fetchURL)
|
return Fetch2(fetchURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel string, item microsub.Item) error {
|
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
|
||||||
// an item is posted
|
// an item is posted
|
||||||
// check for all channels as channel
|
// check for all channels as channel
|
||||||
// if regex matches item
|
// if regex matches item
|
||||||
|
|
@ -642,7 +575,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
|
|
||||||
if matchItem(item, re) {
|
if matchItem(item, re) {
|
||||||
log.Printf("Included %#v\n", item)
|
log.Printf("Included %#v\n", item)
|
||||||
err := b.channelAddItem(conn, channelKey, item)
|
err := b.channelAddItem(channelKey, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -653,7 +586,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
|
|
||||||
// Update all channels that have added items, because of the include matching
|
// Update all channels that have added items, because of the include matching
|
||||||
for _, value := range updatedChannels {
|
for _, value := range updatedChannels {
|
||||||
err := b.updateChannelUnreadCount(conn, value)
|
err := b.updateChannelUnreadCount(value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while updating unread count for %s: %s", value, err)
|
log.Printf("error while updating unread count for %s: %s", value, err)
|
||||||
continue
|
continue
|
||||||
|
|
@ -677,7 +610,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.channelAddItem(conn, channel, item)
|
return b.channelAddItem(channel, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
|
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
|
||||||
|
|
@ -706,67 +639,21 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
|
||||||
return re.MatchString(item.Name)
|
return re.MatchString(item.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error {
|
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
timelineBackend := b.getTimeline(channel)
|
||||||
|
return timelineBackend.AddItem(item)
|
||||||
if item.Published == "" {
|
|
||||||
item.Published = time.Now().Format(time.RFC3339)
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := json.Marshal(item)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error while creating item for redis: %v\n", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
forRedis := redisItem{
|
|
||||||
ID: item.ID,
|
|
||||||
Published: item.Published,
|
|
||||||
Read: item.Read,
|
|
||||||
Data: data,
|
|
||||||
}
|
|
||||||
|
|
||||||
itemKey := fmt.Sprintf("item:%s", item.ID)
|
|
||||||
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while writing item for redis: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
|
||||||
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if isRead {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
score, err := time.Parse(time.RFC3339, item.Published)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error can't parse %s as time", item.Published)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
b.sendMessage(microsub.Message("item added " + item.ID))
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
|
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
|
||||||
b.lock.RLock()
|
b.lock.RLock()
|
||||||
c, exists := b.Channels[channel]
|
c, exists := b.Channels[channel]
|
||||||
b.lock.RUnlock()
|
b.lock.RUnlock()
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
timeline := b.getTimeline(channel)
|
||||||
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
unread, err := timeline.Count()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error: while updating channel unread count for %s: %s", channel, err)
|
return err
|
||||||
}
|
}
|
||||||
defer b.save()
|
defer b.save()
|
||||||
c.Unread = unread
|
c.Unread = unread
|
||||||
|
|
|
||||||
|
|
@ -111,8 +111,8 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
item.Read = false
|
item.Read = false
|
||||||
id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id"))
|
id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id"))
|
||||||
item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id))))
|
item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id))))
|
||||||
h.Backend.channelAddItemWithMatcher(conn, channel, item)
|
err = h.Backend.channelAddItemWithMatcher(channel, item)
|
||||||
err = h.Backend.updateChannelUnreadCount(conn, channel)
|
err = h.Backend.updateChannelUnreadCount(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
||||||
}
|
}
|
||||||
|
|
@ -121,7 +121,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
enc.Encode(map[string]string{
|
err = enc.Encode(map[string]string{
|
||||||
"ok": "1",
|
"ok": "1",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
334
cmd/eksterd/timeline.go
Normal file
334
cmd/eksterd/timeline.go
Normal file
|
|
@ -0,0 +1,334 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gomodule/redigo/redis"
|
||||||
|
"p83.nl/go/ekster/pkg/microsub"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TimelineBackend interface {
|
||||||
|
Init() error
|
||||||
|
|
||||||
|
Items(before, after string) (microsub.Timeline, error)
|
||||||
|
AddItem(item microsub.Item) error
|
||||||
|
Count() (int, error)
|
||||||
|
|
||||||
|
MarkRead(uids []string) error
|
||||||
|
|
||||||
|
// Not used at the moment
|
||||||
|
// MarkUnread(uids []string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type redisSortedSetTimeline struct {
|
||||||
|
channel string
|
||||||
|
}
|
||||||
|
|
||||||
|
type redisStreamTimeline struct {
|
||||||
|
channel, channelKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
|
||||||
|
// TODO: fetch timeline type from channel
|
||||||
|
timelineType := "sorted-set"
|
||||||
|
if channel == "notifications" {
|
||||||
|
timelineType = "stream"
|
||||||
|
}
|
||||||
|
if timelineType == "sorted-set" {
|
||||||
|
timeline := &redisSortedSetTimeline{channel}
|
||||||
|
err := timeline.Init()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return timeline
|
||||||
|
}
|
||||||
|
if timelineType == "stream" {
|
||||||
|
timeline := &redisStreamTimeline{channel: channel}
|
||||||
|
err := timeline.Init()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return timeline
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* REDIS SORTED SETS TIMELINE
|
||||||
|
*/
|
||||||
|
func (timeline *redisSortedSetTimeline) Init() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
items := []microsub.Item{}
|
||||||
|
|
||||||
|
channel := timeline.channel
|
||||||
|
|
||||||
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
|
|
||||||
|
afterScore := "-inf"
|
||||||
|
if len(after) != 0 {
|
||||||
|
afterScore = "(" + after
|
||||||
|
}
|
||||||
|
beforeScore := "+inf"
|
||||||
|
if len(before) != 0 {
|
||||||
|
beforeScore = "(" + before
|
||||||
|
}
|
||||||
|
|
||||||
|
var itemJSONs [][]byte
|
||||||
|
|
||||||
|
itemScores, err := redis.Strings(
|
||||||
|
conn.Do(
|
||||||
|
"ZRANGEBYSCORE",
|
||||||
|
zchannelKey,
|
||||||
|
afterScore,
|
||||||
|
beforeScore,
|
||||||
|
"LIMIT",
|
||||||
|
0,
|
||||||
|
20,
|
||||||
|
"WITHSCORES",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return microsub.Timeline{
|
||||||
|
Paging: microsub.Pagination{},
|
||||||
|
Items: items,
|
||||||
|
}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(itemScores) >= 2 {
|
||||||
|
before = itemScores[1]
|
||||||
|
after = itemScores[len(itemScores)-1]
|
||||||
|
} else {
|
||||||
|
before = ""
|
||||||
|
after = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(itemScores); i += 2 {
|
||||||
|
itemID := itemScores[i]
|
||||||
|
itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data"))
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
itemJSONs = append(itemJSONs, itemJSON)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, obj := range itemJSONs {
|
||||||
|
item := microsub.Item{}
|
||||||
|
err := json.Unmarshal(obj, &item)
|
||||||
|
if err != nil {
|
||||||
|
// FIXME: what should we do if one of the items doen't unmarshal?
|
||||||
|
log.Println(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
item.Read = false
|
||||||
|
items = append(items, item)
|
||||||
|
}
|
||||||
|
paging := microsub.Pagination{
|
||||||
|
After: after,
|
||||||
|
Before: before,
|
||||||
|
}
|
||||||
|
|
||||||
|
return microsub.Timeline{
|
||||||
|
Paging: paging,
|
||||||
|
Items: items,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
channel := timeline.channel
|
||||||
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
|
|
||||||
|
if item.Published == "" {
|
||||||
|
item.Published = time.Now().Format(time.RFC3339)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(item)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error while creating item for redis: %v\n", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
forRedis := redisItem{
|
||||||
|
ID: item.ID,
|
||||||
|
Published: item.Published,
|
||||||
|
Read: item.Read,
|
||||||
|
Data: data,
|
||||||
|
}
|
||||||
|
|
||||||
|
itemKey := fmt.Sprintf("item:%s", item.ID)
|
||||||
|
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error while writing item for redis: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||||
|
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if isRead {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
score, err := time.Parse(time.RFC3339, item.Published)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error can't parse %s as time", item.Published)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: send message to events...
|
||||||
|
// b.sendMessage(microsub.Message("item added " + item.ID))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisSortedSetTimeline) Count() (int, error) {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
channel := timeline.channel
|
||||||
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
|
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("while updating channel unread count for %s: %s", channel, err)
|
||||||
|
}
|
||||||
|
return unread, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
channel := timeline.channel
|
||||||
|
|
||||||
|
itemUIDs := []string{}
|
||||||
|
for _, uid := range uids {
|
||||||
|
itemUIDs = append(itemUIDs, "item:"+uid)
|
||||||
|
}
|
||||||
|
|
||||||
|
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||||
|
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
||||||
|
|
||||||
|
if _, err := conn.Do("SADD", args...); err != nil {
|
||||||
|
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
|
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
||||||
|
|
||||||
|
if _, err := conn.Do("ZREM", args...); err != nil {
|
||||||
|
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* REDIS STREAMS TIMELINE
|
||||||
|
*/
|
||||||
|
func (timeline *redisStreamTimeline) Init() error {
|
||||||
|
timeline.channelKey = fmt.Sprintf("stream:%s", timeline.channel)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
if before == "" {
|
||||||
|
before = "-"
|
||||||
|
}
|
||||||
|
|
||||||
|
if after == "" {
|
||||||
|
after = "+"
|
||||||
|
}
|
||||||
|
|
||||||
|
results, err := redis.Values(conn.Do("XREVRANGE", redis.Args{}.Add(timeline.channelKey, after, before, "COUNT", "20")...))
|
||||||
|
if err != nil {
|
||||||
|
return microsub.Timeline{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var forRedis redisItem
|
||||||
|
|
||||||
|
var items []microsub.Item
|
||||||
|
for _, result := range results {
|
||||||
|
if value, ok := result.([]interface{}); ok {
|
||||||
|
id, ok2 := value[0].([]uint8)
|
||||||
|
|
||||||
|
if item, ok3 := value[1].([]interface{}); ok3 {
|
||||||
|
err = redis.ScanStruct(item, &forRedis)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
item := forRedis.Item()
|
||||||
|
if ok2 {
|
||||||
|
item.ID = string(id)
|
||||||
|
}
|
||||||
|
items = append(items, item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return microsub.Timeline{
|
||||||
|
Items: items,
|
||||||
|
Paging: microsub.Pagination{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
if item.Published == "" {
|
||||||
|
item.Published = time.Now().Format(time.RFC3339)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(item)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error while creating item for redis: %v\n", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data)
|
||||||
|
|
||||||
|
_, err = redis.String(conn.Do("XADD", args...))
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisStreamTimeline) Count() (int, error) {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
return redis.Int(conn.Do("XLEN", timeline.channelKey))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisStreamTimeline) MarkRead(uids []string) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (timeline *redisStreamTimeline) MarkUnread(uids []string) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user