Skip to content

Commit

Permalink
Download all youtube streams, instead of only 50
Browse files Browse the repository at this point in the history
  • Loading branch information
xaionaro committed Dec 1, 2024
1 parent e039b0f commit 4503ccc
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 47 deletions.
68 changes: 62 additions & 6 deletions pkg/streamcontrol/youtube/youtube.go
Original file line number Diff line number Diff line change
Expand Up @@ -1231,23 +1231,79 @@ type LiveBroadcast = youtube.LiveBroadcast

func (yt *YouTube) ListBroadcasts(
ctx context.Context,
) ([]*youtube.LiveBroadcast, error) {
limit uint,
continueFunc func(*youtube.LiveBroadcastListResponse) bool,
) (_ret []*youtube.LiveBroadcast, _err error) {
logger.Debugf(ctx, "ListBroadcasts(ctx, %d)", limit)
defer func() {
logger.Debugf(ctx, "/ListBroadcasts(ctx, %d): len(result):%d; err:%v", limit, len(_ret), _err)
}()

var items []*youtube.LiveBroadcast
var pageToken string
for receivedCount := uint(0); receivedCount < limit; {
maxResults := uint(limit - receivedCount)
if maxResults > 50 {
maxResults = 50
}

resp, err := yt.listBroadcastsPage(ctx, maxResults, pageToken)
if err != nil {
return nil, fmt.Errorf("listBroadcastsPage: %w", err)
}

if len(resp.Items) == 0 {
break
}

oldCount := receivedCount
pageToken = resp.NextPageToken
receivedCount += uint(len(resp.Items))
items = append(items, resp.Items...)

if pageToken == "" {
break
}
if uint(len(resp.Items)) < maxResults {
logger.Errorf(ctx, "received less than expected: %d < %d; breaking the loop", resp.PageInfo.TotalResults, maxResults)
}
if continueFunc != nil && !continueFunc(resp) {
break
}
logger.Debugf(ctx, "ListBroadcasts: count %d -> %d...", oldCount, receivedCount)
}

return items, nil
}

func (yt *YouTube) listBroadcastsPage(
ctx context.Context,
limit uint,
pageToken string,
) (*youtube.LiveBroadcastListResponse, error) {
logger.Debugf(ctx, "listBroadcastsPage(ctx, %d, '%s')", limit, pageToken)
if limit > 50 {
return nil, fmt.Errorf("one page may return only 50 items max, see 'maxResults' in https://developers.google.com/youtube/v3/live/docs/liveBroadcasts/list")
}
counter := 0
for {
response, err := yt.YouTubeService.
query := yt.YouTubeService.
LiveBroadcasts.
List([]string{"id", "snippet", "contentDetails", "monetizationDetails", "status"}).
Mine(true).
MaxResults(1000).
Context(ctx).Do()
logger.Debugf(ctx, "YouTube.LiveBroadcasts result: %v", err)
MaxResults(int64(limit))
if pageToken != "" {
query = query.PageToken(pageToken)
}
response, err := query.Context(ctx).Do()
logger.Debugf(ctx, "YouTube.LiveBroadcasts result: %v (counter: %d)", err, counter)
if err != nil {
if yt.fixError(ctx, err, &counter) {
continue
}
return nil, fmt.Errorf("unable to query the list of broadcasts: %w", err)
}
return response.Items, nil
return response, nil
}
}

Expand Down
41 changes: 0 additions & 41 deletions pkg/streamd/streamd.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,47 +415,6 @@ func (d *StreamD) normalizeTwitchData() {
})
}

func (d *StreamD) initYoutubeData(ctx context.Context) bool {
logger.FromCtx(ctx).Debugf("initializing Youtube data")
defer logger.FromCtx(ctx).Debugf("endof initializing Youtube data")

if c := len(d.Cache.Youtube.Broadcasts); c != 0 {
logger.FromCtx(ctx).Debugf("already have broadcasts (count: %d)", c)
return false
}

youtube := d.StreamControllers.YouTube
if youtube == nil {
logger.FromCtx(ctx).Debugf("youtube controller is not initialized")
return false
}

broadcasts, err := youtube.ListBroadcasts(d.ctxForController(ctx))
if err != nil {
d.UI.DisplayError(err)
return false
}

logger.FromCtx(ctx).Debugf("got broadcasts: %#+v", broadcasts)

func() {
d.CacheLock.Do(ctx, func() {
d.Cache.Youtube.Broadcasts = broadcasts
})
}()

err = d.SaveConfig(ctx)
errmon.ObserveErrorCtx(ctx, err)
return true
}

func (d *StreamD) normalizeYoutubeData() {
s := d.Cache.Youtube.Broadcasts
sort.Slice(s, func(i, j int) bool {
return s[i].Snippet.Title < s[j].Snippet.Title
})
}

func (d *StreamD) SaveConfig(ctx context.Context) error {
return xsync.DoA1R1(ctx, &d.ConfigLock, d.saveConfig, ctx)
}
Expand Down
119 changes: 119 additions & 0 deletions pkg/streamd/youtube.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package streamd

import (
"context"
"fmt"
"sort"

"github.com/facebookincubator/go-belt/tool/experimental/errmon"
"github.com/facebookincubator/go-belt/tool/logger"
upstreamyoutube "google.golang.org/api/youtube/v3"
)

func (d *StreamD) initYoutubeData(ctx context.Context) bool {
logger.FromCtx(ctx).Debugf("initializing Youtube data")
defer logger.FromCtx(ctx).Debugf("endof initializing Youtube data")

if c := len(d.Cache.Youtube.Broadcasts); c != 0 {
logger.FromCtx(ctx).Debugf("already have broadcasts (count: %d)", c)
updated, err := d.updateYoutubeBroadcasts(ctx)
if err != nil {
d.UI.DisplayError(err)
}
return updated
}

youtube := d.StreamControllers.YouTube
if youtube == nil {
logger.FromCtx(ctx).Debugf("youtube controller is not initialized")
return false
}

broadcasts, err := youtube.ListBroadcasts(d.ctxForController(ctx), 50000, nil)
if err != nil {
d.UI.DisplayError(err)
return false
}

logger.FromCtx(ctx).Debugf("got broadcasts: %#+v", broadcasts)
d.CacheLock.Do(ctx, func() {
d.Cache.Youtube.Broadcasts = broadcasts
})

err = d.SaveConfig(ctx)
errmon.ObserveErrorCtx(ctx, err)
return true
}

func (d *StreamD) updateYoutubeBroadcasts(
ctx context.Context,
) (bool, error) {
oldCount := len(d.Cache.Youtube.Broadcasts)
logger.FromCtx(ctx).Debugf("downloading new broadcasts only (count: %d)", oldCount)
defer func() {
logger.FromCtx(ctx).Debugf("downloading new broadcasts only (count: %d -> %d)", oldCount, len(d.Cache.Youtube.Broadcasts))
}()

yt := d.StreamControllers.YouTube
if yt == nil {
return false, fmt.Errorf("youtube controller is not initialized")
}

broadcastIDs := map[string]struct{}{}
d.CacheLock.Do(ctx, func() {
for _, broadcast := range d.Cache.Youtube.Broadcasts {
broadcastIDs[broadcast.Id] = struct{}{}
}
})

broadcasts, err := yt.ListBroadcasts(
d.ctxForController(ctx),
50000,
func(resp *upstreamyoutube.LiveBroadcastListResponse) bool {
for _, broadcast := range resp.Items {
if _, ok := broadcastIDs[broadcast.Id]; ok {
return false
}
}
return true
},
)
if err != nil {
return false, fmt.Errorf("unable to get new broadcasts: %w", err)
}

if len(broadcasts) == 0 {
return false, nil
}

var (
prevLen, newLen int
)
d.CacheLock.Do(ctx, func() {
prevLen = len(d.Cache.Youtube.Broadcasts)
expectedLen := len(broadcasts) + len(d.Cache.Youtube.Broadcasts)
result := make([]*upstreamyoutube.LiveBroadcast, 0, expectedLen)
alreadySet := make(map[string]struct{}, expectedLen)
appendWith := func(broadcasts []*upstreamyoutube.LiveBroadcast) {
for _, broadcast := range broadcasts {
if _, ok := alreadySet[broadcast.Id]; ok {
continue
}
alreadySet[broadcast.Id] = struct{}{}
result = append(result, broadcast)
}
}
appendWith(broadcasts)
appendWith(d.Cache.Youtube.Broadcasts)
d.Cache.Youtube.Broadcasts = result
newLen = len(result)
})
return newLen != prevLen, nil
}

func (d *StreamD) normalizeYoutubeData() {
s := d.Cache.Youtube.Broadcasts
sort.Slice(s, func(i, j int) bool {
return s[i].Snippet.Title < s[j].Snippet.Title
})
}

0 comments on commit 4503ccc

Please sign in to comment.