Improve use of Redis

This commit is contained in:
Peter Stuifzand 2018-04-09 19:20:32 +02:00
parent 2818576e2e
commit eeeb9f3b3f
5 changed files with 88 additions and 53 deletions

View File

@ -257,6 +257,8 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
return items, err
}
log.Printf("%#v\n", feed)
author := microsub.Card{}
author.Type = "card"
author.Name = feed.Author.Name
@ -275,7 +277,7 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
item.Content.Text = feedItem.ContentText
item.URL = feedItem.URL
item.Summary = []string{feedItem.Summary}
item.Id = hex.EncodeToString([]byte(feedItem.ID))
item.ID = hex.EncodeToString([]byte(feedItem.ID))
item.Published = feedItem.DatePublished
itemAuthor := microsub.Card{}
@ -310,9 +312,9 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
item.Content.Text = feedItem.Summary
item.URL = feedItem.Link
if feedItem.ID == "" {
item.Id = hex.EncodeToString([]byte(feedItem.Link))
item.ID = hex.EncodeToString([]byte(feedItem.Link))
} else {
item.Id = hex.EncodeToString([]byte(feedItem.ID))
item.ID = hex.EncodeToString([]byte(feedItem.ID))
}
item.Published = feedItem.Date.Format(time.RFC3339)
items = append(items, item)
@ -330,7 +332,7 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
}
for _, item := range items {
item.Read = b.checkRead(channel, item.Id)
item.Read = b.checkRead(channel, item.ID)
if item.Read {
continue
}
@ -341,21 +343,20 @@ func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, bo
}
// Fetch3 fills stuff
func (b *memoryBackend) Fetch3(channel, fetchURL string) error {
func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) {
log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL)
resp, err := Fetch2(fetchURL)
if err != nil {
return err
return nil, err
}
defer resp.Body.Close()
return b.ProcessContent(channel, fetchURL, resp.Header.Get("Content-Type"), resp.Body)
return resp, nil
}
func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
log.Printf("Adding item to channel %s\n", channel)
log.Println(item)
conn := pool.Get()
defer conn.Close()
// send to redis
channelKey := fmt.Sprintf("channel:%s:posts", channel)
@ -366,26 +367,28 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
}
forRedis := redisItem{
Id: item.Id,
ID: item.ID,
Published: item.Published,
Read: item.Read,
Data: data,
}
itemKey := fmt.Sprintf("item:%s", item.Id)
_, err = redis.String(b.Redis.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
itemKey := fmt.Sprintf("item:%s", item.ID)
_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
if err != nil {
log.Printf("error while writing item for redis: %v\n", err)
return
}
added, err := redis.Int64(b.Redis.Do("SADD", channelKey, itemKey))
added, err := redis.Int64(conn.Do("SADD", channelKey, itemKey))
if err != nil {
log.Printf("error while adding item %s to channel %s for redis: %v\n", itemKey, channelKey, err)
return
}
if added > 0 {
log.Printf("Adding item to channel %s\n", channel)
log.Println(item)
if c, e := b.Channels[channel]; e {
c.Unread = true
b.Channels[channel] = c
@ -394,7 +397,7 @@ func (b *memoryBackend) channelAddItem(channel string, item microsub.Item) {
}
type redisItem struct {
Id string
ID string
Published string
Read bool
Data []byte

View File

@ -53,7 +53,6 @@ type microsubHandler struct {
type hubIncomingBackend struct {
backend *memoryBackend
conn redis.Conn
}
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
@ -67,7 +66,9 @@ func randStringBytes(n int) string {
}
func (h *hubIncomingBackend) GetSecret(id int64) string {
secret, err := redis.String(h.conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
conn := pool.Get()
defer conn.Close()
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
if err != nil {
return ""
}
@ -77,16 +78,18 @@ func (h *hubIncomingBackend) GetSecret(id int64) string {
var hubURL = "https://hub.stuifzandapp.com/"
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
id, err := redis.Int64(h.conn.Do("INCR", "feed:next_id"))
conn := pool.Get()
defer conn.Close()
id, err := redis.Int64(conn.Do("INCR", "feed:next_id"))
if err != nil {
return 0, err
}
h.conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
h.conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "url", topic)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "channel", channel)
secret := randStringBytes(16)
h.conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
conn.Do("HSET", fmt.Sprintf("feed:%d", id), "secret", secret)
hub, err := url.Parse(hubURL)
q := hub.Query()
@ -109,12 +112,14 @@ func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, er
}
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
conn := pool.Get()
defer conn.Close()
log.Printf("updating feed %d", feedID)
u, err := redis.String(h.conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
if err != nil {
return err
}
channel, err := redis.String(h.conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
channel, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "channel"))
if err != nil {
return err
}
@ -276,8 +281,6 @@ func newPool(addr string) *redis.Pool {
func main() {
flag.Parse()
var logger = log.New(os.Stdout, "logger: ", log.Lshortfile)
createBackend := false
args := flag.Args()
@ -289,9 +292,6 @@ func main() {
pool = newPool(*redisServer)
conn := redis.NewLoggingConn(pool.Get(), logger, "microsub")
defer conn.Close()
var backend microsub.Microsub
if createBackend {
@ -299,9 +299,9 @@ func main() {
return
}
backend = loadMemoryBackend(conn)
backend = loadMemoryBackend()
hubBackend := hubIncomingBackend{backend.(*memoryBackend), conn}
hubBackend := hubIncomingBackend{backend.(*memoryBackend)}
http.Handle("/micropub", &micropubHandler{
Backend: backend.(*memoryBackend),

View File

@ -35,7 +35,6 @@ import (
)
type memoryBackend struct {
Redis redis.Conn
Channels map[string]microsub.Channel
Feeds map[string][]microsub.Feed
NextUid int
@ -56,7 +55,7 @@ func (b *memoryBackend) Debug() {
fmt.Println(b.Channels)
}
func (b *memoryBackend) load() {
func (b *memoryBackend) load() error {
filename := "backend.json"
f, err := os.Open(filename)
if err != nil {
@ -66,23 +65,33 @@ func (b *memoryBackend) load() {
jw := json.NewDecoder(f)
err = jw.Decode(b)
if err != nil {
panic("cant open backend.json")
return err
}
b.Redis.Do("SETNX", "channel_sortorder_notifications", 1)
conn := pool.Get()
defer conn.Close()
b.Redis.Do("DEL", "channels")
conn.Do("SETNX", "channel_sortorder_notifications", 1)
conn.Do("DEL", "channels")
for uid, channel := range b.Channels {
log.Printf("loading channel %s - %s\n", uid, channel.Name)
for _, feed := range b.Feeds[uid] {
log.Printf("- loading feed %s\n", feed.URL)
b.Fetch3(uid, feed.URL)
resp, err := b.Fetch3(uid, feed.URL)
if err != nil {
log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err)
continue
}
defer resp.Body.Close()
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
}
b.Redis.Do("SADD", "channels", uid)
b.Redis.Do("SETNX", "channel_sortorder_"+uid, 99999)
conn.Do("SADD", "channels", uid)
conn.Do("SETNX", "channel_sortorder_"+uid, 99999)
}
return nil
}
func (b *memoryBackend) save() {
@ -93,10 +102,13 @@ func (b *memoryBackend) save() {
jw.Encode(b)
}
func loadMemoryBackend(conn redis.Conn) microsub.Microsub {
func loadMemoryBackend() microsub.Microsub {
backend := &memoryBackend{}
backend.Redis = conn
backend.load()
err := backend.load()
if err != nil {
log.Printf("Error while loading backend: %v\n", err)
return nil
}
return backend
}
@ -121,8 +133,11 @@ func createMemoryBackend() microsub.Microsub {
// ChannelsGetList gets channels
func (b *memoryBackend) ChannelsGetList() []microsub.Channel {
conn := pool.Get()
defer conn.Close()
channels := []microsub.Channel{}
uids, err := redis.Strings(b.Redis.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC"))
uids, err := redis.Strings(conn.Do("SORT", "channels", "BY", "channel_sortorder_*", "ASC"))
if err != nil {
log.Printf("Sorting channels failed: %v\n", err)
for _, v := range b.Channels {
@ -301,7 +316,7 @@ func mapToItem(result map[string]interface{}) microsub.Item {
}
if id, e := result["_id"]; e {
item.Id = id.(string)
item.ID = id.(string)
}
if read, e := result["_is_read"]; e {
item.Read = read.(bool)
@ -320,7 +335,13 @@ func (b *memoryBackend) run() {
case <-b.ticker.C:
for uid := range b.Channels {
for _, feed := range b.Feeds[uid] {
b.Fetch3(uid, feed.URL)
resp, err := b.Fetch3(uid, feed.URL)
if err != nil {
log.Printf("Error while Fetch3 of %s: %v\n", feed.URL, err)
continue
}
defer resp.Body.Close()
b.ProcessContent(uid, feed.URL, resp.Header.Get("Content-Type"), resp.Body)
}
}
case <-b.quit:
@ -332,6 +353,9 @@ func (b *memoryBackend) run() {
}
func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Timeline {
conn := pool.Get()
defer conn.Close()
log.Printf("TimelineGet %s\n", channel)
feeds := b.FollowGetList(channel)
log.Println(feeds)
@ -340,7 +364,7 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time
channelKey := fmt.Sprintf("channel:%s:posts", channel)
itemJsons, err := redis.ByteSlices(b.Redis.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA"))
itemJsons, err := redis.ByteSlices(conn.Do("SORT", channelKey, "BY", "*->Published", "GET", "*->Data", "ASC", "ALPHA"))
if err != nil {
log.Println(err)
return microsub.Timeline{
@ -352,7 +376,7 @@ func (b *memoryBackend) TimelineGet(after, before, channel string) microsub.Time
for _, obj := range itemJsons {
item := microsub.Item{}
json.Unmarshal(obj, &item)
item.Read = b.checkRead(channel, item.Id)
item.Read = b.checkRead(channel, item.ID)
if item.Read {
continue
}
@ -375,8 +399,10 @@ func reverseSlice(s interface{}) {
}
func (b *memoryBackend) checkRead(channel string, uid string) bool {
conn := pool.Get()
defer conn.Close()
args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).Add("item:" + uid)
member, err := redis.Bool(b.Redis.Do("SISMEMBER", args...))
member, err := redis.Bool(conn.Do("SISMEMBER", args...))
if err != nil {
log.Printf("Checking read for channel %s item %s has failed\n", channel, uid)
}
@ -539,6 +565,9 @@ func (b *memoryBackend) PreviewURL(previewURL string) microsub.Timeline {
}
func (b *memoryBackend) MarkRead(channel string, uids []string) {
conn := pool.Get()
defer conn.Close()
log.Printf("Marking read for %s %v\n", channel, uids)
itemUIDs := []string{}
@ -547,7 +576,7 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) {
}
args := redis.Args{}.Add(fmt.Sprintf("timeline:%s:read", channel)).AddFlat(itemUIDs)
if _, err := b.Redis.Do("SADD", args...); err != nil {
if _, err := conn.Do("SADD", args...); err != nil {
log.Printf("Marking read for channel %s has failed\n", channel)
}
log.Printf("Marking read success for %s %v\n", channel, itemUIDs)

View File

@ -17,10 +17,13 @@ type micropubHandler struct {
func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
conn := pool.Get()
defer conn.Close()
if r.Method == http.MethodPost {
sourceID := r.URL.Query().Get("source_id")
channel, err := redis.String(h.Backend.Redis.Do("HGET", "sources", sourceID))
channel, err := redis.String(conn.Do("HGET", "sources", sourceID))
if err != nil {
http.Error(w, "Unknown source", 400)
return
@ -38,7 +41,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
item.Read = false
item.Id = hex.EncodeToString([]byte(item.URL))
item.ID = hex.EncodeToString([]byte(item.URL))
h.Backend.channelAddItem(channel, item)
} else {

View File

@ -75,7 +75,7 @@ type Item struct {
Latitude string `json:"latitude,omitempty"`
Longitude string `json:"longitude,omitempty"`
Checkin Card `json:"checkin,omitempty"`
Id string `json:"_id"`
ID string `json:"_id"`
Read bool `json:"_is_read"`
}