diff --git a/cmd/server/fetch.go b/cmd/server/fetch.go index 5e63475..3725632 100644 --- a/cmd/server/fetch.go +++ b/cmd/server/fetch.go @@ -257,6 +257,8 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader) return items, err } + log.Printf("%#v\n", feed) + author := microsub.Card{} author.Type = "card" author.Name = feed.Author.Name @@ -275,7 +277,7 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader) item.Content.Text = feedItem.ContentText item.URL = feedItem.URL item.Summary = []string{feedItem.Summary} - item.Id = hex.EncodeToString([]byte(feedItem.ID)) + item.ID = hex.EncodeToString([]byte(feedItem.ID)) item.Published = feedItem.DatePublished itemAuthor := microsub.Card{} @@ -310,9 +312,9 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader) item.Content.Text = feedItem.Summary item.URL = feedItem.Link if feedItem.ID == "" { - item.Id = hex.EncodeToString([]byte(feedItem.Link)) + item.ID = hex.EncodeToString([]byte(feedItem.Link)) } else { - item.Id = hex.EncodeToString([]byte(feedItem.ID)) + item.ID = hex.EncodeToString([]byte(feedItem.ID)) } item.Published = feedItem.Date.Format(time.RFC3339) items = append(items, item) @@ -330,7 +332,7 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo } for _, item := range items { - item.Read = b.checkRead(channel, item.Id) + item.Read = b.checkRead(channel, item.ID) if item.Read { continue } @@ -341,21 +343,20 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo } // Fetch3 fills stuff -func (b *memoryBackend) Fetch3(channel, fetchURL string) error { +func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) { log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL) resp, err := Fetch2(fetchURL) if err != nil { - return err + return nil, err } - defer resp.Body.Close() - - return b.ProcessContent(channel, fetchURL, resp.Header.Get("Content-Type"), resp.Body) + return resp, nil } func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { - log.Printf("Adding item to channel %s\n", channel) - log.Println(item) + conn := pool.Get() + defer conn.Close() + // send to redis channelKey := fmt.Sprintf("channel:%s:posts", channel) @@ -366,26 +367,28 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { } forRedis := redisItem{ - Id: item.Id, + ID: item.ID, Published: item.Published, Read: item.Read, Data: data, } - itemKey := fmt.Sprintf("item:%s", item.Id) - _, err = redis.String(b.Redis.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) + itemKey := fmt.Sprintf("item:%s", item.ID) + _, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...)) if err != nil { log.Printf("error while writing item for redis: %v\n", err) return } - added, err := redis.Int64(b.Redis.Do("SADD", channelKey, itemKey)) + added, err := redis.Int64(conn.Do("SADD", channelKey, itemKey)) if err != nil { log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err) return } - if added > 0 { + log.Printf("Adding item to channel %s\n", channel) + log.Println(item) + if c, e := b.Channels[channel]; e { c.Unread = true b.Channels[channel] = c @@ -394,7 +397,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) { } type redisItem struct { - Id string + ID string Published string Read bool Data []byte diff --git a/cmd/server/main.go b/cmd/server/main.go index 0a2ed37..c6e093e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -53,7 +53,6 @@ type microsubHandler struct { type hubIncomingBackend struct { backend *memoryBackend - conn redis.Conn } const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" @@ -67,7 +66,9 @@ func randStringBytes(n int) string { } func (h *hubIncomingBackend) GetSecret(id int64) string { - secret, err := redis.String(h.conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret")) + conn := pool.Get() + defer conn.Close() + secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret")) if err != nil { return "" } @@ -77,16 +78,18 @@ func (h *hubIncomingBackend) GetSecret(id int64) string { var hubURL = "https://hub.stuifzandapp.com/" func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) { - id, err := redis.Int64(h.conn.Do("INCR", "feed:next_id")) + conn := pool.Get() + defer conn.Close() + id, err := redis.Int64(conn.Do("INCR", "feed:next_id")) if err != nil { return 0, err } - h.conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic) - h.conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel) + conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic) + conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel) secret := randStringBytes(16) - h.conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret) + conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret) hub, err := url.Parse(hubURL) q := hub.Query() @@ -109,12 +112,14 @@ func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, er } func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error { + conn := pool.Get() + defer conn.Close() log.Printf("updating feed %d", feedID) - u, err := redis.String(h.conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url")) + u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url")) if err != nil { return err } - channel, err := redis.String(h.conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel")) + channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel")) if err != nil { return err } @@ -276,8 +281,6 @@ func newPool(addr string) *redis.Pool { func main() { flag.Parse() - var logger = log.New(os.Stdout, "logger: ", log.Lshortfile) - createBackend := false args := flag.Args() @@ -289,9 +292,6 @@ func main() { pool = newPool(*redisServer) - conn := redis.NewLoggingConn(pool.Get(), logger, "microsub") - defer conn.Close() - var backend microsub.Microsub if createBackend { @@ -299,9 +299,9 @@ func main() { return } - backend = loadMemoryBackend(conn) + backend = loadMemoryBackend() - hubBackend := hubIncomingBackend{backend.(*memoryBackend), conn} + hubBackend := hubIncomingBackend{backend.(*memoryBackend)} http.Handle("/micropub", µpubHandler{ Backend: backend.(*memoryBackend), diff --git a/cmd/server/memory.go b/cmd/server/memory.go index a43da5e..d6f702b 100644 --- a/cmd/server/memory.go +++ b/cmd/server/memory.go @@ -35,7 +35,6 @@ import ( ) type memoryBackend struct { - Redis redis.Conn Channels map[string]microsub.Channel Feeds map[string][]microsub.Feed NextUid int @@ -56,7 +55,7 @@ func (b *memoryBackend) Debug() { fmt.Println(b.Channels) } -func (b *memoryBackend) load() { +func (b *memoryBackend) load() error { filename := "backend.json" f, err := os.Open(filename) if err != nil { @@ -66,23 +65,33 @@ func (b *memoryBackend) load() { jw := json.NewDecoder(f) err = jw.Decode(b) if err != nil { - panic("cant open backend.json") + return err } - b.Redis.Do("SETNX", "channel_sortorder_notifications", 1) + conn := pool.Get() + defer conn.Close() - b.Redis.Do("DEL", "channels") + conn.Do("SETNX", "channel_sortorder_notifications", 1) + + conn.Do("DEL", "channels") for uid, channel := range b.Channels { log.Printf("loading channel %s - %s\n", uid, channel.Name) for _, feed := range b.Feeds[uid] { log.Printf("- loading feed %s\n", feed.URL) - b.Fetch3(uid, feed.URL) + resp, err := b.Fetch3(uid, feed.URL) + if err != nil { + log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err) + continue + } + defer resp.Body.Close() + b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body) } - b.Redis.Do("SADD", "channels", uid) - b.Redis.Do("SETNX", "channel_sortorder_"+uid, 99999) + conn.Do("SADD", "channels", uid) + conn.Do("SETNX", "channel_sortorder_"+uid, 99999) } + return nil } func (b *memoryBackend) save() { @@ -93,10 +102,13 @@ func (b *memoryBackend) save() { jw.Encode(b) } -func loadMemoryBackend(conn redis.Conn) microsub.Microsub { +func loadMemoryBackend() microsub.Microsub { backend := &memoryBackend{} - backend.Redis = conn - backend.load() + err := backend.load() + if err != nil { + log.Printf("Error while loadingbackend: %v\n", err) + return nil + } return backend } @@ -121,8 +133,11 @@ func createMemoryBackend() microsub.Microsub { // ChannelsGetList gets channels func (b *memoryBackend) ChannelsGetList() []microsub.Channel { + conn := pool.Get() + defer conn.Close() + channels := []microsub.Channel{} - uids, err := redis.Strings(b.Redis.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC")) + uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC")) if err != nil { log.Printf("Sorting channels failed: %v\n", err) for _, v := range b.Channels { @@ -301,7 +316,7 @@ func mapToItem(result map[string]interface{}) microsub.Item { } if id, e := result["_id"]; e { - item.Id = id.(string) + item.ID = id.(string) } if read, e := result["_is_read"]; e { item.Read = read.(bool) @@ -320,7 +335,13 @@ func (b *memoryBackend) run() { case <-b.ticker.C: for uid := range b.Channels { for _, feed := range b.Feeds[uid] { - b.Fetch3(uid, feed.URL) + resp, err := b.Fetch3(uid, feed.URL) + if err != nil { + log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err) + continue + } + defer resp.Body.Close() + b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body) } } case <-b.quit: @@ -332,6 +353,9 @@ func (b *memoryBackend) run() { } func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Timeline { + conn := pool.Get() + defer conn.Close() + log.Printf("TimelineGet %s\n", channel) feeds := b.FollowGetList(channel) log.Println(feeds) @@ -340,7 +364,7 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time channelKey := fmt.Sprintf("channel:%s:posts", channel) - itemJsons, err := redis.ByteSlices(b.Redis.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA")) + itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA")) if err != nil { log.Println(err) return microsub.Timeline{ @@ -352,7 +376,7 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time for _, obj := range itemJsons { item := microsub.Item{} json.Unmarshal(obj, &item) - item.Read = b.checkRead(channel, item.Id) + item.Read = b.checkRead(channel, item.ID) if item.Read { continue } @@ -375,8 +399,10 @@ func reverseSlice(s interface{}) { } func (b *memoryBackend) checkRead(channel string, uid string) bool { + conn := pool.Get() + defer conn.Close() args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add("item:" + uid) - member, err := redis.Bool(b.Redis.Do("SISMEMBER", args...)) + member, err := redis.Bool(conn.Do("SISMEMBER", args...)) if err != nil { log.Printf("Checking read for channel %s item %s has failed\n", channel, uid) } @@ -539,6 +565,9 @@ func (b *memoryBackend) PreviewURL(previewURL string) microsub.Timeline { } func (b *memoryBackend) MarkRead(channel string, uids []string) { + conn := pool.Get() + defer conn.Close() + log.Printf("Marking read for %s %v\n", channel, uids) itemUIDs := []string{} @@ -547,7 +576,7 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) { } args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).AddFlat(itemUIDs) - if _, err := b.Redis.Do("SADD", args...); err != nil { + if _, err := conn.Do("SADD", args...); err != nil { log.Printf("Marking read for channel %s has failed\n", channel) } log.Printf("Marking read success for %s %v\n", channel, itemUIDs) diff --git a/cmd/server/micropub.go b/cmd/server/micropub.go index 9a1c9ae..bcc25fa 100644 --- a/cmd/server/micropub.go +++ b/cmd/server/micropub.go @@ -17,10 +17,13 @@ type micropubHandler struct { func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() + conn := pool.Get() + defer conn.Close() + if r.Method == http.MethodPost { sourceID := r.URL.Query().Get("source_id") - channel, err := redis.String(h.Backend.Redis.Do("HGET", "sources", sourceID)) + channel, err := redis.String(conn.Do("HGET", "sources", sourceID)) if err != nil { http.Error(w, "Unknown source", 400) return @@ -38,7 +41,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } item.Read = false - item.Id = hex.EncodeToString([]byte(item.URL)) + item.ID = hex.EncodeToString([]byte(item.URL)) h.Backend.channelAddItem(channel, item) } else { diff --git a/microsub/protocol.go b/microsub/protocol.go index a492254..5d6c8ba 100644 --- a/microsub/protocol.go +++ b/microsub/protocol.go @@ -75,7 +75,7 @@ type Item struct { Latitude string `json:"latitude,omitempty"` Longitude string `json:"longitude,omitempty"` Checkin Card `json:"checkin,omitempty"` - Id string `json:"_id"` + ID string `json:"_id"` Read bool `json:"_is_read"` }