Compare commits

..

No commits in common. "89a50821d4972e6bf770bf61657035d62e4c6dac" and "fa61569e565ca42d88b067ea5c4939a2f48a904b" have entirely different histories.

4 changed files with 136 additions and 110 deletions

View File

@ -39,7 +39,7 @@ type TokenResponse struct {
var authHeaderRegex = regexp.MustCompile("^Bearer (.+)$")
func (h *microsubHandler) cachedCheckAuthToken(conn redis.Conn, header string, r *TokenResponse) bool {
func (h *microsubHandler) cachedCheckAuthToken(header string, r *TokenResponse) bool {
log.Println("Cached checking Auth Token")
tokens := authHeaderRegex.FindStringSubmatch(header)
@ -51,7 +51,7 @@ func (h *microsubHandler) cachedCheckAuthToken(conn redis.Conn, header string, r
var err error
values, err := redis.Values(conn.Do("HGETALL", key))
values, err := redis.Values(h.Redis.Do("HGETALL", key))
if err == nil && len(values) > 0 {
if err = redis.ScanStruct(values, r); err == nil {
return true
@ -65,16 +65,16 @@ func (h *microsubHandler) cachedCheckAuthToken(conn redis.Conn, header string, r
if authorized {
fmt.Printf("Token response: %#v\n", r)
_, err = conn.Do("HMSET", redis.Args{}.Add(key).AddFlat(r)...)
_, err = h.Redis.Do("HMSET", redis.Args{}.Add(key).AddFlat(r)...)
if err != nil {
log.Printf("Error while setting token: %v\n", err)
return authorized
}
_, err = conn.Do("EXPIRE", key, uint64(10*time.Minute/time.Second))
_, err = h.Redis.Do("EXPIRE", key, uint64(10*time.Minute/time.Second))
if err != nil {
log.Printf("Error while setting expire on token: %v\n", err)
log.Println("Deleting token")
_, err = conn.Do("DEL", key)
_, err = h.Redis.Do("DEL", key)
if err != nil {
log.Printf("Deleting token failed: %v", err)
}

View File

@ -101,6 +101,7 @@ func main() {
http.Handle("/microsub", &microsubHandler{
Backend: backend,
HubIncomingBackend: &hubBackend,
Redis: nil,
})
http.Handle("/incoming/", &incomingHandler{
Backend: &hubBackend,

View File

@ -27,11 +27,12 @@ import (
"net/http"
"net/url"
"os"
"reflect"
"regexp"
"strings"
"sync"
"time"
"p83.nl/go/ekster/pkg/feedbin"
"p83.nl/go/ekster/pkg/fetch"
"p83.nl/go/ekster/pkg/microsub"
@ -40,12 +41,10 @@ import (
)
type memoryBackend struct {
lock sync.RWMutex
Channels map[string]microsub.Channel
Feeds map[string][]microsub.Feed
Settings map[string]channelSetting
NextUid int
Channels map[string]microsub.Channel
Feeds map[string][]microsub.Feed
Settings map[string]channelSetting
NextUid int
Me string
TokenEndpoint string
@ -76,8 +75,6 @@ func (f *fetch2) Fetch(url string) (*http.Response, error) {
}
func (b *memoryBackend) Debug() {
b.lock.RLock()
defer b.lock.RUnlock()
fmt.Println(b.Channels)
fmt.Println(b.Feeds)
fmt.Println(b.Settings)
@ -103,15 +100,22 @@ func (b *memoryBackend) load() error {
conn.Do("DEL", "channels")
b.lock.RLock()
defer b.lock.RUnlock()
for uid, channel := range b.Channels {
log.Printf("loading channel %s - %s\n", uid, channel.Name)
// for _, feed := range b.Feeds[uid] {
// log.Printf("- loading feed %s\n", feed.URL)
// resp, err := b.Fetch3(uid, feed.URL)
// if err != nil {
// log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err)
// continue
// }
// defer resp.Body.Close()
// b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
// }
conn.Do("SADD", "channels", uid)
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
}
return nil
}
@ -121,8 +125,6 @@ func (b *memoryBackend) save() {
defer f.Close()
jw := json.NewEncoder(f)
jw.SetIndent("", " ")
b.lock.RLock()
defer b.lock.RUnlock()
jw.Encode(b)
}
@ -139,24 +141,20 @@ func loadMemoryBackend() microsub.Microsub {
func createMemoryBackend() microsub.Microsub {
backend := memoryBackend{}
backend.lock.Lock()
defer backend.save()
backend.Channels = make(map[string]microsub.Channel)
backend.Feeds = make(map[string][]microsub.Feed)
channels := []microsub.Channel{
{UID: "notifications", Name: "Notifications"},
{UID: "home", Name: "Home"},
microsub.Channel{UID: "notifications", Name: "Notifications"},
microsub.Channel{UID: "home", Name: "Home"},
}
for _, c := range channels {
backend.Channels[c.UID] = c
}
backend.NextUid = 1000000
backend.Me = "https://example.com/"
backend.lock.Unlock()
backend.save()
log.Println(`Config file "backend.json" is created in the current directory.`)
log.Println(`Update "Me" variable to your website address "https://example.com/"`)
log.Println(`Update "TokenEndpoint" variable to the address of your token endpoint "https://example.com/token"`)
@ -168,10 +166,7 @@ func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) {
conn := pool.Get()
defer conn.Close()
b.lock.RLock()
defer b.lock.RUnlock()
var channels []microsub.Channel
channels := []microsub.Channel{}
uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC"))
if err != nil {
log.Printf("Sorting channels failed: %v\n", err)
@ -201,11 +196,9 @@ func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
Name: name,
}
b.lock.Lock()
b.Channels[channel.UID] = channel
b.Feeds[channel.UID] = []microsub.Feed{}
b.NextUid++
b.lock.Unlock()
conn.Do("SADD", "channels", uid)
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
@ -216,24 +209,11 @@ func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
// ChannelsUpdate updates a channels
func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
defer b.save()
b.lock.RLock()
defer b.lock.RUnlock()
b.lock.RLock()
c, e := b.Channels[uid]
b.lock.RUnlock()
if e {
if c, e := b.Channels[uid]; e {
c.Name = name
b.lock.Lock()
b.Channels[uid] = c
b.lock.Unlock()
return c, nil
}
return microsub.Channel{}, fmt.Errorf("Channel %s does not exist", uid)
}
@ -247,26 +227,12 @@ func (b *memoryBackend) ChannelsDelete(uid string) error {
conn.Do("SREM", "channels", uid)
conn.Do("DEL", "channel_sortorder_"+uid)
b.lock.Lock()
delete(b.Channels, uid)
delete(b.Feeds, uid)
b.lock.Unlock()
return nil
}
func (b *memoryBackend) getFeeds() map[string][]string {
feeds := make(map[string][]string)
b.lock.RLock()
for uid := range b.Channels {
for _, feed := range b.Feeds[uid] {
feeds[uid] = append(feeds[uid], feed.URL)
}
}
b.lock.RUnlock()
return feeds
}
func (b *memoryBackend) run() {
b.ticker = time.NewTicker(10 * time.Minute)
b.quit = make(chan struct{})
@ -275,20 +241,17 @@ func (b *memoryBackend) run() {
for {
select {
case <-b.ticker.C:
feeds := b.getFeeds()
for uid := range feeds {
for _, feedURL := range feeds[uid] {
resp, err := b.Fetch3(uid, feedURL)
for uid := range b.Channels {
for _, feed := range b.Feeds[uid] {
resp, err := b.Fetch3(uid, feed.URL)
if err != nil {
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err)
continue
}
defer resp.Body.Close()
b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
}
}
case <-b.quit:
b.ticker.Stop()
return
@ -301,6 +264,44 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
conn := pool.Get()
defer conn.Close()
if channel == "feedbin" {
fb := feedbin.New(os.Getenv("FEEDBIN_USER"), os.Getenv("FEEDBIN_PASS"))
entries, err := fb.Entries()
if err != nil {
return microsub.Timeline{}, err
}
feeds := make(map[int64]feedbin.Feed)
var items []microsub.Item
for _, entry := range entries {
var item microsub.Item
var feed feedbin.Feed
e := false
if feed, e = feeds[entry.FeedID]; !e {
feeds[entry.FeedID], _ = fb.Feed(entry.FeedID)
feed = feeds[entry.FeedID]
}
item.Type = "entry"
item.Name = entry.Title
item.Content = &microsub.Content{HTML: entry.Content}
item.URL = entry.URL
item.Published = entry.Published.Format(time.RFC3339)
item.Author = &microsub.Card{Type: "card", Name: feed.Title, URL: feed.SiteURL}
items = append(items, item)
}
return microsub.Timeline{
Paging: microsub.Pagination{},
Items: items,
}, nil
}
log.Printf("TimelineGet %s\n", channel)
feeds, err := b.FollowGetList(channel)
if err != nil {
@ -308,9 +309,19 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
}
log.Println(feeds)
var items []microsub.Item
items := []microsub.Item{}
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
// channelKey := fmt.Sprintf("channel:%s:posts", channel)
// itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA"))
// if err != nil {
// log.Println(err)
// return microsub.Timeline{
// Paging: microsub.Pagination{},
// Items: items,
// }
// }
afterScore := "-inf"
if len(after) != 0 {
@ -321,7 +332,7 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
beforeScore = "(" + before
}
var itemJSONs [][]byte
itemJSONs := [][]byte{}
itemScores, err := redis.Strings(
conn.Do(
@ -380,9 +391,41 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
}, nil
}
// panic if s is not a slice
func reverseSlice(s interface{}) {
size := reflect.ValueOf(s).Len()
swap := reflect.Swapper(s)
for i, j := 0, size-1; i < j; i, j = i+1, j-1 {
swap(i, j)
}
}
// func (b *memoryBackend) checkRead(channel string, uid string) bool {
// conn := pool.Get()
// defer conn.Close()
// args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add("item:" + uid)
// member, err := redis.Bool(conn.Do("SISMEMBER", args...))
// if err != nil {
// log.Printf("Checking read for channel %s item %s has failed\n", channel, uid)
// }
// return member
// }
// func (b *memoryBackend) wasRead(channel string, item map[string]interface{}) bool {
// if uid, e := item["uid"]; e {
// uid = hex.EncodeToString([]byte(uid.(string)))
// return b.checkRead(channel, uid.(string))
// }
// if uid, e := item["url"]; e {
// uid = hex.EncodeToString([]byte(uid.(string)))
// return b.checkRead(channel, uid.(string))
// }
// return false
// }
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
b.lock.RLock()
defer b.lock.RUnlock()
return b.Feeds[uid], nil
}
@ -396,9 +439,7 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
}
defer resp.Body.Close()
b.lock.Lock()
b.Feeds[uid] = append(b.Feeds[uid], feed)
b.lock.Unlock()
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
@ -408,7 +449,6 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
defer b.save()
index := -1
b.lock.Lock()
for i, f := range b.Feeds[uid] {
if f.URL == url {
index = i
@ -419,7 +459,6 @@ func (b *memoryBackend) UnfollowURL(uid string, url string) error {
feeds := b.Feeds[uid]
b.Feeds[uid] = append(feeds[:index], feeds[index+1:]...)
}
b.lock.Unlock()
return nil
}
@ -463,7 +502,7 @@ func getPossibleURLs(query string) []string {
func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
urls := getPossibleURLs(query)
var feeds []microsub.Feed
feeds := []microsub.Feed{}
for _, u := range urls {
log.Println(u)
@ -612,12 +651,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
// - add item to channel
var updatedChannels []string
b.lock.RLock()
settings := b.Settings
b.lock.RUnlock()
for channelKey, setting := range settings {
for channelKey, setting := range b.Settings {
if setting.IncludeRegex != "" {
re, err := regexp.Compile(setting.IncludeRegex)
if err != nil {
@ -639,19 +673,17 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
}
// Check for the exclude regex
b.lock.RLock()
setting, exists := b.Settings[channel]
b.lock.RUnlock()
if exists && setting.ExcludeRegex != "" {
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
if err != nil {
log.Printf("error in regexp: %q\n", excludeRegex)
return nil
}
if matchItem(item, excludeRegex) {
log.Printf("Excluded %#v\n", item)
return nil
if setting, e := b.Settings[channel]; e {
if setting.ExcludeRegex != "" {
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
if err != nil {
log.Printf("error in regexp: %q\n", excludeRegex)
return nil
}
if matchItem(item, excludeRegex) {
log.Printf("Excluded %#v\n", item)
return nil
}
}
}
@ -734,11 +766,7 @@ func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item mic
}
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
b.lock.RLock()
c, exists := b.Channels[channel]
b.lock.RUnlock()
if exists {
if c, e := b.Channels[channel]; e {
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
if err != nil {
@ -746,12 +774,8 @@ func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string
}
defer b.save()
c.Unread = unread
b.lock.Lock()
b.Channels[channel] = c
b.lock.Unlock()
}
return nil
}

View File

@ -32,12 +32,13 @@ import (
type microsubHandler struct {
Backend microsub.Microsub
HubIncomingBackend HubBackend
Redis redis.Conn
}
func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var logger = log.New(os.Stdout, "logger: ", log.Lshortfile)
conn := redis.NewLoggingConn(pool.Get(), logger, "microsub")
defer conn.Close()
h.Redis = redis.NewLoggingConn(pool.Get(), logger, "microsub")
defer h.Redis.Close()
r.ParseForm()
log.Printf("%s %s\n", r.Method, r.URL)
@ -56,7 +57,7 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var token TokenResponse
if !h.cachedCheckAuthToken(conn, authorization, &token) {
if !h.cachedCheckAuthToken(authorization, &token) {
log.Printf("Token could not be validated")
http.Error(w, "Can't validate token", 403)
return