Improve updating the count of items in a channel
This commit is contained in:
parent
6677bd95ab
commit
f1483f4171
|
@ -342,6 +342,9 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
|
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
|
||||||
|
conn := pool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
items, err := b.feedItems(fetchURL, contentType, body)
|
items, err := b.feedItems(fetchURL, contentType, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -349,27 +352,23 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
|
||||||
|
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
item.Read = false
|
item.Read = false
|
||||||
b.channelAddItem(channel, item)
|
err = b.channelAddItem(conn, channel, item)
|
||||||
|
log.Printf("ERROR: %s\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = b.updateChannelUnreadCount(conn, channel)
|
||||||
|
log.Printf("error: while updating channel unread count for %s: %s\n", channel, err)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch3 fills stuff
|
// Fetch3 fills stuff
|
||||||
func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) {
|
func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) {
|
||||||
log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL)
|
log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL)
|
||||||
|
return Fetch2(fetchURL)
|
||||||
resp, err := Fetch2(fetchURL)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
|
func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error {
|
||||||
conn := pool.Get()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// send to redis
|
// send to redis
|
||||||
channelKey := fmt.Sprintf("channel:%s:posts", channel)
|
channelKey := fmt.Sprintf("channel:%s:posts", channel)
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
|
@ -381,7 +380,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
|
||||||
data, err := json.Marshal(item)
|
data, err := json.Marshal(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while creating item for redis: %v\n", err)
|
log.Printf("error while creating item for redis: %v\n", err)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
forRedis := redisItem{
|
forRedis := redisItem{
|
||||||
|
@ -397,46 +396,56 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
|
||||||
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while writing item for redis: %v\n", err)
|
log.Printf("error while writing item for redis: %v\n", err)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
readChannelKey := fmt.Sprintf("channel:%s:read", channel)
|
||||||
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if isRead {
|
if isRead {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
added, err := redis.Int64(conn.Do("SADD", channelKey, itemKey))
|
added, err := redis.Int64(conn.Do("SADD", channelKey, itemKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
|
log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
score, err := time.Parse(time.RFC3339, item.Published)
|
score, err := time.Parse(time.RFC3339, item.Published)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error can't parse %s as time\n", item.Published)
|
log.Printf("error can't parse %s as time\n", item.Published)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
added, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
added, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix()*1.0, itemKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while zadding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
|
log.Printf("error while zadding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if added > 0 {
|
if added > 0 {
|
||||||
log.Printf("Adding item to channel %s\n", channel)
|
log.Printf("Added item to channel %s\n", channel)
|
||||||
log.Println(item)
|
log.Println(item)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
|
||||||
if c, e := b.Channels[channel]; e {
|
if c, e := b.Channels[channel]; e {
|
||||||
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error while getting length of channel %s: %v\n", channelKey, err)
|
return err
|
||||||
}
|
}
|
||||||
c.Unread = unread
|
c.Unread = unread
|
||||||
b.Channels[channel] = c
|
b.Channels[channel] = c
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type redisItem struct {
|
type redisItem struct {
|
||||||
|
|
|
@ -73,7 +73,8 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
item.Read = false
|
item.Read = false
|
||||||
id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id"))
|
id, _ := redis.Int(conn.Do("INCR", "source:"+sourceID+"next_id"))
|
||||||
item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id))))
|
item.ID = fmt.Sprintf("%x", sha1.Sum([]byte(fmt.Sprintf("source:%s:%d", sourceID, id))))
|
||||||
h.Backend.channelAddItem(channel, item)
|
h.Backend.channelAddItem(conn, channel, item)
|
||||||
|
h.Backend.updateChannelUnreadCount(conn, channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user