Merge branch 'datahub-project:master' into master
anshbansal authored Nov 20, 2024
2 parents dccf589 + 524ef8c commit 971d989
Showing 37 changed files with 548 additions and 166 deletions.
build.gradle: 2 changes (1 addition & 1 deletion)
@@ -56,7 +56,7 @@ buildscript {
   ext.hazelcastVersion = '5.3.6'
   ext.ebeanVersion = '15.5.2'
   ext.googleJavaFormatVersion = '1.18.1'
-  ext.openLineageVersion = '1.19.0'
+  ext.openLineageVersion = '1.24.2'
   ext.logbackClassicJava8 = '1.2.12'

   ext.docker_registry = 'acryldata'
AbstractMCLStep.java
@@ -1,13 +1,12 @@
 package com.linkedin.datahub.upgrade.system;

-import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;
-
 import com.linkedin.common.urn.Urn;
 import com.linkedin.datahub.upgrade.UpgradeContext;
 import com.linkedin.datahub.upgrade.UpgradeStep;
 import com.linkedin.datahub.upgrade.UpgradeStepResult;
 import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
 import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.aspect.SystemAspect;
 import com.linkedin.metadata.boot.BootstrapStep;
 import com.linkedin.metadata.entity.AspectDao;
 import com.linkedin.metadata.entity.EntityService;
@@ -16,10 +15,13 @@
 import com.linkedin.metadata.entity.ebean.PartitionedStream;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
 import com.linkedin.metadata.utils.AuditStampUtils;
+import com.linkedin.upgrade.DataHubUpgradeResult;
 import com.linkedin.upgrade.DataHubUpgradeState;
 import com.linkedin.util.Pair;
 import io.datahubproject.metadata.context.OperationContext;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.function.Function;
@@ -33,6 +35,8 @@
  */
 @Slf4j
 public abstract class AbstractMCLStep implements UpgradeStep {
+  public static final String LAST_URN_KEY = "lastUrn";
+
   private final OperationContext opContext;
   private final EntityService<?> entityService;
   private final AspectDao aspectDao;
@@ -70,10 +74,30 @@ protected Urn getUpgradeIdUrn() {
   @Override
   public Function<UpgradeContext, UpgradeStepResult> executable() {
     return (context) -> {
+      // Resume state
+      Optional<DataHubUpgradeResult> prevResult =
+          context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
+      String resumeUrn =
+          prevResult
+              .filter(
+                  result ->
+                      DataHubUpgradeState.IN_PROGRESS.equals(result.getState())
+                          && result.getResult() != null
+                          && result.getResult().containsKey(LAST_URN_KEY))
+              .map(result -> result.getResult().get(LAST_URN_KEY))
+              .orElse(null);
+      if (resumeUrn != null) {
+        log.info("{}: Resuming from URN: {}", getUpgradeIdUrn(), resumeUrn);
+      }

       // re-using for configuring the sql scan
       RestoreIndicesArgs args =
-          new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit);
+          new RestoreIndicesArgs()
+              .aspectName(getAspectName())
+              .batchSize(batchSize)
+              .lastUrn(resumeUrn)
+              .urnBasedPagination(resumeUrn != null)
+              .limit(limit);

       if (getUrnLike() != null) {
         args = args.urnLike(getUrnLike());
@@ -86,40 +110,62 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
                 batch -> {
                   log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);

-                  List<Pair<Future<?>, Boolean>> futures;
-
+                  List<Pair<Future<?>, SystemAspect>> futures;
                   futures =
                       EntityUtils.toSystemAspectFromEbeanAspects(
                               opContext.getRetrieverContext().get(),
                               batch.collect(Collectors.toList()))
                           .stream()
                           .map(
-                              systemAspect ->
-                                  entityService.alwaysProduceMCLAsync(
-                                      opContext,
-                                      systemAspect.getUrn(),
-                                      systemAspect.getUrn().getEntityType(),
-                                      getAspectName(),
-                                      systemAspect.getAspectSpec(),
-                                      null,
-                                      systemAspect.getRecordTemplate(),
-                                      null,
-                                      systemAspect
-                                          .getSystemMetadata()
-                                          .setRunId(id())
-                                          .setLastObserved(System.currentTimeMillis()),
-                                      AuditStampUtils.createDefaultAuditStamp(),
-                                      ChangeType.UPSERT))
-                          .collect(Collectors.toList());
-
-                  futures.forEach(
-                      f -> {
-                        try {
-                          f.getFirst().get();
-                        } catch (InterruptedException | ExecutionException e) {
-                          throw new RuntimeException(e);
-                        }
-                      });
+                              systemAspect -> {
+                                Pair<Future<?>, Boolean> future =
+                                    entityService.alwaysProduceMCLAsync(
+                                        opContext,
+                                        systemAspect.getUrn(),
+                                        systemAspect.getUrn().getEntityType(),
+                                        getAspectName(),
+                                        systemAspect.getAspectSpec(),
+                                        null,
+                                        systemAspect.getRecordTemplate(),
+                                        null,
+                                        systemAspect
+                                            .getSystemMetadata()
+                                            .setRunId(id())
+                                            .setLastObserved(System.currentTimeMillis()),
+                                        AuditStampUtils.createDefaultAuditStamp(),
+                                        ChangeType.UPSERT);
+                                return Pair.<Future<?>, SystemAspect>of(
+                                    future.getFirst(), systemAspect);
+                              })
+                          .toList();
+
+                  SystemAspect lastAspect =
+                      futures.stream()
+                          .map(
+                              f -> {
+                                try {
+                                  f.getFirst().get();
+                                  return f.getSecond();
+                                } catch (InterruptedException | ExecutionException e) {
+                                  throw new RuntimeException(e);
+                                }
+                              })
+                          .reduce((a, b) -> b)
+                          .orElse(null);
+
+                  // record progress
+                  if (lastAspect != null) {
+                    log.info(
+                        "{}: Saving state. Last urn:{}", getUpgradeIdUrn(), lastAspect.getUrn());
+                    context
+                        .upgrade()
+                        .setUpgradeResult(
+                            opContext,
+                            getUpgradeIdUrn(),
+                            entityService,
+                            DataHubUpgradeState.IN_PROGRESS,
+                            Map.of(LAST_URN_KEY, lastAspect.getUrn().toString()));
+                  }

                   if (batchDelayMs > 0) {
                     log.info("Sleeping for {} ms", batchDelayMs);
@@ -142,12 +188,23 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
   @Override
   /** Returns whether the upgrade should be skipped. */
   public boolean skip(UpgradeContext context) {
-    boolean previouslyRun =
-        entityService.exists(
-            opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
-    if (previouslyRun) {
-      log.info("{} was already run. Skipping.", id());
+    Optional<DataHubUpgradeResult> prevResult =
+        context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
+
+    boolean previousRunFinal =
+        prevResult
+            .filter(
+                result ->
+                    DataHubUpgradeState.SUCCEEDED.equals(result.getState())
+                        || DataHubUpgradeState.ABORTED.equals(result.getState()))
+            .isPresent();
+
+    if (previousRunFinal) {
+      log.info(
+          "{} was already run. State: {} Skipping.",
+          id(),
+          prevResult.map(DataHubUpgradeResult::getState));
     }
-    return previouslyRun;
+    return previousRunFinal;
   }
 }
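The AbstractMCLStep change above makes the step resumable: while a run is IN_PROGRESS it checkpoints the last processed URN under LAST_URN_KEY after every batch, the next run feeds that checkpoint into lastUrn/urnBasedPagination so the scan restarts after it, and skip() now only short-circuits on the terminal SUCCEEDED or ABORTED states. Below is a minimal, self-contained sketch of that checkpoint-and-resume pattern; the ResumableBatchSketch class, its State enum, and the UpgradeResult record are hypothetical stand-ins for DataHub's persisted DataHubUpgradeResult aspect and the paginated Ebean aspect scan, not the project's actual API.

import java.util.List;
import java.util.Map;

// Hypothetical sketch only: plain Java stand-ins for the persisted upgrade
// result and the paginated aspect scan used by the real step.
public class ResumableBatchSketch {
  static final String LAST_URN_KEY = "lastUrn";

  enum State { IN_PROGRESS, SUCCEEDED, ABORTED }

  record UpgradeResult(State state, Map<String, String> result) {}

  // Simulated result left behind by an interrupted previous run.
  static UpgradeResult storedResult =
      new UpgradeResult(State.IN_PROGRESS, Map.of(LAST_URN_KEY, "urn:li:dataset:2"));

  public static void main(String[] args) {
    // skip() analogue: only terminal states short-circuit the step.
    if (storedResult != null
        && (storedResult.state() == State.SUCCEEDED || storedResult.state() == State.ABORTED)) {
      System.out.println("Already run to completion, skipping.");
      return;
    }

    // Resume point: read the checkpoint if the previous run was IN_PROGRESS.
    String resumeUrn =
        storedResult != null && storedResult.state() == State.IN_PROGRESS
            ? storedResult.result().get(LAST_URN_KEY)
            : null;

    List<String> urns =
        List.of("urn:li:dataset:1", "urn:li:dataset:2", "urn:li:dataset:3", "urn:li:dataset:4");

    // URN-based pagination analogue: restart strictly after the checkpointed URN.
    int start = resumeUrn == null ? 0 : urns.indexOf(resumeUrn) + 1;
    int batchSize = 2;

    for (int i = start; i < urns.size(); i += batchSize) {
      List<String> batch = urns.subList(i, Math.min(i + batchSize, urns.size()));
      batch.forEach(urn -> System.out.println("emitting MCL for " + urn));

      // Checkpoint after each batch so an interrupted run can pick up here.
      String lastUrn = batch.get(batch.size() - 1);
      storedResult = new UpgradeResult(State.IN_PROGRESS, Map.of(LAST_URN_KEY, lastUrn));
    }

    storedResult = new UpgradeResult(State.SUCCEEDED, Map.of());
    System.out.println("Done.");
  }
}

In the real step, the checkpoint read is context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService) and the checkpoint write is setUpgradeResult(..., DataHubUpgradeState.IN_PROGRESS, Map.of(LAST_URN_KEY, lastAspect.getUrn().toString())), as shown in the diff above.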
GenerateSchemaFieldsFromSchemaMetadataStep.java
@@ -1,5 +1,6 @@
 package com.linkedin.datahub.upgrade.system.schemafield;

+import static com.linkedin.datahub.upgrade.system.AbstractMCLStep.LAST_URN_KEY;
 import static com.linkedin.metadata.Constants.APP_SOURCE;
 import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
 import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
@@ -61,7 +62,6 @@
  */
 @Slf4j
 public class GenerateSchemaFieldsFromSchemaMetadataStep implements UpgradeStep {
-  private static final String LAST_URN_KEY = "lastUrn";
   private static final List<String> REQUIRED_ASPECTS =
       List.of(SCHEMA_METADATA_ASPECT_NAME, STATUS_ASPECT_NAME);

(Diffs for the remaining changed files are not shown.)
