Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logging to debug issues with /mist/trigger proxying #1375

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Build and push
uses: docker/build-push-action@v4
with:
platforms: linux/amd64, linux/arm64
platforms: linux/amd64
push: true
build-args: |
GIT_VERSION=${{ github.ref_type == 'tag' && github.ref_name || github.event.pull_request.head.sha || github.sha }}
Expand Down
4 changes: 4 additions & 0 deletions clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ func (mc *MistClient) authorize(unauthResp string) error {
}

func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) {
glog.Infof("Sending command to Mist")
c, err := commandToString(command)
if err != nil {
return "", err
Expand All @@ -403,15 +404,18 @@ func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) {
return "", err
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
glog.Infof("Prepared command to send, sedning")
resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req)
if err != nil {
return "", err
}
glog.Infof("Done Sending command to Mist")
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
glog.Infof("Sending command to Mist 2")
return string(body), err
}

Expand Down
28 changes: 26 additions & 2 deletions handlers/accesscontrol/access-control.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,23 @@ func (ac *AccessControlHandlersCollection) periodicRefreshIntervalCache(mapic mi
time.Sleep(5 * time.Second)
ac.mutex.Lock()
refreshIntervalCache.mux.Lock()
var keysToInvalidate []string
for key := range refreshIntervalCache.data {
if time.Since(refreshIntervalCache.data[key].LastRefresh) > time.Duration(refreshIntervalCache.data[key].RefreshInterval)*time.Second {
refreshIntervalCache.data[key].LastRefresh = time.Now()
mapic.InvalidateAllSessions(key)
keysToInvalidate = append(keysToInvalidate, key)
for cachedAccessKey := range ac.cache[key] {
delete(ac.cache[key], cachedAccessKey)
}
break
}
}
ac.mutex.Unlock()
refreshIntervalCache.mux.Unlock()
ac.mutex.Unlock()

for _, key := range keysToInvalidate {
mapic.InvalidateAllSessions(key)
}
}
}()
}
Expand Down Expand Up @@ -170,6 +175,7 @@ func NewAccessControlHandlersCollection(cli config.Cli, mapic mistapiconnector.I
}

func (ac *AccessControlHandlersCollection) HandleUserNew(ctx context.Context, payload *misttriggers.UserNewPayload) (bool, error) {
glog.Infof("Handling USER_NEW trigger payload=%v", payload)
playbackID := payload.StreamName[strings.Index(payload.StreamName, "+")+1:]
ctx = log.WithLogValues(ctx, "playback_id", playbackID)

Expand All @@ -189,6 +195,7 @@ func (ac *AccessControlHandlersCollection) HandleUserNew(ctx context.Context, pa

func (ac *AccessControlHandlersCollection) IsAuthorized(ctx context.Context, playbackID string, payload *misttriggers.UserNewPayload) (allowed bool, err error) {

glog.Infof("Handling IsAuthorized trigger playbackID=%s, payload=%v", playbackID, payload)
if payload.Origin == "null" && payload.Referer == "" {
// Allow redirects without caching
match, _ := regexp.MatchString(`(?:prod|staging)-.*catalyst-\d+`, payload.Host)
Expand Down Expand Up @@ -216,10 +223,12 @@ func (ac *AccessControlHandlersCollection) IsAuthorized(ctx context.Context, pla
Inc()
}()
allowed, err = ac.isAuthorized(ctx, playbackID, payload)
glog.Infof("isAuthorized handled playbackID=%s, allowed=%v, err=%v", playbackID, allowed, err)
return
}

func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, playbackID string, payload *misttriggers.UserNewPayload) (bool, error) {
glog.Infof("Handling AccessControlHandlersCollection.isAuthorized() trigger payload=%v", payload)
webhookHeaders := make(map[string]string)

webhookHeaders["User-Agent"] = payload.UserAgent
Expand Down Expand Up @@ -263,6 +272,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla
return false, err
}
cacheKey = "accessKey_" + hashCacheKey
glog.Infof("Produced cacheKey from accessKey: %s", cacheKey)
} else if jwt != "" {
for _, blocked := range ac.blockedJWTs {
if jwt == blocked {
Expand All @@ -284,6 +294,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla
return false, err
}
cacheKey = "jwtPubKey_" + hashCacheKey
glog.Infof("Produced cacheKey from JWT: %s", cacheKey)
}

body, err := json.Marshal(acReq)
Expand All @@ -302,6 +313,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla

// checkViewerLimit is used to limit viewers per user globally (as configured with Gate API)
func (ac *AccessControlHandlersCollection) checkViewerLimit(playbackID string) bool {
glog.Infof("checkViewerLimit playbackID=%s", playbackID)
viewerLimitCache.mux.RLock()
defer viewerLimitCache.mux.RUnlock()

Expand Down Expand Up @@ -332,6 +344,7 @@ func (ac *AccessControlHandlersCollection) checkViewerLimit(playbackID string) b
}

func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playbackID string) {
glog.Infof("refreshConcurrentViewerCache playbackID=%s", playbackID)
viewerLimitCache.mux.RLock()
viewerLimit, ok := viewerLimitCache.data[playbackID]
viewerLimitCache.mux.RUnlock()
Expand Down Expand Up @@ -373,34 +386,42 @@ func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playback
}

func (ac *AccessControlHandlersCollection) GetPlaybackAccessControlInfo(ctx context.Context, playbackID, cacheKey string, requestBody []byte) (bool, error) {
glog.Infof("GetPlaybackAccessControlInfo playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody)
ac.mutex.RLock()
glog.Infof("ac.mutex.RLock()")
entry := ac.cache[playbackID][cacheKey]
ac.mutex.RUnlock()

if isExpired(entry) {
log.V(7).LogCtx(ctx, "Cache expired",
"cache_key", cacheKey)
glog.Infof("Cache expired, calling cachePlaybackAccessControlInfo")
err := ac.cachePlaybackAccessControlInfo(playbackID, cacheKey, requestBody)
if err != nil {
return false, err
}
glog.Infof("Cache expired, cached new playback access control info")
} else if isStale(entry) {
log.V(7).LogCtx(ctx, "Cache stale",
"cache_key", cacheKey)
glog.Infof("Cache stale")
go func() {
ac.mutex.RLock()
stillStale := isStale(ac.cache[playbackID][cacheKey])
ac.mutex.RUnlock()
if stillStale {
glog.Infof("Cache still stale, calling cachePlaybackAccessControlInfo")
err := ac.cachePlaybackAccessControlInfo(playbackID, cacheKey, requestBody)
if err != nil {
log.LogCtx(ctx, "Error caching playback access control info", "err", err)
}
glog.Infof("Cache still stale, cached new playback access control info")
}
}()
}

ac.mutex.RLock()
glog.Infof("ac.mutex.RLock() 2")
entry = ac.cache[playbackID][cacheKey]
ac.mutex.RUnlock()

Expand All @@ -421,14 +442,17 @@ func (ac *AccessControlHandlersCollection) ProduceHashCacheKey(cachePayload Play
}

func isExpired(entry *PlaybackAccessControlEntry) bool {
glog.Infof("isExpired")
return entry == nil || time.Now().After(entry.Stale)
}

func isStale(entry *PlaybackAccessControlEntry) bool {
glog.Infof("isStale")
return entry != nil && time.Now().After(entry.MaxAge) && !isExpired(entry)
}

func (ac *AccessControlHandlersCollection) cachePlaybackAccessControlInfo(playbackID, cacheKey string, requestBody []byte) error {
glog.Infof("cachePlaybackAccessControlInfo, playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody)
allow, gateConfig, err := ac.gateClient.QueryGate(requestBody)

refreshInterval := gateConfig.RefreshInterval
Expand Down
2 changes: 2 additions & 0 deletions handlers/misttriggers/user_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (d *MistCallbackHandlersCollection) TriggerUserNew(ctx context.Context, w h
payload, err := ParseUserNewPayload(body)
cookies := req.Cookies()

glog.Infof("Got USER_NEW trigger sessionId=%q payload=%v", payload.SessionID, body)

var accessKey, jwt string
for _, cookie := range cookies {
switch cookie.Name {
Expand Down
2 changes: 2 additions & 0 deletions mapic/mistapiconnector_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ func (mc *mac) StopSessions(playbackID string) {
}

func (mc *mac) InvalidateAllSessions(playbackID string) {
glog.Infof("Invalidating Mist Sessions")
mc.invalidateAllSessions(playbackID)
glog.Infof("Done Invalidating Mist Sessions")
}

func (mc *mac) handleStreamBuffer(ctx context.Context, payload *misttriggers.StreamBufferPayload) error {
Expand Down
7 changes: 7 additions & 0 deletions metrics/monitor_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metrics
import (
"context"
"fmt"
"github.com/golang/glog"
"net/http"
"time"

Expand All @@ -20,22 +21,27 @@ func MonitorRequest(clientMetrics ClientMetrics, client *http.Client, r *http.Re
req := r.WithContext(ctx)

start := time.Now()
glog.Infof("client.Do(req)")
res, err := client.Do(req)
glog.Infof("Done client.Do(req), err=%v", err)
duration := time.Since(start)

retries := ctx.Value(RetriesKey).(*Retries)
if retries.lastStatusCode >= 400 {
glog.Infof("retries.lastStatusCode >= 400")
clientMetrics.FailureCount.WithLabelValues(req.URL.Host, fmt.Sprint(retries.lastStatusCode)).Inc()
return res, err
}

clientMetrics.RequestDuration.WithLabelValues(req.URL.Host).Observe(duration.Seconds())
clientMetrics.RetryCount.WithLabelValues(req.URL.Host).Set(float64(retries.count))

glog.Infof("return res, err")
return res, err
}

func HttpRetryHook(ctx context.Context, res *http.Response, err error) (bool, error) {
glog.Infof("HttpRetryHook")
retries := ctx.Value(RetriesKey).(*Retries)
if res == nil {
// TODO: have a better way to represent closed/refused connections and timeouts
Expand All @@ -47,5 +53,6 @@ func HttpRetryHook(ctx context.Context, res *http.Response, err error) (bool, er
}
retries.count++

glog.Infof("Done HttpRetryHook")
return retryablehttp.DefaultRetryPolicy(ctx, res, err)
}
Loading