Skip to content

Commit

Permalink
Skip index retention if its remote-reindex migration is currently run… (
Browse files Browse the repository at this point in the history
#20121)

* Skip index retention if its remote-reindex migration is currently running

* fixed jetbrains annotation

* Added changelog

* Fix ES adapter for migrations
  • Loading branch information
todvora authored Aug 12, 2024
1 parent 27c2363 commit 9d2355c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 2 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-20121.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type = "f"
message = "Stop index retention during remote reindex migration"

pulls = ["20121"]
issues = ["graylog-plugin-enterprise#8097"]
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.datanode.RemoteReindexRequest;
import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.indexer.migration.IndexerConnectionCheckResult;
Expand All @@ -30,6 +31,11 @@ public class UnsupportedRemoteReindexingMigrationAdapterES7 implements RemoteRei

public static final String UNSUPPORTED_MESSAGE = "This operation should never be called. We remote-reindex into the DataNode that contains OpenSearch. This adapter only exists for API completeness";

@Override
public boolean isMigrationRunning(IndexSet indexSet) {
return false; // we'll never run a remote reindex migration against elasticsearch target. It's always OS in datanode.
}

@Override
public String start(RemoteReindexRequest request) {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
Expand All @@ -47,6 +53,6 @@ public IndexerConnectionCheckResult checkConnection(@Nonnull URI uri, @Nullable

@Override
public Optional<String> getLatestMigrationId() {
throw new UnsupportedOperationException(UNSUPPORTED_MESSAGE);
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.graylog.shaded.opensearch2.org.opensearch.tasks.Task;
import org.graylog2.datanode.RemoteReindexAllowlistEvent;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.indexer.datanode.IndexMigrationConfiguration;
import org.graylog2.indexer.datanode.MigrationConfiguration;
Expand Down Expand Up @@ -85,6 +86,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -133,6 +135,28 @@ public RemoteReindexingMigrationAdapterOS2(final OpenSearchClient client,
this.notificationService = notificationService;
}

@Override
public boolean isMigrationRunning(IndexSet indexSet) {
return reindexMigrationService.getLatestMigrationId()
.map(this::status)
.map(migration -> isIndexSetCurrentlyMigrated(indexSet, migration))
.orElse(false);
}

@Nonnull
private Boolean isIndexSetCurrentlyMigrated(IndexSet indexSet, RemoteReindexMigration mig) {
if (mig.status() == Status.NOT_STARTED || mig.status() == Status.RUNNING) {
final Set<String> runningIndices = mig.indices().stream()
.filter(i -> !i.isCompleted())
.map(RemoteReindexIndex::name)
.collect(Collectors.toSet());
final Set<IndexSet> migratedIndexSets = indexSetRegistry.getForIndices(runningIndices);
return migratedIndexSets.contains(indexSet);
} else {
return false;
}
}

@Override
public String start(RemoteReindexRequest request) {
final AggregatedConnectionResponse response = getAllIndicesFrom(request.uri(), request.username(), request.password(), request.trustUnknownCerts());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.migration.IndexerConnectionCheckResult;
import org.graylog2.indexer.migration.RemoteReindexMigration;

import java.net.URI;
import java.util.Optional;

public interface RemoteReindexingMigrationAdapter {
boolean isMigrationRunning(IndexSet indexSet);

enum Status {
NOT_STARTED, RUNNING, ERROR, FINISHED
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
Expand Down Expand Up @@ -48,21 +49,24 @@ public class IndexRetentionThread extends Periodical {
private final Map<String, Provider<RetentionStrategy>> retentionStrategyMap;
private final DataTieringOrchestrator dataTieringOrchestrator;

private final RemoteReindexingMigrationAdapter migrationService;

@Inject
public IndexRetentionThread(ElasticsearchConfiguration configuration,
IndexSetRegistry indexSetRegistry,
Cluster cluster,
NodeId nodeId,
NotificationService notificationService,
Map<String, Provider<RetentionStrategy>> retentionStrategyMap,
DataTieringOrchestrator dataTieringOrchestrator) {
DataTieringOrchestrator dataTieringOrchestrator, RemoteReindexingMigrationAdapter migrationService) {
this.configuration = configuration;
this.indexSetRegistry = indexSetRegistry;
this.cluster = cluster;
this.nodeId = nodeId;
this.notificationService = notificationService;
this.retentionStrategyMap = retentionStrategyMap;
this.dataTieringOrchestrator = dataTieringOrchestrator;
this.migrationService = migrationService;
}

@Override
Expand All @@ -81,6 +85,12 @@ public void doRun() {
LOG.debug("Skipping non-writable index set <{}> ({})", indexSet.getConfig().id(), indexSet.getConfig().title());
continue;
}

if(isCurrentlyMigrated(indexSet)) {
LOG.info("Index set <{}> is currently being migrated, skipping retention", indexSet.getConfig().title());
continue;
}

final IndexSetConfig config = indexSet.getConfig();
if (config.dataTieringConfig() != null) {
dataTieringOrchestrator.retain(indexSet);
Expand All @@ -99,6 +109,10 @@ public void doRun() {
}
}

private boolean isCurrentlyMigrated(IndexSet indexSet) {
return migrationService.isMigrationRunning(indexSet);
}

private void retentionProblemNotification(String title, String description) {
final Notification notification = notificationService.buildNow()
.addNode(nodeId.getNodeId())
Expand Down

0 comments on commit 9d2355c

Please sign in to comment.