diff --git a/cmd/hubserver/handler.go b/cmd/hubserver/handler.go index bec0250..9fa2eea 100644 --- a/cmd/hubserver/handler.go +++ b/cmd/hubserver/handler.go @@ -44,6 +44,11 @@ func (handler *subscriptionHandler) handlePublish(w http.ResponseWriter, r *http return errors.Wrap(err, "could not read body") } + err = handler.store.RemoveExpiredSubscribers() + if err != nil { + return errors.Wrap(err, "could not remove expired subscribers") + } + subs, err := handler.store.Subscribers(topic) if err != nil { return errors.Wrap(err, "could not get subscribers") diff --git a/cmd/hubserver/storage/postgres.go b/cmd/hubserver/storage/postgres.go index 5185519..a12f847 100644 --- a/cmd/hubserver/storage/postgres.go +++ b/cmd/hubserver/storage/postgres.go @@ -27,12 +27,13 @@ func (s *postgres) Close() error { func (s *postgres) Subscribe(topic string, sub Subscriber) error { _, err := s.db.Exec(` INSERT INTO "subscribers" - ("topic", "callback", "lease_seconds", "secret", "created") -VALUES ($1, $2, $3, $4, now()) + ("topic", "callback", "lease_seconds", "secret", "created", "updated") +VALUES ($1, $2, $3, $4, now(), now()) ON CONFLICT (topic, callback) DO UPDATE SET lease_seconds = excluded.lease_seconds, secret = excluded.secret, - updated = excluded.created + updated = now(), + last_subscribed = now() `, topic, sub.Callback, @@ -45,6 +46,11 @@ DO UPDATE SET lease_seconds = excluded.lease_seconds, return err } +func (s *postgres) RemoveExpiredSubscribers() error { + _, err := s.db.Exec(`DELETE FROM "subscribers" WHERE "last_subscribed" + "lease_seconds" * interval '1' second < now()`) + return err +} + func (s *postgres) Unsubscribe(topic, callback string) error { _, err := s.db.Exec( `DELETE FROM "subscribers" WHERE "topic" = $1 AND "callback" = $2`, diff --git a/cmd/hubserver/storage/storage.go b/cmd/hubserver/storage/storage.go index e381d86..28ce03d 100644 --- a/cmd/hubserver/storage/storage.go +++ b/cmd/hubserver/storage/storage.go @@ -2,13 +2,13 @@ package storage import ( "time" - ) type Service interface { Subscribe(topic string, subscriber Subscriber) error Unsubscribe(topic, callback string) error Subscribers(topic string) ([]Subscriber, error) + RemoveExpiredSubscribers() error Close() error } @@ -18,4 +18,3 @@ type Subscriber struct { Secret string Created time.Time } - diff --git a/db/04_timestamptz.sql b/db/04_timestamptz.sql new file mode 100644 index 0000000..8003128 --- /dev/null +++ b/db/04_timestamptz.sql @@ -0,0 +1,4 @@ +ALTER TABLE "subscribers" + ALTER COLUMN "created" TYPE timestamptz, + ALTER COLUMN "updated" TYPE timestamptz, + ADD COLUMN "last_subscribed" timestamptz;