Merge branch 'datahub-project:master' into master
treff7es authored Dec 5, 2024
2 parents 4f1f3f2 + 3c388a5 commit b31d8b1
Showing 26 changed files with 794 additions and 167 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/pr-labeler.yml
@@ -29,24 +29,24 @@ jobs:
"swaroopjagadish",
"treff7es",
"yoonhyejin",
"eboneil",
"gabe-lyons",
"hsheth2",
"jjoyce0510",
"maggiehays",
"pedro93",
"RyanHolstien",
"sakethvarma397",
"Kunal-kankriya",
"purnimagarg1",
"dushayntAW",
"sagar-salvi-apptware",
"kushagra-apptware",
"Salman-Apptware",
"mayurinehate",
"noggi",
"skrydal",
"kevinkarchacryl"
"kevinkarchacryl",
"sgomezvillamor",
"acrylJonny",
"chakru-r"
]'),
github.actor
)
@@ -16,6 +16,7 @@ const StyledCollapse = styled(Collapse)<{ color: string }>`
.ant-collapse-header {
display: flex;
align-items: center;
overflow: auto;
}
.ant-collapse-item {
@@ -178,7 +178,7 @@ public static String toElasticsearchFieldName(
/**
* Return an elasticsearch type from structured property type
*
* @param fieldName filter or facet field name
* @param fieldName filter or facet field name - must match actual FQN of structured prop
* @param aspectRetriever aspect retriever
* @return elasticsearch type
*/
9 changes: 9 additions & 0 deletions metadata-ingestion/setup.py
@@ -142,6 +142,15 @@
# datahub does not depend on traitlets directly but great expectations does.
# https://github.com/ipython/traitlets/issues/741
"traitlets!=5.2.2",
# GE depends on IPython - we have no direct dependency on it.
# IPython 8.22.0 added a dependency on traitlets 5.13.x, but only declared a
# version requirement of traitlets>5.
# See https://github.com/ipython/ipython/issues/14352.
# This issue was fixed by https://github.com/ipython/ipython/pull/14353,
# which first appeared in IPython 8.22.1.
# As such, we just need to avoid that version in order to get the
# dependencies that we need. IPython probably should've yanked 8.22.0.
"IPython!=8.22.0",
"greenlet",
*cachetools_lib,
}
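
The pin above works because `!=` excludes exactly one release while leaving the fixed 8.22.1 and every older IPython installable. A minimal sketch of how that specifier evaluates, using the third-party `packaging` library (an assumption for illustration only, not a dependency declared in this file):

```python
from packaging.specifiers import SpecifierSet

# The exclusion pin added above: every version except the broken 8.22.0 is allowed.
ipython_spec = SpecifierSet("!=8.22.0")

print("8.22.0" in ipython_spec)  # False - the release with the bad traitlets bound
print("8.22.1" in ipython_spec)  # True  - first release containing the fix
print("8.21.0" in ipython_spec)  # True  - older releases remain installable
```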
41 changes: 37 additions & 4 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -214,14 +214,47 @@ def references(urn: str, dry_run: bool, force: bool) -> None:


@delete.command()
@click.option("--urn", required=True, type=str, help="the urn of the entity")
def undo_by_filter(urn: str) -> None:
@click.option("--urn", required=False, type=str, help="the urn of the entity")
@click.option(
"-p",
"--platform",
required=False,
type=str,
help="Platform filter (e.g. snowflake)",
)
@click.option(
"-b",
"--batch-size",
required=False,
default=3000,
type=int,
help="Batch size when querying for entities to un-soft delete."
"Maximum 10000. Large batch sizes may cause timeouts.",
)
def undo_by_filter(
urn: Optional[str], platform: Optional[str], batch_size: int
) -> None:
"""
Undo a soft deletion of an entity
Undo soft deletion by filters
"""
graph = get_default_graph()
logger.info(f"Using {graph}")
graph.set_soft_delete_status(urn=urn, delete=False)
if urn:
graph.set_soft_delete_status(urn=urn, delete=False)
else:
urns = list(
graph.get_urns_by_filter(
platform=platform,
query="*",
status=RemovedStatusFilter.ONLY_SOFT_DELETED,
batch_size=batch_size,
)
)
logger.info(f"Going to un-soft delete {len(urns)} urns")
urns_iter = progressbar.progressbar(urns, redirect_stdout=True)
for urn in urns_iter:
assert urn
graph.set_soft_delete_status(urn=urn, delete=False)
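
For reference, the same un-soft-delete flow can be driven from Python against the graph client this command uses. This is a minimal sketch: the import paths and the example platform value are assumptions, not something the diff pins down.

```python
# Hypothetical programmatic equivalent of undo_by_filter with a platform filter.
# Import locations are assumed; adjust to the datahub version in use.
from datahub.ingestion.graph.client import RemovedStatusFilter, get_default_graph

graph = get_default_graph()

# Page through soft-deleted URNs for one platform, mirroring the CLI's defaults.
urns = graph.get_urns_by_filter(
    platform="snowflake",  # example value, matching the --platform help text
    query="*",
    status=RemovedStatusFilter.ONLY_SOFT_DELETED,
    batch_size=3000,       # the CLI default; the help text caps it at 10000
)

for urn in urns:
    # Flip the Status aspect back to not-removed, one entity at a time.
    graph.set_soft_delete_status(urn=urn, delete=False)
```

On the command line this surfaces as an `undo-by-filter` subcommand under `datahub delete`, assuming Click's usual underscore-to-dash command naming.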


@delete.command(no_args_is_help=True)
@@ -1,3 +1,4 @@
import inspect
import logging
from typing import Any, Dict, Optional

@@ -34,5 +35,34 @@ def _resolve_oauth_callback(self) -> None:
"oauth_cb must be a string representing python function reference "
"in the format <python-module>:<function-name>."
)

call_back_fn = import_path(call_back)
self._validate_call_back_fn_signature(call_back_fn)

# Set the callback
self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = import_path(call_back)
self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = call_back_fn

def _validate_call_back_fn_signature(self, call_back_fn: Any) -> None:
sig = inspect.signature(call_back_fn)

num_positional_args = len(
[
param
for param in sig.parameters.values()
if param.kind
in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
)
and param.default == inspect.Parameter.empty
]
)

has_variadic_args = any(
param.kind == inspect.Parameter.VAR_POSITIONAL
for param in sig.parameters.values()
)

assert num_positional_args == 1 or (
has_variadic_args and num_positional_args <= 1
), "oauth_cb function must accept single positional argument."
@@ -208,22 +208,28 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
dpis = []
start = 0
while True:
job_query_result = self.ctx.graph.execute_graphql(
DATA_PROCESS_INSTANCES_QUERY,
{"dataJobUrn": job_urn, "start": start, "count": batch_size},
)
job_data = job_query_result.get("dataJob")
if not job_data:
raise ValueError(f"Error getting job {job_urn}")

runs_data = job_data.get("runs")
if not runs_data:
raise ValueError(f"Error getting runs for {job_urn}")

runs = runs_data.get("runs")
dpis.extend(runs)
start += batch_size
if len(runs) < batch_size:
try:
job_query_result = self.ctx.graph.execute_graphql(
DATA_PROCESS_INSTANCES_QUERY,
{"dataJobUrn": job_urn, "start": start, "count": batch_size},
)
job_data = job_query_result.get("dataJob")
if not job_data:
logger.error(f"Error getting job {job_urn}")
break

runs_data = job_data.get("runs")
if not runs_data:
logger.error(f"Error getting runs for {job_urn}")
break

runs = runs_data.get("runs")
dpis.extend(runs)
start += batch_size
if len(runs) < batch_size:
break
except Exception as e:
logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
break
return dpis

@@ -243,8 +249,12 @@ def keep_last_n_dpi(
futures[future] = dpi

for future in as_completed(futures):
deleted_count_last_n += 1
futures[future]["deleted"] = True
try:
future.result()
deleted_count_last_n += 1
futures[future]["deleted"] = True
except Exception as e:
logger.error(f"Exception while deleting DPI: {e}")

if deleted_count_last_n % self.config.batch_size == 0:
logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
@@ -279,7 +289,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
dpis = self.fetch_dpis(job.urn, self.config.batch_size)
dpis.sort(
key=lambda x: x["created"]["time"]
if x["created"] and x["created"]["time"]
if "created" in x and "time" in x["created"]
else 0,
reverse=True,
)
@@ -314,15 +324,23 @@ def remove_old_dpis(
if dpi.get("deleted"):
continue

if dpi["created"]["time"] < retention_time * 1000:
if (
"created" not in dpi
or "time" not in dpi["created"]
or dpi["created"]["time"] < retention_time * 1000
):
future = executor.submit(
self.delete_entity, dpi["urn"], "dataprocessInstance"
)
futures[future] = dpi

for future in as_completed(futures):
deleted_count_retention += 1
futures[future]["deleted"] = True
try:
future.result()
deleted_count_retention += 1
futures[future]["deleted"] = True
except Exception as e:
logger.error(f"Exception while deleting DPI: {e}")

if deleted_count_retention % self.config.batch_size == 0:
logger.info(
@@ -378,8 +396,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
dataFlows[flow.urn] = flow

scroll_id: Optional[str] = None
previous_scroll_id: Optional[str] = None

dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)
deleted_jobs: int = 0

while True:
result = self.ctx.graph.execute_graphql(
DATAJOB_QUERY,
@@ -426,9 +447,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
else:
dataJobs[datajob_entity.flow_urn].append(datajob_entity)

if not scroll_id:
if not scroll_id or previous_scroll_id == scroll_id:
break

previous_scroll_id = scroll_id

logger.info(f"Deleted {deleted_jobs} DataJobs")
# Delete empty dataflows if needed
if self.config.delete_empty_data_flows:
@@ -443,4 +466,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if deleted_jobs % self.config.batch_size == 0:
logger.info(f"Deleted {deleted_data_flows} DataFlows")
logger.info(f"Deleted {deleted_data_flows} DataFlows")

return []
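
A recurring change in this file is calling `future.result()` inside the `as_completed` loop before counting a deletion as successful; without it, exceptions raised in the worker threads are swallowed silently. A minimal, self-contained sketch of that pattern (illustrative names only, not the source's own helpers):

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def delete_entity(urn: str) -> None:
    # Stand-in for the real deletion call; fails for one URN to show the handling.
    if urn == "urn:li:dataProcessInstance:broken":
        raise RuntimeError("server returned 500")


urns = [f"urn:li:dataProcessInstance:{i}" for i in range(3)] + [
    "urn:li:dataProcessInstance:broken"
]

deleted = 0
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(delete_entity, urn): urn for urn in urns}
    for future in as_completed(futures):
        try:
            future.result()  # re-raises any exception from the worker thread
            deleted += 1
        except Exception as e:
            logger.error(f"Exception while deleting DPI {futures[future]}: {e}")

print(f"Deleted {deleted} of {len(urns)} DPIs")  # 3 of 4
```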
12 changes: 11 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -78,7 +78,17 @@ class PulsarSchema:
def __init__(self, schema):
self.schema_version = schema.get("version")

avro_schema = json.loads(schema.get("data"))
schema_data = schema.get("data")
if not schema_data:
logger.warning("Schema data is empty or None. Using default empty schema.")
schema_data = "{}"

try:
avro_schema = json.loads(schema_data)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON schema: {schema_data}. Error: {str(e)}")
avro_schema = {}

self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
self.schema_description = avro_schema.get("doc")
self.schema_type = schema.get("type")
16 changes: 16 additions & 0 deletions metadata-ingestion/tests/integration/kafka/oauth.py
@@ -12,3 +12,19 @@ def create_token(*args: Any, **kwargs: Any) -> Tuple[str, int]:
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456",
3600,
)


def create_token_no_args() -> Tuple[str, int]:
logger.warning(MESSAGE)
return (
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456",
3600,
)


def create_token_only_kwargs(**kwargs: Any) -> Tuple[str, int]:
logger.warning(MESSAGE)
return (
"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456",
3600,
)
31 changes: 30 additions & 1 deletion metadata-ingestion/tests/integration/kafka/test_kafka.py
@@ -5,9 +5,10 @@
import yaml
from freezegun import freeze_time

from datahub.configuration.common import ConfigurationError
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.kafka.kafka import KafkaSource
from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig
from tests.integration.kafka import oauth # type: ignore
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
@@ -157,3 +158,31 @@ def test_kafka_oauth_callback(
assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
assert checks["admin_polling"], "Admin polling was not initiated"
assert checks["admin_oauth_callback"], "Admin oauth callback not found"


def test_kafka_source_oauth_cb_signature():
with pytest.raises(
ConfigurationError,
match=("oauth_cb function must accept single positional argument."),
):
KafkaSourceConfig.parse_obj(
{
"connection": {
"bootstrap": "foobar:9092",
"consumer_config": {"oauth_cb": "oauth:create_token_no_args"},
}
}
)

with pytest.raises(
ConfigurationError,
match=("oauth_cb function must accept single positional argument."),
):
KafkaSourceConfig.parse_obj(
{
"connection": {
"bootstrap": "foobar:9092",
"consumer_config": {"oauth_cb": "oauth:create_token_only_kwargs"},
}
}
)
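
For contrast with the two failing cases above, a config whose callback satisfies the new check might look like the sketch below. It is hypothetical (not part of this commit) and reuses the test module's existing `create_token`, which takes `*args`/`**kwargs` and therefore passes the single-positional-argument rule.

```python
def test_kafka_source_oauth_cb_valid_signature():
    # A callback taking *args/**kwargs passes the positional-argument check,
    # so parsing succeeds instead of raising ConfigurationError.
    KafkaSourceConfig.parse_obj(
        {
            "connection": {
                "bootstrap": "foobar:9092",
                "consumer_config": {"oauth_cb": "oauth:create_token"},
            }
        }
    )
```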