Move jf2 to own package, start cleanup of fetch

This commit is contained in:
Peter Stuifzand 2018-08-05 12:15:59 +02:00
parent 27f1187399
commit 573816d75f
Signed by: peter
GPG Key ID: 374322D56E5209E8
8 changed files with 191 additions and 174 deletions

View File

@ -29,19 +29,20 @@ import (
"log"
"net/http"
"net/url"
"regexp"
"rss"
"strings"
"time"
"rss"
"p83.nl/go/ekster/pkg/jf2"
"p83.nl/go/ekster/pkg/jsonfeed"
"p83.nl/go/ekster/pkg/microsub"
"github.com/garyburd/redigo/redis"
"p83.nl/go/ekster/pkg/jsonfeed"
"willnorris.com/go/microformats"
)
func (b *memoryBackend) feedHeader(fetchURL, contentType string, body io.Reader) (microsub.Feed, error) {
func feedHeader(fetchURL, contentType string, body io.Reader) (microsub.Feed, error) {
log.Printf("ProcessContent %s\n", fetchURL)
log.Println("Found " + contentType)
@ -53,7 +54,7 @@ func (b *memoryBackend) feedHeader(fetchURL, contentType string, body io.Reader)
if strings.HasPrefix(contentType, "text/html") {
data := microformats.Parse(body, u)
results := simplifyMicroformatData(data)
results := jf2.SimplifyMicroformatData(data)
found := -1
for i, r := range results {
if r["type"] == "card" {
@ -75,7 +76,7 @@ func (b *memoryBackend) feedHeader(fetchURL, contentType string, body io.Reader)
u, _ := url.Parse(fetchURL)
md := microformats.Parse(resp.Body, u)
author := simplifyMicroformatData(md)
author := jf2.SimplifyMicroformatData(md)
for _, a := range author {
if a["type"] == "card" {
card = a
@ -154,7 +155,7 @@ func (b *memoryBackend) feedHeader(fetchURL, contentType string, body io.Reader)
return feed, nil
}
func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
func feedItems(fetchURL, contentType string, body io.Reader) ([]microsub.Item, error) {
log.Printf("ProcessContent %s\n", fetchURL)
log.Println("Found " + contentType)
@ -164,7 +165,7 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
if strings.HasPrefix(contentType, "text/html") {
data := microformats.Parse(body, u)
results := simplifyMicroformatData(data)
results := jf2.SimplifyMicroformatData(data)
found := -1
for {
for i, r := range results {
@ -198,7 +199,7 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
u, _ := url.Parse(fetchURL)
md := microformats.Parse(resp.Body, u)
author := simplifyMicroformatData(md)
author := jf2.SimplifyMicroformatData(md)
for _, a := range author {
if a["type"] == "card" {
results[i]["author"] = a
@ -333,147 +334,6 @@ func (b *memoryBackend) feedItems(fetchURL, contentType string, body io.Reader)
return items, nil
}
// ProcessContent parses body into feed items via b.feedItems, hands every
// item (marked as unread) to channelAddItemWithMatcher for the given channel,
// and finally refreshes the channel's unread count.
//
// A failure to add an individual item is only logged; the remaining items are
// still processed. Errors from parsing the feed or from updating the unread
// count are returned to the caller.
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
	conn := pool.Get()
	defer conn.Close()

	items, err := b.feedItems(fetchURL, contentType, body)
	if err != nil {
		return err
	}

	for _, entry := range items {
		entry.Read = false
		if addErr := b.channelAddItemWithMatcher(conn, channel, entry); addErr != nil {
			log.Printf("ERROR: %s\n", addErr)
		}
	}

	return b.updateChannelUnreadCount(conn, channel)
}
// Fetch3 logs the channel and URL that are about to be fetched and returns
// the result of Fetch2 for that URL. The channel is only used in the log line.
func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) {
	log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL)
	resp, err := Fetch2(fetchURL)
	return resp, err
}
// channelAddItemWithMatcher adds item to every channel whose IncludeRegex
// setting matches the item's content text or name, and then adds it to the
// requested channel unless that channel's ExcludeRegex matches the item.
//
// A regular expression that fails to compile is logged together with its
// pattern and the compile error, and is treated as "no match". Failures while
// adding to an include-matched channel are logged and do not stop processing;
// only the result of adding to the requested channel is returned.
func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel string, item microsub.Item) error {
	for channelKey, setting := range b.Settings {
		if setting.IncludeRegex == "" {
			continue
		}

		includeRegex, err := regexp.Compile(setting.IncludeRegex)
		if err != nil {
			// The compiled regexp is nil on failure, so report the pattern and
			// the reason instead of the (useless) nil value.
			log.Printf("error in regexp %q: %v\n", setting.IncludeRegex, err)
			continue
		}

		matchesContent := item.Content != nil && includeRegex.MatchString(item.Content.Text)
		if !matchesContent && !includeRegex.MatchString(item.Name) {
			continue
		}

		log.Printf("Included %#v\n", item)
		if err := b.channelAddItem(conn, channelKey, item); err != nil {
			// Best effort: a failure for one included channel must not prevent
			// the item from reaching the other channels.
			log.Printf("ERROR: %s\n", err)
		}
	}

	if setting, ok := b.Settings[channel]; ok && setting.ExcludeRegex != "" {
		excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
		if err != nil {
			log.Printf("error in regexp %q: %v\n", setting.ExcludeRegex, err)
		} else if (item.Content != nil && excludeRegex.MatchString(item.Content.Text)) ||
			excludeRegex.MatchString(item.Name) {
			log.Printf("Excluded %#v\n", item)
			return nil
		}
	}

	return b.channelAddItem(conn, channel, item)
}
// channelAddItem stores item in redis and, unless it is already marked as
// read for the channel, adds it to the channel's sorted set of posts.
//
// Steps, in order:
//  1. An empty Published value is replaced by the current time (RFC 3339).
//  2. The item is JSON-encoded and written with HMSET under "item:<ID>".
//  3. If "item:<ID>" is a member of "channel:<channel>:read" nothing more is
//     done and nil is returned.
//  4. Otherwise the item key is ZADDed to "zchannel:<channel>:posts" with the
//     Unix timestamp of Published as score.
//
// An error is returned when encoding, any redis command, or parsing Published
// as RFC 3339 fails. Note that the HMSET in step 2 has already happened when a
// later step fails.
func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error {
	zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)

	if item.Published == "" {
		item.Published = time.Now().Format(time.RFC3339)
	}

	data, err := json.Marshal(item)
	if err != nil {
		log.Printf("error while creating item for redis: %v\n", err)
		return err
	}

	forRedis := redisItem{
		ID:        item.ID,
		Published: item.Published,
		Read:      item.Read,
		Data:      data,
	}

	itemKey := fmt.Sprintf("item:%s", item.ID)
	_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
	if err != nil {
		return fmt.Errorf("error while writing item for redis: %v", err)
	}

	readChannelKey := fmt.Sprintf("channel:%s:read", channel)
	isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
	if err != nil {
		return err
	}
	if isRead {
		return nil
	}

	score, err := time.Parse(time.RFC3339, item.Published)
	if err != nil {
		// Keep the underlying parse error so the caller can see why it failed.
		return fmt.Errorf("error can't parse %s as time: %v", item.Published, err)
	}

	_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix(), itemKey))
	if err != nil {
		return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
	}

	return nil
}
// updateChannelUnreadCount sets the Unread field of the named channel to the
// ZCARD of its "zchannel:<channel>:posts" key and calls b.save() afterwards.
// Channels that are not present in b.Channels are ignored and nil is
// returned; a failing ZCARD is returned as an error without calling b.save().
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
	entry, known := b.Channels[channel]
	if !known {
		return nil
	}

	postsKey := fmt.Sprintf("zchannel:%s:posts", channel)
	count, err := redis.Int(conn.Do("ZCARD", postsKey))
	if err != nil {
		return fmt.Errorf("error: while updating channel unread count for %s: %s", channel, err)
	}

	// Deferred so the save runs once the updated channel is back in the map.
	defer b.save()
	entry.Unread = count
	b.Channels[channel] = entry
	return nil
}
type redisItem struct {
ID string
Published string

View File

@ -20,11 +20,13 @@ package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"reflect"
"regexp"
"strings"
"time"
@ -665,7 +667,7 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
}
defer feedResp.Body.Close()
parsedFeed, err := b.feedHeader(fetchUrl.String(), feedResp.Header.Get("Content-Type"), feedResp.Body)
parsedFeed, err := feedHeader(fetchUrl.String(), feedResp.Header.Get("Content-Type"), feedResp.Body)
if err != nil {
log.Printf("Error in parse of %s - %v\n", fetchUrl, err)
continue
@ -684,9 +686,10 @@ func (b *memoryBackend) Search(query string) ([]microsub.Feed, error) {
log.Printf("Error in fetch of %s - %v\n", alt, err)
continue
}
// FIXME: don't defer in for loop (possible memory leak)
defer feedResp.Body.Close()
parsedFeed, err := b.feedHeader(alt, feedResp.Header.Get("Content-Type"), feedResp.Body)
parsedFeed, err := feedHeader(alt, feedResp.Header.Get("Content-Type"), feedResp.Body)
if err != nil {
log.Printf("Error in parse of %s - %v\n", alt, err)
continue
@ -706,7 +709,7 @@ func (b *memoryBackend) PreviewURL(previewURL string) (microsub.Timeline, error)
if err != nil {
return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
}
items, err := b.feedItems(previewURL, resp.Header.Get("content-type"), resp.Body)
items, err := feedItems(previewURL, resp.Header.Get("content-type"), resp.Body)
if err != nil {
return microsub.Timeline{}, fmt.Errorf("error while fetching %s: %v", previewURL, err)
}
@ -748,3 +751,145 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
return nil
}
// ProcessContent parses body into feed items via feedItems, hands every item
// (marked as unread) to channelAddItemWithMatcher for the given channel, and
// finally refreshes the channel's unread count.
//
// A failure to add an individual item is only logged; the remaining items are
// still processed. Errors from parsing the feed or from updating the unread
// count are returned to the caller.
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
	conn := pool.Get()
	defer conn.Close()

	items, err := feedItems(fetchURL, contentType, body)
	if err != nil {
		return err
	}

	for _, entry := range items {
		entry.Read = false
		if addErr := b.channelAddItemWithMatcher(conn, channel, entry); addErr != nil {
			log.Printf("ERROR: %s\n", addErr)
		}
	}

	return b.updateChannelUnreadCount(conn, channel)
}
// Fetch3 logs the channel and URL that are about to be fetched and returns
// the result of Fetch2 for that URL. The channel is only used in the log line.
func (b *memoryBackend) Fetch3(channel, fetchURL string) (*http.Response, error) {
	log.Printf("Fetching channel=%s fetchURL=%s\n", channel, fetchURL)
	resp, err := Fetch2(fetchURL)
	return resp, err
}
// channelAddItemWithMatcher adds item to every channel whose IncludeRegex
// setting matches the item's content text or name, and then adds it to the
// requested channel unless that channel's ExcludeRegex matches the item.
//
// A regular expression that fails to compile is logged together with its
// pattern and the compile error, and is treated as "no match". Failures while
// adding to an include-matched channel are logged and do not stop processing;
// only the result of adding to the requested channel is returned.
func (b *memoryBackend) channelAddItemWithMatcher(conn redis.Conn, channel string, item microsub.Item) error {
	for channelKey, setting := range b.Settings {
		if setting.IncludeRegex == "" {
			continue
		}

		includeRegex, err := regexp.Compile(setting.IncludeRegex)
		if err != nil {
			// The compiled regexp is nil on failure, so report the pattern and
			// the reason instead of the (useless) nil value.
			log.Printf("error in regexp %q: %v\n", setting.IncludeRegex, err)
			continue
		}

		matchesContent := item.Content != nil && includeRegex.MatchString(item.Content.Text)
		if !matchesContent && !includeRegex.MatchString(item.Name) {
			continue
		}

		log.Printf("Included %#v\n", item)
		if err := b.channelAddItem(conn, channelKey, item); err != nil {
			// Best effort: a failure for one included channel must not prevent
			// the item from reaching the other channels.
			log.Printf("ERROR: %s\n", err)
		}
	}

	if setting, ok := b.Settings[channel]; ok && setting.ExcludeRegex != "" {
		excludeRegex, err := regexp.Compile(setting.ExcludeRegex)
		if err != nil {
			log.Printf("error in regexp %q: %v\n", setting.ExcludeRegex, err)
		} else if (item.Content != nil && excludeRegex.MatchString(item.Content.Text)) ||
			excludeRegex.MatchString(item.Name) {
			log.Printf("Excluded %#v\n", item)
			return nil
		}
	}

	return b.channelAddItem(conn, channel, item)
}
// channelAddItem stores item in redis and, unless it is already marked as
// read for the channel, adds it to the channel's sorted set of posts.
//
// Steps, in order:
//  1. An empty Published value is replaced by the current time (RFC 3339).
//  2. The item is JSON-encoded and written with HMSET under "item:<ID>".
//  3. If "item:<ID>" is a member of "channel:<channel>:read" nothing more is
//     done and nil is returned.
//  4. Otherwise the item key is ZADDed to "zchannel:<channel>:posts" with the
//     Unix timestamp of Published as score.
//
// An error is returned when encoding, any redis command, or parsing Published
// as RFC 3339 fails. Note that the HMSET in step 2 has already happened when a
// later step fails.
func (b *memoryBackend) channelAddItem(conn redis.Conn, channel string, item microsub.Item) error {
	zchannelKey := fmt.Sprintf("zchannel:%s:posts", channel)

	if item.Published == "" {
		item.Published = time.Now().Format(time.RFC3339)
	}

	data, err := json.Marshal(item)
	if err != nil {
		log.Printf("error while creating item for redis: %v\n", err)
		return err
	}

	forRedis := redisItem{
		ID:        item.ID,
		Published: item.Published,
		Read:      item.Read,
		Data:      data,
	}

	itemKey := fmt.Sprintf("item:%s", item.ID)
	_, err = redis.String(conn.Do("HMSET", redis.Args{}.Add(itemKey).AddFlat(&forRedis)...))
	if err != nil {
		return fmt.Errorf("error while writing item for redis: %v", err)
	}

	readChannelKey := fmt.Sprintf("channel:%s:read", channel)
	isRead, err := redis.Bool(conn.Do("SISMEMBER", readChannelKey, itemKey))
	if err != nil {
		return err
	}
	if isRead {
		return nil
	}

	score, err := time.Parse(time.RFC3339, item.Published)
	if err != nil {
		// Keep the underlying parse error so the caller can see why it failed.
		return fmt.Errorf("error can't parse %s as time: %v", item.Published, err)
	}

	_, err = redis.Int64(conn.Do("ZADD", zchannelKey, score.Unix(), itemKey))
	if err != nil {
		return fmt.Errorf("error while zadding item %s to channel %s for redis: %v", itemKey, zchannelKey, err)
	}

	return nil
}
// updateChannelUnreadCount sets the Unread field of the named channel to the
// ZCARD of its "zchannel:<channel>:posts" key and calls b.save() afterwards.
// Channels that are not present in b.Channels are ignored and nil is
// returned; a failing ZCARD is returned as an error without calling b.save().
func (b *memoryBackend) updateChannelUnreadCount(conn redis.Conn, channel string) error {
	entry, known := b.Channels[channel]
	if !known {
		return nil
	}

	postsKey := fmt.Sprintf("zchannel:%s:posts", channel)
	count, err := redis.Int(conn.Do("ZCARD", postsKey))
	if err != nil {
		return fmt.Errorf("error: while updating channel unread count for %s: %s", channel, err)
	}

	// Deferred so the save runs once the updated channel is back in the map.
	defer b.save()
	entry.Unread = count
	b.Channels[channel] = entry
	return nil
}

View File

@ -26,6 +26,7 @@ import (
"strings"
"time"
"p83.nl/go/ekster/pkg/jf2"
"p83.nl/go/ekster/pkg/microsub"
"github.com/garyburd/redigo/redis"
@ -91,7 +92,7 @@ func (h *micropubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
item = mapToItem(simplifyMicroformat(&mfItem))
item = mapToItem(jf2.SimplifyMicroformat(&mfItem))
ok = true
} else if r.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
content := r.FormValue("content")

View File

@ -28,10 +28,8 @@ type NullBackend struct {
// ChannelsGetList gets no channels
func (b *NullBackend) ChannelsGetList() ([]microsub.Channel, error) {
return []microsub.Channel{
microsub.Channel{UID: "0000", Name: "default", Unread: 0},
microsub.Channel{UID: "0001", Name: "notifications", Unread: 0},
microsub.Channel{UID: "1000", Name: "Friends", Unread: 0},
microsub.Channel{UID: "1001", Name: "Family", Unread: 0},
microsub.Channel{UID: "0000", Name: "default", Unread: 0},
}, nil
}

1
cmd/jf2test/main.go Normal file
View File

@ -0,0 +1 @@
package jf2test

View File

@ -15,9 +15,11 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
package jf2
import (
"fmt"
"log"
"strings"
"willnorris.com/go/microformats"
@ -56,16 +58,13 @@ func simplify(itemType string, item map[string][]interface{}) map[string]interfa
} else if k == "featured" {
feedItem[k] = v
} else if k == "checkin" || k == "author" {
if value, ok := v[0].(*microformats.Microformat); ok {
card := make(map[string]string)
card["type"] = "card"
for ik, vk := range value.Properties {
if p, ok := vk[0].(string); ok {
card[ik] = p
}
}
feedItem[k] = card
card, err := simplifyCard(v)
if err != nil {
log.Println(err)
continue
}
feedItem[k] = card
} else if value, ok := v[0].(*microformats.Microformat); ok {
mType := value.Type[0][2:]
m := simplify(mType, value.Properties)
@ -95,8 +94,21 @@ func simplify(itemType string, item map[string][]interface{}) map[string]interfa
return feedItem
}
// simplifyCard converts the first value of a microformat property into a flat
// string map with "type" set to "card". For every property of the nested
// microformat whose first value is a string, that string is copied into the
// map under the property's name; properties with no values or a non-string
// first value are skipped.
//
// An error is returned when v is empty or its first element is not a non-nil
// *microformats.Microformat.
func simplifyCard(v []interface{}) (map[string]string, error) {
	// Guard against empty property lists: indexing v[0] would panic.
	if len(v) == 0 {
		return nil, fmt.Errorf("not convertable to a card %q", v)
	}

	value, ok := v[0].(*microformats.Microformat)
	if !ok || value == nil {
		return nil, fmt.Errorf("not convertable to a card %q", v)
	}

	card := make(map[string]string)
	card["type"] = "card"
	for name, values := range value.Properties {
		if len(values) == 0 {
			continue
		}
		if s, isString := values[0].(string); isString {
			card[name] = s
		}
	}
	return card, nil
}
func simplifyMicroformat(item *microformats.Microformat) map[string]interface{} {
func SimplifyMicroformat(item *microformats.Microformat) map[string]interface{} {
itemType := item.Type[0][2:]
newItem := simplify(itemType, item.Properties)
newItem["type"] = itemType
@ -105,7 +117,7 @@ func simplifyMicroformat(item *microformats.Microformat) map[string]interface{}
if len(item.Children) > 0 {
for _, c := range item.Children {
child := simplifyMicroformat(c)
child := SimplifyMicroformat(c)
if c, e := child["children"]; e {
if ar, ok := c.([]map[string]interface{}); ok {
children = append(children, ar...)
@ -121,18 +133,18 @@ func simplifyMicroformat(item *microformats.Microformat) map[string]interface{}
return newItem
}
func simplifyMicroformatData(md *microformats.Data) []map[string]interface{} {
func SimplifyMicroformatData(md *microformats.Data) []map[string]interface{} {
items := []map[string]interface{}{}
for _, item := range md.Items {
if len(item.Type) >= 1 && item.Type[0] == "h-feed" {
for _, childItem := range item.Children {
newItem := simplifyMicroformat(childItem)
newItem := SimplifyMicroformat(childItem)
items = append(items, newItem)
}
return items
}
newItem := simplifyMicroformat(item)
newItem := SimplifyMicroformat(item)
items = append(items, newItem)
if c, e := newItem["children"]; e {
if ar, ok := c.([]map[string]interface{}); ok {

View File

@ -15,7 +15,7 @@
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package main
package jf2
import (
"log"
@ -40,7 +40,7 @@ func TestInReplyTo(t *testing.T) {
}
data := microformats.Parse(f, u)
results := simplifyMicroformatData(data)
results := SimplifyMicroformatData(data)
if results[0]["type"] != "entry" {
t.Fatalf("not an h-entry, but %s", results[0]["type"])