Implement events handling and add documentation
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Peter Stuifzand 2019-03-24 16:21:38 +01:00
parent 481019bcd7
commit 15d7c69c30
Signed by: peter
GPG Key ID: 374322D56E5209E8
8 changed files with 158 additions and 67 deletions

View File

@ -1,20 +1,4 @@
/*
ek - microsub client
Copyright (C) 2018 Peter Stuifzand
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// Ek is a microsub client.
package main
import (
@ -49,8 +33,10 @@ type Export struct {
Feeds map[string][]ExportFeed `json:"feeds,omitempty"`
}
// ExportFeed is a feed.
type ExportFeed string
// ExportChannel contains the channel information for exports.
type ExportChannel struct {
UID string `json:"uid,omitempty"`
Name string `json:"channel,omitempty"`
@ -359,9 +345,9 @@ func performCommands(sub microsub.Microsub, commands []string) {
filetype := commands[1]
if filetype == "opml" {
exportOpmlFromMicrosub(sub)
exportOPMLFromMicrosub(sub)
} else if filetype == "json" {
exportJsonFromMicrosub(sub)
exportJSONFromMicrosub(sub)
} else {
log.Fatalf("unsupported filetype %q", filetype)
}
@ -372,9 +358,9 @@ func performCommands(sub microsub.Microsub, commands []string) {
filename := commands[2]
if filetype == "opml" {
importOpmlIntoMicrosub(sub, filename)
importOPMLIntoMicrosub(sub, filename)
} else if filetype == "json" {
importJsonIntoMicrosub(sub, filename)
importJSONIntoMicrosub(sub, filename)
} else {
log.Fatalf("unsupported filetype %q", filetype)
}
@ -383,9 +369,19 @@ func performCommands(sub microsub.Microsub, commands []string) {
if len(commands) == 1 && commands[0] == "version" {
fmt.Printf("ek %s\n", Version)
}
if len(commands) == 1 && commands[0] == "events" {
c, err := sub.Events()
if err != nil {
log.Fatalf("could not start event listener: %+v", err)
}
for msg := range c {
log.Printf("%s: %s", msg.Event, msg.Data)
}
}
}
func exportOpmlFromMicrosub(sub microsub.Microsub) {
func exportOPMLFromMicrosub(sub microsub.Microsub) {
output := opml.OPML{}
output.Head.Title = "Microsub channels and feeds"
output.Head.DateCreated = time.Now().Format(time.RFC3339)
@ -424,7 +420,7 @@ func exportOpmlFromMicrosub(sub microsub.Microsub) {
os.Stdout.WriteString(xml)
}
func exportJsonFromMicrosub(sub microsub.Microsub) {
func exportJSONFromMicrosub(sub microsub.Microsub) {
contents := Export{Version: "1.0", Generator: "ek version " + Version}
channels, err := sub.ChannelsGetList()
if err != nil {
@ -449,7 +445,7 @@ func exportJsonFromMicrosub(sub microsub.Microsub) {
}
}
func importJsonIntoMicrosub(sub microsub.Microsub, filename string) {
func importJSONIntoMicrosub(sub microsub.Microsub, filename string) {
var export Export
f, err := os.Open(filename)
if err != nil {
@ -509,7 +505,7 @@ func importJsonIntoMicrosub(sub microsub.Microsub, filename string) {
}
}
func importOpmlIntoMicrosub(sub microsub.Microsub, filename string) {
func importOPMLIntoMicrosub(sub microsub.Microsub, filename string) {
channelMap := make(map[string]microsub.Channel)
channels, err := sub.ChannelsGetList()
if err != nil {

View File

@ -260,9 +260,7 @@ func getAppInfo(clientID string) (app, error) {
}
func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pool := h.pool
conn := pool.Get()
conn := h.pool.Get()
defer conn.Close()
err := r.ParseForm()

View File

@ -515,6 +515,10 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
return nil
}
func (b *memoryBackend) Events() (chan sse.Message, error) {
return sse.StartConnection(b.broker)
}
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
cachingFetch := WithCaching(b.pool, Fetch2)

View File

@ -10,7 +10,9 @@ import (
"net/url"
"strings"
"github.com/pkg/errors"
"p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/sse"
)
// Client is a HTTP client for Microsub
@ -201,6 +203,7 @@ func (c *Client) PreviewURL(url string) (microsub.Timeline, error) {
return timeline, nil
}
// FollowGetList gets the list of followed feeds.
func (c *Client) FollowGetList(channel string) ([]microsub.Feed, error) {
args := make(map[string]string)
args["channel"] = channel
@ -228,6 +231,7 @@ func (c *Client) FollowGetList(channel string) ([]microsub.Feed, error) {
return response.Items, nil
}
// ChannelsCreate creates and new channel on a microsub server.
func (c *Client) ChannelsCreate(name string) (microsub.Channel, error) {
args := make(map[string]string)
args["name"] = name
@ -245,6 +249,7 @@ func (c *Client) ChannelsCreate(name string) (microsub.Channel, error) {
return channel, nil
}
// ChannelsUpdate updates a channel.
func (c *Client) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
args := make(map[string]string)
args["name"] = name
@ -263,6 +268,7 @@ func (c *Client) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
return channel, nil
}
// ChannelsDelete deletes a channel.
func (c *Client) ChannelsDelete(uid string) error {
args := make(map[string]string)
args["channel"] = uid
@ -275,6 +281,7 @@ func (c *Client) ChannelsDelete(uid string) error {
return nil
}
// FollowURL follows a url.
func (c *Client) FollowURL(channel, url string) (microsub.Feed, error) {
args := make(map[string]string)
args["channel"] = channel
@ -293,6 +300,7 @@ func (c *Client) FollowURL(channel, url string) (microsub.Feed, error) {
return feed, nil
}
// UnfollowURL unfollows a url in a channel.
func (c *Client) UnfollowURL(channel, url string) error {
args := make(map[string]string)
args["channel"] = channel
@ -305,6 +313,7 @@ func (c *Client) UnfollowURL(channel, url string) error {
return nil
}
// Search asks the server to search for the query.
func (c *Client) Search(query string) ([]microsub.Feed, error) {
args := make(map[string]string)
args["query"] = query
@ -325,6 +334,7 @@ func (c *Client) Search(query string) ([]microsub.Feed, error) {
return response.Results, nil
}
// MarkRead marks an item read on the server.
func (c *Client) MarkRead(channel string, uids []string) error {
args := make(map[string]string)
args["channel"] = channel
@ -342,3 +352,17 @@ func (c *Client) MarkRead(channel string, uids []string) error {
res.Body.Close()
return nil
}
// Events open an event channel to the server.
func (c *Client) Events() (chan sse.Message, error) {
res, err := c.microsubGetRequest("events", nil)
if err != nil {
return nil, err
}
ch, err := sse.Reader(res.Body)
if err != nil {
return nil, errors.Wrap(err, "could not create reader")
}
return ch, nil
}

View File

@ -4,6 +4,8 @@ package microsub
import (
"encoding/json"
"fmt"
"p83.nl/go/ekster/pkg/sse"
)
/*
@ -37,6 +39,7 @@ type Channel struct {
Unread Unread `json:"unread,omitempty"`
}
// Card contains the fields of an author or location.
type Card struct {
// Filled bool `json:"filled,omitempty"`
Type string `json:"type,omitempty"`
@ -50,6 +53,7 @@ type Card struct {
Latitude string `json:"latitude,omitempty" mf2:"latitude"`
}
// Content contains the Text or HTML content of an Item.
type Content struct {
Text string `json:"text,omitempty" mf2:"value"`
HTML string `json:"html,omitempty" mf2:"html"`
@ -92,6 +96,7 @@ type Timeline struct {
Paging Pagination `json:"paging"`
}
// Feed is one microsub feed.
type Feed struct {
Type string `json:"type"`
URL string `json:"url"`
@ -101,16 +106,6 @@ type Feed struct {
Author Card `json:"author,omitempty"`
}
type Message string
type Event struct {
Msg Message
}
type EventListener interface {
WriteMessage(evt Event)
}
// Microsub is the main protocol that should be implemented by a backend
type Microsub interface {
ChannelsGetList() ([]Channel, error)
@ -129,8 +124,11 @@ type Microsub interface {
Search(query string) ([]Feed, error)
PreviewURL(url string) (Timeline, error)
Events() (chan sse.Message, error)
}
// MarshalJSON encodes an Unread value as JSON
func (unread Unread) MarshalJSON() ([]byte, error) {
switch unread.Type {
case UnreadBool:
@ -141,6 +139,7 @@ func (unread Unread) MarshalJSON() ([]byte, error) {
return json.Marshal(nil)
}
// UnmarshalJSON decodes an Unread value from JSON
func (unread *Unread) UnmarshalJSON(bytes []byte) error {
var b bool
err := json.Unmarshal(bytes, &b)
@ -161,6 +160,7 @@ func (unread *Unread) UnmarshalJSON(bytes []byte) error {
return fmt.Errorf("can't unmarshal as bool or int")
}
// String returns a string of the unread value
func (unread Unread) String() string {
switch unread.Type {
case UnreadBool:
@ -171,6 +171,7 @@ func (unread Unread) String() string {
return ""
}
// HasUnread return true of there are unread items.
func (unread *Unread) HasUnread() bool {
switch unread.Type {
case UnreadBool:

View File

@ -7,6 +7,7 @@ package server
import (
"encoding/json"
"fmt"
"log"
"net/http"
"regexp"
@ -99,10 +100,30 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"items": following,
})
} else if action == "events" {
err := sse.StartConnection(h.Broker, w)
events, err := h.backend.Events()
if err != nil {
http.Error(w, "could not start sse connection", 500)
}
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
h.Broker.CloseClient(events)
}()
// Listen to connection close and un-register messageChan
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
h.Broker.CloseClient(events)
}()
err = sse.WriteMessages(w, events)
if err != nil {
log.Println(err)
http.Error(w, "internal server error", 500)
}
} else {
http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400)
return

View File

@ -2,6 +2,7 @@ package server
import (
"p83.nl/go/ekster/pkg/microsub"
"p83.nl/go/ekster/pkg/sse"
)
// NullBackend is the simplest possible backend
@ -81,3 +82,10 @@ func (b *NullBackend) PreviewURL(url string) (microsub.Timeline, error) {
func (b *NullBackend) MarkRead(channel string, uids []string) error {
return nil
}
// Events returns a closed channel.
func (b *NullBackend) Events() (chan sse.Message, error) {
ch := make(chan sse.Message)
close(ch)
return ch, nil
}

View File

@ -1,10 +1,15 @@
package sse
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
"github.com/pkg/errors"
)
// A MessageChan is a channel of channels
@ -16,6 +21,7 @@ type MessageChan chan Message
// Message is a message.
type Message struct {
Event string
Data string
Object interface{}
}
@ -77,8 +83,24 @@ func NewBroker() (broker *Broker) {
return
}
// CloseClient closes the client channel
func (broker *Broker) CloseClient(ch MessageChan) {
broker.closingClients <- ch
}
// StartConnection starts a SSE connection, based on an existing HTTP connection.
func StartConnection(broker *Broker, w http.ResponseWriter) error {
func StartConnection(broker *Broker) (MessageChan, error) {
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(MessageChan)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
return messageChan, nil
}
// WriteMessages writes SSE formatted messages to the writer
func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
// Make sure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
@ -102,39 +124,56 @@ func StartConnection(broker *Broker, w http.ResponseWriter) error {
flusher.Flush()
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(MessageChan)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
broker.closingClients <- messageChan
}()
// block waiting or messages broadcast on this connection's messageChan
for {
for message := range messageChan {
// Write to the ResponseWriter, Server Sent Events compatible
message := <-messageChan
output, err := json.Marshal(message.Object)
if err != nil {
log.Println(err)
continue
return errors.Wrap(err, "could not marshal message data")
}
fmt.Fprintf(w, "event: %s\n", message.Event)
fmt.Fprintf(w, "data: %s\n\n", output)
// Flush the data immediately instead of buffering it for later.
_, err = fmt.Fprintf(w, "event: %s\r\n", message.Event)
if err != nil {
return errors.Wrap(err, "could not write message header")
}
_, err = fmt.Fprintf(w, "data: %s\r\n\r\n", output)
if err != nil {
return errors.Wrap(err, "could not write message data")
}
flusher.Flush()
}
return nil
}
// Reader returns a channel that contains parsed SSE messages.
func Reader(body io.ReadCloser) (MessageChan, error) {
ch := make(MessageChan)
r := bufio.NewScanner(body)
var msg Message
go func() {
for r.Scan() {
line := r.Text()
if line == "" {
ch <- msg
msg = Message{}
continue
}
if strings.HasPrefix(line, "event: ") {
line = line[len("event: "):]
msg.Event = line
}
if strings.HasPrefix(line, "data: ") {
line = line[len("data: "):]
msg.Data = line
}
}
if err := r.Err(); err != nil {
log.Printf("could not scanner lines from sse events: %+v", err)
}
}()
return ch, nil
}