diff --git a/handlers/analytics/log_ext_data.go b/handlers/analytics/log_ext_data.go index 79bbe848..21348988 100644 --- a/handlers/analytics/log_ext_data.go +++ b/handlers/analytics/log_ext_data.go @@ -23,15 +23,16 @@ type ExternalData struct { type ExternalDataFetcher struct { streamCache mistapiconnector.IStreamCache lapi *api.Client - - cache map[string]ExternalData - mu sync.RWMutex + lapiCached *mistapiconnector.ApiClientCached + cache map[string]ExternalData + mu sync.RWMutex } func NewExternalDataFetcher(streamCache mistapiconnector.IStreamCache, lapi *api.Client) *ExternalDataFetcher { return &ExternalDataFetcher{ streamCache: streamCache, lapi: lapi, + lapiCached: mistapiconnector.NewApiClientCached(lapi), cache: make(map[string]ExternalData), } } @@ -62,7 +63,7 @@ func (e *ExternalDataFetcher) Fetch(playbackID string) (ExternalData, error) { } // Not found in any cache, try querying Studio API to get Stream - stream, streamErr := e.lapi.GetStreamByPlaybackID(playbackID) + stream, streamErr := e.lapiCached.GetStreamByPlaybackID(playbackID) if streamErr == nil { return e.extDataFromStream(playbackID, stream) } diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index b6ce64f4..181634e2 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -23,6 +23,7 @@ import ( "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/handlers/misttriggers" + mistapiconnector "github.com/livepeer/catalyst-api/mapic" "github.com/livepeer/catalyst-api/metrics" "github.com/livepeer/go-api-client" ) @@ -70,6 +71,7 @@ type GeolocationHandlersCollection struct { Cluster cluster.Cluster Config config.Cli Lapi *api.Client + LapiCached *mistapiconnector.ApiClientCached streamPullRateLimit *streamPullRateLimit } @@ -79,6 +81,7 @@ func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluste Cluster: cluster, Config: config, Lapi: lapi, + LapiCached: mistapiconnector.NewApiClientCached(lapi), streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval), } } @@ -350,7 +353,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo return "", errRateLimit } - stream, err := c.Lapi.GetStreamByPlaybackID(playbackID) + stream, err := c.LapiCached.GetStreamByPlaybackID(playbackID) if err != nil { return "", fmt.Errorf("failed to get stream to check stream pull: %w", err) } diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index feec9887..bacb4bcd 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -93,6 +93,7 @@ type ( ctx context.Context cancel context.CancelFunc lapi *api.Client + lapiCached *ApiClientCached balancerHost string mu sync.RWMutex mistHot string @@ -131,6 +132,7 @@ func (mc *mac) Start(ctx context.Context) error { AccessToken: mc.config.APIToken, }) mc.lapi = lapi + mc.lapiCached = NewApiClientCached(lapi) if mc.balancerHost != "" && !strings.Contains(mc.balancerHost, ":") { mc.balancerHost = mc.balancerHost + ":8042" // must set default port for Mist's Load Balancer @@ -781,7 +783,7 @@ func (mc *mac) getStreamInfo(playbackID string) (*streamInfo, error) { func (mc *mac) refreshStreamInfo(playbackID string) (*streamInfo, error) { glog.Infof("Refreshing stream info for playbackID=%s", playbackID) - stream, err := mc.lapi.GetStreamByPlaybackID(playbackID) + stream, err := mc.lapiCached.GetStreamByPlaybackID(playbackID) if err != nil { return nil, fmt.Errorf("error getting stream by playback ID %s: %w", playbackID, err) } diff --git a/mapic/utils.go b/mapic/utils.go new file mode 100644 index 00000000..cb7c92c8 --- /dev/null +++ b/mapic/utils.go @@ -0,0 +1,58 @@ +package mistapiconnector + +import ( + "github.com/livepeer/go-api-client" + "sync" + "time" +) + +const lapiCacheDuration = 1 * time.Second + +type ApiClientCached struct { + lapi *api.Client + streamCache map[string]entry + mu sync.RWMutex + ttl time.Duration +} + +func NewApiClientCached(lapi *api.Client) *ApiClientCached { + return &ApiClientCached{ + lapi: lapi, + streamCache: make(map[string]entry), + ttl: lapiCacheDuration, + } +} + +type entry struct { + stream *api.Stream + err error + updateAt time.Time +} + +func (a *ApiClientCached) GetStreamByPlaybackID(playbackId string) (*api.Stream, error) { + a.mu.RLock() + e, ok := a.streamCache[playbackId] + if ok && e.stream != nil && e.updateAt.Add(a.ttl).After(time.Now()) { + // Use cached value + a.mu.RUnlock() + return e.stream, e.err + } + a.mu.RUnlock() + + // Value not cached or expired + a.mu.Lock() + defer a.mu.Unlock() + // Check again in case another goroutine has updated the cache in the meantime + e, ok = a.streamCache[playbackId] + if ok && e.stream != nil && e.updateAt.Add(a.ttl).After(time.Now()) { + return e.stream, e.err + } + // No value in the cache, fetch from Livepeer API and store the result in a cache + stream, err := a.lapi.GetStreamByPlaybackID(playbackId) + a.streamCache[playbackId] = entry{ + stream: stream, + err: err, + updateAt: time.Now(), + } + return stream, err +} diff --git a/mapic/utils_test.go b/mapic/utils_test.go new file mode 100644 index 00000000..c99a213d --- /dev/null +++ b/mapic/utils_test.go @@ -0,0 +1,57 @@ +package mistapiconnector + +import ( + "encoding/json" + "github.com/livepeer/go-api-client" + require2 "github.com/stretchr/testify/require" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestGetStreamByPlaybackID(t *testing.T) { + require := require2.New(t) + + // given + playbackId := "some-playback-id" + stubStream := &api.Stream{ + ID: "123456", + } + var hits int + + lapiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hits++ + err := json.NewEncoder(w).Encode(stubStream) + if err != nil { + t.Fail() + } + })) + defer lapiServer.Close() + + lapi, _ := api.NewAPIClientGeolocated(api.ClientOptions{ + Server: lapiServer.URL, + }) + cachedClient := NewApiClientCached(lapi) + + // when first call to Livepeer API + s, err := cachedClient.GetStreamByPlaybackID(playbackId) + require.NoError(err) + require.Equal(stubStream, s) + require.Equal(1, hits) + + // when multiple time the same call within 1 second + for i := 0; i < 10; i++ { + s, err = cachedClient.GetStreamByPlaybackID(playbackId) + require.NoError(err) + require.Equal(stubStream, s) + require.Equal(1, hits) + } + + // when ttl is expired + time.Sleep(1 * time.Second) + s, err = cachedClient.GetStreamByPlaybackID(playbackId) + require.NoError(err) + require.Equal(stubStream, s) + require.Equal(2, hits) +}