Commit

Merge branch 'datahub-project:master' into master

anshbansal authored May 16, 2024
2 parents b6f14b1 + 3d5735c commit 2537629

Showing 127 changed files with 1,524 additions and 572 deletions.

@@ -15,37 +15,39 @@ public class EntityTypeMapper {

static final Map<EntityType, String> ENTITY_TYPE_TO_NAME =
ImmutableMap.<EntityType, String>builder()
.put(EntityType.DATASET, "dataset")
.put(EntityType.ROLE, "role")
.put(EntityType.CORP_USER, "corpuser")
.put(EntityType.CORP_GROUP, "corpGroup")
.put(EntityType.DATA_PLATFORM, "dataPlatform")
.put(EntityType.DASHBOARD, "dashboard")
.put(EntityType.CHART, "chart")
.put(EntityType.TAG, "tag")
.put(EntityType.DATA_FLOW, "dataFlow")
.put(EntityType.DATA_JOB, "dataJob")
.put(EntityType.DATASET, Constants.DATASET_ENTITY_NAME)
.put(EntityType.ROLE, Constants.ROLE_ENTITY_NAME)
.put(EntityType.CORP_USER, Constants.CORP_USER_ENTITY_NAME)
.put(EntityType.CORP_GROUP, Constants.CORP_GROUP_ENTITY_NAME)
.put(EntityType.DATA_PLATFORM, Constants.DATA_PLATFORM_ENTITY_NAME)
.put(EntityType.DASHBOARD, Constants.DASHBOARD_ENTITY_NAME)
.put(EntityType.CHART, Constants.CHART_ENTITY_NAME)
.put(EntityType.TAG, Constants.TAG_ENTITY_NAME)
.put(EntityType.DATA_FLOW, Constants.DATA_FLOW_ENTITY_NAME)
.put(EntityType.DATA_JOB, Constants.DATA_JOB_ENTITY_NAME)
.put(EntityType.DATA_PROCESS_INSTANCE, Constants.DATA_PROCESS_INSTANCE_ENTITY_NAME)
.put(EntityType.GLOSSARY_TERM, "glossaryTerm")
.put(EntityType.GLOSSARY_NODE, "glossaryNode")
.put(EntityType.MLMODEL, "mlModel")
.put(EntityType.MLMODEL_GROUP, "mlModelGroup")
.put(EntityType.MLFEATURE_TABLE, "mlFeatureTable")
.put(EntityType.MLFEATURE, "mlFeature")
.put(EntityType.MLPRIMARY_KEY, "mlPrimaryKey")
.put(EntityType.CONTAINER, "container")
.put(EntityType.DOMAIN, "domain")
.put(EntityType.NOTEBOOK, "notebook")
.put(EntityType.DATA_PLATFORM_INSTANCE, "dataPlatformInstance")
.put(EntityType.TEST, "test")
.put(EntityType.GLOSSARY_TERM, Constants.GLOSSARY_TERM_ENTITY_NAME)
.put(EntityType.GLOSSARY_NODE, Constants.GLOSSARY_NODE_ENTITY_NAME)
.put(EntityType.MLMODEL, Constants.ML_MODEL_ENTITY_NAME)
.put(EntityType.MLMODEL_GROUP, Constants.ML_MODEL_GROUP_ENTITY_NAME)
.put(EntityType.MLFEATURE_TABLE, Constants.ML_FEATURE_TABLE_ENTITY_NAME)
.put(EntityType.MLFEATURE, Constants.ML_FEATURE_ENTITY_NAME)
.put(EntityType.MLPRIMARY_KEY, Constants.ML_PRIMARY_KEY_ENTITY_NAME)
.put(EntityType.CONTAINER, Constants.CONTAINER_ENTITY_NAME)
.put(EntityType.DOMAIN, Constants.DOMAIN_ENTITY_NAME)
.put(EntityType.NOTEBOOK, Constants.NOTEBOOK_ENTITY_NAME)
.put(EntityType.DATA_PLATFORM_INSTANCE, Constants.DATA_PLATFORM_INSTANCE_ENTITY_NAME)
.put(EntityType.TEST, Constants.TEST_ENTITY_NAME)
.put(EntityType.ER_MODEL_RELATIONSHIP, Constants.ER_MODEL_RELATIONSHIP_ENTITY_NAME)
.put(EntityType.DATAHUB_VIEW, Constants.DATAHUB_VIEW_ENTITY_NAME)
.put(EntityType.DATA_PRODUCT, Constants.DATA_PRODUCT_ENTITY_NAME)
.put(EntityType.SCHEMA_FIELD, "schemaField")
.put(EntityType.SCHEMA_FIELD, Constants.SCHEMA_FIELD_ENTITY_NAME)
.put(EntityType.STRUCTURED_PROPERTY, Constants.STRUCTURED_PROPERTY_ENTITY_NAME)
.put(EntityType.ASSERTION, Constants.ASSERTION_ENTITY_NAME)
.put(EntityType.RESTRICTED, Constants.RESTRICTED_ENTITY_NAME)
.put(EntityType.BUSINESS_ATTRIBUTE, Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME)
.put(EntityType.QUERY, Constants.QUERY_ENTITY_NAME)
.put(EntityType.POST, Constants.POST_ENTITY_NAME)
.build();

private static final Map<String, EntityType> ENTITY_NAME_TO_TYPE =

@@ -49,7 +49,8 @@ public KafkaJob(UpgradeContext context, RestoreIndicesArgs args) {
@Override
public RestoreIndicesResult call() {
return _entityService
.streamRestoreIndices(context.opContext(), args, context.report()::addLine)
.restoreIndices(context.opContext(), args, context.report()::addLine)
.stream()
.findFirst()
.get();
}

@@ -12,6 +12,8 @@
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;

@@ -76,54 +78,58 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
args = args.urnLike(getUrnLike());
}

aspectDao
.streamAspectBatches(args)
.forEach(
batch -> {
log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);

List<Pair<Future<?>, Boolean>> futures =
EntityUtils.toSystemAspectFromEbeanAspects(
opContext.getRetrieverContext().get(),
batch.collect(Collectors.toList()))
.stream()
.map(
systemAspect ->
entityService.alwaysProduceMCLAsync(
opContext,
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
getAspectName(),
systemAspect.getAspectSpec(),
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(id())
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT))
.collect(Collectors.toList());

futures.forEach(
f -> {
try {
f.getFirst().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
try (PartitionedStream<EbeanAspectV2> stream = aspectDao.streamAspectBatches(args)) {
stream
.partition(args.batchSize)
.forEach(
batch -> {
log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);

List<Pair<Future<?>, Boolean>> futures;

futures =
EntityUtils.toSystemAspectFromEbeanAspects(
opContext.getRetrieverContext().get(),
batch.collect(Collectors.toList()))
.stream()
.map(
systemAspect ->
entityService.alwaysProduceMCLAsync(
opContext,
systemAspect.getUrn(),
systemAspect.getUrn().getEntityType(),
getAspectName(),
systemAspect.getAspectSpec(),
null,
systemAspect.getRecordTemplate(),
null,
systemAspect
.getSystemMetadata()
.setRunId(id())
.setLastObserved(System.currentTimeMillis()),
AuditStampUtils.createDefaultAuditStamp(),
ChangeType.UPSERT))
.collect(Collectors.toList());

futures.forEach(
f -> {
try {
f.getFirst().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
});
});
}

BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
context.report().addLine("State updated: " + getUpgradeIdUrn());

@@ -17,6 +17,7 @@
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityAspect;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.shared.ElasticSearchIndexed;

@@ -28,6 +29,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.OpenSearchStatusException;

@@ -156,28 +158,34 @@ private boolean blockWrites(String indexName) throws InterruptedException, IOExc

private static Set<StructuredPropertyDefinition> getActiveStructuredPropertiesDefinitions(
AspectDao aspectDao) {
Set<String> removedStructuredPropertyUrns =
aspectDao
.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STATUS_ASPECT_NAME)
.map(
entityAspect ->
Pair.of(
entityAspect.getUrn(),
RecordUtils.toRecordTemplate(Status.class, entityAspect.getMetadata())))
.filter(status -> status.getSecond().isRemoved())
.map(Pair::getFirst)
.collect(Collectors.toSet());

return aspectDao
.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)
.map(
entityAspect ->
Pair.of(
entityAspect.getUrn(),
RecordUtils.toRecordTemplate(
StructuredPropertyDefinition.class, entityAspect.getMetadata())))
.filter(definition -> !removedStructuredPropertyUrns.contains(definition.getKey()))
.map(Pair::getSecond)
.collect(Collectors.toSet());
Set<String> removedStructuredPropertyUrns;
try (Stream<EntityAspect> stream =
aspectDao.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STATUS_ASPECT_NAME)) {
removedStructuredPropertyUrns =
stream
.map(
entityAspect ->
Pair.of(
entityAspect.getUrn(),
RecordUtils.toRecordTemplate(Status.class, entityAspect.getMetadata())))
.filter(status -> status.getSecond().isRemoved())
.map(Pair::getFirst)
.collect(Collectors.toSet());
}

try (Stream<EntityAspect> stream =
aspectDao.streamAspects(
STRUCTURED_PROPERTY_ENTITY_NAME, STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)) {
return stream
.map(
entityAspect ->
Pair.of(
entityAspect.getUrn(),
RecordUtils.toRecordTemplate(
StructuredPropertyDefinition.class, entityAspect.getMetadata())))
.filter(definition -> !removedStructuredPropertyUrns.contains(definition.getKey()))
.map(Pair::getSecond)
.collect(Collectors.toSet());
}
}
}

11 changes: 10 additions & 1 deletion docs/lineage/airflow.md
@@ -8,7 +8,7 @@ If you're looking to schedule DataHub ingestion using Airflow, see the guide on

The DataHub Airflow plugin supports:

- Automatic column-level lineage extraction from various operators e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, and more), `S3FileTransformOperator`, and more.
- Automatic column-level lineage extraction from various operators e.g. SQL operators (including `MySqlOperator`, `PostgresOperator`, `SnowflakeOperator`, `BigQueryInsertJobOperator`, and more), `S3FileTransformOperator`, and more.
- Airflow DAG and tasks, including properties, ownership, and tags.
- Task run information, including task successes and failures.
- Manual lineage annotations using `inlets` and `outlets` on Airflow operators.

@@ -166,6 +166,7 @@ Supported operators:
- `SQLExecuteQueryOperator`, including any subclasses. Note that in newer versions of Airflow (generally Airflow 2.5+), most SQL operators inherit from this class.
- `AthenaOperator` and `AWSAthenaOperator`
- `BigQueryOperator` and `BigQueryExecuteQueryOperator`
- `BigQueryInsertJobOperator` (incubating)
- `MySqlOperator`
- `PostgresOperator`
- `RedshiftSQLOperator`

@@ -224,6 +225,14 @@ class DbtOperator(BaseOperator):

If you override the `pre_execute` and `post_execute` function, ensure they include the `@prepare_lineage` and `@apply_lineage` decorators respectively. Reference the [Airflow docs](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/lineage.html#lineage) for more details.
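
For example, a minimal sketch of a custom operator that keeps lineage working — the operator name and method bodies are illustrative, not part of this commit:

```python
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.baseoperator import BaseOperator


class MyTransformOperator(BaseOperator):  # hypothetical operator
    @prepare_lineage
    def pre_execute(self, context):
        # Custom setup logic; @prepare_lineage resolves inlets/outlets before execution.
        ...

    def execute(self, context):
        # The operator's actual work goes here.
        ...

    @apply_lineage
    def post_execute(self, context, result=None):
        # Custom teardown logic; @apply_lineage forwards the collected lineage afterwards.
        ...
```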

### Custom Extractors

Note: these are only supported in the v2 plugin.

You can also create a custom extractor to extract lineage from any operator. This is useful if you're using a built-in Airflow operator for which we don't support automatic lineage extraction.

See this [example PR](https://github.com/datahub-project/datahub/pull/10452) which adds a custom extractor for the `BigQueryInsertJobOperator` operator.
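
At a high level, an extractor subclasses the OpenLineage `BaseExtractor`, declares which operator class names it handles, and returns a `TaskMetadata` from `extract()` — the `BigQueryInsertJobOperatorExtractor` added in this commit follows that shape. A rough sketch, assuming the OpenLineage base classes from `openlineage.airflow.extractors.base` and using hypothetical class and operator names (how the extractor gets registered depends on your setup — see the example PR above):

```python
from typing import List, Optional

from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata


class MyOperatorExtractor(BaseExtractor):  # hypothetical extractor
    @classmethod
    def get_operator_classnames(cls) -> List[str]:
        # Operator class names this extractor should be applied to.
        return ["MyOperator"]

    def extract(self) -> Optional[TaskMetadata]:
        # Pull whatever the operator knows about its inputs and outputs...
        sql = getattr(self.operator, "sql", None)
        if not sql:
            self.log.warning("No SQL found on MyOperator; skipping lineage extraction")
            return None
        # ...and turn it into TaskMetadata, e.g. by handing the SQL to DataHub's
        # SQL parsing helper, as the built-in extractors in this commit do.
        ...
```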

## Emit Lineage Directly

If you can't use the plugin or annotate inlets/outlets, you can also emit lineage using the `DatahubEmitterOperator`.
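
A minimal sketch of what that can look like — the import path, connection id, and dataset URNs below are illustrative and may differ in your environment:

```python
import datahub.emitter.mce_builder as builder
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator

# Emits a simple table-level lineage edge as a task in your DAG.
emit_lineage_task = DatahubEmitterOperator(
    task_id="emit_lineage",
    datahub_conn_id="datahub_rest_default",  # assumes an Airflow connection to DataHub exists
    mces=[
        builder.make_lineage_mce(
            upstream_urns=[
                builder.make_dataset_urn("snowflake", "analytics.raw.events"),
            ],
            downstream_urn=builder.make_dataset_urn("snowflake", "analytics.marts.daily_events"),
        )
    ],
)
```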

@@ -59,6 +59,10 @@ def __init__(self):
for operator in _sql_operator_overrides:
self.task_to_extractor.extractors[operator] = GenericSqlExtractor

self.task_to_extractor.extractors[
"BigQueryInsertJobOperator"
] = BigQueryInsertJobOperatorExtractor

self._graph: Optional["DataHubGraph"] = None

@contextlib.contextmanager

@@ -78,7 +82,7 @@ def _patch_extractors(self):
unittest.mock.patch.object(
SnowflakeExtractor,
"default_schema",
property(snowflake_default_schema),
property(_snowflake_default_schema),
)
)


@@ -166,12 +170,6 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
task_name = f"{self.operator.dag_id}.{self.operator.task_id}"
sql = self.operator.sql

run_facets = {}
job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}

# Prepare to run the SQL parser.
graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)

default_database = getattr(self.operator, "database", None)
if not default_database:
default_database = self.database

@@ -185,6 +183,31 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
# Run the SQL parser.
scheme = self.scheme
platform = OL_SCHEME_TWEAKS.get(scheme, scheme)

return _parse_sql_into_task_metadata(
self,
sql,
platform=platform,
default_database=default_database,
default_schema=default_schema,
)


def _parse_sql_into_task_metadata(
self: "BaseExtractor",
sql: str,
platform: str,
default_database: Optional[str],
default_schema: Optional[str],
) -> TaskMetadata:
task_name = f"{self.operator.dag_id}.{self.operator.task_id}"

run_facets = {}
job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}

# Prepare to run the SQL parser.
graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)

self.log.debug(
"Running the SQL parser %s (platform=%s, default db=%s, schema=%s): %s",
"with graph client" if graph else "in offline mode",

@@ -232,7 +255,28 @@ def _sql_extractor_extract(self: "SqlExtractor") -> TaskMetadata:
)


def snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
class BigQueryInsertJobOperatorExtractor(BaseExtractor):
def extract(self) -> Optional[TaskMetadata]:
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryInsertJobOperator, # type: ignore
)

operator: "BigQueryInsertJobOperator" = self.operator
sql = operator.configuration.get("query")
if not sql:
self.log.warning("No query found in BigQueryInsertJobOperator")
return None

return _parse_sql_into_task_metadata(
self,
sql,
platform="bigquery",
default_database=operator.project_id,
default_schema=None,
)


def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
if hasattr(self.operator, "schema") and self.operator.schema is not None:
return self.operator.schema
return (

1 change: 1 addition & 0 deletions metadata-ingestion/setup.cfg
@@ -94,6 +94,7 @@ filterwarnings =
ignore:pkg_resources is deprecated as an API:DeprecationWarning
ignore:Did not recognize type:sqlalchemy.exc.SAWarning
ignore::datahub.configuration.pydantic_migration_helpers.PydanticDeprecatedSince20
ignore::datahub.configuration.common.ConfigurationWarning

[coverage:run]
# Because of some quirks in the way setup.cfg, coverage.py, pytest-cov,