Skip to content

Commit

Permalink
Merge pull request #657 from techmatters/CHI-2818-queue_transfer_capa…
Browse files Browse the repository at this point in the history
…city_workaround

CHI-2818 queue transfer capacity workaround
  • Loading branch information
stephenhand authored Jul 29, 2024
2 parents debce45 + 8296956 commit 4f8ece8
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 12 deletions.
55 changes: 45 additions & 10 deletions functions/adjustChatCapacity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,40 @@ import {
send,
functionValidator as TokenValidator,
} from '@tech-matters/serverless-helpers';
import { WorkerChannelInstance } from 'twilio/lib/rest/taskrouter/v1/workspace/worker/workerChannel';

type EnvVars = {
TWILIO_WORKSPACE_SID: string;
};

export type Body = {
workerSid?: string;
adjustment?: 'increase' | 'decrease';
adjustment?: 'increase' | 'decrease' | 'increaseUntilCapacityAvailable';
request: { cookies: {}; headers: {} };
};

const increaseChatCapacity = async (channel: WorkerChannelInstance, maxMessageCapacity: number) => {
if (channel.availableCapacityPercentage > 0) {
return {
result: { status: 412, message: 'Still have available capacity, no need to increase.' },
updatedChannel: channel,
};
}

if (!(channel.configuredCapacity < maxMessageCapacity)) {
return {
result: { status: 412, message: 'Reached the max capacity.' },
updatedChannel: channel,
};
}

const updatedChannel = await channel.update({ capacity: channel.configuredCapacity + 1 });
return {
result: { status: 200, message: 'Successfully increased channel capacity' },
updatedChannel,
};
};

export const adjustChatCapacity = async (
context: Context<EnvVars>,
body: Required<Pick<Body, 'adjustment' | 'workerSid'>>,
Expand All @@ -59,21 +82,33 @@ export const adjustChatCapacity = async (
}

const channels = await worker.workerChannels().list();
const channel = channels.find((c) => c.taskChannelUniqueName === 'chat');
let channel = channels.find((c) => c.taskChannelUniqueName === 'chat');

if (!channel) return { status: 404, message: 'Could not find chat channel.' };

if (body.adjustment === 'increase') {
if (channel.availableCapacityPercentage > 0) {
return { status: 412, message: 'Still have available capacity, no need to increase.' };
}
return (await increaseChatCapacity(channel, maxMessageCapacity)).result;
}

if (!(channel.configuredCapacity < maxMessageCapacity)) {
return { status: 412, message: 'Reached the max capacity.' };
if (body.adjustment === 'increaseUntilCapacityAvailable') {
let result: Awaited<ReturnType<typeof increaseChatCapacity>>['result'] = {
status: 200,
message: '',
};
while (result.status === 200) {
// eslint-disable-next-line no-await-in-loop
({ result, updatedChannel: channel } = await increaseChatCapacity(
channel,
maxMessageCapacity,
));
}

await channel.update({ capacity: channel.configuredCapacity + 1 });
return { status: 200, message: 'Successfully increased channel capacity' };
if (
channel.configuredCapacity === maxMessageCapacity &&
channel.availableCapacityPercentage === 0
) {
return { status: 412, message: 'Reached the max capacity with no available capacity.' };
}
return { status: 200, message: 'Adjusted chat capacity until there is capacity available' };
}

if (body.adjustment === 'decrease') {
Expand Down
8 changes: 7 additions & 1 deletion functions/pullTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,13 @@ export const handler = TokenValidator(
).map((r) => r.sid),
);

await adjustChatCapacity(context, { workerSid, adjustment: 'increase' });
const { status } = await adjustChatCapacity(context, {
workerSid,
adjustment: 'increaseUntilCapacityAvailable',
});
if (status !== 200) {
resolve(error400('Failed to provide available chat capacity'));
}
const pullAttemptExpiry = Date.now() + PULL_ATTEMPT_TIMEOUT_MS;

// Polling is much more self contained and less messy than event driven with the backend TaskRouter API
Expand Down
41 changes: 40 additions & 1 deletion functions/taskrouterListeners/transfersListener.private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
TASK_QUEUE_ENTERED,
} from '@tech-matters/serverless-helpers/taskrouter';
import type { TransferMeta, ChatTransferTaskAttributes } from '../transfer/helpers.private';
import { AdjustChatCapacityType } from '../adjustChatCapacity';

export const eventTypes: EventType[] = [
RESERVATION_ACCEPTED,
Expand Down Expand Up @@ -154,6 +155,37 @@ const updateWarmVoiceTransferAttributes = async (
*/
export const shouldHandle = (event: EventFields) => eventTypes.includes(event.EventType);

const decreaseChatCapacity = async (context: Context<EnvVars>, taskSid: string) => {
const serviceConfig = await context.getTwilioClient().flexApi.configuration.get().fetch();
const {
feature_flags: { enable_backend_manual_pulling: enableBackendManualPulling },
} = serviceConfig.attributes;
if (enableBackendManualPulling) {
const task = await context
.getTwilioClient()
.taskrouter.workspaces(context.TWILIO_WORKSPACE_SID)
.tasks(taskSid);
const reservations = await task.reservations.list();
const workerSid = reservations.find((r) => r.reservationStatus === 'accepted')?.workerSid;

if (!workerSid) {
console.warn(`No worker found for task ${taskSid} to decrease chat capacity.`);
return;
}

const { path } = Runtime.getFunctions().adjustChatCapacity;
// eslint-disable-next-line global-require,import/no-dynamic-require,prefer-destructuring
const adjustChatCapacity: AdjustChatCapacityType = require(path).adjustChatCapacity;

const body = {
workerSid,
adjustment: 'decrease',
} as const;

await adjustChatCapacity(context, body);
}
};

export const handleEvent = async (context: Context<EnvVars>, event: EventFields) => {
try {
const {
Expand All @@ -178,8 +210,11 @@ export const handleEvent = async (context: Context<EnvVars>, event: EventFields)
console.log('Handling chat transfer accepted...');

const { originalTask: originalTaskSid } = taskAttributes.transferMeta;
const client = context.getTwilioClient();

// We need to decrease chat capacity before completing the task, it until the task completed event introduces a race condition
// The worker can still be offered another task before capacity is reduced if we don't do it now
await decreaseChatCapacity(context, originalTaskSid);
const client = context.getTwilioClient();
await client.taskrouter
.workspaces(context.TWILIO_WORKSPACE_SID)
.tasks(originalTaskSid)
Expand Down Expand Up @@ -215,6 +250,10 @@ export const handleEvent = async (context: Context<EnvVars>, event: EventFields)
console.log('Handling chat transfer to queue entering target queue...');

const { originalTask: originalTaskSid } = taskAttributes.transferMeta;

// We need to decrease chat capacity before completing the task, it until the task completed event introduces a race condition
// The worker can still be offered another task before capacity is reduced if we don't do it now
await decreaseChatCapacity(context, originalTaskSid);
const client = context.getTwilioClient();

await client.taskrouter
Expand Down
9 changes: 9 additions & 0 deletions tests/taskrouterListeners/transfersListener.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ const context = {
throw new Error('Workspace does not exists');
},
},
flexApi: {
configuration: {
get: () => ({
fetch: async () => ({
attributes: { feature_flags: {} },
}),
}),
},
},
}),
TWILIO_WORKSPACE_SID: 'WSxxx',
};
Expand Down

0 comments on commit 4f8ece8

Please sign in to comment.