diff --git a/plugins/repository-s3/build.gradle b/plugins/repository-s3/build.gradle index 560d12d14395d..89cbe1cd0ff7a 100644 --- a/plugins/repository-s3/build.gradle +++ b/plugins/repository-s3/build.gradle @@ -69,6 +69,10 @@ dependencies { api "software.amazon.awssdk:aws-query-protocol:${versions.aws}" api "software.amazon.awssdk:sts:${versions.aws}" api "software.amazon.awssdk:netty-nio-client:${versions.aws}" + api "software.amazon.awssdk:aws-crt-client:${versions.aws}" + api "software.amazon.awssdk.crt:aws-crt:0.21.16" + api "software.amazon.awssdk:crt-core:${versions.aws}" + api "org.apache.httpcomponents:httpclient:${versions.httpclient}" api "org.apache.httpcomponents:httpcore:${versions.httpcore}" @@ -548,7 +552,6 @@ thirdPartyAudit { 'software.amazon.awssdk.crtcore.CrtConnectionHealthConfiguration', 'software.amazon.awssdk.crtcore.CrtConnectionHealthConfiguration$Builder', 'software.amazon.awssdk.crtcore.CrtConnectionHealthConfiguration$DefaultBuilder', - 'software.amazon.awssdk.crtcore.CrtProxyConfiguration', 'software.amazon.awssdk.crtcore.CrtProxyConfiguration$Builder', 'software.amazon.awssdk.crtcore.CrtProxyConfiguration$DefaultBuilder', 'software.amazon.eventstream.HeaderValue', diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java index f8a313b55d945..6952a686fcdda 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java @@ -21,17 +21,21 @@ final class AmazonAsyncS3WithCredentials { private final S3AsyncClient priorityClient; private final S3AsyncClient urgentClient; private final AwsCredentialsProvider credentials; + private final S3AsyncClient crtClient; + private AmazonAsyncS3WithCredentials( final S3AsyncClient client, final S3AsyncClient priorityClient, final S3AsyncClient urgentClient, - @Nullable final AwsCredentialsProvider credentials + @Nullable final AwsCredentialsProvider credentials, + final S3AsyncClient crtClient ) { this.client = client; this.credentials = credentials; this.priorityClient = priorityClient; this.urgentClient = urgentClient; + this.crtClient = crtClient; } S3AsyncClient client() { @@ -46,6 +50,9 @@ S3AsyncClient urgentClient() { return urgentClient; } + S3AsyncClient crtClient() { + return crtClient; + } AwsCredentialsProvider credentials() { return credentials; } @@ -56,6 +63,16 @@ static AmazonAsyncS3WithCredentials create( final S3AsyncClient urgentClient, @Nullable final AwsCredentialsProvider credentials ) { - return new AmazonAsyncS3WithCredentials(client, priorityClient, urgentClient, credentials); + return create(client, priorityClient, urgentClient, credentials, null); + } + + static AmazonAsyncS3WithCredentials create( + final S3AsyncClient client, + final S3AsyncClient priorityClient, + final S3AsyncClient urgentClient, + @Nullable final AwsCredentialsProvider credentials, + final S3AsyncClient crtClient + ) { + return new AmazonAsyncS3WithCredentials(client, priorityClient, urgentClient, credentials, crtClient); } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index d691cad9c9d03..ff4abf5d57f9a 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -28,6 +28,11 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; +import software.amazon.awssdk.services.s3.crt.S3CrtConnectionHealthConfiguration; +import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; +import software.amazon.awssdk.services.s3.crt.S3CrtProxyConfiguration; +import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.StsClientBuilder; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; @@ -230,7 +235,33 @@ synchronized AmazonAsyncS3WithCredentials buildClient( ); final S3AsyncClient client = SocketAccess.doPrivileged(builder::build); - return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials); + final S3CrtAsyncClientBuilder crtBuilder = S3AsyncClient.crtBuilder(); + crtBuilder.credentialsProvider(credentials); + S3CrtHttpConfiguration.Builder httpConfiguration = S3CrtHttpConfiguration.builder(); + if (clientSettings.proxySettings.getType() != ProxySettings.ProxyType.DIRECT) { + S3CrtProxyConfiguration proxyConfiguration = S3CrtProxyConfiguration.builder() + .scheme(clientSettings.proxySettings.getType().toProtocol().toString()) + .host(clientSettings.proxySettings.getHostName()) + .port(clientSettings.proxySettings.getPort()) + .username(clientSettings.proxySettings.getUsername()) + .password(clientSettings.proxySettings.getPassword()) + .build(); + httpConfiguration.proxyConfiguration(proxyConfiguration); + } + crtBuilder.httpConfiguration(httpConfiguration.build()); + crtBuilder.retryConfiguration(S3CrtRetryConfiguration.builder() + .numRetries(clientSettings.maxRetries) + .build()); +// crtBuilder.asyncConfiguration( +// ClientAsyncConfiguration.builder() +// .advancedOption( +// SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, +// urgentExecutorBuilder.getFutureCompletionExecutor() +// ) +// .build() +// ); + final S3AsyncClient crtClient = SocketAccess.doPrivileged(crtBuilder::build); + return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials, crtClient); } static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 14829a066ca3a..f53c3d1dfc948 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -33,34 +33,14 @@ package org.opensearch.repositories.s3; import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; -import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CommonPrefix; -import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; -import software.amazon.awssdk.services.s3.model.CompletedPart; -import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; -import software.amazon.awssdk.services.s3.model.Delete; -import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest; -import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; -import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; -import software.amazon.awssdk.services.s3.model.NoSuchKeyException; -import software.amazon.awssdk.services.s3.model.ObjectAttributes; -import software.amazon.awssdk.services.s3.model.ObjectIdentifier; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.S3Error; -import software.amazon.awssdk.services.s3.model.ServerSideEncryption; -import software.amazon.awssdk.services.s3.model.UploadPartRequest; -import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient; +import software.amazon.awssdk.services.s3.model.*; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; import software.amazon.awssdk.utils.CollectionUtils; @@ -193,17 +173,106 @@ public void writeBlobWithMetadata( boolean failIfAlreadyExists, @Nullable Map metadata ) throws IOException { + logger.info("********writeBlobWithMetadata********"); + assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; SocketAccess.doPrivilegedIOException(() -> { if (blobSize <= getLargeBlobThresholdInBytes()) { + logger.info("Executing single upload"); executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } else { + logger.info("Executing multiplart upload"); + executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); } return null; }); } +// public void asyncWriteBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists,@Nullable Map metadata, WritePriority priority, ActionListener completionListener) throws IOException { +// assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests"; +// +// try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { +// +// S3AsyncClient s3AsyncClient; +// if (priority == WritePriority.URGENT) { +// s3AsyncClient = amazonS3Reference.get().urgentClient(); +// } else if (priority == WritePriority.HIGH) { +// s3AsyncClient = amazonS3Reference.get().priorityClient(); +// } else { +// s3AsyncClient = amazonS3Reference.get().client(); +// } +// //getAsyncTransferManager does not support putObject. +// CompletableFuture completableFuture = blobStore.getAsyncTransferManager() +// .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); +// CompletableFuture completableFuture = blobStore.getAsyncTransferManager().put +// completableFuture.whenComplete((response, throwable) -> { +// if (throwable == null) { +// completionListener.onResponse(response); +// } else { +// Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; +// completionListener.onFailure(ex); +// } +// }); +// SocketAccess.doPrivilegedIOException(() -> { +// if (blobSize <= getLargeBlobThresholdInBytes()) { +// logger.info("Executing single upload"); +// executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); +// } else { +// logger.info("Executing multiplart upload"); +// +// executeMultipartUpload(blobStore, buildKey(blobName), inputStream, blobSize, metadata); +// } +// return null; +// }); +// } +// } + + public void asyncStreamUpload(String blobName, InputStream inputStream, ActionListener completionListener) { +// BlockingInputStreamAsyncRequestBody body = +// AsyncRequestBody.forBlockingInputStream(null); // 'null' indicates a stream will be provided later. +// +// CompletableFuture responseFuture = +// s33CrtAsyncClient.putObject(r -> r.bucket(bucketName).key(key), body); +// +// // AsyncExampleUtils.randomString() returns a random string up to 100 characters. +// String randomString = AsyncExampleUtils.randomString(); +// logger.info("random string to upload: {}: length={}", randomString, randomString.length()); +// +// // Provide the stream of data to be uploaded. +// body.writeInputStream(new ByteArrayInputStream(randomString.getBytes())); +// +// PutObjectResponse response = responseFuture.join(); // Wait for the response. +// logger.info("Object {} uploaded to bucket {}.", key, bucketName); +// return response; + + + + try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { + + S3CrtAsyncClient s3AsyncClient = (S3CrtAsyncClient) amazonS3Reference.get().crtClient(); + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() + .bucket(blobStore.bucket()) + .key(blobName) + .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher)); + BlockingInputStreamAsyncRequestBody body = + AsyncRequestBody.forBlockingInputStream(null); + CompletableFuture completableFuture = s3AsyncClient.putObject( + putObjectRequestBuilder.build(), + body); + body.writeInputStream(inputStream); + + completableFuture.whenComplete((response, throwable) -> { + if (throwable == null) { + completionListener.onResponse(null); + } else { + Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; + completionListener.onFailure(ex); + } + }); + } + } + @Override public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { UploadRequest uploadRequest = new UploadRequest( diff --git a/plugins/repository-s3/src/main/plugin-metadata/plugin-security.policy b/plugins/repository-s3/src/main/plugin-metadata/plugin-security.policy index 106103d45e7eb..7b8ae15729397 100644 --- a/plugins/repository-s3/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/repository-s3/src/main/plugin-metadata/plugin-security.policy @@ -63,4 +63,12 @@ grant { permission java.io.FilePermission "config", "read"; permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.RuntimePermission "loadLibrary.aws-crt-jni"; + permission java.lang.RuntimePermission "loadLibrary.*"; + permission java.lang.RuntimePermission "shutdownHooks"; + }; + + grant codeBase "${codebase.aws-crt-jni}" { + permission java.lang.RuntimePermission "loadLibrary.*"; + }; diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 017ac23ce1427..1357309d8834f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,26 +10,40 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexInput; import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader; import org.opensearch.index.remote.RemoteStoreUtils; +import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; @@ -49,6 +63,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -102,6 +118,99 @@ public List writeFullRoutingTable return uploadedIndices; } + public List getChangedIndicesRouting( ClusterState previousClusterState, + ClusterState clusterState) { + Map previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting(); + + List changedIndicesRouting = new ArrayList<>(); + + for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) { + if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) { + changedIndicesRouting.add(indexRouting); + } + } + logger.info("changedIndicesRouting {}", changedIndicesRouting.toString()); + + return changedIndicesRouting; + + } + + public CheckedRunnable getIndexRoutingAsyncAction( + ClusterState clusterState, + IndexRoutingTable indexRouting, + LatchedActionListener latchedActionListener + ) throws IOException { + + //TODO: Integrate with optimized S3 prefix for index routing file path. + BlobPath custerMetadataBasePath = getCusterMetadataBasePath(blobStoreRepository, clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID()); + logger.info("custerMetadataBasePath {}", custerMetadataBasePath); + + final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(custerMetadataBasePath.add(INDEX_ROUTING_PATH_TOKEN).add(indexRouting.getIndex().getUUID())); + + final String fileName = getIndexRoutingFileName(); + logger.info("fileName {}", fileName); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse( + new ClusterMetadataManifest.UploadedIndexMetadata( + + indexRouting.getIndex().getName(), + indexRouting.getIndex().getUUID(), + blobContainer.path().buildAsString() + fileName, + "indexRouting--" + ) + ), + ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(indexRouting.getIndex().toString(), ex)) + ); + + if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { + logger.info("TRYING FILE UPLOAD"); + + return () -> { + logger.info("Going to upload {}", indexRouting.prettyPrint()); + uploadIndex(indexRouting, clusterState.getRoutingTable().version(), custerMetadataBasePath); + logger.info("upload done {}", indexRouting.prettyPrint()); + + completionListener.onResponse(null); + logger.info("response done {}", indexRouting.prettyPrint()); + + }; + } + +// try ( +// InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting); +// IndexInput input = new ByteArrayIndexInput("indexrouting", indexRoutingStream.readAllBytes())) { +//// long expectedChecksum; +//// try { +//// expectedChecksum = checksumOfChecksum(input.clone(), 8); +//// } catch (Exception e) { +//// throw e; +//// } +// try ( +// +// RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( +// fileName, +// fileName, +// input.length(), +// true, +// WritePriority.URGENT, +// (size, position) -> new OffsetRangeIndexInputStream(input, size, position), +// null, +// false +// ) +// ) { +// return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener); +// } catch (IOException e) { +// e.printStackTrace(); +// } +// } + logger.info("TRYING S3 UPLOAD"); + InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting); + return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncStreamUpload(fileName, indexRoutingStream, completionListener); + + } + public List writeIncrementalRoutingTable( ClusterState previousClusterState, ClusterState clusterState, @@ -129,6 +238,19 @@ public List writeIncrementalRouti return uploadedIndices; } + public List getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List indicesRoutingToUpload) { + final Map allUploadedIndicesRouting = previousManifest.getIndicesRouting() + .stream() + .collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity())); + indicesRoutingToUpload.forEach( + uploadedIndexMetadata -> allUploadedIndicesRouting.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) + ); + + logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting); + + return new ArrayList<>(allUploadedIndicesRouting.values()); + } + private ClusterMetadataManifest.UploadedIndexMetadata uploadIndex(IndexRoutingTable indexRouting, long routingTableVersion, BlobPath custerMetadataBasePath) { try { InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting); @@ -200,4 +322,5 @@ public void start() { blobStoreRepository = (BlobStoreRepository) repository; } + } diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java index 97f304d776f5c..f15df2b9c7061 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java @@ -14,6 +14,7 @@ import org.opensearch.core.action.ActionListener; import java.io.IOException; +import java.io.InputStream; /** * An extension of {@link BlobContainer} that adds {@link AsyncMultiStreamBlobContainer#asyncBlobUpload} to allow @@ -34,6 +35,7 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer { */ void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException; + void asyncStreamUpload(String blobName, InputStream inputStream, ActionListener completionListener); /** * Creates an async callback of a {@link ReadContext} containing the multipart streams for a specified blob within the container. * @param blobName The name of the blob for which the {@link ReadContext} needs to be fetched. diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java index 82bc7a0baed50..aefea8f8d7cf7 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java @@ -44,6 +44,11 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp blobContainer.asyncBlobUpload(encryptedWriteContext, completionListener); } + @Override + public void asyncStreamUpload(String blobName, InputStream inputStream, ActionListener completionListener) { + throw new RuntimeException("Mehtod not implemented for encryption"); + } + @Override public void readBlobAsync(String blobName, ActionListener listener) { try { diff --git a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java index 9d80b53e79eaa..e90afc4a8ba44 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/ClusterMetadataManifest.java @@ -655,6 +655,7 @@ public void writeTo(StreamOutput out) throws IOException { uploadedSettingsMetadata.writeTo(out); uploadedTemplatesMetadata.writeTo(out); out.writeMap(uploadedCustomMetadataMap, StreamOutput::writeString, (o, v) -> v.writeTo(o)); + uploadedDiscoveryNodesMetadata.writeTo(out); out.writeLong(routingTableVersion); out.writeCollection(indicesRouting); } else if (out.getVersion().onOrAfter(Version.V_2_12_0)) { @@ -899,6 +900,7 @@ public Builder(ClusterMetadataManifest manifest) { this.previousClusterUUID = manifest.previousClusterUUID; this.clusterUUIDCommitted = manifest.clusterUUIDCommitted; this.diffManifest = manifest.diffManifest; + this.discoveryNodesMetadata = manifest.uploadedDiscoveryNodesMetadata; this.routingTableVersion = manifest.routingTableVersion; this.indicesRouting = new ArrayList<>(manifest.indicesRouting); } @@ -960,6 +962,7 @@ public static class UploadedIndexMetadata implements UploadedMetadata, Writeable private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); + private static final ParseField COMPONENT_PREFIX_FIELD = new ParseField("component_prefix"); private static String indexName(Object[] fields) { return (String) fields[0]; @@ -973,23 +976,34 @@ private static String uploadedFilename(Object[] fields) { return (String) fields[2]; } + private static String componentPrefix(Object[] fields) { + return (String) fields[3]; + } + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "uploaded_index_metadata", - fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields)) + fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields), componentPrefix(fields)) ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), COMPONENT_PREFIX_FIELD); } static final String COMPONENT_PREFIX = "index--"; + private final String componentPrefix; private final String indexName; private final String indexUUID; private final String uploadedFilename; public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { + this( indexName,indexUUID,uploadedFileName, COMPONENT_PREFIX); + } + + public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName, String componentPrefix) { + this.componentPrefix = componentPrefix; this.indexName = indexName; this.indexUUID = indexUUID; this.uploadedFilename = uploadedFileName; @@ -999,6 +1013,7 @@ public UploadedIndexMetadata(StreamInput in) throws IOException { this.indexName = in.readString(); this.indexUUID = in.readString(); this.uploadedFilename = in.readString(); + this.componentPrefix = in.readString(); } public String getUploadedFilePath() { @@ -1007,7 +1022,7 @@ public String getUploadedFilePath() { @Override public String getComponent() { - return COMPONENT_PREFIX + getIndexName(); + return componentPrefix + getIndexName(); } public String getUploadedFilename() { @@ -1023,12 +1038,18 @@ public String getIndexUUID() { return indexUUID; } + public String getComponentPrefix() { + return componentPrefix; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { return builder .field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()) - .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()); + .field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilePath()) + .field(COMPONENT_PREFIX_FIELD.getPreferredName(), getComponentPrefix()); + } @Override @@ -1036,6 +1057,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(indexName); out.writeString(indexUUID); out.writeString(uploadedFilename); + out.writeString(componentPrefix); } @Override @@ -1049,12 +1071,14 @@ public boolean equals(Object o) { final UploadedIndexMetadata that = (UploadedIndexMetadata) o; return Objects.equals(indexName, that.indexName) && Objects.equals(indexUUID, that.indexUUID) - && Objects.equals(uploadedFilename, that.uploadedFilename); + && Objects.equals(uploadedFilename, that.uploadedFilename) + && Objects.equals(componentPrefix, that.componentPrefix); + } @Override public int hashCode() { - return Objects.hash(indexName, indexUUID, uploadedFilename); + return Objects.hash(indexName, indexUUID, uploadedFilename, componentPrefix); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 21ecf2b56dd21..e641a3304e594 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -18,6 +18,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.CheckedRunnable; @@ -200,7 +201,6 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri return null; } - UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()), @@ -209,14 +209,8 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri true, true, true, - true - ); - - List routingIndexMetadata = new ArrayList<>(); - if(remoteRoutingTableService!=null) { - routingIndexMetadata = remoteRoutingTableService.writeFullRoutingTable(clusterState, previousClusterUUID); - logger.info("routingIndexMetadata {}", routingIndexMetadata); - } + true, + new ArrayList<>(clusterState.getRoutingTable().indicesRouting().values())); final ClusterMetadataManifest manifest = remoteManifestManager.uploadManifest( clusterState, @@ -229,7 +223,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri uploadedMetadataResults.uploadedDiscoveryNodes, uploadedMetadataResults.uploadedClusterBlocks, new ClusterStateDiffManifest(clusterState, ClusterState.EMPTY_STATE), - routingIndexMetadata, + uploadedMetadataResults.uploadedIndicesRoutingMetadata, false ); @@ -339,6 +333,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( } previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); } + + List indicesRoutingToUpload = new ArrayList<>(); + if(remoteRoutingTableService!=null) { + indicesRoutingToUpload = remoteRoutingTableService.getChangedIndicesRouting(previousClusterState, clusterState); + } UploadedMetadataResults uploadedMetadataResults; boolean firstUpload = !previousManifest.hasMetadataAttributesFiles(); // For migration case from codec V0 or V1 to V2, we have added null check on metadata attribute files, @@ -352,7 +351,8 @@ public ClusterMetadataManifest writeIncrementalMetadata( true, true, true, - true + true, + indicesRoutingToUpload ); } else { uploadedMetadataResults = writeMetadataInParallel( @@ -363,14 +363,10 @@ public ClusterMetadataManifest writeIncrementalMetadata( updateSettingsMetadata, updateTemplatesMetadata, updateDiscoveryNodes, - updateClusterBlocks + updateClusterBlocks, + indicesRoutingToUpload ); } - List routingIndexMetadata = new ArrayList<>(); - if(remoteRoutingTableService!=null) { - routingIndexMetadata = remoteRoutingTableService.writeIncrementalRoutingTable(previousClusterState, clusterState, previousManifest); - logger.info("routingIndexMetadata incremental {}", routingIndexMetadata); - } // update the map if the metadata was uploaded uploadedMetadataResults.uploadedIndexMetadata.forEach( @@ -380,6 +376,13 @@ public ClusterMetadataManifest writeIncrementalMetadata( // remove the data for removed custom/indices previousStateCustomMap.keySet().forEach(allUploadedCustomMap::remove); previousStateIndexMetadataVersionByName.keySet().forEach(allUploadedIndexMetadata::remove); + + List allUploadedIndicesRouting = new ArrayList<>(); + if(remoteRoutingTableService!=null) { + allUploadedIndicesRouting = remoteRoutingTableService.getAllUploadedIndicesRouting(previousManifest, uploadedMetadataResults.uploadedIndicesRoutingMetadata); + } + //TODO: Get all routing metadata, combining curently uploaed and from previous manifest + final ClusterMetadataManifest manifest = remoteManifestManager.uploadManifest( clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), @@ -397,8 +400,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( firstUpload || updateDiscoveryNodes ? uploadedMetadataResults.uploadedDiscoveryNodes : previousManifest.getDiscoveryNodesMetadata(), firstUpload || updateClusterBlocks ? uploadedMetadataResults.uploadedClusterBlocks : previousManifest.getClusterBlocksMetadata(), new ClusterStateDiffManifest(clusterState, previousClusterState), - routingIndexMetadata, false + allUploadedIndicesRouting, false ); + + logger.info("MANIFEST IN INC STATE {}", manifest); + this.latestClusterName = clusterState.getClusterName().value(); this.latestClusterUUID = clusterState.metadata().clusterUUID(); @@ -445,10 +451,10 @@ private UploadedMetadataResults writeMetadataInParallel( boolean uploadSettingsMetadata, boolean uploadTemplateMetadata, boolean uploadDiscoveryNodes, - boolean uploadClusterBlock - ) throws IOException { + boolean uploadClusterBlock, + List indicesRoutingToUpload) throws IOException { int totalUploadTasks = indexToUpload.size() + customToUpload.size() + (uploadCoordinationMetadata ? 1 : 0) + (uploadSettingsMetadata - ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0); + ? 1 : 0) + (uploadTemplateMetadata ? 1 : 0) + (uploadDiscoveryNodes ? 1 : 0) + (uploadClusterBlock ? 1 : 0) + indicesRoutingToUpload.size(); CountDownLatch latch = new CountDownLatch(totalUploadTasks); Map> uploadTasks = new HashMap<>(totalUploadTasks); Map results = new HashMap<>(totalUploadTasks); @@ -456,7 +462,7 @@ private UploadedMetadataResults writeMetadataInParallel( LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap((ClusterMetadataManifest.UploadedMetadata uploadedMetadata) -> { - logger.trace(String.format(Locale.ROOT, "Metadata component %s uploaded successfully.", uploadedMetadata.getComponent())); + logger.info(String.format(Locale.ROOT, "Metadata component %s uploaded successfully.", uploadedMetadata.getComponent())); results.put(uploadedMetadata.getComponent(), uploadedMetadata); }, ex -> { logger.error( @@ -548,6 +554,17 @@ private UploadedMetadataResults writeMetadataInParallel( ); }); + indicesRoutingToUpload.forEach(indexRoutingTable -> { + try { + uploadTasks.put( + indexRoutingTable.getIndex().getName() + "--indexRouting", + remoteRoutingTableService.getIndexRoutingAsyncAction(clusterState, indexRoutingTable, listener) + ); + } catch (IOException e) { + e.printStackTrace(); + } + }); + // start async upload of all required metadata files for (CheckedRunnable uploadTask : uploadTasks.values()) { uploadTask.run(); @@ -592,7 +609,10 @@ private UploadedMetadataResults writeMetadataInParallel( } UploadedMetadataResults response = new UploadedMetadataResults(); results.forEach((name, uploadedMetadata) -> { - if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)) { + if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class) && uploadedMetadata.getComponent().contains("indexRouting")) { + response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata); + } + else if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)) { response.uploadedIndexMetadata.add((UploadedIndexMetadata) uploadedMetadata); } else if (uploadedMetadata.getComponent().contains(CUSTOM_METADATA)) { // component name for custom metadata will look like custom-- @@ -615,6 +635,7 @@ private UploadedMetadataResults writeMetadataInParallel( throw new IllegalStateException("Unexpected metadata component " + uploadedMetadata.getComponent()); } }); + logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString()); return response; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index 3263cabc50687..54918e5218080 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -75,6 +75,7 @@ static class UploadedMetadataResults { ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata; ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes; ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks; + List uploadedIndicesRoutingMetadata; public UploadedMetadataResults( List uploadedIndexMetadata, @@ -83,7 +84,8 @@ public UploadedMetadataResults( ClusterMetadataManifest.UploadedMetadataAttribute uploadedSettingsMetadata, ClusterMetadataManifest.UploadedMetadataAttribute uploadedTemplatesMetadata, ClusterMetadataManifest.UploadedMetadataAttribute uploadedDiscoveryNodes, - ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks + ClusterMetadataManifest.UploadedMetadataAttribute uploadedClusterBlocks, + List uploadedIndicesRoutingMetadata ) { this.uploadedIndexMetadata = uploadedIndexMetadata; this.uploadedCustomMetadataMap = uploadedCustomMetadataMap; @@ -92,6 +94,7 @@ public UploadedMetadataResults( this.uploadedTemplatesMetadata = uploadedTemplatesMetadata; this.uploadedDiscoveryNodes = uploadedDiscoveryNodes; this.uploadedClusterBlocks = uploadedClusterBlocks; + this.uploadedIndicesRoutingMetadata = uploadedIndicesRoutingMetadata; } public UploadedMetadataResults() { @@ -102,6 +105,7 @@ public UploadedMetadataResults() { this.uploadedTemplatesMetadata = null; this.uploadedDiscoveryNodes = null; this.uploadedClusterBlocks = null; + this.uploadedIndicesRoutingMetadata = new ArrayList<>(); } } }