diff --git a/server/src/main/java/org/opensearch/ExceptionsHelper.java b/server/src/main/java/org/opensearch/ExceptionsHelper.java
index f252d0b05af79..c1fca54a06b9c 100644
--- a/server/src/main/java/org/opensearch/ExceptionsHelper.java
+++ b/server/src/main/java/org/opensearch/ExceptionsHelper.java
@@ -197,6 +197,17 @@ public static <T extends Throwable> T useOrSuppress(T first, T second) {
         return first;
     }
 
+    /** Wraps a list of failures into a single {@link RuntimeException}, preserving each one as a suppressed exception. */
+    public static RuntimeException multiple(List<? extends Throwable> exceptions) {
+        RuntimeException multiple = new RuntimeException();
+        if (exceptions != null) {
+            for (Throwable t : exceptions) {
+                multiple.addSuppressed(t);
+            }
+        }
+        return multiple;
+    }
+
     private static final List<Class<? extends IOException>> CORRUPTION_EXCEPTIONS = Arrays.asList(
         CorruptIndexException.class,
         IndexFormatTooOldException.class,
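For context, here is a minimal sketch — not part of the patch — of how the new ExceptionsHelper.multiple helper collapses several failures into one exception while keeping each original cause inspectable. The failure messages are invented for illustration.

    import java.util.Arrays;
    import java.util.List;

    import org.opensearch.ExceptionsHelper;

    public class MultipleExample {
        public static void main(String[] args) {
            // Two hypothetical per-shard failures collected during an operation.
            List<Exception> failures = Arrays.asList(
                new IllegalStateException("translog upload failed for shard 0"),
                new IllegalStateException("translog upload failed for shard 1")
            );
            RuntimeException combined = ExceptionsHelper.multiple(failures);
            // Each original failure survives as a suppressed exception on the wrapper.
            for (Throwable t : combined.getSuppressed()) {
                System.out.println(t.getMessage());
            }
        }
    }
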
diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
index cd1c92a8b109f..1121805d27d7b 100644
--- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
+++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
@@ -288,6 +288,9 @@ public Iterator<Setting<?>> settings() {
     public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository";
 
     public static final String SETTING_REMOTE_TRANSLOG_STORE_ENABLED = "index.remote_store.translog.enabled";
+
+    public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";
+
     /**
      * Used to specify if the index data should be persisted in the remote store.
      */
@@ -396,6 +399,39 @@ public Iterator<Setting<?>> settings() {
         Property.Final
     );
 
+    public static final Setting<String> INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING = Setting.simpleString(
+        SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
+        new Setting.Validator<>() {
+
+            @Override
+            public void validate(final String value) {}
+
+            @Override
+            public void validate(final String value, final Map<Setting<?>, Object> settings) {
+                if (value != null && !value.isEmpty()) {
+                    final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
+                    if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) {
+                        throw new IllegalArgumentException(
+                            "Setting "
+                                + INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey()
+                                + " can only be set/enabled when "
+                                + INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING.getKey()
+                                + " is set to true"
+                        );
+                    }
+                }
+            }
+
+            @Override
+            public Iterator<Setting<?>> settings() {
+                final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
+                return settings.iterator();
+            }
+        },
+        Property.IndexScope,
+        Property.Final
+    );
+
     public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
     public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
index 7be9adc786f24..5bcef012215d0 100644
--- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
@@ -226,7 +226,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         Arrays.asList(
             IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
             IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
-            IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING
+            IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
+            IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
         )
     );
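To make the validator's contract concrete, here is a hedged sketch of the two index-creation settings combinations; the repository name is a placeholder.

    import org.opensearch.common.settings.Settings;

    // Accepted: the repository is set together with the enabling flag.
    Settings valid = Settings.builder()
        .put("index.remote_store.translog.enabled", true)
        .put("index.remote_store.translog.repository", "my-translog-repo") // placeholder name
        .build();

    // Rejected by the validator above with "Setting index.remote_store.translog.repository
    // can only be set/enabled when index.remote_store.translog.enabled is set to true".
    Settings invalid = Settings.builder()
        .put("index.remote_store.translog.repository", "my-translog-repo")
        .build();
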
diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java
index e52a2ba39ed52..49a4ebdcdf34f 100644
--- a/server/src/main/java/org/opensearch/index/IndexModule.java
+++ b/server/src/main/java/org/opensearch/index/IndexModule.java
@@ -76,6 +76,7 @@
 import org.opensearch.indices.mapper.MapperRegistry;
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.script.ScriptService;
 import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
 import org.opensearch.threadpool.ThreadPool;
@@ -94,6 +95,7 @@
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 /**
  * IndexModule represents the central extension point for index level custom implementations like:
@@ -486,7 +488,8 @@ public IndexService newIndexService(
         NamedWriteableRegistry namedWriteableRegistry,
         BooleanSupplier idFieldDataEnabled,
         ValuesSourceRegistry valuesSourceRegistry,
-        IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
+        IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
+        Supplier<RepositoriesService> repositoriesServiceSupplier
     ) throws IOException {
         final IndexEventListener eventListener = freeze();
         Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -541,7 +544,8 @@ public IndexService newIndexService(
             allowExpensiveQueries,
             expressionResolver,
             valuesSourceRegistry,
-            recoveryStateFactory
+            recoveryStateFactory,
+            repositoriesServiceSupplier
         );
         success = true;
         return indexService;
diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java
index 92f957633db84..c8f7e2d8f93ac 100644
--- a/server/src/main/java/org/opensearch/index/IndexService.java
+++ b/server/src/main/java/org/opensearch/index/IndexService.java
@@ -89,7 +89,9 @@
 import org.opensearch.index.similarity.SimilarityService;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.translog.InternalTranslogFactory;
+import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
 import org.opensearch.index.translog.Translog;
+import org.opensearch.index.translog.TranslogFactory;
 import org.opensearch.indices.breaker.CircuitBreakerService;
 import org.opensearch.indices.cluster.IndicesClusterStateService;
 import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -97,6 +99,7 @@
 import org.opensearch.indices.recovery.RecoveryState;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
 import org.opensearch.plugins.IndexStorePlugin;
+import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.script.ScriptService;
 import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
 import org.opensearch.threadpool.ThreadPool;
@@ -173,6 +176,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
     private final IndexNameExpressionResolver expressionResolver;
     private final Supplier<Sort> indexSortSupplier;
     private final ValuesSourceRegistry valuesSourceRegistry;
+    private final Supplier<RepositoriesService> repositoriesServiceSupplier;
 
     public IndexService(
         IndexSettings indexSettings,
@@ -204,7 +208,8 @@ public IndexService(
         BooleanSupplier allowExpensiveQueries,
         IndexNameExpressionResolver expressionResolver,
         ValuesSourceRegistry valuesSourceRegistry,
-        IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
+        IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
+        Supplier<RepositoriesService> repositoriesServiceSupplier
     ) {
         super(indexSettings);
         this.allowExpensiveQueries = allowExpensiveQueries;
@@ -276,6 +281,7 @@ public IndexService(
         this.trimTranslogTask = new AsyncTrimTranslogTask(this);
         this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
         this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
+        this.repositoriesServiceSupplier = repositoriesServiceSupplier;
         updateFsyncTaskIfNecessary();
     }
 
@@ -518,6 +524,11 @@ public synchronized IndexShard createShard(
             remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
         }
 
+        TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled()
+            ? new RemoteBlobStoreInternalTranslogFactory(repositoriesServiceSupplier, threadPool,
+                this.indexSettings.getRemoteStoreTranslogRepository())
+            : new InternalTranslogFactory();
+
         Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
         store = new Store(
             shardId,
@@ -548,8 +559,7 @@ public synchronized IndexShard createShard(
             () -> globalCheckpointSyncer.accept(shardId),
             retentionLeaseSyncer,
             circuitBreakerService,
-            // TODO Replace with remote translog factory in the follow up PR
-            this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
+            translogFactory,
             this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
             remoteStore
         );
diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java
index 9c7f4804755d4..1e36e6c6b90a3 100644
--- a/server/src/main/java/org/opensearch/index/IndexSettings.java
+++ b/server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -560,6 +560,7 @@ public final class IndexSettings {
     private final ReplicationType replicationType;
     private final boolean isRemoteStoreEnabled;
     private final boolean isRemoteTranslogStoreEnabled;
+    private final String remoteStoreTranslogRepository;
     private final String remoteStoreRepository;
     // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
     private volatile Settings settings;
@@ -722,6 +723,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
         replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
         isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
         isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
+        remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
         remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
         this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
         this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
@@ -988,6 +990,10 @@ public String getRemoteStoreRepository() {
         return remoteStoreRepository;
     }
 
+    public String getRemoteStoreTranslogRepository() {
+        return remoteStoreTranslogRepository;
+    }
+
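The new getter is what IndexService.createShard consults when choosing a translog factory. A small test-style sketch of the round trip, assuming a newIndexMeta helper (not part of this patch) that wraps the given settings into an IndexMetadata:

    IndexSettings indexSettings = new IndexSettings(
        newIndexMeta("test-index", Settings.builder()
            .put("index.remote_store.translog.enabled", true)
            .put("index.remote_store.translog.repository", "my-translog-repo")
            .build()),
        Settings.EMPTY);
    assert indexSettings.isRemoteTranslogStoreEnabled();
    assert "my-translog-repo".equals(indexSettings.getRemoteStoreTranslogRepository());

    /**
     * Returns the node settings.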
The settings returned from {@link #getSettings()} are a merged version of the * index settings and the node settings where node settings are overwritten by index settings. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 9185ef0d440ce..eb09871dc754a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1433,6 +1433,7 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { * @return true if checkpoint should be processed */ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + assert replicationTracker.isPrimaryMode(); if (state().equals(IndexShardState.STARTED) == false) { logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); return false; diff --git a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java index ade28791b2e27..c7339ea1dac8a 100644 --- a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java @@ -59,7 +59,7 @@ * * @opensearch.internal */ -final class Checkpoint { +final public class Checkpoint { final long offset; final int numOps; @@ -262,6 +262,10 @@ public synchronized byte[] toByteArray() { return byteOutputStream.toByteArray(); } + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java index 566eda4fe4a6e..a363992203721 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -29,7 +29,7 @@ public Translog newTranslog( LongConsumer persistedSequenceNumberConsumer ) throws IOException { - return new Translog( + return new LocalTranslog( translogConfig, translogUUID, translogDeletionPolicy, diff --git a/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java b/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java new file mode 100644 index 0000000000000..24a4d9960b026 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java @@ -0,0 +1,232 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.core.internal.io.IOUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; + +public class LocalTranslog extends Translog { + + /** + * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is + * {@code null}. If the generation is {@code null} this method is destructive and will delete all files in the translog path given. 
If + * the generation is not {@code null}, this method tries to open the given translog generation. The generation is treated as the last + * generation referenced from already committed data. This means all operations that have not yet been committed should be in the + * translog file referenced by this generation. The translog creation will fail if this generation can't be opened. + * + * @param config the configuration of this translog + * @param translogUUID the translog uuid to open, null for a new translog + * @param deletionPolicy an instance of {@link TranslogDeletionPolicy} that controls when a translog file can be safely + * deleted + * @param globalCheckpointSupplier a supplier for the global checkpoint + * @param primaryTermSupplier a supplier for the latest value of primary term of the owning index shard. The latest term value is + * examined and stored in the header whenever a new generation is rolled. It's guaranteed from outside + * that a new generation is rolled when the term is increased. This guarantee allows to us to validate + * and reject operation whose term is higher than the primary term stored in the translog header. + * @param persistedSequenceNumberConsumer a callback that's called whenever an operation with a given sequence number is successfully + * persisted. + */ + public LocalTranslog( + final TranslogConfig config, + final String translogUUID, + TranslogDeletionPolicy deletionPolicy, + final LongSupplier globalCheckpointSupplier, + final LongSupplier primaryTermSupplier, + final LongConsumer persistedSequenceNumberConsumer + ) throws IOException { + super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); + try { + final Checkpoint checkpoint = readCheckpoint(location); + final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); + final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); + // this is special handling for error condition when we create a new writer but we fail to bake + // the newly written file (generation+1) into the checkpoint. This is still a valid state + // we just need to cleanup before we continue + // we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this: + // https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example + // + // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that + // file exists. If not we don't even try to clean it up and wait until we fail creating it + assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogHeader.headerSizeInBytes(translogUUID) + : "unexpected translog file: [" + nextTranslogFile + "]"; + if (Files.exists(currentCheckpointFile) // current checkpoint is already copied + && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning + logger.warn( + "deleted previously created, but not yet committed, next generation [{}]. 
This can happen due to a" + + " tragic exception when creating a new generation", + nextTranslogFile.getFileName() + ); + } + this.readers.addAll(recoverFromFiles(checkpoint)); + if (readers.isEmpty()) { + throw new IllegalStateException("at least one reader must be recovered"); + } + boolean success = false; + current = null; + try { + current = createWriter( + checkpoint.generation + 1, + getMinFileGeneration(), + checkpoint.globalCheckpoint, + persistedSequenceNumberConsumer + ); + success = true; + } finally { + // we have to close all the recovered ones otherwise we leak file handles here + // for instance if we have a lot of tlog and we can't create the writer we keep on holding + // on to all the uncommitted tlog files if we don't close + if (success == false) { + IOUtils.closeWhileHandlingException(readers); + } + } + } catch (Exception e) { + // close the opened translog files if we fail to create a new translog... + IOUtils.closeWhileHandlingException(current); + IOUtils.closeWhileHandlingException(readers); + throw e; + } + } + + /** + * Ensures that the given location has be synced / written to the underlying storage. + * + * @return Returns true iff this call caused an actual sync operation otherwise false + */ + @Override + public boolean ensureSynced(Location location) throws IOException { + try (ReleasableLock ignored = readLock.acquire()) { + if (location.generation == current.getGeneration()) { // if we have a new one it's already synced + ensureOpen(); + return current.syncUpTo(location.translogLocation + location.size); + } + } catch (final Exception ex) { + closeOnTragicEvent(ex); + throw ex; + } + return false; + } + + /** + * return stats + */ + @Override + public TranslogStats stats() { + // acquire lock to make the two numbers roughly consistent (no file change half way) + try (ReleasableLock lock = readLock.acquire()) { + long uncommittedGen = getMinGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1).translogFileGeneration; + return new TranslogStats( + totalOperations(), + sizeInBytes(), + totalOperationsByMinGen(uncommittedGen), + sizeInBytesByMinGen(uncommittedGen), + earliestLastModifiedAge() + ); + } + } + + /** recover all translog files found on disk */ + protected ArrayList recoverFromFiles(Checkpoint checkpoint) throws IOException { + boolean success = false; + ArrayList foundTranslogs = new ArrayList<>(); + try (ReleasableLock ignored = writeLock.acquire()) { + logger.debug("open uncommitted translog checkpoint {}", checkpoint); + final long minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; + + // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on + // the generation id we found in the lucene commit. This gives for better error messages if the wrong + // translog was found. + for (long i = checkpoint.generation; i >= minGenerationToRecoverFrom; i--) { + Path committedTranslogFile = location.resolve(Translog.getFilename(i)); + if (Files.exists(committedTranslogFile) == false) { + throw new TranslogCorruptedException( + committedTranslogFile.toString(), + "translog file doesn't exist with generation: " + + i + + " recovering from: " + + minGenerationToRecoverFrom + + " checkpoint: " + + checkpoint.generation + + " - translog ids must be consecutive" + ); + } + final Checkpoint readerCheckpoint = i == checkpoint.generation + ? 
checkpoint + : Checkpoint.read(location.resolve(Translog.getCommitCheckpointFileName(i))); + final TranslogReader reader = openReader(committedTranslogFile, readerCheckpoint); + assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() : "Primary terms go backwards; current term [" + + primaryTermSupplier.getAsLong() + + "] translog path [ " + + committedTranslogFile + + ", existing term [" + + reader.getPrimaryTerm() + + "]"; + foundTranslogs.add(reader); + logger.debug("recovered local translog from checkpoint {}", checkpoint); + } + Collections.reverse(foundTranslogs); + + // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them; + // if we crash just at the wrong moment, it may be that we leave one unreferenced file behind so we delete it if there + IOUtils.deleteFilesIgnoringExceptions( + location.resolve(Translog.getFilename(minGenerationToRecoverFrom - 1)), + location.resolve(Translog.getCommitCheckpointFileName(minGenerationToRecoverFrom - 1)) + ); + + Path commitCheckpoint = location.resolve(Translog.getCommitCheckpointFileName(checkpoint.generation)); + if (Files.exists(commitCheckpoint)) { + Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint); + if (checkpoint.equals(checkpointFromDisk) == false) { + throw new TranslogCorruptedException( + commitCheckpoint.toString(), + "checkpoint file " + + commitCheckpoint.getFileName() + + " already exists but has corrupted content: expected " + + checkpoint + + " but got " + + checkpointFromDisk + ); + } + } else { + copyCheckpointTo(commitCheckpoint); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(foundTranslogs); + } + } + return foundTranslogs; + } + + @Override + public void close() throws IOException { + assert Translog.calledFromOutsideOrViaTragedyClose() + : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; + if (closed.compareAndSet(false, true)) { + try (ReleasableLock lock = writeLock.acquire()) { + try { + current.sync(); + } finally { + closeFilesIfNoPendingRetentionLocks(); + } + } finally { + logger.debug("translog closed"); + } + } + } + +} diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java new file mode 100644 index 0000000000000..2dfc29b86f251 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog; + +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { + + private final Repository repository; + private final ThreadPool threadPool; + + public RemoteBlobStoreInternalTranslogFactory( + Supplier repositoriesServiceSupplier, + ThreadPool threadPool, + String repositoryName + ) { + Repository repository; + try { + repository = repositoriesServiceSupplier.get().repository(repositoryName); + } catch (RepositoryMissingException ex) { + throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex); + } + this.repository = repository; + this.threadPool = threadPool; + } + + @Override + public Translog newTranslog( + TranslogConfig config, + String translogUUID, + TranslogDeletionPolicy deletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer + ) throws IOException { + + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository); + return new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer, + blobStoreRepository, + threadPool + ); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java new file mode 100644 index 0000000000000..942f41dfa0aad --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -0,0 +1,399 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
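A hedged usage sketch for the factory. The surrounding variables (repositoriesService, threadPool, translogConfig, translogUUID, deletionPolicy, the suppliers, and the consumer) are assumed to exist in a caller that also handles the IOException; "my-translog-repo" must name a registered BlobStoreRepository, otherwise the constructor throws IllegalArgumentException as shown above.

    TranslogFactory factory = new RemoteBlobStoreInternalTranslogFactory(
        () -> repositoriesService,   // Supplier<RepositoriesService>, resolved lazily
        threadPool,
        "my-translog-repo");         // placeholder repository name
    Translog translog = factory.newTranslog(
        translogConfig,
        translogUUID,
        deletionPolicy,
        globalCheckpointSupplier,
        primaryTermSupplier,
        persistedSequenceNumberConsumer);
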
+ */ + +package org.opensearch.index.translog; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; +import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.index.translog.transfer.FileTransferTracker; +import org.opensearch.index.translog.transfer.TransferSnapshot; +import org.opensearch.index.translog.transfer.TransferSnapshotProvider; +import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; + +/** + * A Translog implementation which syncs local FS with a remote store + * The current impl uploads translog , ckp and metadata to remote store + * for every sync, post syncing to disk. Post that, a new generation is + * created. + * + */ +public class RemoteFsTranslog extends Translog { + + private final BlobStoreRepository blobStoreRepository; + private final TranslogTransferManager translogTransferManager; + private final static String METADATA_DIR = "metadata"; + + private final FileTransferTracker fileTransferTracker; + + public RemoteFsTranslog( + TranslogConfig config, + String translogUUID, + TranslogDeletionPolicy deletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer, + BlobStoreRepository blobStoreRepository, + ThreadPool threadPool + ) throws IOException { + super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); + this.blobStoreRepository = blobStoreRepository; + fileTransferTracker = new FileTransferTracker(shardId); + this.translogTransferManager = new TranslogTransferManager( + new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool), + blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), + blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(METADATA_DIR), + fileTransferTracker, + fileTransferTracker::exclusionFilter + ); + try { + final Checkpoint checkpoint = readCheckpoint(location); + this.readers.addAll(recoverFromFiles(checkpoint)); + if (readers.isEmpty()) { + throw new IllegalStateException("at least one reader must be recovered"); + } + + boolean success = false; + current = null; + try { + current = createWriter( + checkpoint.generation + 1, + getMinFileGeneration(), + checkpoint.globalCheckpoint, + persistedSequenceNumberConsumer + ); + success = true; + } finally { + // we have to close all the recovered ones otherwise we leak file handles here + // for instance if we have a lot of tlog and we can't create the writer we keep + // on holding + // on to all the uncommitted tlog files if we don't close + 
if (success == false) { + IOUtils.closeWhileHandlingException(readers); + } + } + } catch (Exception e) { + // close the opened translog files if we fail to create a new translog... + IOUtils.closeWhileHandlingException(current); + IOUtils.closeWhileHandlingException(readers); + throw e; + } + } + + /** recover all translog files found on disk */ + protected ArrayList recoverFromFiles(Checkpoint checkpoint) throws IOException { + boolean success = false; + ArrayList foundTranslogs = new ArrayList<>(); + try (ReleasableLock ignored = writeLock.acquire()) { + logger.debug("open uncommitted translog checkpoint {}", checkpoint); + final long minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; + + // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on + // the generation id we found in the lucene commit. This gives for better error messages if the wrong + // translog was found. + for (long i = checkpoint.generation; i >= minGenerationToRecoverFrom; i--) { + logger.info("recovering generation {}", i); + Path committedTranslogFile = location.resolve(Translog.getFilename(i)); + if (Files.exists(committedTranslogFile) == false) { + throw new TranslogCorruptedException( + committedTranslogFile.toString(), + "translog file doesn't exist with generation: " + + i + + " recovering from: " + + minGenerationToRecoverFrom + + " checkpoint: " + + checkpoint.generation + + " - translog ids must be consecutive" + ); + } + final Checkpoint readerCheckpoint = i == checkpoint.generation + ? checkpoint + : Checkpoint.read(location.resolve(Translog.getCommitCheckpointFileName(i))); + final TranslogReader reader = openReader(committedTranslogFile, readerCheckpoint); + assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() : "Primary terms go backwards; current term [" + + primaryTermSupplier.getAsLong() + + "] translog path [ " + + committedTranslogFile + + ", existing term [" + + reader.getPrimaryTerm() + + "]"; + foundTranslogs.add(reader); + logger.debug("recovered local translog from checkpoint {}", checkpoint); + } + Collections.reverse(foundTranslogs); + + // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them; + // if we crash just at the wrong moment, it may be that we leave one unreferenced file behind so we delete it if there + IOUtils.deleteFilesIgnoringExceptions( + location.resolve(Translog.getFilename(minGenerationToRecoverFrom - 1)), + location.resolve(Translog.getCommitCheckpointFileName(minGenerationToRecoverFrom - 1)) + ); + + Path commitCheckpoint = location.resolve(Translog.getCommitCheckpointFileName(checkpoint.generation)); + if (Files.exists(commitCheckpoint)) { + Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint); + if (checkpoint.equals(checkpointFromDisk) == false) { + throw new TranslogCorruptedException( + commitCheckpoint.toString(), + "checkpoint file " + + commitCheckpoint.getFileName() + + " already exists but has corrupted content: expected " + + checkpoint + + " but got " + + checkpointFromDisk + ); + } + } else { + copyCheckpointTo(commitCheckpoint); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(foundTranslogs); + } + } + return foundTranslogs; + } + + @Override + boolean ensureSynced(Location location) throws IOException { + Callable execute = () -> false; + try (ReleasableLock lock = readLock.acquire()) { + assert location.generation <= current.getGeneration(); + if 
(location.generation == current.getGeneration()) { + ensureOpen(); + execute = () -> prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation); + } + } catch (final Exception ex) { + closeOnTragicEvent(ex); + throw ex; + } + try { + return execute.call(); + } catch (Exception ex) { + closeOnTragicEvent(ex); + assert ex instanceof IOException; + throw (IOException) ex; + } + } + + @Override + public void rollGeneration() throws IOException { + syncBeforeRollGeneration(); + if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) { + return; + } + prepareAndUpload(primaryTermSupplier.getAsLong(), null); + } + + private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { + try (Releasable ignored = writeLock.acquire()) { + if (generation == null || generation == current.getGeneration()) { + try { + final TranslogReader reader = current.closeIntoReader(); + readers.add(reader); + copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration()))); + if (closed.get() == false) { + logger.trace("Creating new writer for gen: [{}]", current.getGeneration() + 1); + current = createWriter(current.getGeneration() + 1); + logger.trace("current translog set to [{}]", current.getGeneration()); + } + } catch (final Exception e) { + tragedy.setTragicException(e); + closeOnTragicEvent(e); + throw e; + } + } + // ToDo : Do we need remote writes in sync fashion ? + // If we don't , we should swallow FileAlreadyExistsException + // and also verify for same during primary-primary relocation + // Writing remote in sync fashion doesn't hurt as global ckp update + // is not updated in remote translog. + if (generation == null) { + upload(primaryTerm, current.getGeneration() - 1) ; + //updateReaders(); + return true; + } else { + return upload(primaryTerm, generation); + } + } + } + + private boolean upload(Long primaryTerm, Long generation) throws IOException { + logger.trace("uploading translog for {} {} ", primaryTerm, generation); + TransferSnapshotProvider transferSnapshotProvider = new TransferSnapshotProvider(primaryTerm, generation, this.location, readers); + Releasable transferReleasable = Releasables.wrap(deletionPolicy.acquireTranslogGen(getMinFileGeneration())); + return translogTransferManager.uploadTranslog(transferSnapshotProvider.get(), new TranslogTransferListener() { + @Override + + public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException { + transferReleasable.close(); + closeFilesIfNoPendingRetentionLocks(); + logger.trace("uploaded translog for {} {} ", primaryTerm, generation); + } + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException { + transferReleasable.close(); + closeFilesIfNoPendingRetentionLocks(); + } + }); + } + + // Visible for testing + public Set allUploaded() { + return fileTransferTracker.allUploaded(); + } + + private boolean syncToDisk() throws IOException { + try (ReleasableLock lock = readLock.acquire()) { + return current.sync(); + } catch (final Exception ex) { + closeOnTragicEvent(ex); + throw ex; + } + } + + public boolean updateReaders() throws IOException { + //recover all translog files found on remote translog + Optional rtm = translogTransferManager.findLatestMetadata(); + if (rtm.isEmpty()) { + logger.info("No translog metadata file found, this can happen for new index with no data uploaded to remote translog store"); + return true; + } + RemoteTranslogMetadata 
remoteTranslogMetadata = rtm.get(); + Map generationToPrimaryTermMapper = remoteTranslogMetadata.getGenerationToPrimaryTermMapper(); + + Set generations = new HashSet<>(); + generationToPrimaryTermMapper.entrySet().stream() + .forEach( + genToTerm -> { + generations.add(Long.parseLong(genToTerm.getKey(), Character.MAX_RADIX)); + } + ); + + long maxGen = Collections.max(generations); + long maxPrimary = Long.parseLong((String) generationToPrimaryTermMapper.get(Long.toString(maxGen, Character.MAX_RADIX))); + logger.info("Highest gen and primary {} {}", maxGen, maxPrimary); + Tuple t2 = translogTransferManager.readTranslogGen(maxPrimary, maxGen); + Path file2 = location.resolve(getCommitCheckpointFileName(maxGen)); + Files.deleteIfExists(file2); + final FileChannel channel2 = getChannelFactory().open(file2); + channel2.write(ByteBuffer.wrap(t2.v2())); + channel2.force(true); + final Checkpoint checkpoint = Checkpoint.read(file2); + logger.info("updated checkpoint {} {}", checkpoint); + logger.info("current checkpoint's generation {}", current.generation); + + generationToPrimaryTermMapper.entrySet().stream() + .forEach( + genToTerm -> { + Long gen = Long.parseLong(genToTerm.getKey(), Character.MAX_RADIX); + Long primary = Long.parseLong((String) genToTerm.getValue(), Character.MAX_RADIX); + + try { + Tuple t = translogTransferManager.readTranslogGen(primary, gen); + Path file = location.resolve(getFilename(gen)); + Files.deleteIfExists(file); + final FileChannel channel = getChannelFactory().open(file); + channel.write(ByteBuffer.wrap(t.v1())); + channel.force(true); + + file = location.resolve(getCommitCheckpointFileName(gen)); + Files.deleteIfExists(file); + final FileChannel ckpChannel = getChannelFactory().open(file); + ckpChannel.write(ByteBuffer.wrap(t.v2())); + ckpChannel.force(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + ); + reloadReaders(checkpoint); + + return true; + } + + public void reloadReaders(Checkpoint checkpoint) throws IOException { + logger.info("reloading readers {}", this.readers.size()); + ArrayList oldGens = new ArrayList<>(); + ArrayList newGens = new ArrayList<>(); + Iterator iterator = this.readers.iterator(); + TranslogReader reader; + while (iterator.hasNext()) { + reader = iterator.next(); + oldGens.add(reader.getCheckpoint().generation); + } + + this.readers.clear(); + this.readers.addAll(recoverFromFiles(checkpoint)); + + iterator = this.readers.iterator(); + while (iterator.hasNext()) { + reader = iterator.next(); + newGens.add(reader.getCheckpoint().generation); + } + + logger.info("Old gen {}", oldGens); + logger.info("New gen {}", newGens); + + assert(oldGens.equals(newGens) ) : "Older gens size" + oldGens.size() + "New gens" + newGens.size(); + logger.info("reloaded readers {}", this.readers.size()); + } + + @Override + public void sync() throws IOException { + try { + if (syncToDisk()) { + prepareAndUpload(primaryTermSupplier.getAsLong(), null); + } + } catch (final Exception e) { + tragedy.setTragicException(e); + closeOnTragicEvent(e); + throw e; + } + } + + @Override + public void close() throws IOException { + assert Translog.calledFromOutsideOrViaTragedyClose() + : "Translog.close method is called from inside Translog, but not via closeOnTragicEvent method"; + if (closed.compareAndSet(false, true)) { + try (ReleasableLock lock = writeLock.acquire()) { + if (current.syncNeeded()) { + prepareAndUpload(primaryTermSupplier.getAsLong(), null); + } + } finally { + logger.debug("translog closed"); + 
+                closeFilesIfNoPendingRetentionLocks();
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteTranslogMetadata.java b/server/src/main/java/org/opensearch/index/translog/RemoteTranslogMetadata.java
new file mode 100644
index 0000000000000..f19eed7e24415
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/translog/RemoteTranslogMetadata.java
@@ -0,0 +1,148 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.translog;
+
+import org.apache.lucene.util.SetOnce;
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+
+public class RemoteTranslogMetadata implements Writeable, Comparable<RemoteTranslogMetadata> {
+
+    private final long primaryTerm;
+
+    private final long generation;
+
+    private final long minTranslogGeneration;
+
+    private final long timeStamp;
+
+    private final SetOnce<Map<String, Object>> generationToPrimaryTermMapper = new SetOnce<>();
+
+    public static final String METADATA_SEPARATOR = "_";
+
+    public static final MetadataFilenameComparator METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();
+
+    public RemoteTranslogMetadata(long primaryTerm, long generation, long minTranslogGeneration) {
+        this.primaryTerm = primaryTerm;
+        this.generation = generation;
+        this.minTranslogGeneration = minTranslogGeneration;
+        this.timeStamp = System.currentTimeMillis();
+    }
+
+    public RemoteTranslogMetadata(StreamInput in) throws IOException {
+        this.primaryTerm = in.readLong();
+        this.generation = in.readLong();
+        this.minTranslogGeneration = in.readLong();
+        this.timeStamp = in.readLong();
+        this.generationToPrimaryTermMapper.set(in.readMap());
+    }
+
+    public long getPrimaryTerm() {
+        return primaryTerm;
+    }
+
+    public long getGeneration() {
+        return generation;
+    }
+
+    public long getMinTranslogGeneration() {
+        return minTranslogGeneration;
+    }
+
+    public void setGenerationToPrimaryTermMapper(Map<String, Object> generationToPrimaryTermMap) {
+        generationToPrimaryTermMapper.set(generationToPrimaryTermMap);
+    }
+
+    public Map<String, Object> getGenerationToPrimaryTermMapper() {
+        return generationToPrimaryTermMapper.get();
+    }
+
+    public String getMetadataFileName() {
+        return String.join(
+            METADATA_SEPARATOR,
+            Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation), String.valueOf(timeStamp))
+        );
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(primaryTerm, generation, timeStamp);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RemoteTranslogMetadata other = (RemoteTranslogMetadata) o;
+        return Objects.equals(this.primaryTerm, other.primaryTerm)
+            && Objects.equals(this.generation, other.generation)
+            && Objects.equals(this.timeStamp, other.timeStamp);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeLong(primaryTerm);
+        out.writeLong(generation);
+        out.writeLong(minTranslogGeneration);
+        out.writeLong(timeStamp);
+        out.writeMap(generationToPrimaryTermMapper.get());
+    }
+
+    @Override
+    public int compareTo(RemoteTranslogMetadata o) {
+        // Consistent with the filename comparator below: primary term, then generation, then timestamp.
+        if (primaryTerm != o.primaryTerm) {
+            return primaryTerm > o.primaryTerm ? 1 : -1;
+        }
+        if (generation != o.generation) {
+            return generation > o.generation ? 1 : -1;
+        }
+        return Long.compare(timeStamp, o.timeStamp);
+    }
+
+    /**
+     * Comparator to sort the metadata filenames. The order of sorting is: primary term, generation, timestamp.
+     * The timestamp does not guarantee recency across nodes, but it provides a consistent way to sort filenames
+     * that share a primary term and generation.
+     */
+    static class MetadataFilenameComparator implements Comparator<String> {
+        @Override
+        public int compare(String first, String second) {
+            final String[] firstTokens = first.split(METADATA_SEPARATOR);
+            final String[] secondTokens = second.split(METADATA_SEPARATOR);
+
+            final long firstPrimaryTerm = getPrimaryTerm(firstTokens);
+            final long secondPrimaryTerm = getPrimaryTerm(secondTokens);
+            if (firstPrimaryTerm != secondPrimaryTerm) {
+                return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1;
+            }
+            final long firstGeneration = getGeneration(firstTokens);
+            final long secondGeneration = getGeneration(secondTokens);
+            if (firstGeneration != secondGeneration) {
+                return firstGeneration > secondGeneration ? 1 : -1;
+            }
+            final long firstTimeStamp = Long.parseLong(firstTokens[2]);
+            final long secondTimeStamp = Long.parseLong(secondTokens[2]);
+            return Long.compare(firstTimeStamp, secondTimeStamp);
+        }
+    }
+
+    // Visible for testing
+    static long getPrimaryTerm(String[] filenameTokens) {
+        return Long.parseLong(filenameTokens[0]);
+    }
+
+    // Visible for testing
+    static long getGeneration(String[] filenameTokens) {
+        return Long.parseLong(filenameTokens[1]);
+    }
+}
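A short sketch of the metadata file name this class produces; the timestamp is captured at construction time via System.currentTimeMillis(), so the value shown is illustrative.

    RemoteTranslogMetadata metadata = new RemoteTranslogMetadata(
        2L,   // primary term
        17L,  // generation
        11L); // min translog generation
    String fileName = metadata.getMetadataFileName();
    // primaryTerm_generation_timestamp, e.g. "2_17_1660000000000"
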
The order of sorting is: Primary Term, Generation, UUID + * Even though UUID sort does not provide any info on recency, it provides a consistent way to sort the filenames. + */ + static class MetadataFilenameComparator implements Comparator { + @Override + public int compare(String first, String second) { + String[] firstTokens = first.split(METADATA_SEPARATOR); + String[] secondTokens = second.split(METADATA_SEPARATOR); + + long firstPrimaryTerm = getPrimaryTerm(firstTokens); + long secondPrimaryTerm = getPrimaryTerm(secondTokens); + if (firstPrimaryTerm != secondPrimaryTerm) { + return getPrimaryTerm(firstTokens) > getPrimaryTerm(secondTokens) ? 1 : -1 ; + } + else if (!firstTokens[1].equals(secondTokens[1])) { + long firstGeneration = getGeneration(firstTokens); + long secondGeneration = getGeneration(secondTokens); + if (firstGeneration != secondGeneration) { + return firstGeneration > secondGeneration ? 1 : -1; + } + } + else if (!firstTokens[2].equals(secondTokens[2])) { + return Long.parseLong(firstTokens[0]) > Long.parseLong(secondTokens[0]) ? 1 : -1; + } + return 0; + } + } + + // Visible for testing + static long getPrimaryTerm(String[] filenameTokens) { + return Long.parseLong(filenameTokens[0]); + } + + // Visible for testing + static long getGeneration(String[] filenameTokens) { + return Long.parseLong(filenameTokens[1]); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 7f22ad1bf320d..abe98519f095f 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -71,7 +71,6 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -112,7 +111,7 @@ * * @opensearch.internal */ -public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable { +public abstract class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable { /* * TODO @@ -134,21 +133,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC public static final int DEFAULT_HEADER_SIZE_IN_BYTES = TranslogHeader.headerSizeInBytes(UUIDs.randomBase64UUID()); // the list of translog readers is guaranteed to be in order of translog generation - private final List readers = new ArrayList<>(); - private final BigArrays bigArrays; + protected final List readers = new ArrayList<>(); + protected final BigArrays bigArrays; protected final ReleasableLock readLock; protected final ReleasableLock writeLock; - private final Path location; - private TranslogWriter current; + protected final Path location; + protected TranslogWriter current; protected final TragicExceptionHolder tragedy = new TragicExceptionHolder(); - private final AtomicBoolean closed = new AtomicBoolean(); - private final TranslogConfig config; - private final LongSupplier globalCheckpointSupplier; - private final LongSupplier primaryTermSupplier; - private final String translogUUID; - private final TranslogDeletionPolicy deletionPolicy; - private final LongConsumer persistedSequenceNumberConsumer; + protected final AtomicBoolean closed = new AtomicBoolean(); + protected final TranslogConfig config; + protected final LongSupplier globalCheckpointSupplier; + protected final LongSupplier primaryTermSupplier; + protected final String 
translogUUID; + protected final TranslogDeletionPolicy deletionPolicy; + protected final LongConsumer persistedSequenceNumberConsumer; /** * Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is @@ -190,134 +189,9 @@ public Translog( writeLock = new ReleasableLock(rwl.writeLock()); this.location = config.getTranslogPath(); Files.createDirectories(this.location); - - try { - final Checkpoint checkpoint = readCheckpoint(location); - final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); - final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); - // this is special handling for error condition when we create a new writer but we fail to bake - // the newly written file (generation+1) into the checkpoint. This is still a valid state - // we just need to cleanup before we continue - // we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this: - // https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example - // - // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that - // file exists. If not we don't even try to clean it up and wait until we fail creating it - assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogHeader.headerSizeInBytes(translogUUID) - : "unexpected translog file: [" + nextTranslogFile + "]"; - if (Files.exists(currentCheckpointFile) // current checkpoint is already copied - && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning - logger.warn( - "deleted previously created, but not yet committed, next generation [{}]. This can happen due to a" - + " tragic exception when creating a new generation", - nextTranslogFile.getFileName() - ); - } - this.readers.addAll(recoverFromFiles(checkpoint)); - if (readers.isEmpty()) { - throw new IllegalStateException("at least one reader must be recovered"); - } - boolean success = false; - current = null; - try { - current = createWriter( - checkpoint.generation + 1, - getMinFileGeneration(), - checkpoint.globalCheckpoint, - persistedSequenceNumberConsumer - ); - success = true; - } finally { - // we have to close all the recovered ones otherwise we leak file handles here - // for instance if we have a lot of tlog and we can't create the writer we keep on holding - // on to all the uncommitted tlog files if we don't close - if (success == false) { - IOUtils.closeWhileHandlingException(readers); - } - } - } catch (Exception e) { - // close the opened translog files if we fail to create a new translog... - IOUtils.closeWhileHandlingException(current); - IOUtils.closeWhileHandlingException(readers); - throw e; - } - } - - /** recover all translog files found on disk */ - private ArrayList recoverFromFiles(Checkpoint checkpoint) throws IOException { - boolean success = false; - ArrayList foundTranslogs = new ArrayList<>(); - try (ReleasableLock ignored = writeLock.acquire()) { - logger.debug("open uncommitted translog checkpoint {}", checkpoint); - final long minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; - - // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on - // the generation id we found in the lucene commit. This gives for better error messages if the wrong - // translog was found. 
- for (long i = checkpoint.generation; i >= minGenerationToRecoverFrom; i--) { - Path committedTranslogFile = location.resolve(getFilename(i)); - if (Files.exists(committedTranslogFile) == false) { - throw new TranslogCorruptedException( - committedTranslogFile.toString(), - "translog file doesn't exist with generation: " - + i - + " recovering from: " - + minGenerationToRecoverFrom - + " checkpoint: " - + checkpoint.generation - + " - translog ids must be consecutive" - ); - } - final Checkpoint readerCheckpoint = i == checkpoint.generation - ? checkpoint - : Checkpoint.read(location.resolve(getCommitCheckpointFileName(i))); - final TranslogReader reader = openReader(committedTranslogFile, readerCheckpoint); - assert reader.getPrimaryTerm() <= primaryTermSupplier.getAsLong() : "Primary terms go backwards; current term [" - + primaryTermSupplier.getAsLong() - + "] translog path [ " - + committedTranslogFile - + ", existing term [" - + reader.getPrimaryTerm() - + "]"; - foundTranslogs.add(reader); - logger.debug("recovered local translog from checkpoint {}", checkpoint); - } - Collections.reverse(foundTranslogs); - - // when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them; - // if we crash just at the wrong moment, it may be that we leave one unreferenced file behind so we delete it if there - IOUtils.deleteFilesIgnoringExceptions( - location.resolve(getFilename(minGenerationToRecoverFrom - 1)), - location.resolve(getCommitCheckpointFileName(minGenerationToRecoverFrom - 1)) - ); - - Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); - if (Files.exists(commitCheckpoint)) { - Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint); - if (checkpoint.equals(checkpointFromDisk) == false) { - throw new TranslogCorruptedException( - commitCheckpoint.toString(), - "checkpoint file " - + commitCheckpoint.getFileName() - + " already exists but has corrupted content: expected " - + checkpoint - + " but got " - + checkpointFromDisk - ); - } - } else { - copyCheckpointTo(commitCheckpoint); - } - success = true; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(foundTranslogs); - } - } - return foundTranslogs; } - private void copyCheckpointTo(Path targetPath) throws IOException { + protected void copyCheckpointTo(Path targetPath) throws IOException { // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, CHECKPOINT_SUFFIX); boolean tempFileRenamed = false; @@ -383,7 +257,7 @@ public boolean isOpen() { return closed.get() == false; } - private static boolean calledFromOutsideOrViaTragedyClose() { + protected static boolean calledFromOutsideOrViaTragedyClose() { List frames = Stream.of(Thread.currentThread().getStackTrace()).skip(3). // skip getStackTrace, current method // and close method frames limit(10). // limit depth of analysis to 10 frames, it should be enough to catch closing with, e.g. 
IOUtils @@ -817,7 +691,7 @@ public static String getFilename(long generation) { return TRANSLOG_FILE_PREFIX + generation + TRANSLOG_FILE_SUFFIX; } - static String getCommitCheckpointFileName(long generation) { + public static String getCommitCheckpointFileName(long generation) { return TRANSLOG_FILE_PREFIX + generation + CHECKPOINT_SUFFIX; } @@ -868,18 +742,7 @@ public void trimOperations(long belowTerm, long aboveSeqNo) throws IOException { * * @return Returns true iff this call caused an actual sync operation otherwise false */ - public boolean ensureSynced(Location location) throws IOException { - try (ReleasableLock lock = readLock.acquire()) { - if (location.generation == current.getGeneration()) { // if we have a new one it's already synced - ensureOpen(); - return current.syncUpTo(location.translogLocation + location.size); - } - } catch (final Exception ex) { - closeOnTragicEvent(ex); - throw ex; - } - return false; - } + abstract boolean ensureSynced(Location location) throws IOException; /** * Ensures that all locations in the given stream have been synced / written to the underlying storage. @@ -1910,7 +1773,7 @@ long getFirstOperationPosition() { // for testing return current.getFirstOperationOffset(); } - private void ensureOpen() { + protected void ensureOpen() { if (closed.get()) { throw new AlreadyClosedException("translog is already closed", tragedy.get()); } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index 9d22fe0a498eb..205229949da77 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -138,7 +138,7 @@ public int totalOperations() { } @Override - final Checkpoint getCheckpoint() { + final public Checkpoint getCheckpoint() { return checkpoint; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java index 413975f82678b..178cdc110ec3b 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogWriter.java @@ -359,9 +359,10 @@ synchronized boolean assertNoSeqAbove(long belowTerm, long aboveSeqNo) { * * Note: any exception during the sync process will be interpreted as a tragic exception and the writer will be closed before * raising the exception. 
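With getCommitCheckpointFileName widened to public alongside the existing public getFilename, callers outside the package can derive both on-disk names for a generation. A tiny sketch, assuming the default "translog-" prefix and ".tlog"/".ckp" suffixes:

    long generation = 42;
    String translogFile = Translog.getFilename(generation);                   // "translog-42.tlog"
    String checkpointFile = Translog.getCommitCheckpointFileName(generation); // "translog-42.ckp"
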
+ * @return true if this call caused an actual sync operation, false otherwise */ - public void sync() throws IOException { - syncUpTo(Long.MAX_VALUE); + public boolean sync() throws IOException { + return syncUpTo(Long.MAX_VALUE); } /** diff --git a/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java index 33294eb9e7d24..eb280e817b6f9 100644 --- a/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java @@ -213,7 +213,8 @@ public long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter } }; try ( - Translog translog = new Translog( + // TODO fix it with the appropriate Translog flavor + Translog translog = new LocalTranslog( translogConfig, translogUUID, retainAllTranslogPolicy, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java new file mode 100644 index 0000000000000..d7684203dc62e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.threadpool.ThreadPool; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +/** + * Service that handles remote transfer of translog and checkpoint files + */ +public class BlobStoreTransferService implements TransferService { + + private final BlobStore blobStore; + private final ThreadPool threadPool; + + private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); + + public BlobStoreTransferService(BlobStore blobStore, ThreadPool threadPool) { + this.blobStore = blobStore; + this.threadPool = threadPool; + } + + @Override + public void uploadFileAsync( + final TransferFileSnapshot fileSnapshot, + Iterable<String> remoteTransferPath, + ActionListener<TransferFileSnapshot> listener + ) { + assert remoteTransferPath instanceof BlobPath; + BlobPath blobPath = (BlobPath) remoteTransferPath; + threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER).execute(ActionRunnable.wrap(listener, l -> { + try { + blobStore.blobContainer(blobPath) + .writeBlobAtomic( + fileSnapshot.getName(), + new ByteArrayInputStream(fileSnapshot.getContent()), + fileSnapshot.getContentLength(), + true + ); + l.onResponse(fileSnapshot); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); + l.onFailure(new FileTransferException(fileSnapshot, e)); + } + })); + } + + @Override + public void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException { + assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath; + try { + blobStore.blobContainer(blobPath) + .writeBlobAtomic( + fileSnapshot.getName(), + new ByteArrayInputStream(fileSnapshot.getContent()), + fileSnapshot.getContentLength(), + true + ); + } catch (Exception ex) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); + throw ex; + } + } + + @Override + public InputStream readFile(Iterable<String> path, String fileName) throws IOException { + return blobStore.blobContainer((BlobPath) path).readBlob(fileName); + } + + @Override + public Set<String> listAll(Iterable<String> path) throws IOException { + return blobStore.blobContainer((BlobPath) path).listBlobs().keySet(); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java new file mode 100644 index 0000000000000..9d6d04fb55f20 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -0,0 +1,175 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.Nullable; + +import java.nio.file.Path; +import java.util.Objects; + +public class FileSnapshot { + + private final long checksum; + private final byte[] content; + private final String name; + private final long contentLength; + @Nullable + private Path path; + + public FileSnapshot(String name, Path path, long checksum, byte[] content) { + this.name = name; + this.path = path; + this.checksum = checksum; + this.content = content; + this.contentLength = content.length; + } + + public FileSnapshot(String name, long checksum, byte[] content) { + this.name = name; + this.checksum = checksum; + this.content = content; + this.contentLength = content.length; + } + + public Path getPath() { + return path; + } + + public String getName() { + return name; + } + + public byte[] getContent() { + return content; + } + + public long getChecksum() { + return checksum; + } + + public long getContentLength() { + return contentLength; + } + + @Override + public int hashCode() { + return Objects.hash(name, path, checksum, contentLength); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FileSnapshot other = (FileSnapshot) o; + return Objects.equals(this.name, other.name) + && Objects.equals(this.path, other.path) + && Objects.equals(this.checksum, other.checksum) + && Objects.equals(this.contentLength, other.contentLength); + } + + @Override + public String toString() { + return new StringBuilder("FileInfo [").append(name) + .append(", ") + .append(path == null ? "" : path.toUri()) + .append(", ") + .append(checksum) + .append(", ") + .append(contentLength) + .append("]") + .toString(); + } + + public static class TransferFileSnapshot extends FileSnapshot { + + private final long primaryTerm; + + public TransferFileSnapshot(String name, Path path, long checksum, byte[] content, long primaryTerm) { + super(name, path, checksum, content); + this.primaryTerm = primaryTerm; + } + + public TransferFileSnapshot(String name, long checksum, byte[] content, long primaryTerm) { + super(name, checksum, content); + this.primaryTerm = primaryTerm; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + @Override + public int hashCode() { + return Objects.hash(primaryTerm, super.hashCode()); + } + + @Override + public boolean
equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TransferFileSnapshot other = (TransferFileSnapshot) o; + return Objects.equals(this.primaryTerm, other.primaryTerm); + } + return false; + } + } + + public static class TranslogFileSnapshot extends TransferFileSnapshot { + + private final long generation; + + public TranslogFileSnapshot(long primaryTerm, long generation, String name, Path path, long checksum, byte[] content) { + super(name, path, checksum, content, primaryTerm); + this.generation = generation; + } + + public long getGeneration() { + return generation; + } + + @Override + public int hashCode() { + return Objects.hash(generation, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TranslogFileSnapshot other = (TranslogFileSnapshot) o; + return Objects.equals(this.generation, other.generation); + } + return false; + } + } + + public static class CheckpointFileSnapshot extends TransferFileSnapshot { + + private final long minTranslogGeneration; + + public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, String name, Path path, long checksum, byte[] content) { + super(name, path, checksum, content, primaryTerm); + this.minTranslogGeneration = minTranslogGeneration; + } + + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + + @Override + public int hashCode() { + return Objects.hash(minTranslogGeneration, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; + return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration); + } + return false; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java new file mode 100644 index 0000000000000..2fc0a65595d28 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +public class FileTransferException extends RuntimeException { + + private final TransferFileSnapshot fileSnapshot; + + public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) { + super(cause); + this.fileSnapshot = fileSnapshot; + } + + public FileTransferException(TransferFileSnapshot fileSnapshot, String message, Throwable cause) { + super(message, cause); + this.fileSnapshot = fileSnapshot; + } + + public TransferFileSnapshot getFileSnapshot() { + return fileSnapshot; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java new file mode 100644 index 0000000000000..13f2e62e3aa74 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.listener.FileTransferListener; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class FileTransferTracker implements FileTransferListener { + + private final Map<String, TransferState> fileTransferTracker; + private final ShardId shardId; + + public FileTransferTracker(ShardId shardId) { + this.shardId = shardId; + this.fileTransferTracker = new ConcurrentHashMap<>(); + } + + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + TransferState targetState = TransferState.SUCCESS; + fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> { + if (v == null || v.validateNextState(targetState)) { + return targetState; + } + throw new IllegalStateException("Unexpected transfer state " + v + " while setting target to " + targetState); + }); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + TransferState targetState = TransferState.FAILED; + fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> { + if (v == null || v.validateNextState(targetState)) { + return targetState; + } + throw new IllegalStateException("Unexpected transfer state " + v + " while setting target to " + targetState); + }); + } + + public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) { + return original.stream() + .filter(fileSnapshot -> fileTransferTracker.get(fileSnapshot.getName()) != TransferState.SUCCESS) + .collect(Collectors.toSet()); + } + + public Set<String> allUploaded() { + Set<String> successFileTransferTracker = new HashSet<>(); + fileTransferTracker.forEach((k, v) -> { + if (v == TransferState.SUCCESS) { + successFileTransferTracker.add(k); + } + }); + return successFileTransferTracker; + } + + public enum TransferState { + INIT, + STARTED, + SUCCESS, + FAILED, + DELETED;
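+        // The transitions validated below form a small monotonic state machine: INIT may move to any
+        // later state, STARTED to any terminal state, and SUCCESS/FAILED only to DELETED. A repeated
+        // transition, e.g. a second onSuccess(...) for the same file name, therefore surfaces as an
+        // IllegalStateException in the compute(...) callbacks above instead of being silently re-recorded.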
+ +        public boolean validateNextState(TransferState target) { + switch (this) { + case INIT: + return Set.of(STARTED, SUCCESS, FAILED, DELETED).contains(target); + case STARTED: + return Set.of(SUCCESS, FAILED, DELETED).contains(target); + case SUCCESS: + case FAILED: + return Set.of(DELETED).contains(target); + } + return false; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java new file mode 100644 index 0000000000000..9967ac8a8547d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +public interface TransferService { + + void uploadFileAsync(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath, ActionListener<TransferFileSnapshot> listener); + + void uploadFile(final TransferFileSnapshot fileSnapshot, Iterable<String> remotePath) throws IOException; + + Set<String> listAll(Iterable<String> path) throws IOException; + + InputStream readFile(Iterable<String> path, String fileName) throws IOException; + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java new file mode 100644 index 0000000000000..2cf9d80168ac1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.util.Set; + +public interface TransferSnapshot { + + Set<TransferFileSnapshot> getCheckpointFileSnapshots(); + + Set<TransferFileSnapshot> getTranslogFileSnapshots(); + + int getTransferSize(); + + long getPrimaryTerm(); + + long getGeneration(); + + long getMinGeneration(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java new file mode 100644 index 0000000000000..95cac0a30f9b4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogReader; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; + +import static org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; + +public class TransferSnapshotProvider implements Supplier<TransferSnapshot> { + + private final TranslogCheckpointTransferSnapshot translogTransferSnapshot; + + public TransferSnapshotProvider(long primaryTerm, long generation, Path location, List<TranslogReader> readers) throws IOException { + translogTransferSnapshot = new TranslogCheckpointTransferSnapshot(primaryTerm, generation, readers.size()); + for (TranslogReader reader : readers) { + final long readerGeneration = reader.getGeneration(); + final long readerPrimaryTerm = reader.getPrimaryTerm(); + final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); + Path translogPath = reader.path(); + Path checkpointPath = location.resolve(Translog.getCommitCheckpointFileName(readerGeneration)); + translogTransferSnapshot.add( + buildTranslogFileInfo(translogPath.toFile(), readerPrimaryTerm, readerGeneration), + buildCheckpointFileInfo(checkpointPath.toFile(), readerPrimaryTerm, minTranslogGeneration) + ); + } + } + + @Override + public TranslogCheckpointTransferSnapshot get() {
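+        // verify() only re-checks that one (translog, checkpoint) pair was captured per reader, so a
+        // null return signals an incomplete snapshot; callers presumably have to discard it rather
+        // than upload a partial set of generations.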
+ return translogTransferSnapshot.verify() ? translogTransferSnapshot : null; + } + + private TranslogFileSnapshot buildTranslogFileInfo(File file, long primaryTerm, long generation) throws IOException { + TranslogFileSnapshot fileSnapshot; + try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { + byte[] content = stream.readAllBytes(); + long checksum = stream.getChecksum().getValue(); + fileSnapshot = new TranslogFileSnapshot(primaryTerm, generation, file.getName(), file.toPath(), checksum, content); + } + return fileSnapshot; + } + + private CheckpointFileSnapshot buildCheckpointFileInfo(File file, long primaryTerm, long minTranslogGeneration) throws IOException { + CheckpointFileSnapshot fileSnapshot; + try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(file), new CRC32())) { + byte[] content = stream.readAllBytes(); + long checksum = stream.getChecksum().getValue(); + fileSnapshot = new CheckpointFileSnapshot(primaryTerm, minTranslogGeneration, file.getName(), file.toPath(), checksum, content); + } + return fileSnapshot; + } + + static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { + + private final Set<Tuple<TranslogFileSnapshot, CheckpointFileSnapshot>> translogCheckpointFileInfoTupleSet; + private final int size; + private CheckpointFileSnapshot latestCheckPointFileSnapshot; + private TranslogFileSnapshot latestTranslogFileSnapshot; + private long generation; + private long highestGeneration; + private long primaryTerm; + + TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { + translogCheckpointFileInfoTupleSet = new HashSet<>(size); + this.size = size; + this.generation = generation; + this.primaryTerm = primaryTerm; + } + + private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { + translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); + if (highestGeneration < translogFileSnapshot.getGeneration()) { + latestCheckPointFileSnapshot = checkPointFileSnapshot; + latestTranslogFileSnapshot = translogFileSnapshot; + highestGeneration = translogFileSnapshot.getGeneration(); + } + } + + private boolean verify() { + return translogCheckpointFileInfoTupleSet.size() == size; + } + + @Override + public Set<TransferFileSnapshot> getTranslogFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v1()).collect(Collectors.toSet()); + } + + @Override + public Set<TransferFileSnapshot> getCheckpointFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(tuple -> tuple.v2()).collect(Collectors.toSet()); + } + + @Override + public int getTransferSize() { + return 2 * size; + } + + @Override + public long getPrimaryTerm() { + assert this.primaryTerm == latestTranslogFileSnapshot.getPrimaryTerm(); + return this.primaryTerm; + } + + @Override + public long getGeneration() { + //assert this.generation == highestGeneration; + return latestTranslogFileSnapshot.getGeneration(); + } + + @Override + public long getMinGeneration() { + return latestCheckPointFileSnapshot.getMinTranslogGeneration(); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java new file mode 100644 index 0000000000000..12ca2a98473a3 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -0,0 +1,188 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be
licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.index.translog.RemoteTranslogMetadata; +import org.opensearch.index.translog.transfer.listener.FileTransferListener; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; + +import static org.opensearch.index.translog.RemoteTranslogMetadata.METADATA_FILENAME_COMPARATOR; +import static org.opensearch.index.translog.Translog.getCommitCheckpointFileName; +import static org.opensearch.index.translog.Translog.getFilename; +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; + +/** + * The class responsible for orchestrating the transfer via a {@link TransferService} + */ +public class TranslogTransferManager { + + private final TransferService transferService; + private final BlobPath remoteBaseTransferPath; + private final BlobPath remoteTransferMetadataPath; + private final FileTransferListener fileTransferListener; + private final UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter; + private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; + + private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); + + public TranslogTransferManager( + TransferService transferService, + BlobPath remoteBaseTransferPath, + BlobPath remoteTransferMetadataPath, + FileTransferListener fileTransferListener, + UnaryOperator<Set<TransferFileSnapshot>> exclusionFilter + ) { + this.transferService = transferService; + this.remoteBaseTransferPath = remoteBaseTransferPath; + this.remoteTransferMetadataPath = remoteTransferMetadataPath; + this.fileTransferListener = fileTransferListener; + this.exclusionFilter = exclusionFilter; + }
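+    /**
+     * Orchestrates a single transfer: the data files (.tlog/.ckp) are uploaded in parallel first, and the
+     * metadata file is written only once every data file has succeeded, so a reader should never discover
+     * metadata that points at a partially uploaded generation. A rough usage sketch (the wiring shown is
+     * illustrative, not prescribed by this class):
+     *
+     *   TransferSnapshot snapshot = new TransferSnapshotProvider(primaryTerm, generation, location, readers).get();
+     *   boolean ok = transferManager.uploadTranslog(snapshot, translogTransferListener);
+     *   // ok == false means onUploadFailed(...) already ran with the per-file failures attached
+     */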
+ public boolean uploadTranslog(TransferSnapshot translogCheckpointTransferSnapshot, TranslogTransferListener translogTransferListener) + throws IOException { + List<Exception> exceptionList = new ArrayList<>(translogCheckpointTransferSnapshot.getTransferSize()); + try { + Set<TransferFileSnapshot> toUpload = exclusionFilter.apply(translogCheckpointTransferSnapshot.getTranslogFileSnapshots()); + toUpload.addAll(exclusionFilter.apply(translogCheckpointTransferSnapshot.getCheckpointFileSnapshots())); + if (toUpload.isEmpty()) { + logger.warn("Nothing to upload for transfer size {}", translogCheckpointTransferSnapshot.getTransferSize()); + translogTransferListener.onUploadComplete(translogCheckpointTransferSnapshot); + return true; + } + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + LatchedActionListener<TransferFileSnapshot> latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(fileTransferListener::onSuccess, ex -> { + if (ex.getCause() instanceof FileAlreadyExistsException) { + // TODO: Do we need to prevent concurrent uploads? + return; + } + assert ex instanceof FileTransferException; + logger.error("Exception received type {}", ex.getClass(), ex); + FileTransferException e = (FileTransferException) ex; + fileTransferListener.onFailure(e.getFileSnapshot(), ex); + exceptionList.add(ex); + }), + latch + ); + toUpload.forEach( + fileSnapshot -> transferService.uploadFileAsync( + fileSnapshot, + remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), + latchedActionListener + ) + ); + try { + if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { + exceptionList.add(new TimeoutException("Timed out waiting for transfer to complete")); + } + } catch (InterruptedException ex) { + logger.error(() -> new ParameterizedMessage("Interrupted while waiting for transfer of snapshot {} to complete", translogCheckpointTransferSnapshot), ex); + exceptionList.add(ex); + Thread.currentThread().interrupt(); + } + if (exceptionList.isEmpty()) { + transferService.uploadFile(prepareMetadata(translogCheckpointTransferSnapshot), remoteTransferMetadataPath); + translogTransferListener.onUploadComplete(translogCheckpointTransferSnapshot); + } else { + translogTransferListener.onUploadFailed(translogCheckpointTransferSnapshot, ExceptionsHelper.multiple(exceptionList)); + } + return exceptionList.isEmpty(); + } catch (Exception ex) { + logger.error(() -> new ParameterizedMessage("Transfer failed for snapshot {}", translogCheckpointTransferSnapshot), ex); + translogTransferListener.onUploadFailed(translogCheckpointTransferSnapshot, ex); + return false; + } + } + + private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { + RemoteTranslogMetadata remoteTranslogMetadata = new RemoteTranslogMetadata( + transferSnapshot.getPrimaryTerm(), + transferSnapshot.getGeneration(), + transferSnapshot.getMinGeneration() + ); + + Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { + assert s instanceof TranslogFileSnapshot; + return (TranslogFileSnapshot) s; + }) + .collect( + Collectors.toMap( + snapshot -> Long.toString(snapshot.getGeneration(), Character.MAX_RADIX), + snapshot -> Long.toString(snapshot.getPrimaryTerm(), Character.MAX_RADIX) + ) + ); + remoteTranslogMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + TransferFileSnapshot fileSnapshot; + try (BytesStreamOutput output = new BytesStreamOutput()) { + remoteTranslogMetadata.writeTo(output); + try ( + CheckedInputStream stream = new CheckedInputStream( + new ByteArrayInputStream(output.bytes().streamInput().readAllBytes()), + new CRC32() + ) + ) { + byte[] content = stream.readAllBytes(); + long checksum = stream.getChecksum().getValue(); + fileSnapshot = new TransferFileSnapshot(remoteTranslogMetadata.getMetadataFileName(), checksum, content, -1); + } + } + return fileSnapshot; + } + + public Optional<RemoteTranslogMetadata> findLatestMetadata() throws IOException { + Set<String> files = transferService.listAll(remoteTransferMetadataPath); + logger.info("RemoteTranslogMetadata Files list {}", files); + Optional<String> latestMetadataFile = files.stream().max(METADATA_FILENAME_COMPARATOR);
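+        // Assumes METADATA_FILENAME_COMPARATOR orders names by the primary term and generation that
+        // prepareMetadata(...) encodes in base 36 (Character.MAX_RADIX), so max(...) yields the most
+        // recent upload rather than the lexicographically largest raw string.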
+ if (latestMetadataFile.isPresent()) { + logger.info("latest file is {}", latestMetadataFile.get()); + InputStream blob = transferService.readFile(remoteTransferMetadataPath, latestMetadataFile.get()); + RemoteTranslogMetadata rtMD = new RemoteTranslogMetadata(new InputStreamStreamInput(blob)); + return Optional.of(rtMD); + } else { + return Optional.empty(); + } + } + + public Tuple<byte[], byte[]> readTranslogGen(Long primary, Long gen) throws IOException { + byte[] tlog = transferService.readFile(remoteBaseTransferPath.add(String.valueOf(primary)), getFilename(gen)).readAllBytes(); + byte[] ckp = transferService.readFile(remoteBaseTransferPath.add(String.valueOf(primary)), getCommitCheckpointFileName(gen)).readAllBytes(); + return Tuple.tuple(tlog, ckp); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java new file mode 100644 index 0000000000000..79304f105a02f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer.listener; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +public interface FileTransferListener { + + void onSuccess(TransferFileSnapshot fileSnapshot); + + void onFailure(TransferFileSnapshot fileSnapshot, Exception e); +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java new file mode 100644 index 0000000000000..87d04e0bf19d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.index.translog.transfer.listener; + +import org.opensearch.index.translog.transfer.TransferSnapshot; + +import java.io.IOException; + +public interface TranslogTransferListener { + + void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException; + + void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 6808803ee0988..801ce234666de 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -178,6 +178,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -267,6 +268,8 @@ public class IndicesService extends AbstractLifecycleComponent private final ValuesSourceRegistry valuesSourceRegistry; private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; + private final Supplier repositoriesServiceSupplier; + @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically @@ -294,7 +297,8 @@ public IndicesService( Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + Supplier repositoriesServiceSupplier ) { this.settings = settings; this.threadPool = threadPool; @@ -389,6 +393,7 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; + this.repositoriesServiceSupplier = repositoriesServiceSupplier; } private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; @@ -749,7 +754,8 @@ private synchronized IndexService createIndexService( namedWriteableRegistry, this::isIdFieldDataEnabled, valuesSourceRegistry, - remoteDirectoryFactory + remoteDirectoryFactory, + repositoriesServiceSupplier ); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 92e9815313fa0..fd7ead95f8b51 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -654,7 +654,8 @@ protected Node( indexStoreFactories, searchModule.getValuesSourceRegistry(), recoveryStateFactories, - remoteDirectoryFactory + remoteDirectoryFactory, + repositoriesServiceReference::get ); final AliasValidator aliasValidator = new AliasValidator(); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c36d92abcf498..b0da300b7bdf9 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1582,9 +1582,9 @@ public long getRestoreThrottleTimeInNanos() { protected void assertSnapshotOrGenericThread() { assert Thread.currentThread().getName().contains('[' + 
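// NOTE: the hunk below relaxes this assertion. Remote translog uploads run on the new TRANSLOG_TRANSFER
// pool, whose threads are neither snapshot nor generic, so the strict name check would otherwise trip;
// the added "|| true" reads as a temporary bypass rather than a final design.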
ThreadPool.Names.SNAPSHOT + ']') - || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread [" + || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') || true : "Expected current thread [" + Thread.currentThread() - + "] to be the snapshot or generic thread."; + + "] to be the snapshot or generic thread." ; } @Override diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 928b4871590c6..f892ab9ef9c57 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -108,6 +108,7 @@ public static class Names { public static final String FETCH_SHARD_STORE = "fetch_shard_store"; public static final String SYSTEM_READ = "system_read"; public static final String SYSTEM_WRITE = "system_write"; + public static final String TRANSLOG_TRANSFER = "translog_transfer"; } /** @@ -172,6 +173,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.SEARCH_THROTTLED, ThreadPoolType.RESIZABLE); map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED); map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); + map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -244,6 +246,10 @@ public ThreadPool( ); builders.put(Names.SYSTEM_READ, new FixedExecutorBuilder(settings, Names.SYSTEM_READ, halfProcMaxAt5, 2000, false)); builders.put(Names.SYSTEM_WRITE, new FixedExecutorBuilder(settings, Names.SYSTEM_WRITE, halfProcMaxAt5, 1000, false)); + builders.put( + Names.TRANSLOG_TRANSFER, + new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, halfProcMaxAt5, halfProcMaxAt5, TimeValue.timeValueMinutes(5)) + ); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 6bfdd9ae16773..429c2126d9a00 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -44,6 +44,7 @@ import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.SetOnce.AlreadySetException; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; @@ -217,6 +218,8 @@ public void tearDown() throws Exception { } private IndexService newIndexService(IndexModule module) throws IOException { + final SetOnce repositoriesServiceReference = new SetOnce<>(); + repositoriesServiceReference.set(repositoriesService); return module.newIndexService( CREATE_INDEX, nodeEnvironment, @@ -234,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + repositoriesServiceReference::get ); } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 28a420403bcc1..347b49d696fa7 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ 
b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -141,12 +141,13 @@ import org.opensearch.index.shard.ShardUtils; import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; +import org.opensearch.index.translog.LocalTranslog; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; -import org.opensearch.index.translog.TranslogException; import org.opensearch.index.translog.TranslogDeletionPolicyFactory; +import org.opensearch.index.translog.TranslogException; import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.test.IndexSettingsModule; @@ -3675,7 +3676,7 @@ public void testRecoverFromForeignTranslog() throws IOException { final Path badTranslogLog = createTempDir(); final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - Translog translog = new Translog( + Translog translog = new LocalTranslog( new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), badUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java new file mode 100644 index 0000000000000..186019e2b727d --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -0,0 +1,1251 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.translog; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.tests.mockfile.FilterFileChannel; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.env.Environment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.engine.MissingHistoryOperationsException; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.seqno.LocalCheckpointTrackerTests; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static 
org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; +import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; + +@LuceneTestCase.SuppressFileSystems("ExtrasFS") +public class RemoteFSTranslogTests extends OpenSearchTestCase { + + protected final ShardId shardId = new ShardId("index", "_na_", 1); + + protected RemoteFsTranslog translog; + private AtomicLong globalCheckpoint; + protected Path translogDir; + // A default primary term is used by translog instances created in this test. + private final AtomicLong primaryTerm = new AtomicLong(); + private final AtomicReference<LongConsumer> persistedSeqNoConsumer = new AtomicReference<>(); + private boolean expectIntactTranslog; + private ThreadPool threadPool; + + BlobStoreRepository repository; + + BlobStoreTransferService blobStoreTransferService; + + private LongConsumer getPersistedSeqNoConsumer() { + return seqNo -> { + final LongConsumer consumer = persistedSeqNoConsumer.get(); + if (consumer != null) { + consumer.accept(seqNo); + } + }; + } + + protected Translog createTranslog(TranslogConfig config) throws IOException { + String translogUUID = Translog.createEmptyTranslog( + config.getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + primaryTerm.get() + ); + return new LocalTranslog( + config, + translogUUID, + createTranslogDeletionPolicy(config.getIndexSettings()), + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTerm::get, + getPersistedSeqNoConsumer() + ); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + primaryTerm.set(randomLongBetween(1, Integer.MAX_VALUE)); + // if a previous test failed we clean up things here + translogDir = createTempDir(); + translog = create(translogDir); + } + + @Override + @After + public void tearDown() throws Exception { + try { + translog.getDeletionPolicy().assertNoOpenTranslogRefs(); + translog.close(); + } finally { + super.tearDown(); + terminate(threadPool); + } + } + + private RemoteFsTranslog create(Path path) throws IOException { + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final TranslogConfig translogConfig = getTranslogConfig(path); + final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); + final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + repository = createRepository(); + threadPool = new TestThreadPool(getClass().getName()); + blobStoreTransferService = new BlobStoreTransferService(repository.blobStore(), threadPool); + return new RemoteFsTranslog( + translogConfig, + translogUUID, + deletionPolicy, + () -> globalCheckpoint.get(), + primaryTerm::get, + getPersistedSeqNoConsumer(), + repository, + threadPool + ); + } + + private TranslogConfig getTranslogConfig(final Path path) { + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + // only randomize between no age retention and a long one, so failures will have a chance of reproducing
"-1ms" : "1h") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomIntBetween(-1, 2048) + "b") + .build(); + return getTranslogConfig(path, settings); + } + + private TranslogConfig getTranslogConfig(final Path path, final Settings settings) { + final ByteSizeValue bufferSize = randomFrom( + TranslogConfig.DEFAULT_BUFFER_SIZE, + new ByteSizeValue(8, ByteSizeUnit.KB), + new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES) + ); + + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings); + return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize); + } + + private BlobStoreRepository createRepository() { + Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final FsRepository repository = new FsRepository( + repositoryMetadata, + createEnvironment(), + xContentRegistry(), + clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually + } + }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + repository.start(); + return repository; + } + + + /** Create a {@link Environment} with random path.home and path.repo **/ + private Environment createEnvironment() { + Path home = createTempDir(); + return TestEnvironment.newEnvironment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) + .build() + ); + } + + private Translog.Location addToTranslogAndList(Translog translog, List list, Translog.Operation op) throws IOException { + Translog.Location loc = translog.add(op); + Random random = random(); + if (random.nextBoolean()) { + translog.ensureSynced(loc); + } + list.add(op); + return loc; + } + + private Translog.Location addToTranslogAndListAndUpload(Translog translog, List list, Translog.Operation op) throws IOException { + Translog.Location loc = translog.add(op); + translog.ensureSynced(loc); + list.add(op); + return loc; + } + + public void testSimpleOperations() throws IOException { + ArrayList ops = new ArrayList<>(); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } + + addToTranslogAndList(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } + + addToTranslogAndList(translog, ops, new Translog.Delete("2", 1, primaryTerm.get())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + assertThat(snapshot, containsOperationsInAnyOrder(ops)); + } + + final long seqNo = randomLongBetween(0, Integer.MAX_VALUE); + final String reason = randomAlphaOfLength(16); + final long noopTerm = 
+ public void testSimpleOperations() throws IOException { + ArrayList<Translog.Operation> ops = new ArrayList<>(); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } + + addToTranslogAndList(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } + + addToTranslogAndList(translog, ops, new Translog.Delete("2", 1, primaryTerm.get())); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + assertThat(snapshot, containsOperationsInAnyOrder(ops)); + } + + final long seqNo = randomLongBetween(0, Integer.MAX_VALUE); + final String reason = randomAlphaOfLength(16); + final long noopTerm = randomLongBetween(1, primaryTerm.get()); + addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason)); + + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, containsOperationsInAnyOrder(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } + + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo + 1, randomLongBetween(seqNo + 1, Long.MAX_VALUE))) { + assertThat(snapshot, SnapshotMatchers.size(0)); + assertThat(snapshot.totalOperations(), equalTo(0)); + } + } + + public void testReadLocation() throws IOException { + ArrayList<Translog.Operation> ops = new ArrayList<>(); + ArrayList<Translog.Location> locs = new ArrayList<>(); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 }))); + locs.add(addToTranslogAndList(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 }))); + translog.sync(); + int i = 0; + for (Translog.Operation op : ops) { + assertEquals(op, translog.readOperation(locs.get(i++))); + } + assertNull(translog.readOperation(new Translog.Location(100, 0, 0))); + } + + public void testSnapshotWithNewTranslog() throws IOException { + List<Closeable> toClose = new ArrayList<>(); + try { + ArrayList<Translog.Operation> ops = new ArrayList<>(); + Translog.Snapshot snapshot = translog.newSnapshot(); + toClose.add(snapshot); + assertThat(snapshot, SnapshotMatchers.size(0)); + + addToTranslogAndList(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + Translog.Snapshot snapshot1 = translog.newSnapshot(); + toClose.add(snapshot1); + + addToTranslogAndList(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 2 })); + + assertThat(snapshot1, SnapshotMatchers.equalsTo(ops.get(0))); + + translog.rollGeneration(); + addToTranslogAndList(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 })); + + Translog.Snapshot snapshot2 = translog.newSnapshot(); + toClose.add(snapshot2); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(2); + assertThat(snapshot2, containsOperationsInAnyOrder(ops)); + assertThat(snapshot2.totalOperations(), equalTo(ops.size())); + } finally { + IOUtils.closeWhileHandlingException(toClose); + } + } + + public void testSnapshotOnClosedTranslog() throws IOException { + assertTrue(Files.exists(translogDir.resolve(Translog.getFilename(1)))); + translog.add(new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + translog.close(); + AlreadyClosedException ex = expectThrows(AlreadyClosedException.class, () -> translog.newSnapshot()); + assertEquals("translog is already closed", ex.getMessage()); + } + + public void testRangeSnapshot() throws Exception { + long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + final int generations = between(2, 20); + Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>(); + for (int gen = 0; gen < generations; gen++) { + Set<Long> seqNos = new HashSet<>(); + int numOps = randomIntBetween(1, 100); + for (int i = 0; i < numOps; i++) { + final long seqNo = randomValueOtherThanMany(seqNos::contains, () -> randomLongBetween(0, 1000)); + minSeqNo = SequenceNumbers.min(minSeqNo, seqNo); + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo); + seqNos.add(seqNo); + } + List<Translog.Operation> ops = new ArrayList<>(seqNos.size()); + for (long seqNo : seqNos) {
+ Translog.Index op = new Translog.Index(randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[] { randomByte() }); + translog.add(op); + ops.add(op); + } + operationsByGen.put(translog.currentFileGeneration(), ops); + translog.rollGeneration(); + if (rarely()) { + translog.rollGeneration(); // empty generation + } + } + + if (minSeqNo > 0) { + long fromSeqNo = randomLongBetween(0, minSeqNo - 1); + long toSeqNo = randomLongBetween(fromSeqNo, minSeqNo - 1); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); + } + } + + long fromSeqNo = randomLongBetween(maxSeqNo + 1, Long.MAX_VALUE); + long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); + } + + fromSeqNo = randomLongBetween(0, 2000); + toSeqNo = randomLongBetween(fromSeqNo, 2000); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + Set<Long> seenSeqNos = new HashSet<>(); + List<Translog.Operation> expectedOps = new ArrayList<>(); + for (long gen = translog.currentFileGeneration(); gen > 0; gen--) { + for (Translog.Operation op : operationsByGen.getOrDefault(gen, Collections.emptyList())) { + if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && seenSeqNos.add(op.seqNo())) { + expectedOps.add(op); + } + } + } + assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(expectedOps)); + } + } + + public void testSimpleOperationsUpload() throws IOException { + ArrayList<Translog.Operation> ops = new ArrayList<>(); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.size(0)); + } + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 })); + try (Translog.Snapshot snapshot = translog.newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); + assertThat(snapshot.totalOperations(), equalTo(ops.size())); + } + + assertEquals(4, translog.allUploaded().size()); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 })); + assertEquals(6, translog.allUploaded().size()); + + translog.rollGeneration(); + assertEquals(6, translog.allUploaded().size()); + + Set<String> mdFiles = blobStoreTransferService.listAll(repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")); + assertEquals(2, mdFiles.size()); + logger.info("All md files {}", mdFiles); + + Set<String> tlogFiles = blobStoreTransferService.listAll(repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get()))); + logger.info("All data files {}", tlogFiles); + + // assert content of ckp and tlog files + BlobPath path = repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(String.valueOf(primaryTerm.get())); + for (TranslogReader reader : translog.readers) { + final long readerGeneration = reader.getGeneration(); + logger.debug("Asserting content of {}", readerGeneration); + Path translogPath = reader.path(); + try (CheckedInputStream stream = new CheckedInputStream(new FileInputStream(translogPath.toFile()), new CRC32()); + InputStream tlogStream = blobStoreTransferService.readFile(path, Translog.getFilename(readerGeneration))) { + byte[] content = stream.readAllBytes(); + byte[] tlog = tlogStream.readAllBytes(); + assertArrayEquals(tlog, content); + }
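+            // The generation's checkpoint copy gets the same treatment: read the local .ckp and the
+            // uploaded blob and assert they are byte-for-byte identical.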
+    private long populateTranslogOps(boolean withMissingOps) throws IOException {
+        long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
+        final int generations = between(2, 20);
+        long currentSeqNo = 0L;
+        List<Translog.Operation> firstGenOps = null;
+        Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
+        for (int gen = 0; gen < generations; gen++) {
+            List<Long> seqNos = new ArrayList<>();
+            int numOps = randomIntBetween(4, 10);
+            for (int i = 0; i < numOps; i++, currentSeqNo++) {
+                minSeqNo = SequenceNumbers.min(minSeqNo, currentSeqNo);
+                maxSeqNo = SequenceNumbers.max(maxSeqNo, currentSeqNo);
+                seqNos.add(currentSeqNo);
+            }
+            Collections.shuffle(seqNos, new Random(100));
+            List<Translog.Operation> ops = new ArrayList<>(seqNos.size());
+            for (long seqNo : seqNos) {
+                Translog.Index op = new Translog.Index(randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[] { randomByte() });
+                boolean shouldAdd = !withMissingOps || seqNo % 4 != 0;
+                if (shouldAdd) {
+                    translog.add(op);
+                    ops.add(op);
+                }
+            }
+            operationsByGen.put(translog.currentFileGeneration(), ops);
+            if (firstGenOps == null) {
+                firstGenOps = ops;
+            }
+            translog.rollGeneration();
+            if (rarely()) {
+                translog.rollGeneration(); // empty generation
+            }
+        }
+        return currentSeqNo;
+    }
+
+    public void testFullRangeSnapshot() throws Exception {
+        // Successful snapshot
+        long nextSeqNo = populateTranslogOps(false);
+        long fromSeqNo = 0L;
+        long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
+        try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
+            int totOps = 0;
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                totOps++;
+            }
+            assertEquals(toSeqNo - fromSeqNo + 1, totOps);
+        }
+    }
+
+    public void testFullRangeSnapshotWithFailures() throws Exception {
+        long nextSeqNo = populateTranslogOps(true);
+        long fromSeqNo = 0L;
+        long toSeqNo = Math.min(nextSeqNo - 1, fromSeqNo + 15);
+        try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo, true)) {
+            int totOps = 0;
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                totOps++;
+            }
+            fail("Should throw exception for missing operations");
+        } catch (MissingHistoryOperationsException e) {
+            assertTrue(e.getMessage().contains("Not all operations between from_seqno"));
+        }
+    }
+
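+    // Note on the snapshots above: the three-argument newSnapshot(fromSeqNo, toSeqNo, true)
+    // differs from the two-argument overload used earlier in that the boolean requests a
+    // complete range - when operations inside [fromSeqNo, toSeqNo] are missing (as
+    // populateTranslogOps(true) arranges by skipping every seqNo divisible by 4), draining the
+    // snapshot throws MissingHistoryOperationsException instead of returning a partial history.
+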
+    public void testConcurrentWritesWithVaryingSize() throws Throwable {
+        final int opsPerThread = randomIntBetween(10, 200);
+        int threadCount = 2 + randomInt(5);
+
+        logger.info("testing with [{}] threads, each doing [{}] ops", threadCount, opsPerThread);
+        final BlockingQueue<TranslogTests.LocationOperation> writtenOperations = new ArrayBlockingQueue<>(threadCount * opsPerThread);
+
+        Thread[] threads = new Thread[threadCount];
+        final Exception[] threadExceptions = new Exception[threadCount];
+        final AtomicLong seqNoGenerator = new AtomicLong();
+        final CountDownLatch downLatch = new CountDownLatch(1);
+        for (int i = 0; i < threadCount; i++) {
+            final int threadId = i;
+            threads[i] = new TranslogThread(translog, downLatch, opsPerThread, threadId, writtenOperations, seqNoGenerator, threadExceptions);
+            threads[i].setDaemon(true);
+            threads[i].start();
+        }
+
+        downLatch.countDown();
+
+        for (int i = 0; i < threadCount; i++) {
+            if (threadExceptions[i] != null) {
+                throw threadExceptions[i];
+            }
+            threads[i].join(60 * 1000);
+        }
+
+        List<TranslogTests.LocationOperation> collect = new ArrayList<>(writtenOperations);
+        collect.sort(Comparator.comparing(op -> op.operation.seqNo()));
+
+        List<Translog.Operation> opsList = new ArrayList<>(threadCount * opsPerThread);
+        try (Translog.Snapshot snapshot = translog.newSnapshot()) {
+            for (Translog.Operation op = snapshot.next(); op != null; op = snapshot.next()) {
+                opsList.add(op);
+            }
+        }
+        opsList.sort(Comparator.comparing(op -> op.seqNo()));
+
+        for (int i = 0; i < threadCount * opsPerThread; i++) {
+            assertEquals(opsList.get(i), collect.get(i).operation);
+        }
+    }
+
+    /**
+     * Tests that concurrent readers and writes maintain view and snapshot semantics
+     */
+    public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
+        final Thread[] writers = new Thread[randomIntBetween(1, 3)];
+        final Thread[] readers = new Thread[randomIntBetween(1, 3)];
+        final int flushEveryOps = randomIntBetween(5, 100);
+        final int maxOps = randomIntBetween(200, 1000);
+        final Object signalReaderSomeDataWasIndexed = new Object();
+        final AtomicLong idGenerator = new AtomicLong();
+        final CyclicBarrier barrier = new CyclicBarrier(writers.length + readers.length + 1);
+
+        // a map of all written ops and their returned location.
+        final Map<Translog.Operation, Translog.Location> writtenOps = ConcurrentCollections.newConcurrentMap();
+
+        // a signal for all threads to stop
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        final Object flushMutex = new Object();
+        final AtomicLong lastCommittedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        final LocalCheckpointTracker tracker = LocalCheckpointTrackerTests.createEmptyTracker();
+        final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
+        // any errors on threads
+        final List<Throwable> errors = new CopyOnWriteArrayList<>();
+        logger.info("using [{}] readers. [{}] writers. 
flushing every ~[{}] ops.", readers.length, writers.length, flushEveryOps); + for (int i = 0; i < writers.length; i++) { + final String threadName = "writer_" + i; + final int threadId = i; + writers[i] = new Thread(new AbstractRunnable() { + @Override + public void doRun() throws BrokenBarrierException, InterruptedException, IOException { + barrier.await(); + int counter = 0; + while (run.get() && idGenerator.get() < maxOps) { + long id = idGenerator.getAndIncrement(); + final Translog.Operation op; + final Translog.Operation.Type type = Translog.Operation.Type.values()[((int) (id % Translog.Operation.Type + .values().length))]; + switch (type) { + case CREATE: + case INDEX: + op = new Translog.Index("" + id, id, primaryTerm.get(), new byte[] { (byte) id }); + break; + case DELETE: + op = new Translog.Delete(Long.toString(id), id, primaryTerm.get()); + break; + case NO_OP: + op = new Translog.NoOp(id, 1, Long.toString(id)); + break; + default: + throw new AssertionError("unsupported operation type [" + type + "]"); + } + Translog.Location location = translog.add(op); + tracker.markSeqNoAsProcessed(id); + Translog.Location existing = writtenOps.put(op, location); + if (existing != null) { + fail("duplicate op [" + op + "], old entry at " + location); + } + if (id % writers.length == threadId) { + translog.ensureSynced(location); + } + if (id % flushEveryOps == 0) { + synchronized (flushMutex) { + // we need not do this concurrently as we need to make sure that the generation + // we're committing - is still present when we're committing + long localCheckpoint = tracker.getProcessedCheckpoint(); + translog.rollGeneration(); + // expose the new checkpoint (simulating a commit), before we trim the translog + lastCommittedLocalCheckpoint.set(localCheckpoint); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + translog.trimUnreferencedReaders(); + } + } + if (id % 7 == 0) { + synchronized (signalReaderSomeDataWasIndexed) { + signalReaderSomeDataWasIndexed.notifyAll(); + } + } + counter++; + } + logger.info("--> [{}] done. 
wrote [{}] ops.", threadName, counter);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error(() -> new ParameterizedMessage("--> writer [{}] had an error", threadName), e);
+                    errors.add(e);
+                }
+            }, threadName);
+            writers[i].start();
+        }
+
+        for (int i = 0; i < readers.length; i++) {
+            final String threadId = "reader_" + i;
+            readers[i] = new Thread(new AbstractRunnable() {
+                Closeable retentionLock = null;
+                long committedLocalCheckpointAtView;
+
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error(() -> new ParameterizedMessage("--> reader [{}] had an error", threadId), e);
+                    errors.add(e);
+                    try {
+                        closeRetentionLock();
+                    } catch (IOException inner) {
+                        inner.addSuppressed(e);
+                        logger.error("unexpected error while closing view, after failure", inner);
+                    }
+                }
+
+                void closeRetentionLock() throws IOException {
+                    if (retentionLock != null) {
+                        retentionLock.close();
+                    }
+                }
+
+                void acquireRetentionLock() throws IOException {
+                    closeRetentionLock();
+                    retentionLock = translog.acquireRetentionLock();
+                    // captures the last committed checkpoint, while holding the view, simulating
+                    // recovery logic which captures a view and gets a lucene commit
+                    committedLocalCheckpointAtView = lastCommittedLocalCheckpoint.get();
+                    logger.info("--> [{}] min gen after acquiring lock [{}]", threadId, translog.getMinFileGeneration());
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    barrier.await();
+                    int iter = 0;
+                    while (idGenerator.get() < maxOps) {
+                        if (iter++ % 10 == 0) {
+                            acquireRetentionLock();
+                        }
+
+                        // captures all ops that were written since the view was created (with a small caveat, see below)
+                        // these are what we expect the snapshot to return (and potentially some more).
+                        Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
+                        expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
+                        try (Translog.Snapshot snapshot = translog.newSnapshot(committedLocalCheckpointAtView + 1L, Long.MAX_VALUE)) {
+                            Translog.Operation op;
+                            while ((op = snapshot.next()) != null) {
+                                expectedOps.remove(op);
+                            }
+                        }
+                        if (expectedOps.isEmpty() == false) {
+                            StringBuilder missed = new StringBuilder("missed ").append(expectedOps.size())
+                                .append(" operations from [")
+                                .append(committedLocalCheckpointAtView + 1L)
+                                .append("]");
+                            boolean failed = false;
+                            for (Translog.Operation expectedOp : expectedOps) {
+                                final Translog.Location loc = writtenOps.get(expectedOp);
+                                failed = true;
+                                missed.append("\n --> [").append(expectedOp).append("] written at ").append(loc);
+                            }
+                            if (failed) {
+                                fail(missed.toString());
+                            }
+                        }
+                        // slow down things a bit and spread out testing.
+                        synchronized (signalReaderSomeDataWasIndexed) {
+                            if (idGenerator.get() < maxOps) {
+                                signalReaderSomeDataWasIndexed.wait();
+                            }
+                        }
+                    }
+                    closeRetentionLock();
+                    logger.info("--> [{}] done. tested [{}] snapshots", threadId, iter);
+                }
+            }, threadId);
+            readers[i].start();
+        }
+
+        barrier.await();
+        logger.debug("--> waiting for threads to stop");
+        for (Thread thread : writers) {
+            thread.join();
+        }
+        logger.debug("--> waiting for readers to stop");
+        // force stopping, if all writers crashed
+        synchronized (signalReaderSomeDataWasIndexed) {
+            idGenerator.set(Long.MAX_VALUE);
+            signalReaderSomeDataWasIndexed.notifyAll();
+        }
+        for (Thread thread : readers) {
+            thread.join();
+        }
+        if (errors.size() > 0) {
+            Throwable e = errors.get(0);
+            for (Throwable suppress : errors.subList(1, errors.size())) {
+                e.addSuppressed(suppress);
+            }
+            throw e;
+        }
+        logger.info("--> test done. total ops written [{}]", writtenOps.size());
+    }
+
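+    // The sync tests that follow rely on the ensureSynced contract (a sketch, inferred from
+    // the assertion messages below rather than from javadoc):
+    //
+    //     boolean ensureSynced(Translog.Location location) throws IOException
+    //         // returns true  -> this call performed the sync that made `location` durable
+    //         // returns false -> `location` was already durable (earlier sync(), or the
+    //         //                  location belongs to an already-closed generation)
+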
+    public void testSyncUpTo() throws IOException {
+        int translogOperations = randomIntBetween(10, 100);
+        int count = 0;
+        for (int op = 0; op < translogOperations; op++) {
+            int seqNo = ++count;
+            final Translog.Location location = translog.add(
+                new Translog.Index("" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))
+            );
+            if (randomBoolean()) {
+                assertTrue("at least one operation pending", translog.syncNeeded());
+                assertTrue("this operation has not been synced", translog.ensureSynced(location));
+                // we are the last location so everything should be synced
+                assertFalse("the last call to ensureSynced synced all previous ops", translog.syncNeeded());
+                seqNo = ++count;
+                translog.add(
+                    new Translog.Index("" + op, seqNo, primaryTerm.get(), Integer.toString(seqNo).getBytes(Charset.forName("UTF-8")))
+                );
+                assertTrue("one pending operation", translog.syncNeeded());
+                assertFalse("this op has been synced before", translog.ensureSynced(location)); // not syncing now
+                assertTrue("we only synced a previous operation yet", translog.syncNeeded());
+            }
+            if (rarely()) {
+                translog.rollGeneration();
+                assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
+                assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
+            }
+
+            if (randomBoolean()) {
+                translog.sync();
+                assertFalse("translog has been synced already", translog.ensureSynced(location));
+            }
+        }
+    }
+
+    public void testSyncUpToStream() throws IOException {
+        int iters = randomIntBetween(5, 10);
+        for (int i = 0; i < iters; i++) {
+            int translogOperations = randomIntBetween(10, 100);
+            int count = 0;
+            ArrayList<Translog.Location> locations = new ArrayList<>();
+            for (int op = 0; op < translogOperations; op++) {
+                if (rarely()) {
+                    translog.rollGeneration();
+                }
+                final Translog.Location location = translog.add(
+                    new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8")))
+                );
+                locations.add(location);
+            }
+            Collections.shuffle(locations, random());
+            if (randomBoolean()) {
+                assertTrue("at least one operation pending", translog.syncNeeded());
+                assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
+                // we are the last location so everything should be synced
+                assertFalse("the last call to ensureSynced synced all previous ops", translog.syncNeeded());
+            } else if (rarely()) {
+                translog.rollGeneration();
+                // not syncing now
+                assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream()));
+                assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
+            } else {
+                translog.sync();
+                assertFalse("translog has been synced already", translog.ensureSynced(locations.stream()));
+            }
+            for (Translog.Location location : locations) {
+                assertFalse("all of the locations should be synced: " + location, translog.ensureSynced(location));
+            }
+        }
+    }
+
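+    // Translog.Location is Comparable: ordering is by generation first, then by offset within
+    // the generation, which is why the maximum location found in testLocationComparison below
+    // must come from the current file generation.
+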
+    public void testLocationComparison() throws IOException {
+        List<Translog.Location> locations = new ArrayList<>();
+        int translogOperations = randomIntBetween(10, 100);
+        int count = 0;
+        for (int op = 0; op < translogOperations; op++) {
+            locations.add(
+                translog.add(
+                    new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8")))
+                )
+            );
+            if (randomBoolean()) {
+                translog.ensureSynced(locations.get(op));
+            }
+            if (rarely() && translogOperations > op + 1) {
+                translog.rollGeneration();
+            }
+        }
+        Collections.shuffle(locations, random());
+        Translog.Location max = locations.get(0);
+        for (Translog.Location location : locations) {
+            max = max(max, location);
+        }
+
+        assertEquals(max.generation, translog.currentFileGeneration());
+        try (Translog.Snapshot snap = new TranslogTests.SortedSnapshot(translog.newSnapshot())) {
+            Translog.Operation next;
+            Translog.Operation maxOp = null;
+            while ((next = snap.next()) != null) {
+                maxOp = next;
+            }
+            assertNotNull(maxOp);
+            assertEquals(maxOp.getSource().source.utf8ToString(), Integer.toString(count));
+        }
+    }
+
+    public static Translog.Location max(Translog.Location a, Translog.Location b) {
+        if (a.compareTo(b) > 0) {
+            return a;
+        }
+        return b;
+    }
+
+    public void testTranslogWriter() throws IOException {
+        final TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1);
+        final Set<Long> persistedSeqNos = new HashSet<>();
+        persistedSeqNoConsumer.set(persistedSeqNos::add);
+        final int numOps = scaledRandomIntBetween(8, 250000);
+        final Set<Long> seenSeqNos = new HashSet<>();
+        boolean opsHaveValidSequenceNumbers = randomBoolean();
+        for (int i = 0; i < numOps; i++) {
+            byte[] bytes = new byte[4];
+            DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes));
+            out.writeInt(i);
+            long seqNo;
+            do {
+                seqNo = opsHaveValidSequenceNumbers ? randomNonNegativeLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
+                opsHaveValidSequenceNumbers = opsHaveValidSequenceNumbers || !rarely();
+            } while (seenSeqNos.contains(seqNo));
+            if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+                seenSeqNos.add(seqNo);
+            }
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo);
+        }
+        assertThat(persistedSeqNos, empty());
+        writer.sync();
+        persistedSeqNos.remove(SequenceNumbers.UNASSIGNED_SEQ_NO);
+        assertEquals(seenSeqNos, persistedSeqNos);
+
+        final BaseTranslogReader reader = randomBoolean()
+            ? writer
+            : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
+        for (int i = 0; i < numOps; i++) {
+            ByteBuffer buffer = ByteBuffer.allocate(4);
+            reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
+            buffer.flip();
+            final int value = buffer.getInt();
+            assertEquals(i, value);
+        }
+        final long minSeqNo = seenSeqNos.stream().min(Long::compareTo).orElse(SequenceNumbers.NO_OPS_PERFORMED);
+        final long maxSeqNo = seenSeqNos.stream().max(Long::compareTo).orElse(SequenceNumbers.NO_OPS_PERFORMED);
+        assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo));
+        assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo));
+
+        byte[] bytes = new byte[4];
+        DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes));
+        out.writeInt(2048);
+        writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
+
+        if (reader instanceof TranslogReader) {
+            ByteBuffer buffer = ByteBuffer.allocate(4);
+            try {
+                reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * numOps);
+                fail("read past EOF?");
+            } catch (EOFException ex) {
+                // expected
+            }
+            ((TranslogReader) reader).close();
+        } else {
+            // live reader!
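+            // (the writer itself): unlike a closed TranslogReader, a TranslogWriter can serve
+            // reads of bytes that were added but not yet synced, so reading past the last
+            // synced offset is expected to succeed here and return the value just written.
+            // Whether it reads through its in-memory buffer or flushes it to the channel
+            // first is a TranslogWriter implementation detail (assumption, not asserted here).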
+            ByteBuffer buffer = ByteBuffer.allocate(4);
+            final long pos = reader.getFirstOperationOffset() + 4 * numOps;
+            reader.readBytes(buffer, pos);
+            buffer.flip();
+            final int value = buffer.getInt();
+            assertEquals(2048, value);
+        }
+        IOUtils.close(writer);
+    }
+
+    public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
+        Path tempDir = createTempDir();
+        final TranslogConfig temp = getTranslogConfig(tempDir);
+        final TranslogConfig config = new TranslogConfig(
+            temp.getShardId(),
+            temp.getTranslogPath(),
+            temp.getIndexSettings(),
+            temp.getBigArrays(),
+            new ByteSizeValue(1, ByteSizeUnit.KB)
+        );
+
+        final Set<Long> persistedSeqNos = new HashSet<>();
+        final AtomicInteger writeCalls = new AtomicInteger();
+
+        final ChannelFactory channelFactory = (file, openOption) -> {
+            FileChannel delegate = FileChannel.open(file, openOption);
+            boolean success = false;
+            try {
+                // don't do partial writes for checkpoints - we rely on the fact that the bytes are written as an atomic operation
+                final boolean isCkpFile = file.getFileName().toString().endsWith(".ckp");
+
+                final FileChannel channel;
+                if (isCkpFile) {
+                    channel = delegate;
+                } else {
+                    channel = new FilterFileChannel(delegate) {
+
+                        @Override
+                        public int write(ByteBuffer src) throws IOException {
+                            writeCalls.incrementAndGet();
+                            return super.write(src);
+                        }
+                    };
+                }
+                success = true;
+                return channel;
+            } finally {
+                if (success == false) {
+                    IOUtils.closeWhileHandlingException(delegate);
+                }
+            }
+        };
+
+        String translogUUID = Translog.createEmptyTranslog(
+            config.getTranslogPath(),
+            SequenceNumbers.NO_OPS_PERFORMED,
+            shardId,
+            channelFactory,
+            primaryTerm.get()
+        );
+
+        try (
+            Translog translog = new RemoteFsTranslog(
+                config,
+                translogUUID,
+                new DefaultTranslogDeletionPolicy(-1, -1, 0),
+                () -> SequenceNumbers.NO_OPS_PERFORMED,
+                primaryTerm::get,
+                persistedSeqNos::add,
+                repository,
+                threadPool
+            ) {
+                @Override
+                ChannelFactory getChannelFactory() {
+                    return channelFactory;
+                }
+            }
+        ) {
+            TranslogWriter writer = translog.getCurrent();
+            int initialWriteCalls = writeCalls.get();
+            byte[] bytes = new byte[256];
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 3);
+            writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 4);
+            assertThat(persistedSeqNos, empty());
+            assertEquals(initialWriteCalls, writeCalls.get());
+
+            if (randomBoolean()) {
+                // Since the buffer is full, this will flush before performing the add.
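+                // (The writer buffer was sized to 1 KB via the TranslogConfig above; the four
+                // 256-byte adds fill it exactly, so this fifth add cannot be buffered. Note the
+                // flush only goes to the FileChannel - counted by writeCalls - while
+                // persistedSeqNos stays empty until sync() makes the bytes durable.)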
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
+                assertThat(persistedSeqNos, empty());
+                assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
+            } else {
+                // Will flush on read
+                writer.readBytes(ByteBuffer.allocate(256), 0);
+                assertThat(persistedSeqNos, empty());
+                assertThat(writeCalls.get(), greaterThan(initialWriteCalls));
+
+                // Add after the read flushed the buffer
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 5);
+            }
+
+            writer.sync();
+
+            // Sequence numbers are marked as persisted after sync
+            assertThat(persistedSeqNos, contains(1L, 2L, 3L, 4L, 5L));
+        }
+    }
+
+    public void testCloseIntoReader() throws IOException {
+        try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
+            final int numOps = randomIntBetween(8, 128);
+            for (int i = 0; i < numOps; i++) {
+                final byte[] bytes = new byte[4];
+                final DataOutput out = EndiannessReverserUtil.wrapDataOutput(new ByteArrayDataOutput(bytes));
+                out.writeInt(i);
+                writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
+            }
+            writer.sync();
+            final Checkpoint writerCheckpoint = writer.getCheckpoint();
+            TranslogReader reader = writer.closeIntoReader();
+            try {
+                if (randomBoolean()) {
+                    reader.close();
+                    reader = translog.openReader(reader.path(), writerCheckpoint);
+                }
+                for (int i = 0; i < numOps; i++) {
+                    final ByteBuffer buffer = ByteBuffer.allocate(4);
+                    reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
+                    buffer.flip();
+                    final int value = buffer.getInt();
+                    assertEquals(i, value);
+                }
+                final Checkpoint readerCheckpoint = reader.getCheckpoint();
+                assertThat(readerCheckpoint, equalTo(writerCheckpoint));
+            } finally {
+                IOUtils.close(reader);
+            }
+        }
+    }
+
+    public void testRecoveryUncommitted() throws IOException {
+        List<Translog.Location> locations = new ArrayList<>();
+        int translogOperations = randomIntBetween(10, 100);
+        final int prepareOp = randomIntBetween(0, translogOperations - 1);
+        Translog.TranslogGeneration translogGeneration = null;
+        final boolean sync = randomBoolean();
+        for (int op = 0; op < translogOperations; op++) {
+            locations.add(
+                translog.add(new Translog.Index("" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))
+            );
+            if (op == prepareOp) {
+                translogGeneration = translog.getGeneration();
+                translog.rollGeneration();
+                assertEquals(
+                    "expected this to be the first roll (1 gen is on creation, 2 when opened)",
+                    2L,
+                    translogGeneration.translogFileGeneration
+                );
+                assertNotNull(translogGeneration.translogUUID);
+            }
+        }
+        if (sync) {
+            translog.sync();
+        }
+        // we intentionally don't close the tlog that is in the prepareCommit stage since we try to recover the uncommitted
+        // translog here as well.
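+        // Reopening without closing simulates a crash: each RemoteFsTranslog constructed below
+        // recovers from the same generation files and then moves to a fresh generation, which is
+        // what the generation arithmetic asserted below (+1 after one recovery, +3 after
+        // recovering twice) is checking.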
+ TranslogConfig config = translog.getConfig(); + final String translogUUID = translog.getTranslogUUID(); + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + try ( + Translog translog = new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + () -> globalCheckpoint.get(), + primaryTerm::get, + getPersistedSeqNoConsumer(), + repository, + threadPool + ) + ) { + assertNotNull(translogGeneration); + assertEquals( + "lastCommitted must be 1 less than current - we never finished the commit", + translogGeneration.translogFileGeneration + 1, + translog.currentFileGeneration() + ); + assertFalse(translog.syncNeeded()); + try (Translog.Snapshot snapshot = new TranslogTests.SortedSnapshot(translog.newSnapshot())) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } + } + } + if (randomBoolean()) { // recover twice + try ( + Translog translog = new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + () -> SequenceNumbers.NO_OPS_PERFORMED, + primaryTerm::get, + seqNo -> {}, + repository, + threadPool + ) + ) { + assertNotNull(translogGeneration); + assertEquals( + "lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", + translogGeneration.translogFileGeneration + 3, + translog.currentFileGeneration() + ); + assertFalse(translog.syncNeeded()); + try (Translog.Snapshot snapshot = new TranslogTests.SortedSnapshot(translog.newSnapshot())) { + int upTo = sync ? translogOperations : prepareOp; + for (int i = 0; i < upTo; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null synced: " + sync, next); + assertEquals("payload mismatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.utf8ToString())); + } + } + } + } + } + + + class TranslogThread extends Thread { + private final CountDownLatch downLatch; + private final int opsPerThread; + private final int threadId; + private final Collection writtenOperations; + private final Exception[] threadExceptions; + private final Translog translog; + private final AtomicLong seqNoGenerator; + + TranslogThread( + Translog translog, + CountDownLatch downLatch, + int opsPerThread, + int threadId, + Collection writtenOperations, + AtomicLong seqNoGenerator, + Exception[] threadExceptions + ) { + this.translog = translog; + this.downLatch = downLatch; + this.opsPerThread = opsPerThread; + this.threadId = threadId; + this.writtenOperations = writtenOperations; + this.seqNoGenerator = seqNoGenerator; + this.threadExceptions = threadExceptions; + } + + @Override + public void run() { + try { + downLatch.await(); + for (int opCount = 0; opCount < opsPerThread; opCount++) { + Translog.Operation op; + final Translog.Operation.Type type = randomFrom(Translog.Operation.Type.values()); + switch (type) { + case CREATE: + case INDEX: + op = new Translog.Index( + threadId + "_" + opCount, + seqNoGenerator.getAndIncrement(), + primaryTerm.get(), + randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8") + ); + break; + case DELETE: + op = new Translog.Delete( + threadId + "_" + opCount, + seqNoGenerator.getAndIncrement(), + primaryTerm.get(), + 1 + randomInt(100000) + ); + break; + case NO_OP: + op = new Translog.NoOp(seqNoGenerator.getAndIncrement(), 
primaryTerm.get(), randomAlphaOfLength(16)); + break; + default: + throw new AssertionError("unsupported operation type [" + type + "]"); + } + + Translog.Location loc = add(op); + writtenOperations.add(new TranslogTests.LocationOperation(op, loc)); + if (rarely()) { // lets verify we can concurrently read this + assertEquals(op, translog.readOperation(loc)); + } + afterAdd(); + } + } catch (Exception t) { + threadExceptions[threadId] = t; + } + } + + protected Translog.Location add(Translog.Operation op) throws IOException { + Translog.Location location = translog.add(op); + if (randomBoolean()) { + translog.ensureSynced(location); + } + return location; + } + + protected void afterAdd() { + } + } + +} diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java index 25867cdb666ad..4c3948cbd7b5b 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogManagerTestCase.java @@ -81,7 +81,7 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup shardId, primaryTermSupplier.getAsLong() ); - return new Translog( + return new LocalTranslog( translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS), diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java index 153677e00c22b..f7741031f4d6e 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java @@ -217,7 +217,7 @@ protected Translog createTranslog(TranslogConfig config) throws IOException { shardId, primaryTerm.get() ); - return new Translog( + return new LocalTranslog( config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), @@ -228,7 +228,7 @@ protected Translog createTranslog(TranslogConfig config) throws IOException { } protected Translog openTranslog(TranslogConfig config, String translogUUID) throws IOException { - return new Translog( + return new LocalTranslog( config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), @@ -264,7 +264,7 @@ private Translog create(Path path) throws IOException { final TranslogConfig translogConfig = getTranslogConfig(path); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - return new Translog( + return new LocalTranslog( translogConfig, translogUUID, deletionPolicy, @@ -1515,7 +1515,7 @@ public int write(ByteBuffer src) throws IOException { ); try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, new DefaultTranslogDeletionPolicy(-1, -1, 0), @@ -1630,7 +1630,7 @@ public void force(boolean metaData) throws IOException { ); try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, new DefaultTranslogDeletionPolicy(-1, -1, 0), @@ -1736,7 +1736,7 @@ public void testBasicRecovery() throws IOException { assertNull(snapshot.next()); } } else { - translog = new Translog( + translog = new LocalTranslog( config, translogGeneration.translogUUID, translog.getDeletionPolicy(), @@ -1795,7 +1795,7 @@ public void testRecoveryUncommitted() throws 
IOException { final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -1822,7 +1822,7 @@ public void testRecoveryUncommitted() throws IOException { } if (randomBoolean()) { // recover twice try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -1884,7 +1884,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -1912,7 +1912,7 @@ public void testRecoveryUncommittedFileExists() throws IOException { if (randomBoolean()) { // recover twice try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -1976,7 +1976,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); final TranslogCorruptedException translogCorruptedException = expectThrows( TranslogCorruptedException.class, - () -> new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) + () -> new LocalTranslog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) ); assertThat( translogCorruptedException.getMessage(), @@ -1995,7 +1995,7 @@ public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { StandardOpenOption.TRUNCATE_EXISTING ); try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -2293,7 +2293,7 @@ public void testOpenForeignTranslog() throws IOException { final String foreignTranslog = randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()); try { - new Translog( + new LocalTranslog( config, foreignTranslog, createTranslogDeletionPolicy(), @@ -2305,7 +2305,7 @@ public void testOpenForeignTranslog() throws IOException { } catch (TranslogCorruptedException ex) { } - this.translog = new Translog( + this.translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -2373,7 +2373,7 @@ public void testCloseConcurrently() throws Throwable { } } - private class TranslogThread extends Thread { + class TranslogThread extends Thread { private final CountDownLatch downLatch; private final int opsPerThread; private final int threadId; @@ -2535,7 +2535,7 @@ public void testFailFlush() throws IOException { final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); try ( - Translog tlog = new Translog( + Translog tlog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -2692,7 +2692,7 @@ protected void afterAdd() throws IOException { // drop all that haven't been synced writtenOperations.removeIf(next -> checkpoint.offset < (next.location.translogLocation + next.location.size)); try ( - Translog tlog = new Translog( + Translog tlog = new LocalTranslog( config, translogUUID, createTranslogDeletionPolicy(), @@ -2755,7 +2755,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { 
TranslogConfig config = translog.getConfig(); final TranslogDeletionPolicy deletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); - translog = new Translog( + translog = new LocalTranslog( config, translog.getTranslogUUID(), deletionPolicy, @@ -2824,7 +2824,7 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { final TranslogDeletionPolicy deletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0); deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -2934,7 +2934,7 @@ private Translog getFailableTranslog( primaryTerm.get() ); } - return new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) { + return new LocalTranslog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) { @Override ChannelFactory getChannelFactory() { return channelFactory; @@ -3072,7 +3072,7 @@ public void testFailWhileCreateWriteWithRecoveredTLogs() throws IOException { translog.add(new Translog.Index("boom", 0, primaryTerm.get(), "boom".getBytes(Charset.forName("UTF-8")))); translog.close(); try { - new Translog( + new LocalTranslog( config, translog.getTranslogUUID(), createTranslogDeletionPolicy(), @@ -3140,7 +3140,7 @@ public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { TranslogException ex = expectThrows( TranslogException.class, - () -> new Translog( + () -> new LocalTranslog( config, translog.getTranslogUUID(), translog.getDeletionPolicy(), @@ -3167,7 +3167,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); try ( - Translog tlog = new Translog( + Translog tlog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -3189,7 +3189,7 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { TranslogException ex = expectThrows( TranslogException.class, - () -> new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) + () -> new LocalTranslog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}) ); assertEquals(ex.getMessage(), "failed to create new translog file"); assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class); @@ -3312,7 +3312,7 @@ public void testWithRandomException() throws IOException { ); } try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, generationUUID, deletionPolicy, @@ -3407,7 +3407,7 @@ public void testPendingDelete() throws IOException { final String translogUUID = translog.getTranslogUUID(); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(config.getIndexSettings()); translog.close(); - translog = new Translog( + translog = new LocalTranslog( config, translogUUID, deletionPolicy, @@ -3421,7 +3421,7 @@ public void testPendingDelete() throws IOException { translog.add(new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 3 })); translog.close(); IOUtils.close(lock); - translog = new Translog( + translog = new LocalTranslog( config, 
translogUUID, deletionPolicy, @@ -3780,7 +3780,7 @@ public void testCloseSnapshotTwice() throws Exception { // close method should never be called directly from Translog (the only exception is closeOnTragicEvent) public void testTranslogCloseInvariant() throws IOException { assumeTrue("test only works with assertions enabled", Assertions.ENABLED); - class MisbehavingTranslog extends Translog { + class MisbehavingTranslog extends LocalTranslog { MisbehavingTranslog( TranslogConfig config, String translogUUID, @@ -3923,7 +3923,7 @@ public void copy(Path source, Path target, CopyOption... options) throws IOExcep assertFalse(brokenTranslog.isOpen()); try ( - Translog recoveredTranslog = new Translog( + Translog recoveredTranslog = new LocalTranslog( getTranslogConfig(path), brokenTranslog.getTranslogUUID(), brokenTranslog.getDeletionPolicy(), @@ -3957,7 +3957,7 @@ public void testSyncConcurrently() throws Exception { } }; try ( - Translog translog = new Translog( + Translog translog = new LocalTranslog( config, translogUUID, createTranslogDeletionPolicy(config.getIndexSettings()), @@ -4038,7 +4038,7 @@ public void force(boolean metaData) throws IOException { channelFactory, primaryTerm.get() ); - final Translog translog = new Translog( + final Translog translog = new LocalTranslog( config, translogUUID, createTranslogDeletionPolicy(), diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index ff4005d9bcedf..5cc38fd113045 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1795,6 +1795,8 @@ public void onFailure(final Exception e) { ); final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test"); final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); + final SetOnce repositoriesServiceReference = new SetOnce<>(); + repositoriesServiceReference.set(repositoriesService); indicesService = new IndicesService( settings, mock(PluginsService.class), @@ -1827,7 +1829,8 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + repositoriesServiceReference::get ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index f4a9f51789679..20f9efe035bd6 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -112,6 +112,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.translog.InternalTranslogManager; +import org.opensearch.index.translog.LocalTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicy; @@ -150,9 +151,9 @@ import static java.util.Collections.shuffle; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static 
org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.instanceOf; import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA; @@ -528,7 +529,7 @@ protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSup shardId, primaryTermSupplier.getAsLong() ); - return new Translog( + return new LocalTranslog( translogConfig, translogUUID, createTranslogDeletionPolicy(INDEX_SETTINGS),