Remove global variable pool and replace fetcher with func
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Peter Stuifzand 2019-03-23 18:23:56 +01:00
parent 8f59930268
commit 76819ac804
Signed by: peter
GPG Key ID: 374322D56E5209E8
12 changed files with 112 additions and 69 deletions

View File

@ -24,6 +24,7 @@ type mainHandler struct {
Backend *memoryBackend
BaseURL string
TemplateDir string
pool *redis.Pool
}
type session struct {
@ -85,7 +86,7 @@ type authRequest struct {
AccessToken string `redis:"access_token"`
}
func newMainHandler(backend *memoryBackend, baseURL, templateDir string) (*mainHandler, error) {
func newMainHandler(backend *memoryBackend, baseURL, templateDir string, pool *redis.Pool) (*mainHandler, error) {
h := &mainHandler{Backend: backend}
h.BaseURL = baseURL
@ -93,6 +94,8 @@ func newMainHandler(backend *memoryBackend, baseURL, templateDir string) (*mainH
templateDir = strings.TrimRight(templateDir, "/")
h.TemplateDir = templateDir
h.pool = pool
return h, nil
}
@ -257,6 +260,8 @@ func getAppInfo(clientID string) (app, error) {
}
func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pool := h.pool
conn := pool.Get()
defer conn.Close()

View File

@ -34,6 +34,7 @@ type HubBackend interface {
type hubIncomingBackend struct {
backend *memoryBackend
baseURL string
pool *redis.Pool
}
// Feed contains information about the feed subscriptions
@ -49,7 +50,7 @@ type Feed struct {
}
func (h *hubIncomingBackend) GetSecret(id int64) string {
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
secret, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", id), "secret"))
if err != nil {
@ -59,7 +60,7 @@ func (h *hubIncomingBackend) GetSecret(id int64) string {
}
func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, error) {
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
// TODO(peter): check if topic already is registered
@ -104,7 +105,7 @@ func (h *hubIncomingBackend) CreateFeed(topic string, channel string) (int64, er
}
func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body io.Reader) error {
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
log.Printf("updating feed %d", feedID)
u, err := redis.String(conn.Do("HGET", fmt.Sprintf("feed:%d", feedID), "url"))
@ -126,7 +127,7 @@ func (h *hubIncomingBackend) UpdateFeed(feedID int64, contentType string, body i
}
func (h *hubIncomingBackend) FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error {
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
log.Printf("updating feed %d lease_seconds", feedID)
@ -152,7 +153,7 @@ func (h *hubIncomingBackend) GetFeeds() []Feed {
// Feeds returns a list of subscribed feeds
func (h *hubIncomingBackend) Feeds() ([]Feed, error) {
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
feeds := []Feed{}

View File

@ -42,10 +42,11 @@ type AppOptions struct {
RedisServer string
BaseURL string
TemplateDir string
pool *redis.Pool
}
var (
pool *redis.Pool
// pool *redis.Pool
)
func init() {
@ -113,8 +114,9 @@ func NewApp(options AppOptions) *App {
app.backend = loadMemoryBackend()
app.backend.AuthEnabled = options.AuthEnabled
app.backend.baseURL = options.BaseURL
app.backend.pool = options.pool
app.hubBackend = &hubIncomingBackend{app.backend, options.BaseURL}
app.hubBackend = &hubIncomingBackend{app.backend, options.BaseURL, options.pool}
http.Handle("/micropub", &micropubHandler{
Backend: app.backend,
@ -134,7 +136,7 @@ func NewApp(options AppOptions) *App {
})
if !options.Headless {
handler, err := newMainHandler(app.backend, options.BaseURL, options.TemplateDir)
handler, err := newMainHandler(app.backend, options.BaseURL, options.TemplateDir, options.pool)
if err != nil {
log.Fatal(err)
}
@ -200,7 +202,8 @@ func main() {
return
}
pool = newPool(options.RedisServer)
pool := newPool(options.RedisServer)
options.pool = pool
NewApp(options).Run()
}

View File

@ -45,6 +45,8 @@ type memoryBackend struct {
quit chan struct{}
broker *server.Broker
pool *redis.Pool
}
type channelSetting struct {
@ -78,7 +80,7 @@ func (f *fetch2) Fetch(url string) (*http.Response, error) {
}
func (b *memoryBackend) AuthTokenAccepted(header string, r *auth.TokenResponse) bool {
conn := pool.Get()
conn := b.pool.Get()
defer conn.Close()
return b.cachedCheckAuthToken(conn, header, r)
}
@ -108,7 +110,7 @@ func (b *memoryBackend) load() error {
}
func (b *memoryBackend) refreshChannels() {
conn := pool.Get()
conn := b.pool.Get()
defer conn.Close()
conn.Do("DEL", "channels")
@ -173,7 +175,7 @@ func createMemoryBackend() {
// ChannelsGetList gets channels
func (b *memoryBackend) ChannelsGetList() ([]microsub.Channel, error) {
conn := pool.Get()
conn := b.pool.Get()
defer conn.Close()
b.lock.RLock()
@ -207,7 +209,7 @@ func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) {
channel := b.createChannel(name)
b.setChannel(channel)
conn := pool.Get()
conn := b.pool.Get()
defer conn.Close()
updateChannelInRedis(conn, channel.UID, DefaultPrio)
@ -240,7 +242,7 @@ func (b *memoryBackend) ChannelsUpdate(uid, name string) (microsub.Channel, erro
func (b *memoryBackend) ChannelsDelete(uid string) error {
defer b.save()
conn := pool.Get()
conn := b.pool.Get()
defer conn.Close()
removeChannelFromRedis(conn, uid)
@ -423,9 +425,11 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
// needs to be like this, because we get a null result otherwise in the json output
feeds := []microsub.Feed{}
cachingFetch := WithCaching(b.pool, Fetch2)
for _, u := range urls {
log.Println(u)
resp, err := Fetch2(u)
resp, err := cachingFetch(u)
if err != nil {
log.Printf("Error while fetching %s: %v\n", u, err)
continue
@ -439,7 +443,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
continue
}
feedResp, err := Fetch2(fetchURL.String())
feedResp, err := cachingFetch(fetchURL.String())
if err != nil {
log.Printf("Error in fetch of %s - %v\n", fetchURL, err)
continue
@ -447,7 +451,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
defer feedResp.Body.Close()
// TODO: Combine FeedHeader and FeedItems so we can use it here
parsedFeed, err := fetch.FeedHeader(&fetch2{}, fetchURL.String(), feedResp.Header.Get("Content-Type"), feedResp.Body)
parsedFeed, err := fetch.FeedHeader(cachingFetch, fetchURL.String(), feedResp.Header.Get("Content-Type"), feedResp.Body)
if err != nil {
log.Printf("Error in parse of %s - %v\n", fetchURL, err)
continue
@ -462,7 +466,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
log.Printf("alternate found with type %s %#v\n", relURL.Type, relURL)
if strings.HasPrefix(relURL.Type, "text/html") || strings.HasPrefix(relURL.Type, "application/json") || strings.HasPrefix(relURL.Type, "application/xml") || strings.HasPrefix(relURL.Type, "text/xml") || strings.HasPrefix(relURL.Type, "application/rss+xml") || strings.HasPrefix(relURL.Type, "application/atom+xml") {
feedResp, err := Fetch2(alt)
feedResp, err := cachingFetch(alt)
if err != nil {
log.Printf("Error in fetch of %s - %v\n", alt, err)
continue
@ -470,7 +474,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
// FIXME: don't defer in for loop (possible memory leak)
defer feedResp.Body.Close()
parsedFeed, err := fetch.FeedHeader(&fetch2{}, alt, feedResp.Header.Get("Content-Type"), feedResp.Body)
parsedFeed, err := fetch.FeedHeader(cachingFetch, alt, feedResp.Header.Get("Content-Type"), feedResp.Body)
if err != nil {
log.Printf("Error in parse of %s - %v\n", alt, err)
continue
@ -486,12 +490,13 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
}
func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error) {
resp, err := Fetch2(previewURL)
cachingFetch := WithCaching(b.pool, Fetch2)
resp, err := cachingFetch(previewURL)
if err != nil {
return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
}
defer resp.Body.Close()
items, err := fetch.FeedItems(&fetch2{}, previewURL, resp.Header.Get("content-type"), resp.Body)
items, err := fetch.FeedItems(cachingFetch, previewURL, resp.Header.Get("content-type"), resp.Body)
if err != nil {
return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
}
@ -518,10 +523,9 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
}
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
conn := pool.Get()
defer conn.Close()
cachingFetch := WithCaching(b.pool, Fetch2)
items, err := fetch.FeedItems(&fetch2{}, fetchURL, contentType, body)
items, err := fetch.FeedItems(cachingFetch, fetchURL, contentType, body)
if err != nil {
return err
}
@ -666,11 +670,54 @@ func (b *memoryBackend) updateChannelUnreadCount(channel string) error {
return nil
}
// Fetch2 fetches stuff
func Fetch2(fetchURL string) (*http.Response, error) {
// WithCaching adds caching to a FetcherFunc
func WithCaching(pool *redis.Pool, ff fetch.FetcherFunc) fetch.FetcherFunc {
conn := pool.Get()
defer conn.Close()
return func(fetchURL string) (*http.Response, error) {
cacheKey := fmt.Sprintf("http_cache:%s", fetchURL)
u, err := url.Parse(fetchURL)
if err != nil {
return nil, fmt.Errorf("error parsing %s as url: %s", fetchURL, err)
}
req, err := http.NewRequest("GET", u.String(), nil)
data, err := redis.Bytes(conn.Do("GET", cacheKey))
if err == nil {
log.Printf("HIT %s\n", fetchURL)
rd := bufio.NewReader(bytes.NewReader(data))
return http.ReadResponse(rd, req)
}
log.Printf("MISS %s\n", fetchURL)
resp, err := ff(fetchURL)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var b bytes.Buffer
err = resp.Write(&b)
if err != nil {
return nil, err
}
cachedCopy := make([]byte, b.Len())
cur := b.Bytes()
copy(cachedCopy, cur)
conn.Do("SET", cacheKey, cachedCopy, "EX", 60*60)
cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req)
return cachedResp, err
}
}
// Fetch2 fetches stuff
func Fetch2(fetchURL string) (*http.Response, error) {
if !strings.HasPrefix(fetchURL, "http") {
return nil, fmt.Errorf("error parsing %s as url, has no http(s) prefix", fetchURL)
}
@ -682,34 +729,13 @@ func Fetch2(fetchURL string) (*http.Response, error) {
req, err := http.NewRequest("GET", u.String(), nil)
cacheKey := fmt.Sprintf("http_cache:%s", u.String())
data, err := redis.Bytes(conn.Do("GET", cacheKey))
if err == nil {
log.Printf("HIT %s\n", u.String())
rd := bufio.NewReader(bytes.NewReader(data))
return http.ReadResponse(rd, req)
}
log.Printf("MISS %s\n", u.String())
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("fetch failed: %s: %s", u, err)
}
defer resp.Body.Close()
var b bytes.Buffer
resp.Write(&b)
cachedCopy := make([]byte, b.Len())
cur := b.Bytes()
copy(cachedCopy, cur)
conn.Do("SET", cacheKey, cachedCopy, "EX", 60*60)
cachedResp, err := http.ReadResponse(bufio.NewReader(bytes.NewReader(cachedCopy)), req)
return cachedResp, err
return resp, err
}
func (b *memoryBackend) createChannel(name string) microsub.Channel {

View File

@ -18,6 +18,7 @@ import (
type micropubHandler struct {
Backend *memoryBackend
pool *redis.Pool
}
/*
@ -30,7 +31,7 @@ type micropubHandler struct {
func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
r.ParseForm()

View File

@ -12,6 +12,7 @@ import (
type redisSortedSetTimeline struct {
channel string
pool *redis.Pool
}
/*
@ -22,7 +23,7 @@ func (timeline *redisSortedSetTimeline) Init() error {
}
func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Timeline, error) {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
items := []microsub.Item{}
@ -103,7 +104,7 @@ func (timeline *redisSortedSetTimeline) Items(before, after string) (microsub.Ti
}
func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
channel := timeline.channel
@ -158,7 +159,7 @@ func (timeline *redisSortedSetTimeline) AddItem(item microsub.Item) error {
}
func (timeline *redisSortedSetTimeline) Count() (int, error) {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
channel := timeline.channel
@ -171,7 +172,7 @@ func (timeline *redisSortedSetTimeline) Count() (int, error) {
}
func (timeline *redisSortedSetTimeline) MarkRead(uids []string) error {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
channel := timeline.channel

View File

@ -12,6 +12,8 @@ import (
type redisStreamTimeline struct {
channel, channelKey string
pool *redis.Pool
}
/*
@ -23,7 +25,7 @@ func (timeline *redisStreamTimeline) Init() error {
}
func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timeline, error) {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
if before == "" {
@ -69,7 +71,7 @@ func (timeline *redisStreamTimeline) Items(before, after string) (microsub.Timel
}
func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
if item.Published == "" {
@ -90,7 +92,7 @@ func (timeline *redisStreamTimeline) AddItem(item microsub.Item) error {
}
func (timeline *redisStreamTimeline) Count() (int, error) {
conn := pool.Get()
conn := timeline.pool.Get()
defer conn.Close()
return redis.Int(conn.Do("XLEN", timeline.channelKey))

View File

@ -31,7 +31,7 @@ func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
}
if timelineType == "sorted-set" {
timeline := &redisSortedSetTimeline{channel}
timeline := &redisSortedSetTimeline{channel: channel, pool: b.pool}
err := timeline.Init()
if err != nil {
return nil
@ -39,7 +39,7 @@ func (b *memoryBackend) getTimeline(channel string) TimelineBackend {
return timeline
}
if timelineType == "stream" {
timeline := &redisStreamTimeline{channel: channel}
timeline := &redisStreamTimeline{channel: channel, pool: b.pool}
err := timeline.Init()
if err != nil {
return nil

View File

@ -17,9 +17,8 @@ func init() {
log.SetOutput(f)
}
type fetcher struct{}
func (f fetcher) Fetch(url string) (*http.Response, error) {
// Fetch calls http.Get
func Fetch(url string) (*http.Response, error) {
return http.Get(url)
}
@ -33,7 +32,7 @@ func main() {
}
defer resp.Body.Close()
items, err := fetch.FeedItems(fetcher{}, url, resp.Header.Get("Content-Type"), resp.Body)
items, err := fetch.FeedItems(Fetch, url, resp.Header.Get("Content-Type"), resp.Body)
if err != nil {
log.Fatal(err)
}

View File

@ -25,7 +25,7 @@ import (
)
// FeedHeader returns a new microsub.Feed with the information parsed from body.
func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (microsub.Feed, error) {
func FeedHeader(fetcher FetcherFunc, fetchURL, contentType string, body io.Reader) (microsub.Feed, error) {
log.Printf("ProcessContent %s\n", fetchURL)
log.Println("Found " + contentType)
@ -38,7 +38,7 @@ func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (
author, ok := jf2.SimplifyMicroformatDataAuthor(data)
if !ok {
if strings.HasPrefix(author.URL, "http") {
resp, err := fetcher.Fetch(fetchURL)
resp, err := fetcher(fetchURL)
if err != nil {
return feed, err
}
@ -108,7 +108,7 @@ func FeedHeader(fetcher Fetcher, fetchURL, contentType string, body io.Reader) (
}
// FeedItems returns the items from the url, parsed from body.
func FeedItems(fetcher Fetcher, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
func FeedItems(fetcher FetcherFunc, fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
log.Printf("ProcessContent %s\n", fetchURL)
log.Println("Found " + contentType)

View File

@ -2,6 +2,5 @@ package fetch
import "net/http"
type Fetcher interface {
Fetch(url string) (*http.Response, error)
}
// FetcherFunc is a function that fetches an url
type FetcherFunc func(url string) (*http.Response, error)

View File

@ -1,6 +1,8 @@
package server
import (
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"net/url"
@ -11,6 +13,10 @@ import (
"p83.nl/go/ekster/pkg/microsub"
)
func init() {
log.SetOutput(ioutil.Discard)
}
func createServerClient() (*httptest.Server, *client.Client) {
backend := &NullBackend{}