From 9c4a166b45320c41e4a9bed74c39e3974bd077d3 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Tue, 11 May 2021 22:23:45 +0200 Subject: [PATCH] Extract database, add refreshFeeds --- cmd/eksterd/http.go | 4 +++ cmd/eksterd/main.go | 16 ++++++--- cmd/eksterd/memory.go | 70 ++++++++++++++++++++++------------------ pkg/timeline/postgres.go | 26 +++++++++------ pkg/timeline/timeline.go | 5 +-- 5 files changed, 73 insertions(+), 48 deletions(-) diff --git a/cmd/eksterd/http.go b/cmd/eksterd/http.go index 2e8cce5..a7c0c27 100644 --- a/cmd/eksterd/http.go +++ b/cmd/eksterd/http.go @@ -683,6 +683,10 @@ func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/settings", 302) return + } else if r.URL.Path == "/refresh" { + h.Backend.RefreshFeeds() + http.Redirect(w, r, "/", 302) + return } } diff --git a/cmd/eksterd/main.go b/cmd/eksterd/main.go index 40ae779..679ebd3 100644 --- a/cmd/eksterd/main.go +++ b/cmd/eksterd/main.go @@ -16,6 +16,7 @@ Eksterd is a microsub server that is extendable. package main import ( + "database/sql" "flag" "fmt" "log" @@ -39,6 +40,7 @@ type AppOptions struct { BaseURL string TemplateDir string pool *redis.Pool + database *sql.DB } func init() { @@ -115,7 +117,7 @@ func NewApp(options AppOptions) (*App, error) { options: options, } - backend, err := loadMemoryBackend(options.pool) + backend, err := loadMemoryBackend(options.pool, options.database) if err != nil { return nil, err } @@ -217,14 +219,18 @@ func main() { pool := newPool(options.RedisServer) options.pool = pool + db, err := sql.Open("postgres", "host=database user=postgres password=simple dbname=ekster sslmode=disable") + if err != nil { + log.Fatalf("database open failed: %w", err) + } + options.database = db app, err := NewApp(options) if err != nil { log.Fatal(err) } - err = app.Run() - if err != nil { - log.Fatal(err) - } + log.Fatal(app.Run()) + + db.Close() } diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index 12c9caa..1868e80 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -3,6 +3,7 @@ package main import ( "bufio" "bytes" + "database/sql" "encoding/json" "fmt" "io" @@ -49,6 +50,8 @@ type memoryBackend struct { broker *sse.Broker pool *redis.Pool + + database *sql.DB } type channelSetting struct { @@ -150,8 +153,8 @@ func (b *memoryBackend) save() error { return jw.Encode(b) } -func loadMemoryBackend(pool *redis.Pool) (*memoryBackend, error) { - backend := &memoryBackend{pool: pool} +func loadMemoryBackend(pool *redis.Pool, database *sql.DB) (*memoryBackend, error) { + backend := &memoryBackend{pool: pool, database: database} err := backend.load() if err != nil { return nil, errors.Wrap(err, "while loading backend") @@ -307,34 +310,7 @@ func (b *memoryBackend) run() { for { select { case <-b.ticker.C: - feeds := b.getFeeds() - - count := 0 - - for uid := range feeds { - for _, feedURL := range feeds[uid] { - resp, err := b.Fetch3(uid, feedURL) - if err != nil { - _ = b.channelAddItem("notifications", microsub.Item{ - Type: "entry", - Name: "Error while fetching feed", - Content: µsub.Content{ - Text: fmt.Sprintf("Error while updating feed %s: %v", feedURL, err), - }, - UID: time.Now().String(), - }) - count++ - log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err) - continue - } - _ = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body) - _ = resp.Body.Close() - } - } - - if count > 0 { - _ = b.updateChannelUnreadCount("notifications") - } + b.RefreshFeeds() case <-b.quit: b.ticker.Stop() @@ -344,6 +320,38 @@ func (b *memoryBackend) run() { }() } +func (b *memoryBackend) RefreshFeeds() { + feeds := b.getFeeds() + + count := 0 + + for uid := range feeds { + for _, feedURL := range feeds[uid] { + log.Println(feedURL) + resp, err := b.Fetch3(uid, feedURL) + if err != nil { + _ = b.channelAddItem("notifications", microsub.Item{ + Type: "entry", + Name: "Error while fetching feed", + Content: µsub.Content{ + Text: fmt.Sprintf("Error while updating feed %s: %v", feedURL, err), + }, + UID: time.Now().String(), + }) + count++ + log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err) + continue + } + _ = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body) + _ = resp.Body.Close() + } + } + + if count > 0 { + _ = b.updateChannelUnreadCount("notifications") + } +} + func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) { log.Printf("TimelineGet %s\n", channel) @@ -826,7 +834,7 @@ func (b *memoryBackend) getTimeline(channel string) timeline.Backend { } } - return timeline.Create(channel, timelineType, b.pool) + return timeline.Create(channel, timelineType, b.pool, b.database) } func (b *memoryBackend) createChannel(name string) microsub.Channel { diff --git a/pkg/timeline/postgres.go b/pkg/timeline/postgres.go index 0336a94..63659ac 100644 --- a/pkg/timeline/postgres.go +++ b/pkg/timeline/postgres.go @@ -19,13 +19,8 @@ type postgresStream struct { // Init func (p *postgresStream) Init() error { - db, err := sql.Open("postgres", "host=database user=postgres password=simple dbname=ekster sslmode=disable") - if err != nil { - return fmt.Errorf("database open failed: %w", err) - } - p.database = db - - err = db.Ping() + db := p.database + err := db.Ping() if err != nil { return fmt.Errorf("database ping failed: %w", err) } @@ -80,7 +75,14 @@ CREATE TABLE IF NOT EXISTS "items" ( // Items func (p *postgresStream) Items(before, after string) (microsub.Timeline, error) { - rows, err := p.database.Query(`SELECT "id", "uid", "data", "created_at", "is_read" FROM "items" WHERE "channel_id" = $1 ORDER BY "published_at"`, p.channelID) + query := ` +SELECT "id", "uid", "data", "created_at", "is_read" +FROM "items" +WHERE "channel_id" = $1 +ORDER BY "published_at" +` + + rows, err := p.database.Query(query, p.channelID) if err != nil { return microsub.Timeline{}, fmt.Errorf("while query: %w", err) } @@ -138,9 +140,13 @@ func (p *postgresStream) Count() (int, error) { // AddItem func (p *postgresStream) AddItem(item microsub.Item) (bool, error) { - t, err := time.Parse(time.RFC3339, item.Published) + t, err := time.Parse("2006-01-02T15:04:05Z0700", item.Published) if err != nil { - return false, fmt.Errorf("while adding item: time %q could not be parsed: %w", item.Published, err) + t2, err := time.Parse("2006-01-02T15:04:05Z07:00", item.Published) + if err != nil { + return false, fmt.Errorf("while adding item: time %q could not be parsed: %w", item.Published, err) + } + t = t2 } _, err = p.database.Exec(` diff --git a/pkg/timeline/timeline.go b/pkg/timeline/timeline.go index 54aea46..261a1f2 100644 --- a/pkg/timeline/timeline.go +++ b/pkg/timeline/timeline.go @@ -6,6 +6,7 @@ package timeline import ( + "database/sql" "encoding/json" "log" @@ -29,7 +30,7 @@ type Backend interface { // Create creates a channel of the specified type. Return nil when the type // is not known. -func Create(channel, timelineType string, pool *redis.Pool) Backend { +func Create(channel, timelineType string, pool *redis.Pool, db *sql.DB) Backend { if timelineType == "sorted-set" { timeline := &redisSortedSetTimeline{channel: channel, pool: pool} err := timeline.Init() @@ -58,7 +59,7 @@ func Create(channel, timelineType string, pool *redis.Pool) Backend { } if timelineType == "postgres-stream" { - timeline := &postgresStream{channel: channel} + timeline := &postgresStream{database: db, channel: channel} err := timeline.Init() if err != nil { log.Printf("Error while creating %s: %v", channel, err)