package main import ( "crypto/hmac" "crypto/sha1" "fmt" "io/ioutil" "log" "net/http" "net/url" "os" "strconv" "strings" "time" "p83.nl/go/websub-hub/cmd/hubserver/storage" ) type subscriptionHandler struct { store storage.Service baseURL string } func (handler *subscriptionHandler) handlePublish(w http.ResponseWriter, r *http.Request) error { topic := r.Form.Get("hub.topic") log.Printf("publish: topic = %s\n", topic) client := &http.Client{} req, err := http.NewRequest("GET", topic, nil) if err != nil { return err } req.Header.Add("Accept", "*/*") res, err := client.Do(req) if err != nil { return err } defer res.Body.Close() feedContentType := res.Header.Get("Content-Type") feedContent, err := ioutil.ReadAll(res.Body) if err != nil { return err } if subs, err := handler.store.Subscribers(topic); err != nil { for _, sub := range subs { log.Printf("publish: creating post to %s\n", sub.Callback) postReq, err := http.NewRequest("POST", sub.Callback, strings.NewReader(string(feedContent))) if err != nil { log.Printf("While creating request to %s: %s", sub.Callback, err) continue } postReq.Header.Add("Content-Type", feedContentType) postReq.Header.Add("Link", fmt.Sprintf( "<%s>; rel=hub, <%s>; rel=self", handler.baseURL, topic, )) if sub.Secret != "" { mac := hmac.New(sha1.New, []byte(sub.Secret)) mac.Write(feedContent) signature := mac.Sum(nil) postReq.Header.Add("X-Hub-Signature", fmt.Sprintf("sha1=%x", signature)) } postRes, err := client.Do(postReq) if err != nil { log.Printf("While POSTing to %s: %s", sub.Callback, err) continue } log.Printf("publish: post send to %s\n", sub.Callback) log.Println("Response:") _ = postRes.Write(os.Stdout) } } return nil } func (handler *subscriptionHandler) handleUnsubscription(w http.ResponseWriter, r *http.Request) error { callback := r.Form.Get("hub.callback") topic := r.Form.Get("hub.topic") mode := r.Form.Get("hub.mode") if subscribers, err := handler.store.Subscribers(topic); err != nil { for _, subscriber := range subscribers { if subscriber.Callback != callback { continue } ourChallenge := randStringBytes(12) validationURL, err := url.Parse(callback) if err != nil { log.Println(err) return err } q := validationURL.Query() q.Add("hub.mode", mode) q.Add("hub.topic", topic) q.Add("hub.challenge", ourChallenge) validationURL.RawQuery = q.Encode() if validateURL(validationURL.String(), ourChallenge) { err = handler.store.Unsubscribe(topic, callback) if err != nil { return err } } } w.WriteHeader(200) _, err = fmt.Fprintf(w, "Unsubscribed\n") return err } else { http.Error(w, "Hub does not handle subscription for topic", 400) } return nil } func (handler *subscriptionHandler) handleSubscription(w http.ResponseWriter, r *http.Request) error { log.Printf("subscription request received: %s %#v\n", r.URL.String(), r.Form) callback := r.Form.Get("hub.callback") topic := r.Form.Get("hub.topic") secret := r.Form.Get("hub.secret") leaseSecondsStr := r.Form.Get("hub.lease_seconds") leaseSeconds, err := strconv.ParseInt(leaseSecondsStr, 10, 64) if leaseSecondsStr != "" && err != nil { http.Error(w, "hub.lease_seconds is used, but not a valid integer", 400) log.Printf("hub.lease_seconds is used, but not a valid integer (%s)\n", leaseSecondsStr) return err } log.Printf("subscribe: received for topic=%s to callback=%s (lease=%ds)\n", topic, callback, leaseSeconds) if _, e := r.Form["hub.lease_seconds"]; !e { leaseSeconds = 3600 leaseSecondsStr = "3600" log.Printf("subscribe: lease_seconds was empty use default %ds\n", leaseSeconds) } callbackURL, err := url.Parse(callback) if callback == "" || err != nil { http.Error(w, "Can not parse callback url", 400) log.Printf("Can not parse callback url: %s\n", callback) return err } topicURL, err := url.Parse(topic) if topic == "" || err != nil { http.Error(w, "Can't parse topic url", 400) log.Printf("Can't parse topic url: %s\n", topic) return err } log.Println("subscribe: sending 202 header request accepted") w.WriteHeader(202) _, _ = fmt.Fprint(w, "Accepted\r\n") go func() { ourChallenge := randStringBytes(12) validationURL := *callbackURL q := validationURL.Query() q.Add("hub.mode", "subscribe") q.Add("hub.topic", topicURL.String()) q.Add("hub.challenge", ourChallenge) q.Add("hub.lease_seconds", leaseSecondsStr) if secret != "" { q.Add("hub.verify_token", secret) } validationURL.RawQuery = q.Encode() log.Printf("subscribe: async validation with url %s\n", validationURL.String()) if validateURL(validationURL.String(), ourChallenge) { log.Printf("subscribe: validation valid\n") err = handler.store.Subscribe(topicURL.String(), storage.Subscriber{callbackURL.String(), leaseSeconds, secret, time.Now()}) log.Printf("saving suscriber failed: %v", err) } else { log.Printf("subscribe: validation failed\n") } }() return nil } func validateURL(url, challenge string) bool { client := http.Client{} req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { log.Println(err) return false } res, err := client.Do(req) if err != nil { log.Println(err) return false } defer res.Body.Close() body, err := ioutil.ReadAll(res.Body) if err != nil { log.Println(err) return false } return strings.Contains(string(body), challenge) } func (handler *subscriptionHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Header.Get("Content-Type") != "application/x-www-form-urlencoded" { http.Error(w, "Bad Request", 400) return } err := r.ParseForm() if err != nil { http.Error(w, "Bad Request", 400) return } mode := r.Form.Get("hub.mode") if mode == "subscribe" { err = handler.handleSubscription(w, r) log.Printf("saving subscribe failed: %v", err) return } else if mode == "unsubscribe" { err = handler.handleUnsubscription(w, r) log.Printf("saving unsubscribe failed: %v", err) return } else if mode == "publish" { log.Println("hub.mode=publish received") err = handler.handlePublish(w, r) log.Printf("handling publish failed: %v", err) return } else { http.Error(w, "Unknown hub.mode", 400) return } }