Compare commits
No commits in common. "ccaff6fe19483771e7f9881eebb74b2e85826b8e" and "44ec376bda29013cc181065adb42a0c41fe23f43" have entirely different histories.
ccaff6fe19
...
44ec376bda
24
.drone.yml
24
.drone.yml
|
|
@ -31,18 +31,18 @@ steps:
|
|||
password:
|
||||
from_secret: docker_password
|
||||
|
||||
# - name: publish-docker
|
||||
# image: plugins/docker
|
||||
# depends_on:
|
||||
# - testing
|
||||
# settings:
|
||||
# repo: pstuifzand/ekster
|
||||
# tags:
|
||||
# - alpine
|
||||
# username:
|
||||
# from_secret: docker_official_username
|
||||
# password:
|
||||
# from_secret: docker_official_password
|
||||
- name: publish-docker
|
||||
image: plugins/docker
|
||||
depends_on:
|
||||
- testing
|
||||
settings:
|
||||
repo: pstuifzand/ekster
|
||||
tags:
|
||||
- alpine
|
||||
username:
|
||||
from_secret: docker_official_username
|
||||
password:
|
||||
from_secret: docker_official_password
|
||||
|
||||
- name: deploy
|
||||
image: appleboy/drone-ssh
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"p83.nl/go/ekster/pkg/server"
|
||||
)
|
||||
|
||||
// App is the main app structure
|
||||
type App struct {
|
||||
options AppOptions
|
||||
backend *memoryBackend
|
||||
hubBackend *hubIncomingBackend
|
||||
}
|
||||
|
||||
// Run runs the app
|
||||
func (app *App) Run() error {
|
||||
err := initSearch()
|
||||
if err != nil {
|
||||
return fmt.Errorf("while starting app: %v", err)
|
||||
}
|
||||
app.backend.run()
|
||||
app.hubBackend.run()
|
||||
|
||||
log.Printf("Listening on port %d\n", app.options.Port)
|
||||
return http.ListenAndServe(fmt.Sprintf(":%d", app.options.Port), nil)
|
||||
}
|
||||
|
||||
// NewApp initializes the App
|
||||
func NewApp(options AppOptions) (*App, error) {
|
||||
app := &App{
|
||||
options: options,
|
||||
}
|
||||
|
||||
backend, err := loadMemoryBackend(options.pool, options.database)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
app.backend = backend
|
||||
app.backend.AuthEnabled = options.AuthEnabled
|
||||
app.backend.baseURL = options.BaseURL
|
||||
app.backend.hubIncomingBackend.pool = options.pool
|
||||
app.backend.hubIncomingBackend.baseURL = options.BaseURL
|
||||
|
||||
app.hubBackend = &hubIncomingBackend{backend: app.backend, baseURL: options.BaseURL, pool: options.pool}
|
||||
|
||||
http.Handle("/micropub", µpubHandler{
|
||||
Backend: app.backend,
|
||||
pool: options.pool,
|
||||
})
|
||||
|
||||
handler, broker := server.NewMicrosubHandler(app.backend)
|
||||
if options.AuthEnabled {
|
||||
handler = WithAuth(handler, app.backend)
|
||||
}
|
||||
|
||||
app.backend.broker = broker
|
||||
|
||||
http.Handle("/microsub", handler)
|
||||
|
||||
http.Handle("/incoming/", &incomingHandler{
|
||||
Backend: app.hubBackend,
|
||||
})
|
||||
|
||||
if !options.Headless {
|
||||
handler, err := newMainHandler(app.backend, options.BaseURL, options.TemplateDir, options.pool)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create main handler")
|
||||
}
|
||||
http.Handle("/", handler)
|
||||
}
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
|
@ -1 +0,0 @@
|
|||
DROP TABLE "channels";
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS "channels" (
|
||||
"id" int primary key generated always as identity,
|
||||
"uid" varchar(255) unique,
|
||||
"name" varchar(255) unique,
|
||||
"created_at" timestamptz DEFAULT current_timestamp,
|
||||
"updated_at" timestamptz
|
||||
);
|
||||
|
|
@ -1 +0,0 @@
|
|||
DROP TABLE "feeds";
|
||||
|
|
@ -1,7 +0,0 @@
|
|||
CREATE TABLE "feeds" (
|
||||
"id" int primary key generated always as identity,
|
||||
"channel_id" int references "channels" on update cascade on delete cascade,
|
||||
"url" varchar(512) not null unique,
|
||||
"created_at" timestamptz DEFAULT current_timestamp,
|
||||
"updated_at" timestamptz
|
||||
);
|
||||
|
|
@ -1 +0,0 @@
|
|||
DELETE FROM "channels" WHERE "uid" IN ('home', 'notifications');
|
||||
|
|
@ -1 +0,0 @@
|
|||
INSERT INTO "channels" ("uid", "name") VALUES ('home', 'Home'), ('notifications', 'Notifications');
|
||||
|
|
@ -1 +0,0 @@
|
|||
DROP TABLE "items";
|
||||
|
|
@ -1,10 +0,0 @@
|
|||
CREATE TABLE IF NOT EXISTS "items" (
|
||||
"id" int primary key generated always as identity,
|
||||
"channel_id" int references "channels" on delete cascade,
|
||||
"uid" varchar(512) not null unique,
|
||||
"is_read" int default 0,
|
||||
"data" jsonb,
|
||||
"created_at" timestamptz DEFAULT current_timestamp,
|
||||
"updated_at" timestamptz,
|
||||
"published_at" timestamptz
|
||||
);
|
||||
|
|
@ -1 +0,0 @@
|
|||
ALTER TABLE "items" DROP COLUMN "feed_id";
|
||||
|
|
@ -1 +0,0 @@
|
|||
ALTER TABLE "items" ADD COLUMN "feed_id" INT REFERENCES "feeds" ON DELETE CASCADE;
|
||||
|
|
@ -659,28 +659,30 @@ func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
return
|
||||
} else if r.URL.Path == "/settings/channel" {
|
||||
// defer h.Backend.save()
|
||||
// uid := r.FormValue("uid")
|
||||
//
|
||||
// if h.Backend.Settings == nil {
|
||||
// h.Backend.Settings = make(map[string]channelSetting)
|
||||
// }
|
||||
//
|
||||
// excludeRegex := r.FormValue("exclude_regex")
|
||||
// includeRegex := r.FormValue("include_regex")
|
||||
// channelType := r.FormValue("type")
|
||||
//
|
||||
// setting, e := h.Backend.Settings[uid]
|
||||
// if !e {
|
||||
// setting = channelSetting{}
|
||||
// }
|
||||
// setting.ExcludeRegex = excludeRegex
|
||||
// setting.IncludeRegex = includeRegex
|
||||
// setting.ChannelType = channelType
|
||||
// if values, e := r.Form["exclude_type"]; e {
|
||||
// setting.ExcludeType = values
|
||||
// }
|
||||
// h.Backend.Settings[uid] = setting
|
||||
defer h.Backend.save()
|
||||
uid := r.FormValue("uid")
|
||||
|
||||
if h.Backend.Settings == nil {
|
||||
h.Backend.Settings = make(map[string]channelSetting)
|
||||
}
|
||||
|
||||
excludeRegex := r.FormValue("exclude_regex")
|
||||
includeRegex := r.FormValue("include_regex")
|
||||
channelType := r.FormValue("type")
|
||||
|
||||
setting, e := h.Backend.Settings[uid]
|
||||
if !e {
|
||||
setting = channelSetting{}
|
||||
}
|
||||
setting.ExcludeRegex = excludeRegex
|
||||
setting.IncludeRegex = includeRegex
|
||||
setting.ChannelType = channelType
|
||||
if values, e := r.Form["exclude_type"]; e {
|
||||
setting.ExcludeType = values
|
||||
}
|
||||
h.Backend.Settings[uid] = setting
|
||||
|
||||
h.Backend.Debug()
|
||||
|
||||
http.Redirect(w, r, "/settings", 302)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -126,9 +126,8 @@ func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body i
|
|||
return err
|
||||
}
|
||||
|
||||
// FIXME: feed id for incoming websub content
|
||||
log.Printf("Updating feed %d - %s %s\n", feedID, u, channel)
|
||||
err = h.backend.ProcessContent(channel, fmt.Sprintf("incoming:%d", feedID), u, contentType, body)
|
||||
err = h.backend.ProcessContent(channel, u, contentType, body)
|
||||
if err != nil {
|
||||
log.Printf("could not process content for channel %s: %s", channel, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,20 +17,18 @@ package main
|
|||
|
||||
import (
|
||||
"database/sql"
|
||||
"embed"
|
||||
_ "expvar"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/pkg/errors"
|
||||
"p83.nl/go/ekster/pkg/auth"
|
||||
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
_ "github.com/golang-migrate/migrate/v4/database/postgres"
|
||||
"github.com/golang-migrate/migrate/v4/source/iofs"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"p83.nl/go/ekster/pkg/server"
|
||||
)
|
||||
|
||||
// AppOptions are options for the app
|
||||
|
|
@ -45,9 +43,6 @@ type AppOptions struct {
|
|||
database *sql.DB
|
||||
}
|
||||
|
||||
//go:embed db/migrations/*.sql
|
||||
var migrations embed.FS
|
||||
|
||||
func init() {
|
||||
log.SetFlags(log.Lshortfile | log.Ldate | log.Ltime)
|
||||
}
|
||||
|
|
@ -100,6 +95,73 @@ func WithAuth(handler http.Handler, b *memoryBackend) http.Handler {
|
|||
})
|
||||
}
|
||||
|
||||
// App is the main app structure
|
||||
type App struct {
|
||||
options AppOptions
|
||||
backend *memoryBackend
|
||||
hubBackend *hubIncomingBackend
|
||||
}
|
||||
|
||||
// Run runs the app
|
||||
func (app *App) Run() error {
|
||||
err := initSearch()
|
||||
if err != nil {
|
||||
return fmt.Errorf("while starting app: %v", err)
|
||||
}
|
||||
app.backend.run()
|
||||
app.hubBackend.run()
|
||||
|
||||
log.Printf("Listening on port %d\n", app.options.Port)
|
||||
return http.ListenAndServe(fmt.Sprintf(":%d", app.options.Port), nil)
|
||||
}
|
||||
|
||||
// NewApp initializes the App
|
||||
func NewApp(options AppOptions) (*App, error) {
|
||||
app := &App{
|
||||
options: options,
|
||||
}
|
||||
|
||||
backend, err := loadMemoryBackend(options.pool, options.database)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
app.backend = backend
|
||||
app.backend.AuthEnabled = options.AuthEnabled
|
||||
app.backend.baseURL = options.BaseURL
|
||||
app.backend.hubIncomingBackend.pool = options.pool
|
||||
app.backend.hubIncomingBackend.baseURL = options.BaseURL
|
||||
|
||||
app.hubBackend = &hubIncomingBackend{backend: app.backend, baseURL: options.BaseURL, pool: options.pool}
|
||||
|
||||
http.Handle("/micropub", µpubHandler{
|
||||
Backend: app.backend,
|
||||
pool: options.pool,
|
||||
})
|
||||
|
||||
handler, broker := server.NewMicrosubHandler(app.backend)
|
||||
if options.AuthEnabled {
|
||||
handler = WithAuth(handler, app.backend)
|
||||
}
|
||||
|
||||
app.backend.broker = broker
|
||||
|
||||
http.Handle("/microsub", handler)
|
||||
|
||||
http.Handle("/incoming/", &incomingHandler{
|
||||
Backend: app.hubBackend,
|
||||
})
|
||||
|
||||
if !options.Headless {
|
||||
handler, err := newMainHandler(app.backend, options.BaseURL, options.TemplateDir, options.pool)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create main handler")
|
||||
}
|
||||
http.Handle("/", handler)
|
||||
}
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
log.Println("eksterd - microsub server")
|
||||
|
||||
|
|
@ -135,36 +197,28 @@ func main() {
|
|||
log.Fatal("EKSTER_TEMPLATES environment variable not found, use env var or -templates dir option")
|
||||
}
|
||||
}
|
||||
//
|
||||
// createBackend := false
|
||||
// args := flag.Args()
|
||||
//
|
||||
// if len(args) >= 1 {
|
||||
// if args[0] == "new" {
|
||||
// createBackend = true
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if createBackend {
|
||||
// err := createMemoryBackend()
|
||||
// if err != nil {
|
||||
// log.Fatalf("Error while saving backend.json: %s", err)
|
||||
// }
|
||||
//
|
||||
// TODO(peter): automatically gather this information from login or otherwise
|
||||
//
|
||||
// log.Println(`Config file "backend.json" is created in the current directory.`)
|
||||
// log.Println(`Update "Me" variable to your website address "https://example.com/"`)
|
||||
// log.Println(`Update "TokenEndpoint" variable to the address of your token endpoint "https://example.com/token"`)
|
||||
//
|
||||
// return
|
||||
// }
|
||||
|
||||
// TODO(peter): automatically gather this information from login or otherwise
|
||||
createBackend := false
|
||||
args := flag.Args()
|
||||
|
||||
err := runMigrations()
|
||||
if err != nil {
|
||||
log.Fatalf("Error with migrations: %s", err)
|
||||
if len(args) >= 1 {
|
||||
if args[0] == "new" {
|
||||
createBackend = true
|
||||
}
|
||||
}
|
||||
|
||||
if createBackend {
|
||||
err := createMemoryBackend()
|
||||
if err != nil {
|
||||
log.Fatalf("Error while saving backend.json: %s", err)
|
||||
}
|
||||
|
||||
// TODO(peter): automatically gather this information from login or otherwise
|
||||
log.Println(`Config file "backend.json" is created in the current directory.`)
|
||||
log.Println(`Update "Me" variable to your website address "https://example.com/"`)
|
||||
log.Println(`Update "TokenEndpoint" variable to the address of your token endpoint "https://example.com/token"`)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
pool := newPool(options.RedisServer)
|
||||
|
|
@ -184,38 +238,3 @@ func main() {
|
|||
|
||||
db.Close()
|
||||
}
|
||||
|
||||
// Log migrations
|
||||
type Log struct {
|
||||
}
|
||||
|
||||
// Printf for migrations logs
|
||||
func (l Log) Printf(format string, v ...interface{}) {
|
||||
log.Printf(format, v...)
|
||||
}
|
||||
|
||||
// Verbose returns false
|
||||
func (l Log) Verbose() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func runMigrations() error {
|
||||
d, err := iofs.New(migrations, "db/migrations")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m, err := migrate.NewWithSourceInstance("iofs", d, "postgres://postgres@database/ekster?sslmode=disable&user=postgres&password=simple")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer m.Close()
|
||||
m.Log = &Log{}
|
||||
log.Println("Running migrations")
|
||||
if err = m.Up(); err != nil {
|
||||
if err != migrate.ErrNoChange {
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Println("Migrations are up")
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,17 +4,20 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"p83.nl/go/ekster/pkg/auth"
|
||||
"p83.nl/go/ekster/pkg/fetch"
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
|
|
@ -74,6 +77,11 @@ type newItemMessage struct {
|
|||
Channel string `json:"channel"`
|
||||
}
|
||||
|
||||
// Debug interface for easy of use in other packages
|
||||
type Debug interface {
|
||||
Debug()
|
||||
}
|
||||
|
||||
type fetch2 struct{}
|
||||
|
||||
func (f *fetch2) Fetch(url string) (*http.Response, error) {
|
||||
|
|
@ -91,117 +99,223 @@ func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse)
|
|||
return cachedCheckAuthToken(conn, header, b.TokenEndpoint, r)
|
||||
}
|
||||
|
||||
func (b *memoryBackend) Debug() {
|
||||
b.lock.RLock()
|
||||
defer b.lock.RUnlock()
|
||||
fmt.Println(b.Channels)
|
||||
fmt.Println(b.Feeds)
|
||||
fmt.Println(b.Settings)
|
||||
}
|
||||
|
||||
func (b *memoryBackend) load() error {
|
||||
filename := "backend.json"
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
jw := json.NewDecoder(f)
|
||||
err = jw.Decode(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *memoryBackend) refreshChannels() {
|
||||
conn := b.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
conn.Do("DEL", "channels")
|
||||
|
||||
updateChannelInRedis(conn, "notifications", 1)
|
||||
|
||||
b.lock.RLock()
|
||||
for uid, channel := range b.Channels {
|
||||
log.Printf("loading channel %s - %s\n", uid, channel.Name)
|
||||
updateChannelInRedis(conn, channel.UID, DefaultPrio)
|
||||
}
|
||||
|
||||
b.lock.RUnlock()
|
||||
}
|
||||
|
||||
func (b *memoryBackend) save() error {
|
||||
filename := "backend.json"
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
jw := json.NewEncoder(f)
|
||||
jw.SetIndent("", " ")
|
||||
b.lock.RLock()
|
||||
defer b.lock.RUnlock()
|
||||
return jw.Encode(b)
|
||||
}
|
||||
|
||||
func loadMemoryBackend(pool *redis.Pool, database *sql.DB) (*memoryBackend, error) {
|
||||
backend := &memoryBackend{pool: pool, database: database}
|
||||
err := backend.load()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "while loading backend")
|
||||
}
|
||||
backend.refreshChannels()
|
||||
return backend, nil
|
||||
}
|
||||
|
||||
func createMemoryBackend() error {
|
||||
backend := memoryBackend{}
|
||||
backend.lock.Lock()
|
||||
|
||||
backend.Feeds = make(map[string][]microsub.Feed)
|
||||
channels := []microsub.Channel{
|
||||
{UID: "notifications", Name: "Notifications"},
|
||||
{UID: "home", Name: "Home"},
|
||||
}
|
||||
|
||||
backend.Channels = make(map[string]microsub.Channel)
|
||||
for _, c := range channels {
|
||||
backend.Channels[c.UID] = c
|
||||
}
|
||||
|
||||
backend.NextUID = 1000000
|
||||
// FIXME: can't be used in Backend
|
||||
backend.Me = "https://example.com/"
|
||||
|
||||
backend.lock.Unlock()
|
||||
|
||||
return backend.save()
|
||||
}
|
||||
|
||||
// ChannelsGetList gets channels
|
||||
func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) {
|
||||
conn := b.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
var channels []microsub.Channel
|
||||
rows, err := b.database.Query(`
|
||||
SELECT c.uid, c.name, count(i.channel_id)
|
||||
FROM "channels" "c" left join items i on c.id = i.channel_id and i.is_read = 0
|
||||
GROUP BY c.id;
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var uid, name string
|
||||
var count int
|
||||
_ = rows.Scan(&uid, &name, &count)
|
||||
b.lock.RLock()
|
||||
defer b.lock.RUnlock()
|
||||
|
||||
channels = append(channels, microsub.Channel{UID: uid, Name: name, Unread: microsub.Unread{
|
||||
Type: microsub.UnreadCount,
|
||||
UnreadCount: count,
|
||||
}})
|
||||
var channels []microsub.Channel
|
||||
uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC"))
|
||||
if err != nil {
|
||||
log.Printf("Sorting channels failed: %v\n", err)
|
||||
for _, v := range b.Channels {
|
||||
channels = append(channels, v)
|
||||
}
|
||||
} else {
|
||||
for _, uid := range uids {
|
||||
if c, e := b.Channels[uid]; e {
|
||||
channels = append(channels, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
util.StablePartition(channels, 0, len(channels), func(i int) bool {
|
||||
return channels[i].Unread.HasUnread()
|
||||
})
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
// ChannelsCreate creates a channels
|
||||
func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
|
||||
uid := util.RandStringBytes(24)
|
||||
|
||||
channel := microsub.Channel{
|
||||
UID: uid,
|
||||
Name: name,
|
||||
Unread: microsub.Unread{Type: microsub.UnreadCount},
|
||||
// Try to fetch the channel, if it exists, we don't create it
|
||||
if channel, e := b.fetchChannel(name); e {
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
result, err := b.database.Exec(`insert into "channels" ("uid", "name", "created_at") values($1, $2, DEFAULT)`, channel.UID, channel.Name)
|
||||
if err != nil {
|
||||
return channel, err
|
||||
}
|
||||
// Otherwise create the channel
|
||||
channel := b.createChannel(name)
|
||||
b.setChannel(channel)
|
||||
b.save()
|
||||
|
||||
conn := b.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
updateChannelInRedis(conn, channel.UID, DefaultPrio)
|
||||
|
||||
b.broker.Notifier <- sse.Message{Event: "new channel", Object: channelMessage{1, channel}}
|
||||
|
||||
if n, err := result.RowsAffected(); err != nil {
|
||||
if n > 0 {
|
||||
b.broker.Notifier <- sse.Message{Event: "new channel", Object: channelMessage{1, channel}}
|
||||
}
|
||||
}
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
// ChannelsUpdate updates a channels
|
||||
func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
|
||||
_, err := b.database.Exec(`UPDATE "channels" SET "name" = $1 WHERE "uid" = $2`, name, uid)
|
||||
if err != nil {
|
||||
return microsub.Channel{}, err
|
||||
}
|
||||
c := microsub.Channel{
|
||||
UID: uid,
|
||||
Name: name,
|
||||
Unread: microsub.Unread{},
|
||||
defer b.save()
|
||||
|
||||
b.lock.RLock()
|
||||
c, e := b.Channels[uid]
|
||||
b.lock.RUnlock()
|
||||
|
||||
if e {
|
||||
c.Name = name
|
||||
|
||||
b.lock.Lock()
|
||||
b.Channels[uid] = c
|
||||
b.lock.Unlock()
|
||||
|
||||
b.broker.Notifier <- sse.Message{Event: "update channel", Object: channelMessage{1, c}}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
b.broker.Notifier <- sse.Message{Event: "update channel", Object: channelMessage{1, c}}
|
||||
|
||||
return c, nil
|
||||
return microsub.Channel{}, fmt.Errorf("channel %s does not exist", uid)
|
||||
}
|
||||
|
||||
// ChannelsDelete deletes a channel
|
||||
func (b *memoryBackend) ChannelsDelete(uid string) error {
|
||||
_, err := b.database.Exec(`delete from "channels" where "uid" = $1`, uid)
|
||||
if err != nil {
|
||||
return err
|
||||
defer b.save()
|
||||
|
||||
conn := b.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
removed := false
|
||||
|
||||
b.lock.RLock()
|
||||
if _, e := b.Channels[uid]; e {
|
||||
removed = true
|
||||
}
|
||||
b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
|
||||
b.lock.RUnlock()
|
||||
|
||||
removeChannelFromRedis(conn, uid)
|
||||
|
||||
b.lock.Lock()
|
||||
delete(b.Channels, uid)
|
||||
delete(b.Feeds, uid)
|
||||
b.lock.Unlock()
|
||||
|
||||
if removed {
|
||||
b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type feed struct {
|
||||
UID string // channel
|
||||
ID int
|
||||
URL string
|
||||
func (b *memoryBackend) removeFeed(feedID string) error {
|
||||
b.lock.Lock()
|
||||
for uid := range b.Channels {
|
||||
feeds := b.Feeds[uid]
|
||||
for i, feed := range feeds {
|
||||
if feed.URL == feedID {
|
||||
feeds = append(feeds[:i], feeds[i+1:]...)
|
||||
}
|
||||
}
|
||||
b.Feeds[uid] = feeds
|
||||
}
|
||||
b.lock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *memoryBackend) getFeeds() ([]feed, error) {
|
||||
rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f" INNER JOIN public.channels c on c.id = f.channel_id`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var feeds []feed
|
||||
for rows.Next() {
|
||||
var feedID int
|
||||
var feedURL, UID string
|
||||
|
||||
err = rows.Scan(&feedID, &feedURL, &UID)
|
||||
if err != nil {
|
||||
log.Printf("while scanning feeds: %s", err)
|
||||
continue
|
||||
func (b *memoryBackend) getFeeds() map[string][]string {
|
||||
feeds := make(map[string][]string)
|
||||
b.lock.RLock()
|
||||
for uid := range b.Channels {
|
||||
for _, feed := range b.Feeds[uid] {
|
||||
feeds[uid] = append(feeds[uid], feed.URL)
|
||||
}
|
||||
|
||||
feeds = append(feeds, feed{UID, feedID, feedURL})
|
||||
}
|
||||
|
||||
return feeds, nil
|
||||
b.lock.RUnlock()
|
||||
return feeds
|
||||
}
|
||||
|
||||
func (b *memoryBackend) run() {
|
||||
|
|
@ -223,33 +337,29 @@ func (b *memoryBackend) run() {
|
|||
}
|
||||
|
||||
func (b *memoryBackend) RefreshFeeds() {
|
||||
feeds, err := b.getFeeds()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
feeds := b.getFeeds()
|
||||
|
||||
count := 0
|
||||
|
||||
for _, feed := range feeds {
|
||||
feedURL := feed.URL
|
||||
feedID := feed.ID
|
||||
uid := feed.UID
|
||||
log.Println("Processing", feedURL)
|
||||
resp, err := b.Fetch3(uid, feedURL)
|
||||
if err != nil {
|
||||
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
||||
b.addNotification("Error while fetching feed", feedURL, err)
|
||||
count++
|
||||
continue
|
||||
for uid := range feeds {
|
||||
for _, feedURL := range feeds[uid] {
|
||||
log.Println("Processing", feedURL)
|
||||
resp, err := b.Fetch3(uid, feedURL)
|
||||
if err != nil {
|
||||
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
||||
b.addNotification("Error while fetching feed", feedURL, err)
|
||||
count++
|
||||
continue
|
||||
}
|
||||
err = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("Error while processing content for %s: %v\n", feedURL, err)
|
||||
b.addNotification("Error while processing feed", feedURL, err)
|
||||
count++
|
||||
continue
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
err = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
if err != nil {
|
||||
log.Printf("Error while processing content for %s: %v\n", feedURL, err)
|
||||
b.addNotification("Error while processing feed", feedURL, err)
|
||||
count++
|
||||
continue
|
||||
}
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
|
|
@ -258,17 +368,14 @@ func (b *memoryBackend) RefreshFeeds() {
|
|||
}
|
||||
|
||||
func (b *memoryBackend) addNotification(name string, feedURL string, err error) {
|
||||
err = b.channelAddItem("notifications", microsub.Item{
|
||||
_ = b.channelAddItem("notifications", microsub.Item{
|
||||
Type: "entry",
|
||||
Name: name,
|
||||
Content: µsub.Content{
|
||||
Text: fmt.Sprintf("ERROR: while updating feed: %s", err),
|
||||
Text: fmt.Sprintf("Error while updating feed %s: %v", feedURL, err),
|
||||
},
|
||||
Published: time.Now().Format(time.RFC3339),
|
||||
UID: time.Now().String(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("ERROR: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) {
|
||||
|
|
@ -288,67 +395,58 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
|
|||
}
|
||||
|
||||
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
|
||||
rows, err := b.database.Query(`SELECT "f"."url" FROM "feeds" AS "f" INNER JOIN channels c on c.id = f.channel_id WHERE c.uid = $1`, uid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var feeds []microsub.Feed
|
||||
for rows.Next() {
|
||||
var feedURL string
|
||||
err = rows.Scan(&feedURL)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
feeds = append(feeds, microsub.Feed{
|
||||
Type: "feed",
|
||||
URL: feedURL,
|
||||
})
|
||||
}
|
||||
return feeds, nil
|
||||
b.lock.RLock()
|
||||
defer b.lock.RUnlock()
|
||||
return b.Feeds[uid], nil
|
||||
}
|
||||
|
||||
func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
|
||||
defer b.save()
|
||||
feed := microsub.Feed{Type: "feed", URL: url}
|
||||
|
||||
var channelID int
|
||||
if row := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid); row != nil {
|
||||
err := row.Scan(&channelID)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
result, err := b.database.Exec(
|
||||
`INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2)`,
|
||||
channelID,
|
||||
feed.URL,
|
||||
)
|
||||
if err != nil {
|
||||
return microsub.Feed{}, err
|
||||
}
|
||||
feedID, _ := result.LastInsertId()
|
||||
|
||||
resp, err := b.Fetch3(uid, feed.URL)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
b.addNotification("Error while fetching feed", feed.URL, err)
|
||||
_ = b.channelAddItem("notifications", microsub.Item{
|
||||
Type: "entry",
|
||||
Name: "Error while fetching feed",
|
||||
Content: µsub.Content{
|
||||
Text: fmt.Sprintf("Error while Fetch3 of %s: %v", feed.URL, err),
|
||||
},
|
||||
UID: time.Now().String(),
|
||||
})
|
||||
_ = b.updateChannelUnreadCount("notifications")
|
||||
return feed, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
_ = b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
b.lock.Lock()
|
||||
b.Feeds[uid] = append(b.Feeds[uid], feed)
|
||||
b.lock.Unlock()
|
||||
|
||||
// FIXME: re-enable CreateFeed
|
||||
// _, _ = b.CreateFeed(url, uid)
|
||||
_ = b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
|
||||
_, _ = b.CreateFeed(url, uid)
|
||||
|
||||
return feed, nil
|
||||
}
|
||||
|
||||
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
|
||||
_, err := b.database.Exec(`DELETE FROM "feeds" "f" USING "channels" "c" WHERE "c"."id" = "f"."channel_id" AND f.url = $1 AND c.uid = $2`, url, uid)
|
||||
return err
|
||||
defer b.save()
|
||||
index := -1
|
||||
b.lock.Lock()
|
||||
for i, f := range b.Feeds[uid] {
|
||||
if f.URL == url {
|
||||
index = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if index >= 0 {
|
||||
feeds := b.Feeds[uid]
|
||||
b.Feeds[uid] = append(feeds[:index], feeds[index+1:]...)
|
||||
}
|
||||
b.lock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func checkURL(u string) bool {
|
||||
|
|
@ -397,11 +495,11 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
|
|||
// needs to be like this, because we get a null result otherwise in the json output
|
||||
feeds := []microsub.Feed{}
|
||||
|
||||
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
||||
cachingFetch := WithCaching(b.pool, Fetch2)
|
||||
|
||||
for _, u := range urls {
|
||||
log.Println(u)
|
||||
resp, err := cachingFetch.Fetch(u)
|
||||
resp, err := cachingFetch(u)
|
||||
if err != nil {
|
||||
log.Printf("Error while fetching %s: %v\n", u, err)
|
||||
continue
|
||||
|
|
@ -415,7 +513,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
feedResp, err := cachingFetch.Fetch(fetchURL.String())
|
||||
feedResp, err := cachingFetch(fetchURL.String())
|
||||
if err != nil {
|
||||
log.Printf("Error in fetch of %s - %v\n", fetchURL, err)
|
||||
continue
|
||||
|
|
@ -438,7 +536,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
|
|||
log.Printf("alternate found with type %s %#v\n", relURL.Type, relURL)
|
||||
|
||||
if strings.HasPrefix(relURL.Type, "text/html") || strings.HasPrefix(relURL.Type, "application/json") || strings.HasPrefix(relURL.Type, "application/xml") || strings.HasPrefix(relURL.Type, "text/xml") || strings.HasPrefix(relURL.Type, "application/rss+xml") || strings.HasPrefix(relURL.Type, "application/atom+xml") {
|
||||
feedResp, err := cachingFetch.Fetch(alt)
|
||||
feedResp, err := cachingFetch(alt)
|
||||
if err != nil {
|
||||
log.Printf("Error in fetch of %s - %v\n", alt, err)
|
||||
continue
|
||||
|
|
@ -462,8 +560,8 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
|
|||
}
|
||||
|
||||
func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error) {
|
||||
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
||||
resp, err := cachingFetch.Fetch(previewURL)
|
||||
cachingFetch := WithCaching(b.pool, Fetch2)
|
||||
resp, err := cachingFetch(previewURL)
|
||||
if err != nil {
|
||||
return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
|
||||
}
|
||||
|
|
@ -500,7 +598,7 @@ func (b *memoryBackend) Events() (chan sse.Message, error) {
|
|||
}
|
||||
|
||||
// ProcessSourcedItems processes items and adds the Source
|
||||
func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
|
||||
func ProcessSourcedItems(fetcher fetch.FetcherFunc, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
|
||||
// When the source is available from the Header, we fill the Source of the item
|
||||
|
||||
bodyBytes, err := ioutil.ReadAll(body)
|
||||
|
|
@ -537,8 +635,8 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo
|
|||
return items, nil
|
||||
}
|
||||
|
||||
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
|
||||
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
||||
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
|
||||
cachingFetch := WithCaching(b.pool, Fetch2)
|
||||
|
||||
items, err := ProcessSourcedItems(cachingFetch, fetchURL, contentType, body)
|
||||
if err != nil {
|
||||
|
|
@ -546,7 +644,6 @@ func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType st
|
|||
}
|
||||
|
||||
for _, item := range items {
|
||||
item.Source.ID = feedID
|
||||
err = b.channelAddItemWithMatcher(channel, item)
|
||||
if err != nil {
|
||||
log.Printf("ERROR: %s\n", err)
|
||||
|
|
@ -692,9 +789,6 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
|
|||
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
|
||||
timelineBackend := b.getTimeline(channel)
|
||||
added, err := timelineBackend.AddItem(item)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sent message to Server-Sent-Events
|
||||
if added {
|
||||
|
|
@ -705,29 +799,40 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error
|
|||
}
|
||||
|
||||
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
|
||||
// tl := b.getTimeline(channel)
|
||||
// unread, err := tl.Count()
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// currentCount := c.Unread.UnreadCount
|
||||
// c.Unread = microsub.Unread{Type: microsub.UnreadCount, UnreadCount: unread}
|
||||
//
|
||||
// // Sent message to Server-Sent-Events
|
||||
// if currentCount != unread {
|
||||
// b.broker.Notifier <- sse.Message{Event: "new item in channel", Object: c}
|
||||
// }
|
||||
b.lock.RLock()
|
||||
c, exists := b.Channels[channel]
|
||||
b.lock.RUnlock()
|
||||
|
||||
if exists {
|
||||
tl := b.getTimeline(channel)
|
||||
unread, err := tl.Count()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer b.save()
|
||||
|
||||
currentCount := c.Unread.UnreadCount
|
||||
c.Unread = microsub.Unread{Type: microsub.UnreadCount, UnreadCount: unread}
|
||||
|
||||
// Sent message to Server-Sent-Events
|
||||
if currentCount != unread {
|
||||
b.broker.Notifier <- sse.Message{Event: "new item in channel", Object: c}
|
||||
}
|
||||
|
||||
b.lock.Lock()
|
||||
b.Channels[channel] = c
|
||||
b.lock.Unlock()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WithCaching adds caching to a fetch.Fetcher
|
||||
func WithCaching(pool *redis.Pool, ff fetch.Fetcher) fetch.Fetcher {
|
||||
ff2 := (func(fetchURL string) (*http.Response, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
// WithCaching adds caching to a FetcherFunc
|
||||
func WithCaching(pool *redis.Pool, ff fetch.FetcherFunc) fetch.FetcherFunc {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return func(fetchURL string) (*http.Response, error) {
|
||||
cacheKey := fmt.Sprintf("http_cache:%s", fetchURL)
|
||||
u, err := url.Parse(fetchURL)
|
||||
if err != nil {
|
||||
|
|
@ -745,7 +850,7 @@ func WithCaching(pool *redis.Pool, ff fetch.Fetcher) fetch.Fetcher {
|
|||
|
||||
log.Printf("MISS %s\n", fetchURL)
|
||||
|
||||
resp, err := ff.Fetch(fetchURL)
|
||||
resp, err := ff(fetchURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -765,8 +870,7 @@ func WithCaching(pool *redis.Pool, ff fetch.Fetcher) fetch.Fetcher {
|
|||
|
||||
cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req)
|
||||
return cachedResp, err
|
||||
})
|
||||
return fetch.FetcherFunc(ff2)
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch2 fetches stuff
|
||||
|
|
@ -785,7 +889,7 @@ func Fetch2(fetchURL string) (*http.Response, error) {
|
|||
client := http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetch failed: %s", err)
|
||||
return nil, fmt.Errorf("fetch failed: %s: %s", u, err)
|
||||
}
|
||||
|
||||
return resp, err
|
||||
|
|
@ -794,9 +898,9 @@ func Fetch2(fetchURL string) (*http.Response, error) {
|
|||
func (b *memoryBackend) getTimeline(channel string) timeline.Backend {
|
||||
// Set a default timeline type if not set
|
||||
timelineType := "postgres-stream"
|
||||
// if setting, ok := b.Settings[channel]; ok && setting.ChannelType != "" {
|
||||
// timelineType = setting.ChannelType
|
||||
// }
|
||||
if setting, ok := b.Settings[channel]; ok && setting.ChannelType != "" {
|
||||
timelineType = setting.ChannelType
|
||||
}
|
||||
tl := timeline.Create(channel, timelineType, b.pool, b.database)
|
||||
if tl == nil {
|
||||
log.Printf("no timeline found with name %q and type %q", channel, timelineType)
|
||||
|
|
|
|||
|
|
@ -1,210 +1,222 @@
|
|||
package main
|
||||
|
||||
// func Test_memoryBackend_ChannelsCreate(t *testing.T) {
|
||||
// type fields struct {
|
||||
// hubIncomingBackend hubIncomingBackend
|
||||
// lock sync.RWMutex
|
||||
// Channels map[string]microsub.Channel
|
||||
// Feeds map[string][]microsub.Feed
|
||||
// Settings map[string]channelSetting
|
||||
// NextUID int
|
||||
// Me string
|
||||
// TokenEndpoint string
|
||||
// AuthEnabled bool
|
||||
// ticker *time.Ticker
|
||||
// quit chan struct{}
|
||||
// broker *sse.Broker
|
||||
// pool *redis.Pool
|
||||
// }
|
||||
// type args struct {
|
||||
// name string
|
||||
// }
|
||||
// tests := []struct {
|
||||
// name string
|
||||
// fields fields
|
||||
// args args
|
||||
// want microsub.Channel
|
||||
// wantErr bool
|
||||
// }{
|
||||
// {
|
||||
// name: "Duplicate channel",
|
||||
// fields: fields{
|
||||
// hubIncomingBackend: hubIncomingBackend{},
|
||||
// lock: sync.RWMutex{},
|
||||
// Channels: func() map[string]microsub.Channel {
|
||||
// channels := make(map[string]microsub.Channel)
|
||||
// channels["1234"] = microsub.Channel{
|
||||
// UID: "1234",
|
||||
// Name: "Test",
|
||||
// Unread: microsub.Unread{
|
||||
// Type: microsub.UnreadCount,
|
||||
// Unread: false,
|
||||
// UnreadCount: 0,
|
||||
// },
|
||||
// }
|
||||
// return channels
|
||||
// }(),
|
||||
// Feeds: func() map[string][]microsub.Feed {
|
||||
// feeds := make(map[string][]microsub.Feed)
|
||||
// return feeds
|
||||
// }(),
|
||||
// Settings: nil,
|
||||
// NextUID: 1,
|
||||
// Me: "",
|
||||
// TokenEndpoint: "",
|
||||
// AuthEnabled: false,
|
||||
// ticker: nil,
|
||||
// quit: nil,
|
||||
// broker: nil,
|
||||
// pool: nil,
|
||||
// },
|
||||
// args: args{
|
||||
// name: "Test",
|
||||
// },
|
||||
// want: microsub.Channel{
|
||||
// UID: "1234",
|
||||
// Name: "Test",
|
||||
// Unread: microsub.Unread{
|
||||
// Type: microsub.UnreadCount,
|
||||
// Unread: false,
|
||||
// UnreadCount: 0,
|
||||
// },
|
||||
// },
|
||||
// wantErr: false,
|
||||
// },
|
||||
// }
|
||||
// for _, tt := range tests {
|
||||
// t.Run(tt.name, func(t *testing.T) {
|
||||
// b := &memoryBackend{
|
||||
// hubIncomingBackend: tt.fields.hubIncomingBackend,
|
||||
// lock: tt.fields.lock,
|
||||
// Channels: tt.fields.Channels,
|
||||
// Feeds: tt.fields.Feeds,
|
||||
// Settings: tt.fields.Settings,
|
||||
// NextUID: tt.fields.NextUID,
|
||||
// Me: tt.fields.Me,
|
||||
// TokenEndpoint: tt.fields.TokenEndpoint,
|
||||
// AuthEnabled: tt.fields.AuthEnabled,
|
||||
// ticker: tt.fields.ticker,
|
||||
// quit: tt.fields.quit,
|
||||
// broker: tt.fields.broker,
|
||||
// pool: tt.fields.pool,
|
||||
// }
|
||||
// got, err := b.ChannelsCreate(tt.args.name)
|
||||
// if (err != nil) != tt.wantErr {
|
||||
// t.Errorf("ChannelsCreate() error = %v, wantErr %v", err, tt.wantErr)
|
||||
// return
|
||||
// }
|
||||
// if !reflect.DeepEqual(got, tt.want) {
|
||||
// t.Errorf("ChannelsCreate() got = %v, want %v", got, tt.want)
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// func Test_memoryBackend_removeFeed(t *testing.T) {
|
||||
// type fields struct {
|
||||
// Channels map[string]microsub.Channel
|
||||
// Feeds map[string][]microsub.Feed
|
||||
// }
|
||||
// type args struct {
|
||||
// feedID string
|
||||
// }
|
||||
// tests := []struct {
|
||||
// name string
|
||||
// fields fields
|
||||
// args args
|
||||
// lens map[string]int
|
||||
// wantErr bool
|
||||
// }{
|
||||
// {
|
||||
// name: "remove from channel 1",
|
||||
// fields: fields{
|
||||
// Channels: map[string]microsub.Channel{
|
||||
// "123": {UID: "channel1", Name: "Channel 1"},
|
||||
// "124": {UID: "channel2", Name: "Channel 2"},
|
||||
// },
|
||||
// Feeds: map[string][]microsub.Feed{
|
||||
// "123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
// "124": {{Type: "feed", URL: "feed2", Name: "Feed2"}},
|
||||
// },
|
||||
// },
|
||||
// args: args{feedID: "feed1"},
|
||||
// lens: map[string]int{"123": 0, "124": 1},
|
||||
// wantErr: false,
|
||||
// },
|
||||
// {
|
||||
// name: "remove from channel 2",
|
||||
// fields: fields{
|
||||
// Channels: map[string]microsub.Channel{
|
||||
// "123": {UID: "channel1", Name: "Channel 1"},
|
||||
// "124": {UID: "channel2", Name: "Channel 2"},
|
||||
// },
|
||||
// Feeds: map[string][]microsub.Feed{
|
||||
// "123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
// "124": {{Type: "feed", URL: "feed2", Name: "Feed2"}},
|
||||
// },
|
||||
// },
|
||||
// args: args{feedID: "feed2"},
|
||||
// lens: map[string]int{"123": 1, "124": 0},
|
||||
// wantErr: false,
|
||||
// },
|
||||
// {
|
||||
// name: "remove unknown",
|
||||
// fields: fields{
|
||||
// Channels: map[string]microsub.Channel{
|
||||
// "123": {UID: "channel1", Name: "Channel 1"},
|
||||
// "124": {UID: "channel2", Name: "Channel 2"},
|
||||
// },
|
||||
// Feeds: map[string][]microsub.Feed{
|
||||
// "123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
// "124": {{Type: "feed", URL: "feed2", Name: "Feed2"}},
|
||||
// },
|
||||
// },
|
||||
// args: args{feedID: "feed3"},
|
||||
// lens: map[string]int{"123": 1, "124": 1},
|
||||
// wantErr: false,
|
||||
// },
|
||||
// {
|
||||
// name: "remove from 0 channels",
|
||||
// fields: fields{
|
||||
// Channels: map[string]microsub.Channel{},
|
||||
// Feeds: map[string][]microsub.Feed{},
|
||||
// },
|
||||
// args: args{feedID: "feed3"},
|
||||
// lens: map[string]int{},
|
||||
// wantErr: false,
|
||||
// },
|
||||
// {
|
||||
// name: "remove from multiple channels",
|
||||
// fields: fields{
|
||||
// Channels: map[string]microsub.Channel{
|
||||
// "123": {UID: "channel1", Name: "Channel 1"},
|
||||
// "124": {UID: "channel2", Name: "Channel 2"},
|
||||
// },
|
||||
// Feeds: map[string][]microsub.Feed{
|
||||
// "123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
// "124": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
// },
|
||||
// },
|
||||
// args: args{feedID: "feed1"},
|
||||
// lens: map[string]int{"123": 0, "124": 0},
|
||||
// wantErr: false,
|
||||
// },
|
||||
// }
|
||||
// for _, tt := range tests {
|
||||
// t.Run(tt.name, func(t *testing.T) {
|
||||
// b := &memoryBackend{
|
||||
// Channels: tt.fields.Channels,
|
||||
// Feeds: tt.fields.Feeds,
|
||||
// }
|
||||
// if err := b.removeFeed(tt.args.feedID); (err != nil) != tt.wantErr {
|
||||
// t.Errorf("removeFeed() error = %v, wantErr %v", err, tt.wantErr)
|
||||
// }
|
||||
// assert.Len(t, b.Channels, len(tt.lens))
|
||||
// for k, v := range tt.lens {
|
||||
// assert.Len(t, b.Feeds[k], v)
|
||||
// }
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
"p83.nl/go/ekster/pkg/sse"
|
||||
)
|
||||
|
||||
func Test_memoryBackend_ChannelsCreate(t *testing.T) {
|
||||
type fields struct {
|
||||
hubIncomingBackend hubIncomingBackend
|
||||
lock sync.RWMutex
|
||||
Channels map[string]microsub.Channel
|
||||
Feeds map[string][]microsub.Feed
|
||||
Settings map[string]channelSetting
|
||||
NextUID int
|
||||
Me string
|
||||
TokenEndpoint string
|
||||
AuthEnabled bool
|
||||
ticker *time.Ticker
|
||||
quit chan struct{}
|
||||
broker *sse.Broker
|
||||
pool *redis.Pool
|
||||
}
|
||||
type args struct {
|
||||
name string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want microsub.Channel
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "Duplicate channel",
|
||||
fields: fields{
|
||||
hubIncomingBackend: hubIncomingBackend{},
|
||||
lock: sync.RWMutex{},
|
||||
Channels: func() map[string]microsub.Channel {
|
||||
channels := make(map[string]microsub.Channel)
|
||||
channels["1234"] = microsub.Channel{
|
||||
UID: "1234",
|
||||
Name: "Test",
|
||||
Unread: microsub.Unread{
|
||||
Type: microsub.UnreadCount,
|
||||
Unread: false,
|
||||
UnreadCount: 0,
|
||||
},
|
||||
}
|
||||
return channels
|
||||
}(),
|
||||
Feeds: func() map[string][]microsub.Feed {
|
||||
feeds := make(map[string][]microsub.Feed)
|
||||
return feeds
|
||||
}(),
|
||||
Settings: nil,
|
||||
NextUID: 1,
|
||||
Me: "",
|
||||
TokenEndpoint: "",
|
||||
AuthEnabled: false,
|
||||
ticker: nil,
|
||||
quit: nil,
|
||||
broker: nil,
|
||||
pool: nil,
|
||||
},
|
||||
args: args{
|
||||
name: "Test",
|
||||
},
|
||||
want: microsub.Channel{
|
||||
UID: "1234",
|
||||
Name: "Test",
|
||||
Unread: microsub.Unread{
|
||||
Type: microsub.UnreadCount,
|
||||
Unread: false,
|
||||
UnreadCount: 0,
|
||||
},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
b := &memoryBackend{
|
||||
hubIncomingBackend: tt.fields.hubIncomingBackend,
|
||||
lock: tt.fields.lock,
|
||||
Channels: tt.fields.Channels,
|
||||
Feeds: tt.fields.Feeds,
|
||||
Settings: tt.fields.Settings,
|
||||
NextUID: tt.fields.NextUID,
|
||||
Me: tt.fields.Me,
|
||||
TokenEndpoint: tt.fields.TokenEndpoint,
|
||||
AuthEnabled: tt.fields.AuthEnabled,
|
||||
ticker: tt.fields.ticker,
|
||||
quit: tt.fields.quit,
|
||||
broker: tt.fields.broker,
|
||||
pool: tt.fields.pool,
|
||||
}
|
||||
got, err := b.ChannelsCreate(tt.args.name)
|
||||
if (err != nil) != tt.wantErr {
|
||||
t.Errorf("ChannelsCreate() error = %v, wantErr %v", err, tt.wantErr)
|
||||
return
|
||||
}
|
||||
if !reflect.DeepEqual(got, tt.want) {
|
||||
t.Errorf("ChannelsCreate() got = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_memoryBackend_removeFeed(t *testing.T) {
|
||||
type fields struct {
|
||||
Channels map[string]microsub.Channel
|
||||
Feeds map[string][]microsub.Feed
|
||||
}
|
||||
type args struct {
|
||||
feedID string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
lens map[string]int
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "remove from channel 1",
|
||||
fields: fields{
|
||||
Channels: map[string]microsub.Channel{
|
||||
"123": {UID: "channel1", Name: "Channel 1"},
|
||||
"124": {UID: "channel2", Name: "Channel 2"},
|
||||
},
|
||||
Feeds: map[string][]microsub.Feed{
|
||||
"123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
"124": {{Type: "feed", URL: "feed2", Name: "Feed2"}},
|
||||
},
|
||||
},
|
||||
args: args{feedID: "feed1"},
|
||||
lens: map[string]int{"123": 0, "124": 1},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove from channel 2",
|
||||
fields: fields{
|
||||
Channels: map[string]microsub.Channel{
|
||||
"123": {UID: "channel1", Name: "Channel 1"},
|
||||
"124": {UID: "channel2", Name: "Channel 2"},
|
||||
},
|
||||
Feeds: map[string][]microsub.Feed{
|
||||
"123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
"124": {{Type: "feed", URL: "feed2", Name: "Feed2"}},
|
||||
},
|
||||
},
|
||||
args: args{feedID: "feed2"},
|
||||
lens: map[string]int{"123": 1, "124": 0},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove unknown",
|
||||
fields: fields{
|
||||
Channels: map[string]microsub.Channel{
|
||||
"123": {UID: "channel1", Name: "Channel 1"},
|
||||
"124": {UID: "channel2", Name: "Channel 2"},
|
||||
},
|
||||
Feeds: map[string][]microsub.Feed{
|
||||
"123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
"124": {{Type: "feed", URL: "feed2", Name: "Feed2"}},
|
||||
},
|
||||
},
|
||||
args: args{feedID: "feed3"},
|
||||
lens: map[string]int{"123": 1, "124": 1},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove from 0 channels",
|
||||
fields: fields{
|
||||
Channels: map[string]microsub.Channel{},
|
||||
Feeds: map[string][]microsub.Feed{},
|
||||
},
|
||||
args: args{feedID: "feed3"},
|
||||
lens: map[string]int{},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "remove from multiple channels",
|
||||
fields: fields{
|
||||
Channels: map[string]microsub.Channel{
|
||||
"123": {UID: "channel1", Name: "Channel 1"},
|
||||
"124": {UID: "channel2", Name: "Channel 2"},
|
||||
},
|
||||
Feeds: map[string][]microsub.Feed{
|
||||
"123": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
"124": {{Type: "feed", URL: "feed1", Name: "Feed1"}},
|
||||
},
|
||||
},
|
||||
args: args{feedID: "feed1"},
|
||||
lens: map[string]int{"123": 0, "124": 0},
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
b := &memoryBackend{
|
||||
Channels: tt.fields.Channels,
|
||||
Feeds: tt.fields.Feeds,
|
||||
}
|
||||
if err := b.removeFeed(tt.args.feedID); (err != nil) != tt.wantErr {
|
||||
t.Errorf("removeFeed() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
assert.Len(t, b.Channels, len(tt.lens))
|
||||
for k, v := range tt.lens {
|
||||
assert.Len(t, b.Feeds[k], v)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ func main() {
|
|||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
items, err := fetch.FeedItems(fetch.FetcherFunc(Fetch), url, resp.Header.Get("Content-Type"), resp.Body)
|
||||
items, err := fetch.FeedItems(Fetch, url, resp.Header.Get("Content-Type"), resp.Body)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ services:
|
|||
image: "redis:5"
|
||||
|
||||
database:
|
||||
image: postgres:14
|
||||
image: postgres
|
||||
volumes:
|
||||
- database-data:/var/lib/postgresql/data
|
||||
environment:
|
||||
|
|
@ -14,16 +14,21 @@ services:
|
|||
POSTGRES_HOST_AUTH_METHOD: trust
|
||||
|
||||
web:
|
||||
image: ubuntu
|
||||
working_dir: /app
|
||||
image: "pstuifzand/ekster:alpine"
|
||||
working_dir: /opt/microsub
|
||||
links:
|
||||
- redis:redis
|
||||
volumes:
|
||||
- microsub-data:/opt/microsub
|
||||
- ./templates:/app/templates
|
||||
- ./eksterd:/app/eksterd
|
||||
- ./backend.json:/app/backend.json
|
||||
entrypoint: /app/eksterd
|
||||
command: -auth=false -port 80
|
||||
command: -auth=false -port 80 -templates templates
|
||||
ports:
|
||||
- 8089:80
|
||||
environment:
|
||||
- "FEEDBIN_USER="
|
||||
- "FEEDBIN_PASS="
|
||||
- "EKSTER_BASEURL=http://localhost:8089/"
|
||||
- "EKSTER_TEMPLATES=/app/templates"
|
||||
|
||||
|
|
|
|||
9
go.mod
9
go.mod
|
|
@ -3,14 +3,15 @@ module p83.nl/go/ekster
|
|||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
|
||||
github.com/axgle/mahonia v0.0.0-20180208002826-3358181d7394
|
||||
github.com/blevesearch/bleve/v2 v2.0.3
|
||||
github.com/blevesearch/bleve/v2 v2.0.3 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/gilliek/go-opml v1.0.0
|
||||
github.com/golang-migrate/migrate/v4 v4.15.1
|
||||
github.com/gomodule/redigo v1.8.2
|
||||
github.com/lib/pq v1.10.1
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/stretchr/testify v1.7.0
|
||||
golang.org/x/net v0.0.0-20211013171255-e13a2654a71e
|
||||
github.com/stretchr/testify v1.5.1
|
||||
golang.org/x/net v0.0.0-20200707034311-ab3426394381
|
||||
willnorris.com/go/microformats v1.1.0
|
||||
)
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import (
|
|||
)
|
||||
|
||||
// FeedHeader returns a new microsub.Feed with the information parsed from body.
|
||||
func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (microsub.Feed, error) {
|
||||
func FeedHeader(fetcher FetcherFunc, fetchURL, contentType string, body io.Reader) (microsub.Feed, error) {
|
||||
log.Printf("ProcessContent %s\n", fetchURL)
|
||||
log.Println("Found " + contentType)
|
||||
|
||||
|
|
@ -38,7 +38,7 @@ func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (
|
|||
author, ok := jf2.SimplifyMicroformatDataAuthor(data)
|
||||
if !ok {
|
||||
if strings.HasPrefix(author.URL, "http") {
|
||||
resp, err := fetcher.Fetch(author.URL)
|
||||
resp, err := fetcher(author.URL)
|
||||
if err != nil {
|
||||
return feed, err
|
||||
}
|
||||
|
|
@ -108,7 +108,7 @@ func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (
|
|||
}
|
||||
|
||||
// FeedItems returns the items from the url, parsed from body.
|
||||
func FeedItems(fetcher Fetcher, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
|
||||
func FeedItems(fetcher FetcherFunc, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
|
||||
log.Printf("ProcessContent %s\n", fetchURL)
|
||||
log.Println("Found " + contentType)
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ func TestFeedHeader(t *testing.T) {
|
|||
</body>
|
||||
</html>
|
||||
`
|
||||
feed, err := FeedHeader(FetcherFunc(fetcher), "https://example.com/", "text/html", strings.NewReader(doc))
|
||||
feed, err := FeedHeader(fetcher, "https://example.com/", "text/html", strings.NewReader(doc))
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, "feed", feed.Type)
|
||||
assert.Equal(t, "Title", feed.Name)
|
||||
|
|
|
|||
|
|
@ -2,15 +2,5 @@ package fetch
|
|||
|
||||
import "net/http"
|
||||
|
||||
// Fetcher fetches urls
|
||||
type Fetcher interface {
|
||||
Fetch(url string) (*http.Response, error)
|
||||
}
|
||||
|
||||
// FetcherFunc is a function that fetches an url
|
||||
type FetcherFunc func(url string) (*http.Response, error)
|
||||
|
||||
// Fetch fetches an url and returns a response or error
|
||||
func (ff FetcherFunc) Fetch(url string) (*http.Response, error) {
|
||||
return ff(url)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,12 +2,9 @@ package timeline
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
|
@ -35,46 +32,46 @@ func (p *postgresStream) Init() error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("database ping failed: %w", err)
|
||||
}
|
||||
//
|
||||
// _, err = conn.ExecContext(ctx, `
|
||||
// CREATE TABLE IF NOT EXISTS "channels" (
|
||||
// "id" int primary key generated always as identity,
|
||||
// "name" varchar(255) unique,
|
||||
// "created_at" timestamp DEFAULT current_timestamp
|
||||
// );
|
||||
// `)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("create channels table failed: %w", err)
|
||||
// }
|
||||
//
|
||||
// _, err = conn.ExecContext(ctx, `
|
||||
// CREATE TABLE IF NOT EXISTS "items" (
|
||||
// "id" int primary key generated always as identity,
|
||||
// "channel_id" int references "channels" on delete cascade,
|
||||
// "uid" varchar(512) not null unique,
|
||||
// "is_read" int default 0,
|
||||
// "data" jsonb,
|
||||
// "created_at" timestamp DEFAULT current_timestamp,
|
||||
// "updated_at" timestamp,
|
||||
// "published_at" timestamp
|
||||
// );
|
||||
// `)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("create items table failed: %w", err)
|
||||
// }
|
||||
//
|
||||
// _, err = conn.ExecContext(ctx, `ALTER TABLE "items" ALTER COLUMN "data" TYPE jsonb, ALTER COLUMN "uid" TYPE varchar(1024)`)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("alter items table failed: %w", err)
|
||||
// }
|
||||
|
||||
_, err = conn.ExecContext(ctx, `INSERT INTO "channels" ("uid", "name", "created_at") VALUES ($1, $1, DEFAULT)
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS "channels" (
|
||||
"id" int primary key generated always as identity,
|
||||
"name" varchar(255) unique,
|
||||
"created_at" timestamp DEFAULT current_timestamp
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create channels table failed: %w", err)
|
||||
}
|
||||
|
||||
_, err = conn.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS "items" (
|
||||
"id" int primary key generated always as identity,
|
||||
"channel_id" int references "channels" on delete cascade,
|
||||
"uid" varchar(512) not null unique,
|
||||
"is_read" int default 0,
|
||||
"data" jsonb,
|
||||
"created_at" timestamp DEFAULT current_timestamp,
|
||||
"updated_at" timestamp,
|
||||
"published_at" timestamp
|
||||
);
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create items table failed: %w", err)
|
||||
}
|
||||
|
||||
_, err = conn.ExecContext(ctx, `ALTER TABLE "items" ALTER COLUMN "data" TYPE jsonb, ALTER COLUMN "uid" TYPE varchar(1024)`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alter items table failed: %w", err)
|
||||
}
|
||||
|
||||
_, err = conn.ExecContext(ctx, `INSERT INTO "channels" ("name", "created_at") VALUES ($1, DEFAULT)
|
||||
ON CONFLICT DO NOTHING`, p.channel)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create channel failed: %w", err)
|
||||
}
|
||||
|
||||
row := conn.QueryRowContext(ctx, `SELECT "id" FROM "channels" WHERE "uid" = $1`, p.channel)
|
||||
row := conn.QueryRowContext(ctx, `SELECT "id" FROM "channels" WHERE "name" = $1`, p.channel)
|
||||
if row == nil {
|
||||
return fmt.Errorf("fetch channel failed: %w", err)
|
||||
}
|
||||
|
|
@ -147,7 +144,7 @@ WHERE "channel_id" = $1
|
|||
last = publishedAt
|
||||
|
||||
item.Read = isRead == 1
|
||||
item.ID = strconv.Itoa(id)
|
||||
item.ID = uid
|
||||
item.Published = publishedAt
|
||||
|
||||
tl.Items = append(tl.Items, item)
|
||||
|
|
@ -211,23 +208,14 @@ func (p *postgresStream) AddItem(item microsub.Item) (bool, error) {
|
|||
}
|
||||
t = t2
|
||||
}
|
||||
if item.UID == "" {
|
||||
h := sha256.Sum256([]byte(fmt.Sprintf("%s:%d", p.channel, time.Now().UnixNano())))
|
||||
item.UID = hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
feedID, err := strconv.ParseInt(item.Source.ID, 10, 64)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("ERROR: item.Source.ID is not an integer %q: %w", item.Source.ID, err)
|
||||
}
|
||||
|
||||
result, err := conn.ExecContext(context.Background(), `
|
||||
INSERT INTO "items" ("channel_id", "feed_id", "uid", "data", "published_at", "created_at")
|
||||
VALUES ($1, $2, $3, $4, $5, DEFAULT)
|
||||
INSERT INTO "items" ("channel_id", "uid", "data", "published_at", "created_at")
|
||||
VALUES ($1, $2, $3, $4, DEFAULT)
|
||||
ON CONFLICT ON CONSTRAINT "items_uid_key" DO NOTHING
|
||||
`, p.channelID, feedID, item.UID, &item, t)
|
||||
`, p.channelID, item.ID, &item, t)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("insert item: %w", err)
|
||||
return false, fmt.Errorf("while adding item: %w", err)
|
||||
}
|
||||
c, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user