Extract database, add RefreshFeeds
Some checks failed
continuous-integration/drone/push Build is failing

This commit is contained in:
Peter Stuifzand 2021-05-11 22:23:45 +02:00
parent afbd987ca3
commit 9c4a166b45
Signed by: peter
GPG Key ID: 374322D56E5209E8
5 changed files with 73 additions and 48 deletions

View File

@ -683,6 +683,10 @@ func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/settings", 302) http.Redirect(w, r, "/settings", 302)
return return
} else if r.URL.Path == "/refresh" {
h.Backend.RefreshFeeds()
http.Redirect(w, r, "/", 302)
return
} }
} }

View File

@ -16,6 +16,7 @@ Eksterd is a microsub server that is extendable.
package main package main
import ( import (
"database/sql"
"flag" "flag"
"fmt" "fmt"
"log" "log"
@ -39,6 +40,7 @@ type AppOptions struct {
BaseURL string BaseURL string
TemplateDir string TemplateDir string
pool *redis.Pool pool *redis.Pool
database *sql.DB
} }
func init() { func init() {
@ -115,7 +117,7 @@ func NewApp(options AppOptions) (*App, error) {
options: options, options: options,
} }
backend, err := loadMemoryBackend(options.pool) backend, err := loadMemoryBackend(options.pool, options.database)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -217,14 +219,18 @@ func main() {
pool := newPool(options.RedisServer) pool := newPool(options.RedisServer)
options.pool = pool options.pool = pool
db, err := sql.Open("postgres", "host=database user=postgres password=simple dbname=ekster sslmode=disable")
if err != nil {
log.Fatalf("database open failed: %w", err)
}
options.database = db
app, err := NewApp(options) app, err := NewApp(options)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
err = app.Run() log.Fatal(app.Run())
if err != nil {
log.Fatal(err) db.Close()
}
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -49,6 +50,8 @@ type memoryBackend struct {
broker *sse.Broker broker *sse.Broker
pool *redis.Pool pool *redis.Pool
database *sql.DB
} }
type channelSetting struct { type channelSetting struct {
@ -150,8 +153,8 @@ func (b *memoryBackend) save() error {
return jw.Encode(b) return jw.Encode(b)
} }
func loadMemoryBackend(pool *redis.Pool) (*memoryBackend, error) { func loadMemoryBackend(pool *redis.Pool, database *sql.DB) (*memoryBackend, error) {
backend := &memoryBackend{pool: pool} backend := &memoryBackend{pool: pool, database: database}
err := backend.load() err := backend.load()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "while loading backend") return nil, errors.Wrap(err, "while loading backend")
@ -307,34 +310,7 @@ func (b *memoryBackend) run() {
for { for {
select { select {
case <-b.ticker.C: case <-b.ticker.C:
feeds := b.getFeeds() b.RefreshFeeds()
count := 0
for uid := range feeds {
for _, feedURL := range feeds[uid] {
resp, err := b.Fetch3(uid, feedURL)
if err != nil {
_ = b.channelAddItem("notifications", microsub.Item{
Type: "entry",
Name: "Error while fetching feed",
Content: &microsub.Content{
Text: fmt.Sprintf("Error while updating feed %s: %v", feedURL, err),
},
UID: time.Now().String(),
})
count++
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
continue
}
_ = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
_ = resp.Body.Close()
}
}
if count > 0 {
_ = b.updateChannelUnreadCount("notifications")
}
case <-b.quit: case <-b.quit:
b.ticker.Stop() b.ticker.Stop()
@ -344,6 +320,38 @@ func (b *memoryBackend) run() {
}() }()
} }
// RefreshFeeds walks every feed URL subscribed in every channel, fetches it
// and processes the returned content. A fetch failure is posted as an entry
// to the "notifications" channel instead of aborting the sweep; when at
// least one failure occurred, the unread count of "notifications" is
// recalculated at the end.
func (b *memoryBackend) RefreshFeeds() {
	failures := 0

	for uid, urls := range b.getFeeds() {
		for _, feedURL := range urls {
			log.Println(feedURL)

			resp, err := b.Fetch3(uid, feedURL)
			if err != nil {
				// Best effort: a failed notification write must not stop the sweep.
				_ = b.channelAddItem("notifications", microsub.Item{
					Type: "entry",
					Name: "Error while fetching feed",
					Content: &microsub.Content{
						Text: fmt.Sprintf("Error while updating feed %s: %v", feedURL, err),
					},
					UID: time.Now().String(),
				})
				failures++
				log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
				continue
			}

			// Errors here are deliberately ignored; refresh continues with the next feed.
			_ = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
			_ = resp.Body.Close()
		}
	}

	if failures > 0 {
		_ = b.updateChannelUnreadCount("notifications")
	}
}
func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) { func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) {
log.Printf("TimelineGet %s\n", channel) log.Printf("TimelineGet %s\n", channel)
@ -826,7 +834,7 @@ func (b *memoryBackend) getTimeline(channel string) timeline.Backend {
} }
} }
return timeline.Create(channel, timelineType, b.pool) return timeline.Create(channel, timelineType, b.pool, b.database)
} }
func (b *memoryBackend) createChannel(name string) microsub.Channel { func (b *memoryBackend) createChannel(name string) microsub.Channel {

View File

@ -19,13 +19,8 @@ type postgresStream struct {
// Init // Init
func (p *postgresStream) Init() error { func (p *postgresStream) Init() error {
db, err := sql.Open("postgres", "host=database user=postgres password=simple dbname=ekster sslmode=disable") db := p.database
if err != nil { err := db.Ping()
return fmt.Errorf("database open failed: %w", err)
}
p.database = db
err = db.Ping()
if err != nil { if err != nil {
return fmt.Errorf("database ping failed: %w", err) return fmt.Errorf("database ping failed: %w", err)
} }
@ -80,7 +75,14 @@ CREATE TABLE IF NOT EXISTS "items" (
// Items // Items
func (p *postgresStream) Items(before, after string) (microsub.Timeline, error) { func (p *postgresStream) Items(before, after string) (microsub.Timeline, error) {
rows, err := p.database.Query(`SELECT "id", "uid", "data", "created_at", "is_read" FROM "items" WHERE "channel_id" = $1 ORDER BY "published_at"`, p.channelID) query := `
SELECT "id", "uid", "data", "created_at", "is_read"
FROM "items"
WHERE "channel_id" = $1
ORDER BY "published_at"
`
rows, err := p.database.Query(query, p.channelID)
if err != nil { if err != nil {
return microsub.Timeline{}, fmt.Errorf("while query: %w", err) return microsub.Timeline{}, fmt.Errorf("while query: %w", err)
} }
@ -138,9 +140,13 @@ func (p *postgresStream) Count() (int, error) {
// AddItem // AddItem
func (p *postgresStream) AddItem(item microsub.Item) (bool, error) { func (p *postgresStream) AddItem(item microsub.Item) (bool, error) {
t, err := time.Parse(time.RFC3339, item.Published) t, err := time.Parse("2006-01-02T15:04:05Z0700", item.Published)
if err != nil { if err != nil {
return false, fmt.Errorf("while adding item: time %q could not be parsed: %w", item.Published, err) t2, err := time.Parse("2006-01-02T15:04:05Z07:00", item.Published)
if err != nil {
return false, fmt.Errorf("while adding item: time %q could not be parsed: %w", item.Published, err)
}
t = t2
} }
_, err = p.database.Exec(` _, err = p.database.Exec(`

View File

@ -6,6 +6,7 @@
package timeline package timeline
import ( import (
"database/sql"
"encoding/json" "encoding/json"
"log" "log"
@ -29,7 +30,7 @@ type Backend interface {
// Create creates a channel of the specified type. Return nil when the type // Create creates a channel of the specified type. Return nil when the type
// is not known. // is not known.
func Create(channel, timelineType string, pool *redis.Pool) Backend { func Create(channel, timelineType string, pool *redis.Pool, db *sql.DB) Backend {
if timelineType == "sorted-set" { if timelineType == "sorted-set" {
timeline := &redisSortedSetTimeline{channel: channel, pool: pool} timeline := &redisSortedSetTimeline{channel: channel, pool: pool}
err := timeline.Init() err := timeline.Init()
@ -58,7 +59,7 @@ func Create(channel, timelineType string, pool *redis.Pool) Backend {
} }
if timelineType == "postgres-stream" { if timelineType == "postgres-stream" {
timeline := &postgresStream{channel: channel} timeline := &postgresStream{database: db, channel: channel}
err := timeline.Init() err := timeline.Init()
if err != nil { if err != nil {
log.Printf("Error while creating %s: %v", channel, err) log.Printf("Error while creating %s: %v", channel, err)