From aed36584bd984ba9c52f8c5e5b73c128dea92faa Mon Sep 17 00:00:00 2001 From: branberry Date: Wed, 15 Nov 2023 12:52:17 -0600 Subject: [PATCH] Refactor xray --- src/enhanced/enhancedApp.ts | 12 ++------- src/enhanced/utils/queue/index.ts | 36 ++++++++++++--------------- src/enhanced/utils/xray/index.xray.ts | 3 +++ 3 files changed, 21 insertions(+), 30 deletions(-) create mode 100644 src/enhanced/utils/xray/index.xray.ts diff --git a/src/enhanced/enhancedApp.ts b/src/enhanced/enhancedApp.ts index e8d6c14f4..952a29beb 100644 --- a/src/enhanced/enhancedApp.ts +++ b/src/enhanced/enhancedApp.ts @@ -6,7 +6,6 @@ import c from 'config'; import { handleJob } from './utils/job'; import { listenToJobQueue } from './utils/queue'; AWSXRay.config([AWSXRay.plugins.ECSPlugin]); -const xrayNamespace = AWSXRay.getNamespace(); let client: MongoClient | undefined; @@ -59,25 +58,18 @@ async function app(): Promise { console.log('[app]: starting application'); try { - const { - payload: { jobId }, - segment, - } = await listenToJobQueue(); + const { jobId } = await listenToJobQueue(); const db = await connectToDb(); await handleJobAndCleanUp(jobId, db); console.log('[app]: process completed'); - segment?.close(); } catch (e) { console.error('[app]: ERROR! Job initialization failed', e); process.exitCode = 1; } } - -xrayNamespace.runPromise(async () => { - await app(); -}); +app(); process.on('SIGTERM', async () => { await cleanupJob(); diff --git a/src/enhanced/utils/queue/index.ts b/src/enhanced/utils/queue/index.ts index e21e19176..aa8803c25 100644 --- a/src/enhanced/utils/queue/index.ts +++ b/src/enhanced/utils/queue/index.ts @@ -2,7 +2,7 @@ import { ReceiveMessageCommandInput, SQS } from '@aws-sdk/client-sqs'; import crypto from 'crypto'; import config from 'config'; import dgram from 'dgram'; -import AWSXRay from 'aws-xray-sdk-core'; + import { JobsQueuePayload } from '../../types/job-types'; import { isJobQueuePayload } from '../../types/utils/type-guards'; import { protectTask } from '../job'; @@ -13,7 +13,7 @@ const client = new SQS({ region: 'us-east-2' }); * This function listens to the job queue until a message is received. * @returns {Promise} the promise for the payload object after a message has been received */ -export async function listenToJobQueue(): Promise<{ payload: JobsQueuePayload; segment: AWSXRay.Segment | undefined }> { +export async function listenToJobQueue(): Promise { const queueUrl = config.get('jobsQueueUrl'); console.log('[listenToJobQueue]: Polling jobsQueue'); @@ -80,26 +80,22 @@ export async function listenToJobQueue(): Promise<{ payload: JobsQueuePayload; s console.log('MessageHeaderAttributes', message.Attributes, message.MessageAttributes); const xrayTraceId = message.Attributes?.['AWSTraceHeader']; - let segment: AWSXRay.Segment | undefined; if (xrayTraceId) { console.log('Xray trace id: ', xrayTraceId); - // const startTime = (Date.now() / 1000).toFixed(3); + const startTime = (Date.now() / 1000).toFixed(3); const traceId = xrayTraceId.split(';')[0].split('=')[1]; const parentSegmentId = xrayTraceId.split(';')[1].split('=')[1]; - // const segmentId = crypto.randomBytes(8).toString('hex'); - - // const newSegment = { - // name: 'Autobuilder', - // id: segmentId, - // trace_id: traceId, - // parent_id: parentSegmentId, - // start_time: Number(startTime), - // in_progress: true, - // }; - // sendUdpMessage(newSegment); - - segment = new AWSXRay.Segment('Autobuilder', traceId, parentSegmentId); - AWSXRay.setSegment(segment); + const segmentId = crypto.randomBytes(8).toString('hex'); + + const newSegment = { + name: 'Autobuilder', + id: segmentId, + trace_id: traceId, + parent_id: parentSegmentId, + start_time: Number(startTime), + in_progress: true, + }; + sendUdpMessage(newSegment); } else { console.log('no trace id found'); } @@ -119,11 +115,11 @@ export async function listenToJobQueue(): Promise<{ payload: JobsQueuePayload; s // Great! we received a proper message from the queue. Return this object as we will no longer // want to poll for more messages. - return { payload, segment }; + return { payload }; } } -async function sendUdpMessage(obj: unknown) { +export async function sendUdpMessage(obj: unknown) { const client = dgram.createSocket('udp4'); console.log('obj', obj); diff --git a/src/enhanced/utils/xray/index.xray.ts b/src/enhanced/utils/xray/index.xray.ts new file mode 100644 index 000000000..2438e915e --- /dev/null +++ b/src/enhanced/utils/xray/index.xray.ts @@ -0,0 +1,3 @@ +export async function makeSegment() {} + +export async function addParentSegment() {}