From 2c6f421e3e5d18f84f2e6d3e237b1109fa441a4b Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 7 Jul 2018 20:51:02 +0200 Subject: [PATCH] Set resubscribe_at as unix time --- cmd/eksterd/hubbackend.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/cmd/eksterd/hubbackend.go b/cmd/eksterd/hubbackend.go index 5882b7a..0b34778 100644 --- a/cmd/eksterd/hubbackend.go +++ b/cmd/eksterd/hubbackend.go @@ -45,8 +45,10 @@ func (h *hubIncomingBackend) GetSecret(id int64) string { func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) { conn := pool.Get() defer conn.Close() - id, err := redis.Int64(conn.Do("INCR", "feed:next_id")) + // TODO(peter): check if topic already is registered + + id, err := redis.Int64(conn.Do("INCR", "feed:next_id")) if err != nil { return 0, err } @@ -103,7 +105,7 @@ func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int6 defer conn.Close() log.Printf("updating feed %d lease_seconds", feedID) - args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second)) + args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second).Unix()) _, err := conn.Do("HMSET", args...) if err != nil { log.Println(err) @@ -114,14 +116,14 @@ func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int6 } type Feed struct { - ID int64 `redis:"id"` - Channel string `redis:"channel"` - URL string `redis:"url"` - Callback string `redis:"callback"` - Hub string `redis:"hub"` - Secret string `redis:"secret"` - LeaseSeconds int64 `redis:"lease_seconds"` - ResubscribeAt *time.Time `redis:"resubscribe_at"` + ID int64 `redis:"id"` + Channel string `redis:"channel"` + URL string `redis:"url"` + Callback string `redis:"callback"` + Hub string `redis:"hub"` + Secret string `redis:"secret"` + LeaseSeconds int64 `redis:"lease_seconds"` + ResubscribeAt int64 `redis:"resubscribe_at"` } func (h *hubIncomingBackend) GetFeeds() []Feed { @@ -180,7 +182,7 @@ func (h *hubIncomingBackend) run() error { case <-ticker.C: feeds := h.GetFeeds() for _, feed := range feeds { - if feed.ResubscribeAt == nil || time.Now().After(*feed.ResubscribeAt) { + if feed.ResubscribeAt == 0 || time.Now().After(time.Unix(feed.ResubscribeAt, 0)) { if feed.Callback == "" { feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID) }