Skip to content

Commit

Permalink
Merge pull request #345 from Lezek123/fix-polling
Browse files Browse the repository at this point in the history
Polling and logging fixes
  • Loading branch information
Lezek123 authored Dec 2, 2024
2 parents aff598c + c050cc6 commit f84526e
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 78 deletions.
1 change: 1 addition & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ logs:
youtube:
clientId: google-client-id
clientSecret: google-client-secret
apiKey: youtube-api-key
# adcKeyFilePath: path/to/adc-key-file.json
# maxAllowedQuotaUsageInPercentage: 95
# proxy:
Expand Down
1 change: 1 addition & 0 deletions src/schemas/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ export const configSchema: JSONSchema7 = objectSchema({
properties: {
clientId: { type: 'string', description: 'Youtube Oauth2 Client Id' },
clientSecret: { type: 'string', description: 'Youtube Oauth2 Client Secret' },
apiKey: { type: 'string', description: 'Youtube API key used for the purpose retrieving public data' },
maxAllowedQuotaUsageInPercentage: {
description:
`Maximum percentage of daily Youtube API quota that can be used by the Periodic polling service. ` +
Expand Down
70 changes: 64 additions & 6 deletions src/services/logging/LoggingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import winston, { Logger, LoggerOptions } from 'winston'
import 'winston-daily-rotate-file'
import { ElasticsearchTransport } from 'winston-elasticsearch'
import { ReadonlyConfig } from '../../types'
import { FetchError } from 'node-fetch'
import { GaxiosError } from 'googleapis-common'

const colors = {
error: 'red',
Expand Down Expand Up @@ -41,19 +43,71 @@ const pauseFormat: (opts: PauseFormatOpts) => Format = winston.format((info, opt
})

// Error format applied to specific log meta field
type ErrorFormatOpts = { filedName: string }
const errorFormat: (opts: ErrorFormatOpts) => Format = winston.format((info, opts: ErrorFormatOpts) => {
if (!info[opts.filedName]) {
type CLIErrorFormatOpts = { fieldName: string }
const cliErrorFormat: (opts: CLIErrorFormatOpts) => Format = winston.format((info, { fieldName }: CLIErrorFormatOpts) => {
if (!info[fieldName]) {
return info
}
const formatter = winston.format.errors({ stack: true })
info[opts.filedName] = formatter.transform(info[opts.filedName], formatter.options)
info[fieldName] = formatter.transform(info[fieldName], formatter.options)
return info
})

// Elastic error format
class NormalizedError extends Error {
message: string
name: string
code?: string
type?: string
response?: {
status: number
statusText: string
}

constructor(err: unknown) {
// Collect error data depending on type
if (err instanceof GaxiosError) {
super(err.message)
this.code = err.code
if (err.response) {
this.response = {
status: err.response.status,
statusText: err.response.statusText
// TODO: Maybe we can add .data later, but need to adjust es mappings
}
}
this.name = err.name
}
else if (err instanceof FetchError) {
super(err.message)
this.code = err.code,
this.type = err.type,
this.name = err.name
}
else if (err instanceof Error) {
super(err.message)
this.name = 'Error'
}
else {
super(String(err))
this.name = 'UnknownError'
}
}
}

type ElasticErrorFormatOpts = { fieldName: string }
const elasticErrorFormat: (opts: ElasticErrorFormatOpts) => Format = winston.format((info, { fieldName }: ElasticErrorFormatOpts) => {
const err = info[fieldName]
if (!err) {
return info
}
info[fieldName] = new NormalizedError(err)
return info
})

const cliFormat = winston.format.combine(
winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }),
errorFormat({ filedName: 'err' }),
cliErrorFormat({ fieldName: 'err' }),
winston.format.metadata({ fillExcept: ['label', 'level', 'timestamp', 'message'] }),
winston.format.colorize({ all: true }),
winston.format.printf(
Expand Down Expand Up @@ -88,7 +142,11 @@ export class LoggingService {
esTransport = new ElasticsearchTransport({
index: 'youtube-synch',
level: logs.elastic.level,
format: winston.format.combine(pauseFormat({ id: 'es' }), escFormat()),
format: winston.format.combine(
pauseFormat({ id: 'es' }),
elasticErrorFormat({ fieldName: 'err' }),
escFormat()
),
retryLimit: 10,
flushInterval: 1000,
clientOpts: {
Expand Down
117 changes: 67 additions & 50 deletions src/services/syncProcessing/YoutubePollingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import { YtChannel, YtVideo, verifiedVariants } from '../../types/youtube'
import { LoggingService } from '../logging'
import { JoystreamClient } from '../runtime/client'
import { IYoutubeApi } from '../youtube/api'
import { GaxiosError } from 'googleapis-common'

export class YoutubePollingService {
private logger: Logger
private lastPolledChannelId: number | undefined = undefined // Track the last successfully polled channel ID

public constructor(
logging: LoggingService,
Expand Down Expand Up @@ -46,33 +46,26 @@ export class YoutubePollingService {
const sleepInterval = pollingIntervalMinutes * 60 * 1000
while (true) {
try {
const channels = _.orderBy(await this.performChannelsIngestion(), ['joystreamChannelId'], ['desc'])

console.log('Total channels to be polled:', channels.length)
// Start polling from the last successfully polled channel ID
const startIndex = this.lastPolledChannelId
? channels.findIndex((channel) => channel.joystreamChannelId === this.lastPolledChannelId)
: 0
const channelsToPoll = channels.slice(startIndex)
const channelsWithNewVideos = _.orderBy(await this.performChannelsIngestion(), ['joystreamChannelId'], ['desc'])

this.logger.info(
`Completed Channels Ingestion. Videos of ${channelsToPoll.length} channels will be prepared for syncing in this polling cycle....`
`Videos of ${channelsWithNewVideos.length} channels will be prepared for syncing in this polling cycle....`
)

// Ingest videos of channels in a batch of 50 to limit IO/CPU resource consumption
const channelsBatch = _.chunk(channelsToPoll, 100)
// Ingest videos of channels in a batch of 100 to limit IO/CPU resource consumption
const channelsBatches = _.chunk(channelsWithNewVideos, 100)
// Process each batch

let a = 0
for (const channels of channelsBatch) {
a = a + channels.length
await Promise.all(channels.map((channel) => this.performVideosIngestion(channel)))
this.lastPolledChannelId = channels[channels.length - 1].joystreamChannelId // Update last polled channel ID
console.log('videos ingestion batch:', a)
let channelsProcessed = 0
for (const channelsBatch of channelsBatches) {
await Promise.all(channelsBatch.map((channel) => this.performVideosIngestion(channel)))
channelsProcessed += channelsBatch.length
this.logger.info(`Ingested videos of ${channelsProcessed}/${channelsWithNewVideos.length} channels...`)
}
} catch (err) {
this.logger.error(`Critical Polling error`, { err })
}
this.logger.info(`All videos ingested.`)
this.logger.info(`Youtube polling service paused for ${pollingIntervalMinutes} minute(s).`)
await sleep(sleepInterval)
this.logger.info(`Resume polling....`)
Expand All @@ -84,8 +77,7 @@ export class YoutubePollingService {
*/
private async performChannelsIngestion(): Promise<YtChannel[]> {
// get all channels that need to be ingested
const channelsWithSyncEnabled = async () =>
await this.dynamodbService.repo.channels.scan(
const channelsWithSyncEnabled = await this.dynamodbService.repo.channels.scan(
{},
(scan) =>
scan
Expand All @@ -101,22 +93,22 @@ export class YoutubePollingService {
// * ingestion as we don't have access to their access/refresh tokens
.where('performUnauthorizedSync')
.eq(false)
// .and()
// .where('id')
// .eq('UCBWgpKlykQ8yPAEmTGdsXxw')
)

// updated channel objects with uptodate info
const channelsToBeIngestedChunks = _.chunk(await channelsWithSyncEnabled(), 100)
this.logger.info(`Found ${channelsWithSyncEnabled.length} channels with sync enabled.`)

const channelsToBeIngestedChunks = _.chunk(channelsWithSyncEnabled, 50)
const channelsScheduledForVideoIngestion: YtChannel[] = []

let a = 0
console.log('await channelsWithSyncEnabled()', (await channelsWithSyncEnabled()).length)
for (const channelsToBeIngested of channelsToBeIngestedChunks) {
a = a + channelsToBeIngested.length
console.log('channels ingestion batch:', a)
let checkedChannelsCount = 0
let updatedChannelsCount = 0
for (const channelsBatch of channelsToBeIngestedChunks) {
const channelsStats = await this.youtubeApi.getChannelsStats(channelsBatch.map(c => c.id))
const channelStatsbyId = new Map(channelsStats.map(c => [c.id, c.statistics]))
checkedChannelsCount += channelsBatch.length
const updatedChannels = (
await Promise.all(
channelsToBeIngested.map(async (ch) => {
channelsBatch.map(async (ch) => {
try {
// ensure that Ypp collaborator member is still set as channel's collaborator
const isCollaboratorSet = await this.joystreamClient.doesChannelHaveCollaborator(ch.joystreamChannelId)
Expand All @@ -132,26 +124,43 @@ export class YoutubePollingService {
lastActedAt: new Date(),
}
}
} catch (err: unknown) {
if (err instanceof Error && err.message.includes('This account has been terminated')) {
this.logger.warn(
`Opting out '${ch.id}' from YPP program as their Youtube channel has been terminated by the Youtube.`
)
return {
...ch,
yppStatus: 'OptedOut',
shouldBeIngested: false,
lastActedAt: new Date(),
// Check channel's updated stats
const channelStats = channelStatsbyId.get(ch.id)
if (!channelStats) {
this.logger.warn(`Missing channel stats for channel ${ch.id}! Verifying channel status...`)
try {
await this.youtubeApi.getChannel({ id: ch.userId, accessToken: ch.userAccessToken, refreshToken: ch.userRefreshToken })
} catch (e) {
if (e instanceof GaxiosError && e.message.includes('YouTube account of the authenticated user is suspended')) {
this.logger.warn(
`Opting out '${ch.id}' from YPP program as their Youtube channel has been terminated by the Youtube.`
)
return {
...ch,
yppStatus: 'OptedOut',
shouldBeIngested: false,
lastActedAt: new Date(),
}
}
this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`, { err: e })
return
}
} else if (err instanceof Error && err.message.includes('The playlist does not exist')) {
this.logger.warn(`Opting out '${ch.id}' from YPP program as Channel does not exist on Youtube.`)
return {
...ch,
yppStatus: 'OptedOut',
shouldBeIngested: false,
lastActedAt: new Date(),
this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`)
return
}
// Schedule for video ingestion if videoCount has changed
if (channelStats.videoCount !== ch.statistics.videoCount) {
this.logger.debug('Channel stats changed', { oldStats: ch.statistics, newStats: channelStats })
channelsScheduledForVideoIngestion.push(ch)
}
// Update stats in DynamoDB
return {
...ch,
statistics: {
...channelStats
}
}
} catch (err: unknown) {
this.logger.error('Failed to fetch updated channel info', { err, channelId: ch.joystreamChannelId })
}
})
Expand All @@ -160,12 +169,19 @@ export class YoutubePollingService {

// save updated channels
await this.dynamodbService.repo.channels.upsertAll(updatedChannels)
updatedChannelsCount += updatedChannels.length

this.logger.info(`Processed ${checkedChannelsCount}/${channelsWithSyncEnabled.length} channels...`)
// A delay between batches if necessary to prevent rate limits or high CPU/IO usage
await sleep(100)
}

return channelsWithSyncEnabled()
this.logger.info(
`Finished channel ingestion. ` +
`Updated ${updatedChannelsCount} channels.` +
`Found ${channelsScheduledForVideoIngestion.length} channels with new videos.`
)
return channelsScheduledForVideoIngestion
}

public async performVideosIngestion(channel: YtChannel, initialIngestion = false) {
Expand All @@ -179,7 +195,7 @@ export class YoutubePollingService {
// get all video Ids that are not yet being tracked
let untrackedVideos = await this.getUntrackedVideos(channel, videos)

console.log('untrackedVideos.length', videos.length, untrackedVideos.length, channel.joystreamChannelId)
this.logger.debug(`Found ${untrackedVideos.length} untracked videos (out of last ${videos.length}) for channel ${channel.joystreamChannelId}`)
// save all new videos to DB including
await this.dynamodbService.repo.videos.upsertAll(untrackedVideos)
} catch (err) {
Expand All @@ -201,6 +217,7 @@ export class YoutubePollingService {
ytChannelId: channel.id,
})

// TODO: Uncomment after verification
// await this.dynamodbService.repo.channels.save({
// ...channel,
// yppStatus: 'OptedOut',
Expand Down
Loading

0 comments on commit f84526e

Please sign in to comment.