Skip to content

Commit

Permalink
Refactor xray
Browse files Browse the repository at this point in the history
  • Loading branch information
branberry committed Nov 15, 2023
1 parent 5a42cae commit aed3658
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 30 deletions.
12 changes: 2 additions & 10 deletions src/enhanced/enhancedApp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import c from 'config';
import { handleJob } from './utils/job';
import { listenToJobQueue } from './utils/queue';
AWSXRay.config([AWSXRay.plugins.ECSPlugin]);
const xrayNamespace = AWSXRay.getNamespace();

let client: MongoClient | undefined;

Expand Down Expand Up @@ -59,25 +58,18 @@ async function app(): Promise<void> {
console.log('[app]: starting application');

try {
const {
payload: { jobId },
segment,
} = await listenToJobQueue();
const { jobId } = await listenToJobQueue();
const db = await connectToDb();

await handleJobAndCleanUp(jobId, db);

console.log('[app]: process completed');
segment?.close();
} catch (e) {
console.error('[app]: ERROR! Job initialization failed', e);
process.exitCode = 1;
}
}

xrayNamespace.runPromise(async () => {
await app();
});
app();

process.on('SIGTERM', async () => {
await cleanupJob();
Expand Down
36 changes: 16 additions & 20 deletions src/enhanced/utils/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ReceiveMessageCommandInput, SQS } from '@aws-sdk/client-sqs';
import crypto from 'crypto';
import config from 'config';
import dgram from 'dgram';
import AWSXRay from 'aws-xray-sdk-core';

import { JobsQueuePayload } from '../../types/job-types';
import { isJobQueuePayload } from '../../types/utils/type-guards';
import { protectTask } from '../job';
Expand All @@ -13,7 +13,7 @@ const client = new SQS({ region: 'us-east-2' });
* This function listens to the job queue until a message is received.
* @returns {Promise<JobsQueuePayload>} the promise for the payload object after a message has been received
*/
export async function listenToJobQueue(): Promise<{ payload: JobsQueuePayload; segment: AWSXRay.Segment | undefined }> {
export async function listenToJobQueue(): Promise<JobsQueuePayload> {
const queueUrl = config.get<string>('jobsQueueUrl');

console.log('[listenToJobQueue]: Polling jobsQueue');
Expand Down Expand Up @@ -80,26 +80,22 @@ export async function listenToJobQueue(): Promise<{ payload: JobsQueuePayload; s
console.log('MessageHeaderAttributes', message.Attributes, message.MessageAttributes);

const xrayTraceId = message.Attributes?.['AWSTraceHeader'];
let segment: AWSXRay.Segment | undefined;
if (xrayTraceId) {
console.log('Xray trace id: ', xrayTraceId);
// const startTime = (Date.now() / 1000).toFixed(3);
const startTime = (Date.now() / 1000).toFixed(3);
const traceId = xrayTraceId.split(';')[0].split('=')[1];
const parentSegmentId = xrayTraceId.split(';')[1].split('=')[1];
// const segmentId = crypto.randomBytes(8).toString('hex');

// const newSegment = {
// name: 'Autobuilder',
// id: segmentId,
// trace_id: traceId,
// parent_id: parentSegmentId,
// start_time: Number(startTime),
// in_progress: true,
// };
// sendUdpMessage(newSegment);

segment = new AWSXRay.Segment('Autobuilder', traceId, parentSegmentId);
AWSXRay.setSegment(segment);
const segmentId = crypto.randomBytes(8).toString('hex');

const newSegment = {
name: 'Autobuilder',
id: segmentId,
trace_id: traceId,
parent_id: parentSegmentId,
start_time: Number(startTime),
in_progress: true,
};
sendUdpMessage(newSegment);
} else {
console.log('no trace id found');
}
Expand All @@ -119,11 +115,11 @@ export async function listenToJobQueue(): Promise<{ payload: JobsQueuePayload; s

// Great! we received a proper message from the queue. Return this object as we will no longer
// want to poll for more messages.
return { payload, segment };
return { payload };
}
}

async function sendUdpMessage(obj: unknown) {
export async function sendUdpMessage(obj: unknown) {
const client = dgram.createSocket('udp4');

console.log('obj', obj);
Expand Down
3 changes: 3 additions & 0 deletions src/enhanced/utils/xray/index.xray.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export async function makeSegment() {}

Check failure on line 1 in src/enhanced/utils/xray/index.xray.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected empty async function 'makeSegment'

export async function addParentSegment() {}

Check failure on line 3 in src/enhanced/utils/xray/index.xray.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected empty async function 'addParentSegment'

0 comments on commit aed3658

Please sign in to comment.