Compare commits

...

11 Commits

7 changed files with 278 additions and 132 deletions

View File

@ -267,7 +267,7 @@ Commands:
log.Fatalf("An error occurred: %s\n", err)
}
for _, feed := range feeds {
fmt.Println(feed.Name, " ", feed.URL)
fmt.Println(feed.URL)
}
}

View File

@ -353,11 +353,15 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
for _, item := range items {
item.Read = false
err = b.channelAddItem(conn, channel, item)
log.Printf("ERROR: %s\n", err)
if err != nil {
log.Printf("ERROR: %s\n", err)
}
}
err = b.updateChannelUnreadCount(conn, channel)
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
if err != nil {
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
}
return nil
}

200
cmd/eksterd/hubbackend.go Normal file
View File

@ -0,0 +1,200 @@
package main
import (
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/garyburd/redigo/redis"
"github.com/pstuifzand/ekster/pkg/util"
"github.com/pstuifzand/ekster/pkg/websub"
)
// LeaseSeconds is the default number of seconds we want the subscription to last
const LeaseSeconds = 24 * 60 * 60
// HubBackend handles information for the incoming handler
type HubBackend interface {
GetFeeds() []Feed
CreateFeed(url, channel string) (int64, error)
GetSecret(feedID int64) string
UpdateFeed(feedID int64, contentType string, body io.Reader) error
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
Subscribe(feed *Feed) error
}
type hubIncomingBackend struct {
backend *memoryBackend
}
func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := pool.Get()
defer conn.Close()
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
if err != nil {
return ""
}
return secret
}
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
conn := pool.Get()
defer conn.Close()
// TODO(peter): check if topic already is registered
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
if err != nil {
return 0, err
}
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
secret := util.RandStringBytes(16)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
client := &http.Client{}
hubURL, err := websub.GetHubURL(client, topic)
if hubURL == "" {
log.Printf("WebSub Hub URL not found for topic=%s\n", topic)
} else {
log.Printf("WebSub Hub URL found for topic=%s hub=%s\n", topic, hubURL)
}
callbackURL := fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), id)
if err == nil && hubURL != "" {
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", id), "hub", hubURL, "callback", callbackURL)
conn.Do("HMSET", args...)
} else {
return id, nil
}
websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
return id, nil
}
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d", feedID)
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
if err != nil {
return err
}
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
if err != nil {
return err
}
log.Printf("updating feed %d - %s %s\n", feedID, u, channel)
h.backend.ProcessContent(channel, u, contentType, body)
return err
}
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d lease_seconds", feedID)
args := redis.Args{}.Add(fmt.Sprintf("feed:%d", feedID), "lease_seconds", leaseSeconds, "resubscribe_at", time.Now().Add(time.Duration(60*(leaseSeconds-15))*time.Second).Unix())
_, err := conn.Do("HMSET", args...)
if err != nil {
log.Println(err)
return err
}
return nil
}
type Feed struct {
ID int64 `redis:"id"`
Channel string `redis:"channel"`
URL string `redis:"url"`
Callback string `redis:"callback"`
Hub string `redis:"hub"`
Secret string `redis:"secret"`
LeaseSeconds int64 `redis:"lease_seconds"`
ResubscribeAt int64 `redis:"resubscribe_at"`
}
func (h *hubIncomingBackend) GetFeeds() []Feed {
conn := pool.Get()
defer conn.Close()
feeds := []Feed{}
feedKeys, err := redis.Strings(conn.Do("KEYS feed:*"))
if err != nil {
log.Println(err)
return feeds
}
for _, feedKey := range feedKeys {
var feed Feed
values, err := redis.Values(conn.Do("HGETALL", feedKey))
if err != nil {
log.Println(err)
continue
}
err = redis.ScanStruct(values, &feed)
if err != nil {
log.Println(err)
continue
}
if feed.ID == 0 {
parts := strings.Split(feedKey, ":")
if len(parts) == 2 {
feed.ID, _ = strconv.ParseInt(parts[1], 10, 64)
conn.Do("HPUT", feedKey, "id", feed.ID)
}
}
log.Printf("Websub feed: %#v\n", feed)
feeds = append(feeds, feed)
}
return feeds
}
func (h *hubIncomingBackend) Subscribe(feed *Feed) error {
client := http.Client{}
return websub.Subscribe(&client, feed.Hub, feed.URL, feed.Callback, feed.Secret, LeaseSeconds)
}
func (h *hubIncomingBackend) run() error {
ticker := time.NewTicker(10 * time.Minute)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
feeds := h.GetFeeds()
for _, feed := range feeds {
if feed.ResubscribeAt == 0 || time.Now().After(time.Unix(feed.ResubscribeAt, 0)) {
if feed.Callback == "" {
feed.Callback = fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), feed.ID)
}
h.Subscribe(&feed)
}
}
case <-quit:
ticker.Stop()
return
}
}
}()
return nil
}

View File

@ -5,7 +5,6 @@ import (
"crypto/hmac"
"crypto/sha1"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
@ -14,13 +13,6 @@ import (
"strings"
)
// HubBackend handles information for the incoming handler
type HubBackend interface {
CreateFeed(url, channel string) (int64, error)
GetSecret(id int64) string
UpdateFeed(feedID int64, contentType string, body io.Reader) error
}
type incomingHandler struct {
Backend HubBackend
}
@ -37,10 +29,31 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(r.URL.Query())
log.Println(r.PostForm)
// find feed
matches := urlRegex.FindStringSubmatch(r.URL.Path)
feed, err := strconv.ParseInt(matches[1], 10, 64)
if err != nil {
fmt.Fprint(w, err)
}
if r.Method == http.MethodGet {
values := r.URL.Query()
// check
if leaseStr := values.Get("hub.lease_seconds"); leaseStr != "" {
// update lease_seconds
leaseSeconds, err := strconv.ParseInt(leaseStr, 10, 64)
if err != nil {
http.Error(w, fmt.Sprintf("error in hub.lease_seconds format %q: %s", leaseSeconds, err), 400)
return
}
err = h.Backend.FeedSetLeaseSeconds(feed, leaseSeconds)
if err != nil {
http.Error(w, fmt.Sprintf("error in while setting hub.lease_seconds: %s", err), 400)
return
}
}
verify := values.Get("hub.challenge")
fmt.Fprint(w, verify)
@ -53,13 +66,6 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
// find feed
matches := urlRegex.FindStringSubmatch(r.URL.Path)
feed, err := strconv.ParseInt(matches[1], 10, 64)
if err != nil {
fmt.Fprint(w, err)
}
// find secret
secret := h.Backend.GetSecret(feed)
if secret == "" {
@ -68,33 +74,15 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
// match signature
sig := r.Header.Get("X-Hub-Signature")
parts := strings.Split(sig, "=")
if len(parts) != 2 {
log.Printf("signature format %d %#v\n", feed, parts)
http.Error(w, "Signature format", 400)
return
}
if parts[0] != "sha1" {
log.Printf("signature format %d %s\n", feed, sig)
http.Error(w, "Unknown signature format", 400)
return
}
feedContent, err := ioutil.ReadAll(r.Body)
// verification
mac := hmac.New(sha1.New, []byte(secret))
mac.Write(feedContent)
signature := mac.Sum(nil)
if fmt.Sprintf("%x", signature) != parts[1] {
log.Printf("signature no match feed=%d %s %s\n", feed, signature, parts[1])
http.Error(w, "Signature doesn't match", 400)
return
// match signature
sig := r.Header.Get("X-Hub-Signature")
if sig != "" {
if err := isHubSignatureValid(sig, feedContent, secret); err != nil {
http.Error(w, fmt.Sprintf("Error in signature: %s", err), 400)
return
}
}
ct := r.Header.Get("Content-Type")
@ -106,3 +94,26 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
func isHubSignatureValid(sig string, feedContent []byte, secret string) error {
parts := strings.Split(sig, "=")
if len(parts) != 2 {
return fmt.Errorf("signature format is not like sha1=signature")
}
if parts[0] != "sha1" {
return fmt.Errorf("signature format is not like sha1=signature")
}
// verification
mac := hmac.New(sha1.New, []byte(secret))
mac.Write(feedContent)
signature := mac.Sum(nil)
if fmt.Sprintf("%x", signature) != parts[1] {
return fmt.Errorf("signature does not match feed %s %s", signature, parts[1])
}
return nil
}

View File

@ -20,7 +20,6 @@ package main
import (
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
@ -29,8 +28,6 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/pstuifzand/ekster/pkg/microsub"
"github.com/pstuifzand/ekster/pkg/util"
"github.com/pstuifzand/ekster/pkg/websub"
)
var (
@ -61,74 +58,6 @@ func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
}
type hubIncomingBackend struct {
backend *memoryBackend
}
func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := pool.Get()
defer conn.Close()
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
if err != nil {
return ""
}
return secret
}
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
conn := pool.Get()
defer conn.Close()
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
if err != nil {
return 0, err
}
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
secret := util.RandStringBytes(16)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
client := &http.Client{}
hubURL, err := websub.GetHubURL(client, topic)
if hubURL == "" {
log.Printf("WebSub Hub URL not found for topic=%s\n", topic)
} else {
log.Printf("WebSub Hub URL found for topic=%s hub=%s\n", topic, hubURL)
}
if err == nil && hubURL != "" {
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "hub", hubURL)
} else {
return id, nil
}
callbackURL := fmt.Sprintf("%s/incoming/%d", os.Getenv("EKSTER_BASEURL"), id)
websub.Subscribe(client, hubURL, topic, callbackURL, secret, 24*3600)
return id, nil
}
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d", feedID)
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
if err != nil {
return err
}
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
if err != nil {
return err
}
log.Printf("updating feed %d - %s %s\n", feedID, u, channel)
h.backend.ProcessContent(channel, u, contentType, body)
return err
}
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,

View File

@ -404,7 +404,7 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
entries, err := fb.Entries()
if err != nil {
log.Fatal(err)
return microsub.Timeline{}, err
}
feeds := make(map[int64]feedbin.Feed)
@ -714,8 +714,6 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
conn := pool.Get()
defer conn.Close()
log.Printf("Marking read for %s %v\n", channel, uids)
itemUIDs := []string{}
for _, uid := range uids {
itemUIDs = append(itemUIDs, "item:"+uid)
@ -725,27 +723,19 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
if _, err := conn.Do("SADD", args...); err != nil {
log.Printf("Marking read for channel %s has failed\n", channel)
return err
return fmt.Errorf("Marking read for channel %s has failed: %s", channel, err)
}
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
if _, err := conn.Do("ZREM", args...); err != nil {
log.Printf("Marking read for channel %s has failed\n", channel)
return err
return fmt.Errorf("Marking read for channel %s has failed: %s", channel, err)
}
unread, _ := redis.Int(conn.Do("ZCARD", zchannelKey))
unread -= len(uids)
if ch, e := b.Channels[channel]; e {
if unread < 0 {
unread = 0
}
ch.Unread = unread
b.Channels[channel] = ch
err := b.updateChannelUnreadCount(conn, channel)
if err != nil {
return err
}
log.Printf("Marking read success for %s %v\n", channel, itemUIDs)

View File

@ -9,6 +9,8 @@ import (
"linkheader"
"rss"
"willnorris.com/go/microformats"
)
// GetHubURL finds the HubURL for topic
@ -51,7 +53,8 @@ func parseBodyLinks(client *http.Client, topic string) (string, error) {
}
defer resp.Body.Close()
if isFeedContentType(resp.Header.Get("Content-Type")) {
contentType := resp.Header.Get("Content-Type")
if isFeedContentType(contentType) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
@ -64,7 +67,16 @@ func parseBodyLinks(client *http.Client, topic string) (string, error) {
if feed.HubURL != "" {
return feed.HubURL, nil
}
return "", fmt.Errorf("No hub url found in RSS feed")
return "", fmt.Errorf("No WebSub hub url found in the RSS feed")
} else if strings.HasPrefix(contentType, "text/html") {
topicURL, _ := url.Parse(topic)
md := microformats.Parse(resp.Body, topicURL)
if hubs, e := md.Rels["hub"]; e {
if len(hubs) >= 1 {
return hubs[0], nil
}
}
return "", fmt.Errorf("No WebSub hub url found in HTML <link> elements")
}
return "", fmt.Errorf("Unknown content type of response: %s", resp.Header.Get("Content-Type"))