Compare commits
No commits in common. "15d7c69c30168d671d318957d78dde11a59c8105" and "1e00d32aedb8c41cd5141eaa6ec6122f99dedbe5" have entirely different histories.
15d7c69c30
...
1e00d32aed
8
TODO.md
8
TODO.md
|
|
@ -4,11 +4,3 @@
|
|||
- Increase ease of use for people who want to try Ekster
|
||||
- Hosted version??
|
||||
- Per user backends
|
||||
|
||||
|
||||
|
||||
|
||||
#### Code
|
||||
|
||||
- Remove dependency between hubIncomingHandler and memoryBackend
|
||||
- Improve error handling
|
||||
|
|
|
|||
|
|
@ -1,4 +1,20 @@
|
|||
// Ek is a microsub client.
|
||||
/*
|
||||
ek - microsub client
|
||||
Copyright (C) 2018 Peter Stuifzand
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package main
|
||||
|
||||
import (
|
||||
|
|
@ -33,10 +49,8 @@ type Export struct {
|
|||
Feeds map[string][]ExportFeed `json:"feeds,omitempty"`
|
||||
}
|
||||
|
||||
// ExportFeed is a feed.
|
||||
type ExportFeed string
|
||||
|
||||
// ExportChannel contains the channel information for exports.
|
||||
type ExportChannel struct {
|
||||
UID string `json:"uid,omitempty"`
|
||||
Name string `json:"channel,omitempty"`
|
||||
|
|
@ -345,9 +359,9 @@ func performCommands(sub microsub.Microsub, commands []string) {
|
|||
filetype := commands[1]
|
||||
|
||||
if filetype == "opml" {
|
||||
exportOPMLFromMicrosub(sub)
|
||||
exportOpmlFromMicrosub(sub)
|
||||
} else if filetype == "json" {
|
||||
exportJSONFromMicrosub(sub)
|
||||
exportJsonFromMicrosub(sub)
|
||||
} else {
|
||||
log.Fatalf("unsupported filetype %q", filetype)
|
||||
}
|
||||
|
|
@ -358,9 +372,9 @@ func performCommands(sub microsub.Microsub, commands []string) {
|
|||
filename := commands[2]
|
||||
|
||||
if filetype == "opml" {
|
||||
importOPMLIntoMicrosub(sub, filename)
|
||||
importOpmlIntoMicrosub(sub, filename)
|
||||
} else if filetype == "json" {
|
||||
importJSONIntoMicrosub(sub, filename)
|
||||
importJsonIntoMicrosub(sub, filename)
|
||||
} else {
|
||||
log.Fatalf("unsupported filetype %q", filetype)
|
||||
}
|
||||
|
|
@ -369,19 +383,9 @@ func performCommands(sub microsub.Microsub, commands []string) {
|
|||
if len(commands) == 1 && commands[0] == "version" {
|
||||
fmt.Printf("ek %s\n", Version)
|
||||
}
|
||||
|
||||
if len(commands) == 1 && commands[0] == "events" {
|
||||
c, err := sub.Events()
|
||||
if err != nil {
|
||||
log.Fatalf("could not start event listener: %+v", err)
|
||||
}
|
||||
for msg := range c {
|
||||
log.Printf("%s: %s", msg.Event, msg.Data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func exportOPMLFromMicrosub(sub microsub.Microsub) {
|
||||
func exportOpmlFromMicrosub(sub microsub.Microsub) {
|
||||
output := opml.OPML{}
|
||||
output.Head.Title = "Microsub channels and feeds"
|
||||
output.Head.DateCreated = time.Now().Format(time.RFC3339)
|
||||
|
|
@ -420,7 +424,7 @@ func exportOPMLFromMicrosub(sub microsub.Microsub) {
|
|||
os.Stdout.WriteString(xml)
|
||||
}
|
||||
|
||||
func exportJSONFromMicrosub(sub microsub.Microsub) {
|
||||
func exportJsonFromMicrosub(sub microsub.Microsub) {
|
||||
contents := Export{Version: "1.0", Generator: "ek version " + Version}
|
||||
channels, err := sub.ChannelsGetList()
|
||||
if err != nil {
|
||||
|
|
@ -445,7 +449,7 @@ func exportJSONFromMicrosub(sub microsub.Microsub) {
|
|||
}
|
||||
}
|
||||
|
||||
func importJSONIntoMicrosub(sub microsub.Microsub, filename string) {
|
||||
func importJsonIntoMicrosub(sub microsub.Microsub, filename string) {
|
||||
var export Export
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
|
|
@ -505,7 +509,7 @@ func importJSONIntoMicrosub(sub microsub.Microsub, filename string) {
|
|||
}
|
||||
}
|
||||
|
||||
func importOPMLIntoMicrosub(sub microsub.Microsub, filename string) {
|
||||
func importOpmlIntoMicrosub(sub microsub.Microsub, filename string) {
|
||||
channelMap := make(map[string]microsub.Channel)
|
||||
channels, err := sub.ChannelsGetList()
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -260,7 +260,9 @@ func getAppInfo(clientID string) (app, error) {
|
|||
}
|
||||
|
||||
func (h *mainHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
conn := h.pool.Get()
|
||||
pool := h.pool
|
||||
|
||||
conn := pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
err := r.ParseForm()
|
||||
|
|
|
|||
|
|
@ -515,10 +515,6 @@ func (b *memoryBackend) MarkRead(channel string, uids []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *memoryBackend) Events() (chan sse.Message, error) {
|
||||
return sse.StartConnection(b.broker)
|
||||
}
|
||||
|
||||
func (b *memoryBackend) ProcessContent(channel, fetchURL, contentType string, body io.Reader) error {
|
||||
cachingFetch := WithCaching(b.pool, Fetch2)
|
||||
|
||||
|
|
|
|||
|
|
@ -10,9 +10,7 @@ import (
|
|||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
"p83.nl/go/ekster/pkg/sse"
|
||||
)
|
||||
|
||||
// Client is a HTTP client for Microsub
|
||||
|
|
@ -203,7 +201,6 @@ func (c *Client) PreviewURL(url string) (microsub.Timeline, error) {
|
|||
return timeline, nil
|
||||
}
|
||||
|
||||
// FollowGetList gets the list of followed feeds.
|
||||
func (c *Client) FollowGetList(channel string) ([]microsub.Feed, error) {
|
||||
args := make(map[string]string)
|
||||
args["channel"] = channel
|
||||
|
|
@ -231,7 +228,6 @@ func (c *Client) FollowGetList(channel string) ([]microsub.Feed, error) {
|
|||
return response.Items, nil
|
||||
}
|
||||
|
||||
// ChannelsCreate creates and new channel on a microsub server.
|
||||
func (c *Client) ChannelsCreate(name string) (microsub.Channel, error) {
|
||||
args := make(map[string]string)
|
||||
args["name"] = name
|
||||
|
|
@ -249,7 +245,6 @@ func (c *Client) ChannelsCreate(name string) (microsub.Channel, error) {
|
|||
return channel, nil
|
||||
}
|
||||
|
||||
// ChannelsUpdate updates a channel.
|
||||
func (c *Client) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
|
||||
args := make(map[string]string)
|
||||
args["name"] = name
|
||||
|
|
@ -268,7 +263,6 @@ func (c *Client) ChannelsUpdate(uid, name string) (microsub.Channel, error) {
|
|||
return channel, nil
|
||||
}
|
||||
|
||||
// ChannelsDelete deletes a channel.
|
||||
func (c *Client) ChannelsDelete(uid string) error {
|
||||
args := make(map[string]string)
|
||||
args["channel"] = uid
|
||||
|
|
@ -281,7 +275,6 @@ func (c *Client) ChannelsDelete(uid string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// FollowURL follows a url.
|
||||
func (c *Client) FollowURL(channel, url string) (microsub.Feed, error) {
|
||||
args := make(map[string]string)
|
||||
args["channel"] = channel
|
||||
|
|
@ -300,7 +293,6 @@ func (c *Client) FollowURL(channel, url string) (microsub.Feed, error) {
|
|||
return feed, nil
|
||||
}
|
||||
|
||||
// UnfollowURL unfollows a url in a channel.
|
||||
func (c *Client) UnfollowURL(channel, url string) error {
|
||||
args := make(map[string]string)
|
||||
args["channel"] = channel
|
||||
|
|
@ -313,7 +305,6 @@ func (c *Client) UnfollowURL(channel, url string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Search asks the server to search for the query.
|
||||
func (c *Client) Search(query string) ([]microsub.Feed, error) {
|
||||
args := make(map[string]string)
|
||||
args["query"] = query
|
||||
|
|
@ -334,7 +325,6 @@ func (c *Client) Search(query string) ([]microsub.Feed, error) {
|
|||
return response.Results, nil
|
||||
}
|
||||
|
||||
// MarkRead marks an item read on the server.
|
||||
func (c *Client) MarkRead(channel string, uids []string) error {
|
||||
args := make(map[string]string)
|
||||
args["channel"] = channel
|
||||
|
|
@ -352,17 +342,3 @@ func (c *Client) MarkRead(channel string, uids []string) error {
|
|||
res.Body.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Events open an event channel to the server.
|
||||
func (c *Client) Events() (chan sse.Message, error) {
|
||||
res, err := c.microsubGetRequest("events", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ch, err := sse.Reader(res.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create reader")
|
||||
}
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@ package microsub
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"p83.nl/go/ekster/pkg/sse"
|
||||
)
|
||||
|
||||
/*
|
||||
|
|
@ -39,7 +37,6 @@ type Channel struct {
|
|||
Unread Unread `json:"unread,omitempty"`
|
||||
}
|
||||
|
||||
// Card contains the fields of an author or location.
|
||||
type Card struct {
|
||||
// Filled bool `json:"filled,omitempty"`
|
||||
Type string `json:"type,omitempty"`
|
||||
|
|
@ -53,7 +50,6 @@ type Card struct {
|
|||
Latitude string `json:"latitude,omitempty" mf2:"latitude"`
|
||||
}
|
||||
|
||||
// Content contains the Text or HTML content of an Item.
|
||||
type Content struct {
|
||||
Text string `json:"text,omitempty" mf2:"value"`
|
||||
HTML string `json:"html,omitempty" mf2:"html"`
|
||||
|
|
@ -96,7 +92,6 @@ type Timeline struct {
|
|||
Paging Pagination `json:"paging"`
|
||||
}
|
||||
|
||||
// Feed is one microsub feed.
|
||||
type Feed struct {
|
||||
Type string `json:"type"`
|
||||
URL string `json:"url"`
|
||||
|
|
@ -106,6 +101,16 @@ type Feed struct {
|
|||
Author Card `json:"author,omitempty"`
|
||||
}
|
||||
|
||||
type Message string
|
||||
|
||||
type Event struct {
|
||||
Msg Message
|
||||
}
|
||||
|
||||
type EventListener interface {
|
||||
WriteMessage(evt Event)
|
||||
}
|
||||
|
||||
// Microsub is the main protocol that should be implemented by a backend
|
||||
type Microsub interface {
|
||||
ChannelsGetList() ([]Channel, error)
|
||||
|
|
@ -124,11 +129,8 @@ type Microsub interface {
|
|||
|
||||
Search(query string) ([]Feed, error)
|
||||
PreviewURL(url string) (Timeline, error)
|
||||
|
||||
Events() (chan sse.Message, error)
|
||||
}
|
||||
|
||||
// MarshalJSON encodes an Unread value as JSON
|
||||
func (unread Unread) MarshalJSON() ([]byte, error) {
|
||||
switch unread.Type {
|
||||
case UnreadBool:
|
||||
|
|
@ -139,7 +141,6 @@ func (unread Unread) MarshalJSON() ([]byte, error) {
|
|||
return json.Marshal(nil)
|
||||
}
|
||||
|
||||
// UnmarshalJSON decodes an Unread value from JSON
|
||||
func (unread *Unread) UnmarshalJSON(bytes []byte) error {
|
||||
var b bool
|
||||
err := json.Unmarshal(bytes, &b)
|
||||
|
|
@ -160,7 +161,6 @@ func (unread *Unread) UnmarshalJSON(bytes []byte) error {
|
|||
return fmt.Errorf("can't unmarshal as bool or int")
|
||||
}
|
||||
|
||||
// String returns a string of the unread value
|
||||
func (unread Unread) String() string {
|
||||
switch unread.Type {
|
||||
case UnreadBool:
|
||||
|
|
@ -171,7 +171,6 @@ func (unread Unread) String() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// HasUnread return true of there are unread items.
|
||||
func (unread *Unread) HasUnread() bool {
|
||||
switch unread.Type {
|
||||
case UnreadBool:
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ package server
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"regexp"
|
||||
|
||||
|
|
@ -100,30 +99,10 @@ func (h *microsubHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
"items": following,
|
||||
})
|
||||
} else if action == "events" {
|
||||
events, err := h.backend.Events()
|
||||
err := sse.StartConnection(h.Broker, w)
|
||||
if err != nil {
|
||||
http.Error(w, "could not start sse connection", 500)
|
||||
}
|
||||
|
||||
// Remove this client from the map of connected clients
|
||||
// when this handler exits.
|
||||
defer func() {
|
||||
h.Broker.CloseClient(events)
|
||||
}()
|
||||
|
||||
// Listen to connection close and un-register messageChan
|
||||
notify := w.(http.CloseNotifier).CloseNotify()
|
||||
|
||||
go func() {
|
||||
<-notify
|
||||
h.Broker.CloseClient(events)
|
||||
}()
|
||||
|
||||
err = sse.WriteMessages(w, events)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
http.Error(w, "internal server error", 500)
|
||||
}
|
||||
} else {
|
||||
http.Error(w, fmt.Sprintf("unknown action %s\n", action), 400)
|
||||
return
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package server
|
|||
|
||||
import (
|
||||
"p83.nl/go/ekster/pkg/microsub"
|
||||
"p83.nl/go/ekster/pkg/sse"
|
||||
)
|
||||
|
||||
// NullBackend is the simplest possible backend
|
||||
|
|
@ -82,10 +81,3 @@ func (b *NullBackend) PreviewURL(url string) (microsub.Timeline, error) {
|
|||
func (b *NullBackend) MarkRead(channel string, uids []string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Events returns a closed channel.
|
||||
func (b *NullBackend) Events() (chan sse.Message, error) {
|
||||
ch := make(chan sse.Message)
|
||||
close(ch)
|
||||
return ch, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,15 +1,10 @@
|
|||
package sse
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// A MessageChan is a channel of channels
|
||||
|
|
@ -21,7 +16,6 @@ type MessageChan chan Message
|
|||
// Message is a message.
|
||||
type Message struct {
|
||||
Event string
|
||||
Data string
|
||||
Object interface{}
|
||||
}
|
||||
|
||||
|
|
@ -83,24 +77,8 @@ func NewBroker() (broker *Broker) {
|
|||
return
|
||||
}
|
||||
|
||||
// CloseClient closes the client channel
|
||||
func (broker *Broker) CloseClient(ch MessageChan) {
|
||||
broker.closingClients <- ch
|
||||
}
|
||||
|
||||
// StartConnection starts a SSE connection, based on an existing HTTP connection.
|
||||
func StartConnection(broker *Broker) (MessageChan, error) {
|
||||
// Each connection registers its own message channel with the Broker's connections registry
|
||||
messageChan := make(MessageChan)
|
||||
|
||||
// Signal the broker that we have a new connection
|
||||
broker.newClients <- messageChan
|
||||
|
||||
return messageChan, nil
|
||||
}
|
||||
|
||||
// WriteMessages writes SSE formatted messages to the writer
|
||||
func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
|
||||
func StartConnection(broker *Broker, w http.ResponseWriter) error {
|
||||
// Make sure that the writer supports flushing.
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
|
|
@ -124,56 +102,39 @@ func WriteMessages(w http.ResponseWriter, messageChan chan Message) error {
|
|||
|
||||
flusher.Flush()
|
||||
|
||||
// block waiting or messages broadcast on this connection's messageChan
|
||||
for message := range messageChan {
|
||||
// Write to the ResponseWriter, Server Sent Events compatible
|
||||
output, err := json.Marshal(message.Object)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not marshal message data")
|
||||
}
|
||||
// Each connection registers its own message channel with the Broker's connections registry
|
||||
messageChan := make(MessageChan)
|
||||
|
||||
_, err = fmt.Fprintf(w, "event: %s\r\n", message.Event)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not write message header")
|
||||
}
|
||||
// Signal the broker that we have a new connection
|
||||
broker.newClients <- messageChan
|
||||
|
||||
_, err = fmt.Fprintf(w, "data: %s\r\n\r\n", output)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not write message data")
|
||||
}
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reader returns a channel that contains parsed SSE messages.
|
||||
func Reader(body io.ReadCloser) (MessageChan, error) {
|
||||
ch := make(MessageChan)
|
||||
|
||||
r := bufio.NewScanner(body)
|
||||
var msg Message
|
||||
go func() {
|
||||
for r.Scan() {
|
||||
line := r.Text()
|
||||
if line == "" {
|
||||
ch <- msg
|
||||
msg = Message{}
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(line, "event: ") {
|
||||
line = line[len("event: "):]
|
||||
msg.Event = line
|
||||
}
|
||||
if strings.HasPrefix(line, "data: ") {
|
||||
line = line[len("data: "):]
|
||||
msg.Data = line
|
||||
}
|
||||
}
|
||||
if err := r.Err(); err != nil {
|
||||
log.Printf("could not scanner lines from sse events: %+v", err)
|
||||
}
|
||||
// Remove this client from the map of connected clients
|
||||
// when this handler exits.
|
||||
defer func() {
|
||||
broker.closingClients <- messageChan
|
||||
}()
|
||||
|
||||
return ch, nil
|
||||
// Listen to connection close and un-register messageChan
|
||||
notify := w.(http.CloseNotifier).CloseNotify()
|
||||
|
||||
go func() {
|
||||
<-notify
|
||||
broker.closingClients <- messageChan
|
||||
}()
|
||||
|
||||
// block waiting or messages broadcast on this connection's messageChan
|
||||
for {
|
||||
// Write to the ResponseWriter, Server Sent Events compatible
|
||||
message := <-messageChan
|
||||
output, err := json.Marshal(message.Object)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(w, "event: %s\n", message.Event)
|
||||
fmt.Fprintf(w, "data: %s\n\n", output)
|
||||
|
||||
// Flush the data immediately instead of buffering it for later.
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user