Compare commits
4 Commits
dd1cf843e4
...
a3dd194472
Author | SHA1 | Date | |
---|---|---|---|
a3dd194472 | |||
a2f04e4d6e | |||
179955dbc7 | |||
c9543e7a83 |
|
@ -9,12 +9,13 @@ workspace:
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: testing
|
- name: testing
|
||||||
image: golang:1.16-alpine
|
image: golang:1.18-alpine
|
||||||
environment:
|
environment:
|
||||||
CGO_ENABLED: 0
|
CGO_ENABLED: 0
|
||||||
GOOS: linux
|
GOOS: linux
|
||||||
GOARCH: amd64
|
GOARCH: amd64
|
||||||
commands:
|
commands:
|
||||||
|
- go version
|
||||||
- apk --no-cache add git
|
- apk --no-cache add git
|
||||||
- go get -d -t ./...
|
- go get -d -t ./...
|
||||||
- go install honnef.co/go/tools/cmd/staticcheck@latest
|
- go install honnef.co/go/tools/cmd/staticcheck@latest
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
alter table "feeds"
|
||||||
|
drop column "tier",
|
||||||
|
drop column "unmodified",
|
||||||
|
drop column "next_fetch_at";
|
|
@ -0,0 +1,4 @@
|
||||||
|
alter table "feeds"
|
||||||
|
add column "tier" int default 0,
|
||||||
|
add column "unmodified" int default 0,
|
||||||
|
add column "next_fetch_at" timestamptz;
|
|
@ -158,7 +158,7 @@ func (h *hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
|
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
|
||||||
err = processor.ProcessContent(channel, feedID, topic, contentType, body)
|
_, err = processor.ProcessContent(channel, feedID, topic, contentType, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("could not process content for channel %s: %s", channel, err)
|
log.Printf("could not process content for channel %s: %s", channel, err)
|
||||||
}
|
}
|
||||||
|
@ -223,22 +223,24 @@ func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hubIncomingBackend) run() error {
|
func (h *hubIncomingBackend) run() error {
|
||||||
ticker := time.NewTicker(10 * time.Minute)
|
ticker := time.NewTicker(1 * time.Minute)
|
||||||
quit := make(chan struct{})
|
quit := make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
log.Println("Getting feeds for WebSub")
|
log.Println("Getting feeds for WebSub started")
|
||||||
varWebsub.Add("runs", 1)
|
varWebsub.Add("runs", 1)
|
||||||
|
|
||||||
feeds, err := h.Feeds()
|
feeds, err := h.Feeds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Feeds failed:", err)
|
log.Println("Feeds failed:", err)
|
||||||
|
log.Println("Getting feeds for WebSub completed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Printf("Found %d feeds", len(feeds))
|
||||||
for _, feed := range feeds {
|
for _, feed := range feeds {
|
||||||
log.Printf("Looking at %s\n", feed.URL)
|
log.Printf("Looking at %s\n", feed.URL)
|
||||||
if feed.ResubscribeAt != nil && time.Now().After(*feed.ResubscribeAt) {
|
if feed.ResubscribeAt != nil && time.Now().After(*feed.ResubscribeAt) {
|
||||||
|
@ -254,6 +256,8 @@ func (h *hubIncomingBackend) run() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Println("Getting feeds for WebSub completed")
|
||||||
case <-quit:
|
case <-quit:
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
|
|
|
@ -27,9 +27,11 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -102,6 +104,15 @@ type newItemMessage struct {
|
||||||
Channel string `json:"channel"`
|
Channel string `json:"channel"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type feed struct {
|
||||||
|
UID string // channel
|
||||||
|
ID int
|
||||||
|
URL string
|
||||||
|
Tier int
|
||||||
|
Unmodified int
|
||||||
|
NextFetchAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) {
|
func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) {
|
||||||
conn := b.pool.Get()
|
conn := b.pool.Get()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -219,15 +230,22 @@ func (b *memoryBackend) ChannelsDelete(uid string) error {
|
||||||
b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
|
b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (b *memoryBackend) updateFeed(feed feed) error {
|
||||||
type feed struct {
|
_, err := b.database.Exec(`
|
||||||
UID string // channel
|
UPDATE "feeds"
|
||||||
ID int
|
SET "tier" = $2, "unmodified" = $3, "next_fetch_at" = $4
|
||||||
URL string
|
WHERE "id" = $1
|
||||||
|
`, feed.ID, feed.Tier, feed.Unmodified, feed.NextFetchAt)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) getFeeds() ([]feed, error) {
|
func (b *memoryBackend) getFeeds() ([]feed, error) {
|
||||||
rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f" INNER JOIN public.channels c on c.id = f.channel_id`)
|
rows, err := b.database.Query(`
|
||||||
|
SELECT "f"."id", "f"."url", "c"."uid", "f"."tier","f"."unmodified","f"."next_fetch_at"
|
||||||
|
FROM "feeds" AS "f"
|
||||||
|
INNER JOIN public.channels c ON c.id = f.channel_id
|
||||||
|
WHERE next_fetch_at IS NULL OR next_fetch_at < now()
|
||||||
|
`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -236,29 +254,49 @@ func (b *memoryBackend) getFeeds() ([]feed, error) {
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var feedID int
|
var feedID int
|
||||||
var feedURL, UID string
|
var feedURL, UID string
|
||||||
|
var tier, unmodified int
|
||||||
|
var nextFetchAt sql.NullTime
|
||||||
|
|
||||||
err = rows.Scan(&feedID, &feedURL, &UID)
|
err = rows.Scan(&feedID, &feedURL, &UID, &tier, &unmodified, &nextFetchAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("while scanning feeds: %s", err)
|
log.Printf("while scanning feeds: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
feeds = append(feeds, feed{UID, feedID, feedURL})
|
var fetchTime time.Time
|
||||||
|
if nextFetchAt.Valid {
|
||||||
|
fetchTime = nextFetchAt.Time
|
||||||
|
} else {
|
||||||
|
fetchTime = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
feeds = append(
|
||||||
|
feeds,
|
||||||
|
feed{
|
||||||
|
UID: UID,
|
||||||
|
ID: feedID,
|
||||||
|
URL: feedURL,
|
||||||
|
Tier: tier,
|
||||||
|
Unmodified: unmodified,
|
||||||
|
NextFetchAt: fetchTime,
|
||||||
|
},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return feeds, nil
|
return feeds, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) run() {
|
func (b *memoryBackend) run() {
|
||||||
b.ticker = time.NewTicker(10 * time.Minute)
|
b.ticker = time.NewTicker(1 * time.Minute)
|
||||||
b.quit = make(chan struct{})
|
b.quit = make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
b.RefreshFeeds()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-b.ticker.C:
|
case <-b.ticker.C:
|
||||||
b.RefreshFeeds()
|
b.RefreshFeeds()
|
||||||
|
|
||||||
case <-b.quit:
|
case <-b.quit:
|
||||||
b.ticker.Stop()
|
b.ticker.Stop()
|
||||||
return
|
return
|
||||||
|
@ -268,43 +306,89 @@ func (b *memoryBackend) run() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) RefreshFeeds() {
|
func (b *memoryBackend) RefreshFeeds() {
|
||||||
|
log.Println("Feed update process started")
|
||||||
|
defer log.Println("Feed update process completed")
|
||||||
|
|
||||||
feeds, err := b.getFeeds()
|
feeds, err := b.getFeeds()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
count := 0
|
log.Printf("Found %d feeds", len(feeds))
|
||||||
|
|
||||||
|
count := 0
|
||||||
for _, feed := range feeds {
|
for _, feed := range feeds {
|
||||||
feedURL := feed.URL
|
log.Println("Processing", feed.URL)
|
||||||
feedID := feed.ID
|
err := b.refreshFeed(feed)
|
||||||
uid := feed.UID
|
|
||||||
log.Println("Processing", feedURL)
|
|
||||||
resp, err := b.Fetch3(uid, feedURL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
b.addNotification("Error while fetching feed", feed, err)
|
||||||
b.addNotification("Error while fetching feed", feedURL, err)
|
|
||||||
count++
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error while processing content for %s: %v\n", feedURL, err)
|
|
||||||
b.addNotification("Error while processing feed", feedURL, err)
|
|
||||||
count++
|
count++
|
||||||
continue
|
|
||||||
}
|
|
||||||
_ = resp.Body.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
_ = b.updateChannelUnreadCount("notifications")
|
_ = b.updateChannelUnreadCount("notifications")
|
||||||
}
|
}
|
||||||
|
log.Printf("Processed %d feeds", count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) addNotification(name string, feedURL string, err error) {
|
func (b *memoryBackend) refreshFeed(feed feed) error {
|
||||||
err = b.channelAddItem("notifications", microsub.Item{
|
resp, err := b.Fetch3(feed.UID, feed.URL)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("while Fetch3 of %s: %w", feed.URL, err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
changed, err := b.ProcessContent(feed.UID, fmt.Sprintf("%d", feed.ID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("in ProcessContent of %s: %w", feed.URL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if changed {
|
||||||
|
feed.Tier--
|
||||||
|
} else {
|
||||||
|
feed.Unmodified++
|
||||||
|
}
|
||||||
|
|
||||||
|
if feed.Unmodified >= 2 {
|
||||||
|
feed.Tier++
|
||||||
|
feed.Unmodified = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if feed.Tier > 10 {
|
||||||
|
feed.Tier = 10
|
||||||
|
}
|
||||||
|
|
||||||
|
if feed.Tier < 0 {
|
||||||
|
feed.Tier = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
minutes := time.Duration(math.Ceil(math.Exp2(float64(feed.Tier))))
|
||||||
|
|
||||||
|
feed.NextFetchAt = time.Now().Add(minutes * time.Minute)
|
||||||
|
|
||||||
|
log.Printf("Next Fetch in %d minutes at %v", minutes, feed.NextFetchAt.Format(time.RFC3339))
|
||||||
|
|
||||||
|
err = b.updateFeed(feed)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error: while updating feed %v: %v", feed, err)
|
||||||
|
// don't return error, because it becomes a notification
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *memoryBackend) addNotification(name string, feed feed, err error) {
|
||||||
|
_, err = b.channelAddItem("notifications", microsub.Item{
|
||||||
Type: "entry",
|
Type: "entry",
|
||||||
|
Source: µsub.Source{
|
||||||
|
ID: strconv.Itoa(feed.ID),
|
||||||
|
URL: feed.URL,
|
||||||
|
Name: feed.URL,
|
||||||
|
},
|
||||||
Name: name,
|
Name: name,
|
||||||
Content: µsub.Content{
|
Content: µsub.Content{
|
||||||
Text: fmt.Sprintf("ERROR: while updating feed: %s", err),
|
Text: fmt.Sprintf("ERROR: while updating feed: %s", err),
|
||||||
|
@ -354,7 +438,7 @@ func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
|
func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
|
||||||
feed := microsub.Feed{Type: "feed", URL: url}
|
subFeed := microsub.Feed{Type: "feed", URL: url}
|
||||||
|
|
||||||
var channelID int
|
var channelID int
|
||||||
err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
|
err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
|
||||||
|
@ -367,28 +451,36 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
||||||
|
|
||||||
var feedID int
|
var feedID int
|
||||||
err = b.database.QueryRow(
|
err = b.database.QueryRow(
|
||||||
`INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`,
|
`INSERT INTO "feeds" ("channel_id", "url", "tier", "unmodified", "next_fetch_at") VALUES ($1, $2, 1, 0, now()) RETURNING "id"`,
|
||||||
channelID,
|
channelID,
|
||||||
feed.URL,
|
subFeed.URL,
|
||||||
).Scan(&feedID)
|
).Scan(&feedID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return feed, err
|
return subFeed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := b.Fetch3(uid, feed.URL)
|
var newFeed = feed{
|
||||||
|
ID: feedID,
|
||||||
|
UID: uid,
|
||||||
|
URL: url,
|
||||||
|
Tier: 1,
|
||||||
|
Unmodified: 0,
|
||||||
|
NextFetchAt: time.Now(),
|
||||||
|
}
|
||||||
|
resp, err := b.Fetch3(uid, subFeed.URL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
b.addNotification("Error while fetching feed", feed.URL, err)
|
b.addNotification("Error while fetching feed", newFeed, err)
|
||||||
_ = b.updateChannelUnreadCount("notifications")
|
_ = b.updateChannelUnreadCount("notifications")
|
||||||
return feed, err
|
return subFeed, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
_ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
_, _ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), subFeed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||||
|
|
||||||
_, _ = b.hubBackend.CreateFeed(url)
|
_, _ = b.hubBackend.CreateFeed(url)
|
||||||
|
|
||||||
return feed, nil
|
return subFeed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
|
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
|
||||||
|
@ -584,31 +676,35 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo
|
||||||
|
|
||||||
// ContentProcessor processes content for a channel and feed
|
// ContentProcessor processes content for a channel and feed
|
||||||
type ContentProcessor interface {
|
type ContentProcessor interface {
|
||||||
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error
|
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
|
// ProcessContent processes content of a feed, returns if the feed has changed or not
|
||||||
|
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) (bool, error) {
|
||||||
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
||||||
|
|
||||||
items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body)
|
items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
changed := false
|
||||||
|
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
item.Source.ID = feedID
|
item.Source.ID = feedID
|
||||||
err = b.channelAddItemWithMatcher(channel, item)
|
added, err := b.channelAddItemWithMatcher(channel, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("ERROR: (feedID=%s) %s\n", feedID, err)
|
log.Printf("ERROR: (feedID=%s) %s\n", feedID, err)
|
||||||
}
|
}
|
||||||
|
changed = changed && added
|
||||||
}
|
}
|
||||||
|
|
||||||
err = b.updateChannelUnreadCount(channel)
|
err = b.updateChannelUnreadCount(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return changed, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return changed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch3 fills stuff
|
// Fetch3 fills stuff
|
||||||
|
@ -617,17 +713,12 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error)
|
||||||
return Fetch2(fetchURL)
|
return Fetch2(fetchURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
|
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) (bool, error) {
|
||||||
// an item is posted
|
// an item is posted
|
||||||
// check for all channels as channel
|
// check for all channels as channel
|
||||||
// if regex matches item
|
// if regex matches item
|
||||||
// - add item to channel
|
// - add item to channel
|
||||||
|
|
||||||
err := addToSearch(item, channel)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var updatedChannels []string
|
var updatedChannels []string
|
||||||
|
|
||||||
b.lock.RLock()
|
b.lock.RLock()
|
||||||
|
@ -640,23 +731,23 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
|
||||||
switch v {
|
switch v {
|
||||||
case "repost":
|
case "repost":
|
||||||
if len(item.RepostOf) > 0 {
|
if len(item.RepostOf) > 0 {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
case "like":
|
case "like":
|
||||||
if len(item.LikeOf) > 0 {
|
if len(item.LikeOf) > 0 {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
case "bookmark":
|
case "bookmark":
|
||||||
if len(item.BookmarkOf) > 0 {
|
if len(item.BookmarkOf) > 0 {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
case "reply":
|
case "reply":
|
||||||
if len(item.InReplyTo) > 0 {
|
if len(item.InReplyTo) > 0 {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
case "checkin":
|
case "checkin":
|
||||||
if item.Checkin != nil {
|
if item.Checkin != nil {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -665,19 +756,27 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
|
||||||
re, err := regexp.Compile(setting.IncludeRegex)
|
re, err := regexp.Compile(setting.IncludeRegex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err)
|
log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err)
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if matchItem(item, re) {
|
if matchItem(item, re) {
|
||||||
log.Printf("Included %#v\n", item)
|
log.Printf("Included %#v\n", item)
|
||||||
err := b.channelAddItem(channelKey, item)
|
added, err := b.channelAddItem(channelKey, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = addToSearch(item, channel)
|
||||||
|
if err != nil {
|
||||||
|
return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if added {
|
||||||
updatedChannels = append(updatedChannels, channelKey)
|
updatedChannels = append(updatedChannels, channelKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update all channels that have added items, because of the include matching
|
// Update all channels that have added items, because of the include matching
|
||||||
for _, value := range updatedChannels {
|
for _, value := range updatedChannels {
|
||||||
|
@ -697,15 +796,26 @@ func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.
|
||||||
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
|
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error in regexp: %q\n", excludeRegex)
|
log.Printf("error in regexp: %q\n", excludeRegex)
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if matchItem(item, excludeRegex) {
|
if matchItem(item, excludeRegex) {
|
||||||
log.Printf("Excluded %#v\n", item)
|
log.Printf("Excluded %#v\n", item)
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.channelAddItem(channel, item)
|
added, err := b.channelAddItem(channel, item)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return added, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = addToSearch(item, channel)
|
||||||
|
if err != nil {
|
||||||
|
return added, fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return added, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
|
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
|
||||||
|
@ -734,11 +844,11 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
|
||||||
return re.MatchString(item.Name)
|
return re.MatchString(item.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
|
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) (bool, error) {
|
||||||
timelineBackend := b.getTimeline(channel)
|
timelineBackend := b.getTimeline(channel)
|
||||||
added, err := timelineBackend.AddItem(item)
|
added, err := timelineBackend.AddItem(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return added, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sent message to Server-Sent-Events
|
// Sent message to Server-Sent-Events
|
||||||
|
@ -746,7 +856,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error
|
||||||
b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}}
|
b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}}
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return added, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
|
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
|
||||||
|
|
|
@ -99,7 +99,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
item.ID = newID
|
item.ID = newID
|
||||||
|
|
||||||
err = h.Backend.channelAddItemWithMatcher(channel, *item)
|
_, err = h.Backend.channelAddItemWithMatcher(channel, *item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("could not add item to channel %s: %v", channel, err)
|
log.Printf("could not add item to channel %s: %v", channel, err)
|
||||||
}
|
}
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -13,5 +13,6 @@ require (
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
|
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
|
||||||
|
golang.org/x/text v0.3.7
|
||||||
willnorris.com/go/microformats v1.1.0
|
willnorris.com/go/microformats v1.1.0
|
||||||
)
|
)
|
||||||
|
|
1
go.sum
1
go.sum
|
@ -1234,6 +1234,7 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
|
||||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||||
|
|
|
@ -5,8 +5,10 @@ import (
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/text/cases"
|
||||||
|
"golang.org/x/text/language"
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseRSS1(data []byte) (*Feed, error) {
|
func parseRSS1(data []byte) (*Feed, error) {
|
||||||
|
@ -29,6 +31,8 @@ func parseRSS1(data []byte) (*Feed, error) {
|
||||||
out.Description = channel.Description
|
out.Description = channel.Description
|
||||||
out.Link = channel.Link
|
out.Link = channel.Link
|
||||||
out.Image = channel.Image.Image()
|
out.Image = channel.Image.Image()
|
||||||
|
|
||||||
|
titleCaser := cases.Title(language.English)
|
||||||
if channel.MinsToLive != 0 {
|
if channel.MinsToLive != 0 {
|
||||||
sort.Ints(channel.SkipHours)
|
sort.Ints(channel.SkipHours)
|
||||||
next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute)
|
next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute)
|
||||||
|
@ -41,7 +45,7 @@ func parseRSS1(data []byte) (*Feed, error) {
|
||||||
for trying {
|
for trying {
|
||||||
trying = false
|
trying = false
|
||||||
for _, day := range channel.SkipDays {
|
for _, day := range channel.SkipDays {
|
||||||
if strings.Title(day) == next.Weekday().String() {
|
if titleCaser.String(day) == next.Weekday().String() {
|
||||||
next.Add(time.Duration(24-next.Hour()) * time.Hour)
|
next.Add(time.Duration(24-next.Hour()) * time.Hour)
|
||||||
trying = true
|
trying = true
|
||||||
break
|
break
|
||||||
|
|
|
@ -5,8 +5,10 @@ import (
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/text/cases"
|
||||||
|
"golang.org/x/text/language"
|
||||||
)
|
)
|
||||||
|
|
||||||
func parseRSS2(data []byte) (*Feed, error) {
|
func parseRSS2(data []byte) (*Feed, error) {
|
||||||
|
@ -38,6 +40,7 @@ func parseRSS2(data []byte) (*Feed, error) {
|
||||||
|
|
||||||
out.Image = channel.Image.Image()
|
out.Image = channel.Image.Image()
|
||||||
if channel.MinsToLive != 0 {
|
if channel.MinsToLive != 0 {
|
||||||
|
titleCaser := cases.Title(language.English)
|
||||||
sort.Ints(channel.SkipHours)
|
sort.Ints(channel.SkipHours)
|
||||||
next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute)
|
next := time.Now().Add(time.Duration(channel.MinsToLive) * time.Minute)
|
||||||
for _, hour := range channel.SkipHours {
|
for _, hour := range channel.SkipHours {
|
||||||
|
@ -49,7 +52,7 @@ func parseRSS2(data []byte) (*Feed, error) {
|
||||||
for trying {
|
for trying {
|
||||||
trying = false
|
trying = false
|
||||||
for _, day := range channel.SkipDays {
|
for _, day := range channel.SkipDays {
|
||||||
if strings.Title(day) == next.Weekday().String() {
|
if titleCaser.String(day) == next.Weekday().String() {
|
||||||
next.Add(time.Duration(24-next.Hour()) * time.Hour)
|
next.Add(time.Duration(24-next.Hour()) * time.Hour)
|
||||||
trying = true
|
trying = true
|
||||||
break
|
break
|
||||||
|
|
Loading…
Reference in New Issue
Block a user