Problem: feeds are fetched every hour and haven't changed, or have changed often
Some checks failed
continuous-integration/drone/push Build is failing
continuous-integration/drone/pr Build is failing

Solution: implement histogram buckets for feed polling

The feeds are now implemented in tiers. The tier is the bucket the feed
is in. To calculate the minutes to wait for when to fetch the next feed,
you add 2**tier minutes to the current time.
The feeds to fetch are filter by this time.
This commit is contained in:
Peter Stuifzand 2022-04-16 14:55:22 +02:00
parent dd1cf843e4
commit c9543e7a83
Signed by: peter
GPG Key ID: 374322D56E5209E8
5 changed files with 190 additions and 68 deletions

View File

@ -0,0 +1,4 @@
alter table "feeds"
drop column "tier",
drop column "unmodified",
drop column "next_fetch_at";

View File

@ -0,0 +1,4 @@
alter table "feeds"
add column "tier" int default 0,
add column "unmodified" int default 0,
add column "next_fetch_at" timestamptz;

View File

@ -158,7 +158,7 @@ func (h *hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscription
}
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
err = processor.ProcessContent(channel, feedID, topic, contentType, body)
_, err = processor.ProcessContent(channel, feedID, topic, contentType, body)
if err != nil {
log.Printf("could not process content for channel %s: %s", channel, err)
}
@ -223,22 +223,24 @@ func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
}
func (h *hubIncomingBackend) run() error {
ticker := time.NewTicker(10 * time.Minute)
ticker := time.NewTicker(1 * time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
log.Println("Getting feeds for WebSub")
log.Println("Getting feeds for WebSub started")
varWebsub.Add("runs", 1)
feeds, err := h.Feeds()
if err != nil {
log.Println("Feeds failed:", err)
log.Println("Getting feeds for WebSub completed")
continue
}
log.Printf("Found %d feeds", len(feeds))
for _, feed := range feeds {
log.Printf("Looking at %s\n", feed.URL)
if feed.ResubscribeAt != nil && time.Now().After(*feed.ResubscribeAt) {
@ -254,6 +256,8 @@ func (h *hubIncomingBackend) run() error {
}
}
}
log.Println("Getting feeds for WebSub completed")
case <-quit:
ticker.Stop()
return

View File

@ -27,9 +27,11 @@ import (
"io"
"io/ioutil"
"log"
"math"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
@ -102,6 +104,15 @@ type newItemMessage struct {
Channel string `json:"channel"`
}
type feed struct {
UID string // channel
ID int
URL string
Tier int
Unmodified int
NextFetchAt time.Time
}
func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) {
conn := b.pool.Get()
defer func() {
@ -219,15 +230,22 @@ func (b *memoryBackend) ChannelsDelete(uid string) error {
b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
return nil
}
type feed struct {
UID string // channel
ID int
URL string
func (b *memoryBackend) updateFeed(feed feed) error {
_, err := b.database.Exec(`
UPDATE "feeds"
SET "tier" = $2, "unmodified" = $3, "next_fetch_at" = $4
WHERE "id" = $1
`, feed.ID, feed.Tier, feed.Unmodified, feed.NextFetchAt)
return err
}
func (b *memoryBackend) getFeeds() ([]feed, error) {
rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f" INNER JOIN public.channels c on c.id = f.channel_id`)
rows, err := b.database.Query(`
SELECT "f"."id", "f"."url", "c"."uid", "f"."tier","f"."unmodified","f"."next_fetch_at"
FROM "feeds" AS "f"
INNER JOIN public.channels c ON c.id = f.channel_id
WHERE next_fetch_at IS NULL OR next_fetch_at < now()
`)
if err != nil {
return nil, err
}
@ -236,29 +254,49 @@ func (b *memoryBackend) getFeeds() ([]feed, error) {
for rows.Next() {
var feedID int
var feedURL, UID string
var tier, unmodified int
var nextFetchAt sql.NullTime
err = rows.Scan(&feedID, &feedURL, &UID)
err = rows.Scan(&feedID, &feedURL, &UID, &tier, &unmodified, &nextFetchAt)
if err != nil {
log.Printf("while scanning feeds: %s", err)
continue
}
feeds = append(feeds, feed{UID, feedID, feedURL})
var fetchTime time.Time
if nextFetchAt.Valid {
fetchTime = nextFetchAt.Time
} else {
fetchTime = time.Now()
}
feeds = append(
feeds,
feed{
UID: UID,
ID: feedID,
URL: feedURL,
Tier: tier,
Unmodified: unmodified,
NextFetchAt: fetchTime,
},
)
}
return feeds, nil
}
func (b *memoryBackend) run() {
b.ticker = time.NewTicker(10 * time.Minute)
b.ticker = time.NewTicker(1 * time.Minute)
b.quit = make(chan struct{})
go func() {
b.RefreshFeeds()
for {
select {
case <-b.ticker.C:
b.RefreshFeeds()
case <-b.quit:
b.ticker.Stop()
return
@ -268,43 +306,89 @@ func (b *memoryBackend) run() {
}
func (b *memoryBackend) RefreshFeeds() {
log.Println("Feed update process started")
defer log.Println("Feed update process completed")
feeds, err := b.getFeeds()
if err != nil {
return
}
count := 0
log.Printf("Found %d feeds", len(feeds))
count := 0
for _, feed := range feeds {
feedURL := feed.URL
feedID := feed.ID
uid := feed.UID
log.Println("Processing", feedURL)
resp, err := b.Fetch3(uid, feedURL)
log.Println("Processing", feed.URL)
err := b.refreshFeed(feed)
if err != nil {
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
b.addNotification("Error while fetching feed", feedURL, err)
count++
b.addNotification("Error while fetching feed", feed, err)
continue
}
err = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feedURL, resp.Header.Get("Content-Type"), resp.Body)
if err != nil {
log.Printf("Error while processing content for %s: %v\n", feedURL, err)
b.addNotification("Error while processing feed", feedURL, err)
count++
continue
}
_ = resp.Body.Close()
count++
}
if count > 0 {
_ = b.updateChannelUnreadCount("notifications")
}
log.Printf("Processed %d feeds", count)
}
func (b *memoryBackend) addNotification(name string, feedURL string, err error) {
err = b.channelAddItem("notifications", microsub.Item{
func (b *memoryBackend) refreshFeed(feed feed) error {
resp, err := b.Fetch3(feed.UID, feed.URL)
if err != nil {
return fmt.Errorf("while Fetch3 of %s: %w", feed.URL, err)
}
defer resp.Body.Close()
changed, err := b.ProcessContent(feed.UID, fmt.Sprintf("%d", feed.ID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
if err != nil {
return fmt.Errorf("in ProcessContent of %s: %w", feed.URL, err)
}
if changed {
feed.Tier--
} else {
feed.Unmodified++
}
if feed.Unmodified >= 2 {
feed.Tier++
feed.Unmodified = 0
}
if feed.Tier > 10 {
feed.Tier = 10
}
if feed.Tier < 0 {
feed.Tier = 0
}
minutes := time.Duration(math.Ceil(math.Exp2(float64(feed.Tier))))
feed.NextFetchAt = time.Now().Add(minutes * time.Minute)
log.Printf("Next Fetch in %d minutes at %v", minutes, feed.NextFetchAt.Format(time.RFC3339))
err = b.updateFeed(feed)
if err != nil {
log.Printf("Error: while updating feed %v: %v", feed, err)
// don't return error, because it becomes a notification
return nil
}
return nil
}
func (b *memoryBackend) addNotification(name string, feed feed, err error) {
_, err = b.channelAddItem("notifications", microsub.Item{
Type: "entry",
Source: &microsub.Source{
ID: strconv.Itoa(feed.ID),
URL: feed.URL,
Name: feed.URL,
},
Name: name,
Content: &microsub.Content{
Text: fmt.Sprintf("ERROR: while updating feed: %s", err),
@ -354,7 +438,7 @@ func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
}
func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
feed := microsub.Feed{Type: "feed", URL: url}
subFeed := microsub.Feed{Type: "feed", URL: url}
var channelID int
err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
@ -367,28 +451,36 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
var feedID int
err = b.database.QueryRow(
`INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`,
`INSERT INTO "feeds" ("channel_id", "url", "tier", "unmodified", "next_fetch_at") VALUES ($1, $2, 1, 0, now()) RETURNING "id"`,
channelID,
feed.URL,
subFeed.URL,
).Scan(&feedID)
if err != nil {
return feed, err
return subFeed, err
}
resp, err := b.Fetch3(uid, feed.URL)
var newFeed = feed{
ID: feedID,
UID: uid,
URL: url,
Tier: 1,
Unmodified: 0,
NextFetchAt: time.Now(),
}
resp, err := b.Fetch3(uid, subFeed.URL)
if err != nil {
log.Println(err)
b.addNotification("Error while fetching feed", feed.URL, err)
b.addNotification("Error while fetching feed", newFeed, err)
_ = b.updateChannelUnreadCount("notifications")
return feed, err
return subFeed, err
}
defer resp.Body.Close()
_ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
_, _ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), subFeed.URL, resp.Header.Get("Content-Type"), resp.Body)
_, _ = b.hubBackend.CreateFeed(url)
return feed, nil
return subFeed, nil
}
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
@ -584,31 +676,35 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo
// ContentProcessor processes content for a channel and feed
type ContentProcessor interface {
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error)
}
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
// ProcessContent processes content of a feed, returns if the feed has changed or not
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error) {
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body)
if err != nil {
return err
return false, err
}
changed := false
for _, item := range items {
item.Source.ID = feedID
err = b.channelAddItemWithMatcher(channel, item)
added, err := b.channelAddItemWithMatcher(channel, item)
if err != nil {
log.Printf("ERROR: (feedID=%s) %s\n", feedID, err)
}
changed = changed && added
}
err = b.updateChannelUnreadCount(channel)
if err != nil {
return err
return changed, err
}
return nil
return changed, nil
}
// Fetch3 fills stuff
@ -617,17 +713,12 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error)
return Fetch2(fetchURL)
}
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) (bool, error) {
// an item is posted
// check for all channels as channel
// if regex matches item
// - add item to channel
err := addToSearch(item, channel)
if err != nil {
return fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
}
var updatedChannels []string
b.lock.RLock()
@ -640,23 +731,23 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
switch v {
case "repost":
if len(item.RepostOf) > 0 {
return nil
return false, nil
}
case "like":
if len(item.LikeOf) > 0 {
return nil
return false, nil
}
case "bookmark":
if len(item.BookmarkOf) > 0 {
return nil
return false, nil
}
case "reply":
if len(item.InReplyTo) > 0 {
return nil
return false, nil
}
case "checkin":
if item.Checkin != nil {
return nil
return false, nil
}
}
}
@ -665,16 +756,24 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
re, err := regexp.Compile(setting.IncludeRegex)
if err != nil {
log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err)
return nil
return false, nil
}
if matchItem(item, re) {
log.Printf("Included %#v\n", item)
err := b.channelAddItem(channelKey, item)
added, err := b.channelAddItem(channelKey, item)
if err != nil {
continue
}
updatedChannels = append(updatedChannels, channelKey)
err = addToSearch(item, channel)
if err != nil {
return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
}
if added {
updatedChannels = append(updatedChannels, channelKey)
}
}
}
}
@ -697,15 +796,26 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
if err != nil {
log.Printf("error in regexp: %q\n", excludeRegex)
return nil
return false, nil
}
if matchItem(item, excludeRegex) {
log.Printf("Excluded %#v\n", item)
return nil
return false, nil
}
}
return b.channelAddItem(channel, item)
added, err := b.channelAddItem(channel, item)
if err != nil {
return added, err
}
err = addToSearch(item, channel)
if err != nil {
return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
}
return added, nil
}
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
@ -734,11 +844,11 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
return re.MatchString(item.Name)
}
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) (bool, error) {
timelineBackend := b.getTimeline(channel)
added, err := timelineBackend.AddItem(item)
if err != nil {
return err
return added, err
}
// Sent message to Server-Sent-Events
@ -746,7 +856,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error
b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}}
}
return err
return added, err
}
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {

View File

@ -99,7 +99,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
item.ID = newID
err = h.Backend.channelAddItemWithMatcher(channel, *item)
_, err = h.Backend.channelAddItemWithMatcher(channel, *item)
if err != nil {
log.Printf("could not add item to channel %s: %v", channel, err)
}