Skip to content

Commit

Permalink
Add 1s cache for lapi.GetStreamByPlaybackID()
Browse files Browse the repository at this point in the history
It causes the prod failure, because of the cascading failures
  • Loading branch information
leszko committed Jul 23, 2024
1 parent 9a61e38 commit 6169412
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 6 deletions.
9 changes: 5 additions & 4 deletions handlers/analytics/log_ext_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ type ExternalData struct {
type ExternalDataFetcher struct {
streamCache mistapiconnector.IStreamCache
lapi *api.Client

cache map[string]ExternalData
mu sync.RWMutex
lapiCached *mistapiconnector.ApiClientCached
cache map[string]ExternalData
mu sync.RWMutex
}

func NewExternalDataFetcher(streamCache mistapiconnector.IStreamCache, lapi *api.Client) *ExternalDataFetcher {
return &ExternalDataFetcher{
streamCache: streamCache,
lapi: lapi,
lapiCached: mistapiconnector.NewApiClientCached(lapi),
cache: make(map[string]ExternalData),
}
}
Expand Down Expand Up @@ -62,7 +63,7 @@ func (e *ExternalDataFetcher) Fetch(playbackID string) (ExternalData, error) {
}

// Not found in any cache, try querying Studio API to get Stream
stream, streamErr := e.lapi.GetStreamByPlaybackID(playbackID)
stream, streamErr := e.lapiCached.GetStreamByPlaybackID(playbackID)
if streamErr == nil {
return e.extDataFromStream(playbackID, stream)
}
Expand Down
5 changes: 4 additions & 1 deletion handlers/geolocation/geolocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers/misttriggers"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/livepeer/catalyst-api/metrics"
"github.com/livepeer/go-api-client"
)
Expand Down Expand Up @@ -70,6 +71,7 @@ type GeolocationHandlersCollection struct {
Cluster cluster.Cluster
Config config.Cli
Lapi *api.Client
LapiCached *mistapiconnector.ApiClientCached
streamPullRateLimit *streamPullRateLimit
}

Expand All @@ -79,6 +81,7 @@ func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluste
Cluster: cluster,
Config: config,
Lapi: lapi,
LapiCached: mistapiconnector.NewApiClientCached(lapi),
streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval),
}
}
Expand Down Expand Up @@ -350,7 +353,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo
return "", errRateLimit
}

stream, err := c.Lapi.GetStreamByPlaybackID(playbackID)
stream, err := c.LapiCached.GetStreamByPlaybackID(playbackID)
if err != nil {
return "", fmt.Errorf("failed to get stream to check stream pull: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion mapic/mistapiconnector_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type (
ctx context.Context
cancel context.CancelFunc
lapi *api.Client
lapiCached *ApiClientCached
balancerHost string
mu sync.RWMutex
mistHot string
Expand Down Expand Up @@ -131,6 +132,7 @@ func (mc *mac) Start(ctx context.Context) error {
AccessToken: mc.config.APIToken,
})
mc.lapi = lapi
mc.lapiCached = NewApiClientCached(lapi)

if mc.balancerHost != "" && !strings.Contains(mc.balancerHost, ":") {
mc.balancerHost = mc.balancerHost + ":8042" // must set default port for Mist's Load Balancer
Expand Down Expand Up @@ -781,7 +783,7 @@ func (mc *mac) getStreamInfo(playbackID string) (*streamInfo, error) {
func (mc *mac) refreshStreamInfo(playbackID string) (*streamInfo, error) {
glog.Infof("Refreshing stream info for playbackID=%s", playbackID)

stream, err := mc.lapi.GetStreamByPlaybackID(playbackID)
stream, err := mc.lapiCached.GetStreamByPlaybackID(playbackID)
if err != nil {
return nil, fmt.Errorf("error getting stream by playback ID %s: %w", playbackID, err)
}
Expand Down
58 changes: 58 additions & 0 deletions mapic/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package mistapiconnector

import (
"github.com/livepeer/go-api-client"
"sync"
"time"
)

const lapiCacheDuration = 1 * time.Second

type ApiClientCached struct {
lapi *api.Client
streamCache map[string]entry
mu sync.RWMutex
ttl time.Duration
}

func NewApiClientCached(lapi *api.Client) *ApiClientCached {
return &ApiClientCached{
lapi: lapi,
streamCache: make(map[string]entry),
ttl: lapiCacheDuration,
}
}

type entry struct {
stream *api.Stream
err error
updateAt time.Time
}

func (a *ApiClientCached) GetStreamByPlaybackID(playbackId string) (*api.Stream, error) {
a.mu.RLock()
e, ok := a.streamCache[playbackId]
if ok && e.stream != nil && e.updateAt.Add(a.ttl).After(time.Now()) {
// Use cached value
a.mu.RUnlock()
return e.stream, e.err
}
a.mu.RUnlock()

// Value not cached or expired
a.mu.Lock()
defer a.mu.Unlock()
// Check again in case another goroutine has updated the cache in the meantime
e, ok = a.streamCache[playbackId]
if ok && e.stream != nil && e.updateAt.Add(a.ttl).After(time.Now()) {
return e.stream, e.err
}
// No value in the cache, fetch from Livepeer API and store the result in a cache
stream, err := a.lapi.GetStreamByPlaybackID(playbackId)
a.streamCache[playbackId] = entry{
stream: stream,
err: err,
updateAt: time.Now(),
}
return stream, err
}
57 changes: 57 additions & 0 deletions mapic/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package mistapiconnector

import (
"encoding/json"
"github.com/livepeer/go-api-client"
require2 "github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
"testing"
"time"
)

func TestGetStreamByPlaybackID(t *testing.T) {
require := require2.New(t)

// given
playbackId := "some-playback-id"
stubStream := &api.Stream{
ID: "123456",
}
var hits int

lapiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hits++
err := json.NewEncoder(w).Encode(stubStream)
if err != nil {
t.Fail()
}
}))
defer lapiServer.Close()

lapi, _ := api.NewAPIClientGeolocated(api.ClientOptions{
Server: lapiServer.URL,
})
cachedClient := NewApiClientCached(lapi)

// when first call to Livepeer API
s, err := cachedClient.GetStreamByPlaybackID(playbackId)
require.NoError(err)
require.Equal(stubStream, s)
require.Equal(1, hits)

// when multiple time the same call within 1 second
for i := 0; i < 10; i++ {
s, err = cachedClient.GetStreamByPlaybackID(playbackId)
require.NoError(err)
require.Equal(stubStream, s)
require.Equal(1, hits)
}

// when ttl is expired
time.Sleep(1 * time.Second)
s, err = cachedClient.GetStreamByPlaybackID(playbackId)
require.NoError(err)
require.Equal(stubStream, s)
require.Equal(2, hits)
}

0 comments on commit 6169412

Please sign in to comment.