Delete expired subscribers
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
6c63ba052b
commit
9ce5f6e6c6
|
@ -44,6 +44,11 @@ func (handler *subscriptionHandler) handlePublish(w http.ResponseWriter, r *http
|
||||||
return errors.Wrap(err, "could not read body")
|
return errors.Wrap(err, "could not read body")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = handler.store.RemoveExpiredSubscribers()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "could not remove expired subscribers")
|
||||||
|
}
|
||||||
|
|
||||||
subs, err := handler.store.Subscribers(topic)
|
subs, err := handler.store.Subscribers(topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "could not get subscribers")
|
return errors.Wrap(err, "could not get subscribers")
|
||||||
|
|
|
@ -27,12 +27,13 @@ func (s *postgres) Close() error {
|
||||||
func (s *postgres) Subscribe(topic string, sub Subscriber) error {
|
func (s *postgres) Subscribe(topic string, sub Subscriber) error {
|
||||||
_, err := s.db.Exec(`
|
_, err := s.db.Exec(`
|
||||||
INSERT INTO "subscribers"
|
INSERT INTO "subscribers"
|
||||||
("topic", "callback", "lease_seconds", "secret", "created")
|
("topic", "callback", "lease_seconds", "secret", "created", "updated")
|
||||||
VALUES ($1, $2, $3, $4, now())
|
VALUES ($1, $2, $3, $4, now(), now())
|
||||||
ON CONFLICT (topic, callback)
|
ON CONFLICT (topic, callback)
|
||||||
DO UPDATE SET lease_seconds = excluded.lease_seconds,
|
DO UPDATE SET lease_seconds = excluded.lease_seconds,
|
||||||
secret = excluded.secret,
|
secret = excluded.secret,
|
||||||
updated = excluded.created
|
updated = now(),
|
||||||
|
last_subscribed = now()
|
||||||
`,
|
`,
|
||||||
topic,
|
topic,
|
||||||
sub.Callback,
|
sub.Callback,
|
||||||
|
@ -45,6 +46,11 @@ DO UPDATE SET lease_seconds = excluded.lease_seconds,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *postgres) RemoveExpiredSubscribers() error {
|
||||||
|
_, err := s.db.Exec(`DELETE FROM "subscribers" WHERE "last_subscribed" + "lease_seconds" * interval '1' second < now()`)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (s *postgres) Unsubscribe(topic, callback string) error {
|
func (s *postgres) Unsubscribe(topic, callback string) error {
|
||||||
_, err := s.db.Exec(
|
_, err := s.db.Exec(
|
||||||
`DELETE FROM "subscribers" WHERE "topic" = $1 AND "callback" = $2`,
|
`DELETE FROM "subscribers" WHERE "topic" = $1 AND "callback" = $2`,
|
||||||
|
|
|
@ -2,13 +2,13 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Service interface {
|
type Service interface {
|
||||||
Subscribe(topic string, subscriber Subscriber) error
|
Subscribe(topic string, subscriber Subscriber) error
|
||||||
Unsubscribe(topic, callback string) error
|
Unsubscribe(topic, callback string) error
|
||||||
Subscribers(topic string) ([]Subscriber, error)
|
Subscribers(topic string) ([]Subscriber, error)
|
||||||
|
RemoveExpiredSubscribers() error
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,4 +18,3 @@ type Subscriber struct {
|
||||||
Secret string
|
Secret string
|
||||||
Created time.Time
|
Created time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
4
db/04_timestamptz.sql
Normal file
4
db/04_timestamptz.sql
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
ALTER TABLE "subscribers"
|
||||||
|
ALTER COLUMN "created" TYPE timestamptz,
|
||||||
|
ALTER COLUMN "updated" TYPE timestamptz,
|
||||||
|
ADD COLUMN "last_subscribed" timestamptz;
|
Loading…
Reference in New Issue
Block a user