diff --git a/pkg/streamcontrol/youtube/youtube.go b/pkg/streamcontrol/youtube/youtube.go index 711b972..65bf838 100644 --- a/pkg/streamcontrol/youtube/youtube.go +++ b/pkg/streamcontrol/youtube/youtube.go @@ -1231,23 +1231,79 @@ type LiveBroadcast = youtube.LiveBroadcast func (yt *YouTube) ListBroadcasts( ctx context.Context, -) ([]*youtube.LiveBroadcast, error) { + limit uint, + continueFunc func(*youtube.LiveBroadcastListResponse) bool, +) (_ret []*youtube.LiveBroadcast, _err error) { + logger.Debugf(ctx, "ListBroadcasts(ctx, %d)", limit) + defer func() { + logger.Debugf(ctx, "/ListBroadcasts(ctx, %d): len(result):%d; err:%v", limit, len(_ret), _err) + }() + + var items []*youtube.LiveBroadcast + var pageToken string + for receivedCount := uint(0); receivedCount < limit; { + maxResults := uint(limit - receivedCount) + if maxResults > 50 { + maxResults = 50 + } + + resp, err := yt.listBroadcastsPage(ctx, maxResults, pageToken) + if err != nil { + return nil, fmt.Errorf("listBroadcastsPage: %w", err) + } + + if len(resp.Items) == 0 { + break + } + + oldCount := receivedCount + pageToken = resp.NextPageToken + receivedCount += uint(len(resp.Items)) + items = append(items, resp.Items...) + + if pageToken == "" { + break + } + if uint(len(resp.Items)) < maxResults { + logger.Errorf(ctx, "received less than expected: %d < %d; breaking the loop", resp.PageInfo.TotalResults, maxResults) + } + if continueFunc != nil && !continueFunc(resp) { + break + } + logger.Debugf(ctx, "ListBroadcasts: count %d -> %d...", oldCount, receivedCount) + } + + return items, nil +} + +func (yt *YouTube) listBroadcastsPage( + ctx context.Context, + limit uint, + pageToken string, +) (*youtube.LiveBroadcastListResponse, error) { + logger.Debugf(ctx, "listBroadcastsPage(ctx, %d, '%s')", limit, pageToken) + if limit > 50 { + return nil, fmt.Errorf("one page may return only 50 items max, see 'maxResults' in https://developers.google.com/youtube/v3/live/docs/liveBroadcasts/list") + } counter := 0 for { - response, err := yt.YouTubeService. + query := yt.YouTubeService. LiveBroadcasts. List([]string{"id", "snippet", "contentDetails", "monetizationDetails", "status"}). Mine(true). - MaxResults(1000). - Context(ctx).Do() - logger.Debugf(ctx, "YouTube.LiveBroadcasts result: %v", err) + MaxResults(int64(limit)) + if pageToken != "" { + query = query.PageToken(pageToken) + } + response, err := query.Context(ctx).Do() + logger.Debugf(ctx, "YouTube.LiveBroadcasts result: %v (counter: %d)", err, counter) if err != nil { if yt.fixError(ctx, err, &counter) { continue } return nil, fmt.Errorf("unable to query the list of broadcasts: %w", err) } - return response.Items, nil + return response, nil } } diff --git a/pkg/streamd/streamd.go b/pkg/streamd/streamd.go index 5ab6a9f..42a816e 100644 --- a/pkg/streamd/streamd.go +++ b/pkg/streamd/streamd.go @@ -415,47 +415,6 @@ func (d *StreamD) normalizeTwitchData() { }) } -func (d *StreamD) initYoutubeData(ctx context.Context) bool { - logger.FromCtx(ctx).Debugf("initializing Youtube data") - defer logger.FromCtx(ctx).Debugf("endof initializing Youtube data") - - if c := len(d.Cache.Youtube.Broadcasts); c != 0 { - logger.FromCtx(ctx).Debugf("already have broadcasts (count: %d)", c) - return false - } - - youtube := d.StreamControllers.YouTube - if youtube == nil { - logger.FromCtx(ctx).Debugf("youtube controller is not initialized") - return false - } - - broadcasts, err := youtube.ListBroadcasts(d.ctxForController(ctx)) - if err != nil { - d.UI.DisplayError(err) - return false - } - - logger.FromCtx(ctx).Debugf("got broadcasts: %#+v", broadcasts) - - func() { - d.CacheLock.Do(ctx, func() { - d.Cache.Youtube.Broadcasts = broadcasts - }) - }() - - err = d.SaveConfig(ctx) - errmon.ObserveErrorCtx(ctx, err) - return true -} - -func (d *StreamD) normalizeYoutubeData() { - s := d.Cache.Youtube.Broadcasts - sort.Slice(s, func(i, j int) bool { - return s[i].Snippet.Title < s[j].Snippet.Title - }) -} - func (d *StreamD) SaveConfig(ctx context.Context) error { return xsync.DoA1R1(ctx, &d.ConfigLock, d.saveConfig, ctx) } diff --git a/pkg/streamd/youtube.go b/pkg/streamd/youtube.go new file mode 100644 index 0000000..7e9b29b --- /dev/null +++ b/pkg/streamd/youtube.go @@ -0,0 +1,119 @@ +package streamd + +import ( + "context" + "fmt" + "sort" + + "github.com/facebookincubator/go-belt/tool/experimental/errmon" + "github.com/facebookincubator/go-belt/tool/logger" + upstreamyoutube "google.golang.org/api/youtube/v3" +) + +func (d *StreamD) initYoutubeData(ctx context.Context) bool { + logger.FromCtx(ctx).Debugf("initializing Youtube data") + defer logger.FromCtx(ctx).Debugf("endof initializing Youtube data") + + if c := len(d.Cache.Youtube.Broadcasts); c != 0 { + logger.FromCtx(ctx).Debugf("already have broadcasts (count: %d)", c) + updated, err := d.updateYoutubeBroadcasts(ctx) + if err != nil { + d.UI.DisplayError(err) + } + return updated + } + + youtube := d.StreamControllers.YouTube + if youtube == nil { + logger.FromCtx(ctx).Debugf("youtube controller is not initialized") + return false + } + + broadcasts, err := youtube.ListBroadcasts(d.ctxForController(ctx), 50000, nil) + if err != nil { + d.UI.DisplayError(err) + return false + } + + logger.FromCtx(ctx).Debugf("got broadcasts: %#+v", broadcasts) + d.CacheLock.Do(ctx, func() { + d.Cache.Youtube.Broadcasts = broadcasts + }) + + err = d.SaveConfig(ctx) + errmon.ObserveErrorCtx(ctx, err) + return true +} + +func (d *StreamD) updateYoutubeBroadcasts( + ctx context.Context, +) (bool, error) { + oldCount := len(d.Cache.Youtube.Broadcasts) + logger.FromCtx(ctx).Debugf("downloading new broadcasts only (count: %d)", oldCount) + defer func() { + logger.FromCtx(ctx).Debugf("downloading new broadcasts only (count: %d -> %d)", oldCount, len(d.Cache.Youtube.Broadcasts)) + }() + + yt := d.StreamControllers.YouTube + if yt == nil { + return false, fmt.Errorf("youtube controller is not initialized") + } + + broadcastIDs := map[string]struct{}{} + d.CacheLock.Do(ctx, func() { + for _, broadcast := range d.Cache.Youtube.Broadcasts { + broadcastIDs[broadcast.Id] = struct{}{} + } + }) + + broadcasts, err := yt.ListBroadcasts( + d.ctxForController(ctx), + 50000, + func(resp *upstreamyoutube.LiveBroadcastListResponse) bool { + for _, broadcast := range resp.Items { + if _, ok := broadcastIDs[broadcast.Id]; ok { + return false + } + } + return true + }, + ) + if err != nil { + return false, fmt.Errorf("unable to get new broadcasts: %w", err) + } + + if len(broadcasts) == 0 { + return false, nil + } + + var ( + prevLen, newLen int + ) + d.CacheLock.Do(ctx, func() { + prevLen = len(d.Cache.Youtube.Broadcasts) + expectedLen := len(broadcasts) + len(d.Cache.Youtube.Broadcasts) + result := make([]*upstreamyoutube.LiveBroadcast, 0, expectedLen) + alreadySet := make(map[string]struct{}, expectedLen) + appendWith := func(broadcasts []*upstreamyoutube.LiveBroadcast) { + for _, broadcast := range broadcasts { + if _, ok := alreadySet[broadcast.Id]; ok { + continue + } + alreadySet[broadcast.Id] = struct{}{} + result = append(result, broadcast) + } + } + appendWith(broadcasts) + appendWith(d.Cache.Youtube.Broadcasts) + d.Cache.Youtube.Broadcasts = result + newLen = len(result) + }) + return newLen != prevLen, nil +} + +func (d *StreamD) normalizeYoutubeData() { + s := d.Cache.Youtube.Broadcasts + sort.Slice(s, func(i, j int) bool { + return s[i].Snippet.Title < s[j].Snippet.Title + }) +}