diff --git a/clients/mist_client.go b/clients/mist_client.go index 46351c85a..cd782f50d 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -2,6 +2,7 @@ package clients import ( "bytes" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -44,6 +45,8 @@ type MistAPIClient interface { type MistClient struct { ApiUrl string + Username string + Password string HttpReqUrl string TriggerCallback string configMu sync.Mutex @@ -53,6 +56,8 @@ type MistClient struct { func NewMistAPIClient(user, password, host string, port int, ownURL string) MistAPIClient { mist := &MistClient{ ApiUrl: fmt.Sprintf("http://%s:%d", host, port), + Username: user, + Password: password, TriggerCallback: ownURL, cache: cache.New(defaultCacheExpiration, cacheCleanupInterval), } @@ -117,6 +122,13 @@ type MistState struct { PushAutoList []*MistPushAuto `json:"push_auto_list"` } +type AuthorizationResponse struct { + Authorize struct { + Status string `json:"status"` + Challenge string `json:"challenge"` + } `json:"authorize"` +} + func (ms MistState) IsIngestStream(stream string) bool { if ms.ActiveStreams == nil { return false @@ -351,6 +363,38 @@ func (mc *MistClient) getCurrentTriggers() (Triggers, error) { } func (mc *MistClient) sendCommand(command interface{}) (string, error) { + resp, err := mc.sendCommandToMist(command) + if authErr := validateAuth(resp, err); authErr != nil { + glog.Infof("Request to Mist not authorized, authorizing and retrying command: %v", command) + if authErr := mc.authorize(resp); authErr != nil { + glog.Warningf("Failed to authorize Mist request: %v", authErr) + return resp, err + } + return mc.sendCommandToMist(command) + } + return resp, err +} + +// authorize authorizes the communication with Mist Server by sending the authorization command. +// Mist doc: https://docs.mistserver.org/docs/mistserver/integration/api/authentication +func (mc *MistClient) authorize(unauthResp string) error { + r := AuthorizationResponse{} + if err := json.Unmarshal([]byte(unauthResp), &r); err != nil { + return err + } + passwordMd5, err := computeMD5Hash(mc.Password) + if err != nil { + return err + } + password, err := computeMD5Hash(passwordMd5 + r.Authorize.Challenge) + if err != nil { + return err + } + c := commandAuthorize(mc.Username, password) + return validateAuth(mc.sendCommandToMist(c)) +} + +func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) { c, err := commandToString(command) if err != nil { return "", err @@ -449,6 +493,24 @@ func (mc *MistClient) GetState() (MistState, error) { return stats, nil } +type authorizeCommand struct { + Authorize Authorize `json:"authorize"` +} + +type Authorize struct { + Username string `json:"username"` + Password string `json:"password"` +} + +func commandAuthorize(username, password string) interface{} { + return authorizeCommand{ + Authorize: Authorize{ + Username: username, + Password: password, + }, + } +} + type addStreamCommand struct { Addstream map[string]Stream `json:"addstream"` } @@ -723,11 +785,7 @@ func validateAuth(resp string, err error) error { if err != nil { return err } - r := struct { - Authorize struct { - Status string `json:"status"` - } `json:"authorize"` - }{} + r := AuthorizationResponse{} if err := json.Unmarshal([]byte(resp), &r); err != nil { return err @@ -738,6 +796,15 @@ func validateAuth(resp string, err error) error { return nil } +func computeMD5Hash(input string) (string, error) { + hasher := md5.New() + _, err := io.WriteString(hasher, input) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", hasher.Sum(nil)), nil +} + func wrapErr(err error, streamName string) error { if err != nil { return fmt.Errorf("error in processing stream '%s': %w", streamName, err) diff --git a/cluster/cluster.go b/cluster/cluster.go index 8db832d4a..2c39d3400 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -7,8 +7,6 @@ import ( "fmt" "math/rand" "net" - "net/url" - "path/filepath" "strconv" "strings" "time" @@ -23,9 +21,7 @@ import ( type Cluster interface { Start(ctx context.Context) error MembersFiltered(filter map[string]string, status, name string) ([]Member, error) - Member(filter map[string]string, status, name string) (Member, error) MemberChan() chan []Member - ResolveNodeURL(streamURL string) (string, error) EventChan() <-chan serf.UserEvent BroadcastEvent(serf.UserEvent) error } @@ -214,63 +210,11 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str return nodes, nil } -func (c *ClusterImpl) Member(filter map[string]string, status, name string) (Member, error) { - members, err := c.MembersFiltered(filter, "", name) - if err != nil { - return Member{}, err - } - if len(members) < 1 { - return Member{}, fmt.Errorf("could not find serf member name=%s", name) - } - if len(members) > 1 { - glog.Errorf("found multiple serf members with the same name! this shouldn't happen! name=%s count=%d", name, len(members)) - } - if members[0].Status != status { - return Member{}, fmt.Errorf("found serf member name=%s but status=%s (wanted %s)", name, members[0].Status, status) - } - - return members[0], nil -} - // Subscribe to changes in the member list. Please only call me once. I only have one channel internally. func (c *ClusterImpl) MemberChan() chan []Member { return c.memberCh } -// Given a dtsc:// or https:// url, resolve the proper address of the node via serf tags -func (c *ClusterImpl) ResolveNodeURL(streamURL string) (string, error) { - return ResolveNodeURL(c, streamURL) -} - -// Separated here to be more easily fed mocks for testing -func ResolveNodeURL(c Cluster, streamURL string) (string, error) { - u, err := url.Parse(streamURL) - if err != nil { - return "", err - } - nodeName := u.Host - protocol := u.Scheme - - member, err := c.Member(map[string]string{}, "alive", nodeName) - if err != nil { - return "", err - } - addr, has := member.Tags[protocol] - if !has { - glog.V(7).Infof("no tag found, not tag resolving protocol=%s nodeName=%s", protocol, nodeName) - return streamURL, nil - } - u2, err := url.Parse(addr) - if err != nil { - err = fmt.Errorf("node has unparsable tag!! nodeName=%s protocol=%s tag=%s", nodeName, protocol, addr) - glog.Error(err) - return "", err - } - u2.Path = filepath.Join(u2.Path, u.Path) - u2.RawQuery = u.RawQuery - return u2.String(), nil -} - // Subscribe to events broadcaster in the serf cluster. Please only call me once. I only have one channel internally. func (c *ClusterImpl) EventChan() <-chan serf.UserEvent { return c.eventCh diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index 356de6196..b6ce64f4d 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/http" "net/url" + "path/filepath" "regexp" "strconv" "strings" @@ -189,7 +190,7 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle { rPath := fmt.Sprintf(pathTmpl, fullPlaybackID) rURL := fmt.Sprintf("%s://%s%s?%s", protocol(r), bestNode, rPath, r.URL.RawQuery) - rURL, err = c.Cluster.ResolveNodeURL(rURL) + rURL, err = c.resolveNodeURL(rURL) if err != nil { glog.Errorf("failed to resolve node URL playbackID=%s err=%s", playbackID, err) w.WriteHeader(http.StatusInternalServerError) @@ -213,6 +214,53 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle { } } +// Given a dtsc:// or https:// url, resolve the proper address of the node via serf tags +func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string, error) { + u, err := url.Parse(streamURL) + if err != nil { + return "", err + } + nodeName := u.Host + protocol := u.Scheme + + member, err := c.clusterMember(map[string]string{}, "alive", nodeName) + if err != nil { + return "", err + } + addr, has := member.Tags[protocol] + if !has { + glog.V(7).Infof("no tag found, not tag resolving protocol=%s nodeName=%s", protocol, nodeName) + return streamURL, nil + } + u2, err := url.Parse(addr) + if err != nil { + err = fmt.Errorf("node has unparsable tag!! nodeName=%s protocol=%s tag=%s", nodeName, protocol, addr) + glog.Error(err) + return "", err + } + u2.Path = filepath.Join(u2.Path, u.Path) + u2.RawQuery = u.RawQuery + return u2.String(), nil +} + +func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) { + members, err := c.Cluster.MembersFiltered(filter, "", name) + if err != nil { + return cluster.Member{}, err + } + if len(members) < 1 { + return cluster.Member{}, fmt.Errorf("could not find serf member name=%s", name) + } + if len(members) > 1 { + glog.Errorf("found multiple serf members with the same name! this shouldn't happen! name=%s count=%d", name, len(members)) + } + if members[0].Status != status { + return cluster.Member{}, fmt.Errorf("found serf member name=%s but status=%s (wanted %s)", name, members[0].Status, status) + } + + return members[0], nil +} + // RedirectConstPathHandler redirects const path into the self catalyst node if it was not yet redirected. func (c *GeolocationHandlersCollection) RedirectConstPathHandler() httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) { @@ -283,7 +331,7 @@ func playbackIdFor(streamName string) string { } func (c *GeolocationHandlersCollection) resolveReplicatedStream(dtscURL string, streamName string) (string, error) { - outURL, err := c.Cluster.ResolveNodeURL(dtscURL) + outURL, err := c.resolveNodeURL(dtscURL) if err != nil { glog.Errorf("error finding STREAM_SOURCE: %s", err) return "push://", nil diff --git a/handlers/geolocation/geolocation_test.go b/handlers/geolocation/geolocation_test.go index b6103b31c..da5578d68 100644 --- a/handlers/geolocation/geolocation_test.go +++ b/handlers/geolocation/geolocation_test.go @@ -36,6 +36,7 @@ var fakeSerfMember = cluster.Member{ "https": fmt.Sprintf("https://%s", closestNodeAddr), "dtsc": fmt.Sprintf("dtsc://%s", closestNodeAddr), }, + Status: "alive", } var prefixes = [...]string{"video", "videorec", "stream", "playback", "vod"} @@ -158,15 +159,10 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection { Return("", "", errors.New("")) mc.EXPECT(). - Member(map[string]string{}, "alive", closestNodeAddr). + MembersFiltered(map[string]string{}, gomock.Any(), closestNodeAddr). AnyTimes(). - Return(fakeSerfMember, nil) + Return([]cluster.Member{fakeSerfMember}, nil) - mc.EXPECT(). - ResolveNodeURL(gomock.Any()).DoAndReturn(func(streamURL string) (string, error) { - return cluster.ResolveNodeURL(mc, streamURL) - }). - AnyTimes() coll := GeolocationHandlersCollection{ Balancer: mb, Cluster: mc, diff --git a/main.go b/main.go index a42acb35b..4b5f193bf 100644 --- a/main.go +++ b/main.go @@ -176,11 +176,7 @@ func main() { return } - if cli.MistUser != "" || cli.MistPassword != "" { - glog.Warning("DEPRECATION NOTICE: mist-user and mist-password are no longer used and will be removed in a later version") - } - - // TODO: I don't love the global variables for these. Me neither. + // TODO: I don't love the global variables for these config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs config.HTTPInternalAddress = cli.HTTPInternalAddress