diff --git a/cmd/eksterd/app.go b/cmd/eksterd/app.go index fde24a5..72a5ce9 100644 --- a/cmd/eksterd/app.go +++ b/cmd/eksterd/app.go @@ -82,7 +82,8 @@ func NewApp(options AppOptions) (*App, error) { http.Handle("/microsub", handler) http.Handle("/incoming/", &incomingHandler{ - Backend: app.hubBackend, + Backend: app.hubBackend, + Processor: app.backend, }) if !options.Headless { diff --git a/cmd/eksterd/hubbackend.go b/cmd/eksterd/hubbackend.go index 32dc071..e90c382 100644 --- a/cmd/eksterd/hubbackend.go +++ b/cmd/eksterd/hubbackend.go @@ -23,7 +23,7 @@ type HubBackend interface { Feeds() ([]Feed, error) CreateFeed(url string) (int64, error) GetSecret(feedID int64) string - UpdateFeed(feedID int64, contentType string, body io.Reader) error + UpdateFeed(processor ContentProcessor, feedID int64, contentType string, body io.Reader) error FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error Subscribe(feed *Feed) error } @@ -114,7 +114,7 @@ VALUES ($1, $2, $3, $4, DEFAULT) RETURNING "id"`, topic, secret, urlSecret, 60*6 return int64(subscriptionID), nil } -func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string, body io.Reader) error { +func (h *hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscriptionID int64, contentType string, body io.Reader) error { log.Println("UpdateFeed", subscriptionID) db := h.database @@ -141,7 +141,7 @@ func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string } log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel) - err = h.backend.ProcessContent(channel, feedID, topic, contentType, body) + err = processor.ProcessContent(channel, feedID, topic, contentType, body) if err != nil { log.Printf("could not process content for channel %s: %s", channel, err) } diff --git a/cmd/eksterd/incoming.go b/cmd/eksterd/incoming.go index 2ab6cde..740b1e9 100644 --- a/cmd/eksterd/incoming.go +++ b/cmd/eksterd/incoming.go @@ -13,7 +13,8 @@ import ( ) type incomingHandler struct { - Backend HubBackend + Backend HubBackend + Processor ContentProcessor } var ( @@ -94,7 +95,7 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } ct := r.Header.Get("Content-Type") - err = h.Backend.UpdateFeed(feed, ct, bytes.NewBuffer(feedContent)) + err = h.Backend.UpdateFeed(h.Processor, feed, ct, bytes.NewBuffer(feedContent)) if err != nil { http.Error(w, fmt.Sprintf("could not update feed: %s (%s)", ct, err), 400) return diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index b832ba6..b0e238d 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -573,6 +573,11 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo return items, nil } +// ContentProcessor processes content for a channel and feed +type ContentProcessor interface { + ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error +} + func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error { cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))