From 8d75efd64936b830fb502176bf83d4fa73198b81 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 8 Oct 2024 17:33:27 +0530 Subject: [PATCH] Add support to dynamically resize threadpools size Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 1 + .../cluster/settings/ClusterSettingsIT.java | 29 +++++++++ .../common/settings/ClusterSettings.java | 5 +- .../main/java/org/opensearch/node/Node.java | 1 + .../org/opensearch/threadpool/ThreadPool.java | 63 +++++++++++++++++++ .../threadpool/ThreadPoolTests.java | 48 ++++++++++++++ 6 files changed, 146 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cad8c764a8e5..5c45276894b26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) - Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976)) - Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611)) +- Add support to dynamically resize threadpools size. 
([#16236](https://github.com/opensearch-project/OpenSearch/pull/16236))
 - [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
 - Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
 - New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java
index 541f1048bb246..33e51f1ca0831 100644
--- a/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java
@@ -380,6 +380,35 @@ public void testMissingUnits() {
         }
     }
 
+    public void testThreadPoolSettings() {
+        // Both scopes target the same key; the non-positive transient value must fail validation.
+        String key = "cluster.thread_pool.snapshot.max";
+        Settings transientSettings = Settings.builder().put(key, "-1").build();
+        Settings persistentSettings = Settings.builder().put(key, "5").build();
+
+        try {
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings()
+                .setTransientSettings(transientSettings)
+                .setPersistentSettings(persistentSettings)
+                .get();
+            fail("bogus value");
+        } catch (IllegalArgumentException ex) {
+            assertEquals("illegal value for [cluster.thread_pool.snapshot], has to be positive value", ex.getMessage());
+        }
+
+        // Positive values in both scopes must be accepted without throwing.
+        transientSettings = Settings.builder().put(key, "1").build();
+        persistentSettings = Settings.builder().put(key, "5").build();
+        client().admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setTransientSettings(transientSettings)
+            .setPersistentSettings(persistentSettings)
+            .get();
+    }
+
     public void testLoggerLevelUpdate() {
assertAcked(prepareCreate("test")); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a84a29256ee19..f769f8729c25b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -805,7 +805,10 @@ public void apply(Settings value, Settings current, Settings previous) { // Settings to be used for limiting rest requests ResponseLimitSettings.CAT_INDICES_RESPONSE_LIMIT_SETTING, ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING, - ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING + ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING, + + // Thread pool Settings + ThreadPool.CLUSTER_THREAD_POOL_SIZE_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 5c965b06a4b69..994a8bf2d57aa 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -624,6 +624,7 @@ protected Node( additionalSettingsFilter, settingsUpgraders ); + threadPool.setClusterSettings(settingsModule.getClusterSettings()); scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings()); final NetworkService networkService = new NetworkService( getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)) diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index d795fd252b7fc..76ddcb3da4fa9 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -38,6 +38,7 @@ import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; +import 
org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.SizeValue;
@@ -211,6 +212,8 @@ public static ThreadPoolType fromType(String type) {
 
     private final ScheduledThreadPoolExecutor scheduler;
 
+    private ClusterSettings clusterSettings = null;
+
     public Collection builders() {
         return Collections.unmodifiableCollection(builders.values());
     }
@@ -222,10 +225,70 @@ public Collection builders() {
         Setting.Property.NodeScope
     );
 
+    /** Dynamic group setting {@code cluster.thread_pool.<name>.(core|max|size)}; the validator rejects non-positive values. */
+    public static final Setting<Settings> CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting("cluster.thread_pool.", (tpSettings) -> {
+        Map<String, Settings> tpGroups = tpSettings.getAsGroups();
+        for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
+            String tpName = entry.getKey();
+            Settings tpGroup = entry.getValue();
+            int max = tpGroup.getAsInt("max", 1);
+            int core = tpGroup.getAsInt("core", 1);
+            int size = tpGroup.getAsInt("size", 1);
+            if (max <= 0 || core <= 0 || size <= 0) {
+                throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value");
+            }
+        }
+    },
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     public ThreadPool(final Settings settings, final ExecutorBuilder... 
customBuilders) {
         this(settings, null, customBuilders);
     }
 
+    /**
+     * Applies dynamically updated pool sizes. Pools that are unknown, or whose executor is not an
+     * {@link OpenSearchThreadPoolExecutor}, are skipped with a warning instead of failing the update.
+     * @param tpSettings settings grouped by thread pool name, e.g. {@code snapshot.max} or {@code get.size}
+     */
+    public void setThreadPool(Settings tpSettings) {
+        Map<String, Settings> tpGroups = tpSettings.getAsGroups();
+        for (Map.Entry<String, Settings> entry : tpGroups.entrySet()) {
+            String tpName = entry.getKey();
+            Settings tpGroup = entry.getValue();
+            ExecutorHolder holder = executors.get(tpName);
+            if (holder == null || (holder.executor instanceof OpenSearchThreadPoolExecutor) == false) {
+                logger.warn("Thread pool {} does not exist or cannot be resized. Ignoring it", tpName);
+                continue;
+            }
+            OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) holder.executor;
+            // Scaling pools read "core" and "max"; every other pool reads one "size" used for both bounds.
+            boolean scaling = holder.info.type == ThreadPoolType.SCALING;
+            int max = tpGroup.getAsInt(scaling ? "max" : "size", executor.getMaximumPoolSize());
+            int core = scaling ? tpGroup.getAsInt("core", executor.getCorePoolSize()) : max;
+            if (core > max) {
+                // Static validation cannot see the live pool sizes used as defaults, so this is only detectable here.
+                logger.error("Thread pool {} core {} is higher than maximum value {}. Ignoring it", tpName, core, max);
+                continue;
+            }
+            // Order the two setters so that core <= max holds at every intermediate step.
+            if (core < executor.getCorePoolSize()) {
+                executor.setCorePoolSize(core);
+                executor.setMaximumPoolSize(max);
+            } else {
+                executor.setMaximumPoolSize(max);
+                executor.setCorePoolSize(core);
+            }
+        }
+    }
+
+    /** Stores the cluster settings and registers {@link #setThreadPool} as the consumer of {@link #CLUSTER_THREAD_POOL_SIZE_SETTING}. */
+    public void setClusterSettings(ClusterSettings clusterSettings) {
+        this.clusterSettings = clusterSettings;
+        this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool);
+    }
+
     public ThreadPool(
         final Settings settings,
         final AtomicReference runnableTaskListener,
diff --git a/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java
index 658de5ec49500..336152df63a23 100644
--- a/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java
+++ 
b/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java
@@ -36,6 +36,7 @@
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.FutureUtils;
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
+import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
 import org.opensearch.test.OpenSearchTestCase;
 
 import java.util.concurrent.CountDownLatch;
@@ -152,4 +153,51 @@ public void testInheritContextOnSchedule() throws InterruptedException {
             terminate(threadPool);
         }
     }
+
+    /**
+     * Resizes the "snapshot" pool through its core/max keys and the "get" pool through its size key,
+     * first upwards and then downwards, checking the executor bounds after each update.
+     */
+    public void testThreadPoolResize() {
+        TestThreadPool pool = new TestThreadPool("test");
+        try {
+            // scale up
+            pool.setThreadPool(Settings.builder().put("snapshot.max", "10").put("snapshot.core", "2").put("get.size", "100").build());
+
+            OpenSearchThreadPoolExecutor snapshotPool = (OpenSearchThreadPoolExecutor) pool.executor("snapshot");
+            assertEquals(10, snapshotPool.getMaximumPoolSize());
+            assertEquals(2, snapshotPool.getCorePoolSize());
+
+            OpenSearchThreadPoolExecutor getPool = (OpenSearchThreadPoolExecutor) pool.executor("get");
+            assertEquals(100, getPool.getMaximumPoolSize());
+            assertEquals(100, getPool.getCorePoolSize());
+
+            // scale down, looking each executor up again after the update
+            pool.setThreadPool(Settings.builder().put("snapshot.max", "2").put("snapshot.core", "1").put("get.size", "90").build());
+
+            snapshotPool = (OpenSearchThreadPoolExecutor) pool.executor("snapshot");
+            assertEquals(2, snapshotPool.getMaximumPoolSize());
+            assertEquals(1, snapshotPool.getCorePoolSize());
+
+            getPool = (OpenSearchThreadPoolExecutor) pool.executor("get");
+            assertEquals(90, getPool.getMaximumPoolSize());
+            assertEquals(90, getPool.getCorePoolSize());
+        } finally {
+            terminate(pool);
+        }
+    }
+
+    public void 
testThreadPoolResizeFail() {
+        TestThreadPool threadPool = new TestThreadPool("test");
+        try {
+            OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) threadPool.executor("snapshot");
+            int maxBefore = executor.getMaximumPoolSize();
+            int coreBefore = executor.getCorePoolSize();
+            threadPool.setThreadPool(Settings.builder().put("snapshot.max", "50").put("snapshot.core", "100").build());
+            assertEquals(maxBefore, executor.getMaximumPoolSize()); // core > max: the update must be ignored
+            assertEquals(coreBefore, executor.getCorePoolSize());
+        } finally {
+            terminate(threadPool);
+        }
+    }
 }