Skip to content

Commit

Permalink
Async write flow for routing table
Browse files Browse the repository at this point in the history
  • Loading branch information
Himshikha Gupta committed May 23, 2024
1 parent 043a170 commit 89a10ac
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,39 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;

import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.opensearch.threadpool.ThreadPool;

import java.io.*;
import java.io.Closeable;
import java.io.IOException;

Expand All @@ -53,11 +51,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

Expand All @@ -78,100 +78,157 @@ public class RemoteRoutingTableService implements Closeable {
Setting.Property.Final
);
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";

public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final ClusterSettings clusterSettings;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings,
ClusterSettings clusterSettings, ThreadPool threadPool) {
ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeFullRoutingTable(ClusterState clusterState, String previousClusterUUID) {
public List<IndexRoutingTable> getChangedIndicesRouting( ClusterState previousClusterState,
ClusterState clusterState) {
Map<String, IndexRoutingTable> previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting();
List<IndexRoutingTable> changedIndicesRouting = new ArrayList<>();
for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) {
if (!(previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName())))) {
changedIndicesRouting.add(indexRouting);
logger.info("changedIndicesRouting {}", indexRouting.prettyPrint());
}
}

return changedIndicesRouting;
}

public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
ClusterState clusterState,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) throws IOException {

//batch index count and parallelize
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndices = new ArrayList<>();
BlobPath custerMetadataBasePath = getCusterMetadataBasePath(blobStoreRepository, clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID());
for (IndexRoutingTable indexRouting : currentRoutingTable.getIndicesRouting().values()) {
uploadedIndices.add(uploadIndex(indexRouting, custerMetadataBasePath));
clusterState.metadata().clusterUUID()).add(INDEX_ROUTING_PATH_TOKEN);
logger.info("custerMetadataBasePath {}", custerMetadataBasePath);

BlobPath path = RemoteStoreEnums.PathType.HASHED_PREFIX.path(RemoteStorePathStrategy.PathInput.builder()
.basePath(custerMetadataBasePath)
.indexUUID(indexRouting.getIndex().getUUID())
.build(),
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64);
logger.info("path from prefix hasd {}", path);
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);

final String fileName = getIndexRoutingFileName();
logger.info("fileName {}", fileName);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
new ClusterMetadataManifest.UploadedIndexMetadata(

indexRouting.getIndex().getName(),
indexRouting.getIndex().getUUID(),
path.buildAsString() + fileName,
INDEX_ROUTING_METADATA_PREFIX
)
),
ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(indexRouting.getIndex().toString(), ex))
);

if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
logger.info("TRYING FILE UPLOAD");

return () -> {
logger.info("Going to upload {}", indexRouting.prettyPrint());

uploadIndex(indexRouting, fileName , blobContainer);
logger.info("upload done {}", indexRouting.prettyPrint());

completionListener.onResponse(null);
logger.info("response done {}", indexRouting.prettyPrint());

};
}
logger.info("uploadedIndices {}", uploadedIndices);

return uploadedIndices;
logger.info("TRYING S3 UPLOAD");

//TODO: Integrate with S3AsyncCrtClient for using buffered stream directly with putObject.
try (
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting);
IndexInput input = new ByteArrayIndexInput("indexrouting", indexRoutingStream.readAllBytes())) {
long expectedChecksum;
try {
expectedChecksum = checksumOfChecksum(input.clone(), 8);
} catch (Exception e) {
throw e;
}
try (
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
fileName,
fileName,
input.length(),
true,
WritePriority.URGENT,
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
expectedChecksum,
((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported()
)
) {
return () -> ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), completionListener);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> writeIncrementalRoutingTable(
ClusterState previousClusterState,
ClusterState clusterState,
ClusterMetadataManifest previousManifest) {

public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndicesRouting(ClusterMetadataManifest previousManifest, List<ClusterMetadataManifest.UploadedIndexMetadata> indicesRoutingToUpload, Set<String> indicesRoutingToDelete) {
final Map<String, ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndicesRouting = previousManifest.getIndicesRouting()
.stream()
.collect(Collectors.toMap(ClusterMetadataManifest.UploadedIndexMetadata::getIndexName, Function.identity()));

indicesRoutingToUpload.forEach(
uploadedIndexRouting -> allUploadedIndicesRouting.put(uploadedIndexRouting.getIndexName(), uploadedIndexRouting)
);

indicesRoutingToDelete.forEach(index -> allUploadedIndicesRouting.remove(index));

logger.info("allUploadedIndicesRouting ROUTING {}", allUploadedIndicesRouting);

Map<String, IndexRoutingTable> previousIndexRoutingTable = previousClusterState.getRoutingTable().getIndicesRouting();
List<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndices = new ArrayList<>();
BlobPath custerMetadataBasePath = getCusterMetadataBasePath(blobStoreRepository, clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID());
for (IndexRoutingTable indexRouting : clusterState.getRoutingTable().getIndicesRouting().values()) {
if (previousIndexRoutingTable.containsKey(indexRouting.getIndex().getName()) && indexRouting.equals(previousIndexRoutingTable.get(indexRouting.getIndex().getName()))) {
logger.info("index exists {}", indexRouting.getIndex().getName());
//existing index with no shard change.
uploadedIndices.add(allUploadedIndicesRouting.get(indexRouting.getIndex().getName()));
} else {
// new index or shards changed, in both cases we upload new index file.
uploadedIndices.add(uploadIndex(indexRouting, custerMetadataBasePath));
}
}
return uploadedIndices;
return new ArrayList<>(allUploadedIndicesRouting.values());
}

private ClusterMetadataManifest.UploadedIndexMetadata uploadIndex(IndexRoutingTable indexRouting, BlobPath custerMetadataBasePath) {
private void uploadIndex(IndexRoutingTable indexRouting, String fileName, BlobContainer container) {
logger.info("Starting write");

try {
InputStream indexRoutingStream = new IndexRoutingTableInputStream(indexRouting);
BlobContainer container = blobStoreRepository.blobStore().blobContainer(custerMetadataBasePath.add(INDEX_ROUTING_PATH_TOKEN).add(indexRouting.getIndex().getUUID()));
String indexRoutingFileName = getIndexRoutingFileName();
container.writeBlob(indexRoutingFileName, indexRoutingStream, 4096, true);
return new ClusterMetadataManifest.UploadedIndexMetadata(indexRouting.getIndex().getName(), indexRouting.getIndex().getUUID(), container.path().buildAsString() + indexRoutingFileName);

container.writeBlob(fileName, indexRoutingStream, 4096, true);
logger.info("SUccessful write");
} catch (IOException e) {
logger.error("Failed to write {}", e);
}
logger.info("SUccessful write");
return null;
}

private String getIndexRoutingFileName() {
return String.join(
DELIMITER,
//RemoteStoreUtils.invertLong(indexMetadata.getVersion()),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
String.valueOf("CODEC1") // Keep the codec version at last place only, during read we reads last
// place to determine codec version.
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);

}
public RoutingTable getLatestRoutingTable(String clusterName, String clusterUUID) {
return null;
}

public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest previousManifest, String clusterName, String clusterUUID) {
return null;
}

public RoutingTable getIncrementalRoutingTable(ClusterState previousClusterState, ClusterMetadataManifest manifest){
List<String> indicesRoutingDeleted = manifest.getDiffManifest().getIndicesRoutingDeleted();
Expand Down Expand Up @@ -232,9 +289,10 @@ public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
String uploadedFilename,
Index index,
LatchedActionListener<RemoteIndexRoutingResult> latchedActionListener) {
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(getCusterMetadataBasePath(blobStoreRepository, clusterName, clusterUUID).add(INDEX_ROUTING_PATH_TOKEN).add(index.getUUID()));
String[] fileNameTokens = uploadedFilename.split("/");
String blobFileName = fileNameTokens[fileNameTokens.length -1];
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));

return () -> readAsync(
blobContainer,
blobFileName,
Expand Down Expand Up @@ -262,8 +320,6 @@ public IndexRoutingTableInputStreamReader read(BlobContainer blobContainer, Stri
}
return null;
}
private void deleteStaleRoutingTable(String clusterName, String clusterUUID, int manifestsToRetain) {
}

@Override
public void close() throws IOException {
Expand Down Expand Up @@ -300,4 +356,5 @@ public IndexRoutingTable getIndexRoutingTable() {
return indexRoutingTable;
}
}

}
Loading

0 comments on commit 89a10ac

Please sign in to comment.