ekster/cmd/eksterd/memory.go

848 lines
20 KiB
Go

package main
import (
"bufio"
"bytes"
"database/sql"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"time"
"p83.nl/go/ekster/pkg/auth"
"p83.nl/go/ekster/pkg/fetch"
"p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/sse"
"p83.nl/go/ekster/pkg/timeline"
"p83.nl/go/ekster/pkg/util"
"github.com/gomodule/redigo/redis"
"willnorris.com/go/microformats"
)
const (
	// DefaultPrio is the priority value assigned to newly created channels.
	DefaultPrio = 9999999
)
// memoryBackend is the Microsub backend. Channels, feeds and items are read
// from and written to the SQL database; the redis pool backs the auth-token and
// HTTP-response caches, and broker delivers server-sent events to clients.
type memoryBackend struct {
	hubIncomingBackend

	// lock is taken around accesses to the in-memory maps and NextUID below.
	lock sync.RWMutex

	// Channels and Feeds are in-memory maps keyed by channel UID, used by
	// fetchChannel and setChannel. NOTE(review): loadMemoryBackend does not
	// initialize them — confirm they are set elsewhere before being written.
	Channels map[string]microsub.Channel
	Feeds    map[string][]microsub.Feed

	// Settings holds the per-channel filter settings keyed by channel,
	// consulted by channelAddItemWithMatcher.
	Settings map[string]channelSetting

	// NextUID is the counter createChannel formats into a UID; setChannel
	// increments it.
	NextUID int

	Me            string // FIXME: should be removed
	TokenEndpoint string // FIXME: should be removed
	AuthEnabled   bool

	// ticker and quit drive the periodic feed refresh started by run.
	ticker *time.Ticker
	quit   chan struct{}

	broker   *sse.Broker
	pool     *redis.Pool
	database *sql.DB
}
// channelSetting holds the per-channel filtering options that are applied when
// items are added to a channel.
type channelSetting struct {
	// ExcludeRegex and IncludeRegex are regular expressions matched against an
	// item's text (see matchItem).
	ExcludeRegex, IncludeRegex string
	// ExcludeType lists item kinds ("repost", "like", "bookmark", "reply",
	// "checkin") that are filtered out.
	ExcludeType []string
	// ChannelType names a timeline implementation; getTimeline does not
	// consult it at the moment.
	ChannelType string
}
type (
	// channelMessage is the server-sent-event payload for a created or updated
	// channel.
	channelMessage struct {
		Version int              `json:"version"`
		Channel microsub.Channel `json:"channel"`
	}

	// channelDeletedMessage is the server-sent-event payload for a deleted
	// channel.
	channelDeletedMessage struct {
		Version int    `json:"version"`
		UID     string `json:"uid"`
	}

	// newItemMessage is the server-sent-event payload for an item added to a
	// channel.
	newItemMessage struct {
		Item    microsub.Item `json:"item"`
		Channel string        `json:"channel"`
	}
)
// fetch2 adapts the package-level Fetch2 function to a type with a Fetch method.
type fetch2 struct{}

// Fetch retrieves fetchURL by delegating to Fetch2.
func (*fetch2) Fetch(fetchURL string) (*http.Response, error) {
	resp, err := Fetch2(fetchURL)
	return resp, err
}
// AuthTokenAccepted reports whether the token carried in header is accepted by
// b.TokenEndpoint. The check goes through cachedCheckAuthToken with a redis
// connection taken from the pool; the connection is returned afterwards and a
// failure to close it is only logged.
func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) (bool, error) {
	redisConn := b.pool.Get()
	defer func() {
		if closeErr := redisConn.Close(); closeErr != nil {
			log.Printf("could not close redis connection: %v", closeErr)
		}
	}()

	return cachedCheckAuthToken(redisConn, header, b.TokenEndpoint, r)
}
// loadMemoryBackend returns a memoryBackend that uses pool for redis access and
// database for SQL storage. Only those two fields are set; no data is loaded
// and the error result is always nil.
func loadMemoryBackend(pool *redis.Pool, database *sql.DB) (*memoryBackend, error) {
	return &memoryBackend{
		pool:     pool,
		database: database,
	}, nil
}
// ChannelsGetList returns every channel together with its number of unread
// items.
//
// Channels are left-joined with their unread items (is_read = 0), so a channel
// without unread items is still listed with a count of 0. It returns an error
// if the query, scanning a row, or iterating the result set fails.
func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) {
	rows, err := b.database.Query(`
SELECT c.uid, c.name, count(i.channel_id)
FROM "channels" "c" left join items i on c.id = i.channel_id and i.is_read = 0
GROUP BY c.id;
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var channels []microsub.Channel
	for rows.Next() {
		var uid, name string
		var count int
		// A failed scan used to be ignored, producing a zero-valued channel.
		if err := rows.Scan(&uid, &name, &count); err != nil {
			return nil, fmt.Errorf("scanning channel row: %w", err)
		}
		channels = append(channels, microsub.Channel{
			UID:  uid,
			Name: name,
			Unread: microsub.Unread{
				Type:        microsub.UnreadCount,
				UnreadCount: count,
			},
		})
	}
	// Next returning false can also mean an error; surface it instead of
	// returning a truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating channels: %w", err)
	}
	return channels, nil
}
// ChannelsCreate inserts a new channel called name, with a UID generated by
// util.RandStringBytes(24), and returns it. When the insert reports an affected
// row, a "new channel" server-sent event is broadcast. On a database error the
// zero Channel is returned together with the error.
func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
	channel := microsub.Channel{
		UID:    util.RandStringBytes(24),
		Name:   name,
		Unread: microsub.Unread{Type: microsub.UnreadCount},
	}

	result, err := b.database.Exec(`insert into "channels" ("uid", "name", "created_at") values($1, $2, DEFAULT)`, channel.UID, channel.Name)
	if err != nil {
		return microsub.Channel{}, err
	}

	// Notify listeners only when RowsAffected succeeded and reports a new row.
	if n, err := result.RowsAffected(); err == nil && n > 0 {
		b.broker.Notifier <- sse.Message{Event: "new channel", Object: channelMessage{1, channel}}
	}
	return channel, nil
}
// ChannelsUpdate renames the channel identified by uid to name and broadcasts an
// "update channel" server-sent event carrying the new name. The event is sent
// whether or not a row matched uid; only a database error aborts.
func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
	if _, err := b.database.Exec(`UPDATE "channels" SET "name" = $1 WHERE "uid" = $2`, name, uid); err != nil {
		return microsub.Channel{}, err
	}

	updated := microsub.Channel{UID: uid, Name: name, Unread: microsub.Unread{}}
	b.broker.Notifier <- sse.Message{Event: "update channel", Object: channelMessage{1, updated}}
	return updated, nil
}
// ChannelsDelete removes the channel identified by uid from the database and
// broadcasts a "delete channel" server-sent event. Only a database error is
// reported; deleting an unknown uid still sends the event.
func (b *memoryBackend) ChannelsDelete(uid string) error {
	if _, err := b.database.Exec(`delete from "channels" where "uid" = $1`, uid); err != nil {
		return err
	}
	b.broker.Notifier <- sse.Message{Event: "delete channel", Object: channelDeletedMessage{1, uid}}
	return nil
}
// feed is one followed feed as loaded by getFeeds. The field order matters:
// getFeeds builds values with a positional literal.
type feed struct {
	UID string // UID of the channel the feed is followed in
	ID  int    // id column of the row in the "feeds" table
	URL string // URL that is fetched for the feed
}
// getFeeds loads every followed feed together with the UID of the channel it
// belongs to. Rows that fail to scan are logged and skipped. It returns an
// error when the query or the iteration over its rows fails.
func (b *memoryBackend) getFeeds() ([]feed, error) {
	rows, err := b.database.Query(`SELECT "f"."id", "f"."url", "c"."uid" FROM "feeds" AS "f" INNER JOIN public.channels c on c.id = f.channel_id`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var feeds []feed
	for rows.Next() {
		var f feed
		if err := rows.Scan(&f.ID, &f.URL, &f.UID); err != nil {
			log.Printf("while scanning feeds: %s", err)
			continue
		}
		feeds = append(feeds, f)
	}
	// Without this check an iteration error silently yields a partial list
	// and some feeds would just not be refreshed.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating feeds: %w", err)
	}
	return feeds, nil
}
// run starts a background goroutine that calls RefreshFeeds every ten minutes.
// The goroutine stops its ticker and exits once b.quit is closed or receives a
// value.
func (b *memoryBackend) run() {
	b.ticker = time.NewTicker(10 * time.Minute)
	b.quit = make(chan struct{})

	refreshLoop := func() {
		for {
			select {
			case <-b.quit:
				b.ticker.Stop()
				return
			case <-b.ticker.C:
				b.RefreshFeeds()
			}
		}
	}
	go refreshLoop()
}
// RefreshFeeds fetches every followed feed and processes its content into the
// channel the feed belongs to. Each fetch or processing failure is logged and
// recorded in the "notifications" channel; if any failure occurred, the unread
// count of that channel is refreshed at the end. A failure to load the feed
// list is logged and aborts the refresh.
func (b *memoryBackend) RefreshFeeds() {
	feeds, err := b.getFeeds()
	if err != nil {
		log.Printf("Error while loading feeds: %v\n", err)
		return
	}

	count := 0
	for _, f := range feeds {
		feedURL := f.URL
		log.Println("Processing", feedURL)

		resp, err := b.Fetch3(f.UID, feedURL)
		if err != nil {
			log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
			b.addNotification("Error while fetching feed", feedURL, err)
			count++
			continue
		}

		err = b.ProcessContent(f.UID, fmt.Sprintf("%d", f.ID), feedURL, resp.Header.Get("Content-Type"), resp.Body)
		// Close the body on every path, as net/http requires; the error path
		// used to skip this Close.
		_ = resp.Body.Close()
		if err != nil {
			log.Printf("Error while processing content for %s: %v\n", feedURL, err)
			b.addNotification("Error while processing feed", feedURL, err)
			count++
		}
	}

	// Failures were stored as notification items, so refresh that channel.
	if count > 0 {
		_ = b.updateChannelUnreadCount("notifications")
	}
}
// addNotification stores an "entry" item titled name in the "notifications"
// channel. The item's text names feedURL and the error that occurred. A failure
// to store the notification is only logged.
func (b *memoryBackend) addNotification(name string, feedURL string, err error) {
	item := microsub.Item{
		Type: "entry",
		Name: name,
		Content: &microsub.Content{
			// Include the feed URL; without it the notification did not say
			// which feed failed (feedURL was unused).
			Text: fmt.Sprintf("ERROR: while updating feed %s: %s", feedURL, err),
		},
		Published: time.Now().Format(time.RFC3339),
	}

	if addErr := b.channelAddItem("notifications", item); addErr != nil {
		log.Printf("ERROR: %s", addErr)
	}
}
// TimelineGet returns the items of channel, passing the before/after paging
// parameters through to the channel's timeline backend. The channel's unread
// count is refreshed (errors ignored) before the items are read.
func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) {
	log.Printf("TimelineGet %s\n", channel)

	// Meant as an existence check for the channel. NOTE(review): FollowGetList
	// does not fail for an unknown channel (its query just returns no rows),
	// so in practice only database errors stop the request here.
	if _, err := b.FollowGetList(channel); err != nil {
		return microsub.Timeline{Items: []microsub.Item{}}, err
	}

	tl := b.getTimeline(channel)
	_ = b.updateChannelUnreadCount(channel)
	return tl.Items(before, after)
}
// FollowGetList returns the feeds followed in the channel identified by uid.
// An unknown uid yields an empty (nil) list. Rows that fail to scan are logged
// and skipped. It returns an error when the query or the iteration fails.
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
	rows, err := b.database.Query(`SELECT "f"."url" FROM "feeds" AS "f" INNER JOIN channels c on c.id = f.channel_id WHERE c.uid = $1`, uid)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var feeds []microsub.Feed
	for rows.Next() {
		var feedURL string
		if err := rows.Scan(&feedURL); err != nil {
			log.Printf("while scanning followed feeds of %s: %s", uid, err)
			continue
		}
		feeds = append(feeds, microsub.Feed{Type: "feed", URL: feedURL})
	}
	// Surface iteration errors instead of returning a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("listing feeds of channel %s: %w", uid, err)
	}
	return feeds, nil
}
// FollowURL subscribes the channel identified by uid to the feed at url, then
// fetches the feed once and imports its current items.
//
// It returns an error when the channel does not exist, when the feed row cannot
// be stored, or when the initial fetch fails. A fetch failure is also recorded
// in the "notifications" channel, and the feed stays stored. Errors while
// processing the fetched content are logged but do not fail the call.
func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) {
	feed := microsub.Feed{Type: "feed", URL: url}

	// An unknown uid is a client error; it must not terminate the whole
	// process (this used to call log.Fatal).
	var channelID int
	err := b.database.QueryRow(`SELECT "id" FROM "channels" WHERE "uid" = $1`, uid).Scan(&channelID)
	if err == sql.ErrNoRows {
		return microsub.Feed{}, fmt.Errorf("channel %q does not exist", uid)
	}
	if err != nil {
		return microsub.Feed{}, fmt.Errorf("looking up channel %q: %w", uid, err)
	}

	// PostgreSQL drivers for database/sql do not implement
	// Result.LastInsertId, so the new id is read back with RETURNING.
	// Previously every feed was processed with id 0.
	var feedID int64
	err = b.database.QueryRow(
		`INSERT INTO "feeds" ("channel_id", "url") VALUES ($1, $2) RETURNING "id"`,
		channelID,
		feed.URL,
	).Scan(&feedID)
	if err != nil {
		return microsub.Feed{}, err
	}

	resp, err := b.Fetch3(uid, feed.URL)
	if err != nil {
		log.Println(err)
		b.addNotification("Error while fetching feed", feed.URL, err)
		_ = b.updateChannelUnreadCount("notifications")
		return feed, err
	}
	defer resp.Body.Close()

	if err := b.ProcessContent(uid, fmt.Sprintf("%d", feedID), feed.URL, resp.Header.Get("Content-Type"), resp.Body); err != nil {
		log.Printf("Error while processing content for %s: %v\n", feed.URL, err)
	}
	// FIXME: re-enable CreateFeed
	// _, _ = b.CreateFeed(url, uid)
	return feed, nil
}
// UnfollowURL deletes the feed with the given url from the channel identified
// by uid. Only database errors are reported; deleting a feed that is not
// followed succeeds.
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
	const deleteFeed = `DELETE FROM "feeds" "f" USING "channels" "c" WHERE "c"."id" = "f"."channel_id" AND f.url = $1 AND c.uid = $2`
	_, err := b.database.Exec(deleteFeed, url, uid)
	return err
}
// checkURL reports whether a HEAD request to u answers with 200 OK (after
// following redirects). Unparseable URLs and transport errors, including
// timeouts, report false; transport errors are logged.
func checkURL(u string) bool {
	testURL, err := url.Parse(u)
	if err != nil {
		return false
	}
	// http.Head uses http.DefaultClient, which has no timeout, so one
	// unresponsive host could block a search request indefinitely.
	client := http.Client{Timeout: 10 * time.Second}
	resp, err := client.Head(testURL.String())
	if err != nil {
		log.Printf("Error while HEAD %s: %v\n", u, err)
		return false
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK
}
// getPossibleURLs turns a search query into candidate URLs. A query that
// already starts with "https://" or "http://" is returned as-is. Otherwise
// "https://" is tried before "http://", and the first variant that checkURL
// accepts is returned. The result is empty (but never nil) when neither works.
func getPossibleURLs(query string) []string {
	if strings.HasPrefix(query, "https://") || strings.HasPrefix(query, "http://") {
		return []string{query}
	}
	for _, scheme := range []string{"https://", "http://"} {
		if candidate := scheme + query; checkURL(candidate) {
			return []string{candidate}
		}
	}
	return []string{}
}
// ItemSearch looks up items in channel that match query by delegating to
// querySearch.
func (b *memoryBackend) ItemSearch(channel, query string) ([]microsub.Item, error) {
	items, err := querySearch(channel, query)
	return items, err
}
// Search discovers feeds for query. The query is expanded into candidate URLs
// (see getPossibleURLs). For each candidate, the page itself is parsed as a
// feed, and every rel="alternate" link with a feed-like content type is
// fetched and parsed as well. Failures are logged and skipped, so the result may
// be empty but is never nil. The error result is always nil.
func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
	urls := getPossibleURLs(query)

	// needs to be like this, because we get a null result otherwise in the json output
	feeds := []microsub.Feed{}

	cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
	for _, u := range urls {
		log.Println(u)
		feeds = append(feeds, searchFeedsAtURL(cachingFetch, u)...)
	}
	return feeds, nil
}

// searchFeedsAtURL returns the feed found at u followed by the feeds found via
// its rel="alternate" links. Each response body is closed before the next
// request is made; the old loop deferred every Close until Search returned.
func searchFeedsAtURL(fetcher fetch.Fetcher, u string) []microsub.Feed {
	// Parse first: the parsed URL is needed to resolve relative links, and a
	// nil base URL must never reach microformats.Parse.
	fetchURL, err := url.Parse(u)
	if err != nil {
		log.Printf("Error while fetching %s: %v\n", u, err)
		return nil
	}

	md, err := fetchSearchMicroformats(fetcher, u, fetchURL)
	if err != nil {
		log.Printf("Error while fetching %s: %v\n", u, err)
		return nil
	}

	// TODO: Combine FeedHeader and FeedItems so we can use it here
	parsedFeed, ok := fetchSearchFeedHeader(fetcher, fetchURL.String())
	if !ok {
		return nil
	}
	// TODO: Only include the feed if it contains some items
	feeds := []microsub.Feed{parsedFeed}

	for _, alt := range md.Rels["alternate"] {
		relURL := md.RelURLs[alt]
		if relURL == nil {
			continue
		}
		log.Printf("alternate found with type %s %#v\n", relURL.Type, relURL)
		if !isSearchableFeedType(relURL.Type) {
			continue
		}
		if altFeed, ok := fetchSearchFeedHeader(fetcher, alt); ok {
			feeds = append(feeds, altFeed)
		}
	}
	return feeds
}

// fetchSearchMicroformats fetches u and parses the body as microformats,
// resolving relative URLs against base. The body is closed before returning.
func fetchSearchMicroformats(fetcher fetch.Fetcher, u string, base *url.URL) (*microformats.Data, error) {
	resp, err := fetcher.Fetch(u)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return microformats.Parse(resp.Body, base), nil
}

// fetchSearchFeedHeader fetches feedURL and parses its feed header. Errors are
// logged, and the boolean result reports success. The body is closed before
// returning.
func fetchSearchFeedHeader(fetcher fetch.Fetcher, feedURL string) (microsub.Feed, bool) {
	resp, err := fetcher.Fetch(feedURL)
	if err != nil {
		log.Printf("Error in fetch of %s - %v\n", feedURL, err)
		return microsub.Feed{}, false
	}
	defer resp.Body.Close()

	parsedFeed, err := fetch.FeedHeader(fetcher, feedURL, resp.Header.Get("Content-Type"), resp.Body)
	if err != nil {
		log.Printf("Error in parse of %s - %v\n", feedURL, err)
		return microsub.Feed{}, false
	}
	return parsedFeed, true
}

// isSearchableFeedType reports whether an alternate link's content type is one
// that Search tries to parse as a feed.
func isSearchableFeedType(contentType string) bool {
	for _, prefix := range []string{
		"text/html",
		"application/json",
		"application/xml",
		"text/xml",
		"application/rss+xml",
		"application/atom+xml",
	} {
		if strings.HasPrefix(contentType, prefix) {
			return true
		}
	}
	return false
}
// PreviewURL fetches previewURL through the caching fetcher and returns its
// items as a timeline without adding them to any channel.
func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error) {
	fetcher := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))

	resp, err := fetcher.Fetch(previewURL)
	if err != nil {
		return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
	}
	defer resp.Body.Close()

	items, err := ProcessSourcedItems(fetcher, previewURL, resp.Header.Get("content-type"), resp.Body)
	if err != nil {
		return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
	}

	var preview microsub.Timeline
	preview.Items = items
	return preview, nil
}
// MarkRead marks the items with the given uids as read in channel's timeline,
// then refreshes the channel's unread count. The first error encountered is
// returned.
func (b *memoryBackend) MarkRead(channel string, uids []string) error {
	if err := b.getTimeline(channel).MarkRead(uids); err != nil {
		return err
	}
	return b.updateChannelUnreadCount(channel)
}
// Events registers a new server-sent-events connection with the backend's
// broker (via sse.StartConnection) and returns its message channel.
func (b *memoryBackend) Events() (chan sse.Message, error) {
	messages, err := sse.StartConnection(b.broker)
	return messages, err
}
// ProcessSourcedItems reads body completely and parses it into items with
// fetch.FeedItems. Each item is marked unread and gets the same Source. The
// Source is built from the feed header when fetch.FeedHeader can parse one;
// otherwise fetchURL is used as both its ID and URL.
func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
	// The body is buffered because it is parsed twice: once for the header
	// and once for the items.
	raw, err := ioutil.ReadAll(body)
	if err != nil {
		return nil, err
	}

	source := &microsub.Source{ID: fetchURL, URL: fetchURL}
	if header, headerErr := fetch.FeedHeader(fetcher, fetchURL, contentType, bytes.NewBuffer(raw)); headerErr == nil {
		source = &microsub.Source{
			ID:    header.URL,
			URL:   header.URL,
			Name:  header.Name,
			Photo: header.Photo,
		}
	}

	items, err := fetch.FeedItems(fetcher, fetchURL, contentType, bytes.NewBuffer(raw))
	if err != nil {
		return nil, err
	}
	for i := range items {
		items[i].Read = false
		items[i].Source = source
	}
	return items, nil
}
// ProcessContent parses body, fetched from fetchURL with the given content
// type, into items. Each item's Source.ID is set to feedID, and items are added
// to channel through channelAddItemWithMatcher. Per-item failures are only
// logged. Afterwards the channel's unread count is refreshed, and its error, if
// any, is returned.
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
	fetcher := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
	items, err := ProcessSourcedItems(fetcher, fetchURL, contentType, body)
	if err != nil {
		return err
	}

	for i := range items {
		items[i].Source.ID = feedID
		if addErr := b.channelAddItemWithMatcher(channel, items[i]); addErr != nil {
			log.Printf("ERROR: %s\n", addErr)
		}
	}

	return b.updateChannelUnreadCount(channel)
}
// Fetch3 fetches fetchURL on behalf of channel. The channel is only used in the
// log line; the request itself is made by Fetch2.
func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) {
	log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL)
	resp, err := Fetch2(fetchURL)
	return resp, err
}
// channelAddItemWithMatcher adds item to channel after applying the channel
// filter settings. It also copies the item into every channel whose
// IncludeRegex matches.
//
// Steps:
//  1. Index the item for search; a failure aborts with a wrapped error.
//  2. If any channel's ExcludeType lists the item's kind, drop the item.
//  3. Add the item to every channel whose IncludeRegex matches, then refresh
//     those channels' unread counts.
//  4. Drop the item if it matches channel's own ExcludeRegex; otherwise add it
//     to channel.
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
	err := addToSearch(item, channel)
	if err != nil {
		return fmt.Errorf("addToSearch in channelAddItemWithMatcher: %v", err)
	}

	// Snapshot the settings under the read lock. Assigning the map only copies
	// its header, so iterating it after RUnlock would race with writers.
	b.lock.RLock()
	settings := make(map[string]channelSetting, len(b.Settings))
	for key, setting := range b.Settings {
		settings[key] = setting
	}
	b.lock.RUnlock()

	// NOTE(review): the ExcludeType of *any* channel drops the item everywhere,
	// which keeps the previous behaviour but is probably meant to be
	// per-channel — confirm. Checking before include matching makes the result
	// independent of map iteration order.
	for _, setting := range settings {
		if itemHasExcludedType(item, setting.ExcludeType) {
			return nil
		}
	}

	var updatedChannels []string
	for channelKey, setting := range settings {
		if setting.IncludeRegex == "" {
			continue
		}
		re, err := regexp.Compile(setting.IncludeRegex)
		if err != nil {
			// A broken include pattern disables only that channel's rule; it
			// used to drop the item for every channel.
			log.Printf("error in regexp: %q, %s\n", setting.IncludeRegex, err)
			continue
		}
		if !matchItem(item, re) {
			continue
		}
		log.Printf("Included %#v\n", item)
		if err := b.channelAddItem(channelKey, item); err != nil {
			continue
		}
		updatedChannels = append(updatedChannels, channelKey)
	}

	// Update all channels that have added items, because of the include matching
	for _, value := range updatedChannels {
		if err := b.updateChannelUnreadCount(value); err != nil {
			log.Printf("error while updating unread count for %s: %s", value, err)
		}
	}

	// Check for the exclude regex of the target channel.
	if setting, exists := settings[channel]; exists && setting.ExcludeRegex != "" {
		excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
		if err != nil {
			// Log the pattern itself; excludeRegex is nil here.
			log.Printf("error in regexp: %q, %s\n", setting.ExcludeRegex, err)
			return nil
		}
		if matchItem(item, excludeRegex) {
			log.Printf("Excluded %#v\n", item)
			return nil
		}
	}

	return b.channelAddItem(channel, item)
}

// itemHasExcludedType reports whether item is of one of the kinds listed in
// excluded ("repost", "like", "bookmark", "reply" or "checkin"). Unknown kinds
// are ignored.
func itemHasExcludedType(item microsub.Item, excluded []string) bool {
	for _, kind := range excluded {
		switch kind {
		case "repost":
			if len(item.RepostOf) > 0 {
				return true
			}
		case "like":
			if len(item.LikeOf) > 0 {
				return true
			}
		case "bookmark":
			if len(item.BookmarkOf) > 0 {
				return true
			}
		case "reply":
			if len(item.InReplyTo) > 0 {
				return true
			}
		case "checkin":
			if item.Checkin != nil {
				return true
			}
		}
	}
	return false
}
// matchItem reports whether re matches the text of item itself or of any item
// it references through item.Refs.
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
	matched := matchItemText(item, re)
	for _, ref := range item.Refs {
		if matched {
			break
		}
		matched = matchItemText(ref, re)
	}
	return matched
}
// matchItemText reports whether re matches the item's plain-text content, its
// HTML content (both only when Content is set), or its name.
func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
	if c := item.Content; c != nil && (re.MatchString(c.Text) || re.MatchString(c.HTML)) {
		return true
	}
	return re.MatchString(item.Name)
}
// channelAddItem stores item in channel's timeline. When the timeline reports
// the item as newly added, it broadcasts a "new item" server-sent event. It
// returns an error when no timeline is available for channel or when storing
// fails.
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
	timelineBackend := b.getTimeline(channel)
	if timelineBackend == nil {
		// getTimeline may return nil (it logs "no timeline found"). Calling
		// AddItem on it would panic, and this also runs on the background
		// refresh goroutine.
		return fmt.Errorf("no timeline available for channel %q", channel)
	}

	added, err := timelineBackend.AddItem(item)
	if err != nil {
		return err
	}
	// Sent message to Server-Sent-Events
	if added {
		b.broker.Notifier <- sse.Message{Event: "new item", Object: newItemMessage{item, channel}}
	}
	return nil
}
// updateChannelUnreadCount is currently a no-op that always returns nil.
//
// The commented-out code below is a previous approach: it counted the unread
// items of the channel's timeline (tl.Count) and sent a "new item in channel"
// server-sent event when the count changed. NOTE(review): callers still invoke
// this after every change; the unread counts shown to clients come from the
// query in ChannelsGetList.
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
	// tl := b.getTimeline(channel)
	// unread, err := tl.Count()
	// if err != nil {
	// return err
	// }
	//
	// currentCount := c.Unread.UnreadCount
	// c.Unread = microsub.Unread{Type: microsub.UnreadCount, UnreadCount: unread}
	//
	// // Sent message to Server-Sent-Events
	// if currentCount != unread {
	// b.broker.Notifier <- sse.Message{Event: "new item in channel", Object: c}
	// }
	return nil
}
// WithCaching wraps ff so that raw HTTP responses are cached in redis for one
// hour under the key "http_cache:<url>".
//
// On a cache hit, the stored response is parsed and returned without calling
// ff; an entry that cannot be parsed is treated as a miss. On a miss, the
// response is fetched with ff and serialized. It is stored in the cache only
// when its status is 2xx, so transient failures are not cached for an hour.
// The caller receives a response re-parsed from the serialized copy. Cache
// write failures are logged and otherwise ignored.
func WithCaching(pool *redis.Pool, ff fetch.Fetcher) fetch.Fetcher {
	ff2 := func(fetchURL string) (*http.Response, error) {
		conn := pool.Get()
		defer conn.Close()

		cacheKey := fmt.Sprintf("http_cache:%s", fetchURL)
		u, err := url.Parse(fetchURL)
		if err != nil {
			return nil, fmt.Errorf("error parsing %s as url: %s", fetchURL, err)
		}
		req, err := http.NewRequest("GET", u.String(), nil)
		if err != nil {
			return nil, fmt.Errorf("creating request for %s: %w", fetchURL, err)
		}

		data, err := redis.Bytes(conn.Do("GET", cacheKey))
		if err == nil {
			log.Printf("HIT %s\n", fetchURL)
			cached, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(data)), req)
			if err == nil {
				return cached, nil
			}
			// Fall through and refetch; the bad entry is overwritten below.
			log.Printf("invalid cache entry for %s: %v\n", fetchURL, err)
		}

		log.Printf("MISS %s\n", fetchURL)
		resp, err := ff.Fetch(fetchURL)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()

		// Serializing consumes the original body; the caller gets a fresh
		// response parsed back from this buffer instead.
		var buf bytes.Buffer
		if err := resp.Write(&buf); err != nil {
			return nil, err
		}
		raw := buf.Bytes()

		if resp.StatusCode >= 200 && resp.StatusCode < 300 {
			if _, err := conn.Do("SET", cacheKey, raw, "EX", 60*60); err != nil {
				log.Printf("could not cache %s: %v\n", fetchURL, err)
			}
		}
		return http.ReadResponse(bufio.NewReader(bytes.NewReader(raw)), req)
	}
	return fetch.FetcherFunc(ff2)
}
// Fetch2 performs a GET request for fetchURL and returns the response; the
// caller must close its body. Only URLs starting with "http" are accepted.
//
// The request, including reading the body, is bounded by a 30 second timeout.
// Without it, http.Client waits forever, and one stalled server would block the
// sequential feed refresh indefinitely.
func Fetch2(fetchURL string) (*http.Response, error) {
	if !strings.HasPrefix(fetchURL, "http") {
		return nil, fmt.Errorf("error parsing %s as url, has no http(s) prefix", fetchURL)
	}
	u, err := url.Parse(fetchURL)
	if err != nil {
		return nil, fmt.Errorf("error parsing %s as url: %s", fetchURL, err)
	}
	req, err := http.NewRequest("GET", u.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("creating request for %s: %w", fetchURL, err)
	}

	client := http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("fetch failed: %w", err)
	}
	return resp, nil
}
// getTimeline returns the timeline backend for channel, created through
// timeline.Create with the database and redis pool. When Create yields nil,
// that is logged and nil is returned.
func (b *memoryBackend) getTimeline(channel string) timeline.Backend {
	// Every channel uses the same timeline type for now; the per-channel
	// ChannelType setting is not consulted.
	timelineType := "postgres-stream"

	backend := timeline.Create(channel, timelineType, b.pool, b.database)
	if backend != nil {
		return backend
	}
	log.Printf("no timeline found with name %q and type %q", channel, timelineType)
	return backend
}
// createChannel builds, but does not store, a channel called name. Its UID is
// b.NextUID zero-padded to 12 digits, and its unread information uses the count
// type.
func (b *memoryBackend) createChannel(name string) microsub.Channel {
	return microsub.Channel{
		UID:    fmt.Sprintf("%012d", b.NextUID),
		Name:   name,
		Unread: microsub.Unread{Type: microsub.UnreadCount},
	}
}
// fetchChannel looks up an in-memory channel by its display name (not its UID)
// while holding the read lock. The boolean reports whether a channel was found.
// If several channels share the name, whichever the map iteration yields first
// is returned.
func (b *memoryBackend) fetchChannel(name string) (microsub.Channel, bool) {
	b.lock.RLock()
	defer b.lock.RUnlock()

	var found microsub.Channel
	ok := false
	for _, candidate := range b.Channels {
		if candidate.Name == name {
			found, ok = candidate, true
			break
		}
	}
	return found, ok
}
// setChannel stores channel in the in-memory maps under its UID, resets its
// feed list to empty and increments NextUID, all under the write lock.
//
// The maps are created on first use: loadMemoryBackend leaves them nil, and
// writing to a nil map panics.
func (b *memoryBackend) setChannel(channel microsub.Channel) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if b.Channels == nil {
		b.Channels = make(map[string]microsub.Channel)
	}
	if b.Feeds == nil {
		b.Feeds = make(map[string][]microsub.Feed)
	}
	b.Channels[channel.UID] = channel
	b.Feeds[channel.UID] = []microsub.Feed{}
	b.NextUID++
}
// updateChannelInRedis adds uid to the "channels" set and stores prio as its
// sort order, unless a sort order already exists (SETNX). It is best-effort:
// redis errors are logged rather than returned, since the signature has no
// error result.
func updateChannelInRedis(conn redis.Conn, uid string, prio int) {
	if _, err := conn.Do("SADD", "channels", uid); err != nil {
		log.Printf("could not add channel %s to redis: %v", uid, err)
	}
	if _, err := conn.Do("SETNX", "channel_sortorder_"+uid, prio); err != nil {
		log.Printf("could not set sort order of channel %s in redis: %v", uid, err)
	}
}
// removeChannelFromRedis removes uid from the "channels" set and deletes its
// stored sort order. It is best-effort: redis errors are logged rather than
// returned, since the signature has no error result.
func removeChannelFromRedis(conn redis.Conn, uid string) {
	if _, err := conn.Do("SREM", "channels", uid); err != nil {
		log.Printf("could not remove channel %s from redis: %v", uid, err)
	}
	if _, err := conn.Do("DEL", "channel_sortorder_"+uid); err != nil {
		log.Printf("could not delete sort order of channel %s in redis: %v", uid, err)
	}
}