Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Apr 3, 2024
2 parents 0069678 + 3c7c3ec commit 18cf30d
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 3 deletions.
7 changes: 7 additions & 0 deletions metadata-ingestion/docs/sources/datahub/datahub_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ source:
stateful_ingestion:
enabled: true
ignore_old_state: true
urn_pattern: # Regex patterns deciding which URNs are ingested (deny wins over allow)
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*
```
#### Limitations
Expand Down
7 changes: 7 additions & 0 deletions metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ source:
ignore_old_state: false
extractor_config:
set_system_metadata: false # Replicate system metadata
urn_pattern:
deny:
# Ignores all datahub metadata where the urn matches the regex
- ^denied.urn.*
allow:
# Ingests all datahub metadata where the urn matches the regex.
- ^allowed.urn.*

# Here, we write to a DataHub instance
# You can also use a different sink, e.g. to write the data to a file instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

from datahub.cli.cli_utils import get_url_and_token
from datahub.configuration import config_loader
from datahub.configuration.common import ConfigModel, DynamicTypedConfig
from datahub.configuration.common import (
AllowDenyPattern,
ConfigModel,
DynamicTypedConfig,
)
from datahub.ingestion.graph.client import DatahubClientConfig
from datahub.ingestion.sink.file import FileSinkConfig

Expand All @@ -21,6 +25,7 @@
class SourceConfig(DynamicTypedConfig):
    """Typed ``source`` section of a pipeline config.

    Extends the dynamic source config with a URN-level allow/deny regex
    filter that sources may consult to skip entities during ingestion.
    """

    extractor: str = "generic"
    extractor_config: dict = Field(default_factory=dict)
    # Use default_factory (not default=AllowDenyPattern()) so each model
    # instance gets its own pattern object — consistent with the
    # extractor_config field above and standard pydantic practice for
    # mutable defaults.
    urn_pattern: AllowDenyPattern = Field(default_factory=AllowDenyPattern)


class ReporterConfig(DynamicTypedConfig):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,7 @@ def get_schema_metadata() -> Optional[SchemaMetadata]:
schema_fields = get_schema_fields_for_hive_column(
hive_column_name=partition_key["Name"],
hive_column_type=partition_key.get("Type", "unknown"),
description=partition_key.get("Comment"),
default_nullable=False,
)
assert schema_fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class DataHubSource(StatefulIngestionSourceBase):
def __init__(self, config: DataHubSourceConfig, ctx: PipelineContext):
    """Initialize the DataHub source.

    Args:
        config: Source-specific configuration.
        ctx: Pipeline context; its ``pipeline_config.source`` may carry a
            ``urn_pattern`` filter that overrides the allow-all default.
    """
    # NOTE(review): prefer moving this to the module-level import block;
    # kept local here so this fix stands alone.
    from datahub.configuration.common import AllowDenyPattern

    super().__init__(config, ctx)
    self.config = config

    # BUG FIX: always define self.urn_pattern. The original assigned it
    # only inside the conditional below, so any pipeline without a
    # source-level urn_pattern (or with ctx.pipeline_config unset, e.g.
    # in unit tests) hit AttributeError at the later
    # `self.urn_pattern.allowed(...)` call sites.
    self.urn_pattern: AllowDenyPattern = AllowDenyPattern()
    if (
        ctx.pipeline_config
        and ctx.pipeline_config.source
        and ctx.pipeline_config.source.urn_pattern
    ):
        self.urn_pattern = ctx.pipeline_config.source.urn_pattern

    self.report: DataHubSourceReport = DataHubSourceReport()
    self.stateful_ingestion_handler = StatefulDataHubIngestionHandler(self)

Expand Down Expand Up @@ -92,6 +99,10 @@ def _get_database_workunits(
)
mcps = reader.get_aspects(from_createdon, self.report.stop_time)
for i, (mcp, createdon) in enumerate(mcps):

if not self.urn_pattern.allowed(str(mcp.entityUrn)):
continue

yield mcp.as_workunit()
self.report.num_database_aspects_ingested += 1

Expand Down Expand Up @@ -126,6 +137,9 @@ def _get_kafka_workunits(
)
continue

if not self.urn_pattern.allowed(str(mcp.entityUrn)):
continue

if isinstance(mcp, MetadataChangeProposalWrapper):
yield mcp.as_workunit()
else:
Expand Down Expand Up @@ -153,6 +167,8 @@ def _get_api_workunits(self) -> Iterable[MetadataWorkUnit]:

reader = DataHubApiReader(self.config, self.report, self.ctx.graph)
for mcp in reader.get_aspects():
if not self.urn_pattern.allowed(str(mcp.entityUrn)):
continue
yield mcp.as_workunit()

def _commit_progress(self, i: Optional[int] = None) -> None:
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/unit/glue/glue_mces_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
{
"fieldPath": "[version=2.0].[type=int].yr",
"nullable": true,
"description": "test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
Expand Down Expand Up @@ -204,6 +205,7 @@
{
"fieldPath": "[version=2.0].[type=string].year",
"nullable": true,
"description": "partition test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
{
"fieldPath": "[version=2.0].[type=int].yr",
"nullable": true,
"description": "test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
Expand Down Expand Up @@ -206,6 +207,7 @@
{
"fieldPath": "[version=2.0].[type=string].year",
"nullable": true,
"description": "partition test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
Expand Down
6 changes: 4 additions & 2 deletions metadata-ingestion/tests/unit/test_glue_source_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
"Retention": 0,
"StorageDescriptor": {
"Columns": [
{"Name": "yr", "Type": "int"},
{"Name": "yr", "Type": "int", "Comment": "test comment"},
{"Name": "flightdate", "Type": "string"},
{"Name": "uniquecarrier", "Type": "string"},
{"Name": "airlineid", "Type": "int"},
Expand Down Expand Up @@ -129,7 +129,9 @@
},
"StoredAsSubDirectories": False,
},
"PartitionKeys": [{"Name": "year", "Type": "string"}],
"PartitionKeys": [
{"Name": "year", "Type": "string", "Comment": "partition test comment"}
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"CrawlerSchemaDeserializerVersion": "1.0",
Expand Down

0 comments on commit 18cf30d

Please sign in to comment.