From 682ae0d923b5a4434a1f55e284465305dc242248 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Tue, 8 Oct 2024 17:26:58 +0530 Subject: [PATCH] Add support to dynamically resize threadpools size Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 1 + .../cluster/settings/ClusterSettingsIT.java | 29 ++++++++ .../common/settings/ClusterSettings.java | 4 +- .../org/opensearch/threadpool/ThreadPool.java | 71 ++++++++++++++----- .../threadpool/ThreadPoolTests.java | 48 +++++++++++++ 5 files changed, 133 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b69c33df3b6a5..ca8ca8b56b871 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718)) - Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976)) - Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611)) +- Add support to dynamically resize threadpools size. - [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978)) - Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java index 541f1048bb246..33e51f1ca0831 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/settings/ClusterSettingsIT.java @@ -380,6 +380,35 @@ public void testMissingUnits() { } } + public void testThreadPoolSettings() { + String key1 = "cluster.thread_pool.snapshot.max"; + Settings transientSettings = Settings.builder().put(key1, "-1").build(); + + String key2 = "cluster.thread_pool.snapshot.max"; + Settings persistentSettings = Settings.builder().put(key2, "5").build(); + + try { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(transientSettings) + .setPersistentSettings(persistentSettings) + .get(); + fail("bogus value"); + } catch (IllegalArgumentException ex) { + assertEquals(ex.getMessage(), "illegal value for [cluster.thread_pool.snapshot], has to be positive value"); + } + + transientSettings = Settings.builder().put(key1, "1").build(); + persistentSettings = Settings.builder().put(key2, "5").build(); + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(transientSettings) + .setPersistentSettings(persistentSettings) + .get(); + } + public void testLoggerLevelUpdate() { assertAcked(prepareCreate("test")); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6180229a11150..12f37af903a94 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -804,8 +804,8 @@ public void apply(Settings value, Settings current, Settings previous) { ResponseLimitSettings.CAT_SHARDS_RESPONSE_LIMIT_SETTING, ResponseLimitSettings.CAT_SEGMENTS_RESPONSE_LIMIT_SETTING, - //Thread pool - ThreadPool.THREADPOOL_SNAPSHOT_SETTING + // Thread pool Settings + ThreadPool.CLUSTER_THREAD_POOL_SIZE_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 9f279de4e87a0..76ddcb3da4fa9 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -214,7 +214,6 @@ public static ThreadPoolType fromType(String type) { private ClusterSettings clusterSettings = null; - public Collection builders() { return Collections.unmodifiableCollection(builders.values()); } @@ -226,32 +225,68 @@ public Collection builders() { Setting.Property.NodeScope ); + public static final Setting CLUSTER_THREAD_POOL_SIZE_SETTING = Setting.groupSetting("cluster.thread_pool.", (tpSettings) -> { + Map tpGroups = tpSettings.getAsGroups(); + for (Map.Entry entry : tpGroups.entrySet()) { + String tpName = entry.getKey(); + Settings tpGroup = entry.getValue(); + int max = tpGroup.getAsInt("max", 1); + int core = tpGroup.getAsInt("core", 1); + int size = tpGroup.getAsInt("size", 1); + if (max <= 0 || core <= 0 || size <= 0) { + throw new IllegalArgumentException("illegal value for [cluster.thread_pool." + tpName + "], has to be positive value"); + } + } + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + + ); + public ThreadPool(final Settings settings, final ExecutorBuilder... customBuilders) { this(settings, null, customBuilders); } - public static final Setting THREADPOOL_SNAPSHOT_SETTING = Setting.intSetting( - "cluster.thread_pool.snapshot", - -1, - -1, - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - - public void setSnapshotThread(int snapshotThread) { - OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) this.executors.get(Names.SNAPSHOT).executor; - if (snapshotThread != -1) { - o.setCorePoolSize(snapshotThread); - o.setMaximumPoolSize(snapshotThread); + public void setThreadPool(Settings tpSettings) { + Map tpGroups = tpSettings.getAsGroups(); + for (Map.Entry entry : tpGroups.entrySet()) { + String tpName = entry.getKey(); + Settings tpGroup = entry.getValue(); + ExecutorHolder holder = executors.get(tpName); + assert holder.executor instanceof OpenSearchThreadPoolExecutor; + OpenSearchThreadPoolExecutor o = (OpenSearchThreadPoolExecutor) holder.executor; + if (holder.info.type == ThreadPoolType.SCALING) { + int max = tpGroup.getAsInt("max", o.getMaximumPoolSize()); + int core = tpGroup.getAsInt("core", o.getCorePoolSize()); + if (core > max) { + // Can we do better than silently ignoring this as this can't be caught in static validation ? + logger.error("Thread pool {} core {} is higher than maximum value {}. Ignoring it", tpName, core, max); + continue; + } + // Below check makes sure we adhere to the constraint that cores <= max at all the time. + if (core < o.getCorePoolSize()) { + o.setCorePoolSize(core); + o.setMaximumPoolSize(max); + } else { + o.setMaximumPoolSize(max); + o.setCorePoolSize(core); + } + } else { + int size = tpGroup.getAsInt("size", o.getMaximumPoolSize()); + if (size < o.getCorePoolSize()) { + o.setCorePoolSize(size); + o.setMaximumPoolSize(size); + } else { + o.setMaximumPoolSize(size); + o.setCorePoolSize(size); + } + } } } public void setClusterSettings(ClusterSettings clusterSettings) { this.clusterSettings = clusterSettings; - this.clusterSettings.addSettingsUpdateConsumer( - THREADPOOL_SNAPSHOT_SETTING, - this::setSnapshotThread - ); + this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_THREAD_POOL_SIZE_SETTING, this::setThreadPool); } public ThreadPool( diff --git a/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java index 658de5ec49500..336152df63a23 100644 --- a/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ThreadPoolTests.java @@ -36,6 +36,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.FutureUtils; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.CountDownLatch; @@ -152,4 +153,51 @@ public void testInheritContextOnSchedule() throws InterruptedException { terminate(threadPool); } } + + public void testThreadPoolResize() { + TestThreadPool threadPool = new TestThreadPool("test"); + try { + // increase it + Settings commonSettings = Settings.builder().put("snapshot.max", "10").put("snapshot.core", "2").put("get.size", "100").build(); + threadPool.setThreadPool(commonSettings); + ExecutorService executorService = threadPool.executor("snapshot"); + OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService; + assertEquals(10, executor.getMaximumPoolSize()); + assertEquals(2, executor.getCorePoolSize()); + + executorService = threadPool.executor("get"); + executor = (OpenSearchThreadPoolExecutor) executorService; + assertEquals(100, executor.getMaximumPoolSize()); + assertEquals(100, executor.getCorePoolSize()); + + // decrease it + commonSettings = Settings.builder().put("snapshot.max", "2").put("snapshot.core", "1").put("get.size", "90").build(); + threadPool.setThreadPool(commonSettings); + executorService = threadPool.executor("snapshot"); + executor = (OpenSearchThreadPoolExecutor) executorService; + assertEquals(2, executor.getMaximumPoolSize()); + assertEquals(1, executor.getCorePoolSize()); + + executorService = threadPool.executor("get"); + executor = (OpenSearchThreadPoolExecutor) executorService; + assertEquals(90, executor.getMaximumPoolSize()); + assertEquals(90, executor.getCorePoolSize()); + } finally { + terminate(threadPool); + } + } + + public void testThreadPoolResizeFail() { + TestThreadPool threadPool = new TestThreadPool("test"); + try { + Settings commonSettings = Settings.builder().put("snapshot.max", "50").put("snapshot.core", "100").build(); + threadPool.setThreadPool(commonSettings); + ExecutorService executorService = threadPool.executor("snapshot"); + OpenSearchThreadPoolExecutor executor = (OpenSearchThreadPoolExecutor) executorService; + assertNotEquals(50, executor.getMaximumPoolSize()); + assertNotEquals(100, executor.getCorePoolSize()); + } finally { + terminate(threadPool); + } + } }