From 194832197947de40142d8ec18e71b7143f93f99f Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Mon, 23 Sep 2024 18:55:16 +0530 Subject: [PATCH 01/10] checksum parallelization Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 192 +++++++++++++----- .../remote/RemoteClusterStateService.java | 2 +- .../gateway/remote/RemoteManifestManager.java | 1 + 3 files changed, 144 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index d6739c4572d1a..bff4a9ae0d19a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -27,6 +27,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; import com.jcraft.jzlib.JZlib; @@ -50,6 +56,7 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable { static final String INDICES_CS = "indices_md"; private static final String CLUSTER_STATE_CS = "cluster_state"; private static final int CHECKSUM_SIZE = 8; + private static final int COMPONENT_SIZE = 11; private static final Logger logger = LogManager.getLogger(ClusterStateChecksum.class); long routingTableChecksum; @@ -66,61 +73,146 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable { long clusterStateChecksum; public ClusterStateChecksum(ClusterState clusterState) { + long start = System.currentTimeMillis(); + // keeping thread pool size to number of components. + ExecutorService executorService = Executors.newFixedThreadPool(COMPONENT_SIZE); + CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE); + + executeChecksumTask((stream) -> { + try { + clusterState.routingTable().writeVerifiableTo(stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for routing table", e); + } + return null; + }, checksum -> routingTableChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + clusterState.nodes().writeVerifiableTo(stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for discovery nodes", e); + } + return null; + }, checksum -> nodesChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + clusterState.coordinationMetadata().writeVerifiableTo(stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for coordination metadata", e); + } + return null; + }, checksum -> coordinationMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for settings metadata", e); + } + return null; + }, checksum -> settingMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for transient settings metadata", e); + } + return null; + }, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + clusterState.metadata().templatesMetadata().writeVerifiableTo(stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for templates metadata", e); + } + return null; + }, checksum -> templatesMetadataChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + stream.writeStringCollection(clusterState.metadata().customs().keySet()); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for customs metadata", e); + } + return null; + }, checksum -> customMetadataMapChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for hashesOfConsistentSettings", e); + } + return null; + }, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + stream.writeMapValues( + clusterState.metadata().indices(), + (checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream) + ); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for indices metadata", e); + } + return null; + }, checksum -> indicesChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + clusterState.blocks().writeVerifiableTo(stream); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for cluster state blocks", e); + } + return null; + }, checksum -> blocksChecksum = checksum, executorService, latch); + + executeChecksumTask((stream) -> { + try { + stream.writeStringCollection(clusterState.customs().keySet()); + } catch (IOException e) { + throw new RemoteStateTransferException("Failed to create checksum for cluster state customs", e); + } + return null; + }, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch); + + executorService.shutdown(); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); + } + logger.info("checksum execution time {}", System.currentTimeMillis() - start); + + createClusterStateChecksum(); + + } + + private void executeChecksumTask(Function checksumTask, Consumer checksumConsumer, ExecutorService executorService, CountDownLatch latch) { + executorService.execute(() -> { + try { + long checksum = createChecksum(checksumTask); + checksumConsumer.accept(checksum); + latch.countDown(); + } catch (IOException e) { + throw new RuntimeException(e); + }git + }); + + } + + private long createChecksum(Function o) throws IOException { try ( BytesStreamOutput out = new BytesStreamOutput(); BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) ) { - clusterState.routingTable().writeVerifiableTo(checksumOut); - routingTableChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.nodes().writeVerifiableTo(checksumOut); - nodesChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.coordinationMetadata().writeVerifiableTo(checksumOut); - coordinationMetadataChecksum = checksumOut.getChecksum(); - - // Settings create sortedMap by default, so no explicit sorting required here. - checksumOut.reset(); - Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), checksumOut); - settingMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), checksumOut); - transientSettingsMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.metadata().templatesMetadata().writeVerifiableTo(checksumOut); - templatesMetadataChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeStringCollection(clusterState.metadata().customs().keySet()); - customMetadataMapChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(checksumOut); - hashesOfConsistentSettingsChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeMapValues( - clusterState.metadata().indices(), - (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream) - ); - indicesChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - clusterState.blocks().writeVerifiableTo(checksumOut); - blocksChecksum = checksumOut.getChecksum(); - - checksumOut.reset(); - checksumOut.writeStringCollection(clusterState.customs().keySet()); - clusterStateCustomsChecksum = checksumOut.getChecksum(); - } catch (IOException e) { - logger.error("Failed to create checksum for cluster state.", e); - throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); + o.apply(checksumOut); + return checksumOut.getChecksum(); } - createClusterStateChecksum(); } private void createClusterStateChecksum() { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 12d10fd908b44..3ee1a68e09278 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -156,7 +156,7 @@ public class RemoteClusterStateService implements Closeable { public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( "cluster.remote_store.state.checksum_validation.mode", - RemoteClusterStateValidationMode.NONE.name(), + RemoteClusterStateValidationMode.FAILURE.name(), RemoteClusterStateValidationMode::parseString, Setting.Property.Dynamic, Setting.Property.NodeScope diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index b243269fe323e..8c5294c6d43e0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -103,6 +103,7 @@ RemoteClusterStateManifestInfo uploadManifest( boolean committed, int codecVersion ) { + logger.info("checksum created {}", clusterStateChecksum); synchronized (this) { ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder(); manifestBuilder.clusterTerm(clusterState.term()) From 1b5edaa112c5ed596997bce1a58dbf6df7a42f9c Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Mon, 23 Sep 2024 22:12:08 +0530 Subject: [PATCH 02/10] cleaning up Signed-off-by: Himshikha Gupta --- .../opensearch/gateway/remote/ClusterStateChecksum.java | 9 +++------ .../gateway/remote/RemoteClusterStateService.java | 2 +- .../opensearch/gateway/remote/RemoteManifestManager.java | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index bff4a9ae0d19a..62e8ba7944f3a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -186,10 +186,8 @@ public ClusterStateChecksum(ClusterState clusterState) { } catch (InterruptedException e) { throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); } - logger.info("checksum execution time {}", System.currentTimeMillis() - start); - createClusterStateChecksum(); - + logger.debug("Checksum execution time {}", System.currentTimeMillis() - start); } private void executeChecksumTask(Function checksumTask, Consumer checksumConsumer, ExecutorService executorService, CountDownLatch latch) { @@ -199,10 +197,9 @@ private void executeChecksumTask(Function ch checksumConsumer.accept(checksum); latch.countDown(); } catch (IOException e) { - throw new RuntimeException(e); - }git + throw new RemoteStateTransferException("Failed to execute checksum task", e); + } }); - } private long createChecksum(Function o) throws IOException { diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 3ee1a68e09278..12d10fd908b44 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -156,7 +156,7 @@ public class RemoteClusterStateService implements Closeable { public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( "cluster.remote_store.state.checksum_validation.mode", - RemoteClusterStateValidationMode.FAILURE.name(), + RemoteClusterStateValidationMode.NONE.name(), RemoteClusterStateValidationMode::parseString, Setting.Property.Dynamic, Setting.Property.NodeScope diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 8c5294c6d43e0..b243269fe323e 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -103,7 +103,6 @@ RemoteClusterStateManifestInfo uploadManifest( boolean committed, int codecVersion ) { - logger.info("checksum created {}", clusterStateChecksum); synchronized (this) { ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder(); manifestBuilder.clusterTerm(clusterState.term()) From 542d70941e69922e40f9a8883aa25da06a23d41b Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Tue, 24 Sep 2024 15:15:54 +0530 Subject: [PATCH 03/10] addressing comments Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index 62e8ba7944f3a..7f97dd7ee1a3d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -14,6 +14,7 @@ import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.io.stream.BufferedChecksumStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -30,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -73,7 +73,7 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable { long clusterStateChecksum; public ClusterStateChecksum(ClusterState clusterState) { - long start = System.currentTimeMillis(); + long start = System.nanoTime(); // keeping thread pool size to number of components. ExecutorService executorService = Executors.newFixedThreadPool(COMPONENT_SIZE); CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE); @@ -187,16 +187,21 @@ public ClusterStateChecksum(ClusterState clusterState) { throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); } createClusterStateChecksum(); - logger.debug("Checksum execution time {}", System.currentTimeMillis() - start); + logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(System.nanoTime() - start)); } - private void executeChecksumTask(Function checksumTask, Consumer checksumConsumer, ExecutorService executorService, CountDownLatch latch) { + private void executeChecksumTask( + Function checksumTask, + Consumer checksumConsumer, + ExecutorService executorService, + CountDownLatch latch + ) { executorService.execute(() -> { try { long checksum = createChecksum(checksumTask); checksumConsumer.accept(checksum); latch.countDown(); - } catch (IOException e) { + } catch (IOException e) { throw new RemoteStateTransferException("Failed to execute checksum task", e); } }); From 0472a1c49dce09f1b0cd67917dea76b66b61be17 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Wed, 25 Sep 2024 13:30:01 +0530 Subject: [PATCH 04/10] fixing thread pool Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 12 +++---- .../remote/RemoteClusterStateService.java | 8 ++--- .../org/opensearch/threadpool/ThreadPool.java | 11 +++++++ .../remote/ClusterMetadataManifestTests.java | 14 ++++++-- .../remote/ClusterStateChecksumTests.java | 32 ++++++++++++------- .../RemoteClusterStateServiceTests.java | 28 ++++++++-------- .../threadpool/ScalingThreadPoolTests.java | 1 + 7 files changed, 68 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index 7f97dd7ee1a3d..891f2a4dd0abf 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -30,11 +30,11 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Consumer; import java.util.function.Function; import com.jcraft.jzlib.JZlib; +import org.opensearch.threadpool.ThreadPool; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; @@ -72,10 +72,9 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable { long indicesChecksum; long clusterStateChecksum; - public ClusterStateChecksum(ClusterState clusterState) { - long start = System.nanoTime(); - // keeping thread pool size to number of components. - ExecutorService executorService = Executors.newFixedThreadPool(COMPONENT_SIZE); + public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) { + long start = threadpool.relativeTimeInNanos(); + ExecutorService executorService = threadpool.executor(ThreadPool.Names.REMOTE_STATE_CHECKSUM); CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE); executeChecksumTask((stream) -> { @@ -180,14 +179,13 @@ public ClusterStateChecksum(ClusterState clusterState) { return null; }, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch); - executorService.shutdown(); try { latch.await(); } catch (InterruptedException e) { throw new RemoteStateTransferException("Failed to create checksum for cluster state.", e); } createClusterStateChecksum(); - logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(System.nanoTime() - start)); + logger.debug("Checksum execution time {}", TimeValue.nsecToMSec(threadpool.relativeTimeInNanos() - start)); } private void executeChecksumTask( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 12d10fd908b44..022c109c943a3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -332,7 +332,7 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, false, codecVersion ); @@ -539,7 +539,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState,threadpool) : null, false, previousManifest.getCodecVersion() ); @@ -1010,7 +1010,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, true, previousManifest.getCodecVersion() ); @@ -1631,7 +1631,7 @@ void validateClusterStateFromChecksum( String localNodeId, boolean isFullStateDownload ) { - ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState); + ClusterStateChecksum newClusterStateChecksum = new ClusterStateChecksum(clusterState, threadpool); List failedValidation = newClusterStateChecksum.getMismatchEntities(manifest.getClusterStateChecksum()); if (failedValidation.isEmpty()) { return; diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 81220ab171b34..c90589ea0c7b0 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -118,6 +118,7 @@ public static class Names { public static final String REMOTE_RECOVERY = "remote_recovery"; public static final String REMOTE_STATE_READ = "remote_state_read"; public static final String INDEX_SEARCHER = "index_searcher"; + public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum"; } /** @@ -191,6 +192,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING); map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING); map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); + map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -307,6 +309,15 @@ public ThreadPool( runnableTaskListener ) ); + builders.put( + Names.REMOTE_STATE_CHECKSUM, + new ScalingExecutorBuilder( + Names.REMOTE_STATE_CHECKSUM, + 1, + twiceAllocatedProcessors(allocatedProcessors), + TimeValue.timeValueMinutes(5) + ) + ); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 3f9aa1245cab3..13af1a801361e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -8,6 +8,7 @@ package org.opensearch.gateway.remote; +import org.junit.AfterClass; import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -45,6 +46,8 @@ import java.util.stream.Collectors; import org.mockito.Mockito; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; @@ -64,6 +67,13 @@ public class ClusterMetadataManifestTests extends OpenSearchTestCase { + private static final ThreadPool threadPool = new TestThreadPool(ClusterMetadataManifestTests.class.getName()); + + @AfterClass + public static void shutdown() throws Exception { + threadPool.shutdown(); + } + public void testClusterMetadataManifestXContentV0() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path", CODEC_V0); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() @@ -214,7 +224,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "indicesRoutingDiffPath" ) ) - .checksum(new ClusterStateChecksum(createClusterState())) + .checksum(new ClusterStateChecksum(createClusterState(),threadPool)) .build(); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -647,7 +657,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); final StringKeyDiffProvider routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class); - ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(),threadPool); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) .stateVersion(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java index 0203e56dd2d5c..d476fcfe2f637 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java @@ -8,6 +8,8 @@ package org.opensearch.gateway.remote; +import org.junit.After; +import org.junit.AfterClass; import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -34,6 +36,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.EnumSet; @@ -41,14 +45,20 @@ import java.util.Map; public class ClusterStateChecksumTests extends OpenSearchTestCase { + private static final ThreadPool threadPool = new TestThreadPool(ClusterStateChecksumTests.class.getName()); + + @AfterClass + public static void shutdown() throws Exception { + threadPool.shutdown(); + } public void testClusterStateChecksumEmptyClusterState() { - ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE); + ClusterStateChecksum checksum = new ClusterStateChecksum(ClusterState.EMPTY_STATE, threadPool); assertNotNull(checksum); } public void testClusterStateChecksum() { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); assertNotNull(checksum); assertTrue(checksum.routingTableChecksum != 0); assertTrue(checksum.nodesChecksum != 0); @@ -65,8 +75,8 @@ public void testClusterStateChecksum() { } public void testClusterStateMatchChecksum() { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); - ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); + ClusterStateChecksum newChecksum = new ClusterStateChecksum(generateClusterState(), threadPool); assertNotNull(checksum); assertNotNull(newChecksum); assertEquals(checksum.routingTableChecksum, newChecksum.routingTableChecksum); @@ -84,7 +94,7 @@ public void testClusterStateMatchChecksum() { } public void testXContentConversion() throws IOException { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); final XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); checksum.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -97,7 +107,7 @@ public void testXContentConversion() throws IOException { } public void testSerialization() throws IOException { - ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState()); + ClusterStateChecksum checksum = new ClusterStateChecksum(generateClusterState(), threadPool); BytesStreamOutput output = new BytesStreamOutput(); checksum.writeTo(output); @@ -109,10 +119,10 @@ public void testSerialization() throws IOException { public void testGetMismatchEntities() { ClusterState clsState1 = generateClusterState(); - ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1); + ClusterStateChecksum checksum = new ClusterStateChecksum(clsState1, threadPool); assertTrue(checksum.getMismatchEntities(checksum).isEmpty()); - ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(clsState1, threadPool); assertTrue(checksum.getMismatchEntities(checksum2).isEmpty()); ClusterState clsState2 = ClusterState.builder(ClusterName.DEFAULT) @@ -122,7 +132,7 @@ public void testGetMismatchEntities() { .customs(Map.of()) .metadata(Metadata.EMPTY_METADATA) .build(); - ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2); + ClusterStateChecksum checksum3 = new ClusterStateChecksum(clsState2, threadPool); List mismatches = checksum.getMismatchEntities(checksum3); assertFalse(mismatches.isEmpty()); assertEquals(11, mismatches.size()); @@ -151,8 +161,8 @@ public void testGetMismatchEntitiesUnorderedInput() { ClusterState state2 = ClusterState.builder(state1).nodes(nodes1).build(); ClusterState state3 = ClusterState.builder(state1).nodes(nodes2).build(); - ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2); - ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3); + ClusterStateChecksum checksum1 = new ClusterStateChecksum(state2, threadPool); + ClusterStateChecksum checksum2 = new ClusterStateChecksum(state3, threadPool); assertEquals(checksum2, checksum1); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index b11d5e48bec06..1afbcacb1d46b 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -3123,7 +3123,7 @@ public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws I .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1L) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3193,7 +3193,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indices(Collections.emptyList()) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3219,7 +3219,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationEnabled() t .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3245,7 +3245,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder() .indices(Collections.emptyList()) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); @@ -3271,7 +3271,7 @@ public void testWriteIncrementalMetadataSuccessWithChecksumValidationModeNone() .previousClusterUUID("prev-cluster-uuid") .routingTableVersion(1) .indicesRouting(List.of(uploadedIndiceRoutingMetadata)) - .checksum(new ClusterStateChecksum(clusterState)) + .checksum(new ClusterStateChecksum(clusterState, threadPool)) .build(); assertThat(manifest.getIndices().size(), is(1)); @@ -3348,7 +3348,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabled() throws initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3380,7 +3380,7 @@ public void testGetClusterStateForManifestWithChecksumValidationModeNone() throw initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3412,7 +3412,7 @@ public void testGetClusterStateForManifestWithChecksumValidationEnabledWithMisma initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3461,7 +3461,7 @@ public void testGetClusterStateForManifestWithChecksumValidationDebugWithMismatc ); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).build(); remoteClusterStateService.start(); RemoteClusterStateService mockService = spy(remoteClusterStateService); @@ -3500,7 +3500,7 @@ public void testGetClusterStateUsingDiffWithChecksum() throws IOException { initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3541,7 +3541,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeNone() throws IOExceptio initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.NONE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3582,7 +3582,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeDebugMismatch() throws I initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.DEBUG); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3622,7 +3622,7 @@ public void testGetClusterStateUsingDiffWithChecksumModeTraceMismatch() throws I initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.TRACE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); @@ -3683,7 +3683,7 @@ public void testGetClusterStateUsingDiffWithChecksumMismatch() throws IOExceptio initializeWithChecksumEnabled(RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE); ClusterState clusterState = generateClusterStateWithAllAttributes().build(); ClusterMetadataManifest manifest = generateClusterMetadataManifestWithAllAttributes().checksum( - new ClusterStateChecksum(clusterState) + new ClusterStateChecksum(clusterState, threadPool) ).diffManifest(ClusterStateDiffManifest.builder().build()).build(); remoteClusterStateService.start(); diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index 4f59f9688fb7e..8a2b6011cbdaf 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -157,6 +157,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors); sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.REMOTE_STATE_READ, ThreadPool::twiceAllocatedProcessors); + sizes.put(ThreadPool.Names.REMOTE_STATE_CHECKSUM, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); } From cc43b335834967ef4381490f9ea90ce55cf80dfc Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Wed, 25 Sep 2024 13:32:20 +0530 Subject: [PATCH 05/10] spotless Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 2 +- .../gateway/remote/RemoteClusterStateService.java | 12 +++++++++--- .../gateway/remote/ClusterMetadataManifestTests.java | 10 +++++----- .../gateway/remote/ClusterStateChecksumTests.java | 3 +-- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index 891f2a4dd0abf..0767f637081d3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -23,6 +23,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParseException; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.ArrayList; @@ -34,7 +35,6 @@ import java.util.function.Function; import com.jcraft.jzlib.JZlib; -import org.opensearch.threadpool.ThreadPool; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 022c109c943a3..2cae9698c8b3f 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -332,7 +332,9 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, false, codecVersion ); @@ -539,7 +541,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState,threadpool) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, false, previousManifest.getCodecVersion() ); @@ -1010,7 +1014,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, + !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState, threadpool) + : null, true, previousManifest.getCodecVersion() ); diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 13af1a801361e..63993904176a0 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -8,7 +8,6 @@ package org.opensearch.gateway.remote; -import org.junit.AfterClass; import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -35,6 +34,9 @@ import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; import org.opensearch.test.EqualsHashCodeTestUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.AfterClass; import java.io.IOException; import java.util.ArrayList; @@ -46,8 +48,6 @@ import java.util.stream.Collectors; import org.mockito.Mockito; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; @@ -224,7 +224,7 @@ public void testClusterMetadataManifestSerializationEqualsHashCode() { "indicesRoutingDiffPath" ) ) - .checksum(new ClusterStateChecksum(createClusterState(),threadPool)) + .checksum(new ClusterStateChecksum(createClusterState(), threadPool)) .build(); { // Mutate Cluster Term EqualsHashCodeTestUtils.checkEqualsAndHashCode( @@ -657,7 +657,7 @@ public void testClusterMetadataManifestXContentV4() throws IOException { UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "test-uuid", "/test/upload/path"); UploadedMetadataAttribute uploadedMetadataAttribute = new UploadedMetadataAttribute("attribute_name", "testing_attribute"); final StringKeyDiffProvider routingTableIncrementalDiff = Mockito.mock(StringKeyDiffProvider.class); - ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(),threadPool); + ClusterStateChecksum checksum = new ClusterStateChecksum(createClusterState(), threadPool); ClusterMetadataManifest originalManifest = ClusterMetadataManifest.builder() .clusterTerm(1L) .stateVersion(1L) diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java index d476fcfe2f637..31495e259e749 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java @@ -8,8 +8,6 @@ package org.opensearch.gateway.remote; -import org.junit.After; -import org.junit.AfterClass; import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; @@ -38,6 +36,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.junit.AfterClass; import java.io.IOException; import java.util.EnumSet; From 79b9f7d50873b5d792051810fbe025095d5cf251 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Wed, 25 Sep 2024 15:06:39 +0530 Subject: [PATCH 06/10] fixing thread leak in UT Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterMetadataManifestTests.java | 9 +++++---- .../gateway/remote/ClusterStateChecksumTests.java | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java index 63993904176a0..09c2933680be3 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterMetadataManifestTests.java @@ -36,7 +36,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.AfterClass; +import org.junit.After; import java.io.IOException; import java.util.ArrayList; @@ -67,10 +67,11 @@ public class ClusterMetadataManifestTests extends OpenSearchTestCase { - private static final ThreadPool threadPool = new TestThreadPool(ClusterMetadataManifestTests.class.getName()); + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); - @AfterClass - public static void shutdown() throws Exception { + @After + public void teardown() throws Exception { + super.tearDown(); threadPool.shutdown(); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java index 31495e259e749..9b98187053a39 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/ClusterStateChecksumTests.java @@ -36,7 +36,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.junit.AfterClass; +import org.junit.After; import java.io.IOException; import java.util.EnumSet; @@ -44,10 +44,11 @@ import java.util.Map; public class ClusterStateChecksumTests extends OpenSearchTestCase { - private static final ThreadPool threadPool = new TestThreadPool(ClusterStateChecksumTests.class.getName()); + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); - @AfterClass - public static void shutdown() throws Exception { + @After + public void teardown() throws Exception { + super.tearDown(); threadPool.shutdown(); } From d316b4d5657cd592e64ebd6a810a3ed5ceb9b287 Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Mon, 30 Sep 2024 11:47:06 +0530 Subject: [PATCH 07/10] comments address Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 53 ++----------------- 1 file changed, 5 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index 0767f637081d3..cc954baf504c7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.DiffableStringMap; +import org.opensearch.common.CheckedFunction; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -78,104 +79,60 @@ public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) { CountDownLatch latch = new CountDownLatch(COMPONENT_SIZE); executeChecksumTask((stream) -> { - try { - clusterState.routingTable().writeVerifiableTo(stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for routing table", e); - } + clusterState.routingTable().writeVerifiableTo(stream); return null; - }, checksum -> routingTableChecksum = checksum, executorService, latch); + }, checksum -> routingTableChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { clusterState.nodes().writeVerifiableTo(stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for discovery nodes", e); - } return null; }, checksum -> nodesChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { clusterState.coordinationMetadata().writeVerifiableTo(stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for coordination metadata", e); - } return null; }, checksum -> coordinationMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for settings metadata", e); - } return null; }, checksum -> settingMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for transient settings metadata", e); - } return null; }, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { clusterState.metadata().templatesMetadata().writeVerifiableTo(stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for templates metadata", e); - } return null; }, checksum -> templatesMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { stream.writeStringCollection(clusterState.metadata().customs().keySet()); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for customs metadata", e); - } return null; }, checksum -> customMetadataMapChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for hashesOfConsistentSettings", e); - } return null; }, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { stream.writeMapValues( clusterState.metadata().indices(), (checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream) ); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for indices metadata", e); - } return null; }, checksum -> indicesChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { clusterState.blocks().writeVerifiableTo(stream); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for cluster state blocks", e); - } return null; }, checksum -> blocksChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - try { stream.writeStringCollection(clusterState.customs().keySet()); - } catch (IOException e) { - throw new RemoteStateTransferException("Failed to create checksum for cluster state customs", e); - } return null; }, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch); @@ -189,7 +146,7 @@ public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) { } private void executeChecksumTask( - Function checksumTask, + CheckedFunction checksumTask, Consumer checksumConsumer, ExecutorService executorService, CountDownLatch latch @@ -205,7 +162,7 @@ private void executeChecksumTask( }); } - private long createChecksum(Function o) throws IOException { + private long createChecksum(CheckedFunction o) throws IOException { try ( BytesStreamOutput out = new BytesStreamOutput(); BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) From 022d98f4f5a63c4c53379c4115118691937fa7fb Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Mon, 30 Sep 2024 12:04:08 +0530 Subject: [PATCH 08/10] spotless Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index cc954baf504c7..d15163f6131f1 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -33,7 +33,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -import java.util.function.Function; import com.jcraft.jzlib.JZlib; @@ -81,58 +80,58 @@ public ClusterStateChecksum(ClusterState clusterState, ThreadPool threadpool) { executeChecksumTask((stream) -> { clusterState.routingTable().writeVerifiableTo(stream); return null; - }, checksum -> routingTableChecksum = checksum, executorService, latch); + }, checksum -> routingTableChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - clusterState.nodes().writeVerifiableTo(stream); + clusterState.nodes().writeVerifiableTo(stream); return null; }, checksum -> nodesChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - clusterState.coordinationMetadata().writeVerifiableTo(stream); + clusterState.coordinationMetadata().writeVerifiableTo(stream); return null; }, checksum -> coordinationMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream); + Settings.writeSettingsToStream(clusterState.metadata().persistentSettings(), stream); return null; }, checksum -> settingMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream); + Settings.writeSettingsToStream(clusterState.metadata().transientSettings(), stream); return null; }, checksum -> transientSettingsMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - clusterState.metadata().templatesMetadata().writeVerifiableTo(stream); + clusterState.metadata().templatesMetadata().writeVerifiableTo(stream); return null; }, checksum -> templatesMetadataChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - stream.writeStringCollection(clusterState.metadata().customs().keySet()); + stream.writeStringCollection(clusterState.metadata().customs().keySet()); return null; }, checksum -> customMetadataMapChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream); + ((DiffableStringMap) clusterState.metadata().hashesOfConsistentSettings()).writeTo(stream); return null; }, checksum -> hashesOfConsistentSettingsChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - stream.writeMapValues( - clusterState.metadata().indices(), - (checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream) - ); + stream.writeMapValues( + clusterState.metadata().indices(), + (checksumStream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) checksumStream) + ); return null; }, checksum -> indicesChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - clusterState.blocks().writeVerifiableTo(stream); + clusterState.blocks().writeVerifiableTo(stream); return null; }, checksum -> blocksChecksum = checksum, executorService, latch); executeChecksumTask((stream) -> { - stream.writeStringCollection(clusterState.customs().keySet()); + stream.writeStringCollection(clusterState.customs().keySet()); return null; }, checksum -> clusterStateCustomsChecksum = checksum, executorService, latch); From 93f9a4339ae23af991297b6239c9c1b21521f22c Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Mon, 30 Sep 2024 12:21:34 +0530 Subject: [PATCH 09/10] refactor Signed-off-by: Himshikha Gupta --- .../org/opensearch/gateway/remote/ClusterStateChecksum.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index d15163f6131f1..3b4d856354c5c 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -161,12 +161,12 @@ private void executeChecksumTask( }); } - private long createChecksum(CheckedFunction o) throws IOException { + private long createChecksum(CheckedFunction task) throws IOException { try ( BytesStreamOutput out = new BytesStreamOutput(); BufferedChecksumStreamOutput checksumOut = new BufferedChecksumStreamOutput(out) ) { - o.apply(checksumOut); + task.apply(checksumOut); return checksumOut.getChecksum(); } } From 989e4febf49fb6e554f4a6f10eb83f9851e6430a Mon Sep 17 00:00:00 2001 From: Himshikha Gupta Date: Tue, 1 Oct 2024 12:11:59 +0530 Subject: [PATCH 10/10] changing thread pool Signed-off-by: Himshikha Gupta --- .../gateway/remote/ClusterStateChecksum.java | 2 +- .../java/org/opensearch/threadpool/ThreadPool.java | 10 +++------- .../opensearch/threadpool/ScalingThreadPoolTests.java | 1 - 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java index 3b4d856354c5c..aa007f5da15b3 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterStateChecksum.java @@ -43,6 +43,7 @@ */ public class ClusterStateChecksum implements ToXContentFragment, Writeable { + public static final int COMPONENT_SIZE = 11; static final String ROUTING_TABLE_CS = "routing_table"; static final String NODES_CS = "discovery_nodes"; static final String BLOCKS_CS = "blocks"; @@ -56,7 +57,6 @@ public class ClusterStateChecksum implements ToXContentFragment, Writeable { static final String INDICES_CS = "indices_md"; private static final String CLUSTER_STATE_CS = "cluster_state"; private static final int CHECKSUM_SIZE = 8; - private static final int COMPONENT_SIZE = 11; private static final Logger logger = LogManager.getLogger(ClusterStateChecksum.class); long routingTableChecksum; diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index c90589ea0c7b0..d795fd252b7fc 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -53,6 +53,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.gateway.remote.ClusterStateChecksum; import org.opensearch.node.Node; import java.io.IOException; @@ -192,7 +193,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REMOTE_RECOVERY, ThreadPoolType.SCALING); map.put(Names.REMOTE_STATE_READ, ThreadPoolType.SCALING); map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE); - map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.SCALING); + map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -311,12 +312,7 @@ public ThreadPool( ); builders.put( Names.REMOTE_STATE_CHECKSUM, - new ScalingExecutorBuilder( - Names.REMOTE_STATE_CHECKSUM, - 1, - twiceAllocatedProcessors(allocatedProcessors), - TimeValue.timeValueMinutes(5) - ) + new FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000) ); for (final ExecutorBuilder builder : customBuilders) { diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index 8a2b6011cbdaf..4f59f9688fb7e 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -157,7 +157,6 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessors); sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.REMOTE_STATE_READ, ThreadPool::twiceAllocatedProcessors); - sizes.put(ThreadPool.Names.REMOTE_STATE_CHECKSUM, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); }