Improve handling of before value
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
parent
f7fe30f387
commit
eb3dba4d17
|
@ -1,8 +1,10 @@
|
||||||
package timeline
|
package timeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -20,13 +22,18 @@ type postgresStream struct {
|
||||||
|
|
||||||
// Init
|
// Init
|
||||||
func (p *postgresStream) Init() error {
|
func (p *postgresStream) Init() error {
|
||||||
db := p.database
|
ctx := context.Background()
|
||||||
err := db.Ping()
|
conn, err := p.database.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
err = conn.PingContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("database ping failed: %w", err)
|
return fmt.Errorf("database ping failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(`
|
_, err = conn.ExecContext(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS "channels" (
|
CREATE TABLE IF NOT EXISTS "channels" (
|
||||||
"id" int primary key generated always as identity,
|
"id" int primary key generated always as identity,
|
||||||
"name" varchar(255) unique,
|
"name" varchar(255) unique,
|
||||||
|
@ -37,7 +44,7 @@ CREATE TABLE IF NOT EXISTS "channels" (
|
||||||
return fmt.Errorf("create channels table failed: %w", err)
|
return fmt.Errorf("create channels table failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(`
|
_, err = conn.ExecContext(ctx, `
|
||||||
CREATE TABLE IF NOT EXISTS "items" (
|
CREATE TABLE IF NOT EXISTS "items" (
|
||||||
"id" int primary key generated always as identity,
|
"id" int primary key generated always as identity,
|
||||||
"channel_id" int references "channels" on delete cascade,
|
"channel_id" int references "channels" on delete cascade,
|
||||||
|
@ -53,22 +60,19 @@ CREATE TABLE IF NOT EXISTS "items" (
|
||||||
return fmt.Errorf("create items table failed: %w", err)
|
return fmt.Errorf("create items table failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(`INSERT INTO "channels" ("name", "created_at") VALUES ($1, DEFAULT)
|
_, err = conn.ExecContext(ctx, `INSERT INTO "channels" ("name", "created_at") VALUES ($1, DEFAULT)
|
||||||
ON CONFLICT DO NOTHING`, p.channel)
|
ON CONFLICT DO NOTHING`, p.channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("create channel failed: %w", err)
|
return fmt.Errorf("create channel failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := db.Query(`SELECT "id" FROM "channels" WHERE "name" = $1`, p.channel)
|
row := conn.QueryRowContext(ctx, `SELECT "id" FROM "channels" WHERE "name" = $1`, p.channel)
|
||||||
if err != nil {
|
if row == nil {
|
||||||
return fmt.Errorf("fetch channel failed: %w", err)
|
return fmt.Errorf("fetch channel failed: %w", err)
|
||||||
}
|
}
|
||||||
for rows.Next() {
|
err = row.Scan(&p.channelID)
|
||||||
err = rows.Scan(&p.channelID)
|
if err != nil {
|
||||||
if err != nil {
|
return fmt.Errorf("fetch channel failed while scanning: %w", err)
|
||||||
return fmt.Errorf("fetch channel failed while scanning: %w", err)
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -76,6 +80,13 @@ CREATE TABLE IF NOT EXISTS "items" (
|
||||||
|
|
||||||
// Items
|
// Items
|
||||||
func (p *postgresStream) Items(before, after string) (microsub.Timeline, error) {
|
func (p *postgresStream) Items(before, after string) (microsub.Timeline, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
conn, err := p.database.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return microsub.Timeline{}, err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
args = append(args, p.channelID)
|
args = append(args, p.channelID)
|
||||||
var qb strings.Builder
|
var qb strings.Builder
|
||||||
|
@ -86,7 +97,9 @@ WHERE "channel_id" = $1
|
||||||
`)
|
`)
|
||||||
if before != "" {
|
if before != "" {
|
||||||
b, err := time.Parse(time.RFC3339, before)
|
b, err := time.Parse(time.RFC3339, before)
|
||||||
if err == nil {
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
} else {
|
||||||
args = append(args, b)
|
args = append(args, b)
|
||||||
qb.WriteString(` AND "published_at" < $2`)
|
qb.WriteString(` AND "published_at" < $2`)
|
||||||
}
|
}
|
||||||
|
@ -97,9 +110,9 @@ WHERE "channel_id" = $1
|
||||||
qb.WriteString(` AND "published_at" > $2`)
|
qb.WriteString(` AND "published_at" > $2`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qb.WriteString(` ORDER BY "published_at"`)
|
qb.WriteString(` ORDER BY "published_at" DESC LIMIT 10`)
|
||||||
|
|
||||||
rows, err := p.database.Query(qb.String(), args...)
|
rows, err := conn.QueryContext(context.Background(), qb.String(), args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return microsub.Timeline{}, fmt.Errorf("while query: %w", err)
|
return microsub.Timeline{}, fmt.Errorf("while query: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -127,6 +140,7 @@ WHERE "channel_id" = $1
|
||||||
|
|
||||||
item.Read = isRead == 1
|
item.Read = isRead == 1
|
||||||
item.ID = uid
|
item.ID = uid
|
||||||
|
item.Published = publishedAt
|
||||||
|
|
||||||
tl.Items = append(tl.Items, item)
|
tl.Items = append(tl.Items, item)
|
||||||
}
|
}
|
||||||
|
@ -140,15 +154,22 @@ WHERE "channel_id" = $1
|
||||||
return tl, err
|
return tl, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tl.Paging.Before = first
|
// TODO: should only be set of there are more items available
|
||||||
tl.Paging.After = last
|
tl.Paging.Before = last
|
||||||
|
// tl.Paging.After = last
|
||||||
|
|
||||||
return tl, nil
|
return tl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count
|
// Count
|
||||||
func (p *postgresStream) Count() (int, error) {
|
func (p *postgresStream) Count() (int, error) {
|
||||||
rows, err := p.database.Query("SELECT COUNT(*) FROM items WHERE channel_id = ?", p.channel)
|
ctx := context.Background()
|
||||||
|
conn, err := p.database.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return -1, err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
rows, err := conn.QueryContext(context.Background(), "SELECT COUNT(*) FROM items WHERE channel_id = ?", p.channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -167,6 +188,13 @@ func (p *postgresStream) Count() (int, error) {
|
||||||
|
|
||||||
// AddItem
|
// AddItem
|
||||||
func (p *postgresStream) AddItem(item microsub.Item) (bool, error) {
|
func (p *postgresStream) AddItem(item microsub.Item) (bool, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
conn, err := p.database.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
t, err := time.Parse("2006-01-02T15:04:05Z0700", item.Published)
|
t, err := time.Parse("2006-01-02T15:04:05Z0700", item.Published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t2, err := time.Parse("2006-01-02T15:04:05Z07:00", item.Published)
|
t2, err := time.Parse("2006-01-02T15:04:05Z07:00", item.Published)
|
||||||
|
@ -176,7 +204,7 @@ func (p *postgresStream) AddItem(item microsub.Item) (bool, error) {
|
||||||
t = t2
|
t = t2
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = p.database.Exec(`
|
_, err = conn.ExecContext(context.Background(), `
|
||||||
INSERT INTO "items" ("channel_id", "uid", "data", "published_at", "created_at")
|
INSERT INTO "items" ("channel_id", "uid", "data", "published_at", "created_at")
|
||||||
VALUES ($1, $2, $3, $4, DEFAULT)
|
VALUES ($1, $2, $3, $4, DEFAULT)
|
||||||
ON CONFLICT ON CONSTRAINT "items_uid_key" DO UPDATE SET "updated_at" = now()
|
ON CONFLICT ON CONSTRAINT "items_uid_key" DO UPDATE SET "updated_at" = now()
|
||||||
|
@ -189,7 +217,13 @@ ON CONFLICT ON CONSTRAINT "items_uid_key" DO UPDATE SET "updated_at" = now()
|
||||||
|
|
||||||
// MarkRead
|
// MarkRead
|
||||||
func (p *postgresStream) MarkRead(uids []string) error {
|
func (p *postgresStream) MarkRead(uids []string) error {
|
||||||
_, err := p.database.Exec(`UPDATE "items" SET is_read = 1 WHERE "uid" IN ($1)`, uids)
|
ctx := context.Background()
|
||||||
|
conn, err := p.database.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
_, err = conn.ExecContext(context.Background(), `UPDATE "items" SET is_read = 1 WHERE "uid" IN ($1)`, uids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("while marking as read: %w", err)
|
return fmt.Errorf("while marking as read: %w", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user