From 42cee3c9a44f1f85eba91cbe4fe42471e164f530 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 7 Apr 2018 20:00:48 +0200 Subject: [PATCH] Run fetch every hour --- cmd/server/main.go | 1 + cmd/server/memory.go | 48 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 40 insertions(+), 9 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 0fc81c3..b79974f 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -370,5 +370,6 @@ func main() { http.Handle("/incoming/", &incomingHandler{ Backend: &hubBackend, }) + backend.(*memoryBackend).run() log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil)) } diff --git a/cmd/server/memory.go b/cmd/server/memory.go index ccd8e92..7d825f6 100644 --- a/cmd/server/memory.go +++ b/cmd/server/memory.go @@ -27,6 +27,7 @@ import ( "os" "reflect" "strings" + "time" "github.com/garyburd/redigo/redis" "github.com/pstuifzand/microsub-server/microsub" @@ -37,6 +38,9 @@ type memoryBackend struct { Channels map[string]microsub.Channel Feeds map[string][]microsub.Feed NextUid int + + ticker *time.Ticker + quit chan struct{} } type Debug interface { @@ -239,6 +243,27 @@ func mapToItem(result map[string]interface{}) microsub.Item { return item } +func (b *memoryBackend) run() { + b.ticker = time.NewTicker(1 * time.Hour) + b.quit = make(chan struct{}) + + go func() { + for { + select { + case <-b.ticker.C: + for uid, _ := range b.Channels { + for _, feed := range b.Feeds[uid] { + b.Fetch3(uid, feed.URL) + } + } + case <-b.quit: + b.ticker.Stop() + return + } + } + }() +} + func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Timeline { log.Printf("TimelineGet %s\n", channel) feeds := b.FollowGetList(channel) @@ -246,9 +271,9 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time items := []microsub.Item{} - for _, feed := range feeds { - b.Fetch3(channel, feed.URL) - } + // for _, feed := range feeds { + // b.Fetch3(channel, feed.URL) + // } channelKey := fmt.Sprintf("channel:%s:posts", channel) @@ -283,7 +308,7 @@ func reverseSlice(s interface{}) { } func (b *memoryBackend) checkRead(channel string, uid string) bool { - args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add(uid) + args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add("item:" + uid) member, err := redis.Bool(b.Redis.Do("SISMEMBER", args...)) if err != nil { log.Printf("Checking read for channel %s item %s has failed\n", channel, uid) @@ -410,7 +435,6 @@ func (b *memoryBackend) PreviewURL(previewURL string) microsub.Timeline { items := []microsub.Item{} for _, r := range results { item := mapToItem(r) - log.Println(item) items = append(items, item) } return microsub.Timeline{ @@ -418,11 +442,17 @@ func (b *memoryBackend) PreviewURL(previewURL string) microsub.Timeline { } } -func (b *memoryBackend) MarkRead(channel string, itemUids []string) { - log.Printf("Marking read for %s %v\n", channel, itemUids) - args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).AddFlat(itemUids) +func (b *memoryBackend) MarkRead(channel string, uids []string) { + log.Printf("Marking read for %s %v\n", channel, uids) + + itemUIDs := []string{} + for _, uid := range uids { + itemUIDs = append(itemUIDs, "item:"+uid) + } + + args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).AddFlat(itemUIDs) if _, err := b.Redis.Do("SADD", args...); err != nil { log.Printf("Marking read for channel %s has failed\n", channel) } - log.Printf("Marking read success for %s %v\n", channel, itemUids) + log.Printf("Marking read success for %s %v\n", channel, itemUIDs) }