From 7af70c9f28207300b4eef1bfcb32e3214332ab65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Wed, 24 Jul 2024 09:39:13 +0200 Subject: [PATCH] Revert "Add 1s cache for lapi.GetStreamByPlaybackID()" (#1342) This reverts commit a45d981d2886828d5aade2a8feecf8b39afe6d2b. --- handlers/analytics/log_ext_data.go | 9 ++--- handlers/geolocation/geolocation.go | 5 +-- mapic/mistapiconnector_app.go | 4 +- mapic/utils.go | 58 ----------------------------- mapic/utils_test.go | 57 ---------------------------- 5 files changed, 6 insertions(+), 127 deletions(-) delete mode 100644 mapic/utils.go delete mode 100644 mapic/utils_test.go diff --git a/handlers/analytics/log_ext_data.go b/handlers/analytics/log_ext_data.go index 213489888..79bbe8485 100644 --- a/handlers/analytics/log_ext_data.go +++ b/handlers/analytics/log_ext_data.go @@ -23,16 +23,15 @@ type ExternalData struct { type ExternalDataFetcher struct { streamCache mistapiconnector.IStreamCache lapi *api.Client - lapiCached *mistapiconnector.ApiClientCached - cache map[string]ExternalData - mu sync.RWMutex + + cache map[string]ExternalData + mu sync.RWMutex } func NewExternalDataFetcher(streamCache mistapiconnector.IStreamCache, lapi *api.Client) *ExternalDataFetcher { return &ExternalDataFetcher{ streamCache: streamCache, lapi: lapi, - lapiCached: mistapiconnector.NewApiClientCached(lapi), cache: make(map[string]ExternalData), } } @@ -63,7 +62,7 @@ func (e *ExternalDataFetcher) Fetch(playbackID string) (ExternalData, error) { } // Not found in any cache, try querying Studio API to get Stream - stream, streamErr := e.lapiCached.GetStreamByPlaybackID(playbackID) + stream, streamErr := e.lapi.GetStreamByPlaybackID(playbackID) if streamErr == nil { return e.extDataFromStream(playbackID, stream) } diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index 181634e2d..b6ce64f4d 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -23,7 +23,6 @@ import ( "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/handlers/misttriggers" - mistapiconnector "github.com/livepeer/catalyst-api/mapic" "github.com/livepeer/catalyst-api/metrics" "github.com/livepeer/go-api-client" ) @@ -71,7 +70,6 @@ type GeolocationHandlersCollection struct { Cluster cluster.Cluster Config config.Cli Lapi *api.Client - LapiCached *mistapiconnector.ApiClientCached streamPullRateLimit *streamPullRateLimit } @@ -81,7 +79,6 @@ func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluste Cluster: cluster, Config: config, Lapi: lapi, - LapiCached: mistapiconnector.NewApiClientCached(lapi), streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval), } } @@ -353,7 +350,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo return "", errRateLimit } - stream, err := c.LapiCached.GetStreamByPlaybackID(playbackID) + stream, err := c.Lapi.GetStreamByPlaybackID(playbackID) if err != nil { return "", fmt.Errorf("failed to get stream to check stream pull: %w", err) } diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index bacb4bcd0..feec9887b 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -93,7 +93,6 @@ type ( ctx context.Context cancel context.CancelFunc lapi *api.Client - lapiCached *ApiClientCached balancerHost string mu sync.RWMutex mistHot string @@ -132,7 +131,6 @@ func (mc *mac) Start(ctx context.Context) error { AccessToken: mc.config.APIToken, }) mc.lapi = lapi - mc.lapiCached = NewApiClientCached(lapi) if mc.balancerHost != "" && !strings.Contains(mc.balancerHost, ":") { mc.balancerHost = mc.balancerHost + ":8042" // must set default port for Mist's Load Balancer @@ -783,7 +781,7 @@ func (mc *mac) getStreamInfo(playbackID string) (*streamInfo, error) { func (mc *mac) refreshStreamInfo(playbackID string) (*streamInfo, error) { glog.Infof("Refreshing stream info for playbackID=%s", playbackID) - stream, err := mc.lapiCached.GetStreamByPlaybackID(playbackID) + stream, err := mc.lapi.GetStreamByPlaybackID(playbackID) if err != nil { return nil, fmt.Errorf("error getting stream by playback ID %s: %w", playbackID, err) } diff --git a/mapic/utils.go b/mapic/utils.go deleted file mode 100644 index cb7c92c8e..000000000 --- a/mapic/utils.go +++ /dev/null @@ -1,58 +0,0 @@ -package mistapiconnector - -import ( - "github.com/livepeer/go-api-client" - "sync" - "time" -) - -const lapiCacheDuration = 1 * time.Second - -type ApiClientCached struct { - lapi *api.Client - streamCache map[string]entry - mu sync.RWMutex - ttl time.Duration -} - -func NewApiClientCached(lapi *api.Client) *ApiClientCached { - return &ApiClientCached{ - lapi: lapi, - streamCache: make(map[string]entry), - ttl: lapiCacheDuration, - } -} - -type entry struct { - stream *api.Stream - err error - updateAt time.Time -} - -func (a *ApiClientCached) GetStreamByPlaybackID(playbackId string) (*api.Stream, error) { - a.mu.RLock() - e, ok := a.streamCache[playbackId] - if ok && e.stream != nil && e.updateAt.Add(a.ttl).After(time.Now()) { - // Use cached value - a.mu.RUnlock() - return e.stream, e.err - } - a.mu.RUnlock() - - // Value not cached or expired - a.mu.Lock() - defer a.mu.Unlock() - // Check again in case another goroutine has updated the cache in the meantime - e, ok = a.streamCache[playbackId] - if ok && e.stream != nil && e.updateAt.Add(a.ttl).After(time.Now()) { - return e.stream, e.err - } - // No value in the cache, fetch from Livepeer API and store the result in a cache - stream, err := a.lapi.GetStreamByPlaybackID(playbackId) - a.streamCache[playbackId] = entry{ - stream: stream, - err: err, - updateAt: time.Now(), - } - return stream, err -} diff --git a/mapic/utils_test.go b/mapic/utils_test.go deleted file mode 100644 index c99a213d3..000000000 --- a/mapic/utils_test.go +++ /dev/null @@ -1,57 +0,0 @@ -package mistapiconnector - -import ( - "encoding/json" - "github.com/livepeer/go-api-client" - require2 "github.com/stretchr/testify/require" - "net/http" - "net/http/httptest" - "testing" - "time" -) - -func TestGetStreamByPlaybackID(t *testing.T) { - require := require2.New(t) - - // given - playbackId := "some-playback-id" - stubStream := &api.Stream{ - ID: "123456", - } - var hits int - - lapiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - hits++ - err := json.NewEncoder(w).Encode(stubStream) - if err != nil { - t.Fail() - } - })) - defer lapiServer.Close() - - lapi, _ := api.NewAPIClientGeolocated(api.ClientOptions{ - Server: lapiServer.URL, - }) - cachedClient := NewApiClientCached(lapi) - - // when first call to Livepeer API - s, err := cachedClient.GetStreamByPlaybackID(playbackId) - require.NoError(err) - require.Equal(stubStream, s) - require.Equal(1, hits) - - // when multiple time the same call within 1 second - for i := 0; i < 10; i++ { - s, err = cachedClient.GetStreamByPlaybackID(playbackId) - require.NoError(err) - require.Equal(stubStream, s) - require.Equal(1, hits) - } - - // when ttl is expired - time.Sleep(1 * time.Second) - s, err = cachedClient.GetStreamByPlaybackID(playbackId) - require.NoError(err) - require.Equal(stubStream, s) - require.Equal(2, hits) -}