Add MarkRead function
- Remove conn parameters - Move MarkRead function - Make uids an array parameter
This commit is contained in:
parent
723b5b24d5
commit
22cbf01566
|
|
@ -485,34 +485,17 @@ func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
||||||
conn := pool.Get()
|
timeline := b.getTimeline(channel)
|
||||||
defer conn.Close()
|
err := timeline.MarkRead(uids)
|
||||||
|
|
||||||
itemUIDs := []string{}
|
|
||||||
for _, uid := range uids {
|
|
||||||
itemUIDs = append(itemUIDs, "item:"+uid)
|
|
||||||
}
|
|
||||||
|
|
||||||
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
|
||||||
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
|
||||||
|
|
||||||
if _, err := conn.Do("SADD", args...); err != nil {
|
|
||||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
|
||||||
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
|
||||||
|
|
||||||
if _, err := conn.Do("ZREM", args...); err != nil {
|
|
||||||
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := b.updateChannelUnreadCount(conn, channel)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Marking read success for %s %v\n", channel, itemUIDs)
|
err = b.updateChannelUnreadCount(channel)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -528,13 +511,13 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
|
||||||
|
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
item.Read = false
|
item.Read = false
|
||||||
err = b.channelAddItemWithMatcher(conn, channel, item)
|
err = b.channelAddItemWithMatcher(channel, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("ERROR: %s\n", err)
|
log.Printf("ERROR: %s\n", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = b.updateChannelUnreadCount(conn, channel)
|
err = b.updateChannelUnreadCount(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -548,7 +531,7 @@ func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error)
|
||||||
return Fetch2(fetchURL)
|
return Fetch2(fetchURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel string, item microsub.Item) error {
|
func (b *memoryBackend) channelAddItemWithMatcher(channel string, item microsub.Item) error {
|
||||||
// an item is posted
|
// an item is posted
|
||||||
// check for all channels as channel
|
// check for all channels as channel
|
||||||
// if regex matches item
|
// if regex matches item
|
||||||
|
|
@ -570,7 +553,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
|
|
||||||
if matchItem(item, re) {
|
if matchItem(item, re) {
|
||||||
log.Printf("Included %#v\n", item)
|
log.Printf("Included %#v\n", item)
|
||||||
err := b.channelAddItem(conn, channelKey, item)
|
err := b.channelAddItem(channelKey, item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -581,7 +564,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
|
|
||||||
// Update all channels that have added items, because of the include matching
|
// Update all channels that have added items, because of the include matching
|
||||||
for _, value := range updatedChannels {
|
for _, value := range updatedChannels {
|
||||||
err := b.updateChannelUnreadCount(conn, value)
|
err := b.updateChannelUnreadCount(value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while updating unread count for %s: %s", value, err)
|
log.Printf("error while updating unread count for %s: %s", value, err)
|
||||||
continue
|
continue
|
||||||
|
|
@ -605,7 +588,7 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.channelAddItem(conn, channel, item)
|
return b.channelAddItem(channel, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
|
func matchItem(item microsub.Item, re *regexp.Regexp) bool {
|
||||||
|
|
@ -634,12 +617,12 @@ func matchItemText(item microsub.Item, re *regexp.Regexp) bool {
|
||||||
return re.MatchString(item.Name)
|
return re.MatchString(item.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error {
|
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) error {
|
||||||
timelineBackend := b.getTimeline(channel)
|
timelineBackend := b.getTimeline(channel)
|
||||||
return timelineBackend.AddItem(item)
|
return timelineBackend.AddItem(item)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
|
func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
|
||||||
b.lock.RLock()
|
b.lock.RLock()
|
||||||
c, exists := b.Channels[channel]
|
c, exists := b.Channels[channel]
|
||||||
b.lock.RUnlock()
|
b.lock.RUnlock()
|
||||||
|
|
|
||||||
|
|
@ -111,8 +111,8 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
item.Read = false
|
item.Read = false
|
||||||
id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id"))
|
id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id"))
|
||||||
item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id))))
|
item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id))))
|
||||||
h.Backend.channelAddItemWithMatcher(conn, channel, item)
|
err = h.Backend.channelAddItemWithMatcher(channel, item)
|
||||||
err = h.Backend.updateChannelUnreadCount(conn, channel)
|
err = h.Backend.updateChannelUnreadCount(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
||||||
}
|
}
|
||||||
|
|
@ -121,7 +121,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
enc := json.NewEncoder(w)
|
enc := json.NewEncoder(w)
|
||||||
enc.Encode(map[string]string{
|
err = enc.Encode(map[string]string{
|
||||||
"ok": "1",
|
"ok": "1",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,8 +15,8 @@ type TimelineBackend interface {
|
||||||
AddItem(item microsub.Item) error
|
AddItem(item microsub.Item) error
|
||||||
Count() (int, error)
|
Count() (int, error)
|
||||||
|
|
||||||
MarkRead(uid string) error
|
MarkRead(uids []string) error
|
||||||
MarkUnread(uid string) error
|
MarkUnread(uids []string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type redisSortedSetTimeline struct {
|
type redisSortedSetTimeline struct {
|
||||||
|
|
@ -192,11 +192,35 @@ func (timeline *redisSortedSetTimeline) Count() (int, error) {
|
||||||
return unread, nil
|
return unread, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (timeline *redisSortedSetTimeline) MarkRead(uid string) error {
|
func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
|
||||||
panic("implement me")
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
channel := timeline.channel
|
||||||
|
|
||||||
|
itemUIDs := []string{}
|
||||||
|
for _, uid := range uids {
|
||||||
|
itemUIDs = append(itemUIDs, "item:"+uid)
|
||||||
|
}
|
||||||
|
|
||||||
|
channelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||||
|
args := redis.Args{}.Add(channelKey).AddFlat(itemUIDs)
|
||||||
|
|
||||||
|
if _, err := conn.Do("SADD", args...); err != nil {
|
||||||
|
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
|
args = redis.Args{}.Add(zchannelKey).AddFlat(itemUIDs)
|
||||||
|
|
||||||
|
if _, err := conn.Do("ZREM", args...); err != nil {
|
||||||
|
return fmt.Errorf("marking read for channel %s has failed: %s", channel, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (timeline *redisSortedSetTimeline) MarkUnread(uid string) error {
|
func (timeline *redisSortedSetTimeline) MarkUnread(uids []string) error {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -215,10 +239,10 @@ func (*redisStreamTimeline) Count() (int, error) {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*redisStreamTimeline) MarkRead(uid string) error {
|
func (*redisStreamTimeline) MarkRead(uids []string) error {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*redisStreamTimeline) MarkUnread(uid string) error {
|
func (*redisStreamTimeline) MarkUnread(uids []string) error {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user