From b29cfc70ae05079456ce6d7a6b916370141cece1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 3 Sep 2024 14:13:54 +0200 Subject: [PATCH 1/9] Add logging to debug issues with /mist/trigger proxying --- handlers/accesscontrol/access-control.go | 4 ++++ handlers/misttriggers/user_new.go | 2 ++ 2 files changed, 6 insertions(+) diff --git a/handlers/accesscontrol/access-control.go b/handlers/accesscontrol/access-control.go index 18619e251..a2d216ee7 100644 --- a/handlers/accesscontrol/access-control.go +++ b/handlers/accesscontrol/access-control.go @@ -170,6 +170,7 @@ func NewAccessControlHandlersCollection(cli config.Cli, mapic mistapiconnector.I } func (ac *AccessControlHandlersCollection) HandleUserNew(ctx context.Context, payload *misttriggers.UserNewPayload) (bool, error) { + glog.Infof("Handling USER_NEW trigger payload=%v", payload) playbackID := payload.StreamName[strings.Index(payload.StreamName, "+")+1:] ctx = log.WithLogValues(ctx, "playback_id", playbackID) @@ -189,6 +190,7 @@ func (ac *AccessControlHandlersCollection) HandleUserNew(ctx context.Context, pa func (ac *AccessControlHandlersCollection) IsAuthorized(ctx context.Context, playbackID string, payload *misttriggers.UserNewPayload) (allowed bool, err error) { + glog.Infof("Handling IsAuthorized trigger playbackID=%s, payload=%v", playbackID, payload) if payload.Origin == "null" && payload.Referer == "" { // Allow redirects without caching match, _ := regexp.MatchString(`(?:prod|staging)-.*catalyst-\d+`, payload.Host) @@ -216,10 +218,12 @@ func (ac *AccessControlHandlersCollection) IsAuthorized(ctx context.Context, pla Inc() }() allowed, err = ac.isAuthorized(ctx, playbackID, payload) + glog.Infof("isAuthorized handled playbackID=%s, allowed=%v, err=%v", playbackID, allowed, err) return } func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, playbackID string, payload *misttriggers.UserNewPayload) (bool, error) { + glog.Infof("Handling AccessControlHandlersCollection.isAuthorized() trigger payload=%v", payload) webhookHeaders := make(map[string]string) webhookHeaders["User-Agent"] = payload.UserAgent diff --git a/handlers/misttriggers/user_new.go b/handlers/misttriggers/user_new.go index 174bce744..36bcf3edc 100644 --- a/handlers/misttriggers/user_new.go +++ b/handlers/misttriggers/user_new.go @@ -64,6 +64,8 @@ func (d *MistCallbackHandlersCollection) TriggerUserNew(ctx context.Context, w h payload, err := ParseUserNewPayload(body) cookies := req.Cookies() + glog.Infof("Got USER_NEW trigger sessionId=%q payload=%v", payload.SessionID, body) + var accessKey, jwt string for _, cookie := range cookies { switch cookie.Name { From c7a6441ee5b087640e1abebd25cc6083be3a53fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 3 Sep 2024 15:38:09 +0200 Subject: [PATCH 2/9] Add more debug --- handlers/accesscontrol/access-control.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/handlers/accesscontrol/access-control.go b/handlers/accesscontrol/access-control.go index a2d216ee7..0c3a6a898 100644 --- a/handlers/accesscontrol/access-control.go +++ b/handlers/accesscontrol/access-control.go @@ -267,6 +267,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla return false, err } cacheKey = "accessKey_" + hashCacheKey + glog.Infof("Produced cacheKey from accessKey: %s", cacheKey) } else if jwt != "" { for _, blocked := range ac.blockedJWTs { if jwt == blocked { @@ -288,6 +289,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla return false, err } cacheKey = "jwtPubKey_" + hashCacheKey + glog.Infof("Produced cacheKey from JWT: %s", cacheKey) } body, err := json.Marshal(acReq) @@ -377,6 +379,7 @@ func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playback } func (ac *AccessControlHandlersCollection) GetPlaybackAccessControlInfo(ctx context.Context, playbackID, cacheKey string, requestBody []byte) (bool, error) { + glog.Infof("GetPlaybackAccessControlInfo playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody) ac.mutex.RLock() entry := ac.cache[playbackID][cacheKey] ac.mutex.RUnlock() @@ -384,22 +387,27 @@ func (ac *AccessControlHandlersCollection) GetPlaybackAccessControlInfo(ctx cont if isExpired(entry) { log.V(7).LogCtx(ctx, "Cache expired", "cache_key", cacheKey) + glog.Infof("Cache expired, calling cachePlaybackAccessControlInfo") err := ac.cachePlaybackAccessControlInfo(playbackID, cacheKey, requestBody) if err != nil { return false, err } + glog.Infof("Cache expired, cached new playback access control info") } else if isStale(entry) { log.V(7).LogCtx(ctx, "Cache stale", "cache_key", cacheKey) + glog.Infof("Cache stale") go func() { ac.mutex.RLock() stillStale := isStale(ac.cache[playbackID][cacheKey]) ac.mutex.RUnlock() if stillStale { + glog.Infof("Cache still stale, calling cachePlaybackAccessControlInfo") err := ac.cachePlaybackAccessControlInfo(playbackID, cacheKey, requestBody) if err != nil { log.LogCtx(ctx, "Error caching playback access control info", "err", err) } + glog.Infof("Cache still stale, cached new playback access control info") } }() } @@ -433,6 +441,7 @@ func isStale(entry *PlaybackAccessControlEntry) bool { } func (ac *AccessControlHandlersCollection) cachePlaybackAccessControlInfo(playbackID, cacheKey string, requestBody []byte) error { + glog.Infof("cachePlaybackAccessControlInfo, playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody) allow, gateConfig, err := ac.gateClient.QueryGate(requestBody) refreshInterval := gateConfig.RefreshInterval From 5c1dfad773f2cd5301fa3cb2a87d5495fc58d505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 3 Sep 2024 15:41:36 +0200 Subject: [PATCH 3/9] Add more debug --- handlers/accesscontrol/access-control.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/handlers/accesscontrol/access-control.go b/handlers/accesscontrol/access-control.go index 0c3a6a898..f5ff5d04d 100644 --- a/handlers/accesscontrol/access-control.go +++ b/handlers/accesscontrol/access-control.go @@ -308,6 +308,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla // checkViewerLimit is used to limit viewers per user globally (as configured with Gate API) func (ac *AccessControlHandlersCollection) checkViewerLimit(playbackID string) bool { + glog.Infof("checkViewerLimit playbackID=%s", playbackID) viewerLimitCache.mux.RLock() defer viewerLimitCache.mux.RUnlock() @@ -338,6 +339,7 @@ func (ac *AccessControlHandlersCollection) checkViewerLimit(playbackID string) b } func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playbackID string) { + glog.Infof("refreshConcurrentViewerCache playbackID=%s", playbackID) viewerLimitCache.mux.RLock() viewerLimit, ok := viewerLimitCache.data[playbackID] viewerLimitCache.mux.RUnlock() From 4bc1527c7a39b749aeb13d37029bb2cf3a732e74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 4 Sep 2024 08:58:52 +0200 Subject: [PATCH 4/9] Invalidate sessions outside lock since it's an HTTP operation --- handlers/accesscontrol/access-control.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/handlers/accesscontrol/access-control.go b/handlers/accesscontrol/access-control.go index f5ff5d04d..b10e14226 100644 --- a/handlers/accesscontrol/access-control.go +++ b/handlers/accesscontrol/access-control.go @@ -124,18 +124,23 @@ func (ac *AccessControlHandlersCollection) periodicRefreshIntervalCache(mapic mi time.Sleep(5 * time.Second) ac.mutex.Lock() refreshIntervalCache.mux.Lock() + var keysToInvalidate []string for key := range refreshIntervalCache.data { if time.Since(refreshIntervalCache.data[key].LastRefresh) > time.Duration(refreshIntervalCache.data[key].RefreshInterval)*time.Second { refreshIntervalCache.data[key].LastRefresh = time.Now() - mapic.InvalidateAllSessions(key) + keysToInvalidate = append(keysToInvalidate, key) for cachedAccessKey := range ac.cache[key] { delete(ac.cache[key], cachedAccessKey) } break } } - ac.mutex.Unlock() refreshIntervalCache.mux.Unlock() + ac.mutex.Unlock() + + for _, key := range keysToInvalidate { + mapic.InvalidateAllSessions(key) + } } }() } From 513225a7fbb373ac2fce372ab214d1d8451ef748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 4 Sep 2024 09:13:24 +0200 Subject: [PATCH 5/9] Add even more logging --- clients/mist_client.go | 1 + handlers/accesscontrol/access-control.go | 4 ++++ mapic/mistapiconnector_app.go | 2 ++ 3 files changed, 7 insertions(+) diff --git a/clients/mist_client.go b/clients/mist_client.go index 314be7f25..d881ccc94 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -393,6 +393,7 @@ func (mc *MistClient) authorize(unauthResp string) error { } func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) { + glog.Infof("Sending command to Mist") c, err := commandToString(command) if err != nil { return "", err diff --git a/handlers/accesscontrol/access-control.go b/handlers/accesscontrol/access-control.go index b10e14226..dc5842160 100644 --- a/handlers/accesscontrol/access-control.go +++ b/handlers/accesscontrol/access-control.go @@ -388,6 +388,7 @@ func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playback func (ac *AccessControlHandlersCollection) GetPlaybackAccessControlInfo(ctx context.Context, playbackID, cacheKey string, requestBody []byte) (bool, error) { glog.Infof("GetPlaybackAccessControlInfo playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody) ac.mutex.RLock() + glog.Infof("ac.mutex.RLock()") entry := ac.cache[playbackID][cacheKey] ac.mutex.RUnlock() @@ -420,6 +421,7 @@ func (ac *AccessControlHandlersCollection) GetPlaybackAccessControlInfo(ctx cont } ac.mutex.RLock() + glog.Infof("ac.mutex.RLock() 2") entry = ac.cache[playbackID][cacheKey] ac.mutex.RUnlock() @@ -440,10 +442,12 @@ func (ac *AccessControlHandlersCollection) ProduceHashCacheKey(cachePayload Play } func isExpired(entry *PlaybackAccessControlEntry) bool { + glog.Infof("isExpired") return entry == nil || time.Now().After(entry.Stale) } func isStale(entry *PlaybackAccessControlEntry) bool { + glog.Infof("isStale") return entry != nil && time.Now().After(entry.MaxAge) && !isExpired(entry) } diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index a6ed86771..dbc12eb64 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -224,7 +224,9 @@ func (mc *mac) StopSessions(playbackID string) { } func (mc *mac) InvalidateAllSessions(playbackID string) { + glog.Infof("Invalidating Mist Sessions") mc.invalidateAllSessions(playbackID) + glog.Infof("Done Invalidating Mist Sessions") } func (mc *mac) handleStreamBuffer(ctx context.Context, payload *misttriggers.StreamBufferPayload) error { From fa0cd052ba8073940251e59beee37c8ff9470112 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 4 Sep 2024 09:26:14 +0200 Subject: [PATCH 6/9] Remove arm64 build --- .github/workflows/docker.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index ec3fc1fda..85f9803f8 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -63,7 +63,7 @@ jobs: - name: Build and push uses: docker/build-push-action@v4 with: - platforms: linux/amd64, linux/arm64 + platforms: linux/amd64 push: true build-args: | GIT_VERSION=${{ github.ref_type == 'tag' && github.ref_name || github.event.pull_request.head.sha || github.sha }} From 29a33f5ed6e55e23529e81beba115b769260f3b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 4 Sep 2024 09:42:36 +0200 Subject: [PATCH 7/9] Add even more logging --- clients/mist_client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clients/mist_client.go b/clients/mist_client.go index d881ccc94..7fc6c135c 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -404,15 +404,18 @@ func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) { return "", err } req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + glog.Infof("Prepared command to send, sedning") resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req) if err != nil { return "", err } + glog.Infof("Done Sending command to Mist") defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return "", err } + glog.Infof("Sending command to Mist 2") return string(body), err } From 54c81eb0b4ac7f0d461203ee81b97240547d918b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 4 Sep 2024 10:38:37 +0200 Subject: [PATCH 8/9] Add even more logging --- metrics/monitor_request.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/metrics/monitor_request.go b/metrics/monitor_request.go index 537a1a24b..59b7a341e 100644 --- a/metrics/monitor_request.go +++ b/metrics/monitor_request.go @@ -3,6 +3,7 @@ package metrics import ( "context" "fmt" + "github.com/golang/glog" "net/http" "time" @@ -20,11 +21,14 @@ func MonitorRequest(clientMetrics ClientMetrics, client *http.Client, r *http.Re req := r.WithContext(ctx) start := time.Now() + glog.Infof("client.Do(req)") res, err := client.Do(req) + glog.Infof("Done client.Do(req), err=%v", err) duration := time.Since(start) retries := ctx.Value(RetriesKey).(*Retries) if retries.lastStatusCode >= 400 { + glog.Infof("retries.lastStatusCode >= 400") clientMetrics.FailureCount.WithLabelValues(req.URL.Host, fmt.Sprint(retries.lastStatusCode)).Inc() return res, err } @@ -32,6 +36,7 @@ func MonitorRequest(clientMetrics ClientMetrics, client *http.Client, r *http.Re clientMetrics.RequestDuration.WithLabelValues(req.URL.Host).Observe(duration.Seconds()) clientMetrics.RetryCount.WithLabelValues(req.URL.Host).Set(float64(retries.count)) + glog.Infof("return res, err") return res, err } From f6ce4831a5e40094dbf8960ea831007e50262c93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 4 Sep 2024 10:43:17 +0200 Subject: [PATCH 9/9] Add even more logging --- metrics/monitor_request.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metrics/monitor_request.go b/metrics/monitor_request.go index 59b7a341e..441a3a9f2 100644 --- a/metrics/monitor_request.go +++ b/metrics/monitor_request.go @@ -41,6 +41,7 @@ func MonitorRequest(clientMetrics ClientMetrics, client *http.Client, r *http.Re } func HttpRetryHook(ctx context.Context, res *http.Response, err error) (bool, error) { + glog.Infof("HttpRetryHook") retries := ctx.Value(RetriesKey).(*Retries) if res == nil { // TODO: have a better way to represent closed/refused connections and timeouts @@ -52,5 +53,6 @@ func HttpRetryHook(ctx context.Context, res *http.Response, err error) (bool, er } retries.count++ + glog.Infof("Done HttpRetryHook") return retryablehttp.DefaultRetryPolicy(ctx, res, err) }