/* Microsub server Copyright (C) 2018 Peter Stuifzand This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ package main import ( "bufio" "bytes" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "os" "reflect" "regexp" "strings" "time" "p83.nl/go/ekster/pkg/feedbin" "p83.nl/go/ekster/pkg/fetch" "p83.nl/go/ekster/pkg/microsub" "github.com/garyburd/redigo/redis" "willnorris.com/go/microformats" ) type memoryBackend struct { Channels map[string]microsub.Channel Feeds map[string][]microsub.Feed Settings map[string]channelSetting NextUid int Me string TokenEndpoint string ticker *time.Ticker quit chan struct{} } type channelSetting struct { ExcludeRegex string IncludeRegex string } type Debug interface { Debug() } type redisItem struct { ID string Published string Read bool Data []byte } type fetch2 struct {} func (f *fetch2) Fetch(url string) (*http.Response, error) { return Fetch2(url) } func (b *memoryBackend) Debug() { fmt.Println(b.Channels) fmt.Println(b.Feeds) fmt.Println(b.Settings) } func (b *memoryBackend) load() error { filename := "backend.json" f, err := os.Open(filename) if err != nil { panic("cant open backend.json") } defer f.Close() jw := json.NewDecoder(f) err = jw.Decode(b) if err != nil { return err } conn := pool.Get() defer conn.Close() conn.Do("SETNX", "channel_sortorder_notifications", 1) conn.Do("DEL", "channels") for uid, channel := range b.Channels { log.Printf("loading channel %s - %s\n", uid, channel.Name) // for _, feed := range b.Feeds[uid] { // log.Printf("- loading feed %s\n", feed.URL) // resp, err := b.Fetch3(uid, feed.URL) // if err != nil { // log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err) // continue // } // defer resp.Body.Close() // b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body) // } conn.Do("SADD", "channels", uid) conn.Do("SETNX", "channel_sortorder_"+uid, 99999) } return nil } func (b *memoryBackend) save() { filename := "backend.json" f, _ := os.Create(filename) defer f.Close() jw := json.NewEncoder(f) jw.SetIndent("", " ") jw.Encode(b) } func loadMemoryBackend() microsub.Microsub { backend := &memoryBackend{} err := backend.load() if err != nil { log.Printf("Error while loadingbackend: %v\n", err) return nil } return backend } func createMemoryBackend() microsub.Microsub { backend := memoryBackend{} defer backend.save() backend.Channels = make(map[string]microsub.Channel) backend.Feeds = make(map[string][]microsub.Feed) channels := []microsub.Channel{ microsub.Channel{UID: "notifications", Name: "Notifications"}, microsub.Channel{UID: "home", Name: "Home"}, } for _, c := range channels { backend.Channels[c.UID] = c } backend.NextUid = 1000000 backend.Me = "https://example.com/" log.Println(`Config file "backend.json" is created in the current directory.`) log.Println(`Update "Me" variable to your website address "https://example.com/"`) log.Println(`Update "TokenEndpoint" variable to the address of your token endpoint "https://example.com/token"`) return &backend } // ChannelsGetList gets channels func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) { conn := pool.Get() defer conn.Close() channels := []microsub.Channel{} uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC")) if err != nil { log.Printf("Sorting channels failed: %v\n", err) for _, v := range b.Channels { channels = append(channels, v) } } else { for _, uid := range uids { if c, e := b.Channels[uid]; e { channels = append(channels, c) } } } return channels, nil } // ChannelsCreate creates a channels func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) { defer b.save() conn := pool.Get() defer conn.Close() uid := fmt.Sprintf("%04d", b.NextUid) channel := microsub.Channel{ UID: uid, Name: name, } b.Channels[channel.UID] = channel b.Feeds[channel.UID] = []microsub.Feed{} b.NextUid++ conn.Do("SADD", "channels", uid) conn.Do("SETNX", "channel_sortorder_"+uid, 99999) return channel, nil } // ChannelsUpdate updates a channels func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, error) { defer b.save() if c, e := b.Channels[uid]; e { c.Name = name b.Channels[uid] = c return c, nil } return microsub.Channel{}, fmt.Errorf("Channel %s does not exist", uid) } // ChannelsDelete deletes a channel func (b *memoryBackend) ChannelsDelete(uid string) error { defer b.save() conn := pool.Get() defer conn.Close() conn.Do("SREM", "channels", uid) conn.Do("DEL", "channel_sortorder_"+uid) delete(b.Channels, uid) delete(b.Feeds, uid) return nil } func (b *memoryBackend) run() { b.ticker = time.NewTicker(10 * time.Minute) b.quit = make(chan struct{}) go func() { for { select { case <-b.ticker.C: for uid := range b.Channels { for _, feed := range b.Feeds[uid] { resp, err := b.Fetch3(uid, feed.URL) if err != nil { log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err) continue } defer resp.Body.Close() b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body) } } case <-b.quit: b.ticker.Stop() return } } }() } func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Timeline, error) { conn := pool.Get() defer conn.Close() if channel == "feedbin" { fb := feedbin.New(os.Getenv("FEEDBIN_USER"), os.Getenv("FEEDBIN_PASS")) entries, err := fb.Entries() if err != nil { return microsub.Timeline{}, err } feeds := make(map[int64]feedbin.Feed) var items []microsub.Item for _, entry := range entries { var item microsub.Item var feed feedbin.Feed e := false if feed, e = feeds[entry.FeedID]; !e { feeds[entry.FeedID], _ = fb.Feed(entry.FeedID) feed = feeds[entry.FeedID] } item.Type = "entry" item.Name = entry.Title item.Content = µsub.Content{HTML: entry.Content} item.URL = entry.URL item.Published = entry.Published.Format(time.RFC3339) item.Author = µsub.Card{Type: "card", Name: feed.Title, URL: feed.SiteURL} items = append(items, item) } return microsub.Timeline{ Paging: microsub.Pagination{}, Items: items, }, nil } log.Printf("TimelineGet %s\n", channel) feeds, err := b.FollowGetList(channel) if err != nil { return microsub.Timeline{}, err } log.Println(feeds) items := []microsub.Item{} zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) // channelKey := fmt.Sprintf("channel:%s:posts", channel) // itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA")) // if err != nil { // log.Println(err) // return microsub.Timeline{ // Paging: microsub.Pagination{}, // Items: items, // } // } afterScore := "-inf" if len(after) != 0 { afterScore = "(" + after } beforeScore := "+inf" if len(before) != 0 { beforeScore = "(" + before } itemJSONs := [][]byte{} itemScores, err := redis.Strings( conn.Do( "ZRANGEBYSCORE", zchannelKey, afterScore, beforeScore, "LIMIT", 0, 20, "WITHSCORES", ), ) if err != nil { return microsub.Timeline{ Paging: microsub.Pagination{}, Items: items, }, err } if len(itemScores) >= 2 { before = itemScores[1] after = itemScores[len(itemScores)-1] } for i := 0; i < len(itemScores); i += 2 { itemID := itemScores[i] itemJSON, err := redis.Bytes(conn.Do("HGET", itemID, "Data")) if err != nil { log.Println(err) continue } itemJSONs = append(itemJSONs, itemJSON) } for _, obj := range itemJSONs { item := microsub.Item{} err := json.Unmarshal(obj, &item) if err != nil { // FIXME: what should we do if one of the items doen't unmarshal? log.Println(err) continue } item.Read = false items = append(items, item) } paging := microsub.Pagination{ After: after, Before: before, } return microsub.Timeline{ Paging: paging, Items: items, }, nil } // panic if s is not a slice func reverseSlice(s interface{}) { size := reflect.ValueOf(s).Len() swap := reflect.Swapper(s) for i, j := 0, size-1; i < j; i, j = i+1, j-1 { swap(i, j) } } // func (b *memoryBackend) checkRead(channel string, uid string) bool { // conn := pool.Get() // defer conn.Close() // args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add("item:" + uid) // member, err := redis.Bool(conn.Do("SISMEMBER", args...)) // if err != nil { // log.Printf("Checking read for channel %s item %s has failed\n", channel, uid) // } // return member // } // func (b *memoryBackend) wasRead(channel string, item map[string]interface{}) bool { // if uid, e := item["uid"]; e { // uid = hex.EncodeToString([]byte(uid.(string))) // return b.checkRead(channel, uid.(string)) // } // if uid, e := item["url"]; e { // uid = hex.EncodeToString([]byte(uid.(string))) // return b.checkRead(channel, uid.(string)) // } // return false // } func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) { return b.Feeds[uid], nil } func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error) { defer b.save() feed := microsub.Feed{Type: "feed", URL: url} resp, err := b.Fetch3(uid, feed.URL) if err != nil { return feed, err } defer resp.Body.Close() b.Feeds[uid] = append(b.Feeds[uid], feed) b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body) return feed, nil } func (b *memoryBackend) UnfollowURL(uid string, url string) error { defer b.save() index := -1 for i, f := range b.Feeds[uid] { if f.URL == url { index = i break } } if index >= 0 { feeds := b.Feeds[uid] b.Feeds[uid] = append(feeds[:index], feeds[index+1:]...) } return nil } func checkURL(u string) bool { testURL, err := url.Parse(u) if err != nil { return false } resp, err := http.Head(testURL.String()) if err != nil { log.Printf("Error while HEAD %s: %v\n", u, err) return false } defer resp.Body.Close() return resp.StatusCode == 200 } func getPossibleURLs(query string) []string { urls := []string{} if !(strings.HasPrefix(query, "https://") || strings.HasPrefix(query, "http://")) { secureURL := "https://" + query if checkURL(secureURL) { urls = append(urls, secureURL) } else { unsecureURL := "http://" + query if checkURL(unsecureURL) { urls = append(urls, unsecureURL) } } } else { urls = append(urls, query) } return urls } func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { urls := getPossibleURLs(query) feeds := []microsub.Feed{} for _, u := range urls { log.Println(u) resp, err := Fetch2(u) if err != nil { log.Printf("Error while fetching %s: %v\n", u, err) continue } fetchUrl, err := url.Parse(u) md := microformats.Parse(resp.Body, fetchUrl) if err != nil { log.Printf("Error while fetching %s: %v\n", u, err) continue } feedResp, err := Fetch2(fetchUrl.String()) if err != nil { log.Printf("Error in fetch of %s - %v\n", fetchUrl, err) continue } defer feedResp.Body.Close() parsedFeed, err := fetch.FeedHeader(&fetch2{}, fetchUrl.String(), feedResp.Header.Get("Content-Type"), feedResp.Body) if err != nil { log.Printf("Error in parse of %s - %v\n", fetchUrl, err) continue } feeds = append(feeds, parsedFeed) if alts, e := md.Rels["alternate"]; e { for _, alt := range alts { relURL := md.RelURLs[alt] log.Printf("alternate found with type %s %#v\n", relURL.Type, relURL) if strings.HasPrefix(relURL.Type, "text/html") || strings.HasPrefix(relURL.Type, "application/json") || strings.HasPrefix(relURL.Type, "application/xml") || strings.HasPrefix(relURL.Type, "text/xml") || strings.HasPrefix(relURL.Type, "application/rss+xml") || strings.HasPrefix(relURL.Type, "application/atom+xml") { feedResp, err := Fetch2(alt) if err != nil { log.Printf("Error in fetch of %s - %v\n", alt, err) continue } // FIXME: don't defer in for loop (possible memory leak) defer feedResp.Body.Close() parsedFeed, err := fetch.FeedHeader(&fetch2{}, alt, feedResp.Header.Get("Content-Type"), feedResp.Body) if err != nil { log.Printf("Error in parse of %s - %v\n", alt, err) continue } feeds = append(feeds, parsedFeed) } } } } return feeds, nil } func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error) { resp, err := Fetch2(previewURL) if err != nil { return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err) } items, err := fetch.FeedItems(&fetch2{}, previewURL, resp.Header.Get("content-type"), resp.Body) if err != nil { return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err) } return microsub.Timeline{ Items: items, }, nil } func (b *memoryBackend) MarkRead(channel string, uids []string) error { conn := pool.Get() defer conn.Close() itemUIDs := []string{} for _, uid := range uids { itemUIDs = append(itemUIDs, "item:"+uid) } channelKey := fmt.Sprintf("channel:%s:read", channel) args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs) if _, err := conn.Do("SADD", args...); err != nil { return fmt.Errorf("Marking read for channel %s has failed: %s", channel, err) } zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs) if _, err := conn.Do("ZREM", args...); err != nil { return fmt.Errorf("Marking read for channel %s has failed: %s", channel, err) } err := b.updateChannelUnreadCount(conn, channel) if err != nil { return err } log.Printf("Marking read success for %s %v\n", channel, itemUIDs) return nil } func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error { conn := pool.Get() defer conn.Close() items, err := fetch.FeedItems(&fetch2{}, fetchURL, contentType, body) if err != nil { return err } for _, item := range items { item.Read = false err = b.channelAddItemWithMatcher(conn, channel, item) if err != nil { log.Printf("ERROR: %s\n", err) } } err = b.updateChannelUnreadCount(conn, channel) if err != nil { return err } return nil } // Fetch3 fills stuff func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) { log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL) return Fetch2(fetchURL) } func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel string, item microsub.Item) error { for channelKey, setting := range b.Settings { if setting.IncludeRegex != "" { included := false includeRegex, err := regexp.Compile(setting.IncludeRegex) if err != nil { log.Printf("error in regexp: %q\n", includeRegex) } else { if item.Content != nil && includeRegex.MatchString(item.Content.Text) { log.Printf("Included %#v\n", item) included = true } if includeRegex.MatchString(item.Name) { log.Printf("Included %#v\n", item) included = true } } if included { b.channelAddItem(conn, channelKey, item) } } } if setting, e := b.Settings[channel]; e { if setting.ExcludeRegex != "" { excludeRegex, err := regexp.Compile(setting.ExcludeRegex) if err != nil { log.Printf("error in regexp: %q\n", excludeRegex) } else { if item.Content != nil && excludeRegex.MatchString(item.Content.Text) { log.Printf("Excluded %#v\n", item) return nil } if excludeRegex.MatchString(item.Name) { log.Printf("Excluded %#v\n", item) return nil } } } } return b.channelAddItem(conn, channel, item) } func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error { zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) if item.Published == "" { item.Published = time.Now().Format(time.RFC3339) } data, err := json.Marshal(item) if err != nil { log.Printf("error while creating item for redis: %v\n", err) return err } forRedis := redisItem{ ID: item.ID, Published: item.Published, Read: item.Read, Data: data, } itemKey := fmt.Sprintf("item:%s", item.ID) _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) if err != nil { return fmt.Errorf("error while writing item for redis: %v", err) } readChannelKey := fmt.Sprintf("channel:%s:read", channel) isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey)) if err != nil { return err } if isRead { return nil } score, err := time.Parse(time.RFC3339, item.Published) if err != nil { return fmt.Errorf("error can't parse %s as time", item.Published) } _, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey)) if err != nil { return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err) } return nil } func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error { if c, e := b.Channels[channel]; e { zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel) unread, err := redis.Int(conn.Do("ZCARD", zchannelKey)) if err != nil { return fmt.Errorf("error: while updating channel unread count for %s: %s", channel, err) } defer b.save() c.Unread = unread b.Channels[channel] = c } return nil } // Fetch2 fetches stuff func Fetch2(fetchURL string) (*http.Response, error) { conn := pool.Get() defer conn.Close() if !strings.HasPrefix(fetchURL, "http") { return nil, fmt.Errorf("error parsing %s as url, has no http(s) prefix", fetchURL) } u, err := url.Parse(fetchURL) if err != nil { return nil, fmt.Errorf("error parsing %s as url: %s", fetchURL, err) } req, err := http.NewRequest("GET", u.String(), nil) cacheKey := fmt.Sprintf("http_cache:%s", u.String()) data, err := redis.Bytes(conn.Do("GET", cacheKey)) if err == nil { log.Printf("HIT %s\n", u.String()) rd := bufio.NewReader(bytes.NewReader(data)) return http.ReadResponse(rd, req) } log.Printf("MISS %s\n", u.String()) client := http.Client{} resp, err := client.Do(req) if err != nil { return nil, fmt.Errorf("error while fetching %s: %s", u, err) } defer resp.Body.Close() var b bytes.Buffer resp.Write(&b) cachedCopy := make([]byte, b.Len()) cur := b.Bytes() copy(cachedCopy, cur) conn.Do("SET", cacheKey, cachedCopy, "EX", 60*60) cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req) return cachedResp, err }