Compare commits
11 Commits
f1483f4171
...
e782709911
Author | SHA1 | Date | |
---|---|---|---|
e782709911 | |||
2c6f421e3e | |||
b291019093 | |||
55b279f363 | |||
97a98bbfed | |||
e8796eae5a | |||
471ff2c564 | |||
c6902909b1 | |||
b8ec0f3700 | |||
c9ca63b7fa | |||
dfd9b51cc5 |
|
@ -267,7 +267,7 @@ Commands:
|
|||
log.Fatalf("An error occurred: %s\n", err)
|
||||
}
|
||||
for _, feed := range feeds {
|
||||
fmt.Println(feed.Name, " ", feed.URL)
|
||||
fmt.Println(feed.URL)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -353,11 +353,15 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
|
|||
for _, item := range items {
|
||||
item.Read = false
|
||||
err = b.channelAddItem(conn, channel, item)
|
||||
log.Printf("ERROR: %s\n", err)
|
||||
if err != nil {
|
||||
log.Printf("ERROR: %s\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = b.updateChannelUnreadCount(conn, channel)
|
||||
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
||||
if err != nil {
|
||||
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
200
cmd/eksterd/hubbackend.go
Normal file
200
cmd/eksterd/hubbackend.go
Normal file
|
@ -0,0 +1,200 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/pstuifzand/ekster/pkg/util"
|
||||
"github.com/pstuifzand/ekster/pkg/websub"
|
||||
)
|
||||
|
||||
// LeaseSeconds is the default number of seconds we want the subscription to last
|
||||
const LeaseSeconds = 24 * 60 * 60
|
||||
|
||||
// HubBackend handles information for the incoming handler
|
||||
type HubBackend interface {
|
||||
GetFeeds() []Feed
|
||||
CreateFeed(url, channel string) (int64, error)
|
||||
GetSecret(feedID int64) string
|
||||
UpdateFeed(feedID int64, contentType string, body io.Reader) error
|
||||
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
|
||||
Subscribe(feed *Feed) error
|
||||
}
|
||||
|
||||
type hubIncomingBackend struct {
|
||||
backend *memoryBackend
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) GetSecret(id int64) string {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return secret
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
// TODO(peter): check if topic already is registered
|
||||
|
||||
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
|
||||
secret := util.RandStringBytes(16)
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
|
||||
|
||||
client := &http.Client{}
|
||||
|
||||
hubURL, err := websub.GetHubURL(client, topic)
|
||||
if hubURL == "" {
|
||||
log.Printf("WebSub Hub URL not found for topic=%s\n", topic)
|
||||
} else {
|
||||
log.Printf("WebSub Hub URL found for topic=%s hub=%s\n", topic, hubURL)
|
||||
}
|
||||
|
||||
callbackURL := fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), id)
|
||||
|
||||
if err == nil && hubURL != "" {
|
||||
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", id), "hub", hubURL, "callback", callbackURL)
|
||||
conn.Do("HMSET", args...)
|
||||
} else {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
log.Printf("updating feed %d", feedID)
|
||||
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("updating feed %d - %s %s\n", feedID, u, channel)
|
||||
h.backend.ProcessContent(channel, u, contentType, body)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
log.Printf("updating feed %d lease_seconds", feedID)
|
||||
|
||||
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second).Unix())
|
||||
_, err := conn.Do("HMSET", args...)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Feed struct {
|
||||
ID int64 `redis:"id"`
|
||||
Channel string `redis:"channel"`
|
||||
URL string `redis:"url"`
|
||||
Callback string `redis:"callback"`
|
||||
Hub string `redis:"hub"`
|
||||
Secret string `redis:"secret"`
|
||||
LeaseSeconds int64 `redis:"lease_seconds"`
|
||||
ResubscribeAt int64 `redis:"resubscribe_at"`
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) GetFeeds() []Feed {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
feeds := []Feed{}
|
||||
|
||||
feedKeys, err := redis.Strings(conn.Do("KEYS feed:*"))
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return feeds
|
||||
}
|
||||
|
||||
for _, feedKey := range feedKeys {
|
||||
var feed Feed
|
||||
values, err := redis.Values(conn.Do("HGETALL", feedKey))
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = redis.ScanStruct(values, &feed)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
|
||||
if feed.ID == 0 {
|
||||
parts := strings.Split(feedKey, ":")
|
||||
if len(parts) == 2 {
|
||||
feed.ID, _ = strconv.ParseInt(parts[1], 10, 64)
|
||||
conn.Do("HPUT", feedKey, "id", feed.ID)
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("Websub feed: %#v\n", feed)
|
||||
|
||||
feeds = append(feeds, feed)
|
||||
}
|
||||
|
||||
return feeds
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
|
||||
client := http.Client{}
|
||||
return websub.Subscribe(&client, feed.Hub, feed.URL, feed.Callback, feed.Secret, LeaseSeconds)
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) run() error {
|
||||
ticker := time.NewTicker(10 * time.Minute)
|
||||
quit := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
feeds := h.GetFeeds()
|
||||
for _, feed := range feeds {
|
||||
if feed.ResubscribeAt == 0 || time.Now().After(time.Unix(feed.ResubscribeAt, 0)) {
|
||||
if feed.Callback == "" {
|
||||
feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID)
|
||||
}
|
||||
h.Subscribe(&feed)
|
||||
}
|
||||
}
|
||||
case <-quit:
|
||||
ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
|
@ -5,7 +5,6 @@ import (
|
|||
"crypto/hmac"
|
||||
"crypto/sha1"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
|
@ -14,13 +13,6 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
// HubBackend handles information for the incoming handler
|
||||
type HubBackend interface {
|
||||
CreateFeed(url, channel string) (int64, error)
|
||||
GetSecret(id int64) string
|
||||
UpdateFeed(feedID int64, contentType string, body io.Reader) error
|
||||
}
|
||||
|
||||
type incomingHandler struct {
|
||||
Backend HubBackend
|
||||
}
|
||||
|
@ -37,10 +29,31 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
log.Println(r.URL.Query())
|
||||
log.Println(r.PostForm)
|
||||
|
||||
// find feed
|
||||
matches := urlRegex.FindStringSubmatch(r.URL.Path)
|
||||
feed, err := strconv.ParseInt(matches[1], 10, 64)
|
||||
if err != nil {
|
||||
fmt.Fprint(w, err)
|
||||
}
|
||||
|
||||
if r.Method == http.MethodGet {
|
||||
values := r.URL.Query()
|
||||
|
||||
// check
|
||||
if leaseStr := values.Get("hub.lease_seconds"); leaseStr != "" {
|
||||
// update lease_seconds
|
||||
|
||||
leaseSeconds, err := strconv.ParseInt(leaseStr, 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("error in hub.lease_seconds format %q: %s", leaseSeconds, err), 400)
|
||||
return
|
||||
}
|
||||
err = h.Backend.FeedSetLeaseSeconds(feed, leaseSeconds)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("error in while setting hub.lease_seconds: %s", err), 400)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
verify := values.Get("hub.challenge")
|
||||
fmt.Fprint(w, verify)
|
||||
|
@ -53,13 +66,6 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
// find feed
|
||||
matches := urlRegex.FindStringSubmatch(r.URL.Path)
|
||||
feed, err := strconv.ParseInt(matches[1], 10, 64)
|
||||
if err != nil {
|
||||
fmt.Fprint(w, err)
|
||||
}
|
||||
|
||||
// find secret
|
||||
secret := h.Backend.GetSecret(feed)
|
||||
if secret == "" {
|
||||
|
@ -68,33 +74,15 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
// match signature
|
||||
sig := r.Header.Get("X-Hub-Signature")
|
||||
parts := strings.Split(sig, "=")
|
||||
|
||||
if len(parts) != 2 {
|
||||
log.Printf("signature format %d %#v\n", feed, parts)
|
||||
http.Error(w, "Signature format", 400)
|
||||
return
|
||||
}
|
||||
|
||||
if parts[0] != "sha1" {
|
||||
log.Printf("signature format %d %s\n", feed, sig)
|
||||
http.Error(w, "Unknown signature format", 400)
|
||||
return
|
||||
}
|
||||
|
||||
feedContent, err := ioutil.ReadAll(r.Body)
|
||||
|
||||
// verification
|
||||
mac := hmac.New(sha1.New, []byte(secret))
|
||||
mac.Write(feedContent)
|
||||
signature := mac.Sum(nil)
|
||||
|
||||
if fmt.Sprintf("%x", signature) != parts[1] {
|
||||
log.Printf("signature no match feed=%d %s %s\n", feed, signature, parts[1])
|
||||
http.Error(w, "Signature doesn't match", 400)
|
||||
return
|
||||
// match signature
|
||||
sig := r.Header.Get("X-Hub-Signature")
|
||||
if sig != "" {
|
||||
if err := isHubSignatureValid(sig, feedContent, secret); err != nil {
|
||||
http.Error(w, fmt.Sprintf("Error in signature: %s", err), 400)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
ct := r.Header.Get("Content-Type")
|
||||
|
@ -106,3 +94,26 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
return
|
||||
}
|
||||
|
||||
func isHubSignatureValid(sig string, feedContent []byte, secret string) error {
|
||||
parts := strings.Split(sig, "=")
|
||||
|
||||
if len(parts) != 2 {
|
||||
return fmt.Errorf("signature format is not like sha1=signature")
|
||||
}
|
||||
|
||||
if parts[0] != "sha1" {
|
||||
return fmt.Errorf("signature format is not like sha1=signature")
|
||||
}
|
||||
|
||||
// verification
|
||||
mac := hmac.New(sha1.New, []byte(secret))
|
||||
mac.Write(feedContent)
|
||||
signature := mac.Sum(nil)
|
||||
|
||||
if fmt.Sprintf("%x", signature) != parts[1] {
|
||||
return fmt.Errorf("signature does not match feed %s %s", signature, parts[1])
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package main
|
|||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -29,8 +28,6 @@ import (
|
|||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/pstuifzand/ekster/pkg/microsub"
|
||||
"github.com/pstuifzand/ekster/pkg/util"
|
||||
"github.com/pstuifzand/ekster/pkg/websub"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -61,74 +58,6 @@ func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
http.NotFound(w, r)
|
||||
}
|
||||
|
||||
type hubIncomingBackend struct {
|
||||
backend *memoryBackend
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) GetSecret(id int64) string {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return secret
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
|
||||
secret := util.RandStringBytes(16)
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
|
||||
|
||||
client := &http.Client{}
|
||||
|
||||
hubURL, err := websub.GetHubURL(client, topic)
|
||||
if hubURL == "" {
|
||||
log.Printf("WebSub Hub URL not found for topic=%s\n", topic)
|
||||
} else {
|
||||
log.Printf("WebSub Hub URL found for topic=%s hub=%s\n", topic, hubURL)
|
||||
}
|
||||
|
||||
if err == nil && hubURL != "" {
|
||||
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "hub", hubURL)
|
||||
} else {
|
||||
return id, nil
|
||||
}
|
||||
|
||||
callbackURL := fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), id)
|
||||
websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
log.Printf("updating feed %d", feedID)
|
||||
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("updating feed %d - %s %s\n", feedID, u, channel)
|
||||
h.backend.ProcessContent(channel, u, contentType, body)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func newPool(addr string) *redis.Pool {
|
||||
return &redis.Pool{
|
||||
MaxIdle: 3,
|
||||
|
|
|
@ -404,7 +404,7 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
|
|||
entries, err := fb.Entries()
|
||||
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return microsub.Timeline{}, err
|
||||
}
|
||||
|
||||
feeds := make(map[int64]feedbin.Feed)
|
||||
|
@ -714,8 +714,6 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
|||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
log.Printf("Marking read for %s %v\n", channel, uids)
|
||||
|
||||
itemUIDs := []string{}
|
||||
for _, uid := range uids {
|
||||
itemUIDs = append(itemUIDs, "item:"+uid)
|
||||
|
@ -725,27 +723,19 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
|||
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
||||
|
||||
if _, err := conn.Do("SADD", args...); err != nil {
|
||||
log.Printf("Marking read for channel %s has failed\n", channel)
|
||||
return err
|
||||
return fmt.Errorf("Marking read for channel %s has failed: %s", channel, err)
|
||||
}
|
||||
|
||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
||||
|
||||
if _, err := conn.Do("ZREM", args...); err != nil {
|
||||
log.Printf("Marking read for channel %s has failed\n", channel)
|
||||
return err
|
||||
return fmt.Errorf("Marking read for channel %s has failed: %s", channel, err)
|
||||
}
|
||||
|
||||
unread, _ := redis.Int(conn.Do("ZCARD", zchannelKey))
|
||||
unread -= len(uids)
|
||||
|
||||
if ch, e := b.Channels[channel]; e {
|
||||
if unread < 0 {
|
||||
unread = 0
|
||||
}
|
||||
ch.Unread = unread
|
||||
b.Channels[channel] = ch
|
||||
err := b.updateChannelUnreadCount(conn, channel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Marking read success for %s %v\n", channel, itemUIDs)
|
||||
|
|
|
@ -9,6 +9,8 @@ import (
|
|||
|
||||
"linkheader"
|
||||
"rss"
|
||||
|
||||
"willnorris.com/go/microformats"
|
||||
)
|
||||
|
||||
// GetHubURL finds the HubURL for topic
|
||||
|
@ -51,7 +53,8 @@ func parseBodyLinks(client *http.Client, topic string) (string, error) {
|
|||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if isFeedContentType(resp.Header.Get("Content-Type")) {
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
if isFeedContentType(contentType) {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
|
@ -64,7 +67,16 @@ func parseBodyLinks(client *http.Client, topic string) (string, error) {
|
|||
if feed.HubURL != "" {
|
||||
return feed.HubURL, nil
|
||||
}
|
||||
return "", fmt.Errorf("No hub url found in RSS feed")
|
||||
return "", fmt.Errorf("No WebSub hub url found in the RSS feed")
|
||||
} else if strings.HasPrefix(contentType, "text/html") {
|
||||
topicURL, _ := url.Parse(topic)
|
||||
md := microformats.Parse(resp.Body, topicURL)
|
||||
if hubs, e := md.Rels["hub"]; e {
|
||||
if len(hubs) >= 1 {
|
||||
return hubs[0], nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("No WebSub hub url found in HTML <link> elements")
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("Unknown content type of response: %s", resp.Header.Get("Content-Type"))
|
||||
|
|
Loading…
Reference in New Issue
Block a user