Histogram bucket feed polling #11

Merged
peter merged 3 commits from histogram-buckets-feed-polling into master 2022-04-16 13:28:04 +00:00
10 changed files with 205 additions and 73 deletions

View File

@ -9,12 +9,13 @@ workspace:
steps:
- name: testing
image: golang:1.16-alpine
image: golang:1.18-alpine
environment:
CGO_ENABLED: 0
GOOS: linux
GOARCH: amd64
commands:
- go version
- apk --no-cache add git
- go get -d -t ./...
- go install honnef.co/go/tools/cmd/staticcheck@latest

View File

@ -0,0 +1,4 @@
alter table "feeds"
drop column "tier",
drop column "unmodified",
drop column "next_fetch_at";

View File

@ -0,0 +1,4 @@
alter table "feeds"
add column "tier" int default 0,
add column "unmodified" int default 0,
add column "next_fetch_at" timestamptz;

View File

@ -158,7 +158,7 @@ func (h *hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscription
}
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
err = processor.ProcessContent(channel, feedID, topic, contentType, body)
_, err = processor.ProcessContent(channel, feedID, topic, contentType, body)
if err != nil {
log.Printf("could not process content for channel %s: %s", channel, err)
}
@ -223,22 +223,24 @@ func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
}
func (h *hubIncomingBackend) run() error {
ticker := time.NewTicker(10 * time.Minute)
ticker := time.NewTicker(1 * time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
log.Println("Getting feeds for WebSub")
log.Println("Getting feeds for WebSub started")
varWebsub.Add("runs", 1)
feeds, err := h.Feeds()
if err != nil {
log.Println("Feeds failed:", err)
log.Println("Getting feeds for WebSub completed")
continue
}
log.Printf("Found %d feeds", len(feeds))
for _, feed := range feeds {
log.Printf("Looking at %s\n", feed.URL)
if feed.ResubscribeAt != nil && time.Now().After(*feed.ResubscribeAt) {
@ -254,6 +256,8 @@ func (h *hubIncomingBackend) run() error {
}
}
}
log.Println("Getting feeds for WebSub completed")
case <-quit:
ticker.Stop()
return

View File

@ -27,9 +27,11 @@ import (
"io"
"io/ioutil"
"log"
"math"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
@ -102,6 +104,15 @@ type newItemMessage struct {
Channel string `json:"channel"`
}
type feed struct {
UID string // channel
ID int
URL string
Tier int
Unmodified int
NextFetchAt time.Time
}
func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) {
conn := b.pool.Get()
defer func() {
@ -219,15 +230,22 @@ func (b *memoryBackend) ChannelsDelete(uid string) error {
b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
return nil
}
type feed struct {
UID string // channel
ID int
URL string
func (b *memoryBackend) updateFeed(feed feed) error {
_, err := b.database.Exec(`
UPDATE "feeds"
SET "tier" = $2, "unmodified" = $3, "next_fetch_at" = $4
WHERE "id" = $1
`, feed.ID, feed.Tier, feed.Unmodified, feed.NextFetchAt)
return err
}
func (b *memoryBackend) getFeeds() ([]feed, error) {
rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f" INNER JOIN public.channels c on c.id = f.channel_id`)
rows, err := b.database.Query(`
SELECT "f"."id", "f"."url", "c"."uid", "f"."tier","f"."unmodified","f"."next_fetch_at"
FROM "feeds" AS "f"
INNER JOIN public.channels c ON c.id = f.channel_id
WHERE next_fetch_at IS NULL OR next_fetch_at < now()
`)
if err != nil {
return nil, err
}
@ -236,29 +254,49 @@ func (b *memoryBackend) getFeeds() ([]feed, error) {
for rows.Next() {
var feedID int
var feedURL, UID string
var tier, unmodified int
var nextFetchAt sql.NullTime
err = rows.Scan(&feedID, &feedURL, &UID)
err = rows.Scan(&feedID, &feedURL, &UID, &tier, &unmodified, &nextFetchAt)
if err != nil {
log.Printf("while scanning feeds: %s", err)
continue
}
feeds = append(feeds, feed{UID, feedID, feedURL})
var fetchTime time.Time
if nextFetchAt.Valid {
fetchTime = nextFetchAt.Time
} else {
fetchTime = time.Now()
}
feeds = append(
feeds,
feed{
UID: UID,
ID: feedID,
URL: feedURL,
Tier: tier,
Unmodified: unmodified,
NextFetchAt: fetchTime,
},
)
}
return feeds, nil
}
func (b *memoryBackend) run() {
b.ticker = time.NewTicker(10 * time.Minute)
b.ticker = time.NewTicker(1 * time.Minute)
b.quit = make(chan struct{})
go func() {
b.RefreshFeeds()
for {
select {
case <-b.ticker.C:
b.RefreshFeeds()
case <-b.quit:
b.ticker.Stop()
return
@ -268,43 +306,89 @@ func (b *memoryBackend) run() {
}
func (b *memoryBackend) RefreshFeeds() {
log.Println("Feed update process started")
defer log.Println("Feed update process completed")
feeds, err := b.getFeeds()
if err != nil {
return
}
count := 0
log.Printf("Found %d feeds", len(feeds))
count := 0
for _, feed := range feeds {
feedURL := feed.URL
feedID := feed.ID
uid := feed.UID
log.Println("Processing", feedURL)
resp, err := b.Fetch3(uid, feedURL)
log.Println("Processing", feed.URL)
err := b.refreshFeed(feed)
if err != nil {
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
b.addNotification("Error while fetching feed", feedURL, err)
count++
b.addNotification("Error while fetching feed", feed, err)
continue
}
err = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feedURL, resp.Header.Get("Content-Type"), resp.Body)
if err != nil {
log.Printf("Error while processing content for %s: %v\n", feedURL, err)
b.addNotification("Error while processing feed", feedURL, err)
count++
continue
}
_ = resp.Body.Close()
}
if count > 0 {
_ = b.updateChannelUnreadCount("notifications")
}
log.Printf("Processed %d feeds", count)
}
func (b *memoryBackend) addNotification(name string, feedURL string, err error) {
err = b.channelAddItem("notifications", microsub.Item{
func (b *memoryBackend) refreshFeed(feed feed) error {
resp, err := b.Fetch3(feed.UID, feed.URL)
if err != nil {
return fmt.Errorf("while Fetch3 of %s: %w", feed.URL, err)
}
defer resp.Body.Close()
changed, err := b.ProcessContent(feed.UID, fmt.Sprintf("%d", feed.ID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
if err != nil {
return fmt.Errorf("in ProcessContent of %s: %w", feed.URL, err)
}
if changed {
feed.Tier--
} else {
feed.Unmodified++
}
if feed.Unmodified >= 2 {
feed.Tier++
feed.Unmodified = 0
}
if feed.Tier > 10 {
feed.Tier = 10
}
if feed.Tier < 0 {
feed.Tier = 0
}
minutes := time.Duration(math.Ceil(math.Exp2(float64(feed.Tier))))
feed.NextFetchAt = time.Now().Add(minutes * time.Minute)
log.Printf("Next Fetch in %d minutes at %v", minutes, feed.NextFetchAt.Format(time.RFC3339))
err = b.updateFeed(feed)
if err != nil {
log.Printf("Error: while updating feed %v: %v", feed, err)
// don't return error, because it becomes a notification
return nil
}
return nil
}
func (b *memoryBackend) addNotification(name string, feed feed, err error) {
_, err = b.channelAddItem("notifications", microsub.Item{
Type: "entry",
Source: &microsub.Source{
ID: strconv.Itoa(feed.ID),
URL: feed.URL,
Name: feed.URL,
},
Name: name,
Content: &microsub.Content{
Text: fmt.Sprintf("ERROR: while updating feed: %s", err),
@ -354,7 +438,7 @@ func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
}
func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
feed := microsub.Feed{Type: "feed", URL: url}
subFeed := microsub.Feed{Type: "feed", URL: url}
var channelID int
err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
@ -367,28 +451,36 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
var feedID int
err = b.database.QueryRow(
`INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`,
`INSERT INTO "feeds" ("channel_id", "url", "tier", "unmodified", "next_fetch_at") VALUES ($1, $2, 1, 0, now()) RETURNING "id"`,
channelID,
feed.URL,
subFeed.URL,
).Scan(&feedID)
if err != nil {
return feed, err
return subFeed, err
}
resp, err := b.Fetch3(uid, feed.URL)
var newFeed = feed{
ID: feedID,
UID: uid,
URL: url,
Tier: 1,
Unmodified: 0,
NextFetchAt: time.Now(),
}
resp, err := b.Fetch3(uid, subFeed.URL)
if err != nil {
log.Println(err)
b.addNotification("Error while fetching feed", feed.URL, err)
b.addNotification("Error while fetching feed", newFeed, err)
_ = b.updateChannelUnreadCount("notifications")
return feed, err
return subFeed, err
}
defer resp.Body.Close()
_ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
_, _ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), subFeed.URL, resp.Header.Get("Content-Type"), resp.Body)
_, _ = b.hubBackend.CreateFeed(url)
return feed, nil
return subFeed, nil
}
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
@ -584,31 +676,35 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo
// ContentProcessor processes content for a channel and feed
type ContentProcessor interface {
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error)
}
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
// ProcessContent processes content of a feed, returns if the feed has changed or not
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error) {
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body)
if err != nil {
return err
return false, err
}
changed := false
for _, item := range items {
item.Source.ID = feedID
err = b.channelAddItemWithMatcher(channel, item)
added, err := b.channelAddItemWithMatcher(channel, item)
if err != nil {
log.Printf("ERROR: (feedID=%s) %s\n", feedID, err)
}
changed = changed && added
}
err = b.updateChannelUnreadCount(channel)
if err != nil {
return err
return changed, err
}
return nil
return changed, nil
}
// Fetch3 fills stuff
@ -617,17 +713,12 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error)
return Fetch2(fetchURL)
}
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) (bool, error) {
// an item is posted
// check for all channels as channel
// if regex matches item
// - add item to channel
err := addToSearch(item, channel)
if err != nil {
return fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
}
var updatedChannels []string
b.lock.RLock()
@ -640,23 +731,23 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
switch v {
case "repost":
if len(item.RepostOf) > 0 {
return nil
return false, nil
}
case "like":
if len(item.LikeOf) > 0 {
return nil
return false, nil
}
case "bookmark":
if len(item.BookmarkOf) > 0 {
return nil
return false, nil
}
case "reply":
if len(item.InReplyTo) > 0 {
return nil
return false, nil
}
case "checkin":
if item.Checkin != nil {
return nil
return false, nil
}
}
}
@ -665,19 +756,27 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
re, err := regexp.Compile(setting.IncludeRegex)
if err != nil {
log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err)
return nil
return false, nil
}
if matchItem(item, re) {
log.Printf("Included %#v\n", item)
err := b.channelAddItem(channelKey, item)
added, err := b.channelAddItem(channelKey, item)
if err != nil {
continue
}
err = addToSearch(item, channel)
if err != nil {
return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
}
if added {
updatedChannels = append(updatedChannels, channelKey)
}
}
}
}
// Update all channels that have added items, because of the include matching
for _, value := range updatedChannels {
@ -697,15 +796,26 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
if err != nil {
log.Printf("error in regexp: %q\n", excludeRegex)
return nil
return false, nil
}
if matchItem(item, excludeRegex) {
log.Printf("Excluded %#v\n", item)
return nil
return false, nil
}
}
return b.channelAddItem(channel, item)
added, err := b.channelAddItem(channel, item)
if err != nil {
return added, err
}
err = addToSearch(item, channel)
if err != nil {
return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
}
return added, nil
}
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
@ -734,11 +844,11 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
return re.MatchString(item.Name)
}
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) (bool, error) {
timelineBackend := b.getTimeline(channel)
added, err := timelineBackend.AddItem(item)
if err != nil {
return err
return added, err
}
// Sent message to Server-Sent-Events
@ -746,7 +856,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error
b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}}
}
return err
return added, err
}
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {

View File

@ -99,7 +99,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
item.ID = newID
err = h.Backend.channelAddItemWithMatcher(channel, *item)
_, err = h.Backend.channelAddItemWithMatcher(channel, *item)
if err != nil {
log.Printf("could not add item to channel %s: %v", channel, err)
}

1
go.mod
View File

@ -13,5 +13,6 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
golang.org/x/text v0.3.7
willnorris.com/go/microformats v1.1.0
)

1
go.sum
View File

@ -1234,6 +1234,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

View File

@ -5,8 +5,10 @@ import (
"encoding/xml"
"fmt"
"sort"
"strings"
"time"
"golang.org/x/text/cases"
"golang.org/x/text/language"
)
func parseRSS1(data []byte) (*Feed, error) {
@ -29,6 +31,8 @@ func parseRSS1(data []byte) (*Feed, error) {
out.Description = channel.Description
out.Link = channel.Link
out.Image = channel.Image.Image()
titleCaser := cases.Title(language.English)
if channel.MinsToLive != 0 {
sort.Ints(channel.SkipHours)
next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute)
@ -41,7 +45,7 @@ func parseRSS1(data []byte) (*Feed, error) {
for trying {
trying = false
for _, day := range channel.SkipDays {
if strings.Title(day) == next.Weekday().String() {
if titleCaser.String(day) == next.Weekday().String() {
next.Add(time.Duration(24-next.Hour()) * time.Hour)
trying = true
break

View File

@ -5,8 +5,10 @@ import (
"encoding/xml"
"fmt"
"sort"
"strings"
"time"
"golang.org/x/text/cases"
"golang.org/x/text/language"
)
func parseRSS2(data []byte) (*Feed, error) {
@ -38,6 +40,7 @@ func parseRSS2(data []byte) (*Feed, error) {
out.Image = channel.Image.Image()
if channel.MinsToLive != 0 {
titleCaser := cases.Title(language.English)
sort.Ints(channel.SkipHours)
next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute)
for _, hour := range channel.SkipHours {
@ -49,7 +52,7 @@ func parseRSS2(data []byte) (*Feed, error) {
for trying {
trying = false
for _, day := range channel.SkipDays {
if strings.Title(day) == next.Weekday().String() {
if titleCaser.String(day) == next.Weekday().String() {
next.Add(time.Duration(24-next.Hour()) * time.Hour)
trying = true
break