Fix a number of data races in the memory backend
This commit is contained in:
parent
fa61569e56
commit
e02050013d
|
|
@ -27,12 +27,11 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"p83.nl/go/ekster/pkg/feedbin"
|
|
||||||
"p83.nl/go/ekster/pkg/fetch"
|
"p83.nl/go/ekster/pkg/fetch"
|
||||||
"p83.nl/go/ekster/pkg/microsub"
|
"p83.nl/go/ekster/pkg/microsub"
|
||||||
|
|
||||||
|
|
@ -41,10 +40,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type memoryBackend struct {
|
type memoryBackend struct {
|
||||||
Channels map[string]microsub.Channel
|
lock sync.RWMutex
|
||||||
Feeds map[string][]microsub.Feed
|
Channels map[string]microsub.Channel
|
||||||
Settings map[string]channelSetting
|
Feeds map[string][]microsub.Feed
|
||||||
NextUid int
|
Settings map[string]channelSetting
|
||||||
|
NextUid int
|
||||||
|
|
||||||
Me string
|
Me string
|
||||||
TokenEndpoint string
|
TokenEndpoint string
|
||||||
|
|
||||||
|
|
@ -75,6 +76,8 @@ func (f *fetch2) Fetch(url string) (*http.Response, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) Debug() {
|
func (b *memoryBackend) Debug() {
|
||||||
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
fmt.Println(b.Channels)
|
fmt.Println(b.Channels)
|
||||||
fmt.Println(b.Feeds)
|
fmt.Println(b.Feeds)
|
||||||
fmt.Println(b.Settings)
|
fmt.Println(b.Settings)
|
||||||
|
|
@ -100,22 +103,15 @@ func (b *memoryBackend) load() error {
|
||||||
|
|
||||||
conn.Do("DEL", "channels")
|
conn.Do("DEL", "channels")
|
||||||
|
|
||||||
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
|
|
||||||
for uid, channel := range b.Channels {
|
for uid, channel := range b.Channels {
|
||||||
log.Printf("loading channel %s - %s\n", uid, channel.Name)
|
log.Printf("loading channel %s - %s\n", uid, channel.Name)
|
||||||
// for _, feed := range b.Feeds[uid] {
|
|
||||||
// log.Printf("- loading feed %s\n", feed.URL)
|
|
||||||
// resp, err := b.Fetch3(uid, feed.URL)
|
|
||||||
// if err != nil {
|
|
||||||
// log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err)
|
|
||||||
// continue
|
|
||||||
// }
|
|
||||||
// defer resp.Body.Close()
|
|
||||||
// b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
|
||||||
// }
|
|
||||||
|
|
||||||
conn.Do("SADD", "channels", uid)
|
conn.Do("SADD", "channels", uid)
|
||||||
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
|
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -125,6 +121,8 @@ func (b *memoryBackend) save() {
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
jw := json.NewEncoder(f)
|
jw := json.NewEncoder(f)
|
||||||
jw.SetIndent("", " ")
|
jw.SetIndent("", " ")
|
||||||
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
jw.Encode(b)
|
jw.Encode(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -141,20 +139,24 @@ func loadMemoryBackend() microsub.Microsub {
|
||||||
|
|
||||||
func createMemoryBackend() microsub.Microsub {
|
func createMemoryBackend() microsub.Microsub {
|
||||||
backend := memoryBackend{}
|
backend := memoryBackend{}
|
||||||
defer backend.save()
|
backend.lock.Lock()
|
||||||
|
|
||||||
backend.Channels = make(map[string]microsub.Channel)
|
backend.Channels = make(map[string]microsub.Channel)
|
||||||
backend.Feeds = make(map[string][]microsub.Feed)
|
backend.Feeds = make(map[string][]microsub.Feed)
|
||||||
channels := []microsub.Channel{
|
channels := []microsub.Channel{
|
||||||
microsub.Channel{UID: "notifications", Name: "Notifications"},
|
{UID: "notifications", Name: "Notifications"},
|
||||||
microsub.Channel{UID: "home", Name: "Home"},
|
{UID: "home", Name: "Home"},
|
||||||
}
|
}
|
||||||
for _, c := range channels {
|
for _, c := range channels {
|
||||||
backend.Channels[c.UID] = c
|
backend.Channels[c.UID] = c
|
||||||
}
|
}
|
||||||
backend.NextUid = 1000000
|
backend.NextUid = 1000000
|
||||||
|
|
||||||
backend.Me = "https://example.com/"
|
backend.Me = "https://example.com/"
|
||||||
|
|
||||||
|
backend.lock.Unlock()
|
||||||
|
|
||||||
|
backend.save()
|
||||||
|
|
||||||
log.Println(`Config file "backend.json" is created in the current directory.`)
|
log.Println(`Config file "backend.json" is created in the current directory.`)
|
||||||
log.Println(`Update "Me" variable to your website address "https://example.com/"`)
|
log.Println(`Update "Me" variable to your website address "https://example.com/"`)
|
||||||
log.Println(`Update "TokenEndpoint" variable to the address of your token endpoint "https://example.com/token"`)
|
log.Println(`Update "TokenEndpoint" variable to the address of your token endpoint "https://example.com/token"`)
|
||||||
|
|
@ -166,7 +168,10 @@ func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) {
|
||||||
conn := pool.Get()
|
conn := pool.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
channels := []microsub.Channel{}
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
|
|
||||||
|
var channels []microsub.Channel
|
||||||
uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC"))
|
uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Sorting channels failed: %v\n", err)
|
log.Printf("Sorting channels failed: %v\n", err)
|
||||||
|
|
@ -196,9 +201,11 @@ func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
|
||||||
Name: name,
|
Name: name,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.lock.Lock()
|
||||||
b.Channels[channel.UID] = channel
|
b.Channels[channel.UID] = channel
|
||||||
b.Feeds[channel.UID] = []microsub.Feed{}
|
b.Feeds[channel.UID] = []microsub.Feed{}
|
||||||
b.NextUid++
|
b.NextUid++
|
||||||
|
b.lock.Unlock()
|
||||||
|
|
||||||
conn.Do("SADD", "channels", uid)
|
conn.Do("SADD", "channels", uid)
|
||||||
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
|
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
|
||||||
|
|
@ -209,11 +216,24 @@ func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
|
||||||
// ChannelsUpdate updates a channels
|
// ChannelsUpdate updates a channels
|
||||||
func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
|
func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
|
||||||
defer b.save()
|
defer b.save()
|
||||||
if c, e := b.Channels[uid]; e {
|
|
||||||
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
|
|
||||||
|
b.lock.RLock()
|
||||||
|
c, e := b.Channels[uid]
|
||||||
|
b.lock.RUnlock()
|
||||||
|
|
||||||
|
if e {
|
||||||
c.Name = name
|
c.Name = name
|
||||||
|
|
||||||
|
b.lock.Lock()
|
||||||
b.Channels[uid] = c
|
b.Channels[uid] = c
|
||||||
|
b.lock.Unlock()
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return microsub.Channel{}, fmt.Errorf("Channel %s does not exist", uid)
|
return microsub.Channel{}, fmt.Errorf("Channel %s does not exist", uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -227,12 +247,26 @@ func (b *memoryBackend) ChannelsDelete(uid string) error {
|
||||||
conn.Do("SREM", "channels", uid)
|
conn.Do("SREM", "channels", uid)
|
||||||
conn.Do("DEL", "channel_sortorder_"+uid)
|
conn.Do("DEL", "channel_sortorder_"+uid)
|
||||||
|
|
||||||
|
b.lock.Lock()
|
||||||
delete(b.Channels, uid)
|
delete(b.Channels, uid)
|
||||||
delete(b.Feeds, uid)
|
delete(b.Feeds, uid)
|
||||||
|
b.lock.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *memoryBackend) getFeeds() map[string][]string {
|
||||||
|
feeds := make(map[string][]string)
|
||||||
|
b.lock.RLock()
|
||||||
|
for uid := range b.Channels {
|
||||||
|
for _, feed := range b.Feeds[uid] {
|
||||||
|
feeds[uid] = append(feeds[uid], feed.URL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.lock.RUnlock()
|
||||||
|
return feeds
|
||||||
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) run() {
|
func (b *memoryBackend) run() {
|
||||||
b.ticker = time.NewTicker(10 * time.Minute)
|
b.ticker = time.NewTicker(10 * time.Minute)
|
||||||
b.quit = make(chan struct{})
|
b.quit = make(chan struct{})
|
||||||
|
|
@ -241,17 +275,20 @@ func (b *memoryBackend) run() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-b.ticker.C:
|
case <-b.ticker.C:
|
||||||
for uid := range b.Channels {
|
feeds := b.getFeeds()
|
||||||
for _, feed := range b.Feeds[uid] {
|
|
||||||
resp, err := b.Fetch3(uid, feed.URL)
|
for uid := range feeds {
|
||||||
|
for _, feedURL := range feeds[uid] {
|
||||||
|
resp, err := b.Fetch3(uid, feedURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err)
|
log.Printf("Error while Fetch3 of %s: %v\n", feedURL, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
b.ProcessContent(uid, feedURL, resp.Header.Get("Content-Type"), resp.Body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-b.quit:
|
case <-b.quit:
|
||||||
b.ticker.Stop()
|
b.ticker.Stop()
|
||||||
return
|
return
|
||||||
|
|
@ -264,44 +301,6 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
|
||||||
conn := pool.Get()
|
conn := pool.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
if channel == "feedbin" {
|
|
||||||
fb := feedbin.New(os.Getenv("FEEDBIN_USER"), os.Getenv("FEEDBIN_PASS"))
|
|
||||||
|
|
||||||
entries, err := fb.Entries()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return microsub.Timeline{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
feeds := make(map[int64]feedbin.Feed)
|
|
||||||
|
|
||||||
var items []microsub.Item
|
|
||||||
|
|
||||||
for _, entry := range entries {
|
|
||||||
var item microsub.Item
|
|
||||||
|
|
||||||
var feed feedbin.Feed
|
|
||||||
e := false
|
|
||||||
if feed, e = feeds[entry.FeedID]; !e {
|
|
||||||
feeds[entry.FeedID], _ = fb.Feed(entry.FeedID)
|
|
||||||
feed = feeds[entry.FeedID]
|
|
||||||
}
|
|
||||||
|
|
||||||
item.Type = "entry"
|
|
||||||
item.Name = entry.Title
|
|
||||||
item.Content = µsub.Content{HTML: entry.Content}
|
|
||||||
item.URL = entry.URL
|
|
||||||
item.Published = entry.Published.Format(time.RFC3339)
|
|
||||||
item.Author = µsub.Card{Type: "card", Name: feed.Title, URL: feed.SiteURL}
|
|
||||||
|
|
||||||
items = append(items, item)
|
|
||||||
}
|
|
||||||
return microsub.Timeline{
|
|
||||||
Paging: microsub.Pagination{},
|
|
||||||
Items: items,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("TimelineGet %s\n", channel)
|
log.Printf("TimelineGet %s\n", channel)
|
||||||
feeds, err := b.FollowGetList(channel)
|
feeds, err := b.FollowGetList(channel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -309,19 +308,9 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
|
||||||
}
|
}
|
||||||
log.Println(feeds)
|
log.Println(feeds)
|
||||||
|
|
||||||
items := []microsub.Item{}
|
var items []microsub.Item
|
||||||
|
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
// channelKey := fmt.Sprintf("channel:%s:posts", channel)
|
|
||||||
|
|
||||||
// itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA"))
|
|
||||||
// if err != nil {
|
|
||||||
// log.Println(err)
|
|
||||||
// return microsub.Timeline{
|
|
||||||
// Paging: microsub.Pagination{},
|
|
||||||
// Items: items,
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
afterScore := "-inf"
|
afterScore := "-inf"
|
||||||
if len(after) != 0 {
|
if len(after) != 0 {
|
||||||
|
|
@ -332,7 +321,7 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
|
||||||
beforeScore = "(" + before
|
beforeScore = "(" + before
|
||||||
}
|
}
|
||||||
|
|
||||||
itemJSONs := [][]byte{}
|
var itemJSONs [][]byte
|
||||||
|
|
||||||
itemScores, err := redis.Strings(
|
itemScores, err := redis.Strings(
|
||||||
conn.Do(
|
conn.Do(
|
||||||
|
|
@ -391,41 +380,9 @@ func (b *memoryBackend) TimelineGet(before, after, channel string) (microsub.Tim
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// panic if s is not a slice
|
|
||||||
func reverseSlice(s interface{}) {
|
|
||||||
size := reflect.ValueOf(s).Len()
|
|
||||||
swap := reflect.Swapper(s)
|
|
||||||
for i, j := 0, size-1; i < j; i, j = i+1, j-1 {
|
|
||||||
swap(i, j)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// func (b *memoryBackend) checkRead(channel string, uid string) bool {
|
|
||||||
// conn := pool.Get()
|
|
||||||
// defer conn.Close()
|
|
||||||
// args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add("item:" + uid)
|
|
||||||
// member, err := redis.Bool(conn.Do("SISMEMBER", args...))
|
|
||||||
// if err != nil {
|
|
||||||
// log.Printf("Checking read for channel %s item %s has failed\n", channel, uid)
|
|
||||||
// }
|
|
||||||
// return member
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func (b *memoryBackend) wasRead(channel string, item map[string]interface{}) bool {
|
|
||||||
// if uid, e := item["uid"]; e {
|
|
||||||
// uid = hex.EncodeToString([]byte(uid.(string)))
|
|
||||||
// return b.checkRead(channel, uid.(string))
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if uid, e := item["url"]; e {
|
|
||||||
// uid = hex.EncodeToString([]byte(uid.(string)))
|
|
||||||
// return b.checkRead(channel, uid.(string))
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return false
|
|
||||||
// }
|
|
||||||
|
|
||||||
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
|
func (b *memoryBackend) FollowGetList(uid string) ([]microsub.Feed, error) {
|
||||||
|
b.lock.RLock()
|
||||||
|
defer b.lock.RUnlock()
|
||||||
return b.Feeds[uid], nil
|
return b.Feeds[uid], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -439,7 +396,9 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
b.lock.Lock()
|
||||||
b.Feeds[uid] = append(b.Feeds[uid], feed)
|
b.Feeds[uid] = append(b.Feeds[uid], feed)
|
||||||
|
b.lock.Unlock()
|
||||||
|
|
||||||
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
|
||||||
|
|
||||||
|
|
@ -449,6 +408,7 @@ func (b *memoryBackend) FollowURL(uid string, url string) (microsub.Feed, error)
|
||||||
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
|
func (b *memoryBackend) UnfollowURL(uid string, url string) error {
|
||||||
defer b.save()
|
defer b.save()
|
||||||
index := -1
|
index := -1
|
||||||
|
b.lock.Lock()
|
||||||
for i, f := range b.Feeds[uid] {
|
for i, f := range b.Feeds[uid] {
|
||||||
if f.URL == url {
|
if f.URL == url {
|
||||||
index = i
|
index = i
|
||||||
|
|
@ -459,6 +419,7 @@ func (b *memoryBackend) UnfollowURL(uid string, url string) error {
|
||||||
feeds := b.Feeds[uid]
|
feeds := b.Feeds[uid]
|
||||||
b.Feeds[uid] = append(feeds[:index], feeds[index+1:]...)
|
b.Feeds[uid] = append(feeds[:index], feeds[index+1:]...)
|
||||||
}
|
}
|
||||||
|
b.lock.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -502,7 +463,7 @@ func getPossibleURLs(query string) []string {
|
||||||
func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
|
func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
|
||||||
urls := getPossibleURLs(query)
|
urls := getPossibleURLs(query)
|
||||||
|
|
||||||
feeds := []microsub.Feed{}
|
var feeds []microsub.Feed
|
||||||
|
|
||||||
for _, u := range urls {
|
for _, u := range urls {
|
||||||
log.Println(u)
|
log.Println(u)
|
||||||
|
|
@ -651,7 +612,12 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
// - add item to channel
|
// - add item to channel
|
||||||
|
|
||||||
var updatedChannels []string
|
var updatedChannels []string
|
||||||
for channelKey, setting := range b.Settings {
|
|
||||||
|
b.lock.RLock()
|
||||||
|
settings := b.Settings
|
||||||
|
b.lock.RUnlock()
|
||||||
|
|
||||||
|
for channelKey, setting := range settings {
|
||||||
if setting.IncludeRegex != "" {
|
if setting.IncludeRegex != "" {
|
||||||
re, err := regexp.Compile(setting.IncludeRegex)
|
re, err := regexp.Compile(setting.IncludeRegex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -673,17 +639,19 @@ func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel strin
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for the exclude regex
|
// Check for the exclude regex
|
||||||
if setting, e := b.Settings[channel]; e {
|
b.lock.RLock()
|
||||||
if setting.ExcludeRegex != "" {
|
setting, exists := b.Settings[channel]
|
||||||
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
|
b.lock.RUnlock()
|
||||||
if err != nil {
|
|
||||||
log.Printf("error in regexp: %q\n", excludeRegex)
|
if exists && setting.ExcludeRegex != "" {
|
||||||
return nil
|
excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
|
||||||
}
|
if err != nil {
|
||||||
if matchItem(item, excludeRegex) {
|
log.Printf("error in regexp: %q\n", excludeRegex)
|
||||||
log.Printf("Excluded %#v\n", item)
|
return nil
|
||||||
return nil
|
}
|
||||||
}
|
if matchItem(item, excludeRegex) {
|
||||||
|
log.Printf("Excluded %#v\n", item)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -766,7 +734,11 @@ func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item mic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
|
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
|
||||||
if c, e := b.Channels[channel]; e {
|
b.lock.RLock()
|
||||||
|
c, exists := b.Channels[channel]
|
||||||
|
b.lock.RUnlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)
|
||||||
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
unread, err := redis.Int(conn.Do("ZCARD", zchannelKey))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -774,8 +746,12 @@ func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string
|
||||||
}
|
}
|
||||||
defer b.save()
|
defer b.save()
|
||||||
c.Unread = unread
|
c.Unread = unread
|
||||||
|
|
||||||
|
b.lock.Lock()
|
||||||
b.Channels[channel] = c
|
b.Channels[channel] = c
|
||||||
|
b.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -37,8 +37,8 @@ type microsubHandler struct {
|
||||||
|
|
||||||
func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
var logger = log.New(os.Stdout, "logger: ", log.Lshortfile)
|
var logger = log.New(os.Stdout, "logger: ", log.Lshortfile)
|
||||||
h.Redis = redis.NewLoggingConn(pool.Get(), logger, "microsub")
|
conn := redis.NewLoggingConn(pool.Get(), logger, "microsub")
|
||||||
defer h.Redis.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
r.ParseForm()
|
r.ParseForm()
|
||||||
log.Printf("%s %s\n", r.Method, r.URL)
|
log.Printf("%s %s\n", r.Method, r.URL)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user