From c9543e7a831668a192b006aaac90658319fec6e5 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 16 Apr 2022 14:55:22 +0200 Subject: [PATCH 1/3] Problem: feeds are fetched every hour and haven't changed, or have changed often Solution: implement histogram buckets for feed polling The feeds are now implemented in tiers. The tier is the bucket the feed is in. To calculate the minutes to wait for when to fetch the next feed, you add 2**tier minutes to the current time. The feeds to fetch are filtered by this time. --- .../000007_alter_table_add_feed_tier.down.sql | 4 + .../000007_alter_table_add_feed_tier.up.sql | 4 + cmd/eksterd/hubbackend.go | 10 +- cmd/eksterd/memory.go | 238 +++++++++++++----- cmd/eksterd/micropub.go | 2 +- 5 files changed, 190 insertions(+), 68 deletions(-) create mode 100644 cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql create mode 100644 cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql diff --git a/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql new file mode 100644 index 0000000..f6fbfe4 --- /dev/null +++ b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql @@ -0,0 +1,4 @@ +alter table "feeds" + drop column "tier", + drop column "unmodified", + drop column "next_fetch_at"; diff --git a/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql new file mode 100644 index 0000000..18eb848 --- /dev/null +++ b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql @@ -0,0 +1,4 @@ +alter table "feeds" + add column "tier" int default 0, + add column "unmodified" int default 0, + add column "next_fetch_at" timestamptz; diff --git a/cmd/eksterd/hubbackend.go b/cmd/eksterd/hubbackend.go index 382bd5f..f3254ec 100644 --- a/cmd/eksterd/hubbackend.go +++ b/cmd/eksterd/hubbackend.go @@ -158,7 +158,7 @@ func (h 
*hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscription } log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel) - err = processor.ProcessContent(channel, feedID, topic, contentType, body) + _, err = processor.ProcessContent(channel, feedID, topic, contentType, body) if err != nil { log.Printf("could not process content for channel %s: %s", channel, err) } @@ -223,22 +223,24 @@ func (h *hubIncomingBackend) Subscribe(feed *Feed) error { } func (h *hubIncomingBackend) run() error { - ticker := time.NewTicker(10 * time.Minute) + ticker := time.NewTicker(1 * time.Minute) quit := make(chan struct{}) go func() { for { select { case <-ticker.C: - log.Println("Getting feeds for WebSub") + log.Println("Getting feeds for WebSub started") varWebsub.Add("runs", 1) feeds, err := h.Feeds() if err != nil { log.Println("Feeds failed:", err) + log.Println("Getting feeds for WebSub completed") continue } + log.Printf("Found %d feeds", len(feeds)) for _, feed := range feeds { log.Printf("Looking at %s\n", feed.URL) if feed.ResubscribeAt != nil && time.Now().After(*feed.ResubscribeAt) { @@ -254,6 +256,8 @@ func (h *hubIncomingBackend) run() error { } } } + + log.Println("Getting feeds for WebSub completed") case <-quit: ticker.Stop() return diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index c17a9c6..5f0283d 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -27,9 +27,11 @@ import ( "io" "io/ioutil" "log" + "math" "net/http" "net/url" "regexp" + "strconv" "strings" "sync" "time" @@ -102,6 +104,15 @@ type newItemMessage struct { Channel string `json:"channel"` } +type feed struct { + UID string // channel + ID int + URL string + Tier int + Unmodified int + NextFetchAt time.Time +} + func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) { conn := b.pool.Get() defer func() { @@ -219,15 +230,22 @@ func (b *memoryBackend) ChannelsDelete(uid string) error { b.broker.Notifier <- sse.Message{Event: 
"delete channel", Object: channelDeletedMessage{1, uid}} return nil } - -type feed struct { - UID string // channel - ID int - URL string +func (b *memoryBackend) updateFeed(feed feed) error { + _, err := b.database.Exec(` +UPDATE "feeds" +SET "tier" = $2, "unmodified" = $3, "next_fetch_at" = $4 +WHERE "id" = $1 +`, feed.ID, feed.Tier, feed.Unmodified, feed.NextFetchAt) + return err } func (b *memoryBackend) getFeeds() ([]feed, error) { - rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f" INNER JOIN public.channels c on c.id = f.channel_id`) + rows, err := b.database.Query(` +SELECT "f"."id", "f"."url", "c"."uid", "f"."tier","f"."unmodified","f"."next_fetch_at" +FROM "feeds" AS "f" +INNER JOIN public.channels c ON c.id = f.channel_id +WHERE next_fetch_at IS NULL OR next_fetch_at < now() +`) if err != nil { return nil, err } @@ -236,29 +254,49 @@ func (b *memoryBackend) getFeeds() ([]feed, error) { for rows.Next() { var feedID int var feedURL, UID string + var tier, unmodified int + var nextFetchAt sql.NullTime - err = rows.Scan(&feedID, &feedURL, &UID) + err = rows.Scan(&feedID, &feedURL, &UID, &tier, &unmodified, &nextFetchAt) if err != nil { log.Printf("while scanning feeds: %s", err) continue } - feeds = append(feeds, feed{UID, feedID, feedURL}) + var fetchTime time.Time + if nextFetchAt.Valid { + fetchTime = nextFetchAt.Time + } else { + fetchTime = time.Now() + } + + feeds = append( + feeds, + feed{ + UID: UID, + ID: feedID, + URL: feedURL, + Tier: tier, + Unmodified: unmodified, + NextFetchAt: fetchTime, + }, + ) } return feeds, nil } func (b *memoryBackend) run() { - b.ticker = time.NewTicker(10 * time.Minute) + b.ticker = time.NewTicker(1 * time.Minute) b.quit = make(chan struct{}) go func() { + b.RefreshFeeds() + for { select { case <-b.ticker.C: b.RefreshFeeds() - case <-b.quit: b.ticker.Stop() return @@ -268,43 +306,89 @@ func (b *memoryBackend) run() { } func (b *memoryBackend) RefreshFeeds() { + log.Println("Feed 
update process started") + defer log.Println("Feed update process completed") + feeds, err := b.getFeeds() if err != nil { return } - count := 0 + log.Printf("Found %d feeds", len(feeds)) + count := 0 for _, feed := range feeds { - feedURL := feed.URL - feedID := feed.ID - uid := feed.UID - log.Println("Processing", feedURL) - resp, err := b.Fetch3(uid, feedURL) + log.Println("Processing", feed.URL) + err := b.refreshFeed(feed) if err != nil { - log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err) - b.addNotification("Error while fetching feed", feedURL, err) - count++ + b.addNotification("Error while fetching feed", feed, err) continue } - err = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feedURL, resp.Header.Get("Content-Type"), resp.Body) - if err != nil { - log.Printf("Error while processing content for %s: %v\n", feedURL, err) - b.addNotification("Error while processing feed", feedURL, err) - count++ - continue - } - _ = resp.Body.Close() + + count++ } if count > 0 { _ = b.updateChannelUnreadCount("notifications") } + log.Printf("Processed %d feeds", count) } -func (b *memoryBackend) addNotification(name string, feedURL string, err error) { - err = b.channelAddItem("notifications", microsub.Item{ +func (b *memoryBackend) refreshFeed(feed feed) error { + resp, err := b.Fetch3(feed.UID, feed.URL) + if err != nil { + return fmt.Errorf("while Fetch3 of %s: %w", feed.URL, err) + } + defer resp.Body.Close() + + changed, err := b.ProcessContent(feed.UID, fmt.Sprintf("%d", feed.ID), feed.URL, resp.Header.Get("Content-Type"), resp.Body) + if err != nil { + return fmt.Errorf("in ProcessContent of %s: %w", feed.URL, err) + } + + if changed { + feed.Tier-- + } else { + feed.Unmodified++ + } + + if feed.Unmodified >= 2 { + feed.Tier++ + feed.Unmodified = 0 + } + + if feed.Tier > 10 { + feed.Tier = 10 + } + + if feed.Tier < 0 { + feed.Tier = 0 + } + + minutes := time.Duration(math.Ceil(math.Exp2(float64(feed.Tier)))) + + feed.NextFetchAt = time.Now().Add(minutes 
* time.Minute) + + log.Printf("Next Fetch in %d minutes at %v", minutes, feed.NextFetchAt.Format(time.RFC3339)) + + err = b.updateFeed(feed) + if err != nil { + log.Printf("Error: while updating feed %v: %v", feed, err) + // don't return error, because it becomes a notification + return nil + } + + return nil +} + +func (b *memoryBackend) addNotification(name string, feed feed, err error) { + _, err = b.channelAddItem("notifications", microsub.Item{ Type: "entry", + Source: µsub.Source{ + ID: strconv.Itoa(feed.ID), + URL: feed.URL, + Name: feed.URL, + }, Name: name, Content: µsub.Content{ Text: fmt.Sprintf("ERROR: while updating feed: %s", err), @@ -354,7 +438,7 @@ func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) { } func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) { - feed := microsub.Feed{Type: "feed", URL: url} + subFeed := microsub.Feed{Type: "feed", URL: url} var channelID int err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID) @@ -367,28 +451,36 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) var feedID int err = b.database.QueryRow( - `INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`, + `INSERT INTO "feeds" ("channel_id", "url", "tier", "unmodified", "next_fetch_at") VALUES ($1, $2, 1, 0, now()) RETURNING "id"`, channelID, - feed.URL, + subFeed.URL, ).Scan(&feedID) if err != nil { - return feed, err + return subFeed, err } - resp, err := b.Fetch3(uid, feed.URL) + var newFeed = feed{ + ID: feedID, + UID: uid, + URL: url, + Tier: 1, + Unmodified: 0, + NextFetchAt: time.Now(), + } + resp, err := b.Fetch3(uid, subFeed.URL) if err != nil { log.Println(err) - b.addNotification("Error while fetching feed", feed.URL, err) + b.addNotification("Error while fetching feed", newFeed, err) _ = b.updateChannelUnreadCount("notifications") - return feed, err + return subFeed, err } defer resp.Body.Close() - _ = 
b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body) + _, _ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), subFeed.URL, resp.Header.Get("Content-Type"), resp.Body) _, _ = b.hubBackend.CreateFeed(url) - return feed, nil + return subFeed, nil } func (b *memoryBackend) UnfollowURL(uid string, url string) error { @@ -584,31 +676,35 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo // ContentProcessor processes content for a channel and feed type ContentProcessor interface { - ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error + ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error) } -func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error { +// ProcessContent processes content of a feed, returns if the feed has changed or not +func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error) { cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2)) items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body) if err != nil { - return err + return false, err } + changed := false + for _, item := range items { item.Source.ID = feedID - err = b.channelAddItemWithMatcher(channel, item) + added, err := b.channelAddItemWithMatcher(channel, item) if err != nil { log.Printf("ERROR: (feedID=%s) %s\n", feedID, err) } + changed = changed && added } err = b.updateChannelUnreadCount(channel) if err != nil { - return err + return changed, err } - return nil + return changed, nil } // Fetch3 fills stuff @@ -617,17 +713,12 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) return Fetch2(fetchURL) } -func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error { +func (b *memoryBackend) channelAddItemWithMatcher(channel string, item 
microsub.Item) (bool, error) { // an item is posted // check for all channels as channel // if regex matches item // - add item to channel - err := addToSearch(item, channel) - if err != nil { - return fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err) - } - var updatedChannels []string b.lock.RLock() @@ -640,23 +731,23 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub. switch v { case "repost": if len(item.RepostOf) > 0 { - return nil + return false, nil } case "like": if len(item.LikeOf) > 0 { - return nil + return false, nil } case "bookmark": if len(item.BookmarkOf) > 0 { - return nil + return false, nil } case "reply": if len(item.InReplyTo) > 0 { - return nil + return false, nil } case "checkin": if item.Checkin != nil { - return nil + return false, nil } } } @@ -665,16 +756,24 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub. re, err := regexp.Compile(setting.IncludeRegex) if err != nil { log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err) - return nil + return false, nil } if matchItem(item, re) { log.Printf("Included %#v\n", item) - err := b.channelAddItem(channelKey, item) + added, err := b.channelAddItem(channelKey, item) if err != nil { continue } - updatedChannels = append(updatedChannels, channelKey) + + err = addToSearch(item, channel) + if err != nil { + return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err) + } + + if added { + updatedChannels = append(updatedChannels, channelKey) + } } } } @@ -697,15 +796,26 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub. 
excludeRegex, err := regexp.Compile(setting.ExcludeRegex) if err != nil { log.Printf("error in regexp: %q\n", excludeRegex) - return nil + return false, nil } if matchItem(item, excludeRegex) { log.Printf("Excluded %#v\n", item) - return nil + return false, nil } } - return b.channelAddItem(channel, item) + added, err := b.channelAddItem(channel, item) + + if err != nil { + return added, err + } + + err = addToSearch(item, channel) + if err != nil { + return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err) + } + + return added, nil } func matchItem(item microsub.Item, re *regexp.Regexp) bool { @@ -734,11 +844,11 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool { return re.MatchString(item.Name) } -func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error { +func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) (bool, error) { timelineBackend := b.getTimeline(channel) added, err := timelineBackend.AddItem(item) if err != nil { - return err + return added, err } // Sent message to Server-Sent-Events @@ -746,7 +856,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}} } - return err + return added, err } func (b *memoryBackend) updateChannelUnreadCount(channel string) error { diff --git a/cmd/eksterd/micropub.go b/cmd/eksterd/micropub.go index f10776f..9db070c 100644 --- a/cmd/eksterd/micropub.go +++ b/cmd/eksterd/micropub.go @@ -99,7 +99,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } item.ID = newID - err = h.Backend.channelAddItemWithMatcher(channel, *item) + _, err = h.Backend.channelAddItemWithMatcher(channel, *item) if err != nil { log.Printf("could not add item to channel %s: %v", channel, err) } From 179955dbc753740feabae3a1994f0db9f731a3f6 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 16 Apr 2022 15:05:22 
+0200 Subject: [PATCH 2/3] Problem: we use older Go version Solution: upgrade Go version to 1.18.1 --- .drone.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.drone.yml b/.drone.yml index 39292ac..b9c58ec 100644 --- a/.drone.yml +++ b/.drone.yml @@ -9,12 +9,13 @@ workspace: steps: - name: testing - image: golang:1.16-alpine + image: golang:1.18-alpine environment: CGO_ENABLED: 0 GOOS: linux GOARCH: amd64 commands: + - go version - apk --no-cache add git - go get -d -t ./... - go install honnef.co/go/tools/cmd/staticcheck@latest From a2f04e4d6ebf69e71260c01fca8f4e9c421a53c8 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 16 Apr 2022 15:12:58 +0200 Subject: [PATCH 3/3] Problem: strings.Title is deprecated Solution: use golang.org/x/text/cases instead --- go.mod | 1 + go.sum | 1 + pkg/rss/rss_1.0.go | 8 ++++++-- pkg/rss/rss_2.0.go | 7 +++++-- 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 3aa331e..9ae83be 100644 --- a/go.mod +++ b/go.mod @@ -13,5 +13,6 @@ require ( github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.0 golang.org/x/net v0.0.0-20211013171255-e13a2654a71e + golang.org/x/text v0.3.7 willnorris.com/go/microformats v1.1.0 ) diff --git a/go.sum b/go.sum index 07b746f..ed77444 100644 --- a/go.sum +++ b/go.sum @@ -1234,6 +1234,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod 
h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/pkg/rss/rss_1.0.go b/pkg/rss/rss_1.0.go index 278988c..ab22330 100644 --- a/pkg/rss/rss_1.0.go +++ b/pkg/rss/rss_1.0.go @@ -5,8 +5,10 @@ import ( "encoding/xml" "fmt" "sort" - "strings" "time" + + "golang.org/x/text/cases" + "golang.org/x/text/language" ) func parseRSS1(data []byte) (*Feed, error) { @@ -29,6 +31,8 @@ func parseRSS1(data []byte) (*Feed, error) { out.Description = channel.Description out.Link = channel.Link out.Image = channel.Image.Image() + + titleCaser := cases.Title(language.English) if channel.MinsToLive != 0 { sort.Ints(channel.SkipHours) next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute) @@ -41,7 +45,7 @@ func parseRSS1(data []byte) (*Feed, error) { for trying { trying = false for _, day := range channel.SkipDays { - if strings.Title(day) == next.Weekday().String() { + if titleCaser.String(day) == next.Weekday().String() { next.Add(time.Duration(24-next.Hour()) * time.Hour) trying = true break diff --git a/pkg/rss/rss_2.0.go b/pkg/rss/rss_2.0.go index 137f099..c70d7b7 100644 --- a/pkg/rss/rss_2.0.go +++ b/pkg/rss/rss_2.0.go @@ -5,8 +5,10 @@ import ( "encoding/xml" "fmt" "sort" - "strings" "time" + + "golang.org/x/text/cases" + "golang.org/x/text/language" ) func parseRSS2(data []byte) (*Feed, error) { @@ -38,6 +40,7 @@ func parseRSS2(data []byte) (*Feed, error) { out.Image = channel.Image.Image() if channel.MinsToLive != 0 { + titleCaser := cases.Title(language.English) sort.Ints(channel.SkipHours) next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute) for _, hour := range channel.SkipHours { @@ -49,7 +52,7 @@ func parseRSS2(data []byte) (*Feed, error) { for trying { trying = false for _, day := range channel.SkipDays { - if strings.Title(day) == next.Weekday().String() { + if titleCaser.String(day) == next.Weekday().String() { next.Add(time.Duration(24-next.Hour()) * time.Hour) trying = true break