Compare commits

...

2 Commits

Author SHA1 Message Date
3a43844e93
Fix foreign key errors for feed_id
All checks were successful
continuous-integration/drone/push Build is passing
Fix usage of QueryRow.Scan
2021-10-31 14:51:31 +01:00
26b85152fd
Move subscriptions to database 2021-10-31 14:24:41 +01:00
7 changed files with 124 additions and 181 deletions

View File

@ -1,6 +1,6 @@
CREATE TABLE "feeds" ( CREATE TABLE "feeds" (
"id" int primary key generated always as identity, "id" int primary key generated always as identity,
"channel_id" int references "channels" on update cascade on delete cascade, "channel_id" int references "channels" (id) on update cascade on delete cascade,
"url" varchar(512) not null unique, "url" varchar(512) not null unique,
"created_at" timestamptz DEFAULT current_timestamp, "created_at" timestamptz DEFAULT current_timestamp,
"updated_at" timestamptz "updated_at" timestamptz

View File

@ -0,0 +1 @@
DROP TABLE "subscriptions";

View File

@ -0,0 +1,12 @@
CREATE TABLE "subscriptions" (
"id" int primary key generated always as identity,
"topic" varchar(1024) not null references "feeds" ("url") on update cascade on delete cascade,
"hub" varchar(1024) null,
"callback" varchar(1024) null,
"subscription_secret" varchar(32) not null,
"url_secret" varchar(32) not null,
"lease_seconds" int not null,
"created_at" timestamptz DEFAULT current_timestamp,
"updated_at" timestamptz,
"resubscribe_at" timestamptz
);

View File

@ -6,12 +6,8 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"net/url"
"strconv"
"strings"
"time" "time"
"github.com/pkg/errors"
"p83.nl/go/ekster/pkg/util" "p83.nl/go/ekster/pkg/util"
"p83.nl/go/ekster/pkg/websub" "p83.nl/go/ekster/pkg/websub"
@ -23,9 +19,8 @@ const LeaseSeconds = 24 * 60 * 60
// HubBackend handles information for the incoming handler // HubBackend handles information for the incoming handler
type HubBackend interface { type HubBackend interface {
GetFeeds() []Feed // Deprecated
Feeds() ([]Feed, error) Feeds() ([]Feed, error)
CreateFeed(url, channel string) (int64, error) CreateFeed(url string) (int64, error)
GetSecret(feedID int64) string GetSecret(feedID int64) string
UpdateFeed(feedID int64, contentType string, body io.Reader) error UpdateFeed(feedID int64, contentType string, body io.Reader) error
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
@ -40,14 +35,13 @@ type hubIncomingBackend struct {
// Feed contains information about the feed subscriptions // Feed contains information about the feed subscriptions
type Feed struct { type Feed struct {
ID int64 `redis:"id"` ID int64
Channel string `redis:"channel"` URL string
URL string `redis:"url"` Callback string
Callback string `redis:"callback"` Hub string
Hub string `redis:"hub"` Secret string
Secret string `redis:"secret"` LeaseSeconds int64
LeaseSeconds int64 `redis:"lease_seconds"` ResubscribeAt time.Time
ResubscribeAt int64 `redis:"resubscribe_at"`
} }
var ( var (
@ -59,29 +53,34 @@ func init() {
} }
func (h *hubIncomingBackend) GetSecret(id int64) string { func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() var secret string
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret")) err := db.QueryRow(
`select "subscription_secret" from "subscriptions" where "id" = $1`,
id,
).Scan(&secret)
if err != nil { if err != nil {
return "" return ""
} }
return secret return secret
} }
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) { func (h *hubIncomingBackend) CreateFeed(topic string) (int64, error) {
conn := h.pool.Get() db := h.backend.database
defer conn.Close()
// TODO(peter): check if topic already is registered secret := util.RandStringBytes(32)
id, err := redis.Int64(conn.Do("INCR", "feed:next_id")) urlSecret := util.RandStringBytes(32)
var subscriptionID int
err := db.QueryRow(`
INSERT INTO "subscriptions" ("topic","subscription_secret", "url_secret", "lease_seconds", "created_at")
VALUES ($1, $2, $3, $4, DEFAULT) RETURNING "id"`, topic, secret, urlSecret, 60*60*24*7).Scan(&subscriptionID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
if err != nil {
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic) return 0, fmt.Errorf("insert into subscriptions: %w", err)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel) }
secret := util.RandStringBytes(16)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
client := &http.Client{} client := &http.Client{}
@ -91,136 +90,95 @@ func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, er
return 0, err return 0, err
} }
callbackURL := fmt.Sprintf("%s/incoming/%d", h.baseURL, id) callbackURL := fmt.Sprintf("%s/incoming/%d", h.baseURL, subscriptionID)
log.Printf("WebSub Hub URL found for topic=%q hub=%q callback=%q\n", topic, hubURL, callbackURL) log.Printf("WebSub Hub URL found for topic=%q hub=%q callback=%q\n", topic, hubURL, callbackURL)
if err == nil && hubURL != "" { if err == nil && hubURL != "" {
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", id), "hub", hubURL, "callback", callbackURL) _, err := db.Exec(`UPDATE subscriptions SET hub = $1, callback = $2 WHERE id = $3`, subscriptionID, hubURL, callbackURL)
_, err = conn.Do("HMSET", args...)
if err != nil { if err != nil {
return 0, errors.Wrap(err, "could not write to redis backend") return 0, fmt.Errorf("save hub and callback: %w", err)
} }
} else { } else {
return id, nil return int64(subscriptionID), nil
} }
err = websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600) err = websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("subscribe: %w", err)
} }
return id, nil return int64(subscriptionID), nil
} }
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error { func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string, body io.Reader) error {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() var (
log.Printf("updating feed %d", feedID) topic string
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url")) channel string
if err != nil { feedID string
return err )
}
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel")) // Process all channels that contains this feed
rows, err := db.Query(
`select topic, c.uid, f.id from subscriptions s inner join feeds f on f.url = s.topic inner join channels c on c.id = f.channel_id where s.id = $1`,
subscriptionID,
)
if err != nil { if err != nil {
return err return err
} }
// FIXME: feed id for incoming websub content for rows.Next() {
log.Printf("Updating feed %d - %s %s\n", feedID, u, channel) err = rows.Scan(&topic, channel, feedID)
err = h.backend.ProcessContent(channel, fmt.Sprintf("incoming:%d", feedID), u, contentType, body) if err != nil {
if err != nil { log.Println(err)
log.Printf("could not process content for channel %s: %s", channel, err) continue
}
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
err = h.backend.ProcessContent(channel, feedID, topic, contentType, body)
if err != nil {
log.Printf("could not process content for channel %s: %s", channel, err)
}
} }
return err return err
} }
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error { func (h *hubIncomingBackend) FeedSetLeaseSeconds(subscriptionID int64, leaseSeconds int64) error {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() _, err := db.Exec("update subscriptions set lease_seconds = $1, resubscribe_at = now() + $1 * interval '1' second where id = $2", leaseSeconds, subscriptionID)
log.Printf("updating feed %d lease_seconds", feedID) return err
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second).Unix())
_, err := conn.Do("HMSET", args...)
if err != nil {
log.Println(err)
return err
}
return nil
}
// GetFeeds is deprecated, use Feeds instead
func (h *hubIncomingBackend) GetFeeds() []Feed {
log.Println("GetFeeds called, consider replacing with Feeds")
feeds, err := h.Feeds()
if err != nil {
log.Printf("Feeds returned an error: %v", err)
}
return feeds
} }
// Feeds returns a list of subscribed feeds // Feeds returns a list of subscribed feeds
func (h *hubIncomingBackend) Feeds() ([]Feed, error) { func (h *hubIncomingBackend) Feeds() ([]Feed, error) {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() var feeds []Feed
feeds := []Feed{}
// FIXME(peter): replace with set of currently checked feeds rows, err := db.Query(`
feedKeys, err := redis.Strings(conn.Do("KEYS", "feed:*")) select s.id, c.uid, topic, hub, callback, subscription_secret, lease_seconds, resubscribe_at
if err != nil { from subscriptions s
return nil, errors.Wrap(err, "could not get feeds from backend") inner join feeds f on f.url = s.topic
} inner join channels c on c.id = f.channel_id
`)
for _, feedKey := range feedKeys { for rows.Next() {
var feed Feed var feed Feed
values, err := redis.Values(conn.Do("HGETALL", feedKey))
err = rows.Scan(
&feed.ID,
&feed.URL,
&feed.Hub,
&feed.Callback,
&feed.Secret,
&feed.LeaseSeconds,
&feed.ResubscribeAt,
)
if err != nil { if err != nil {
log.Printf("could not get feed info for key %s: %v", feedKey, err) log.Println("Feeds: scan subscriptions:", err)
continue continue
} }
err = redis.ScanStruct(values, &feed)
if err != nil {
log.Printf("could not scan struct for key %s: %v", feedKey, err)
continue
}
// Add feed id
if feed.ID == 0 {
parts := strings.Split(feedKey, ":")
if len(parts) == 2 {
feed.ID, _ = strconv.ParseInt(parts[1], 10, 64)
_, err = conn.Do("HSET", feedKey, "id", feed.ID)
if err != nil {
log.Printf("could not save id for %s: %v", feedKey, err)
}
}
}
// Fix the callback url
callbackURL, err := url.Parse(feed.Callback)
if err != nil || !callbackURL.IsAbs() {
if err != nil {
log.Printf("could not parse callback url %q: %v", callbackURL, err)
} else {
log.Printf("url is relative; replace with absolute url: %q", callbackURL)
}
feed.Callback = fmt.Sprintf("%s/incoming/%d", h.baseURL, feed.ID)
_, err = conn.Do("HSET", feedKey, "callback", feed.Callback)
if err != nil {
log.Printf("could not save id for %s: %v", feedKey, err)
}
}
// Skip feeds without a Hub
if feed.Hub == "" {
continue
}
log.Printf("Websub feed: %#v\n", feed)
feeds = append(feeds, feed) feeds = append(feeds, feed)
} }
@ -245,11 +203,13 @@ func (h *hubIncomingBackend) run() error {
feeds, err := h.Feeds() feeds, err := h.Feeds()
if err != nil { if err != nil {
log.Println("Feeds failed:", err)
continue
} }
for _, feed := range feeds { for _, feed := range feeds {
log.Printf("Looking at %s\n", feed.URL) log.Printf("Looking at %s\n", feed.URL)
if feed.ResubscribeAt == 0 || time.Now().After(time.Unix(feed.ResubscribeAt, 0)) { if time.Now().After(feed.ResubscribeAt) {
if feed.Callback == "" { if feed.Callback == "" {
feed.Callback = fmt.Sprintf("%s/incoming/%d", h.baseURL, feed.ID) feed.Callback = fmt.Sprintf("%s/incoming/%d", h.baseURL, feed.ID)
} }

View File

@ -312,23 +312,23 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
feed := microsub.Feed{Type: "feed", URL: url} feed := microsub.Feed{Type: "feed", URL: url}
var channelID int var channelID int
if row := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid); row != nil { err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
err := row.Scan(&channelID) if err != nil {
if err != nil { if err == sql.ErrNoRows {
log.Fatal(err) return microsub.Feed{}, fmt.Errorf("channel does not exist: %w", err)
} }
return microsub.Feed{}, err
} }
row := b.database.QueryRow( var feedID int
err = b.database.QueryRow(
`INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`, `INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`,
channelID, channelID,
feed.URL, feed.URL,
) ).Scan(&feedID)
if row == nil { if err != nil {
return microsub.Feed{}, fmt.Errorf("no feed_id") return feed, err
} }
var feedID int
_ = row.Scan(&feedID)
resp, err := b.Fetch3(uid, feed.URL) resp, err := b.Fetch3(uid, feed.URL)
if err != nil { if err != nil {
@ -804,45 +804,3 @@ func (b *memoryBackend) getTimeline(channel string) timeline.Backend {
} }
return tl return tl
} }
func (b *memoryBackend) createChannel(name string) microsub.Channel {
uid := fmt.Sprintf("%012d", b.NextUID)
channel := microsub.Channel{
UID: uid,
Name: name,
Unread: microsub.Unread{Type: microsub.UnreadCount},
}
return channel
}
func (b *memoryBackend) fetchChannel(name string) (microsub.Channel, bool) {
b.lock.RLock()
defer b.lock.RUnlock()
for _, c := range b.Channels {
if c.Name == name {
return c, true
}
}
return microsub.Channel{}, false
}
func (b *memoryBackend) setChannel(channel microsub.Channel) {
b.lock.Lock()
defer b.lock.Unlock()
b.Channels[channel.UID] = channel
b.Feeds[channel.UID] = []microsub.Feed{}
b.NextUID++
}
func updateChannelInRedis(conn redis.Conn, uid string, prio int) {
conn.Do("SADD", "channels", uid)
conn.Do("SETNX", "channel_sortorder_"+uid, prio)
}
func removeChannelFromRedis(conn redis.Conn, uid string) {
conn.Do("SREM", "channels", uid)
conn.Do("DEL", "channel_sortorder_"+uid)
}

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.16
require ( require (
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394 github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394
github.com/blevesearch/bleve/v2 v2.0.3 github.com/blevesearch/bleve/v2 v2.0.3
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gilliek/go-opml v1.0.0 github.com/gilliek/go-opml v1.0.0
github.com/golang-migrate/migrate/v4 v4.15.1 github.com/golang-migrate/migrate/v4 v4.15.1
github.com/gomodule/redigo v1.8.2 github.com/gomodule/redigo v1.8.2

View File

@ -212,20 +212,31 @@ func (p *postgresStream) AddItem(item microsub.Item) (bool, error) {
t = t2 t = t2
} }
if item.UID == "" { if item.UID == "" {
// FIXME: This won't work when we receive the item multiple times
h := sha256.Sum256([]byte(fmt.Sprintf("%s:%d", p.channel, time.Now().UnixNano()))) h := sha256.Sum256([]byte(fmt.Sprintf("%s:%d", p.channel, time.Now().UnixNano())))
item.UID = hex.EncodeToString(h[:]) item.UID = hex.EncodeToString(h[:])
} }
feedID, err := strconv.ParseInt(item.Source.ID, 10, 64) var optFeedID sql.NullInt64
if err != nil { if item.Source == nil || item.Source.ID == "" {
return false, fmt.Errorf("ERROR: item.Source.ID is not an integer %q: %w", item.Source.ID, err) optFeedID.Valid = false
optFeedID.Int64 = 0
} else {
feedID, err := strconv.ParseInt(item.Source.ID, 10, 64)
if err != nil {
optFeedID.Valid = false
optFeedID.Int64 = 0
} else {
optFeedID.Valid = true
optFeedID.Int64 = feedID
}
} }
result, err := conn.ExecContext(context.Background(), ` result, err := conn.ExecContext(context.Background(), `
INSERT INTO "items" ("channel_id", "feed_id", "uid", "data", "published_at", "created_at") INSERT INTO "items" ("channel_id", "feed_id", "uid", "data", "published_at", "created_at")
VALUES ($1, $2, $3, $4, $5, DEFAULT) VALUES ($1, $2, $3, $4, $5, DEFAULT)
ON CONFLICT ON CONSTRAINT "items_uid_key" DO NOTHING ON CONFLICT ON CONSTRAINT "items_uid_key" DO NOTHING
`, p.channelID, feedID, item.UID, &item, t) `, p.channelID, optFeedID, item.UID, &item, t)
if err != nil { if err != nil {
return false, fmt.Errorf("insert item: %w", err) return false, fmt.Errorf("insert item: %w", err)
} }