From cbb2d1d82943f33a93f7d53525126cf9168121be Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 28 Nov 2024 14:56:57 +0100 Subject: [PATCH 1/6] Fix polling: Retrieve missing video details, schedule channels based on changed videosNum --- .../syncProcessing/YoutubePollingService.ts | 127 ++++++++++-------- src/services/youtube/api.ts | 98 +++++++++++--- 2 files changed, 156 insertions(+), 69 deletions(-) diff --git a/src/services/syncProcessing/YoutubePollingService.ts b/src/services/syncProcessing/YoutubePollingService.ts index 8501b3e..c6b6739 100644 --- a/src/services/syncProcessing/YoutubePollingService.ts +++ b/src/services/syncProcessing/YoutubePollingService.ts @@ -7,10 +7,10 @@ import { YtChannel, YtVideo, verifiedVariants } from '../../types/youtube' import { LoggingService } from '../logging' import { JoystreamClient } from '../runtime/client' import { IYoutubeApi } from '../youtube/api' +import { GaxiosError } from 'googleapis-common' export class YoutubePollingService { private logger: Logger - private lastPolledChannelId: number | undefined = undefined // Track the last successfully polled channel ID public constructor( logging: LoggingService, @@ -46,33 +46,27 @@ export class YoutubePollingService { const sleepInterval = pollingIntervalMinutes * 60 * 1000 while (true) { try { - const channels = _.orderBy(await this.performChannelsIngestion(), ['joystreamChannelId'], ['desc']) - - console.log('Total channels to be polled:', channels.length) - // Start polling from the last successfully polled channel ID - const startIndex = this.lastPolledChannelId - ? channels.findIndex((channel) => channel.joystreamChannelId === this.lastPolledChannelId) - : 0 - const channelsToPoll = channels.slice(startIndex) + const channelsWithNewVideos = _.orderBy(await this.performChannelsIngestion(), ['joystreamChannelId'], ['desc']) this.logger.info( - `Completed Channels Ingestion. Videos of ${channelsToPoll.length} channels will be prepared for syncing in this polling cycle....` + `Videos of ${channelsWithNewVideos.length} channels will be prepared for syncing in this polling cycle....` ) - // Ingest videos of channels in a batch of 50 to limit IO/CPU resource consumption - const channelsBatch = _.chunk(channelsToPoll, 100) + // Ingest videos of channels in a batch of 100 to limit IO/CPU resource consumption + const channelsBatches = _.chunk(channelsWithNewVideos, 100) // Process each batch - let a = 0 - for (const channels of channelsBatch) { - a = a + channels.length - await Promise.all(channels.map((channel) => this.performVideosIngestion(channel))) - this.lastPolledChannelId = channels[channels.length - 1].joystreamChannelId // Update last polled channel ID - console.log('videos ingestion batch:', a) + let channelsProcessed = 0 + for (const channelsBatch of channelsBatches) { + // TODO: Uncomment after verification + // await Promise.all(channelsBatch.map((channel) => this.performVideosIngestion(channel))) + channelsProcessed += channelsBatch.length + this.logger.info(`Ingested videos of ${channelsProcessed}/${channelsWithNewVideos.length} channels...`) } } catch (err) { this.logger.error(`Critical Polling error`, { err }) } + this.logger.info(`All videos ingested.`) this.logger.info(`Youtube polling service paused for ${pollingIntervalMinutes} minute(s).`) await sleep(sleepInterval) this.logger.info(`Resume polling....`) @@ -84,8 +78,7 @@ export class YoutubePollingService { */ private async performChannelsIngestion(): Promise { // get all channels that need to be ingested - const channelsWithSyncEnabled = async () => - await this.dynamodbService.repo.channels.scan( + const channelsWithSyncEnabled = await this.dynamodbService.repo.channels.scan( {}, (scan) => scan @@ -101,22 +94,22 @@ export class YoutubePollingService { // * ingestion as we don't have access to their access/refresh tokens .where('performUnauthorizedSync') .eq(false) - // .and() - // .where('id') - // .eq('UCBWgpKlykQ8yPAEmTGdsXxw') ) - // updated channel objects with uptodate info - const channelsToBeIngestedChunks = _.chunk(await channelsWithSyncEnabled(), 100) + this.logger.info(`Found ${channelsWithSyncEnabled.length} channels with sync enabled.`) + + const channelsToBeIngestedChunks = _.chunk(channelsWithSyncEnabled, 50) + const channelsScheduledForVideoIngestion: YtChannel[] = [] - let a = 0 - console.log('await channelsWithSyncEnabled()', (await channelsWithSyncEnabled()).length) - for (const channelsToBeIngested of channelsToBeIngestedChunks) { - a = a + channelsToBeIngested.length - console.log('channels ingestion batch:', a) + let checkedChannelsCount = 0 + let updatedChannelsCount = 0 + for (const channelsBatch of channelsToBeIngestedChunks) { + const channelsStats = await this.youtubeApi.getChannelsStats(channelsBatch.map(c => c.id)) + const channelStatsbyId = new Map(channelsStats.map(c => [c.id, c.statistics])) + checkedChannelsCount += channelsBatch.length const updatedChannels = ( await Promise.all( - channelsToBeIngested.map(async (ch) => { + channelsBatch.map(async (ch) => { try { // ensure that Ypp collaborator member is still set as channel's collaborator const isCollaboratorSet = await this.joystreamClient.doesChannelHaveCollaborator(ch.joystreamChannelId) @@ -132,26 +125,45 @@ export class YoutubePollingService { lastActedAt: new Date(), } } - } catch (err: unknown) { - if (err instanceof Error && err.message.includes('This account has been terminated')) { - this.logger.warn( - `Opting out '${ch.id}' from YPP program as their Youtube channel has been terminated by the Youtube.` - ) - return { - ...ch, - yppStatus: 'OptedOut', - shouldBeIngested: false, - lastActedAt: new Date(), - } - } else if (err instanceof Error && err.message.includes('The playlist does not exist')) { - this.logger.warn(`Opting out '${ch.id}' from YPP program as Channel does not exist on Youtube.`) - return { - ...ch, - yppStatus: 'OptedOut', - shouldBeIngested: false, - lastActedAt: new Date(), - } + // Check channel's updated stats + const channelStats = channelStatsbyId.get(ch.id) + if (!channelStats) { + this.logger.warn(`Missing channel stats for channel ${ch.id}! Verifying channel status...`) + // TODO: Uncomment after verification + // try { + // await this.youtubeApi.getChannel({ id: ch.userId, accessToken: ch.userAccessToken, refreshToken: ch.userRefreshToken }) + // } catch (e) { + // if (e instanceof GaxiosError && e.message.includes('YouTube account of the authenticated user is suspended')) { + // this.logger.warn( + // `Opting out '${ch.id}' from YPP program as their Youtube channel has been terminated by the Youtube.` + // ) + // return { + // ...ch, + // yppStatus: 'OptedOut', + // shouldBeIngested: false, + // lastActedAt: new Date(), + // } + // } + // this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`, { err: e }) + // return + // } + this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`) + return + } + // Schedule for video ingestion if videoCount has changed + if (channelStats.videoCount !== ch.statistics.videoCount) { + this.logger.debug('Channel stats changed', { oldStats: ch.statistics, newStats: channelStats }) + channelsScheduledForVideoIngestion.push(ch) } + // Update stats in DynamoDB + // TODO: Uncomment after verification + // return { + // ...ch, + // statistics: { + // ...channelStats + // } + // } + } catch (err: unknown) { this.logger.error('Failed to fetch updated channel info', { err, channelId: ch.joystreamChannelId }) } }) @@ -159,13 +171,21 @@ export class YoutubePollingService { ).filter((ch): ch is YtChannel => ch !== undefined) // save updated channels - await this.dynamodbService.repo.channels.upsertAll(updatedChannels) + // TODO: Uncomment after verification + // await this.dynamodbService.repo.channels.upsertAll(updatedChannels) + updatedChannelsCount += updatedChannels.length + this.logger.info(`Processed ${checkedChannelsCount}/${channelsWithSyncEnabled.length} channels...`) // A delay between batches if necessary to prevent rate limits or high CPU/IO usage await sleep(100) } - return channelsWithSyncEnabled() + this.logger.info( + `Finished channel ingestion. ` + + `Updated ${updatedChannelsCount} channels.` + + `Found ${channelsScheduledForVideoIngestion.length} channels with new videos.` + ) + return channelsScheduledForVideoIngestion } public async performVideosIngestion(channel: YtChannel, initialIngestion = false) { @@ -179,7 +199,7 @@ export class YoutubePollingService { // get all video Ids that are not yet being tracked let untrackedVideos = await this.getUntrackedVideos(channel, videos) - console.log('untrackedVideos.length', videos.length, untrackedVideos.length, channel.joystreamChannelId) + this.logger.debug(`Found ${untrackedVideos.length} untracked videos (out of last ${videos.length}) for channel ${channel.joystreamChannelId}`) // save all new videos to DB including await this.dynamodbService.repo.videos.upsertAll(untrackedVideos) } catch (err) { @@ -201,6 +221,7 @@ export class YoutubePollingService { ytChannelId: channel.id, }) + // TODO: Uncomment after verification // await this.dynamodbService.repo.channels.save({ // ...channel, // yppStatus: 'OptedOut', diff --git a/src/services/youtube/api.ts b/src/services/youtube/api.ts index cdf07f3..10f17df 100644 --- a/src/services/youtube/api.ts +++ b/src/services/youtube/api.ts @@ -4,6 +4,7 @@ import { youtube_v3 } from '@googleapis/youtube' import { exec } from 'child_process' import { GetTokenResponse } from 'google-auth-library/build/src/auth/oauth2client' import { GaxiosError, OAuth2Client } from 'googleapis-common' +import { parse, toSeconds } from 'iso8601-duration' import _ from 'lodash' import moment from 'moment-timezone' import { FetchError } from 'node-fetch' @@ -18,7 +19,7 @@ import { YtChannel, YtDlpFlatPlaylistOutput, YtDlpVideoOutput, YtUser, YtVideo } import sleep from 'sleep-promise' import Schema$Channel = youtube_v3.Schema$Channel -import Schema$PlaylistItem = youtube_v3.Schema$PlaylistItem +import Schema$Video = youtube_v3.Schema$Video import { LoggingService } from '../logging' import { Logger } from 'winston' @@ -187,6 +188,7 @@ export interface IYoutubeApi { ytdlpClient: YtDlpClient getUserFromCode(code: string, youtubeRedirectUri: string): Promise getChannel(user: Pick): Promise + getChannelsStats(ids: string[]): Promise[]> getVerifiedChannel( user: Pick ): Promise<{ channel: YtChannel; errors: YoutubeApiError[] }> @@ -224,7 +226,7 @@ export class YoutubeClient implements IYoutubeApi { }) } - private getYoutube(accessToken: string, refreshToken: string) { + private getYoutube(accessToken?: string, refreshToken?: string) { const auth = this.getAuth() auth.setCredentials({ access_token: accessToken, @@ -310,6 +312,45 @@ export class YoutubeClient implements IYoutubeApi { return channel } + async getChannelsStats(ids: string[]): Promise[]> { + const yt = this.getYoutube() + + let nextPageToken: string = '' + const allChannelsStats: Pick[] = [] + do { + const channelsResponse = await yt.channels + .list({ + id: ids, + part: ['id', 'statistics'], + maxResults: 50 + }) + .catch((err) => { + if (err instanceof FetchError && err.code === 'ENOTFOUND') { + throw new YoutubeApiError(ExitCodes.YoutubeApi.YOUTUBE_API_NOT_CONNECTED, err.message) + } + throw err + }) + nextPageToken = channelsResponse.data.nextPageToken ?? '' + + // YouTube API will return HTTP 200 even if some of the channels + // are missing due to being suspended or removed. + const channelsStats = (channelsResponse.data.items || []) + .flatMap(c => c.id && c.statistics ? [{ id: c.id, statistics: c.statistics }] : []) + .map(c => ({ + id: c.id, + statistics: { + viewCount: parseInt(c.statistics.viewCount ?? '0'), + subscriberCount: parseInt(c.statistics.subscriberCount ?? '0'), + videoCount: parseInt(c.statistics.videoCount ?? '0'), + commentCount: parseInt(c.statistics.commentCount ?? '0'), + } + })) + allChannelsStats.push(...channelsStats) + } while (nextPageToken) + + return allChannelsStats + } + async getVerifiedChannel( user: Pick ): Promise<{ channel: YtChannel; errors: YoutubeApiError[] }> { @@ -470,7 +511,7 @@ export class YoutubeClient implements IYoutubeApi { do { const nextPage = await youtube.playlistItems .list({ - part: ['contentDetails', 'snippet', 'id', 'status'], + part: ['contentDetails'], playlistId: channel.uploadsPlaylistId, maxResults: 50, pageToken: nextPageToken ?? undefined, @@ -483,9 +524,21 @@ export class YoutubeClient implements IYoutubeApi { }) nextPageToken = nextPage.data.nextPageToken ?? '' - const page = this.mapVideos(nextPage.data.items ?? [], channel) - videos = [...videos, ...page] - console.log('videos.length < max', videos.length, limit, channel.joystreamChannelId) + const videoIds = nextPage.data.items?.flatMap(item => item.contentDetails?.videoId ? [item.contentDetails.videoId] : []) || [] + if (videoIds.length) { + const videosPage = await youtube.videos.list({ + id: videoIds, + part: ['id', 'status', 'snippet', 'statistics', 'fileDetails', 'contentDetails', 'liveStreamingDetails'], + maxResults: 50 + }).catch((err) => { + if (err instanceof FetchError && err.code === 'ENOTFOUND') { + throw new YoutubeApiError(ExitCodes.YoutubeApi.YOUTUBE_API_NOT_CONNECTED, err.message) + } + throw err + }) + const page = this.mapVideos(videosPage.data.items || [], channel) + videos = [...videos, ...page] + } } while (nextPageToken && videos.length < limit) return videos } @@ -529,13 +582,13 @@ export class YoutubeClient implements IYoutubeApi { ) } - private mapVideos(videos: Schema$PlaylistItem[], channel: YtChannel): YtVideo[] { + private mapVideos(videos: Schema$Video[], channel: YtChannel): YtVideo[] { return ( videos .map( (video) => { - id: video.contentDetails?.videoId, + id: video.id, description: video.snippet?.description, title: video.snippet?.title, channelId: video.snippet?.channelId, @@ -545,19 +598,19 @@ export class YoutubeClient implements IYoutubeApi { standard: video.snippet?.thumbnails?.standard?.url, default: video.snippet?.thumbnails?.default?.url, }, - url: `https://youtube.com/watch?v=${video.snippet?.resourceId?.videoId}`, + url: `https://youtube.com/watch?v=${video.id}`, publishedAt: video.snippet?.publishedAt, createdAt: new Date(), category: channel.videoCategoryId, languageIso: channel.joystreamChannelLanguageIso, privacyStatus: video.status?.privacyStatus, - ytRating: undefined, - liveBroadcastContent: 'none', - license: 'creativeCommon', - duration: 0, - container: '', - uploadStatus: 'processed', - viewCount: 0, + ytRating: video.contentDetails?.contentRating?.ytRating, + liveBroadcastContent: video.snippet?.liveBroadcastContent, + license: video.status?.license, + duration: toSeconds(parse(video.contentDetails?.duration ?? 'PT0S')), + container: video.fileDetails?.container, + uploadStatus: video.status?.uploadStatus, + viewCount: parseInt(video.statistics?.viewCount ?? '0'), state: 'New', } ) @@ -705,6 +758,19 @@ class QuotaMonitoringClient implements IQuotaMonitoringClient, IYoutubeApi { return channels } + async getChannelsStats(ids: string[]) { + // ensure have some left api quota + await this.ensureYoutubeQuota() + + // get channels from api + const channels = await this.decorated.getChannelsStats(ids) + + // increase used quota count, 1 api call is being used per page of 50 channels + await this.increaseUsedQuota({ syncQuotaIncrement: Math.ceil(ids.length / 50) }) + + return channels + } + async getVideos(channel: YtChannel, top: number) { // ensure have some left api quota await this.ensureYoutubeQuota() From 89b2bd500f7172ee31ddad17eb35234ee4d84190 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 28 Nov 2024 18:06:01 +0100 Subject: [PATCH 2/6] Add YouTube API key auth --- config.yml | 1 + src/schemas/config.ts | 1 + src/services/youtube/api.ts | 17 +++++++++++------ src/types/generated/ConfigJson.d.ts | 4 ++++ 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/config.yml b/config.yml index 17b9f12..c6f27a3 100644 --- a/config.yml +++ b/config.yml @@ -41,6 +41,7 @@ logs: youtube: clientId: google-client-id clientSecret: google-client-secret + apiKey: youtube-api-key # adcKeyFilePath: path/to/adc-key-file.json # maxAllowedQuotaUsageInPercentage: 95 # proxy: diff --git a/src/schemas/config.ts b/src/schemas/config.ts index 84c5c57..3c49238 100644 --- a/src/schemas/config.ts +++ b/src/schemas/config.ts @@ -177,6 +177,7 @@ export const configSchema: JSONSchema7 = objectSchema({ properties: { clientId: { type: 'string', description: 'Youtube Oauth2 Client Id' }, clientSecret: { type: 'string', description: 'Youtube Oauth2 Client Secret' }, + apiKey: { type: 'string', description: 'Youtube API key used for the purpose retrieving public data' }, maxAllowedQuotaUsageInPercentage: { description: `Maximum percentage of daily Youtube API quota that can be used by the Periodic polling service. ` + diff --git a/src/services/youtube/api.ts b/src/services/youtube/api.ts index 10f17df..497e6fa 100644 --- a/src/services/youtube/api.ts +++ b/src/services/youtube/api.ts @@ -227,12 +227,17 @@ export class YoutubeClient implements IYoutubeApi { } private getYoutube(accessToken?: string, refreshToken?: string) { - const auth = this.getAuth() - auth.setCredentials({ - access_token: accessToken, - refresh_token: refreshToken, - }) - return new youtube_v3.Youtube({ auth }) + if (accessToken || refreshToken) { + const auth = this.getAuth() + auth.setCredentials({ + access_token: accessToken, + refresh_token: refreshToken, + }) + + return new youtube_v3.Youtube({ auth }) + } + + return new youtube_v3.Youtube({ key: this.config.youtube.apiKey }) } private async getAccessToken( diff --git a/src/types/generated/ConfigJson.d.ts b/src/types/generated/ConfigJson.d.ts index 01a3d7a..cab4564 100644 --- a/src/types/generated/ConfigJson.d.ts +++ b/src/types/generated/ConfigJson.d.ts @@ -195,6 +195,10 @@ export interface YoutubeOauth2ClientConfiguration { * Youtube Oauth2 Client Secret */ clientSecret: string + /** + * Youtube API key used for the purpose retrieving public data + */ + apiKey?: string /** * Maximum percentage of daily Youtube API quota that can be used by the Periodic polling service. Once this limit is reached the service will stop polling for new videos until the next day(when Quota resets). All the remaining quota (100 - maxAllowedQuotaUsageInPercentage) will be used for potential channel's signups. */ From 1e22ef2ae67eeb3188cdcf3d1603127ea39e171f Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 28 Nov 2024 18:13:31 +0100 Subject: [PATCH 3/6] Youtube auth fix --- src/services/youtube/api.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/services/youtube/api.ts b/src/services/youtube/api.ts index 497e6fa..76df10f 100644 --- a/src/services/youtube/api.ts +++ b/src/services/youtube/api.ts @@ -237,7 +237,7 @@ export class YoutubeClient implements IYoutubeApi { return new youtube_v3.Youtube({ auth }) } - return new youtube_v3.Youtube({ key: this.config.youtube.apiKey }) + return new youtube_v3.Youtube({ auth: this.config.youtube.apiKey }) } private async getAccessToken( From 9ad022b188270eab2c49abb0857a611338136853 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 28 Nov 2024 19:28:55 +0100 Subject: [PATCH 4/6] Enable commented-out code --- .../syncProcessing/YoutubePollingService.ts | 54 +++++++++---------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/src/services/syncProcessing/YoutubePollingService.ts b/src/services/syncProcessing/YoutubePollingService.ts index c6b6739..1fa6881 100644 --- a/src/services/syncProcessing/YoutubePollingService.ts +++ b/src/services/syncProcessing/YoutubePollingService.ts @@ -58,8 +58,7 @@ export class YoutubePollingService { let channelsProcessed = 0 for (const channelsBatch of channelsBatches) { - // TODO: Uncomment after verification - // await Promise.all(channelsBatch.map((channel) => this.performVideosIngestion(channel))) + await Promise.all(channelsBatch.map((channel) => this.performVideosIngestion(channel))) channelsProcessed += channelsBatch.length this.logger.info(`Ingested videos of ${channelsProcessed}/${channelsWithNewVideos.length} channels...`) } @@ -129,24 +128,23 @@ export class YoutubePollingService { const channelStats = channelStatsbyId.get(ch.id) if (!channelStats) { this.logger.warn(`Missing channel stats for channel ${ch.id}! Verifying channel status...`) - // TODO: Uncomment after verification - // try { - // await this.youtubeApi.getChannel({ id: ch.userId, accessToken: ch.userAccessToken, refreshToken: ch.userRefreshToken }) - // } catch (e) { - // if (e instanceof GaxiosError && e.message.includes('YouTube account of the authenticated user is suspended')) { - // this.logger.warn( - // `Opting out '${ch.id}' from YPP program as their Youtube channel has been terminated by the Youtube.` - // ) - // return { - // ...ch, - // yppStatus: 'OptedOut', - // shouldBeIngested: false, - // lastActedAt: new Date(), - // } - // } - // this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`, { err: e }) - // return - // } + try { + await this.youtubeApi.getChannel({ id: ch.userId, accessToken: ch.userAccessToken, refreshToken: ch.userRefreshToken }) + } catch (e) { + if (e instanceof GaxiosError && e.message.includes('YouTube account of the authenticated user is suspended')) { + this.logger.warn( + `Opting out '${ch.id}' from YPP program as their Youtube channel has been terminated by the Youtube.` + ) + return { + ...ch, + yppStatus: 'OptedOut', + shouldBeIngested: false, + lastActedAt: new Date(), + } + } + this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`, { err: e }) + return + } this.logger.warn(`Status of channel ${ch.id} unclear. Will be skipped for this cycle...`) return } @@ -156,13 +154,12 @@ export class YoutubePollingService { channelsScheduledForVideoIngestion.push(ch) } // Update stats in DynamoDB - // TODO: Uncomment after verification - // return { - // ...ch, - // statistics: { - // ...channelStats - // } - // } + return { + ...ch, + statistics: { + ...channelStats + } + } } catch (err: unknown) { this.logger.error('Failed to fetch updated channel info', { err, channelId: ch.joystreamChannelId }) } @@ -171,8 +168,7 @@ export class YoutubePollingService { ).filter((ch): ch is YtChannel => ch !== undefined) // save updated channels - // TODO: Uncomment after verification - // await this.dynamodbService.repo.channels.upsertAll(updatedChannels) + await this.dynamodbService.repo.channels.upsertAll(updatedChannels) updatedChannelsCount += updatedChannels.length this.logger.info(`Processed ${checkedChannelsCount}/${channelsWithSyncEnabled.length} channels...`) From cd15846afd0b4c56b6b078fa7e6570c128fd96fd Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 28 Nov 2024 20:16:56 +0100 Subject: [PATCH 5/6] Fix elastic logging --- src/services/logging/LoggingService.ts | 57 +++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/src/services/logging/LoggingService.ts b/src/services/logging/LoggingService.ts index b7f763f..9832756 100644 --- a/src/services/logging/LoggingService.ts +++ b/src/services/logging/LoggingService.ts @@ -8,6 +8,8 @@ import winston, { Logger, LoggerOptions } from 'winston' import 'winston-daily-rotate-file' import { ElasticsearchTransport } from 'winston-elasticsearch' import { ReadonlyConfig } from '../../types' +import { FetchError } from 'node-fetch' +import { GaxiosError } from 'googleapis-common' const colors = { error: 'red', @@ -41,19 +43,58 @@ const pauseFormat: (opts: PauseFormatOpts) => Format = winston.format((info, opt }) // Error format applied to specific log meta field -type ErrorFormatOpts = { filedName: string } -const errorFormat: (opts: ErrorFormatOpts) => Format = winston.format((info, opts: ErrorFormatOpts) => { - if (!info[opts.filedName]) { +type CLIErrorFormatOpts = { fieldName: string } +const cliErrorFormat: (opts: CLIErrorFormatOpts) => Format = winston.format((info, { fieldName }: CLIErrorFormatOpts) => { + if (!info[fieldName]) { return info } const formatter = winston.format.errors({ stack: true }) - info[opts.filedName] = formatter.transform(info[opts.filedName], formatter.options) + info[fieldName] = formatter.transform(info[fieldName], formatter.options) + return info +}) + +// Elastic error format +type ElasticErrorFormatOpts = { fieldName: string } +const elasticErrorFormat: (opts: ElasticErrorFormatOpts) => Format = winston.format((info, { fieldName }: ElasticErrorFormatOpts) => { + const err = info[fieldName] + if (!err) { + return info + } + // Collect error data depending on type + if (err instanceof GaxiosError) { + info[fieldName] = { + message: err.message, + code: err.code, + ...(err.response ? { + response: { + status: err.response.status, + statusText: err.response.statusText, + // TODO: Maybe we can add .data later, but need to adjust es mappings + } + }: {}), + name: err.name + } + } + else if (err instanceof FetchError) { + info[fieldName] = { + message: err.message, + code: err.code, + type: err.type, + name: err.name + } + } + else if (err instanceof Error) { + info[fieldName] = { message: err.message } + } + else { + info[fieldName] = { message: String(err) } + } return info }) const cliFormat = winston.format.combine( winston.format.timestamp({ format: 'YYYY-MM-DD HH:mm:ss:ms' }), - errorFormat({ filedName: 'err' }), + cliErrorFormat({ fieldName: 'err' }), winston.format.metadata({ fillExcept: ['label', 'level', 'timestamp', 'message'] }), winston.format.colorize({ all: true }), winston.format.printf( @@ -88,7 +129,11 @@ export class LoggingService { esTransport = new ElasticsearchTransport({ index: 'youtube-synch', level: logs.elastic.level, - format: winston.format.combine(pauseFormat({ id: 'es' }), escFormat()), + format: winston.format.combine( + pauseFormat({ id: 'es' }), + elasticErrorFormat({ fieldName: 'err' }), + escFormat() + ), retryLimit: 10, flushInterval: 1000, clientOpts: { From c050cc6dabef10aca7c160cc6b133c032fbd546d Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 28 Nov 2024 20:34:51 +0100 Subject: [PATCH 6/6] Fix elastic error logging: 2nd attempt --- src/services/logging/LoggingService.ts | 67 +++++++++++++++----------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/src/services/logging/LoggingService.ts b/src/services/logging/LoggingService.ts index 9832756..11a2df0 100644 --- a/src/services/logging/LoggingService.ts +++ b/src/services/logging/LoggingService.ts @@ -54,41 +54,54 @@ const cliErrorFormat: (opts: CLIErrorFormatOpts) => Format = winston.format((inf }) // Elastic error format -type ElasticErrorFormatOpts = { fieldName: string } -const elasticErrorFormat: (opts: ElasticErrorFormatOpts) => Format = winston.format((info, { fieldName }: ElasticErrorFormatOpts) => { - const err = info[fieldName] - if (!err) { - return info +class NormalizedError extends Error { + message: string + name: string + code?: string + type?: string + response?: { + status: number + statusText: string } - // Collect error data depending on type - if (err instanceof GaxiosError) { - info[fieldName] = { - message: err.message, - code: err.code, - ...(err.response ? { - response: { + + constructor(err: unknown) { + // Collect error data depending on type + if (err instanceof GaxiosError) { + super(err.message) + this.code = err.code + if (err.response) { + this.response = { status: err.response.status, - statusText: err.response.statusText, + statusText: err.response.statusText // TODO: Maybe we can add .data later, but need to adjust es mappings } - }: {}), - name: err.name + } + this.name = err.name } - } - else if (err instanceof FetchError) { - info[fieldName] = { - message: err.message, - code: err.code, - type: err.type, - name: err.name + else if (err instanceof FetchError) { + super(err.message) + this.code = err.code, + this.type = err.type, + this.name = err.name + } + else if (err instanceof Error) { + super(err.message) + this.name = 'Error' + } + else { + super(String(err)) + this.name = 'UnknownError' } } - else if (err instanceof Error) { - info[fieldName] = { message: err.message } - } - else { - info[fieldName] = { message: String(err) } +} + +type ElasticErrorFormatOpts = { fieldName: string } +const elasticErrorFormat: (opts: ElasticErrorFormatOpts) => Format = winston.format((info, { fieldName }: ElasticErrorFormatOpts) => { + const err = info[fieldName] + if (!err) { + return info } + info[fieldName] = new NormalizedError(err) return info })