From b3d5736a10190637625f753402315cea47892cb3 Mon Sep 17 00:00:00 2001 From: Romain Bioteau Date: Mon, 9 Sep 2024 15:25:06 +0200 Subject: [PATCH] feat(connector): Force core and max pool size to 1 for connector executor (#3138) --- .../search/SearchActivityInstanceIT.java | 39 +++---- .../search/SearchProcessInstanceIT.java | 4 +- .../bonitasoft/engine/test/APITestUtil.java | 12 +- .../service/impl/SpringBeanAccessor.java | 6 + .../src/main/resources/bonita-community.xml | 13 --- .../bonita-tenant-community.properties | 3 - .../BonitaConnectorExecutorFactory.java | 22 ++++ .../connector/impl/ConnectorExecutorImpl.java | 109 +++++------------- .../ConnectorSingleThreadExecutorFactory.java | 62 ++++++++++ .../impl/ConnectorExecutorImplIT.java | 8 +- .../impl/ConnectorExecutorImplTest.java | 10 +- ...ry.java => BonitaWorkExecutorFactory.java} | 2 +- .../DefaultBonitaExecutorServiceFactory.java | 8 +- ... WorkSingleThreadPoolExecutorFactory.java} | 6 +- ...faultBonitaExecutorServiceFactoryTest.java | 6 +- .../DefaultBonitaExecutorServiceTest.java | 2 +- 16 files changed, 175 insertions(+), 137 deletions(-) create mode 100644 services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/BonitaConnectorExecutorFactory.java create mode 100644 services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorSingleThreadExecutorFactory.java rename services/bonita-work/src/main/java/org/bonitasoft/engine/work/{BonitaThreadPoolExecutorFactory.java => BonitaWorkExecutorFactory.java} (94%) rename services/bonita-work/src/main/java/org/bonitasoft/engine/work/{SingleThreadPoolExecutorFactory.java => WorkSingleThreadPoolExecutorFactory.java} (92%) diff --git a/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchActivityInstanceIT.java b/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchActivityInstanceIT.java index 48653614292..5979dbe4bce 100644 --- a/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchActivityInstanceIT.java +++ b/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchActivityInstanceIT.java @@ -31,24 +31,7 @@ import org.bonitasoft.engine.bpm.bar.BusinessArchive; import org.bonitasoft.engine.bpm.bar.BusinessArchiveBuilder; import org.bonitasoft.engine.bpm.connector.ConnectorEvent; -import org.bonitasoft.engine.bpm.flownode.ActivityInstance; -import org.bonitasoft.engine.bpm.flownode.ActivityInstanceCriterion; -import org.bonitasoft.engine.bpm.flownode.ActivityInstanceSearchDescriptor; -import org.bonitasoft.engine.bpm.flownode.ActivityStates; -import org.bonitasoft.engine.bpm.flownode.ArchivedActivityInstance; -import org.bonitasoft.engine.bpm.flownode.ArchivedActivityInstanceSearchDescriptor; -import org.bonitasoft.engine.bpm.flownode.ArchivedAutomaticTaskInstance; -import org.bonitasoft.engine.bpm.flownode.ArchivedFlowNodeInstance; -import org.bonitasoft.engine.bpm.flownode.ArchivedFlowNodeInstanceSearchDescriptor; -import org.bonitasoft.engine.bpm.flownode.ArchivedHumanTaskInstance; -import org.bonitasoft.engine.bpm.flownode.ArchivedHumanTaskInstanceSearchDescriptor; -import org.bonitasoft.engine.bpm.flownode.ArchivedManualTaskInstance; -import org.bonitasoft.engine.bpm.flownode.ArchivedUserTaskInstance; -import org.bonitasoft.engine.bpm.flownode.FlowNodeInstance; -import org.bonitasoft.engine.bpm.flownode.FlowNodeType; -import org.bonitasoft.engine.bpm.flownode.HumanTaskInstance; -import org.bonitasoft.engine.bpm.flownode.HumanTaskInstanceSearchDescriptor; -import org.bonitasoft.engine.bpm.flownode.TaskPriority; +import org.bonitasoft.engine.bpm.flownode.*; import org.bonitasoft.engine.bpm.process.DesignProcessDefinition; import org.bonitasoft.engine.bpm.process.ProcessDefinition; import org.bonitasoft.engine.bpm.process.ProcessInstance; @@ -66,6 +49,7 @@ import org.bonitasoft.engine.identity.Role; import org.bonitasoft.engine.identity.User; import org.bonitasoft.engine.test.BuildTestUtil; +import org.bonitasoft.engine.test.TestStates; import org.bonitasoft.engine.test.check.CheckNbOfActivities; import org.junit.Test; @@ -357,8 +341,8 @@ public void searchAssignedAndPendingHumanTaskInstances() throws Exception { ProcessDefinition p1 = deployAndEnableProcessWithActor(bar1, "a", user); ProcessDefinition p2 = deployAndEnableProcessWithActor(bar2, "a", user); - getProcessAPI().startProcess(p1.getId()); - getProcessAPI().startProcess(p2.getId()); + var instance1 = getProcessAPI().startProcess(p1.getId()); + var instance2 = getProcessAPI().startProcess(p2.getId()); waitForUserTask("p1task1"); long p1task2 = waitForUserTask("p1task2"); long p1task3 = waitForUserTask("p1task3"); @@ -388,6 +372,16 @@ public void searchAssignedAndPendingHumanTaskInstances() throws Exception { .filter("name", "p1task3").done()); assertThat(searchResult).matches(haveTasks(), toDescription(searchResult)); + try { + waitForTaskInState(instance1, "p1task3", TestStates.NORMAL_FINAL); + } catch (ActivityInstanceNotFoundException e) { + // ignore it, instance is already completed + } + try { + waitForTaskInState(instance2, "p2task3", TestStates.NORMAL_FINAL); + } catch (ActivityInstanceNotFoundException e) { + // ignore it, instance is already completed + } disableAndDeleteProcess(p1, p2); } @@ -1267,6 +1261,11 @@ public void searchPendingHumanTasksAssignedToUser() throws Exception { assertThat(searchHumanTaskInstancesCountOnly.getResult()).describedAs("Human tasks").isEmpty(); // -------- tear down + try { + waitForTaskInState(processInstance, "userTask6_assigned_to_John_long_execution", TestStates.NORMAL_FINAL); + } catch (ActivityInstanceNotFoundException e) { + // ignore it, activity is already finished + } disableAndDeleteProcess(processDefinition); deleteUser(john); } diff --git a/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchProcessInstanceIT.java b/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchProcessInstanceIT.java index c549f5614a3..3615bab72cd 100644 --- a/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchProcessInstanceIT.java +++ b/bonita-integration-tests/bonita-integration-tests-client/src/test/java/org/bonitasoft/engine/search/SearchProcessInstanceIT.java @@ -144,13 +144,13 @@ public void searchFailedProcessInstances() throws Exception { TestConnectorThatThrowException.class, "TestConnectorThatThrowException.jar"); final ProcessInstance instance1 = getProcessAPI().startProcess(processDefinitionWithFailedConnector.getId()); - waitForProcessToBeInState(instance1, ProcessInstanceState.ERROR); + waitForProcessToBeInState(instance1.getId(), ProcessInstanceState.ERROR); final ProcessInstance instance2 = getProcessAPI().startProcess(processDefinitionWithFailedTask.getId()); waitForFlowNodeInFailedState(instance2); final ProcessInstance instance3 = getProcessAPI().startProcess(processDefinitionWithFailedConnector.getId()); - waitForProcessToBeInState(instance3, ProcessInstanceState.ERROR); + waitForProcessToBeInState(instance3.getId(), ProcessInstanceState.ERROR); // search and check result ASC final SearchOptionsBuilder searchOptions1 = BuildTestUtil.buildSearchOptions(0, 2, diff --git a/bonita-integration-tests/bonita-test-utils/src/main/java/org/bonitasoft/engine/test/APITestUtil.java b/bonita-integration-tests/bonita-test-utils/src/main/java/org/bonitasoft/engine/test/APITestUtil.java index dc916d3b6f7..dabcae63dbe 100644 --- a/bonita-integration-tests/bonita-test-utils/src/main/java/org/bonitasoft/engine/test/APITestUtil.java +++ b/bonita-integration-tests/bonita-test-utils/src/main/java/org/bonitasoft/engine/test/APITestUtil.java @@ -1036,9 +1036,15 @@ public void waitForProcessToBeInState(final ProcessInstance processInstance, fin public void waitForProcessToBeInState(final long processInstanceId, final ProcessInstanceState state) throws Exception { + waitForProcessToBeInState(processInstanceId, state, DEFAULT_TIMEOUT); + } + + public void waitForProcessToBeInState(final long processInstanceId, final ProcessInstanceState state, + final int timeoutInMillis) + throws Exception { ClientEventUtil.executeWaitServerCommand(getCommandAPI(), ClientEventUtil.getProcessInstanceInState(processInstanceId, state.getId()), - DEFAULT_TIMEOUT); + timeoutInMillis); } public void waitForInitializingProcess() throws Exception { @@ -1049,7 +1055,7 @@ public void waitForInitializingProcess() throws Exception { private Long waitForFlowNode(final long processInstanceId, final TestStates state, final String flowNodeName, final boolean useRootProcessInstance, - final int timeout) throws Exception { + final int timeoutInMillis) throws Exception { Map params; if (useRootProcessInstance) { params = ClientEventUtil.getFlowNodeInState(processInstanceId, state.getStateName(), flowNodeName); @@ -1057,7 +1063,7 @@ private Long waitForFlowNode(final long processInstanceId, final TestStates stat params = ClientEventUtil.getFlowNodeInStateWithParentId(processInstanceId, state.getStateName(), flowNodeName); } - return ClientEventUtil.executeWaitServerCommand(getCommandAPI(), params, timeout); + return ClientEventUtil.executeWaitServerCommand(getCommandAPI(), params, timeoutInMillis); } public FlowNodeInstance waitForFlowNodeInReadyState(final ProcessInstance processInstance, diff --git a/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java b/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java index 03f1cd58cfa..1cb434be4cc 100644 --- a/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java +++ b/bpm/bonita-core/bonita-process-engine/src/main/java/org/bonitasoft/engine/service/impl/SpringBeanAccessor.java @@ -45,6 +45,9 @@ public class SpringBeanAccessor { private static final String WORK_CORE_POOL_SIZE = "bonita.tenant.work.corePoolSize"; private static final String WORK_MAX_POOL_SIZE = "bonita.tenant.work.maximumPoolSize"; private static final String WORK_KEEP_ALIVE_IN_SECONDS = "bonita.tenant.work.keepAliveTimeSeconds"; + private static final String CONNECTOR_CORE_POOL_SIZE = "bonita.tenant.connector.corePoolSize"; + private static final String CONNECTOR_MAX_POOL_SIZE = "bonita.tenant.connector.maximumPoolSize"; + private static final String CONNECTOR_KEEP_ALIVE_IN_SECONDS = "bonita.tenant.connector.keepAliveTimeSeconds"; private BonitaSpringContext context; @@ -117,6 +120,9 @@ protected void warnDeprecatedProperties(MutablePropertySources propertySources) warnIfPropertyIsDeprecated(propertySources, WORK_CORE_POOL_SIZE); warnIfPropertyIsDeprecated(propertySources, WORK_MAX_POOL_SIZE); warnIfPropertyIsDeprecated(propertySources, WORK_KEEP_ALIVE_IN_SECONDS); + warnIfPropertyIsDeprecated(propertySources, CONNECTOR_CORE_POOL_SIZE); + warnIfPropertyIsDeprecated(propertySources, CONNECTOR_MAX_POOL_SIZE); + warnIfPropertyIsDeprecated(propertySources, CONNECTOR_KEEP_ALIVE_IN_SECONDS); } private void warnIfPropertyIsDeprecated(MutablePropertySources propertySources, String property) { diff --git a/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-community.xml b/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-community.xml index 2bd72f8ffef..297c5da1bdb 100644 --- a/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-community.xml +++ b/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-community.xml @@ -1528,17 +1528,6 @@ - - - - - - - - - - - @@ -1726,7 +1715,6 @@ - @@ -1976,7 +1964,6 @@ - diff --git a/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties b/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties index 2665bbd5b78..80c04ca04f7 100644 --- a/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties +++ b/bpm/bonita-core/bonita-process-engine/src/main/resources/bonita-tenant-community.properties @@ -22,9 +22,6 @@ bonita.runtime.session.duration=3600000 # Connector executor bonita.tenant.connector.queueCapacity=10000 -bonita.tenant.connector.corePoolSize=10 -bonita.tenant.connector.maximumPoolSize=10 -bonita.tenant.connector.keepAliveTimeSeconds=100 # Produce a warning log when connector took longer to execute than this value bonita.tenant.connector.warnWhenLongerThanMillis=10000 diff --git a/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/BonitaConnectorExecutorFactory.java b/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/BonitaConnectorExecutorFactory.java new file mode 100644 index 00000000000..fd177a98723 --- /dev/null +++ b/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/BonitaConnectorExecutorFactory.java @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2024 Bonitasoft S.A. + * Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble + * This library is free software; you can redistribute it and/or modify it under the terms + * of the GNU Lesser General Public License as published by the Free Software Foundation + * version 2.1 of the License. + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * You should have received a copy of the GNU Lesser General Public License along with this + * program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth + * Floor, Boston, MA 02110-1301, USA. + **/ +package org.bonitasoft.engine.connector; + +import java.util.concurrent.ThreadPoolExecutor; + +public interface BonitaConnectorExecutorFactory { + + ThreadPoolExecutor create(); + +} diff --git a/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl.java b/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl.java index 05213aceaa4..0d051a16914 100644 --- a/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl.java +++ b/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImpl.java @@ -18,14 +18,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -36,6 +31,7 @@ import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.bonitasoft.engine.commons.exceptions.SBonitaRuntimeException; +import org.bonitasoft.engine.connector.BonitaConnectorExecutorFactory; import org.bonitasoft.engine.connector.ConnectorExecutionResult; import org.bonitasoft.engine.connector.ConnectorExecutor; import org.bonitasoft.engine.connector.SConnector; @@ -47,6 +43,9 @@ import org.bonitasoft.engine.sessionaccessor.SessionIdNotSetException; import org.bonitasoft.engine.tracking.TimeTracker; import org.bonitasoft.engine.tracking.TimeTrackerRecords; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; +import org.springframework.stereotype.Component; /** * Execute connectors directly @@ -56,72 +55,44 @@ * @author Matthieu Chaffotte */ @Slf4j +@Component +@ConditionalOnSingleCandidate(ConnectorExecutor.class) public class ConnectorExecutorImpl implements ConnectorExecutor { public static final String NUMBER_OF_CONNECTORS_PENDING = "bonita.bpmengine.connector.pending"; public static final String NUMBER_OF_CONNECTORS_RUNNING = "bonita.bpmengine.connector.running"; public static final String NUMBER_OF_CONNECTORS_EXECUTED = "bonita.bpmengine.connector.executed"; + public static final String CONNECTORS_UNIT = "connectors"; private ExecutorService executorService; - + private final BonitaConnectorExecutorFactory bonitaConnectorExecutorFactory; private final SessionAccessor sessionAccessor; - private final SessionService sessionService; - private final int queueCapacity; - - private final int corePoolSize; - - private final int maximumPoolSize; - - private final long keepAliveTimeSeconds; - private final TimeTracker timeTracker; - private MeterRegistry meterRegistry; - private long tenantId; - private ExecutorServiceMetricsProvider executorServiceMetricsProvider; + private final MeterRegistry meterRegistry; + private final long tenantId; + private final ExecutorServiceMetricsProvider executorServiceMetricsProvider; private final AtomicLong runningWorks = new AtomicLong(); private Counter executedWorkCounter; private Gauge numberOfConnectorsPending; private Gauge numberOfConnectorsRunning; - /** - * The handling of threads relies on the JVM - * The rules to create new thread are: - * - If the number of threads is less than the corePoolSize, create a new Thread to run a new task. - * - If the number of threads is equal (or greater than) the corePoolSize, put the task into the queue. - * - If the queue is full, and the number of threads is less than the maxPoolSize, create a new thread to run tasks - * in. - * - If the queue is full, and the number of threads is greater than or equal to maxPoolSize, reject the task. - * - * @param queueCapacity - * The maximum number of execution of connector to queue for each thread - * @param corePoolSize - * the number of threads to keep in the pool, even - * if they are idle, unless {@code allowCoreThreadTimeOut} is set - * @param maximumPoolSize - * the maximum number of threads to allow in the - * pool - * @param keepAliveTimeSeconds - * when the number of threads is greater than - * the core, this is the maximum time that excess idle threads - * will wait for new tasks before terminating. (in seconds) - */ - public ConnectorExecutorImpl(final int queueCapacity, final int corePoolSize, - final int maximumPoolSize, final long keepAliveTimeSeconds, final SessionAccessor sessionAccessor, - final SessionService sessionService, final TimeTracker timeTracker, final MeterRegistry meterRegistry, - long tenantId, ExecutorServiceMetricsProvider executorServiceMetricsProvider) { - this.queueCapacity = queueCapacity; - this.corePoolSize = corePoolSize; - this.maximumPoolSize = maximumPoolSize; - this.keepAliveTimeSeconds = keepAliveTimeSeconds; + public ConnectorExecutorImpl(final SessionAccessor sessionAccessor, + final SessionService sessionService, + final TimeTracker timeTracker, + final MeterRegistry meterRegistry, + @Value("${tenantId}") long tenantId, + ExecutorServiceMetricsProvider executorServiceMetricsProvider, + BonitaConnectorExecutorFactory bonitaConnectorExecutorFactory) { this.sessionAccessor = sessionAccessor; this.sessionService = sessionService; this.timeTracker = timeTracker; this.meterRegistry = meterRegistry; this.tenantId = tenantId; this.executorServiceMetricsProvider = executorServiceMetricsProvider; + this.bonitaConnectorExecutorFactory = bonitaConnectorExecutorFactory; } @Override @@ -187,7 +158,7 @@ void disconnectSilently(final SConnector sConnector) { try { sConnector.disconnect(); } catch (final Exception t) { - log.warn("An error occurred while disconnecting the connector: " + sConnector, t); + log.warn("An error occurred while disconnecting the connector: {}", sConnector, t); } } @@ -283,46 +254,26 @@ public boolean isCompleted() { } } - private final class QueueRejectedExecutionHandler implements RejectedExecutionHandler { - - public QueueRejectedExecutionHandler() { - - } - - @Override - public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) { - log.warn("The work was rejected. Requeue work : {}", task.toString()); - try { - executor.getQueue().put(task); - } catch (final InterruptedException e) { - throw new RejectedExecutionException("Queuing " + task + " got interrupted.", e); - } - } - - } - @Override public void start() { if (executorService == null) { - final BlockingQueue workQueue = new ArrayBlockingQueue<>(queueCapacity); - final RejectedExecutionHandler handler = new QueueRejectedExecutionHandler(); - final ConnectorExecutorThreadFactory threadFactory = new ConnectorExecutorThreadFactory( - "ConnectorExecutor"); + var threadPoolExecutor = bonitaConnectorExecutorFactory.create(); executorService = executorServiceMetricsProvider .bind(meterRegistry, - new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTimeSeconds, - TimeUnit.SECONDS, - workQueue, threadFactory, handler), - "bonita-connector-executor", tenantId); + threadPoolExecutor, + "bonita-connector-executor", + tenantId); Tags tags = Tags.of("tenant", String.valueOf(tenantId)); - numberOfConnectorsPending = Gauge.builder(NUMBER_OF_CONNECTORS_PENDING, workQueue, Collection::size) - .tags(tags).baseUnit("connectors").description("Connectors pending in the execution queue") + numberOfConnectorsPending = Gauge + .builder(NUMBER_OF_CONNECTORS_PENDING, threadPoolExecutor.getQueue(), Collection::size) + .tags(tags).baseUnit(CONNECTORS_UNIT).description("Connectors pending in the execution queue") .register(meterRegistry); numberOfConnectorsRunning = Gauge.builder(NUMBER_OF_CONNECTORS_RUNNING, runningWorks, AtomicLong::get) - .tags(tags).baseUnit("connectors").description("Connectors currently executing") + .tags(tags).baseUnit(CONNECTORS_UNIT).description("Connectors currently executing") .register(meterRegistry); executedWorkCounter = Counter.builder(NUMBER_OF_CONNECTORS_EXECUTED) - .tags(tags).baseUnit("connectors").description("Total connectors executed since last server start") + .tags(tags).baseUnit(CONNECTORS_UNIT) + .description("Total connectors executed since last server start") .register(meterRegistry); } } diff --git a/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorSingleThreadExecutorFactory.java b/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorSingleThreadExecutorFactory.java new file mode 100644 index 00000000000..192ee9dd664 --- /dev/null +++ b/services/bonita-connector-executor/src/main/java/org/bonitasoft/engine/connector/impl/ConnectorSingleThreadExecutorFactory.java @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2024 Bonitasoft S.A. + * Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble + * This library is free software; you can redistribute it and/or modify it under the terms + * of the GNU Lesser General Public License as published by the Free Software Foundation + * version 2.1 of the License. + * This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU Lesser General Public License for more details. + * You should have received a copy of the GNU Lesser General Public License along with this + * program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth + * Floor, Boston, MA 02110-1301, USA. + **/ +package org.bonitasoft.engine.connector.impl; + +import static java.util.concurrent.TimeUnit.*; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +import org.bonitasoft.engine.connector.BonitaConnectorExecutorFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate; +import org.springframework.stereotype.Component; + +@Component +@ConditionalOnSingleCandidate(BonitaConnectorExecutorFactory.class) +public class ConnectorSingleThreadExecutorFactory implements BonitaConnectorExecutorFactory { + + private final int queueCapacity; + + public ConnectorSingleThreadExecutorFactory(@Value("${bonita.tenant.connector.queueCapacity}") int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + @Override + public ThreadPoolExecutor create() { + return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, + new ArrayBlockingQueue<>(queueCapacity), new ConnectorExecutorThreadFactory("ConnectorExecutor"), + new QueueRejectedExecutionHandler()); + } + + public static class QueueRejectedExecutionHandler implements RejectedExecutionHandler { + + private static final Logger log = LoggerFactory.getLogger(QueueRejectedExecutionHandler.class); + + @Override + public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) { + log.warn("The work was rejected. Requeue work : {}", task); + try { + executor.getQueue().put(task); + } catch (final InterruptedException e) { + throw new RejectedExecutionException("Queuing " + task + " got interrupted.", e); + } + } + + } +} diff --git a/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplIT.java b/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplIT.java index b6d127ce586..0d3d36a1c4c 100644 --- a/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplIT.java +++ b/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplIT.java @@ -56,9 +56,13 @@ public class ConnectorExecutorImplIT { @Before public void setUp() { - connectorExecutor = new ConnectorExecutorImpl(10, 5, 100, 100, sessionAccessor, + connectorExecutor = new ConnectorExecutorImpl(sessionAccessor, sessionService, - timeTracker, new SimpleMeterRegistry(), 12L, new DefaultExecutorServiceMetricsProvider()); + timeTracker, + new SimpleMeterRegistry(), + 12L, + new DefaultExecutorServiceMetricsProvider(), + new ConnectorSingleThreadExecutorFactory(10)); connectorExecutor.start(); } diff --git a/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplTest.java b/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplTest.java index acf10b714b3..289981ad6ff 100644 --- a/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplTest.java +++ b/services/bonita-connector-executor/src/test/java/org/bonitasoft/engine/connector/impl/ConnectorExecutorImplTest.java @@ -15,7 +15,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; import java.time.Duration; import java.util.Collections; @@ -72,9 +73,12 @@ public void before() { // So that micrometer updates its counters every 1 ms: k -> k.equals("simple.step") ? Duration.ofMillis(1).toString() : null, Clock.SYSTEM); - connectorExecutorImpl = new ConnectorExecutorImpl(1, 1, 1, 1, sessionAccessor, sessionService, + connectorExecutorImpl = new ConnectorExecutorImpl(sessionAccessor, sessionService, timeTracker, - meterRegistry, TENANT_ID, new DefaultExecutorServiceMetricsProvider()); + meterRegistry, + TENANT_ID, + new DefaultExecutorServiceMetricsProvider(), + new ConnectorSingleThreadExecutorFactory(1)); connectorExecutorImpl.start(); } diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaWorkExecutorFactory.java similarity index 94% rename from services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java rename to services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaWorkExecutorFactory.java index e6bb373e13d..6b3f4148d60 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaThreadPoolExecutorFactory.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/BonitaWorkExecutorFactory.java @@ -15,7 +15,7 @@ import java.util.concurrent.ThreadPoolExecutor; -public interface BonitaThreadPoolExecutorFactory { +public interface BonitaWorkExecutorFactory { ThreadPoolExecutor create(); diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java index 05ee66479a6..7be3bea0385 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactory.java @@ -46,7 +46,7 @@ public class DefaultBonitaExecutorServiceFactory implements BonitaExecutorServic private final long tenantId; private final MeterRegistry meterRegistry; private final ExecutorServiceMetricsProvider executorServiceMetricsProvider; - private final BonitaThreadPoolExecutorFactory bonitaThreadPoolExecutorFactory; + private final BonitaWorkExecutorFactory bonitaWorkExecutorFactory; private final EngineClock engineClock; private final WorkFactory workFactory; private final WorkExecutionAuditor workExecutionAuditor; @@ -57,19 +57,19 @@ public DefaultBonitaExecutorServiceFactory(@Value("${tenantId}") long tenantId, WorkFactory workFactory, WorkExecutionAuditor workExecutionAuditor, ExecutorServiceMetricsProvider executorServiceMetricsProvider, - BonitaThreadPoolExecutorFactory bonitaThreadPoolExecutorFactory) { + BonitaWorkExecutorFactory bonitaWorkExecutorFactory) { this.tenantId = tenantId; this.meterRegistry = meterRegistry; this.workFactory = workFactory; this.workExecutionAuditor = workExecutionAuditor; this.engineClock = engineClock; this.executorServiceMetricsProvider = executorServiceMetricsProvider; - this.bonitaThreadPoolExecutorFactory = bonitaThreadPoolExecutorFactory; + this.bonitaWorkExecutorFactory = bonitaWorkExecutorFactory; } @Override public BonitaExecutorService createExecutorService(WorkExecutionCallback workExecutionCallback) { - final ThreadPoolExecutor bonitaThreadPoolExecutor = bonitaThreadPoolExecutorFactory.create(); + final ThreadPoolExecutor bonitaThreadPoolExecutor = bonitaWorkExecutorFactory.create(); final BonitaExecutorService bonitaExecutorService = new DefaultBonitaExecutorService(bonitaThreadPoolExecutor, workFactory, engineClock, diff --git a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkSingleThreadPoolExecutorFactory.java similarity index 92% rename from services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java rename to services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkSingleThreadPoolExecutorFactory.java index 7a1a05b670b..49028ef142b 100644 --- a/services/bonita-work/src/main/java/org/bonitasoft/engine/work/SingleThreadPoolExecutorFactory.java +++ b/services/bonita-work/src/main/java/org/bonitasoft/engine/work/WorkSingleThreadPoolExecutorFactory.java @@ -22,13 +22,13 @@ import org.springframework.stereotype.Component; @Component -@ConditionalOnSingleCandidate(BonitaThreadPoolExecutorFactory.class) -public class SingleThreadPoolExecutorFactory implements BonitaThreadPoolExecutorFactory { +@ConditionalOnSingleCandidate(BonitaWorkExecutorFactory.class) +public class WorkSingleThreadPoolExecutorFactory implements BonitaWorkExecutorFactory { private final int queueCapacity; private final long tenantId; - public SingleThreadPoolExecutorFactory(@Value("${tenantId}") long tenantId, + public WorkSingleThreadPoolExecutorFactory(@Value("${tenantId}") long tenantId, @Value("${bonita.tenant.work.queueCapacity}") int queueCapacity) { this.queueCapacity = queueCapacity; this.tenantId = tenantId; diff --git a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java index 509a33d9a7b..4239d442b80 100644 --- a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java +++ b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceFactoryTest.java @@ -43,7 +43,7 @@ public void threadNameInExecutorService_should_contain_tenantId() { workFactory, mock(WorkExecutionAuditor.class), new DefaultExecutorServiceMetricsProvider(), - new SingleThreadPoolExecutorFactory(tenantId, 10)); + new WorkSingleThreadPoolExecutorFactory(tenantId, 10)); BonitaExecutorService createExecutorService = defaultBonitaExecutorServiceFactory .createExecutorService(workExecutionCallback); @@ -66,7 +66,7 @@ public void createExecutorService_should_register_ExecutorServiceMetrics() { workFactory, mock(WorkExecutionAuditor.class), new DefaultExecutorServiceMetricsProvider(), - new SingleThreadPoolExecutorFactory(tenantId, 10)); + new WorkSingleThreadPoolExecutorFactory(tenantId, 10)); // when: defaultBonitaExecutorServiceFactory.createExecutorService(workExecutionCallback); @@ -91,7 +91,7 @@ public void should_not_have_metrics_when_unbind_is_called() { workFactory, mock(WorkExecutionAuditor.class), new DefaultExecutorServiceMetricsProvider(), - new SingleThreadPoolExecutorFactory(tenantId, 10)); + new WorkSingleThreadPoolExecutorFactory(tenantId, 10)); // when: defaultBonitaExecutorServiceFactory.createExecutorService(workExecutionCallback); diff --git a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java index dda5e76f108..be1680f4e32 100644 --- a/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java +++ b/services/bonita-work/src/test/java/org/bonitasoft/engine/work/DefaultBonitaExecutorServiceTest.java @@ -56,7 +56,7 @@ public class DefaultBonitaExecutorServiceTest { @Before public void before() { - var threadPoolExecutor = new SingleThreadPoolExecutorFactory.SingleThreadPoolExecutor( + var threadPoolExecutor = new WorkSingleThreadPoolExecutorFactory.SingleThreadPoolExecutor( new LinkedBlockingQueue<>(10), new WorkerThreadFactory("test-worker", 1, 1)); bonitaExecutorService = new DefaultBonitaExecutorService(threadPoolExecutor, workFactory, engineClock,