diff --git a/cmd/eksterd/memory.go b/cmd/eksterd/memory.go index 3bbf2a1..b832ba6 100644 --- a/cmd/eksterd/memory.go +++ b/cmd/eksterd/memory.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "database/sql" + "expvar" "fmt" "io" "io/ioutil" @@ -15,6 +16,7 @@ import ( "sync" "time" + "github.com/lib/pq" "p83.nl/go/ekster/pkg/auth" "p83.nl/go/ekster/pkg/fetch" "p83.nl/go/ekster/pkg/microsub" @@ -26,6 +28,14 @@ import ( "willnorris.com/go/microformats" ) +var ( + varMicrosub *expvar.Map +) + +func init() { + varMicrosub = expvar.NewMap("microsub") +} + // DefaultPrio is the priority value for new channels const DefaultPrio = 9999999 @@ -125,27 +135,53 @@ GROUP BY c.id; return channels, nil } +func shouldRetryWithNewUID(err error, try int) bool { + if err == nil { + return false + } + + if e, ok := err.(*pq.Error); ok { + if e.Code == "23505" && e.Constraint == "channels_uid_key" { + if try > 5 { + return false + } + return true + } + } + return false +} + // ChannelsCreate creates a channels func (b *memoryBackend) ChannelsCreate(name string) (microsub.Channel, error) { - uid := util.RandStringBytes(24) - + varMicrosub.Add("ChannelsCreate", 1) + /* + * try 5 times to generate a uid for a channel. + * If we get a database error we retry. + */ + try := 0 channel := microsub.Channel{ - UID: uid, Name: name, Unread: microsub.Unread{Type: microsub.UnreadCount}, } - - result, err := b.database.Exec(`insert into "channels" ("uid", "name", "created_at") values($1, $2, DEFAULT)`, channel.UID, channel.Name) - if err != nil { - return channel, err - } - - if n, err := result.RowsAffected(); err != nil { - if n > 0 { - b.broker.Notifier <- sse.Message{Event: "new channel", Object: channelMessage{1, channel}} + for { + varMicrosub.Add("ChannelsCreate.RandStringBytes", 1) + channel.UID = util.RandStringBytes(24) + result, err := b.database.Exec(`insert into "channels" ("uid", "name", "created_at") values($1, $2, DEFAULT)`, channel.UID, channel.Name) + if err != nil { + log.Println("channels insert", err) + if !shouldRetryWithNewUID(err, try) { + return channel, err + } + try++ + continue } + if n, err := result.RowsAffected(); err != nil { + if n > 0 { + b.broker.Notifier <- sse.Message{Event: "new channel", Object: channelMessage{1, channel}} + } + } + return channel, nil } - return channel, nil } // ChannelsUpdate updates a channels