Set resubscribe_at as unix time
This commit is contained in:
parent
b291019093
commit
2c6f421e3e
|
@ -45,8 +45,10 @@ func (h *hubIncomingBackend) GetSecret(id int64) string {
|
||||||
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
|
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
|
||||||
conn := pool.Get()
|
conn := pool.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
|
|
||||||
|
|
||||||
|
// TODO(peter): check if topic already is registered
|
||||||
|
|
||||||
|
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -103,7 +105,7 @@ func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int6
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
log.Printf("updating feed %d lease_seconds", feedID)
|
log.Printf("updating feed %d lease_seconds", feedID)
|
||||||
|
|
||||||
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second))
|
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second).Unix())
|
||||||
_, err := conn.Do("HMSET", args...)
|
_, err := conn.Do("HMSET", args...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
|
@ -114,14 +116,14 @@ func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int6
|
||||||
}
|
}
|
||||||
|
|
||||||
type Feed struct {
|
type Feed struct {
|
||||||
ID int64 `redis:"id"`
|
ID int64 `redis:"id"`
|
||||||
Channel string `redis:"channel"`
|
Channel string `redis:"channel"`
|
||||||
URL string `redis:"url"`
|
URL string `redis:"url"`
|
||||||
Callback string `redis:"callback"`
|
Callback string `redis:"callback"`
|
||||||
Hub string `redis:"hub"`
|
Hub string `redis:"hub"`
|
||||||
Secret string `redis:"secret"`
|
Secret string `redis:"secret"`
|
||||||
LeaseSeconds int64 `redis:"lease_seconds"`
|
LeaseSeconds int64 `redis:"lease_seconds"`
|
||||||
ResubscribeAt *time.Time `redis:"resubscribe_at"`
|
ResubscribeAt int64 `redis:"resubscribe_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hubIncomingBackend) GetFeeds() []Feed {
|
func (h *hubIncomingBackend) GetFeeds() []Feed {
|
||||||
|
@ -180,7 +182,7 @@ func (h *hubIncomingBackend) run() error {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
feeds := h.GetFeeds()
|
feeds := h.GetFeeds()
|
||||||
for _, feed := range feeds {
|
for _, feed := range feeds {
|
||||||
if feed.ResubscribeAt == nil || time.Now().After(*feed.ResubscribeAt) {
|
if feed.ResubscribeAt == 0 || time.Now().After(time.Unix(feed.ResubscribeAt, 0)) {
|
||||||
if feed.Callback == "" {
|
if feed.Callback == "" {
|
||||||
feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID)
|
feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user