diff --git a/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql
new file mode 100644
index 0000000..f6fbfe4
--- /dev/null
+++ b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.down.sql
@@ -0,0 +1,4 @@
+alter table "feeds"
+    drop column "tier",
+    drop column "unmodified",
+    drop column "next_fetch_at";
diff --git a/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql
new file mode 100644
index 0000000..18eb848
--- /dev/null
+++ b/cmd/eksterd/db/migrations/000007_alter_table_add_feed_tier.up.sql
@@ -0,0 +1,4 @@
+alter table "feeds"
+    add column "tier" int default 0,
+    add column "unmodified" int default 0,
+    add column "next_fetch_at" timestamptz;
diff --git a/cmd/eksterd/hubbackend.go b/cmd/eksterd/hubbackend.go
index 382bd5f..f3254ec 100644
--- a/cmd/eksterd/hubbackend.go
+++ b/cmd/eksterd/hubbackend.go
@@ -158,7 +158,7 @@ func (h *hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscription
     }
     log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
-    err = processor.ProcessContent(channel, feedID, topic, contentType, body)
+    _, err = processor.ProcessContent(channel, feedID, topic, contentType, body)
     if err != nil {
         log.Printf("could not process content for channel %s: %s", channel, err)
     }
@@ -223,22 +223,24 @@ func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
 }
 
 func (h *hubIncomingBackend) run() error {
-    ticker := time.NewTicker(10 * time.Minute)
+    ticker := time.NewTicker(1 * time.Minute)
     quit := make(chan struct{})
 
     go func() {
        for {
            select {
            case <-ticker.C:
-                log.Println("Getting feeds for WebSub")
+                log.Println("Getting feeds for WebSub started")
                varWebsub.Add("runs", 1)
 
                feeds, err := h.Feeds()
                if err != nil {
                    log.Println("Feeds failed:", err)
+                    log.Println("Getting feeds for WebSub completed")
                    continue
                }
 
+                log.Printf("Found %d feeds", len(feeds))
                for _, feed := range feeds {
                    log.Printf("Looking at %s\n", feed.URL)
                    if feed.ResubscribeAt != nil && time.Now().After(*feed.ResubscribeAt) {
@@ -254,6 +256,8 @@ func (h *hubIncomingBackend) run() error {
                    }
                }
            }
+
+                log.Println("Getting feeds for WebSub completed")
            case <-quit:
                ticker.Stop()
                return
diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go
index c17a9c6..5f0283d 100644
--- a/cmd/eksterd/memory.go
+++ b/cmd/eksterd/memory.go
@@ -27,9 +27,11 @@ import (
     "io"
     "io/ioutil"
     "log"
+    "math"
     "net/http"
     "net/url"
     "regexp"
+    "strconv"
     "strings"
     "sync"
     "time"
@@ -102,6 +104,15 @@ type newItemMessage struct {
     Channel string `json:"channel"`
 }
 
+type feed struct {
+    UID         string // channel
+    ID          int
+    URL         string
+    Tier        int
+    Unmodified  int
+    NextFetchAt time.Time
+}
+
 func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) {
     conn := b.pool.Get()
     defer func() {
@@ -219,15 +230,22 @@ func (b *memoryBackend) ChannelsDelete(uid string) error {
     b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
 
     return nil
 }
-
-type feed struct {
-    UID string // channel
-    ID  int
-    URL string
+
+func (b *memoryBackend) updateFeed(feed feed) error {
+    _, err := b.database.Exec(`
+UPDATE "feeds"
+SET "tier" = $2, "unmodified" = $3, "next_fetch_at" = $4
+WHERE "id" = $1
+`, feed.ID, feed.Tier, feed.Unmodified, feed.NextFetchAt)
+    return err
 }
 
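+// getFeeds returns only the feeds that are due for a fetch,
+// i.e. feeds whose next_fetch_at is NULL or lies in the past.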
 func (b *memoryBackend) getFeeds() ([]feed, error) {
-    rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f"
-INNER JOIN public.channels c on c.id = f.channel_id`)
+    rows, err := b.database.Query(`
+SELECT "f"."id", "f"."url", "c"."uid", "f"."tier", "f"."unmodified", "f"."next_fetch_at"
+FROM "feeds" AS "f"
+INNER JOIN public.channels c ON c.id = f.channel_id
+WHERE next_fetch_at IS NULL OR next_fetch_at < now()
+`)
     if err != nil {
         return nil, err
     }
@@ -236,29 +254,49 @@ func (b *memoryBackend) getFeeds() ([]feed, error) {
     for rows.Next() {
         var feedID int
         var feedURL, UID string
+        var tier, unmodified int
+        var nextFetchAt sql.NullTime
 
-        err = rows.Scan(&feedID, &feedURL, &UID)
+        err = rows.Scan(&feedID, &feedURL, &UID, &tier, &unmodified, &nextFetchAt)
         if err != nil {
             log.Printf("while scanning feeds: %s", err)
             continue
         }
 
-        feeds = append(feeds, feed{UID, feedID, feedURL})
+        var fetchTime time.Time
+        if nextFetchAt.Valid {
+            fetchTime = nextFetchAt.Time
+        } else {
+            fetchTime = time.Now()
+        }
+
+        feeds = append(
+            feeds,
+            feed{
+                UID:         UID,
+                ID:          feedID,
+                URL:         feedURL,
+                Tier:        tier,
+                Unmodified:  unmodified,
+                NextFetchAt: fetchTime,
+            },
+        )
     }
 
     return feeds, nil
 }
 
 func (b *memoryBackend) run() {
-    b.ticker = time.NewTicker(10 * time.Minute)
+    b.ticker = time.NewTicker(1 * time.Minute)
     b.quit = make(chan struct{})
 
     go func() {
+        b.RefreshFeeds()
+
        for {
            select {
            case <-b.ticker.C:
                b.RefreshFeeds()
-
            case <-b.quit:
                b.ticker.Stop()
                return
@@ -268,43 +306,89 @@ func (b *memoryBackend) run() {
 }
 
 func (b *memoryBackend) RefreshFeeds() {
+    log.Println("Feed update process started")
+    defer log.Println("Feed update process completed")
+
     feeds, err := b.getFeeds()
     if err != nil {
         return
     }
 
-    count := 0
+    log.Printf("Found %d feeds", len(feeds))
 
+    count := 0
     for _, feed := range feeds {
-        feedURL := feed.URL
-        feedID := feed.ID
-        uid := feed.UID
-        log.Println("Processing", feedURL)
-        resp, err := b.Fetch3(uid, feedURL)
+        log.Println("Processing", feed.URL)
+        err := b.refreshFeed(feed)
         if err != nil {
-            log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
-            b.addNotification("Error while fetching feed", feedURL, err)
-            count++
+            b.addNotification("Error while fetching feed", feed, err)
            continue
        }
-        err = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feedURL, resp.Header.Get("Content-Type"), resp.Body)
-        if err != nil {
-            log.Printf("Error while processing content for %s: %v\n", feedURL, err)
-            b.addNotification("Error while processing feed", feedURL, err)
-            count++
-            continue
-        }
-        _ = resp.Body.Close()
+
+        count++
     }
 
     if count > 0 {
         _ = b.updateChannelUnreadCount("notifications")
     }
+    log.Printf("Processed %d feeds", count)
 }
 
-func (b *memoryBackend) addNotification(name string, feedURL string, err error) {
-    err = b.channelAddItem("notifications", microsub.Item{
+func (b *memoryBackend) refreshFeed(feed feed) error {
+    resp, err := b.Fetch3(feed.UID, feed.URL)
+    if err != nil {
+        return fmt.Errorf("while Fetch3 of %s: %w", feed.URL, err)
+    }
+    defer resp.Body.Close()
+
+    changed, err := b.ProcessContent(feed.UID, fmt.Sprintf("%d", feed.ID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
+    if err != nil {
+        return fmt.Errorf("in ProcessContent of %s: %w", feed.URL, err)
+    }
+
+    if changed {
+        feed.Tier--
+    } else {
+        feed.Unmodified++
+    }
+
+    if feed.Unmodified >= 2 {
+        feed.Tier++
+        feed.Unmodified = 0
+    }
+
+    if feed.Tier > 10 {
+        feed.Tier = 10
+    }
+
+    if feed.Tier < 0 {
+        feed.Tier = 0
+    }
+
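+    // Back off exponentially: the next fetch is scheduled 2^tier minutes from
+    // now, so tier 0 polls every minute, tier 5 about every half hour and
+    // tier 10 (the cap) roughly every 17 hours.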
+    minutes := time.Duration(math.Ceil(math.Exp2(float64(feed.Tier))))
+
+    feed.NextFetchAt = time.Now().Add(minutes * time.Minute)
+
+    log.Printf("Next Fetch in %d minutes at %v", minutes, feed.NextFetchAt.Format(time.RFC3339))
+
+    err = b.updateFeed(feed)
+    if err != nil {
+        log.Printf("Error: while updating feed %v: %v", feed, err)
+        // don't return the error, because it would become a notification
+        return nil
+    }
+
+    return nil
+}
+
+func (b *memoryBackend) addNotification(name string, feed feed, err error) {
+    _, err = b.channelAddItem("notifications", microsub.Item{
         Type: "entry",
+        Source: &microsub.Source{
+            ID:   strconv.Itoa(feed.ID),
+            URL:  feed.URL,
+            Name: feed.URL,
+        },
         Name: name,
         Content: &microsub.Content{
             Text: fmt.Sprintf("ERROR: while updating feed: %s", err),
@@ -354,7 +438,7 @@ func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
 }
 
 func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
-    feed := microsub.Feed{Type: "feed", URL: url}
+    subFeed := microsub.Feed{Type: "feed", URL: url}
     var channelID int
     err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
@@ -367,28 +451,36 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
 
     var feedID int
     err = b.database.QueryRow(
-        `INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`,
+        `INSERT INTO "feeds" ("channel_id", "url", "tier", "unmodified", "next_fetch_at") VALUES ($1, $2, 1, 0, now()) RETURNING "id"`,
         channelID,
-        feed.URL,
+        subFeed.URL,
     ).Scan(&feedID)
     if err != nil {
-        return feed, err
+        return subFeed, err
     }
 
-    resp, err := b.Fetch3(uid, feed.URL)
+    var newFeed = feed{
+        ID:          feedID,
+        UID:         uid,
+        URL:         url,
+        Tier:        1,
+        Unmodified:  0,
+        NextFetchAt: time.Now(),
+    }
+    resp, err := b.Fetch3(uid, subFeed.URL)
     if err != nil {
         log.Println(err)
-        b.addNotification("Error while fetching feed", feed.URL, err)
+        b.addNotification("Error while fetching feed", newFeed, err)
         _ = b.updateChannelUnreadCount("notifications")
-        return feed, err
+        return subFeed, err
     }
     defer resp.Body.Close()
 
-    _ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
+    _, _ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), subFeed.URL, resp.Header.Get("Content-Type"), resp.Body)
 
     _, _ = b.hubBackend.CreateFeed(url)
 
-    return feed, nil
+    return subFeed, nil
 }
 
 func (b *memoryBackend) UnfollowURL(uid string, url string) error {
@@ -584,31 +676,35 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo
 
 // ContentProcessor processes content for a channel and feed
 type ContentProcessor interface {
-    ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error
+    ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error)
 }
 
-func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
+// ProcessContent processes the content of a feed and reports whether the feed changed.
+func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error) {
     cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
 
     items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body)
     if err != nil {
-        return err
+        return false, err
     }
 
+    changed := false
+
     for _, item := range items {
         item.Source.ID = feedID
-        err = b.channelAddItemWithMatcher(channel, item)
+        added, err := b.channelAddItemWithMatcher(channel, item)
         if err != nil {
             log.Printf("ERROR: (feedID=%s) %s\n", feedID, err)
         }
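+        // a single newly added item is enough to mark the feed as changed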
+        changed = changed || added
     }
 
     err = b.updateChannelUnreadCount(channel)
     if err != nil {
-        return err
+        return changed, err
     }
 
-    return nil
+    return changed, nil
 }
 
 // Fetch3 fills stuff
@@ -617,17 +713,12 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error)
     return Fetch2(fetchURL)
 }
 
-func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
+func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) (bool, error) {
     // an item is posted
     // check for all channels as channel
     // if regex matches item
     //  - add item to channel
 
-    err := addToSearch(item, channel)
-    if err != nil {
-        return fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
-    }
-
     var updatedChannels []string
 
     b.lock.RLock()
@@ -640,23 +731,23 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
            switch v {
            case "repost":
                if len(item.RepostOf) > 0 {
-                    return nil
+                    return false, nil
                }
            case "like":
                if len(item.LikeOf) > 0 {
-                    return nil
+                    return false, nil
                }
            case "bookmark":
                if len(item.BookmarkOf) > 0 {
-                    return nil
+                    return false, nil
                }
            case "reply":
                if len(item.InReplyTo) > 0 {
-                    return nil
+                    return false, nil
                }
            case "checkin":
                if item.Checkin != nil {
-                    return nil
+                    return false, nil
                }
            }
        }
@@ -665,16 +756,24 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
            re, err := regexp.Compile(setting.IncludeRegex)
            if err != nil {
                log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err)
-                return nil
+                return false, nil
            }
 
            if matchItem(item, re) {
                log.Printf("Included %#v\n", item)
-                err := b.channelAddItem(channelKey, item)
+                added, err := b.channelAddItem(channelKey, item)
                if err != nil {
                    continue
                }
-                updatedChannels = append(updatedChannels, channelKey)
+
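+                // index the item for search only after adding it to the channel succeeded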
+                err = addToSearch(item, channel)
+                if err != nil {
+                    return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
+                }
+
+                if added {
+                    updatedChannels = append(updatedChannels, channelKey)
+                }
            }
        }
    }
@@ -697,15 +796,26 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
        excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
        if err != nil {
            log.Printf("error in regexp: %q\n", excludeRegex)
-            return nil
+            return false, nil
        }
        if matchItem(item, excludeRegex) {
            log.Printf("Excluded %#v\n", item)
-            return nil
+            return false, nil
        }
    }
 
-    return b.channelAddItem(channel, item)
+    added, err := b.channelAddItem(channel, item)
+
+    if err != nil {
+        return added, err
+    }
+
+    err = addToSearch(item, channel)
+    if err != nil {
+        return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
+    }
+
+    return added, nil
 }
 
 func matchItem(item microsub.Item, re *regexp.Regexp) bool {
@@ -734,11 +844,11 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
     return re.MatchString(item.Name)
 }
 
-func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
+func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) (bool, error) {
     timelineBackend := b.getTimeline(channel)
     added, err := timelineBackend.AddItem(item)
     if err != nil {
-        return err
+        return added, err
     }
 
     // Sent message to Server-Sent-Events
@@ -746,7 +856,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error
        b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}}
    }
 
-    return err
+    return added, err
 }
 
 func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
diff --git a/cmd/eksterd/micropub.go b/cmd/eksterd/micropub.go
index f10776f..9db070c 100644
--- a/cmd/eksterd/micropub.go
+++ b/cmd/eksterd/micropub.go
@@ -99,7 +99,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        }
 
        item.ID = newID
-        err = h.Backend.channelAddItemWithMatcher(channel, *item)
+        _, err = h.Backend.channelAddItemWithMatcher(channel, *item)
        if err != nil {
            log.Printf("could not add item to channel %s: %v", channel, err)
        }