From f218b092b4d610ed1cdbc0dd0d2fd521606dbd8a Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 25 Jul 2024 10:50:47 +0100 Subject: [PATCH 1/6] Queue Transfer Capacity workaround --- .../transfersListener.private.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/functions/taskrouterListeners/transfersListener.private.ts b/functions/taskrouterListeners/transfersListener.private.ts index 751e7401..419325a0 100644 --- a/functions/taskrouterListeners/transfersListener.private.ts +++ b/functions/taskrouterListeners/transfersListener.private.ts @@ -29,6 +29,7 @@ import { TASK_QUEUE_ENTERED, } from '@tech-matters/serverless-helpers/taskrouter'; import type { TransferMeta, ChatTransferTaskAttributes } from '../transfer/helpers.private'; +import { AdjustChatCapacityType } from '../adjustChatCapacity'; export const eventTypes: EventType[] = [ RESERVATION_ACCEPTED, @@ -161,6 +162,7 @@ export const handleEvent = async (context: Context, event: EventFields) TaskChannelUniqueName: taskChannelUniqueName, TaskSid: taskSid, TaskAttributes: taskAttributesString, + WorkerSid: workerSid, } = event; console.log(`===== Executing TransfersListener for event: ${eventType} =====`); @@ -217,6 +219,18 @@ export const handleEvent = async (context: Context, event: EventFields) const { originalTask: originalTaskSid } = taskAttributes.transferMeta; const client = context.getTwilioClient(); + const { path } = Runtime.getFunctions().adjustChatCapacity; + + // eslint-disable-next-line global-require,import/no-dynamic-require,prefer-destructuring + const adjustChatCapacity: AdjustChatCapacityType = require(path).adjustChatCapacity; + + const body = { + workerSid, + adjustment: 'decrease', + } as const; + + await adjustChatCapacity(context, body); + await client.taskrouter .workspaces(context.TWILIO_WORKSPACE_SID) .tasks(originalTaskSid) From ee7be6a8e287f6c97650c57f21e44194fd7c2e65 Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 25 Jul 2024 10:54:19 +0100 Subject: [PATCH 2/6] Queue Transfer Capacity workaround --- .../transfersListener.private.ts | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/functions/taskrouterListeners/transfersListener.private.ts b/functions/taskrouterListeners/transfersListener.private.ts index 419325a0..0921d63d 100644 --- a/functions/taskrouterListeners/transfersListener.private.ts +++ b/functions/taskrouterListeners/transfersListener.private.ts @@ -155,6 +155,25 @@ const updateWarmVoiceTransferAttributes = async ( */ export const shouldHandle = (event: EventFields) => eventTypes.includes(event.EventType); +const decreaseChatCapacity = async (context: Context, workerSid: string) => { + const serviceConfig = await context.getTwilioClient().flexApi.configuration.get().fetch(); + const { + feature_flags: { enable_backend_manual_pulling: enableBackendManualPulling }, + } = serviceConfig.attributes; + if (enableBackendManualPulling) { + const { path } = Runtime.getFunctions().adjustChatCapacity; + // eslint-disable-next-line global-require,import/no-dynamic-require,prefer-destructuring + const adjustChatCapacity: AdjustChatCapacityType = require(path).adjustChatCapacity; + + const body = { + workerSid, + adjustment: 'decrease', + } as const; + + await adjustChatCapacity(context, body); + } +}; + export const handleEvent = async (context: Context, event: EventFields) => { try { const { @@ -215,22 +234,11 @@ export const handleEvent = async (context: Context, event: EventFields) if (isChatTransferToQueueComplete(eventType, taskChannelUniqueName, taskAttributes)) { console.log('Handling chat transfer to queue entering target queue...'); + await decreaseChatCapacity(context, workerSid); const { originalTask: originalTaskSid } = taskAttributes.transferMeta; const client = context.getTwilioClient(); - const { path } = Runtime.getFunctions().adjustChatCapacity; - - // eslint-disable-next-line global-require,import/no-dynamic-require,prefer-destructuring - const adjustChatCapacity: AdjustChatCapacityType = require(path).adjustChatCapacity; - - const body = { - workerSid, - adjustment: 'decrease', - } as const; - - await adjustChatCapacity(context, body); - await client.taskrouter .workspaces(context.TWILIO_WORKSPACE_SID) .tasks(originalTaskSid) From f5452fd224450445418737f16d0e741dba0bc68f Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 25 Jul 2024 11:43:16 +0100 Subject: [PATCH 3/6] Queue Transfer Capacity workaround fix --- .../transfersListener.private.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/functions/taskrouterListeners/transfersListener.private.ts b/functions/taskrouterListeners/transfersListener.private.ts index 0921d63d..e15cb5f9 100644 --- a/functions/taskrouterListeners/transfersListener.private.ts +++ b/functions/taskrouterListeners/transfersListener.private.ts @@ -155,12 +155,24 @@ const updateWarmVoiceTransferAttributes = async ( */ export const shouldHandle = (event: EventFields) => eventTypes.includes(event.EventType); -const decreaseChatCapacity = async (context: Context, workerSid: string) => { +const decreaseChatCapacity = async (context: Context, taskSid: string) => { const serviceConfig = await context.getTwilioClient().flexApi.configuration.get().fetch(); const { feature_flags: { enable_backend_manual_pulling: enableBackendManualPulling }, } = serviceConfig.attributes; if (enableBackendManualPulling) { + const task = await context + .getTwilioClient() + .taskrouter.workspaces(context.TWILIO_WORKSPACE_SID) + .tasks(taskSid); + const reservations = await task.reservations.list(); + const workerSid = reservations.find((r) => r.reservationStatus === 'accepted')?.workerSid; + + if (!workerSid) { + console.warn(`No worker found for task ${taskSid} to decrease chat capacity.`); + return; + } + const { path } = Runtime.getFunctions().adjustChatCapacity; // eslint-disable-next-line global-require,import/no-dynamic-require,prefer-destructuring const adjustChatCapacity: AdjustChatCapacityType = require(path).adjustChatCapacity; @@ -181,7 +193,6 @@ export const handleEvent = async (context: Context, event: EventFields) TaskChannelUniqueName: taskChannelUniqueName, TaskSid: taskSid, TaskAttributes: taskAttributesString, - WorkerSid: workerSid, } = event; console.log(`===== Executing TransfersListener for event: ${eventType} =====`); @@ -200,7 +211,6 @@ export const handleEvent = async (context: Context, event: EventFields) const { originalTask: originalTaskSid } = taskAttributes.transferMeta; const client = context.getTwilioClient(); - await client.taskrouter .workspaces(context.TWILIO_WORKSPACE_SID) .tasks(originalTaskSid) @@ -234,9 +244,9 @@ export const handleEvent = async (context: Context, event: EventFields) if (isChatTransferToQueueComplete(eventType, taskChannelUniqueName, taskAttributes)) { console.log('Handling chat transfer to queue entering target queue...'); - await decreaseChatCapacity(context, workerSid); const { originalTask: originalTaskSid } = taskAttributes.transferMeta; + await decreaseChatCapacity(context, originalTaskSid); const client = context.getTwilioClient(); await client.taskrouter From 2f0d1399c17543ed151bfb8f5bb0a2d7721b97f2 Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 25 Jul 2024 12:35:43 +0100 Subject: [PATCH 4/6] Ensure pulling a task increases capacity enough --- functions/adjustChatCapacity.ts | 55 +++++++++++++++---- functions/pullTask.ts | 8 ++- .../transfersListener.private.ts | 3 + 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/functions/adjustChatCapacity.ts b/functions/adjustChatCapacity.ts index eb977c90..d091f63f 100644 --- a/functions/adjustChatCapacity.ts +++ b/functions/adjustChatCapacity.ts @@ -24,6 +24,7 @@ import { send, functionValidator as TokenValidator, } from '@tech-matters/serverless-helpers'; +import { WorkerChannelInstance } from 'twilio/lib/rest/taskrouter/v1/workspace/worker/workerChannel'; type EnvVars = { TWILIO_WORKSPACE_SID: string; @@ -31,10 +32,32 @@ type EnvVars = { export type Body = { workerSid?: string; - adjustment?: 'increase' | 'decrease'; + adjustment?: 'increase' | 'decrease' | 'increaseUntilCapacityAvailable'; request: { cookies: {}; headers: {} }; }; +const increaseChatCapacity = async (channel: WorkerChannelInstance, maxMessageCapacity: number) => { + if (channel.availableCapacityPercentage > 0) { + return { + result: { status: 412, message: 'Still have available capacity, no need to increase.' }, + updatedChannel: channel, + }; + } + + if (!(channel.configuredCapacity < maxMessageCapacity)) { + return { + result: { status: 412, message: 'Reached the max capacity.' }, + updatedChannel: channel, + }; + } + + const updatedChannel = await channel.update({ capacity: channel.configuredCapacity + 1 }); + return { + result: { status: 200, message: 'Successfully increased channel capacity' }, + updatedChannel, + }; +}; + export const adjustChatCapacity = async ( context: Context, body: Required>, @@ -59,21 +82,33 @@ export const adjustChatCapacity = async ( } const channels = await worker.workerChannels().list(); - const channel = channels.find((c) => c.taskChannelUniqueName === 'chat'); + let channel = channels.find((c) => c.taskChannelUniqueName === 'chat'); if (!channel) return { status: 404, message: 'Could not find chat channel.' }; if (body.adjustment === 'increase') { - if (channel.availableCapacityPercentage > 0) { - return { status: 412, message: 'Still have available capacity, no need to increase.' }; - } + return (await increaseChatCapacity(channel, maxMessageCapacity)).result; + } - if (!(channel.configuredCapacity < maxMessageCapacity)) { - return { status: 412, message: 'Reached the max capacity.' }; + if (body.adjustment === 'increaseUntilCapacityAvailable') { + let result: Awaited>['result'] = { + status: 200, + message: '', + }; + while (result.status === 200) { + // eslint-disable-next-line no-await-in-loop + ({ result, updatedChannel: channel } = await increaseChatCapacity( + channel, + maxMessageCapacity, + )); } - - await channel.update({ capacity: channel.configuredCapacity + 1 }); - return { status: 200, message: 'Successfully increased channel capacity' }; + if ( + channel.configuredCapacity === maxMessageCapacity && + channel.availableCapacityPercentage === 0 + ) { + return { status: 412, message: 'Reached the max capacity with no available capacity.' }; + } + return { status: 200, message: 'Adjusted chat capacity until there is capacity available' }; } if (body.adjustment === 'decrease') { diff --git a/functions/pullTask.ts b/functions/pullTask.ts index b55ca396..d5613395 100644 --- a/functions/pullTask.ts +++ b/functions/pullTask.ts @@ -70,7 +70,13 @@ export const handler = TokenValidator( ).map((r) => r.sid), ); - await adjustChatCapacity(context, { workerSid, adjustment: 'increase' }); + const { status } = await adjustChatCapacity(context, { + workerSid, + adjustment: 'increaseUntilCapacityAvailable', + }); + if (status !== 200) { + resolve(error400('Failed to provide available chat capacity')); + } const pullAttemptExpiry = Date.now() + PULL_ATTEMPT_TIMEOUT_MS; // Polling is much more self contained and less messy than event driven with the backend TaskRouter API diff --git a/functions/taskrouterListeners/transfersListener.private.ts b/functions/taskrouterListeners/transfersListener.private.ts index e15cb5f9..8f6c1d2d 100644 --- a/functions/taskrouterListeners/transfersListener.private.ts +++ b/functions/taskrouterListeners/transfersListener.private.ts @@ -246,6 +246,9 @@ export const handleEvent = async (context: Context, event: EventFields) console.log('Handling chat transfer to queue entering target queue...'); const { originalTask: originalTaskSid } = taskAttributes.transferMeta; + + // We need to decrease chat capacity before completing the task, it until the task completed event introduces a race condition + // The worker can still be offered another task before capacity is reduced if we don't do it now await decreaseChatCapacity(context, originalTaskSid); const client = context.getTwilioClient(); From 41c27b23b72b5a8ddd599284164ae8bca37432a4 Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 25 Jul 2024 15:29:09 +0100 Subject: [PATCH 5/6] Add workaround to worker transfers as well --- functions/taskrouterListeners/transfersListener.private.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/functions/taskrouterListeners/transfersListener.private.ts b/functions/taskrouterListeners/transfersListener.private.ts index 8f6c1d2d..09b8d98f 100644 --- a/functions/taskrouterListeners/transfersListener.private.ts +++ b/functions/taskrouterListeners/transfersListener.private.ts @@ -210,6 +210,10 @@ export const handleEvent = async (context: Context, event: EventFields) console.log('Handling chat transfer accepted...'); const { originalTask: originalTaskSid } = taskAttributes.transferMeta; + + // We need to decrease chat capacity before completing the task, it until the task completed event introduces a race condition + // The worker can still be offered another task before capacity is reduced if we don't do it now + await decreaseChatCapacity(context, originalTaskSid); const client = context.getTwilioClient(); await client.taskrouter .workspaces(context.TWILIO_WORKSPACE_SID) From 8296956bffa3ae5b057129f25eb864faa2b0a460 Mon Sep 17 00:00:00 2001 From: Stephen Hand Date: Thu, 25 Jul 2024 15:57:25 +0100 Subject: [PATCH 6/6] Fix tests --- tests/taskrouterListeners/transfersListener.test.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/taskrouterListeners/transfersListener.test.ts b/tests/taskrouterListeners/transfersListener.test.ts index 841eb950..545680fd 100644 --- a/tests/taskrouterListeners/transfersListener.test.ts +++ b/tests/taskrouterListeners/transfersListener.test.ts @@ -147,6 +147,15 @@ const context = { throw new Error('Workspace does not exists'); }, }, + flexApi: { + configuration: { + get: () => ({ + fetch: async () => ({ + attributes: { feature_flags: {} }, + }), + }), + }, + }, }), TWILIO_WORKSPACE_SID: 'WSxxx', };