Move subscriptions to database

This commit is contained in:
Peter Stuifzand 2021-10-31 14:24:41 +01:00
parent 4f6ea0efb2
commit 26b85152fd
Signed by: peter
GPG Key ID: 374322D56E5209E8
5 changed files with 98 additions and 167 deletions

View File

@ -1,6 +1,6 @@
CREATE TABLE "feeds" ( CREATE TABLE "feeds" (
"id" int primary key generated always as identity, "id" int primary key generated always as identity,
"channel_id" int references "channels" on update cascade on delete cascade, "channel_id" int references "channels" (id) on update cascade on delete cascade,
"url" varchar(512) not null unique, "url" varchar(512) not null unique,
"created_at" timestamptz DEFAULT current_timestamp, "created_at" timestamptz DEFAULT current_timestamp,
"updated_at" timestamptz "updated_at" timestamptz

View File

@ -0,0 +1 @@
DROP TABLE "subscriptions";

View File

@ -0,0 +1,12 @@
CREATE TABLE "subscriptions" (
"id" int primary key generated always as identity,
"topic" varchar(1024) not null references "feeds" ("url") on update cascade on delete cascade,
"hub" varchar(1024) null,
"callback" varchar(1024) null,
"subscription_secret" varchar(32) not null,
"url_secret" varchar(32) not null,
"lease_seconds" int not null,
"created_at" timestamptz DEFAULT current_timestamp,
"updated_at" timestamptz,
"resubscribe_at" timestamptz
);

View File

@ -6,12 +6,8 @@ import (
"io" "io"
"log" "log"
"net/http" "net/http"
"net/url"
"strconv"
"strings"
"time" "time"
"github.com/pkg/errors"
"p83.nl/go/ekster/pkg/util" "p83.nl/go/ekster/pkg/util"
"p83.nl/go/ekster/pkg/websub" "p83.nl/go/ekster/pkg/websub"
@ -23,9 +19,8 @@ const LeaseSeconds = 24 * 60 * 60
// HubBackend handles information for the incoming handler // HubBackend handles information for the incoming handler
type HubBackend interface { type HubBackend interface {
GetFeeds() []Feed // Deprecated
Feeds() ([]Feed, error) Feeds() ([]Feed, error)
CreateFeed(url, channel string) (int64, error) CreateFeed(url string) (int64, error)
GetSecret(feedID int64) string GetSecret(feedID int64) string
UpdateFeed(feedID int64, contentType string, body io.Reader) error UpdateFeed(feedID int64, contentType string, body io.Reader) error
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
@ -40,14 +35,13 @@ type hubIncomingBackend struct {
// Feed contains information about the feed subscriptions // Feed contains information about the feed subscriptions
type Feed struct { type Feed struct {
ID int64 `redis:"id"` ID int64
Channel string `redis:"channel"` URL string
URL string `redis:"url"` Callback string
Callback string `redis:"callback"` Hub string
Hub string `redis:"hub"` Secret string
Secret string `redis:"secret"` LeaseSeconds int64
LeaseSeconds int64 `redis:"lease_seconds"` ResubscribeAt time.Time
ResubscribeAt int64 `redis:"resubscribe_at"`
} }
var ( var (
@ -59,29 +53,34 @@ func init() {
} }
func (h *hubIncomingBackend) GetSecret(id int64) string { func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() var secret string
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret")) err := db.QueryRow(
`select "subscription_secret" from "subscriptions" where "id" = $1`,
id,
).Scan(&secret)
if err != nil { if err != nil {
return "" return ""
} }
return secret return secret
} }
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) { func (h *hubIncomingBackend) CreateFeed(topic string) (int64, error) {
conn := h.pool.Get() db := h.backend.database
defer conn.Close()
// TODO(peter): check if topic already is registered secret := util.RandStringBytes(32)
id, err := redis.Int64(conn.Do("INCR", "feed:next_id")) urlSecret := util.RandStringBytes(32)
var subscriptionID int
err := db.QueryRow(`
INSERT INTO "subscriptions" ("topic","subscription_secret", "url_secret", "lease_seconds", "created_at")
VALUES ($1, $2, $3, $4, DEFAULT) RETURNING "id"`, topic, secret, urlSecret, 60*60*24*7).Scan(&subscriptionID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
if err != nil {
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic) return 0, fmt.Errorf("insert into subscriptions: %w", err)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel) }
secret := util.RandStringBytes(16)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
client := &http.Client{} client := &http.Client{}
@ -91,136 +90,95 @@ func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, er
return 0, err return 0, err
} }
callbackURL := fmt.Sprintf("%s/incoming/%d", h.baseURL, id) callbackURL := fmt.Sprintf("%s/incoming/%d", h.baseURL, subscriptionID)
log.Printf("WebSub Hub URL found for topic=%q hub=%q callback=%q\n", topic, hubURL, callbackURL) log.Printf("WebSub Hub URL found for topic=%q hub=%q callback=%q\n", topic, hubURL, callbackURL)
if err == nil && hubURL != "" { if err == nil && hubURL != "" {
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", id), "hub", hubURL, "callback", callbackURL) _, err := db.Exec(`UPDATE subscriptions SET hub = $1, callback = $2 WHERE id = $3`, subscriptionID, hubURL, callbackURL)
_, err = conn.Do("HMSET", args...)
if err != nil { if err != nil {
return 0, errors.Wrap(err, "could not write to redis backend") return 0, fmt.Errorf("save hub and callback: %w", err)
} }
} else { } else {
return id, nil return int64(subscriptionID), nil
} }
err = websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600) err = websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("subscribe: %w", err)
} }
return id, nil return int64(subscriptionID), nil
} }
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error { func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string, body io.Reader) error {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() var (
log.Printf("updating feed %d", feedID) topic string
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url")) channel string
if err != nil { feedID string
return err )
}
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel")) // Process all channels that contains this feed
rows, err := db.Query(
`select topic, c.uid, f.id from subscriptions s inner join feeds f on f.url = s.topic inner join channels c on c.id = f.channel_id where s.id = $1`,
subscriptionID,
)
if err != nil { if err != nil {
return err return err
} }
// FIXME: feed id for incoming websub content for rows.Next() {
log.Printf("Updating feed %d - %s %s\n", feedID, u, channel) err = rows.Scan(&topic, channel, feedID)
err = h.backend.ProcessContent(channel, fmt.Sprintf("incoming:%d", feedID), u, contentType, body) if err != nil {
if err != nil { log.Println(err)
log.Printf("could not process content for channel %s: %s", channel, err) continue
}
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
err = h.backend.ProcessContent(channel, feedID, topic, contentType, body)
if err != nil {
log.Printf("could not process content for channel %s: %s", channel, err)
}
} }
return err return err
} }
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error { func (h *hubIncomingBackend) FeedSetLeaseSeconds(subscriptionID int64, leaseSeconds int64) error {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() _, err := db.Exec("update subscriptions set lease_seconds = $1, resubscribe_at = now() + $1 * interval '1' second where id = $2", leaseSeconds, subscriptionID)
log.Printf("updating feed %d lease_seconds", feedID) return err
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second).Unix())
_, err := conn.Do("HMSET", args...)
if err != nil {
log.Println(err)
return err
}
return nil
}
// GetFeeds is deprecated, use Feeds instead
func (h *hubIncomingBackend) GetFeeds() []Feed {
log.Println("GetFeeds called, consider replacing with Feeds")
feeds, err := h.Feeds()
if err != nil {
log.Printf("Feeds returned an error: %v", err)
}
return feeds
} }
// Feeds returns a list of subscribed feeds // Feeds returns a list of subscribed feeds
func (h *hubIncomingBackend) Feeds() ([]Feed, error) { func (h *hubIncomingBackend) Feeds() ([]Feed, error) {
conn := h.pool.Get() db := h.backend.database
defer conn.Close() var feeds []Feed
feeds := []Feed{}
// FIXME(peter): replace with set of currently checked feeds rows, err := db.Query(`
feedKeys, err := redis.Strings(conn.Do("KEYS", "feed:*")) select s.id, c.uid, topic, hub, callback, subscription_secret, lease_seconds, resubscribe_at
if err != nil { from subscriptions s
return nil, errors.Wrap(err, "could not get feeds from backend") inner join feeds f on f.url = s.topic
} inner join channels c on c.id = f.channel_id
`)
for _, feedKey := range feedKeys { for rows.Next() {
var feed Feed var feed Feed
values, err := redis.Values(conn.Do("HGETALL", feedKey))
err = rows.Scan(
&feed.ID,
&feed.URL,
&feed.Hub,
&feed.Callback,
&feed.Secret,
&feed.LeaseSeconds,
&feed.ResubscribeAt,
)
if err != nil { if err != nil {
log.Printf("could not get feed info for key %s: %v", feedKey, err) log.Println("Feeds: scan subscriptions:", err)
continue continue
} }
err = redis.ScanStruct(values, &feed)
if err != nil {
log.Printf("could not scan struct for key %s: %v", feedKey, err)
continue
}
// Add feed id
if feed.ID == 0 {
parts := strings.Split(feedKey, ":")
if len(parts) == 2 {
feed.ID, _ = strconv.ParseInt(parts[1], 10, 64)
_, err = conn.Do("HSET", feedKey, "id", feed.ID)
if err != nil {
log.Printf("could not save id for %s: %v", feedKey, err)
}
}
}
// Fix the callback url
callbackURL, err := url.Parse(feed.Callback)
if err != nil || !callbackURL.IsAbs() {
if err != nil {
log.Printf("could not parse callback url %q: %v", callbackURL, err)
} else {
log.Printf("url is relative; replace with absolute url: %q", callbackURL)
}
feed.Callback = fmt.Sprintf("%s/incoming/%d", h.baseURL, feed.ID)
_, err = conn.Do("HSET", feedKey, "callback", feed.Callback)
if err != nil {
log.Printf("could not save id for %s: %v", feedKey, err)
}
}
// Skip feeds without a Hub
if feed.Hub == "" {
continue
}
log.Printf("Websub feed: %#v\n", feed)
feeds = append(feeds, feed) feeds = append(feeds, feed)
} }
@ -245,11 +203,13 @@ func (h *hubIncomingBackend) run() error {
feeds, err := h.Feeds() feeds, err := h.Feeds()
if err != nil { if err != nil {
log.Println("Feeds failed:", err)
continue
} }
for _, feed := range feeds { for _, feed := range feeds {
log.Printf("Looking at %s\n", feed.URL) log.Printf("Looking at %s\n", feed.URL)
if feed.ResubscribeAt == 0 || time.Now().After(time.Unix(feed.ResubscribeAt, 0)) { if time.Now().After(feed.ResubscribeAt) {
if feed.Callback == "" { if feed.Callback == "" {
feed.Callback = fmt.Sprintf("%s/incoming/%d", h.baseURL, feed.ID) feed.Callback = fmt.Sprintf("%s/incoming/%d", h.baseURL, feed.ID)
} }

View File

@ -804,45 +804,3 @@ func (b *memoryBackend) getTimeline(channel string) timeline.Backend {
} }
return tl return tl
} }
func (b *memoryBackend) createChannel(name string) microsub.Channel {
uid := fmt.Sprintf("%012d", b.NextUID)
channel := microsub.Channel{
UID: uid,
Name: name,
Unread: microsub.Unread{Type: microsub.UnreadCount},
}
return channel
}
func (b *memoryBackend) fetchChannel(name string) (microsub.Channel, bool) {
b.lock.RLock()
defer b.lock.RUnlock()
for _, c := range b.Channels {
if c.Name == name {
return c, true
}
}
return microsub.Channel{}, false
}
func (b *memoryBackend) setChannel(channel microsub.Channel) {
b.lock.Lock()
defer b.lock.Unlock()
b.Channels[channel.UID] = channel
b.Feeds[channel.UID] = []microsub.Feed{}
b.NextUID++
}
func updateChannelInRedis(conn redis.Conn, uid string, prio int) {
conn.Do("SADD", "channels", uid)
conn.Do("SETNX", "channel_sortorder_"+uid, prio)
}
func removeChannelFromRedis(conn redis.Conn, uid string) {
conn.Do("SREM", "channels", uid)
conn.Do("DEL", "channel_sortorder_"+uid)
}