From ca7f48f6138f7a40449d90bb0005109c9368a44c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 12 Jul 2024 10:05:37 +0200 Subject: [PATCH 1/2] Add Mist authentication to catalyst-api (#1317) --- clients/mist_client.go | 77 +++++++++++++++++++++++++++++++++++++++--- main.go | 4 --- 2 files changed, 72 insertions(+), 9 deletions(-) diff --git a/clients/mist_client.go b/clients/mist_client.go index 46351c85..cd782f50 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -2,6 +2,7 @@ package clients import ( "bytes" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -44,6 +45,8 @@ type MistAPIClient interface { type MistClient struct { ApiUrl string + Username string + Password string HttpReqUrl string TriggerCallback string configMu sync.Mutex @@ -53,6 +56,8 @@ type MistClient struct { func NewMistAPIClient(user, password, host string, port int, ownURL string) MistAPIClient { mist := &MistClient{ ApiUrl: fmt.Sprintf("http://%s:%d", host, port), + Username: user, + Password: password, TriggerCallback: ownURL, cache: cache.New(defaultCacheExpiration, cacheCleanupInterval), } @@ -117,6 +122,13 @@ type MistState struct { PushAutoList []*MistPushAuto `json:"push_auto_list"` } +type AuthorizationResponse struct { + Authorize struct { + Status string `json:"status"` + Challenge string `json:"challenge"` + } `json:"authorize"` +} + func (ms MistState) IsIngestStream(stream string) bool { if ms.ActiveStreams == nil { return false @@ -351,6 +363,38 @@ func (mc *MistClient) getCurrentTriggers() (Triggers, error) { } func (mc *MistClient) sendCommand(command interface{}) (string, error) { + resp, err := mc.sendCommandToMist(command) + if authErr := validateAuth(resp, err); authErr != nil { + glog.Infof("Request to Mist not authorized, authorizing and retrying command: %v", command) + if authErr := mc.authorize(resp); authErr != nil { + glog.Warningf("Failed to authorize Mist request: %v", authErr) + return resp, err + } + return mc.sendCommandToMist(command) + } + return resp, err +} + +// authorize authorizes the communication with Mist Server by sending the authorization command. +// Mist doc: https://docs.mistserver.org/docs/mistserver/integration/api/authentication +func (mc *MistClient) authorize(unauthResp string) error { + r := AuthorizationResponse{} + if err := json.Unmarshal([]byte(unauthResp), &r); err != nil { + return err + } + passwordMd5, err := computeMD5Hash(mc.Password) + if err != nil { + return err + } + password, err := computeMD5Hash(passwordMd5 + r.Authorize.Challenge) + if err != nil { + return err + } + c := commandAuthorize(mc.Username, password) + return validateAuth(mc.sendCommandToMist(c)) +} + +func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) { c, err := commandToString(command) if err != nil { return "", err @@ -449,6 +493,24 @@ func (mc *MistClient) GetState() (MistState, error) { return stats, nil } +type authorizeCommand struct { + Authorize Authorize `json:"authorize"` +} + +type Authorize struct { + Username string `json:"username"` + Password string `json:"password"` +} + +func commandAuthorize(username, password string) interface{} { + return authorizeCommand{ + Authorize: Authorize{ + Username: username, + Password: password, + }, + } +} + type addStreamCommand struct { Addstream map[string]Stream `json:"addstream"` } @@ -723,11 +785,7 @@ func validateAuth(resp string, err error) error { if err != nil { return err } - r := struct { - Authorize struct { - Status string `json:"status"` - } `json:"authorize"` - }{} + r := AuthorizationResponse{} if err := json.Unmarshal([]byte(resp), &r); err != nil { return err @@ -738,6 +796,15 @@ func validateAuth(resp string, err error) error { return nil } +func computeMD5Hash(input string) (string, error) { + hasher := md5.New() + _, err := io.WriteString(hasher, input) + if err != nil { + return "", err + } + return fmt.Sprintf("%x", hasher.Sum(nil)), nil +} + func wrapErr(err error, streamName string) error { if err != nil { return fmt.Errorf("error in processing stream '%s': %w", streamName, err) diff --git a/main.go b/main.go index d82b72d7..0fbcf0e2 100644 --- a/main.go +++ b/main.go @@ -175,10 +175,6 @@ func main() { return } - if cli.MistUser != "" || cli.MistPassword != "" { - glog.Warning("DEPRECATION NOTICE: mist-user and mist-password are no longer used and will be removed in a later version") - } - // TODO: I don't love the global variables for these config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs From 10a437c9ead794785218bd4ef3f94e414370d613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 16 Jul 2024 09:16:18 +0200 Subject: [PATCH 2/2] Refactor cluster abstraction: remove Member() and ResolveNodeURL() (#1320) This change is refactoring-only, it does not introduce any functional changes. It improves the Cluster interface abstraction, two of its functions were used only inside geolocation, so it's better to move it there and keep it package-private. --- cluster/cluster.go | 56 ------------------------ handlers/geolocation/geolocation.go | 52 +++++++++++++++++++++- handlers/geolocation/geolocation_test.go | 10 ++--- 3 files changed, 53 insertions(+), 65 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 8db832d4..2c39d340 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -7,8 +7,6 @@ import ( "fmt" "math/rand" "net" - "net/url" - "path/filepath" "strconv" "strings" "time" @@ -23,9 +21,7 @@ import ( type Cluster interface { Start(ctx context.Context) error MembersFiltered(filter map[string]string, status, name string) ([]Member, error) - Member(filter map[string]string, status, name string) (Member, error) MemberChan() chan []Member - ResolveNodeURL(streamURL string) (string, error) EventChan() <-chan serf.UserEvent BroadcastEvent(serf.UserEvent) error } @@ -214,63 +210,11 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str return nodes, nil } -func (c *ClusterImpl) Member(filter map[string]string, status, name string) (Member, error) { - members, err := c.MembersFiltered(filter, "", name) - if err != nil { - return Member{}, err - } - if len(members) < 1 { - return Member{}, fmt.Errorf("could not find serf member name=%s", name) - } - if len(members) > 1 { - glog.Errorf("found multiple serf members with the same name! this shouldn't happen! name=%s count=%d", name, len(members)) - } - if members[0].Status != status { - return Member{}, fmt.Errorf("found serf member name=%s but status=%s (wanted %s)", name, members[0].Status, status) - } - - return members[0], nil -} - // Subscribe to changes in the member list. Please only call me once. I only have one channel internally. func (c *ClusterImpl) MemberChan() chan []Member { return c.memberCh } -// Given a dtsc:// or https:// url, resolve the proper address of the node via serf tags -func (c *ClusterImpl) ResolveNodeURL(streamURL string) (string, error) { - return ResolveNodeURL(c, streamURL) -} - -// Separated here to be more easily fed mocks for testing -func ResolveNodeURL(c Cluster, streamURL string) (string, error) { - u, err := url.Parse(streamURL) - if err != nil { - return "", err - } - nodeName := u.Host - protocol := u.Scheme - - member, err := c.Member(map[string]string{}, "alive", nodeName) - if err != nil { - return "", err - } - addr, has := member.Tags[protocol] - if !has { - glog.V(7).Infof("no tag found, not tag resolving protocol=%s nodeName=%s", protocol, nodeName) - return streamURL, nil - } - u2, err := url.Parse(addr) - if err != nil { - err = fmt.Errorf("node has unparsable tag!! nodeName=%s protocol=%s tag=%s", nodeName, protocol, addr) - glog.Error(err) - return "", err - } - u2.Path = filepath.Join(u2.Path, u.Path) - u2.RawQuery = u.RawQuery - return u2.String(), nil -} - // Subscribe to events broadcaster in the serf cluster. Please only call me once. I only have one channel internally. func (c *ClusterImpl) EventChan() <-chan serf.UserEvent { return c.eventCh diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index 356de619..b6ce64f4 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/http" "net/url" + "path/filepath" "regexp" "strconv" "strings" @@ -189,7 +190,7 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle { rPath := fmt.Sprintf(pathTmpl, fullPlaybackID) rURL := fmt.Sprintf("%s://%s%s?%s", protocol(r), bestNode, rPath, r.URL.RawQuery) - rURL, err = c.Cluster.ResolveNodeURL(rURL) + rURL, err = c.resolveNodeURL(rURL) if err != nil { glog.Errorf("failed to resolve node URL playbackID=%s err=%s", playbackID, err) w.WriteHeader(http.StatusInternalServerError) @@ -213,6 +214,53 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle { } } +// Given a dtsc:// or https:// url, resolve the proper address of the node via serf tags +func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string, error) { + u, err := url.Parse(streamURL) + if err != nil { + return "", err + } + nodeName := u.Host + protocol := u.Scheme + + member, err := c.clusterMember(map[string]string{}, "alive", nodeName) + if err != nil { + return "", err + } + addr, has := member.Tags[protocol] + if !has { + glog.V(7).Infof("no tag found, not tag resolving protocol=%s nodeName=%s", protocol, nodeName) + return streamURL, nil + } + u2, err := url.Parse(addr) + if err != nil { + err = fmt.Errorf("node has unparsable tag!! nodeName=%s protocol=%s tag=%s", nodeName, protocol, addr) + glog.Error(err) + return "", err + } + u2.Path = filepath.Join(u2.Path, u.Path) + u2.RawQuery = u.RawQuery + return u2.String(), nil +} + +func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) { + members, err := c.Cluster.MembersFiltered(filter, "", name) + if err != nil { + return cluster.Member{}, err + } + if len(members) < 1 { + return cluster.Member{}, fmt.Errorf("could not find serf member name=%s", name) + } + if len(members) > 1 { + glog.Errorf("found multiple serf members with the same name! this shouldn't happen! name=%s count=%d", name, len(members)) + } + if members[0].Status != status { + return cluster.Member{}, fmt.Errorf("found serf member name=%s but status=%s (wanted %s)", name, members[0].Status, status) + } + + return members[0], nil +} + // RedirectConstPathHandler redirects const path into the self catalyst node if it was not yet redirected. func (c *GeolocationHandlersCollection) RedirectConstPathHandler() httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) { @@ -283,7 +331,7 @@ func playbackIdFor(streamName string) string { } func (c *GeolocationHandlersCollection) resolveReplicatedStream(dtscURL string, streamName string) (string, error) { - outURL, err := c.Cluster.ResolveNodeURL(dtscURL) + outURL, err := c.resolveNodeURL(dtscURL) if err != nil { glog.Errorf("error finding STREAM_SOURCE: %s", err) return "push://", nil diff --git a/handlers/geolocation/geolocation_test.go b/handlers/geolocation/geolocation_test.go index b6103b31..da5578d6 100644 --- a/handlers/geolocation/geolocation_test.go +++ b/handlers/geolocation/geolocation_test.go @@ -36,6 +36,7 @@ var fakeSerfMember = cluster.Member{ "https": fmt.Sprintf("https://%s", closestNodeAddr), "dtsc": fmt.Sprintf("dtsc://%s", closestNodeAddr), }, + Status: "alive", } var prefixes = [...]string{"video", "videorec", "stream", "playback", "vod"} @@ -158,15 +159,10 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection { Return("", "", errors.New("")) mc.EXPECT(). - Member(map[string]string{}, "alive", closestNodeAddr). + MembersFiltered(map[string]string{}, gomock.Any(), closestNodeAddr). AnyTimes(). - Return(fakeSerfMember, nil) + Return([]cluster.Member{fakeSerfMember}, nil) - mc.EXPECT(). - ResolveNodeURL(gomock.Any()).DoAndReturn(func(streamURL string) (string, error) { - return cluster.ResolveNodeURL(mc, streamURL) - }). - AnyTimes() coll := GeolocationHandlersCollection{ Balancer: mb, Cluster: mc,