Compare commits
2 Commits
edb816f35b
...
a75bbb2551
| Author | SHA1 | Date | |
|---|---|---|---|
| a75bbb2551 | |||
| 5d5ee63d68 |
|
|
@ -59,7 +59,12 @@ func NewApp(options AppOptions) (*App, error) {
|
||||||
app.backend.hubIncomingBackend.baseURL = options.BaseURL
|
app.backend.hubIncomingBackend.baseURL = options.BaseURL
|
||||||
app.backend.hubIncomingBackend.backend = app.backend
|
app.backend.hubIncomingBackend.backend = app.backend
|
||||||
|
|
||||||
app.hubBackend = &hubIncomingBackend{backend: app.backend, baseURL: options.BaseURL, pool: options.pool}
|
app.hubBackend = &hubIncomingBackend{
|
||||||
|
backend: app.backend,
|
||||||
|
baseURL: options.BaseURL,
|
||||||
|
pool: options.pool,
|
||||||
|
database: options.database,
|
||||||
|
}
|
||||||
app.backend.hubIncomingBackend = *app.hubBackend
|
app.backend.hubIncomingBackend = *app.hubBackend
|
||||||
|
|
||||||
http.Handle("/micropub", µpubHandler{
|
http.Handle("/micropub", µpubHandler{
|
||||||
|
|
@ -78,6 +83,7 @@ func NewApp(options AppOptions) (*App, error) {
|
||||||
|
|
||||||
http.Handle("/incoming/", &incomingHandler{
|
http.Handle("/incoming/", &incomingHandler{
|
||||||
Backend: app.hubBackend,
|
Backend: app.hubBackend,
|
||||||
|
Processor: app.backend,
|
||||||
})
|
})
|
||||||
|
|
||||||
if !options.Headless {
|
if !options.Headless {
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"expvar"
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
@ -22,7 +23,7 @@ type HubBackend interface {
|
||||||
Feeds() ([]Feed, error)
|
Feeds() ([]Feed, error)
|
||||||
CreateFeed(url string) (int64, error)
|
CreateFeed(url string) (int64, error)
|
||||||
GetSecret(feedID int64) string
|
GetSecret(feedID int64) string
|
||||||
UpdateFeed(feedID int64, contentType string, body io.Reader) error
|
UpdateFeed(processor ContentProcessor, feedID int64, contentType string, body io.Reader) error
|
||||||
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
|
FeedSetLeaseSeconds(feedID int64, leaseSeconds int64) error
|
||||||
Subscribe(feed *Feed) error
|
Subscribe(feed *Feed) error
|
||||||
}
|
}
|
||||||
|
|
@ -31,6 +32,7 @@ type hubIncomingBackend struct {
|
||||||
backend *memoryBackend
|
backend *memoryBackend
|
||||||
baseURL string
|
baseURL string
|
||||||
pool *redis.Pool
|
pool *redis.Pool
|
||||||
|
database *sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
// Feed contains information about the feed subscriptions
|
// Feed contains information about the feed subscriptions
|
||||||
|
|
@ -53,7 +55,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hubIncomingBackend) GetSecret(id int64) string {
|
func (h *hubIncomingBackend) GetSecret(id int64) string {
|
||||||
db := h.backend.database
|
db := h.database
|
||||||
var secret string
|
var secret string
|
||||||
err := db.QueryRow(
|
err := db.QueryRow(
|
||||||
`select "subscription_secret" from "subscriptions" where "id" = $1`,
|
`select "subscription_secret" from "subscriptions" where "id" = $1`,
|
||||||
|
|
@ -67,7 +69,7 @@ func (h *hubIncomingBackend) GetSecret(id int64) string {
|
||||||
|
|
||||||
func (h *hubIncomingBackend) CreateFeed(topic string) (int64, error) {
|
func (h *hubIncomingBackend) CreateFeed(topic string) (int64, error) {
|
||||||
log.Println("CreateFeed", topic)
|
log.Println("CreateFeed", topic)
|
||||||
db := h.backend.database
|
db := h.database
|
||||||
|
|
||||||
secret := util.RandStringBytes(32)
|
secret := util.RandStringBytes(32)
|
||||||
urlSecret := util.RandStringBytes(32)
|
urlSecret := util.RandStringBytes(32)
|
||||||
|
|
@ -112,10 +114,10 @@ VALUES ($1, $2, $3, $4, DEFAULT) RETURNING "id"`, topic, secret, urlSecret, 60*6
|
||||||
return int64(subscriptionID), nil
|
return int64(subscriptionID), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string, body io.Reader) error {
|
func (h *hubIncomingBackend) UpdateFeed(processor ContentProcessor, subscriptionID int64, contentType string, body io.Reader) error {
|
||||||
log.Println("UpdateFeed", subscriptionID)
|
log.Println("UpdateFeed", subscriptionID)
|
||||||
|
|
||||||
db := h.backend.database
|
db := h.database
|
||||||
var (
|
var (
|
||||||
topic string
|
topic string
|
||||||
channel string
|
channel string
|
||||||
|
|
@ -139,7 +141,7 @@ func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
|
log.Printf("Updating feed %s %q in %q\n", feedID, topic, channel)
|
||||||
err = h.backend.ProcessContent(channel, feedID, topic, contentType, body)
|
err = processor.ProcessContent(channel, feedID, topic, contentType, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("could not process content for channel %s: %s", channel, err)
|
log.Printf("could not process content for channel %s: %s", channel, err)
|
||||||
}
|
}
|
||||||
|
|
@ -149,7 +151,7 @@ func (h *hubIncomingBackend) UpdateFeed(subscriptionID int64, contentType string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *hubIncomingBackend) FeedSetLeaseSeconds(subscriptionID int64, leaseSeconds int64) error {
|
func (h *hubIncomingBackend) FeedSetLeaseSeconds(subscriptionID int64, leaseSeconds int64) error {
|
||||||
db := h.backend.database
|
db := h.database
|
||||||
_, err := db.Exec(`
|
_, err := db.Exec(`
|
||||||
update subscriptions
|
update subscriptions
|
||||||
set lease_seconds = $1,
|
set lease_seconds = $1,
|
||||||
|
|
@ -161,7 +163,7 @@ where id = $3
|
||||||
|
|
||||||
// Feeds returns a list of subscribed feeds
|
// Feeds returns a list of subscribed feeds
|
||||||
func (h *hubIncomingBackend) Feeds() ([]Feed, error) {
|
func (h *hubIncomingBackend) Feeds() ([]Feed, error) {
|
||||||
db := h.backend.database
|
db := h.database
|
||||||
var feeds []Feed
|
var feeds []Feed
|
||||||
|
|
||||||
rows, err := db.Query(`
|
rows, err := db.Query(`
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import (
|
||||||
|
|
||||||
type incomingHandler struct {
|
type incomingHandler struct {
|
||||||
Backend HubBackend
|
Backend HubBackend
|
||||||
|
Processor ContentProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -94,7 +95,7 @@ func (h *incomingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
ct := r.Header.Get("Content-Type")
|
ct := r.Header.Get("Content-Type")
|
||||||
err = h.Backend.UpdateFeed(feed, ct, bytes.NewBuffer(feedContent))
|
err = h.Backend.UpdateFeed(h.Processor, feed, ct, bytes.NewBuffer(feedContent))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, fmt.Sprintf("could not update feed: %s (%s)", ct, err), 400)
|
http.Error(w, fmt.Sprintf("could not update feed: %s (%s)", ct, err), 400)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -573,6 +573,11 @@ func ProcessSourcedItems(fetcher fetch.Fetcher, fetchURL, contentType string, bo
|
||||||
return items, nil
|
return items, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ContentProcessor processes content for a channel and feed
|
||||||
|
type ContentProcessor interface {
|
||||||
|
ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error
|
||||||
|
}
|
||||||
|
|
||||||
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
|
func (b *memoryBackend) ProcessContent(channel, feedID, fetchURL, contentType string, body io.Reader) error {
|
||||||
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
cachingFetch := WithCaching(b.pool, fetch.FetcherFunc(Fetch2))
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user