From 8682f15c7402058f3690fa2ba7e094c6563bafe7 Mon Sep 17 00:00:00 2001 From: Shubh Sahu Date: Mon, 10 Jun 2024 21:08:43 +0530 Subject: [PATCH] Add recovery chunk size setting (#13997) Signed-off-by: Shubh Sahu --- CHANGELOG.md | 1 + .../indices/recovery/IndexRecoveryIT.java | 11 ++-- .../recovery/TruncatedRecoveryIT.java | 8 +-- .../common/settings/ClusterSettings.java | 1 + .../indices/recovery/RecoverySettings.java | 19 +++--- .../main/java/org/opensearch/node/Node.java | 5 -- .../RecoverySettingsDynamicUpdateTests.java | 8 +++ .../index/shard/IndexShardTestCase.java | 2 +- .../java/org/opensearch/node/MockNode.java | 8 --- .../node/RecoverySettingsChunkSizePlugin.java | 63 ------------------- 10 files changed, 33 insertions(+), 93 deletions(-) delete mode 100644 test/framework/src/main/java/org/opensearch/node/RecoverySettingsChunkSizePlugin.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a1e19439c07e3..aacdc38089f3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add dynamic action retry timeout setting ([#14022](https://github.com/opensearch-project/OpenSearch/issues/14022)) - Add capability to disable source recovery_source for an index ([#13590](https://github.com/opensearch-project/OpenSearch/pull/13590)) - [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027)) +- Add recovery chunk size setting ([#13997](https://github.com/opensearch-project/OpenSearch/pull/13997)) - [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982)) - [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374)) - Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java index 8ce87f37d77cd..cf93a432d0371 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/recovery/IndexRecoveryIT.java @@ -104,7 +104,6 @@ import org.opensearch.indices.recovery.RecoveryState.Stage; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.node.NodeClosedException; -import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.AnalysisPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.PluginsService; @@ -156,7 +155,7 @@ import static java.util.stream.Collectors.toList; import static org.opensearch.action.DocWriteResponse.Result.CREATED; import static org.opensearch.action.DocWriteResponse.Result.UPDATED; -import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; +import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.empty; @@ -187,7 +186,6 @@ protected Collection> nodePlugins() { return Arrays.asList( MockTransportService.TestPlugin.class, MockFSIndexStore.TestPlugin.class, - RecoverySettingsChunkSizePlugin.class, TestAnalysisPlugin.class, InternalSettingsPlugin.class, MockEngineFactoryPlugin.class @@ -263,7 +261,7 @@ private void slowDownRecovery(ByteSizeValue shardSize) { // one chunk per sec.. .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), chunkSize, ByteSizeUnit.BYTES) // small chunks - .put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES)) + .put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES)) ) .get() .isAcknowledged() @@ -278,7 +276,10 @@ private void restoreRecoverySpeed() { .setTransientSettings( Settings.builder() .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20mb") - .put(CHUNK_SIZE_SETTING.getKey(), RecoverySettings.DEFAULT_CHUNK_SIZE) + .put( + INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), + RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY) + ) ) .get() .isAcknowledged() diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java index bf0533143cf91..692beb86279b9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java @@ -46,7 +46,6 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; @@ -61,7 +60,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; +import static org.opensearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -81,7 +80,7 @@ public static Collection parameters() { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class, RecoverySettingsChunkSizePlugin.class); + return Arrays.asList(MockTransportService.TestPlugin.class); } /** @@ -96,7 +95,8 @@ public void testCancelRecoveryAndResume() throws Exception { .cluster() .prepareUpdateSettings() .setTransientSettings( - Settings.builder().put(CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)) + Settings.builder() + .put(INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)) ) .get() .isAcknowledged() diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f157ba8851c8c..9f08d5ec442ac 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -308,6 +308,7 @@ public void apply(Settings value, Settings current, Settings previous) { RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING, RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, + RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 782d3fcbd6e72..4f5c408d86a45 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -177,7 +177,14 @@ public class RecoverySettings { ); // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1. - public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); + public static final Setting INDICES_RECOVERY_CHUNK_SIZE_SETTING = Setting.byteSizeSetting( + "indices.recovery.chunk_size", + new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES), + new ByteSizeValue(1, ByteSizeUnit.BYTES), + new ByteSizeValue(100, ByteSizeUnit.MB), + Property.Dynamic, + Property.NodeScope + ); private volatile ByteSizeValue recoveryMaxBytesPerSec; private volatile ByteSizeValue replicationMaxBytesPerSec; @@ -193,7 +200,7 @@ public class RecoverySettings { private volatile TimeValue internalActionRetryTimeout; private volatile TimeValue internalActionLongTimeout; - private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; + private volatile ByteSizeValue chunkSize; private volatile TimeValue internalRemoteUploadTimeout; public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { @@ -221,6 +228,7 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec); this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); + this.chunkSize = INDICES_RECOVERY_CHUNK_SIZE_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); @@ -239,11 +247,11 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { ); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setActivityTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT, this::setInternalRemoteUploadTimeout); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_CHUNK_SIZE_SETTING, this::setChunkSize); clusterSettings.addSettingsUpdateConsumer( INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING, this::setInternalActionRetryTimeout ); - } /** @@ -296,10 +304,7 @@ public ByteSizeValue getChunkSize() { return chunkSize; } - public void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests - if (chunkSize.bytesAsInt() <= 0) { - throw new IllegalArgumentException("chunkSize must be > 0"); - } + public void setChunkSize(ByteSizeValue chunkSize) { this.chunkSize = chunkSize; } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 5f6708e98d38e..8e606d6633321 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1335,7 +1335,6 @@ protected Node( b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); { - processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); b.bind(PeerRecoverySourceService.class) .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) @@ -1443,10 +1442,6 @@ protected TransportService newTransportService( return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer); } - protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { - // Noop in production, overridden by tests - } - /** * The settings that are used by this node. Contains original settings as well as additional settings provided by plugins. */ diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index ccba745f9b126..650db6a2baf5a 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -119,6 +119,14 @@ public void testInternalLongActionTimeout() { assertEquals(new TimeValue(duration, timeUnit), recoverySettings.internalActionLongTimeout()); } + public void testChunkSize() { + ByteSizeValue chunkSize = new ByteSizeValue(between(1, 1000), ByteSizeUnit.BYTES); + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getKey(), chunkSize).build() + ); + assertEquals(chunkSize, recoverySettings.getChunkSize()); + } + public void testInternalActionRetryTimeout() { long duration = between(1, 1000); TimeUnit timeUnit = randomFrom(TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 6b609d8af62a1..655a9eb7d5d38 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1148,7 +1148,7 @@ public final void recoverUnstartedReplica( startingSeqNo ); long fileChunkSizeInBytes = randomBoolean() - ? RecoverySettings.DEFAULT_CHUNK_SIZE.getBytes() + ? RecoverySettings.INDICES_RECOVERY_CHUNK_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes() : randomIntBetween(1, 10 * 1024 * 1024); final Settings settings = Settings.builder() .put("indices.recovery.max_concurrent_file_chunks", Integer.toString(between(1, 4))) diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 19c65ec169d3c..ecaee1ccc59b8 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -50,7 +50,6 @@ import org.opensearch.env.Environment; import org.opensearch.http.HttpServerTransport; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; import org.opensearch.script.MockScriptService; import org.opensearch.script.ScriptContext; @@ -236,13 +235,6 @@ protected TransportService newTransportService( } } - @Override - protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { - if (false == getPluginsService().filterPlugins(RecoverySettingsChunkSizePlugin.class).isEmpty()) { - clusterSettings.addSettingsUpdateConsumer(RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING, recoverySettings::setChunkSize); - } - } - @Override protected ClusterInfoService newClusterInfoService( Settings settings, diff --git a/test/framework/src/main/java/org/opensearch/node/RecoverySettingsChunkSizePlugin.java b/test/framework/src/main/java/org/opensearch/node/RecoverySettingsChunkSizePlugin.java deleted file mode 100644 index dabf23ce08263..0000000000000 --- a/test/framework/src/main/java/org/opensearch/node/RecoverySettingsChunkSizePlugin.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.node; - -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Setting.Property; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.indices.recovery.RecoverySettings; -import org.opensearch.plugins.Plugin; - -import java.util.List; - -import static java.util.Collections.singletonList; - -/** - * Marker plugin that will trigger {@link MockNode} making {@link #CHUNK_SIZE_SETTING} dynamic. - */ -public class RecoverySettingsChunkSizePlugin extends Plugin { - /** - * The chunk size. Only exposed by tests. - */ - public static final Setting CHUNK_SIZE_SETTING = Setting.byteSizeSetting( - "indices.recovery.chunk_size", - RecoverySettings.DEFAULT_CHUNK_SIZE, - Property.Dynamic, - Property.NodeScope - ); - - @Override - public List> getSettings() { - return singletonList(CHUNK_SIZE_SETTING); - } -}