Cleanup subscribing and feeds
Add ID and URLs to feed, so we can easily resubscribe later
This commit is contained in:
parent
e8796eae5a
commit
97a98bbfed
|
@ -6,6 +6,8 @@ import (
|
|||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
|
@ -13,6 +15,19 @@ import (
|
|||
"github.com/pstuifzand/ekster/pkg/websub"
|
||||
)
|
||||
|
||||
// LeaseSeconds is the default number of seconds we want the subscription to last
|
||||
const LeaseSeconds = 24 * 60 * 60
|
||||
|
||||
// HubBackend handles information for the incoming handler
|
||||
type HubBackend interface {
|
||||
GetFeeds() []Feed
|
||||
CreateFeed(url, channel string) (int64, error)
|
||||
GetSecret(feedID int64) string
|
||||
UpdateFeed(feedID int64, contentType string, body io.Reader) error
|
||||
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
|
||||
Subscribe(feed *Feed) error
|
||||
}
|
||||
|
||||
type hubIncomingBackend struct {
|
||||
backend *memoryBackend
|
||||
}
|
||||
|
@ -88,12 +103,73 @@ func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int6
|
|||
defer conn.Close()
|
||||
log.Printf("updating feed %d lease_seconds", feedID)
|
||||
|
||||
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds)
|
||||
conn.Do("HSET", args...)
|
||||
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second))
|
||||
_, err := conn.Do("HMSET", args...)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Feed struct {
|
||||
ID int64 `redis:"id"`
|
||||
Channel string `redis:"channel"`
|
||||
URL string `redis:"url"`
|
||||
Callback string `redis:"callback"`
|
||||
Hub string `redis:"hub"`
|
||||
Secret string `redis:"secret"`
|
||||
LeaseSeconds int64 `redis:"lease_seconds"`
|
||||
ResubscribeAt *time.Time `redis:"resubscribe_at"`
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) GetFeeds() []Feed {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
feeds := []Feed{}
|
||||
|
||||
feedKeys, err := redis.Strings(conn.Do("KEYS feed:*"))
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return feeds
|
||||
}
|
||||
|
||||
for _, feedKey := range feedKeys {
|
||||
var feed Feed
|
||||
values, err := redis.Values(conn.Do("HGETALL", feedKey))
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = redis.ScanStruct(values, &feed)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if feed.ID == 0 {
|
||||
parts := strings.Split(feedKey, ":")
|
||||
if len(parts) == 2 {
|
||||
feed.ID, _ = strconv.ParseInt(parts[1], 10, 64)
|
||||
conn.Do("HPUT", feedKey, "id", feed.ID)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Websub feed: %#v\n", feed)
|
||||
|
||||
feeds = append(feeds, feed)
|
||||
}
|
||||
|
||||
return feeds
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
|
||||
client := http.Client{}
|
||||
return websub.Subscribe(&client, feed.Hub, feed.URL, feed.Callback, feed.Secret, LeaseSeconds)
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) run() error {
|
||||
ticker := time.NewTicker(10 * time.Minute)
|
||||
quit := make(chan struct{})
|
||||
|
@ -102,6 +178,15 @@ func (h *hubIncomingBackend) run() error {
|
|||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
feeds := h.GetFeeds()
|
||||
for _, feed := range feeds {
|
||||
if feed.ResubscribeAt == nil || time.Now().After(*feed.ResubscribeAt) {
|
||||
if feed.Callback == "" {
|
||||
feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID)
|
||||
}
|
||||
h.Subscribe(&feed)
|
||||
}
|
||||
}
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"crypto/hmac"
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
|
@ -14,14 +13,6 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
// HubBackend handles information for the incoming handler
|
||||
type HubBackend interface {
|
||||
CreateFeed(url, channel string) (int64, error)
|
||||
GetSecret(feedID int64) string
|
||||
UpdateFeed(feedID int64, contentType string, body io.Reader) error
|
||||
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
|
||||
}
|
||||
|
||||
type incomingHandler struct {
|
||||
Backend HubBackend
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user