Extract hubbackend.go

This commit is contained in:
Peter Stuifzand 2018-07-07 17:59:41 +02:00
parent 471ff2c564
commit e8796eae5a
3 changed files with 114 additions and 103 deletions

113
cmd/eksterd/hubbackend.go Normal file
View File

@ -0,0 +1,113 @@
package main
import (
"fmt"
"io"
"log"
"net/http"
"os"
"time"
"github.com/garyburd/redigo/redis"
"github.com/pstuifzand/ekster/pkg/util"
"github.com/pstuifzand/ekster/pkg/websub"
)
type hubIncomingBackend struct {
backend *memoryBackend
}
func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := pool.Get()
defer conn.Close()
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
if err != nil {
return ""
}
return secret
}
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
conn := pool.Get()
defer conn.Close()
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
if err != nil {
return 0, err
}
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
secret := util.RandStringBytes(16)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
client := &http.Client{}
hubURL, err := websub.GetHubURL(client, topic)
if hubURL == "" {
log.Printf("WebSub Hub URL not found for topic=%s\n", topic)
} else {
log.Printf("WebSub Hub URL found for topic=%s hub=%s\n", topic, hubURL)
}
callbackURL := fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), id)
if err == nil && hubURL != "" {
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", id), "hub", hubURL, "callback", callbackURL)
conn.Do("HMSET", args...)
} else {
return id, nil
}
websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
return id, nil
}
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d", feedID)
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
if err != nil {
return err
}
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
if err != nil {
return err
}
log.Printf("updating feed %d - %s %s\n", feedID, u, channel)
h.backend.ProcessContent(channel, u, contentType, body)
return err
}
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d lease_seconds", feedID)
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds)
conn.Do("HSET", args...)
return nil
}
func (h *hubIncomingBackend) run() error {
ticker := time.NewTicker(10 * time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
case <-quit:
ticker.Stop()
return
}
}
}()
return nil
}

View File

@ -17,7 +17,7 @@ import (
// HubBackend handles information for the incoming handler // HubBackend handles information for the incoming handler
type HubBackend interface { type HubBackend interface {
CreateFeed(url, channel string) (int64, error) CreateFeed(url, channel string) (int64, error)
GetSecret(id int64) string GetSecret(feedID int64) string
UpdateFeed(feedID int64, contentType string, body io.Reader) error UpdateFeed(feedID int64, contentType string, body io.Reader) error
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
} }

View File

@ -20,7 +20,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"io"
"log" "log"
"net/http" "net/http"
"os" "os"
@ -29,8 +28,6 @@ import (
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
"github.com/pstuifzand/ekster/pkg/microsub" "github.com/pstuifzand/ekster/pkg/microsub"
"github.com/pstuifzand/ekster/pkg/util"
"github.com/pstuifzand/ekster/pkg/websub"
) )
var ( var (
@ -61,105 +58,6 @@ func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r) http.NotFound(w, r)
} }
type hubIncomingBackend struct {
backend *memoryBackend
}
func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := pool.Get()
defer conn.Close()
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
if err != nil {
return ""
}
return secret
}
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
conn := pool.Get()
defer conn.Close()
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
if err != nil {
return 0, err
}
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
secret := util.RandStringBytes(16)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
client := &http.Client{}
hubURL, err := websub.GetHubURL(client, topic)
if hubURL == "" {
log.Printf("WebSub Hub URL not found for topic=%s\n", topic)
} else {
log.Printf("WebSub Hub URL found for topic=%s hub=%s\n", topic, hubURL)
}
callbackURL := fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), id)
if err == nil && hubURL != "" {
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", id), "hub", hubURL, "callback", callbackURL)
conn.Do("HMSET", args...)
} else {
return id, nil
}
websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
return id, nil
}
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d", feedID)
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
if err != nil {
return err
}
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
if err != nil {
return err
}
log.Printf("updating feed %d - %s %s\n", feedID, u, channel)
h.backend.ProcessContent(channel, u, contentType, body)
return err
}
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d lease_seconds", feedID)
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds)
conn.Do("HSET", args...)
return nil
}
func (h *hubIncomingBackend) run() error {
ticker := time.NewTicker(10 * time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
case <-quit:
ticker.Stop()
return
}
}
}()
return nil
}
func newPool(addr string) *redis.Pool { func newPool(addr string) *redis.Pool {
return &redis.Pool{ return &redis.Pool{
MaxIdle: 3, MaxIdle: 3,