From 11317d29255bb62f1ce5a7666fb90e32156c0758 Mon Sep 17 00:00:00 2001 From: Bhumika Saini Date: Thu, 5 Oct 2023 06:52:10 +0530 Subject: [PATCH] [Remote Store] Add support to reload repository metadata inplace (#9569) --------- Signed-off-by: Bhumika Saini --- CHANGELOG.md | 1 + .../repositories/url/URLRepository.java | 2 +- .../repositories/azure/AzureRepository.java | 9 +- .../gcs/GoogleCloudStorageRepository.java | 9 +- .../repositories/hdfs/HdfsRepository.java | 2 +- .../repositories/s3/S3BlobStore.java | 28 +- .../repositories/s3/S3Repository.java | 245 ++++++++++++------ .../repositories/s3/S3RepositoryPlugin.java | 3 +- .../opensearch/repositories/s3/S3Service.java | 5 +- .../repositories/s3/S3RepositoryTests.java | 10 +- .../RemoteStoreBaseIntegTestCase.java | 1 - .../remotestore/RemoteStoreRestoreIT.java | 47 +++- .../common/blobstore/BlobStore.java | 7 + .../repositories/RepositoriesService.java | 34 ++- .../opensearch/repositories/Repository.java | 18 ++ .../blobstore/BlobStoreRepository.java | 54 ++-- .../blobstore/MeteredBlobStoreRepository.java | 11 +- .../repositories/fs/FsRepository.java | 46 ++-- .../fs/ReloadableFsRepository.java | 51 ++++ .../RepositoriesServiceTests.java | 18 +- .../blobstore/BlobStoreRepositoryTests.java | 7 +- .../fs/ReloadableFsRepositoryTests.java | 119 +++++++++ .../MockEventuallyConsistentRepository.java | 2 +- 23 files changed, 545 insertions(+), 184 deletions(-) create mode 100644 server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java create mode 100644 server/src/test/java/org/opensearch/repositories/fs/ReloadableFsRepositoryTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c99c2ad1b99f..aea4a45b60ffc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -129,6 +129,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200)) - Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143)) - Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356)) +- [Remote Store] Add support to reload repository metadata inplace ([#9569](https://github.com/opensearch-project/OpenSearch/pull/9569)) ### Deprecated diff --git a/modules/repository-url/src/main/java/org/opensearch/repositories/url/URLRepository.java b/modules/repository-url/src/main/java/org/opensearch/repositories/url/URLRepository.java index 9e9d94c8e8fc0..4c8d8aab4532b 100644 --- a/modules/repository-url/src/main/java/org/opensearch/repositories/url/URLRepository.java +++ b/modules/repository-url/src/main/java/org/opensearch/repositories/url/URLRepository.java @@ -113,7 +113,7 @@ public URLRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - super(metadata, false, namedXContentRegistry, clusterService, recoverySettings); + super(metadata, namedXContentRegistry, clusterService, recoverySettings); if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) { throw new RepositoryException(metadata.name(), "missing url"); diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java index 381a35bbc11e1..47a5536a6cd8a 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java @@ -116,14 +116,7 @@ public AzureRepository( final ClusterService clusterService, final RecoverySettings recoverySettings ) { - super( - metadata, - COMPRESS_SETTING.get(metadata.settings()), - namedXContentRegistry, - clusterService, - recoverySettings, - buildLocation(metadata) - ); + super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings()); this.storageService = storageService; diff --git a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java index 13671bc2aa8d6..f6d078868b875 100644 --- a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -94,14 +94,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { final ClusterService clusterService, final RecoverySettings recoverySettings ) { - super( - metadata, - getSetting(COMPRESS_SETTING, metadata), - namedXContentRegistry, - clusterService, - recoverySettings, - buildLocation(metadata) - ); + super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); diff --git a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java index b28d28d76cfde..f0ffec5713c1d 100644 --- a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java @@ -83,7 +83,7 @@ public HdfsRepository( final ClusterService clusterService, final RecoverySettings recoverySettings ) { - super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings); + super(metadata, namedXContentRegistry, clusterService, recoverySettings); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index 8a5b92d71bb45..3dd373b5b9f32 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -50,6 +50,12 @@ import java.util.Locale; import java.util.Map; +import static org.opensearch.repositories.s3.S3Repository.BUCKET_SETTING; +import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING; +import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING; +import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING; +import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING; + class S3BlobStore implements BlobStore { private static final Logger logger = LogManager.getLogger(S3BlobStore.class); @@ -58,17 +64,17 @@ class S3BlobStore implements BlobStore { private final S3AsyncService s3AsyncService; - private final String bucket; + private volatile String bucket; - private final ByteSizeValue bufferSize; + private volatile ByteSizeValue bufferSize; - private final boolean serverSideEncryption; + private volatile boolean serverSideEncryption; - private final ObjectCannedACL cannedACL; + private volatile ObjectCannedACL cannedACL; - private final StorageClass storageClass; + private volatile StorageClass storageClass; - private final RepositoryMetadata repositoryMetadata; + private volatile RepositoryMetadata repositoryMetadata; private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher(); @@ -105,8 +111,14 @@ class S3BlobStore implements BlobStore { this.priorityExecutorBuilder = priorityExecutorBuilder; } - public boolean isMultipartUploadEnabled() { - return multipartUploadEnabled; + @Override + public void reload(RepositoryMetadata repositoryMetadata) { + this.repositoryMetadata = repositoryMetadata; + this.bucket = BUCKET_SETTING.get(repositoryMetadata.settings()); + this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(repositoryMetadata.settings()); + this.bufferSize = BUFFER_SIZE_SETTING.get(repositoryMetadata.settings()); + this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings())); + this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings())); } @Override diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 8e69fc39f128e..a69309d9e7a6f 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -32,6 +32,9 @@ package org.opensearch.repositories.s3; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.StorageClass; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; @@ -41,9 +44,11 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.BlobStoreException; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.settings.SecureString; @@ -62,9 +67,11 @@ import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.threadpool.Scheduler; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -79,7 +86,6 @@ *
{@code concurrent_streams}
Number of concurrent read/write stream (per repository on each node). Defaults to 5.
*
{@code chunk_size}
*
Large file can be divided into chunks. This parameter specifies the chunk size. Defaults to not chucked.
- *
{@code compress}
If set to true metadata files will be stored compressed. Defaults to false.
* */ class S3Repository extends MeteredBlobStoreRepository { @@ -205,31 +211,27 @@ class S3Repository extends MeteredBlobStoreRepository { private final S3Service service; - private final String bucket; - - private final ByteSizeValue bufferSize; - - private final ByteSizeValue chunkSize; + private volatile String bucket; - private final BlobPath basePath; + private volatile ByteSizeValue bufferSize; - private final boolean serverSideEncryption; + private volatile ByteSizeValue chunkSize; - private final String storageClass; + private volatile BlobPath basePath; - private final String cannedACL; + private volatile boolean serverSideEncryption; - private final RepositoryMetadata repositoryMetadata; + private volatile String storageClass; + private volatile String cannedACL; private final AsyncTransferManager asyncUploadUtils; private final S3AsyncService s3AsyncService; private final boolean multipartUploadEnabled; private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; + private final Path pluginConfigPath; - /** - * Constructs an s3 backed repository - */ + // Used by test classes S3Repository( final RepositoryMetadata metadata, final NamedXContentRegistry namedXContentRegistry, @@ -242,77 +244,48 @@ class S3Repository extends MeteredBlobStoreRepository { final S3AsyncService s3AsyncService, final boolean multipartUploadEnabled ) { - super( + this( metadata, - COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, + service, clusterService, recoverySettings, - buildLocation(metadata) + asyncUploadUtils, + priorityExecutorBuilder, + normalExecutorBuilder, + s3AsyncService, + multipartUploadEnabled, + Path.of("") ); + } + + /** + * Constructs an s3 backed repository + */ + S3Repository( + final RepositoryMetadata metadata, + final NamedXContentRegistry namedXContentRegistry, + final S3Service service, + final ClusterService clusterService, + final RecoverySettings recoverySettings, + final AsyncTransferManager asyncUploadUtils, + final AsyncExecutorContainer priorityExecutorBuilder, + final AsyncExecutorContainer normalExecutorBuilder, + final S3AsyncService s3AsyncService, + final boolean multipartUploadEnabled, + Path pluginConfigPath + ) { + super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.service = service; this.s3AsyncService = s3AsyncService; this.multipartUploadEnabled = multipartUploadEnabled; - - this.repositoryMetadata = metadata; + this.pluginConfigPath = pluginConfigPath; this.asyncUploadUtils = asyncUploadUtils; this.priorityExecutorBuilder = priorityExecutorBuilder; this.normalExecutorBuilder = normalExecutorBuilder; - // Parse and validate the user's S3 Storage Class setting - this.bucket = BUCKET_SETTING.get(metadata.settings()); - if (bucket == null) { - throw new RepositoryException(metadata.name(), "No bucket defined for s3 repository"); - } - - this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings()); - this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings()); - - // We make sure that chunkSize is bigger or equal than/to bufferSize - if (this.chunkSize.getBytes() < bufferSize.getBytes()) { - throw new RepositoryException( - metadata.name(), - CHUNK_SIZE_SETTING.getKey() - + " (" - + this.chunkSize - + ") can't be lower than " - + BUFFER_SIZE_SETTING.getKey() - + " (" - + bufferSize - + ")." - ); - } - - final String basePath = BASE_PATH_SETTING.get(metadata.settings()); - if (Strings.hasLength(basePath)) { - this.basePath = new BlobPath().add(basePath); - } else { - this.basePath = BlobPath.cleanPath(); - } - - this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings()); - - this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); - this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); - - if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) { - // provided repository settings - deprecationLogger.deprecate( - "s3_repository_secret_settings", - "Using s3 access/secret key from repository settings. Instead " - + "store these in named clients and the opensearch keystore for secure settings." - ); - } - - logger.debug( - "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", - bucket, - chunkSize, - serverSideEncryption, - bufferSize, - cannedACL, - storageClass - ); + validateRepositoryMetadata(metadata); + readRepositoryMetadata(); } private static Map buildLocation(RepositoryMetadata metadata) { @@ -367,14 +340,14 @@ protected S3BlobStore createBlobStore() { bufferSize, cannedACL, storageClass, - repositoryMetadata, + metadata, asyncUploadUtils, priorityExecutorBuilder, normalExecutorBuilder ); } - // only use for testing + // only use for testing (S3RepositoryTests) @Override protected BlobStore getBlobStore() { return super.getBlobStore(); @@ -385,6 +358,128 @@ public BlobPath basePath() { return basePath; } + @Override + public boolean isReloadable() { + return true; + } + + @Override + public void reload(RepositoryMetadata newRepositoryMetadata) { + if (isReloadable() == false) { + return; + } + + // Reload configs for S3Repository + super.reload(newRepositoryMetadata); + readRepositoryMetadata(); + + // Reload configs for S3RepositoryPlugin + final Map clientsSettings = S3ClientSettings.load(metadata.settings(), pluginConfigPath); + service.refreshAndClearCache(clientsSettings); + s3AsyncService.refreshAndClearCache(clientsSettings); + + // Reload configs for S3BlobStore + BlobStore blobStore = getBlobStore(); + blobStore.reload(metadata); + } + + /** + * Reloads the values derived from the Repository Metadata + */ + private void readRepositoryMetadata() { + this.bucket = BUCKET_SETTING.get(metadata.settings()); + this.bufferSize = BUFFER_SIZE_SETTING.get(metadata.settings()); + this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings()); + final String basePath = BASE_PATH_SETTING.get(metadata.settings()); + if (Strings.hasLength(basePath)) { + this.basePath = new BlobPath().add(basePath); + } else { + this.basePath = BlobPath.cleanPath(); + } + + this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings()); + this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings()); + this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings()); + if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) { + // provided repository settings + deprecationLogger.deprecate( + "s3_repository_secret_settings", + "Using s3 access/secret key from repository settings. Instead " + + "store these in named clients and the opensearch keystore for secure settings." + ); + } + + logger.debug( + "using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]", + bucket, + chunkSize, + serverSideEncryption, + bufferSize, + cannedACL, + storageClass + ); + } + + @Override + public void validateMetadata(RepositoryMetadata newRepositoryMetadata) { + super.validateMetadata(newRepositoryMetadata); + validateRepositoryMetadata(newRepositoryMetadata); + } + + private void validateRepositoryMetadata(RepositoryMetadata newRepositoryMetadata) { + Settings settings = newRepositoryMetadata.settings(); + if (BUCKET_SETTING.get(settings) == null) { + throw new RepositoryException(newRepositoryMetadata.name(), "No bucket defined for s3 repository"); + } + + // We make sure that chunkSize is bigger or equal than/to bufferSize + if (CHUNK_SIZE_SETTING.get(settings).getBytes() < BUFFER_SIZE_SETTING.get(settings).getBytes()) { + throw new RepositoryException( + newRepositoryMetadata.name(), + CHUNK_SIZE_SETTING.getKey() + + " (" + + CHUNK_SIZE_SETTING.get(settings) + + ") can't be lower than " + + BUFFER_SIZE_SETTING.getKey() + + " (" + + BUFFER_SIZE_SETTING.get(settings) + + ")." + ); + } + + validateStorageClass(STORAGE_CLASS_SETTING.get(settings)); + validateCannedACL(CANNED_ACL_SETTING.get(settings)); + } + + private static void validateStorageClass(String storageClassStringValue) { + if ((storageClassStringValue == null) || storageClassStringValue.equals("")) { + return; + } + + final StorageClass storageClass = StorageClass.fromValue(storageClassStringValue.toUpperCase(Locale.ENGLISH)); + if (storageClass.equals(StorageClass.GLACIER)) { + throw new BlobStoreException("Glacier storage class is not supported"); + } + + if (storageClass == StorageClass.UNKNOWN_TO_SDK_VERSION) { + throw new BlobStoreException("`" + storageClassStringValue + "` is not a valid S3 Storage Class."); + } + } + + private static void validateCannedACL(String cannedACLStringValue) { + if ((cannedACLStringValue == null) || cannedACLStringValue.equals("")) { + return; + } + + for (final ObjectCannedACL cur : ObjectCannedACL.values()) { + if (cur.toString().equalsIgnoreCase(cannedACLStringValue)) { + return; + } + } + + throw new BlobStoreException("cannedACL is not valid: [" + cannedACLStringValue + "]"); + } + @Override protected ByteSizeValue chunkSize() { return chunkSize; diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index 6ef60474afe8c..a80ee0ca35fae 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -182,7 +182,8 @@ protected S3Repository createRepository( priorityExecutorBuilder, normalExecutorBuilder, s3AsyncService, - S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()) + S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), + configPath ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java index b13672b4179f8..b1b3e19eac275 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java @@ -90,6 +90,7 @@ import java.security.SecureRandom; import java.time.Duration; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static java.util.Collections.emptyMap; @@ -100,7 +101,7 @@ class S3Service implements Closeable { private static final String DEFAULT_S3_ENDPOINT = "s3.amazonaws.com"; - private volatile Map clientsCache = emptyMap(); + private volatile Map clientsCache = new ConcurrentHashMap<>(); /** * Client settings calculated from static configuration and settings in the keystore. @@ -111,7 +112,7 @@ class S3Service implements Closeable { * Client settings derived from those in {@link #staticClientSettings} by combining them with settings * in the {@link RepositoryMetadata}. */ - private volatile Map derivedClientSettings = emptyMap(); + private volatile Map derivedClientSettings = new ConcurrentHashMap<>(); S3Service(final Path configPath) { staticClientSettings = MapBuilder.newMapBuilder() diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java index 9ea8d98505161..e65ca69a5047b 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java @@ -125,7 +125,8 @@ public void testBasePathSetting() { } public void testDefaultBufferSize() { - final RepositoryMetadata metadata = new RepositoryMetadata("dummy-repo", "mock", Settings.EMPTY); + Settings settings = Settings.builder().build(); + final RepositoryMetadata metadata = new RepositoryMetadata("dummy-repo", "mock", settings); try (S3Repository s3repo = createS3Repo(metadata)) { assertThat(s3repo.getBlobStore(), is(nullValue())); s3repo.start(); @@ -136,6 +137,13 @@ public void testDefaultBufferSize() { } } + public void testIsReloadable() { + final RepositoryMetadata metadata = new RepositoryMetadata("dummy-repo", "mock", Settings.EMPTY); + try (S3Repository s3repo = createS3Repo(metadata)) { + assertTrue(s3repo.isReloadable()); + } + } + public void testRestrictedSettingsDefault() { final RepositoryMetadata metadata = new RepositoryMetadata("dummy-repo", "mock", Settings.EMPTY); try (S3Repository s3repo = createS3Repo(metadata)) { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index bf536557b1485..e2ef5f85abc74 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -271,7 +271,6 @@ public static Settings buildRemoteStoreNodeAttributes( if (withRateLimiterAttributes) { settings.put(segmentRepoSettingsAttributeKeyPrefix + "compress", randomBoolean()) - .put(segmentRepoSettingsAttributeKeyPrefix + "max_remote_download_bytes_per_sec", "4kb") .put(segmentRepoSettingsAttributeKeyPrefix + "chunk_size", 200, ByteSizeUnit.BYTES); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java index 65335f444a2df..7626e3dba6424 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRestoreIT.java @@ -12,17 +12,26 @@ import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import java.io.IOException; +import java.nio.file.Path; import java.util.HashMap; +import java.util.Locale; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.greaterThan; @@ -388,14 +397,41 @@ public void testRTSRestoreDataOnlyInTranslog() throws Exception { public void testRateLimitedRemoteDownloads() throws Exception { clusterSettingsSuppliedByTest = true; int shardCount = randomIntBetween(1, 3); + Path segmentRepoPath = randomRepoPath(); + Path tlogRepoPath = randomRepoPath(); prepareCluster( 1, 3, INDEX_NAME, 0, shardCount, - buildRemoteStoreNodeAttributes(REPOSITORY_NAME, randomRepoPath(), REPOSITORY_2_NAME, randomRepoPath(), true) + buildRemoteStoreNodeAttributes(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, tlogRepoPath, true) ); + + // validate inplace repository metadata update + ClusterService clusterService = internalCluster().getInstance(ClusterService.class); + DiscoveryNode node = clusterService.localNode(); + String settingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + REPOSITORY_NAME + ); + Map settingsMap = node.getAttributes() + .keySet() + .stream() + .filter(key -> key.startsWith(settingsAttributeKeyPrefix)) + .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix, ""), key -> node.getAttributes().get(key))); + Settings.Builder settings = Settings.builder(); + settingsMap.entrySet().forEach(entry -> settings.put(entry.getKey(), entry.getValue())); + settings.put("location", segmentRepoPath).put("max_remote_download_bytes_per_sec", 4, ByteSizeUnit.KB); + + assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get()); + + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME); + assertEquals("4096b", segmentRepo.getMetadata().settings().get("max_remote_download_bytes_per_sec")); + } + Map indexStats = indexData(5, false, INDEX_NAME); assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); @@ -414,6 +450,15 @@ public void testRateLimitedRemoteDownloads() throws Exception { assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards); assertEquals(0, getNumShards(INDEX_NAME).numReplicas); verifyRestoredData(indexStats, INDEX_NAME); + + // revert repo metadata to pass asserts on repo metadata vs. node attrs during teardown + // https://github.com/opensearch-project/OpenSearch/pull/9569#discussion_r1345668700 + settings.remove("max_remote_download_bytes_per_sec"); + assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(settings).get()); + for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { + Repository segmentRepo = repositoriesService.repository(REPOSITORY_NAME); + assertNull(segmentRepo.getMetadata().settings().get("max_remote_download_bytes_per_sec")); + } } // TODO: Restore flow - index aliases diff --git a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java index ab40b1e2a082e..2ee3e9557b354 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java +++ b/server/src/main/java/org/opensearch/common/blobstore/BlobStore.java @@ -31,6 +31,8 @@ package org.opensearch.common.blobstore; +import org.opensearch.cluster.metadata.RepositoryMetadata; + import java.io.Closeable; import java.util.Collections; import java.util.Map; @@ -53,4 +55,9 @@ public interface BlobStore extends Closeable { default Map stats() { return Collections.emptyMap(); } + + /** + * Reload the blob store inplace + */ + default void reload(RepositoryMetadata repositoryMetadata) {} } diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 0361a71116a16..72266c053a1ae 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -474,19 +474,29 @@ public void applyClusterState(ClusterChangedEvent event) { if (previousMetadata.type().equals(repositoryMetadata.type()) == false || previousMetadata.settings().equals(repositoryMetadata.settings()) == false) { // Previous version is different from the version in settings - logger.debug("updating repository [{}]", repositoryMetadata.name()); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - repository = null; - try { - repository = createRepository(repositoryMetadata, typesRegistry); - } catch (RepositoryException ex) { - // TODO: this catch is bogus, it means the old repo is already closed, - // but we have nothing to replace it - logger.warn( - () -> new ParameterizedMessage("failed to change repository [{}]", repositoryMetadata.name()), - ex + if (repository.isSystemRepository() && repository.isReloadable()) { + logger.debug( + "updating repository [{}] in-place to use new metadata [{}]", + repositoryMetadata.name(), + repositoryMetadata ); + repository.validateMetadata(repositoryMetadata); + repository.reload(repositoryMetadata); + } else { + logger.debug("updating repository [{}]", repositoryMetadata.name()); + closeRepository(repository); + archiveRepositoryStats(repository, state.version()); + repository = null; + try { + repository = createRepository(repositoryMetadata, typesRegistry); + } catch (RepositoryException ex) { + // TODO: this catch is bogus, it means the old repo is already closed, + // but we have nothing to replace it + logger.warn( + () -> new ParameterizedMessage("failed to change repository [{}]", repositoryMetadata.name()), + ex + ); + } } } } else { diff --git a/server/src/main/java/org/opensearch/repositories/Repository.java b/server/src/main/java/org/opensearch/repositories/Repository.java index 02ba60cb23c4d..74cda9bcc3393 100644 --- a/server/src/main/java/org/opensearch/repositories/Repository.java +++ b/server/src/main/java/org/opensearch/repositories/Repository.java @@ -451,4 +451,22 @@ default void cloneRemoteStoreIndexShardSnapshot( default Map adaptUserMetadata(Map userMetadata) { return userMetadata; } + + /** + * Checks if the repository can be reloaded inplace or not + * @return true if the repository can be reloaded inplace, false otherwise + */ + default boolean isReloadable() { + return false; + } + + /** + * Reload the repository inplace + */ + default void reload(RepositoryMetadata repositoryMetadata) {} + + /** + * Validate the repository metadata + */ + default void validateMetadata(RepositoryMetadata repositoryMetadata) {} } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 41ad357eaeed9..bf85e19493203 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -297,21 +297,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); - protected final boolean supportURLRepo; + protected volatile boolean supportURLRepo; - private final int maxShardBlobDeleteBatch; + private volatile int maxShardBlobDeleteBatch; - private final Compressor compressor; + private volatile Compressor compressor; - private final boolean cacheRepositoryData; + private volatile boolean cacheRepositoryData; - private final RateLimiter snapshotRateLimiter; + private volatile RateLimiter snapshotRateLimiter; - private final RateLimiter restoreRateLimiter; + private volatile RateLimiter restoreRateLimiter; - private final RateLimiter remoteUploadRateLimiter; + private volatile RateLimiter remoteUploadRateLimiter; - private final RateLimiter remoteDownloadRateLimiter; + private volatile RateLimiter remoteDownloadRateLimiter; private final CounterMetric snapshotRateLimitingTimeInNanos = new CounterMetric(); @@ -356,7 +356,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp BlobStoreIndexShardSnapshots::fromXContent ); - private final boolean readOnly; + private volatile boolean readOnly; private final boolean isSystemRepository; @@ -366,7 +366,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final SetOnce blobStore = new SetOnce<>(); - private final ClusterService clusterService; + protected final ClusterService clusterService; private final RecoverySettings recoverySettings; @@ -400,36 +400,54 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp /** * IO buffer size hint for reading and writing to the underlying blob store. */ - protected final int bufferSize; + protected volatile int bufferSize; /** * Constructs new BlobStoreRepository - * @param metadata The metadata for this repository including name and settings + * @param repositoryMetadata The metadata for this repository including name and settings * @param clusterService ClusterService */ protected BlobStoreRepository( - final RepositoryMetadata metadata, - final boolean compress, + final RepositoryMetadata repositoryMetadata, final NamedXContentRegistry namedXContentRegistry, final ClusterService clusterService, final RecoverySettings recoverySettings ) { - this.metadata = metadata; + // Read RepositoryMetadata as the first step + readRepositoryMetadata(repositoryMetadata); + + isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings()); this.namedXContentRegistry = namedXContentRegistry; this.threadPool = clusterService.getClusterApplierService().threadPool(); this.clusterService = clusterService; this.recoverySettings = recoverySettings; - this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); + } + + @Override + public void reload(RepositoryMetadata repositoryMetadata) { + readRepositoryMetadata(repositoryMetadata); + } + + /** + * Reloads the values derived from the Repository Metadata + * + * @param repositoryMetadata RepositoryMetadata instance to derive the values from + */ + private void readRepositoryMetadata(RepositoryMetadata repositoryMetadata) { + this.metadata = repositoryMetadata; + + supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); remoteUploadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_upload_bytes_per_sec", ByteSizeValue.ZERO); remoteDownloadRateLimiter = getRateLimiter(metadata.settings(), "max_remote_download_bytes_per_sec", ByteSizeValue.ZERO); readOnly = READONLY_SETTING.get(metadata.settings()); - isSystemRepository = SYSTEM_REPOSITORY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); - this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) : CompressorRegistry.none(); + compressor = COMPRESS_SETTING.get(metadata.settings()) + ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) + : CompressorRegistry.none(); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java index 54f226e81025e..d4921f4e6d2e7 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/MeteredBlobStoreRepository.java @@ -53,13 +53,12 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository { public MeteredBlobStoreRepository( RepositoryMetadata metadata, - boolean compress, NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, RecoverySettings recoverySettings, Map location ) { - super(metadata, compress, namedXContentRegistry, clusterService, recoverySettings); + super(metadata, namedXContentRegistry, clusterService, recoverySettings); ThreadPool threadPool = clusterService.getClusterApplierService().threadPool(); this.repositoryInfo = new RepositoryInfo( UUIDs.randomBase64UUID(), @@ -70,6 +69,14 @@ public MeteredBlobStoreRepository( ); } + @Override + public void reload(RepositoryMetadata repositoryMetadata) { + super.reload(repositoryMetadata); + + // Not adding any additional reload logic here is intentional as the constructor only + // initializes the repositoryInfo from the repo metadata, which cannot be changed. + } + public RepositoryStatsSnapshot statsSnapshot() { return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, false); } diff --git a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java index d432bab93bd71..4a9a91336ec1d 100644 --- a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java @@ -63,7 +63,6 @@ *
{@code concurrent_streams}
Number of concurrent read/write stream (per repository on each node). Defaults to 5.
*
{@code chunk_size}
Large file can be divided into chunks. This parameter specifies the chunk size. * Defaults to not chucked.
- *
{@code compress}
If set to true metadata files will be stored compressed. Defaults to false.
* * * @opensearch.internal @@ -103,11 +102,11 @@ public class FsRepository extends BlobStoreRepository { public static final Setting BASE_PATH_SETTING = Setting.simpleString("base_path"); - private final Environment environment; + protected final Environment environment; - private ByteSizeValue chunkSize; + protected ByteSizeValue chunkSize; - private final BlobPath basePath; + protected BlobPath basePath; /** * Constructs a shared file system repository. @@ -119,8 +118,27 @@ public FsRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - super(metadata, calculateCompress(metadata, environment), namedXContentRegistry, clusterService, recoverySettings); + super(metadata, namedXContentRegistry, clusterService, recoverySettings); this.environment = environment; + validateLocation(); + readMetadata(); + } + + protected void readMetadata() { + if (CHUNK_SIZE_SETTING.exists(metadata.settings())) { + this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings()); + } else { + this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings()); + } + final String basePath = BASE_PATH_SETTING.get(metadata.settings()); + if (Strings.hasLength(basePath)) { + this.basePath = new BlobPath().add(basePath); + } else { + this.basePath = BlobPath.cleanPath(); + } + } + + protected void validateLocation() { String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); if (location.isEmpty()) { logger.warn( @@ -153,24 +171,6 @@ public FsRepository( ); } } - - if (CHUNK_SIZE_SETTING.exists(metadata.settings())) { - this.chunkSize = CHUNK_SIZE_SETTING.get(metadata.settings()); - } else { - this.chunkSize = REPOSITORIES_CHUNK_SIZE_SETTING.get(environment.settings()); - } - final String basePath = BASE_PATH_SETTING.get(metadata.settings()); - if (Strings.hasLength(basePath)) { - this.basePath = new BlobPath().add(basePath); - } else { - this.basePath = BlobPath.cleanPath(); - } - } - - private static boolean calculateCompress(RepositoryMetadata metadata, Environment environment) { - return COMPRESS_SETTING.exists(metadata.settings()) - ? COMPRESS_SETTING.get(metadata.settings()) - : REPOSITORIES_COMPRESS_SETTING.get(environment.settings()); } @Override diff --git a/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java new file mode 100644 index 0000000000000..c06c805a39396 --- /dev/null +++ b/server/src/main/java/org/opensearch/repositories/fs/ReloadableFsRepository.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.fs; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; + +/** + * Extension of {@link FsRepository} that can be reloaded inplace + * + * @opensearch.internal + */ +public class ReloadableFsRepository extends FsRepository { + /** + * Constructs a shared file system repository that is reloadable in-place. + */ + public ReloadableFsRepository( + RepositoryMetadata metadata, + Environment environment, + NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService, + RecoverySettings recoverySettings + ) { + super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); + } + + @Override + public boolean isReloadable() { + return true; + } + + @Override + public void reload(RepositoryMetadata repositoryMetadata) { + if (isReloadable() == false) { + return; + } + + super.reload(repositoryMetadata); + validateLocation(); + readMetadata(); + } +} diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index cc4f4f7b6413d..c4599a6e7a00e 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -859,14 +859,7 @@ private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository { private final TestCryptoProvider cryptoHandler; private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) { - super( - metadata, - false, - mock(NamedXContentRegistry.class), - clusterService, - mock(RecoverySettings.class), - Map.of("bucket", "bucket-a") - ); + super(metadata, mock(NamedXContentRegistry.class), clusterService, mock(RecoverySettings.class), Map.of("bucket", "bucket-a")); if (metadata.cryptoMetadata() != null) { cryptoHandler = new TestCryptoProvider( @@ -900,14 +893,7 @@ private static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository { private final TestCryptoProvider cryptoHandler; private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) { - super( - metadata, - false, - mock(NamedXContentRegistry.class), - clusterService, - mock(RecoverySettings.class), - Map.of("bucket", "bucket-b") - ); + super(metadata, mock(NamedXContentRegistry.class), clusterService, mock(RecoverySettings.class), Map.of("bucket", "bucket-b")); if (metadata.cryptoMetadata() != null) { cryptoHandler = new TestCryptoProvider( diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index e097c7025e4fe..9c65ad32fa6a6 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -252,7 +252,7 @@ public void testBadChunksize() throws Exception { ); } - public void testFsRepositoryCompressDeprecated() { + public void testFsRepositoryCompressDeprecatedIgnored() { final Path location = OpenSearchIntegTestCase.randomRepoPath(node().settings()); final Settings settings = Settings.builder().put(node().settings()).put("location", location).build(); final RepositoryMetadata metadata = new RepositoryMetadata("test-repo", REPO_TYPE, settings); @@ -265,10 +265,7 @@ public void testFsRepositoryCompressDeprecated() { new FsRepository(metadata, useCompressEnvironment, null, BlobStoreTestUtil.mockClusterService(), null); - assertWarnings( - "[repositories.fs.compress] setting was deprecated in OpenSearch and will be removed in a future release!" - + " See the breaking changes documentation for the next major version." - ); + assertNoDeprecationWarnings(); } private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { diff --git a/server/src/test/java/org/opensearch/repositories/fs/ReloadableFsRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/fs/ReloadableFsRepositoryTests.java new file mode 100644 index 0000000000000..db2cf9c3e9582 --- /dev/null +++ b/server/src/test/java/org/opensearch/repositories/fs/ReloadableFsRepositoryTests.java @@ -0,0 +1,119 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.fs; + +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.compress.ZstdCompressor; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.compress.CompressorRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.file.Path; +import java.util.Locale; + +public class ReloadableFsRepositoryTests extends OpenSearchTestCase { + ReloadableFsRepository repository; + RepositoryMetadata metadata; + Settings settings; + Path repo; + + @Override + public void setUp() throws Exception { + super.setUp(); + + repo = createTempDir(); + settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), repo.toAbsolutePath()) + .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(FsRepository.BASE_PATH_SETTING.getKey(), "my_base_path") + .build(); + metadata = new RepositoryMetadata("test", "fs", settings); + repository = new ReloadableFsRepository( + metadata, + new Environment(settings, null), + NamedXContentRegistry.EMPTY, + BlobStoreTestUtil.mockClusterService(), + new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + } + + /** + * Validates that {@link ReloadableFsRepository} supports inplace reloading + */ + public void testIsReloadable() { + assertTrue(repository.isReloadable()); + } + + /** + * Updates repository metadata of an existing repository to enable default compressor + */ + public void testCompressReload() { + assertEquals(CompressorRegistry.none(), repository.getCompressor()); + updateCompressionTypeToDefault(); + repository.validateMetadata(metadata); + repository.reload(metadata); + assertEquals(CompressorRegistry.defaultCompressor(), repository.getCompressor()); + } + + /** + * Updates repository metadata of an existing repository to change compressor type from default to Zstd + */ + public void testCompressionTypeReload() { + assertEquals(CompressorRegistry.none(), repository.getCompressor()); + updateCompressionTypeToDefault(); + repository = new ReloadableFsRepository( + metadata, + new Environment(settings, null), + NamedXContentRegistry.EMPTY, + BlobStoreTestUtil.mockClusterService(), + new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ); + assertEquals(CompressorRegistry.defaultCompressor(), repository.getCompressor()); + + settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), repo.toAbsolutePath()) + .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) + .put("location", repo) + .put("compress", true) + .put("compression_type", ZstdCompressor.NAME.toLowerCase(Locale.ROOT)) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(FsRepository.BASE_PATH_SETTING.getKey(), "my_base_path") + .build(); + metadata = new RepositoryMetadata("test", "fs", settings); + repository.validateMetadata(metadata); + repository.reload(metadata); + assertEquals(CompressorRegistry.getCompressor(ZstdCompressor.NAME.toUpperCase(Locale.ROOT)), repository.getCompressor()); + } + + private void updateCompressionTypeToDefault() { + settings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), repo.toAbsolutePath()) + .putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()) + .put("location", repo) + .put("compress", true) + .put("compression_type", DeflateCompressor.NAME.toLowerCase(Locale.ROOT)) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put(FsRepository.BASE_PATH_SETTING.getKey(), "my_base_path") + .build(); + metadata = new RepositoryMetadata("test", "fs", settings); + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index ca8bec469f3bc..f9388c9e4b86e 100644 --- a/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/opensearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -90,7 +90,7 @@ public MockEventuallyConsistentRepository( final Context context, final Random random ) { - super(metadata, false, namedXContentRegistry, clusterService, recoverySettings); + super(metadata, namedXContentRegistry, clusterService, recoverySettings); this.context = context; this.namedXContentRegistry = namedXContentRegistry; this.random = random;