Add error messages to nofications through stream channel backend
All checks were successful
the build was successful
All checks were successful
the build was successful
This commit is contained in:
parent
22cbf01566
commit
1e4847cf35
|
@ -78,6 +78,12 @@ type redisItem struct {
|
|||
Data []byte
|
||||
}
|
||||
|
||||
func (ri *redisItem) Item() microsub.Item {
|
||||
var item microsub.Item
|
||||
_ = json.Unmarshal(ri.Data, &item)
|
||||
return item
|
||||
}
|
||||
|
||||
type fetch2 struct{}
|
||||
|
||||
func (f *fetch2) Fetch(url string) (*http.Response, error) {
|
||||
|
@ -288,11 +294,19 @@ func (b *memoryBackend) run() {
|
|||
for _, feedURL := range feeds[uid] {
|
||||
resp, err := b.Fetch3(uid, feedURL)
|
||||
if err != nil {
|
||||
_ = b.channelAddItem("notifications", microsub.Item{
|
||||
Type: "entry",
|
||||
Name: fmt.Sprintf("Error while Fetch3 of %s: %v", feedURL, err),
|
||||
Content: µsub.Content{
|
||||
Text: "error while fetching feed",
|
||||
},
|
||||
UID: time.Now().String(),
|
||||
})
|
||||
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
_ = b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -330,6 +344,14 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
|||
|
||||
resp, err := b.Fetch3(uid, feed.URL)
|
||||
if err != nil {
|
||||
_ = b.channelAddItem("notifications", microsub.Item{
|
||||
Type: "entry",
|
||||
Name: fmt.Sprintf("Error while Fetch3 of %s: %v", feed.URL, err),
|
||||
Content: µsub.Content{
|
||||
Text: "error while fetching feed",
|
||||
},
|
||||
UID: time.Now().String(),
|
||||
})
|
||||
return feed, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
@ -338,9 +360,9 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
|||
b.Feeds[uid] = append(b.Feeds[uid], feed)
|
||||
b.lock.Unlock()
|
||||
|
||||
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
_ = b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||
|
||||
b.CreateFeed(url, uid)
|
||||
_, _ = b.CreateFeed(url, uid)
|
||||
|
||||
return feed, nil
|
||||
}
|
||||
|
|
|
@ -11,12 +11,16 @@ import (
|
|||
)
|
||||
|
||||
type TimelineBackend interface {
|
||||
Init() error
|
||||
|
||||
Items(before, after string) (microsub.Timeline, error)
|
||||
AddItem(item microsub.Item) error
|
||||
Count() (int, error)
|
||||
|
||||
MarkRead(uids []string) error
|
||||
MarkUnread(uids []string) error
|
||||
|
||||
// Not used at the moment
|
||||
// MarkUnread(uids []string) error
|
||||
}
|
||||
|
||||
type redisSortedSetTimeline struct {
|
||||
|
@ -24,17 +28,30 @@ type redisSortedSetTimeline struct {
|
|||
}
|
||||
|
||||
type redisStreamTimeline struct {
|
||||
channel string
|
||||
channel, channelKey string
|
||||
}
|
||||
|
||||
func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
|
||||
// TODO: fetch timeline type from channel
|
||||
timelineType := "sorted-set"
|
||||
if channel == "notifications" {
|
||||
timelineType = "stream"
|
||||
}
|
||||
if timelineType == "sorted-set" {
|
||||
return &redisSortedSetTimeline{channel}
|
||||
timeline := &redisSortedSetTimeline{channel}
|
||||
err := timeline.Init()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return timeline
|
||||
}
|
||||
if timelineType == "stream" {
|
||||
return &redisStreamTimeline{channel}
|
||||
timeline := &redisStreamTimeline{channel: channel}
|
||||
err := timeline.Init()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return timeline
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -42,6 +59,10 @@ func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
|
|||
/*
|
||||
* REDIS SORTED SETS TIMELINE
|
||||
*/
|
||||
func (timeline *redisSortedSetTimeline) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
@ -227,22 +248,87 @@ func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
|
|||
/*
|
||||
* REDIS STREAMS TIMELINE
|
||||
*/
|
||||
func (*redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
func (timeline *redisStreamTimeline) Init() error {
|
||||
timeline.channelKey = fmt.Sprintf("stream:%s", timeline.channel)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if before == "" {
|
||||
before = "-"
|
||||
}
|
||||
|
||||
if after == "" {
|
||||
after = "+"
|
||||
}
|
||||
|
||||
results, err := redis.Values(conn.Do("XREVRANGE", redis.Args{}.Add(timeline.channelKey, after, before, "COUNT", "20")...))
|
||||
if err != nil {
|
||||
return microsub.Timeline{}, err
|
||||
}
|
||||
|
||||
var forRedis redisItem
|
||||
|
||||
var items []microsub.Item
|
||||
for _, result := range results {
|
||||
if value, ok := result.([]interface{}); ok {
|
||||
id, ok2 := value[0].([]uint8)
|
||||
|
||||
if item, ok3 := value[1].([]interface{}); ok3 {
|
||||
err = redis.ScanStruct(item, &forRedis)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
item := forRedis.Item()
|
||||
if ok2 {
|
||||
item.ID = string(id)
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return microsub.Timeline{
|
||||
Items: items,
|
||||
Paging: microsub.Pagination{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
if item.Published == "" {
|
||||
item.Published = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
log.Printf("error while creating item for redis: %v\n", err)
|
||||
return err
|
||||
}
|
||||
|
||||
args := redis.Args{}.Add(timeline.channelKey).Add("*").Add("ID").Add(item.ID).Add("Published").Add(item.Published).Add("Read").Add(item.Read).Add("Data").Add(data)
|
||||
|
||||
_, err = redis.String(conn.Do("XADD", args...))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) Count() (int, error) {
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return redis.Int(conn.Do("XLEN", timeline.channelKey))
|
||||
}
|
||||
|
||||
func (timeline *redisStreamTimeline) MarkRead(uids []string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*redisStreamTimeline) AddItem(item microsub.Item) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*redisStreamTimeline) Count() (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (*redisStreamTimeline) MarkRead(uids []string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*redisStreamTimeline) MarkUnread(uids []string) error {
|
||||
func (timeline *redisStreamTimeline) MarkUnread(uids []string) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user