Skip to content

Commit

Permalink
executor-spi (apache#13921)
Browse files Browse the repository at this point in the history
This PR adds a new SPI that can be extended to support new executor services.

By default 2 executors are provided:

cached, the default, which is a wrapper on top of Executors.newCachedThreadPool.
fixed, which is a wrapper on top of Executors.newFixedThreadPool.
Right now only org.apache.pinot.query.runtime.QueryRunner uses this SPI.
  • Loading branch information
gortiz authored Sep 9, 2024
1 parent f5327a5 commit 3b0d92d
Show file tree
Hide file tree
Showing 14 changed files with 267 additions and 18 deletions.
4 changes: 4 additions & 0 deletions pinot-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>

<!-- Test -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import com.google.auto.service.AutoService;
import java.util.concurrent.Executors;
import org.apache.pinot.spi.executor.ExecutorServicePlugin;
import org.apache.pinot.spi.executor.ExecutorServiceProvider;


/**
* This is the plugin for the cached executor service.
*
* The provider included in this plugin creates cached thread pools, which are the recommended executor service for
* cases where the tasks are short-lived and not CPU bound.
*
* If that is not the case, this executor may create a large number of threads that will be competing for CPU resources,
* which may lead to performance degradation and even system instability.
* In that case {@link FixedExecutorServicePlugin} could be used, but it may need changes to the code to avoid
* deadlocks. Deployments using Java 21 or above could consider using a virtual thread executor service plugin.
*
* @see org.apache.pinot.spi.executor.ExecutorServiceUtils
*/
@AutoService(ExecutorServicePlugin.class)
public class CachedExecutorServicePlugin implements ExecutorServicePlugin {
@Override
public String id() {
return "cached";
}

@Override
public ExecutorServiceProvider provider() {
return (conf, confPrefix, baseName) -> Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import com.google.auto.service.AutoService;
import java.util.concurrent.Executors;
import org.apache.pinot.spi.executor.ExecutorServicePlugin;
import org.apache.pinot.spi.executor.ExecutorServiceProvider;
import org.apache.pinot.spi.utils.CommonConstants;


/**
* This is the plugin for the fixed executor service.
*
* The fixed executor service plugin creates a new fixed thread pool.
* The number of threads is defined in the configuration with the {@code <prefix>.numThreads} property.
* This executor service is recommended for cases where the tasks are long-lived or CPU bound, but it may need changes
* to the code to avoid deadlocks.
*
* @see org.apache.pinot.spi.executor.ExecutorServiceUtils
*/
@AutoService(ExecutorServicePlugin.class)
public class FixedExecutorServicePlugin implements ExecutorServicePlugin {
@Override
public String id() {
return "fixed";
}

@Override
public ExecutorServiceProvider provider() {
return (conf, confPrefix, baseName) -> {
String defaultFixedThreadsStr = conf.getProperty(
CommonConstants.CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS, CommonConstants.DEFAULT_EXECUTORS_FIXED_NUM_THREADS);
int defaultFixedThreads = Integer.parseInt(defaultFixedThreadsStr);
if (defaultFixedThreads < 0) {
defaultFixedThreads = Runtime.getRuntime().availableProcessors();
}
int numThreads = conf.getProperty(confPrefix + ".numThreads", defaultFixedThreads);
return Executors.newFixedThreadPool(numThreads, new NamedThreadFactory(baseName));
};
}
}
10 changes: 10 additions & 0 deletions pinot-integration-test-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@
<artifactId>pinot-minion</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-query-runtime</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-query-planner</artifactId>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.testng</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
Expand All @@ -52,6 +51,7 @@
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode;
Expand Down Expand Up @@ -116,8 +116,9 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE);
_joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null;

//TODO: make this configurable
_executorService = ExecutorServiceUtils.createDefault("query-runner-on-" + port);
_executorService = ExecutorServiceUtils.create(
config, CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR, "query-runner-on-" + port,
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR);
_opChainScheduler = new OpChainSchedulerService(_executorService);
_mailboxService = new MailboxService(hostname, port, config);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ public class QueryServerEnclosure {
private final QueryRunner _queryRunner;

public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
this(factory, Map.of());
}

public QueryServerEnclosure(MockInstanceDataManagerFactory factory, Map<String, Object> config) {
_queryRunnerPort = QueryTestUtils.getAvailablePort();
Map<String, Object> runnerConfig = new HashMap<>();
Map<String, Object> runnerConfig = new HashMap<>(config);
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME, "Server_localhost");
runnerConfig.put(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, _queryRunnerPort);
InstanceDataManager instanceDataManager = factory.buildInstanceDataManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockTestUtils;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -85,6 +86,10 @@ public static List<GenericRow> buildRows(String tableName) {
return rows;
}

protected Map<String, Object> getConfiguration() {
return Collections.emptyMap();
}

@BeforeClass
public void setUp()
throws Exception {
Expand Down Expand Up @@ -128,10 +133,10 @@ public void setUp()
_mailboxService = new MailboxService(_reducerHostname, _reducerPort, new PinotConfiguration(reducerConfig));
_mailboxService.start();

QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
QueryServerEnclosure server1 = new QueryServerEnclosure(factory1, getConfiguration());
server1.start();
// Start server1 to ensure the next server will have a different port.
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
QueryServerEnclosure server2 = new QueryServerEnclosure(factory2, getConfiguration());
server2.start();
// this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port.
// this is only use for test identifier purpose.
Expand Down Expand Up @@ -203,7 +208,7 @@ public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
}

@DataProvider(name = "testDataWithSqlToFinalRowCount")
private Object[][] provideTestSqlAndRowCount() {
protected Object[][] provideTestSqlAndRowCount() {
//@formatter:off
return new Object[][]{
// special hint test, the table is not actually partitioned by col1, thus this hint gives wrong result. but
Expand Down Expand Up @@ -283,7 +288,7 @@ private Object[][] provideTestSqlAndRowCount() {
}

@DataProvider(name = "testDataWithSqlExecutionExceptions")
private Object[][] provideTestSqlWithExecutionException() {
protected Object[][] provideTestSqlWithExecutionException() {
//@formatter:off
return new Object[][]{
// Missing index
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.executor;

public interface ExecutorServicePlugin {

/**
* An id that identifies this specific provider.
* <p>
* This id is used to select the provider in the configuration.
*/
String id();

/**
* Returns the provider that will create the {@link java.util.concurrent.ExecutorService} instances.
*/
ExecutorServiceProvider provider();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.spi.executor;

import java.util.concurrent.ExecutorService;
import org.apache.pinot.spi.env.PinotConfiguration;


/**
* A provider for {@link ExecutorService} instances.
*/
public interface ExecutorServiceProvider {
/**
* Creates a new {@link ExecutorService} instance.
*
* @param conf the configuration to use
* @param confPrefix the prefix to use for the configuration
* @param baseName the base name for the threads. A prefix that all threads will share.
* @return a new {@link ExecutorService} instance
*/
ExecutorService create(PinotConfiguration conf, String confPrefix, String baseName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,62 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.executor;
package org.apache.pinot.spi.executor;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.utils.NamedThreadFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A utility class to create {@link ExecutorService} instances.
*
* In order to create a new executor, the {@code create} methods should be called.
* These methods take an executor type as an argument.
*
* Pinot includes two executor service plugins:
* <ul>
* <li>{@code cached}: creates a new cached thread pool</li>
* <li>{@code fixed}: creates a new fixed thread pool.</li>
* </ul>
*
* @see ServiceLoader
*/
public class ExecutorServiceUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorServiceUtils.class);
private static final long DEFAULT_TERMINATION_MILLIS = 30_000;

private ExecutorServiceUtils() {
private static final Map<String, ExecutorServiceProvider> PROVIDERS;

static {
PROVIDERS = new HashMap<>();
for (ExecutorServicePlugin plugin : ServiceLoader.load(ExecutorServicePlugin.class)) {
ExecutorServiceProvider provider = plugin.provider();
ExecutorServiceProvider old = PROVIDERS.put(plugin.id(), provider);
if (old != null) {
LOGGER.warn("Duplicate executor provider for id '{}': {} and {}", plugin.id(), old, provider);
} else {
LOGGER.info("Registered executor provider for id '{}': {}", plugin.id(), provider);
}
}
}

public static ExecutorService createDefault(String baseName) {
return Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
private ExecutorServiceUtils() {
}

public static ExecutorService create(PinotConfiguration conf, String confPrefix, String baseName) {
//TODO: make this configurable
return Executors.newCachedThreadPool(new NamedThreadFactory(baseName));
public static ExecutorService create(PinotConfiguration conf, String confPrefix, String baseName, String defType) {
String type = conf.getProperty(confPrefix + ".type", defType);
ExecutorServiceProvider provider = PROVIDERS.get(type);
if (provider == null) {
throw new IllegalArgumentException("Unknown executor service provider: " + type);
}
return provider.create(conf, confPrefix, baseName);
}

/**
Expand Down
Loading

0 comments on commit 3b0d92d

Please sign in to comment.