From 76819ac8046fc8149ceed067faf72e8646e980c2 Mon Sep 17 00:00:00 2001 From: Peter Stuifzand Date: Sat, 23 Mar 2019 18:23:56 +0100 Subject: [PATCH] Remove global variable pool and replace fetcher with func --- cmd/eksterd/http.go | 7 ++- cmd/eksterd/hubbackend.go | 11 ++-- cmd/eksterd/main.go | 11 ++-- cmd/eksterd/memory.go | 104 ++++++++++++++++++++++-------------- cmd/eksterd/micropub.go | 3 +- cmd/eksterd/redisset.go | 9 ++-- cmd/eksterd/redisstreams.go | 8 +-- cmd/eksterd/timeline.go | 4 +- cmd/jf2test/main.go | 7 ++- pkg/fetch/fetch.go | 6 +-- pkg/fetch/fetcher.go | 5 +- pkg/server/microsub_test.go | 6 +++ 12 files changed, 112 insertions(+), 69 deletions(-) diff --git a/cmd/eksterd/http.go b/cmd/eksterd/http.go index 6a75d03..70ac289 100644 --- a/cmd/eksterd/http.go +++ b/cmd/eksterd/http.go @@ -24,6 +24,7 @@ type mainHandler struct { Backend *memoryBackend BaseURL string TemplateDir string + pool *redis.Pool } type session struct { @@ -85,7 +86,7 @@ type authRequest struct { AccessToken string `redis:"access_token"` } -func newMainHandler(backend *memoryBackend, baseURL, templateDir string) (*mainHandler, error) { +func newMainHandler(backend *memoryBackend, baseURL, templateDir string, pool *redis.Pool) (*mainHandler, error) { h := &mainHandler{Backend: backend} h.BaseURL = baseURL @@ -93,6 +94,8 @@ func newMainHandler(backend *memoryBackend, baseURL, templateDir string) (*mainH templateDir = strings.TrimRight(templateDir, "/") h.TemplateDir = templateDir + h.pool = pool + return h, nil } @@ -257,6 +260,8 @@ func getAppInfo(clientID string) (app, error) { } func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + pool := h.pool + conn := pool.Get() defer conn.Close() diff --git a/cmd/eksterd/hubbackend.go b/cmd/eksterd/hubbackend.go index 6bcbf96..fa3f97c 100644 --- a/cmd/eksterd/hubbackend.go +++ b/cmd/eksterd/hubbackend.go @@ -34,6 +34,7 @@ type HubBackend interface { type hubIncomingBackend struct { backend *memoryBackend baseURL string + pool *redis.Pool } // Feed contains information about the feed subscriptions @@ -49,7 +50,7 @@ type Feed struct { } func (h *hubIncomingBackend) GetSecret(id int64) string { - conn := pool.Get() + conn := h.pool.Get() defer conn.Close() secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret")) if err != nil { @@ -59,7 +60,7 @@ func (h *hubIncomingBackend) GetSecret(id int64) string { } func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) { - conn := pool.Get() + conn := h.pool.Get() defer conn.Close() // TODO(peter): check if topic already is registered @@ -104,7 +105,7 @@ func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, er } func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error { - conn := pool.Get() + conn := h.pool.Get() defer conn.Close() log.Printf("updating feed %d", feedID) u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url")) @@ -126,7 +127,7 @@ func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body i } func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error { - conn := pool.Get() + conn := h.pool.Get() defer conn.Close() log.Printf("updating feed %d lease_seconds", feedID) @@ -152,7 +153,7 @@ func (h *hubIncomingBackend) GetFeeds() []Feed { // Feeds returns a list of subscribed feeds func (h *hubIncomingBackend) Feeds() ([]Feed, error) { - conn := pool.Get() + conn := h.pool.Get() defer conn.Close() feeds := []Feed{} diff --git a/cmd/eksterd/main.go b/cmd/eksterd/main.go index 904d789..c721a61 100644 --- a/cmd/eksterd/main.go +++ b/cmd/eksterd/main.go @@ -42,10 +42,11 @@ type AppOptions struct { RedisServer string BaseURL string TemplateDir string + pool *redis.Pool } var ( - pool *redis.Pool +// pool *redis.Pool ) func init() { @@ -113,8 +114,9 @@ func NewApp(options AppOptions) *App { app.backend = loadMemoryBackend() app.backend.AuthEnabled = options.AuthEnabled app.backend.baseURL = options.BaseURL + app.backend.pool = options.pool - app.hubBackend = &hubIncomingBackend{app.backend, options.BaseURL} + app.hubBackend = &hubIncomingBackend{app.backend, options.BaseURL, options.pool} http.Handle("/micropub", µpubHandler{ Backend: app.backend, @@ -134,7 +136,7 @@ func NewApp(options AppOptions) *App { }) if !options.Headless { - handler, err := newMainHandler(app.backend, options.BaseURL, options.TemplateDir) + handler, err := newMainHandler(app.backend, options.BaseURL, options.TemplateDir, options.pool) if err != nil { log.Fatal(err) } @@ -200,7 +202,8 @@ func main() { return } - pool = newPool(options.RedisServer) + pool := newPool(options.RedisServer) + options.pool = pool NewApp(options).Run() } diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index 238f838..91afd01 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -45,6 +45,8 @@ type memoryBackend struct { quit chan struct{} broker *server.Broker + + pool *redis.Pool } type channelSetting struct { @@ -78,7 +80,7 @@ func (f *fetch2) Fetch(url string) (*http.Response, error) { } func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) bool { - conn := pool.Get() + conn := b.pool.Get() defer conn.Close() return b.cachedCheckAuthToken(conn, header, r) } @@ -108,7 +110,7 @@ func (b *memoryBackend) load() error { } func (b *memoryBackend) refreshChannels() { - conn := pool.Get() + conn := b.pool.Get() defer conn.Close() conn.Do("DEL", "channels") @@ -173,7 +175,7 @@ func createMemoryBackend() { // ChannelsGetList gets channels func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) { - conn := pool.Get() + conn := b.pool.Get() defer conn.Close() b.lock.RLock() @@ -207,7 +209,7 @@ func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) { channel := b.createChannel(name) b.setChannel(channel) - conn := pool.Get() + conn := b.pool.Get() defer conn.Close() updateChannelInRedis(conn, channel.UID, DefaultPrio) @@ -240,7 +242,7 @@ func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, erro func (b *memoryBackend) ChannelsDelete(uid string) error { defer b.save() - conn := pool.Get() + conn := b.pool.Get() defer conn.Close() removeChannelFromRedis(conn, uid) @@ -423,9 +425,11 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { // needs to be like this, because we get a null result otherwise in the json output feeds := []microsub.Feed{} + cachingFetch := WithCaching(b.pool, Fetch2) + for _, u := range urls { log.Println(u) - resp, err := Fetch2(u) + resp, err := cachingFetch(u) if err != nil { log.Printf("Error while fetching %s: %v\n", u, err) continue @@ -439,7 +443,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { continue } - feedResp, err := Fetch2(fetchURL.String()) + feedResp, err := cachingFetch(fetchURL.String()) if err != nil { log.Printf("Error in fetch of %s - %v\n", fetchURL, err) continue @@ -447,7 +451,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { defer feedResp.Body.Close() // TODO: Combine FeedHeader and FeedItems so we can use it here - parsedFeed, err := fetch.FeedHeader(&fetch2{}, fetchURL.String(), feedResp.Header.Get("Content-Type"), feedResp.Body) + parsedFeed, err := fetch.FeedHeader(cachingFetch, fetchURL.String(), feedResp.Header.Get("Content-Type"), feedResp.Body) if err != nil { log.Printf("Error in parse of %s - %v\n", fetchURL, err) continue @@ -462,7 +466,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { log.Printf("alternate found with type %s %#v\n", relURL.Type, relURL) if strings.HasPrefix(relURL.Type, "text/html") || strings.HasPrefix(relURL.Type, "application/json") || strings.HasPrefix(relURL.Type, "application/xml") || strings.HasPrefix(relURL.Type, "text/xml") || strings.HasPrefix(relURL.Type, "application/rss+xml") || strings.HasPrefix(relURL.Type, "application/atom+xml") { - feedResp, err := Fetch2(alt) + feedResp, err := cachingFetch(alt) if err != nil { log.Printf("Error in fetch of %s - %v\n", alt, err) continue @@ -470,7 +474,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { // FIXME: don't defer in for loop (possible memory leak) defer feedResp.Body.Close() - parsedFeed, err := fetch.FeedHeader(&fetch2{}, alt, feedResp.Header.Get("Content-Type"), feedResp.Body) + parsedFeed, err := fetch.FeedHeader(cachingFetch, alt, feedResp.Header.Get("Content-Type"), feedResp.Body) if err != nil { log.Printf("Error in parse of %s - %v\n", alt, err) continue @@ -486,12 +490,13 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) { } func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error) { - resp, err := Fetch2(previewURL) + cachingFetch := WithCaching(b.pool, Fetch2) + resp, err := cachingFetch(previewURL) if err != nil { return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err) } defer resp.Body.Close() - items, err := fetch.FeedItems(&fetch2{}, previewURL, resp.Header.Get("content-type"), resp.Body) + items, err := fetch.FeedItems(cachingFetch, previewURL, resp.Header.Get("content-type"), resp.Body) if err != nil { return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err) } @@ -518,10 +523,9 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error { } func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error { - conn := pool.Get() - defer conn.Close() + cachingFetch := WithCaching(b.pool, Fetch2) - items, err := fetch.FeedItems(&fetch2{}, fetchURL, contentType, body) + items, err := fetch.FeedItems(cachingFetch, fetchURL, contentType, body) if err != nil { return err } @@ -666,11 +670,54 @@ func (b *memoryBackend) updateChannelUnreadCount(channel string) error { return nil } -// Fetch2 fetches stuff -func Fetch2(fetchURL string) (*http.Response, error) { +// WithCaching adds caching to a FetcherFunc +func WithCaching(pool *redis.Pool, ff fetch.FetcherFunc) fetch.FetcherFunc { conn := pool.Get() defer conn.Close() + return func(fetchURL string) (*http.Response, error) { + cacheKey := fmt.Sprintf("http_cache:%s", fetchURL) + u, err := url.Parse(fetchURL) + if err != nil { + return nil, fmt.Errorf("error parsing %s as url: %s", fetchURL, err) + } + + req, err := http.NewRequest("GET", u.String(), nil) + + data, err := redis.Bytes(conn.Do("GET", cacheKey)) + if err == nil { + log.Printf("HIT %s\n", fetchURL) + rd := bufio.NewReader(bytes.NewReader(data)) + return http.ReadResponse(rd, req) + } + + log.Printf("MISS %s\n", fetchURL) + + resp, err := ff(fetchURL) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var b bytes.Buffer + err = resp.Write(&b) + if err != nil { + return nil, err + } + + cachedCopy := make([]byte, b.Len()) + cur := b.Bytes() + copy(cachedCopy, cur) + + conn.Do("SET", cacheKey, cachedCopy, "EX", 60*60) + + cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req) + return cachedResp, err + } +} + +// Fetch2 fetches stuff +func Fetch2(fetchURL string) (*http.Response, error) { if !strings.HasPrefix(fetchURL, "http") { return nil, fmt.Errorf("error parsing %s as url, has no http(s) prefix", fetchURL) } @@ -682,34 +729,13 @@ func Fetch2(fetchURL string) (*http.Response, error) { req, err := http.NewRequest("GET", u.String(), nil) - cacheKey := fmt.Sprintf("http_cache:%s", u.String()) - data, err := redis.Bytes(conn.Do("GET", cacheKey)) - if err == nil { - log.Printf("HIT %s\n", u.String()) - rd := bufio.NewReader(bytes.NewReader(data)) - return http.ReadResponse(rd, req) - } - - log.Printf("MISS %s\n", u.String()) - client := http.Client{} resp, err := client.Do(req) if err != nil { return nil, fmt.Errorf("fetch failed: %s: %s", u, err) } - defer resp.Body.Close() - var b bytes.Buffer - resp.Write(&b) - - cachedCopy := make([]byte, b.Len()) - cur := b.Bytes() - copy(cachedCopy, cur) - - conn.Do("SET", cacheKey, cachedCopy, "EX", 60*60) - - cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req) - return cachedResp, err + return resp, err } func (b *memoryBackend) createChannel(name string) microsub.Channel { diff --git a/cmd/eksterd/micropub.go b/cmd/eksterd/micropub.go index 9a5749b..3d3c862 100644 --- a/cmd/eksterd/micropub.go +++ b/cmd/eksterd/micropub.go @@ -18,6 +18,7 @@ import ( type micropubHandler struct { Backend *memoryBackend + pool *redis.Pool } /* @@ -30,7 +31,7 @@ type micropubHandler struct { func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() - conn := pool.Get() + conn := h.pool.Get() defer conn.Close() r.ParseForm() diff --git a/cmd/eksterd/redisset.go b/cmd/eksterd/redisset.go index a452ae0..3bd5e74 100644 --- a/cmd/eksterd/redisset.go +++ b/cmd/eksterd/redisset.go @@ -12,6 +12,7 @@ import ( type redisSortedSetTimeline struct { channel string + pool *redis.Pool } /* @@ -22,7 +23,7 @@ func (timeline *redisSortedSetTimeline) Init() error { } func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() items := []microsub.Item{} @@ -103,7 +104,7 @@ func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Ti } func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() channel := timeline.channel @@ -158,7 +159,7 @@ func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error { } func (timeline *redisSortedSetTimeline) Count() (int, error) { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() channel := timeline.channel @@ -171,7 +172,7 @@ func (timeline *redisSortedSetTimeline) Count() (int, error) { } func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() channel := timeline.channel diff --git a/cmd/eksterd/redisstreams.go b/cmd/eksterd/redisstreams.go index c1e96d0..41700ef 100644 --- a/cmd/eksterd/redisstreams.go +++ b/cmd/eksterd/redisstreams.go @@ -12,6 +12,8 @@ import ( type redisStreamTimeline struct { channel, channelKey string + + pool *redis.Pool } /* @@ -23,7 +25,7 @@ func (timeline *redisStreamTimeline) Init() error { } func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() if before == "" { @@ -69,7 +71,7 @@ func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timel } func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() if item.Published == "" { @@ -90,7 +92,7 @@ func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error { } func (timeline *redisStreamTimeline) Count() (int, error) { - conn := pool.Get() + conn := timeline.pool.Get() defer conn.Close() return redis.Int(conn.Do("XLEN", timeline.channelKey)) diff --git a/cmd/eksterd/timeline.go b/cmd/eksterd/timeline.go index 391758f..8d18690 100644 --- a/cmd/eksterd/timeline.go +++ b/cmd/eksterd/timeline.go @@ -31,7 +31,7 @@ func (b *memoryBackend) getTimeline(channel string) TimelineBackend { } if timelineType == "sorted-set" { - timeline := &redisSortedSetTimeline{channel} + timeline := &redisSortedSetTimeline{channel: channel, pool: b.pool} err := timeline.Init() if err != nil { return nil @@ -39,7 +39,7 @@ func (b *memoryBackend) getTimeline(channel string) TimelineBackend { return timeline } if timelineType == "stream" { - timeline := &redisStreamTimeline{channel: channel} + timeline := &redisStreamTimeline{channel: channel, pool: b.pool} err := timeline.Init() if err != nil { return nil diff --git a/cmd/jf2test/main.go b/cmd/jf2test/main.go index 426307f..c927d5b 100644 --- a/cmd/jf2test/main.go +++ b/cmd/jf2test/main.go @@ -17,9 +17,8 @@ func init() { log.SetOutput(f) } -type fetcher struct{} - -func (f fetcher) Fetch(url string) (*http.Response, error) { +// Fetch calls http.Get +func Fetch(url string) (*http.Response, error) { return http.Get(url) } @@ -33,7 +32,7 @@ func main() { } defer resp.Body.Close() - items, err := fetch.FeedItems(fetcher{}, url, resp.Header.Get("Content-Type"), resp.Body) + items, err := fetch.FeedItems(Fetch, url, resp.Header.Get("Content-Type"), resp.Body) if err != nil { log.Fatal(err) } diff --git a/pkg/fetch/fetch.go b/pkg/fetch/fetch.go index fb29c4a..4294ce2 100644 --- a/pkg/fetch/fetch.go +++ b/pkg/fetch/fetch.go @@ -25,7 +25,7 @@ import ( ) // FeedHeader returns a new microsub.Feed with the information parsed from body. -func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (microsub.Feed, error) { +func FeedHeader(fetcher FetcherFunc, fetchURL, contentType string, body io.Reader) (microsub.Feed, error) { log.Printf("ProcessContent %s\n", fetchURL) log.Println("Found " + contentType) @@ -38,7 +38,7 @@ func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) ( author, ok := jf2.SimplifyMicroformatDataAuthor(data) if !ok { if strings.HasPrefix(author.URL, "http") { - resp, err := fetcher.Fetch(fetchURL) + resp, err := fetcher(fetchURL) if err != nil { return feed, err } @@ -108,7 +108,7 @@ func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) ( } // FeedItems returns the items from the url, parsed from body. -func FeedItems(fetcher Fetcher, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) { +func FeedItems(fetcher FetcherFunc, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) { log.Printf("ProcessContent %s\n", fetchURL) log.Println("Found " + contentType) diff --git a/pkg/fetch/fetcher.go b/pkg/fetch/fetcher.go index 0f6561c..6769a0b 100644 --- a/pkg/fetch/fetcher.go +++ b/pkg/fetch/fetcher.go @@ -2,6 +2,5 @@ package fetch import "net/http" -type Fetcher interface { - Fetch(url string) (*http.Response, error) -} \ No newline at end of file +// FetcherFunc is a function that fetches an url +type FetcherFunc func(url string) (*http.Response, error) diff --git a/pkg/server/microsub_test.go b/pkg/server/microsub_test.go index 57bbb41..0dd268a 100644 --- a/pkg/server/microsub_test.go +++ b/pkg/server/microsub_test.go @@ -1,6 +1,8 @@ package server import ( + "io/ioutil" + "log" "net/http" "net/http/httptest" "net/url" @@ -11,6 +13,10 @@ import ( "p83.nl/go/ekster/pkg/microsub" ) +func init() { + log.SetOutput(ioutil.Discard) +} + func createServerClient() (*httptest.Server, *client.Client) { backend := &NullBackend{}