Skip to content

Commit

Permalink
feat(restore-indices): add additional step to also clear system metad…
Browse files Browse the repository at this point in the history
…ata service (datahub-project#10662)

Co-authored-by: John Joyce <[email protected]>
  • Loading branch information
Masterchen09 and jjoyce0510 authored Jul 31, 2024
1 parent bcde06d commit 76f1b23
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade.common.steps;

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import java.util.function.Function;

public class ClearSystemMetadataServiceStep implements UpgradeStep {

private final SystemMetadataService _systemMetadataService;
private final boolean _alwaysRun;

public ClearSystemMetadataServiceStep(
final SystemMetadataService systemMetadataService, final boolean alwaysRun) {
_systemMetadataService = systemMetadataService;
_alwaysRun = alwaysRun;
}

@Override
public String id() {
return "ClearSystemMetadataServiceStep";
}

@Override
public boolean skip(UpgradeContext context) {
if (_alwaysRun) {
return false;
}
if (context.parsedArgs().containsKey(NoCodeUpgrade.CLEAN_ARG_NAME)) {
return false;
}
context.report().addLine("Cleanup has not been requested.");
return true;
}

@Override
public int retryCount() {
return 1;
}

@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
_systemMetadataService.clear();
} catch (Exception e) {
context.report().addLine("Failed to clear system metadata service", e);
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED);
}
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
@RequiredArgsConstructor
public class GMSDisableWriteModeStep implements UpgradeStep {

private final SystemEntityClient entityClient;
private final SystemEntityClient systemEntityClient;

@Override
public String id() {
Expand All @@ -29,7 +29,7 @@ public int retryCount() {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
entityClient.setWritable(context.opContext(), false);
systemEntityClient.setWritable(context.opContext(), false);
} catch (Exception e) {
log.error("Failed to turn write mode off in GMS", e);
context.report().addLine("Failed to turn write mode off in GMS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
@Slf4j
@RequiredArgsConstructor
public class GMSEnableWriteModeStep implements UpgradeStep {
private final SystemEntityClient entityClient;
private final SystemEntityClient systemEntityClient;

@Override
public String id() {
Expand All @@ -28,7 +28,7 @@ public int retryCount() {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
entityClient.setWritable(context.opContext(), true);
systemEntityClient.setWritable(context.opContext(), true);
} catch (Exception e) {
log.error("Failed to turn write mode back on in GMS", e);
context.report().addLine("Failed to turn write mode back on in GMS");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -26,29 +27,40 @@ public class RestoreBackupConfig {
"ebeanServer",
"entityService",
"systemEntityClient",
"graphService",
"systemMetadataService",
"searchService",
"graphService",
"entityRegistry"
})
@ConditionalOnProperty(name = "entityService.impl", havingValue = "ebean", matchIfMissing = true)
@Nonnull
public RestoreBackup createInstance() {
final Database ebeanServer = applicationContext.getBean(Database.class);
final EntityService<?> entityService = applicationContext.getBean(EntityService.class);
final SystemEntityClient entityClient = applicationContext.getBean(SystemEntityClient.class);
final GraphService graphClient = applicationContext.getBean(GraphService.class);
final EntitySearchService searchClient = applicationContext.getBean(EntitySearchService.class);
final SystemEntityClient systemEntityClient =
applicationContext.getBean(SystemEntityClient.class);
final SystemMetadataService systemMetadataService =
applicationContext.getBean(SystemMetadataService.class);
final EntitySearchService entitySearchService =
applicationContext.getBean(EntitySearchService.class);
final GraphService graphService = applicationContext.getBean(GraphService.class);
final EntityRegistry entityRegistry = applicationContext.getBean(EntityRegistry.class);

return new RestoreBackup(
ebeanServer, entityService, entityRegistry, entityClient, graphClient, searchClient);
ebeanServer,
entityService,
entityRegistry,
systemEntityClient,
systemMetadataService,
entitySearchService,
graphService);
}

@Bean(name = "restoreBackup")
@ConditionalOnProperty(name = "entityService.impl", havingValue = "cassandra")
@Nonnull
public RestoreBackup createNotImplInstance() {
log.warn("restoreIndices is not supported for cassandra!");
return new RestoreBackup(null, null, null, null, null, null);
return new RestoreBackup(null, null, null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -20,24 +21,33 @@ public class RestoreIndicesConfig {
@Autowired ApplicationContext applicationContext;

@Bean(name = "restoreIndices")
@DependsOn({"ebeanServer", "entityService", "searchService", "graphService"})
@DependsOn({
"ebeanServer",
"entityService",
"systemMetadataService",
"searchService",
"graphService"
})
@ConditionalOnProperty(name = "entityService.impl", havingValue = "ebean", matchIfMissing = true)
@Nonnull
public RestoreIndices createInstance() {
final Database ebeanServer = applicationContext.getBean(Database.class);
final EntityService<?> entityService = applicationContext.getBean(EntityService.class);
final SystemMetadataService systemMetadataService =
applicationContext.getBean(SystemMetadataService.class);
final EntitySearchService entitySearchService =
applicationContext.getBean(EntitySearchService.class);
final GraphService graphService = applicationContext.getBean(GraphService.class);

return new RestoreIndices(ebeanServer, entityService, entitySearchService, graphService);
return new RestoreIndices(
ebeanServer, entityService, systemMetadataService, entitySearchService, graphService);
}

@Bean(name = "restoreIndices")
@ConditionalOnProperty(name = "entityService.impl", havingValue = "cassandra")
@Nonnull
public RestoreIndices createNotImplInstance() {
log.warn("restoreIndices is not supported for cassandra!");
return new RestoreIndices(null, null, null, null);
return new RestoreIndices(null, null, null, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSystemMetadataServiceStep;
import com.linkedin.datahub.upgrade.common.steps.GMSDisableWriteModeStep;
import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -26,13 +28,20 @@ public RestoreBackup(
@Nullable final Database server,
final EntityService<?> entityService,
final EntityRegistry entityRegistry,
final SystemEntityClient entityClient,
final GraphService graphClient,
final EntitySearchService searchClient) {
final SystemEntityClient systemEntityClient,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphClient) {
if (server != null) {
_steps =
buildSteps(
server, entityService, entityRegistry, entityClient, graphClient, searchClient);
server,
entityService,
entityRegistry,
systemEntityClient,
systemMetadataService,
entitySearchService,
graphClient);
} else {
_steps = List.of();
}
Expand All @@ -52,16 +61,18 @@ private List<UpgradeStep> buildSteps(
final Database server,
final EntityService<?> entityService,
final EntityRegistry entityRegistry,
final SystemEntityClient entityClient,
final GraphService graphClient,
final EntitySearchService searchClient) {
final SystemEntityClient systemEntityClient,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphClient) {
final List<UpgradeStep> steps = new ArrayList<>();
steps.add(new GMSDisableWriteModeStep(entityClient));
steps.add(new ClearSearchServiceStep(searchClient, true));
steps.add(new GMSDisableWriteModeStep(systemEntityClient));
steps.add(new ClearSystemMetadataServiceStep(systemMetadataService, true));
steps.add(new ClearSearchServiceStep(entitySearchService, true));
steps.add(new ClearGraphServiceStep(graphClient, true));
steps.add(new ClearAspectV2TableStep(server));
steps.add(new RestoreStorageStep(entityService, entityRegistry));
steps.add(new GMSEnableWriteModeStep(entityClient));
steps.add(new GMSEnableWriteModeStep(systemEntityClient));
return steps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep;
import com.linkedin.datahub.upgrade.common.steps.ClearSystemMetadataServiceStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import io.ebean.Database;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,10 +34,13 @@ public class RestoreIndices implements Upgrade {
public RestoreIndices(
@Nullable final Database server,
final EntityService<?> entityService,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphService) {
if (server != null) {
_steps = buildSteps(server, entityService, entitySearchService, graphService);
_steps =
buildSteps(
server, entityService, systemMetadataService, entitySearchService, graphService);
} else {
_steps = List.of();
}
Expand All @@ -54,9 +59,11 @@ public List<UpgradeStep> steps() {
private List<UpgradeStep> buildSteps(
final Database server,
final EntityService<?> entityService,
final SystemMetadataService systemMetadataService,
final EntitySearchService entitySearchService,
final GraphService graphService) {
final List<UpgradeStep> steps = new ArrayList<>();
steps.add(new ClearSystemMetadataServiceStep(systemMetadataService, false));
steps.add(new ClearSearchServiceStep(entitySearchService, false));
steps.add(new ClearGraphServiceStep(graphService, false));
steps.add(new SendMAEStep(server, entityService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.aspect.models.graph.Edge;
Expand Down Expand Up @@ -296,7 +295,6 @@ public List<ReindexConfig> buildReindexConfigs(
Collections.emptyMap()));
}

@VisibleForTesting
@Override
public void clear() {
_esBulkProcessor.deleteByQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.run.AspectRowSummary;
Expand Down Expand Up @@ -250,7 +249,6 @@ public List<ReindexConfig> buildReindexConfigs(
Collections.emptyMap()));
}

@VisibleForTesting
@Override
public void clear() {
_esBulkProcessor.deleteByQuery(
Expand Down

0 comments on commit 76f1b23

Please sign in to comment.