diff --git a/cmd/eksterd/hubbackend.go b/cmd/eksterd/hubbackend.go index e532955..5882b7a 100644 --- a/cmd/eksterd/hubbackend.go +++ b/cmd/eksterd/hubbackend.go @@ -6,6 +6,8 @@ import ( "log" "net/http" "os" + "strconv" + "strings" "time" "github.com/garyburd/redigo/redis" @@ -13,6 +15,19 @@ import ( "github.com/pstuifzand/ekster/pkg/websub" ) +// LeaseSeconds is the default number of seconds we want the subscription to last +const LeaseSeconds = 24 * 60 * 60 + +// HubBackend handles information for the incoming handler +type HubBackend interface { + GetFeeds() []Feed + CreateFeed(url, channel string) (int64, error) + GetSecret(feedID int64) string + UpdateFeed(feedID int64, contentType string, body io.Reader) error + FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error + Subscribe(feed *Feed) error +} + type hubIncomingBackend struct { backend *memoryBackend } @@ -88,12 +103,73 @@ func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int6 defer conn.Close() log.Printf("updating feed %d lease_seconds", feedID) - args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds) - conn.Do("HSET", args...) + args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second)) + _, err := conn.Do("HMSET", args...) + if err != nil { + log.Println(err) + return err + } return nil } +type Feed struct { + ID int64 `redis:"id"` + Channel string `redis:"channel"` + URL string `redis:"url"` + Callback string `redis:"callback"` + Hub string `redis:"hub"` + Secret string `redis:"secret"` + LeaseSeconds int64 `redis:"lease_seconds"` + ResubscribeAt *time.Time `redis:"resubscribe_at"` +} + +func (h *hubIncomingBackend) GetFeeds() []Feed { + conn := pool.Get() + defer conn.Close() + feeds := []Feed{} + + feedKeys, err := redis.Strings(conn.Do("KEYS feed:*")) + if err != nil { + log.Println(err) + return feeds + } + + for _, feedKey := range feedKeys { + var feed Feed + values, err := redis.Values(conn.Do("HGETALL", feedKey)) + if err != nil { + log.Println(err) + continue + } + + err = redis.ScanStruct(values, &feed) + if err != nil { + log.Println(err) + continue + } + + if feed.ID == 0 { + parts := strings.Split(feedKey, ":") + if len(parts) == 2 { + feed.ID, _ = strconv.ParseInt(parts[1], 10, 64) + conn.Do("HPUT", feedKey, "id", feed.ID) + } + } + + log.Printf("Websub feed: %#v\n", feed) + + feeds = append(feeds, feed) + } + + return feeds +} + +func (h *hubIncomingBackend) Subscribe(feed *Feed) error { + client := http.Client{} + return websub.Subscribe(&client, feed.Hub, feed.URL, feed.Callback, feed.Secret, LeaseSeconds) +} + func (h *hubIncomingBackend) run() error { ticker := time.NewTicker(10 * time.Minute) quit := make(chan struct{}) @@ -102,6 +178,15 @@ func (h *hubIncomingBackend) run() error { for { select { case <-ticker.C: + feeds := h.GetFeeds() + for _, feed := range feeds { + if feed.ResubscribeAt == nil || time.Now().After(*feed.ResubscribeAt) { + if feed.Callback == "" { + feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID) + } + h.Subscribe(&feed) + } + } case <-quit: ticker.Stop() return diff --git a/cmd/eksterd/incoming.go b/cmd/eksterd/incoming.go index 056b29b..a6ef9ea 100644 --- a/cmd/eksterd/incoming.go +++ b/cmd/eksterd/incoming.go @@ -5,7 +5,6 @@ import ( "crypto/hmac" "crypto/sha1" "fmt" - "io" "io/ioutil" "log" "net/http" @@ -14,14 +13,6 @@ import ( "strings" ) -// HubBackend handles information for the incoming handler -type HubBackend interface { - CreateFeed(url, channel string) (int64, error) - GetSecret(feedID int64) string - UpdateFeed(feedID int64, contentType string, body io.Reader) error - FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error -} - type incomingHandler struct { Backend HubBackend }