From 44cdb046d74be65578165a7cfdff52b2175a969b Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Fri, 19 Jul 2024 11:40:02 -0700
Subject: [PATCH 01/17] fix(docs): make graphql doc gen more automated (#10953)

---
 .github/workflows/documentation.yml           |  2 ++
 docs-website/graphql/generateGraphQLSchema.sh | 22 ++++------------------
 2 files changed, 6 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/documentation.yml b/.github/workflows/documentation.yml
index 99c1ad466b669d..9d63663693f902 100644
--- a/.github/workflows/documentation.yml
+++ b/.github/workflows/documentation.yml
@@ -27,6 +27,8 @@ concurrency:
 jobs:
   gh-pages:
     runs-on: ubuntu-latest
+    permissions:
+      contents: write
     steps:
       # We explicitly don't use acryldata/sane-checkout-action because docusaurus runs
       # git commands to determine the last change date for each file.
diff --git a/docs-website/graphql/generateGraphQLSchema.sh b/docs-website/graphql/generateGraphQLSchema.sh
index a904a2e36d7c19..9a3f32a4efd080 100755
--- a/docs-website/graphql/generateGraphQLSchema.sh
+++ b/docs-website/graphql/generateGraphQLSchema.sh
@@ -1,22 +1,8 @@
 #!/bin/sh
-rm combined.graphql
+if [ -f "combined.graphql" ] ; then
+  rm "combined.graphql"
+fi
 touch combined.graphql
 echo "Generating combined GraphQL schema..."
 echo "# Auto Generated During Docs Build" >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/actions.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/analytics.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/app.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/auth.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/constraints.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/entity.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/assertions.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/ingestion.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/recommendation.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/search.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/tests.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/timeline.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/step.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/lineage.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/properties.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/forms.graphql >> combined.graphql
-cat ../../datahub-graphql-core/src/main/resources/connection.graphql >> combined.graphql
\ No newline at end of file
+cat ../../datahub-graphql-core/src/main/resources/*.graphql >> combined.graphql

From 20574cf1c657b0db414171731813f1d8082c9c0a Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Sat, 20 Jul 2024 00:00:40 +0200
Subject: [PATCH 02/17] feat(ingest/athena): Add option for Athena partitioned profiling (#10723)

---
 metadata-ingestion/setup.py                   |   9 +-
 .../ingestion/source/ge_data_profiler.py      | 122 +++++++++++++++++-
 .../ingestion/source/ge_profiling_config.py   |   2 +-
 .../datahub/ingestion/source/sql/athena.py    | 120 +++++++++++++++--
 .../src/datahub/ingestion/source/sql/hive.py  |   1 +
 .../ingestion/source/sql/hive_metastore.py    |   1 +
 .../ingestion/source/sql/sql_common.py        |  42 +++++++++-
 .../src/datahub/ingestion/source/sql/trino.py |   1 +
 .../utilities/sqlalchemy_type_converter.py    |  69 +++++++---
 9 files changed, 323 insertions(+), 44 deletions(-)

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index f4e9de839d5f33..d8b1a47690dc8b 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -330,7 +330,14 @@
     # sqlalchemy-bigquery is included here since it provides an implementation of
     # a SQLalchemy-conform STRUCT type definition
     "athena": sql_common
-    | {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
+    # We need to set tenacity lower than 8.4.0 as
+    # this version has missing dependency asyncio
+    # https://github.com/jd/tenacity/issues/471
+    | {
+        "PyAthena[SQLAlchemy]>=2.6.0,<3.0.0",
+        "sqlalchemy-bigquery>=1.4.1",
+        "tenacity!=8.4.0",
+    },
     "azure-ad": set(),
     "bigquery": sql_common
     | bigquery_common
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index 8843a0ad8eae65..0c3dbc0eaadd88 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -13,6 +13,7 @@
 import uuid
 from functools import lru_cache
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Dict,
@@ -39,6 +40,7 @@
 from great_expectations.dataset.dataset import Dataset
 from great_expectations.dataset.sqlalchemy_dataset import SqlAlchemyDataset
 from great_expectations.datasource.sqlalchemy_datasource import SqlAlchemyDatasource
+from great_expectations.execution_engine.sqlalchemy_dialect import GXSqlDialect
 from great_expectations.profile.base import ProfilerDataType
 from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfilerBase
 from sqlalchemy.engine import Connection, Engine
@@ -72,9 +74,14 @@
     get_query_columns,
 )

+if TYPE_CHECKING:
+    from pyathena.cursor import Cursor
+
 assert MARKUPSAFE_PATCHED
 logger: logging.Logger = logging.getLogger(__name__)

+_original_get_column_median = SqlAlchemyDataset.get_column_median
+
 P = ParamSpec("P")
 POSTGRESQL = "postgresql"
 MYSQL = "mysql"
@@ -203,6 +210,47 @@ def _get_column_quantiles_bigquery_patch(  # type:ignore
     return list()


+def _get_column_quantiles_awsathena_patch(  # type:ignore
+    self, column: str, quantiles: Iterable
+) -> list:
+    import ast
+
+    table_name = ".".join(
+        [f'"{table_part}"' for table_part in str(self._table).split(".")]
+    )
+
+    quantiles_list = list(quantiles)
+    quantiles_query = (
+        f"SELECT approx_percentile({column}, ARRAY{str(quantiles_list)}) as quantiles "
+        f"from (SELECT {column} from {table_name})"
+    )
+    try:
+        quantiles_results = self.engine.execute(quantiles_query).fetchone()[0]
+        quantiles_results_list = ast.literal_eval(quantiles_results)
+        return quantiles_results_list
+
+    except ProgrammingError as pe:
+        self._treat_quantiles_exception(pe)
+        return []
+
+
+def _get_column_median_patch(self, column):
+    # AWS Athena and Presto have a special function that can be used to retrieve the median
+    if (
+        self.sql_engine_dialect.name.lower() == GXSqlDialect.AWSATHENA
+        or self.sql_engine_dialect.name.lower() == GXSqlDialect.TRINO
+    ):
+        table_name = ".".join(
+            [f'"{table_part}"' for table_part in str(self._table).split(".")]
+        )
+        element_values = self.engine.execute(
+            f"SELECT approx_percentile({column}, 0.5) FROM {table_name}"
+        )
+        return convert_to_json_serializable(element_values.fetchone()[0])
+    else:
+        return _original_get_column_median(self, column)
+
+ def _is_single_row_query_method(query: Any) -> bool: SINGLE_ROW_QUERY_FILES = { # "great_expectations/dataset/dataset.py", @@ -1038,6 +1086,12 @@ def generate_profiles( ), unittest.mock.patch( "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery", _get_column_quantiles_bigquery_patch, + ), unittest.mock.patch( + "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_awsathena", + _get_column_quantiles_awsathena_patch, + ), unittest.mock.patch( + "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_median", + _get_column_median_patch, ), concurrent.futures.ThreadPoolExecutor( max_workers=max_workers ) as async_executor, SQLAlchemyQueryCombiner( @@ -1114,15 +1168,16 @@ def _generate_profile_from_request( **request.batch_kwargs, ) - def _drop_trino_temp_table(self, temp_dataset: Dataset) -> None: + def _drop_temp_table(self, temp_dataset: Dataset) -> None: schema = temp_dataset._table.schema table = temp_dataset._table.name + table_name = f'"{schema}"."{table}"' if schema else f'"{table}"' try: with self.base_engine.connect() as connection: - connection.execute(f"drop view if exists {schema}.{table}") - logger.debug(f"View {schema}.{table} was dropped.") + connection.execute(f"drop view if exists {table_name}") + logger.debug(f"View {table_name} was dropped.") except Exception: - logger.warning(f"Unable to delete trino temporary table: {schema}.{table}") + logger.warning(f"Unable to delete temporary table: {table_name}") def _generate_single_profile( self, @@ -1149,6 +1204,19 @@ def _generate_single_profile( } bigquery_temp_table: Optional[str] = None + temp_view: Optional[str] = None + if platform and platform.upper() == "ATHENA" and (custom_sql): + if custom_sql is not None: + # Note that limit and offset are not supported for custom SQL. + temp_view = create_athena_temp_table( + self, custom_sql, pretty_name, self.base_engine.raw_connection() + ) + ge_config["table"] = temp_view + ge_config["schema"] = None + ge_config["limit"] = None + ge_config["offset"] = None + custom_sql = None + if platform == BIGQUERY and ( custom_sql or self.config.limit or self.config.offset ): @@ -1234,8 +1302,16 @@ def _generate_single_profile( ) return None finally: - if batch is not None and self.base_engine.engine.name == TRINO: - self._drop_trino_temp_table(batch) + if batch is not None and self.base_engine.engine.name.upper() in [ + "TRINO", + "AWSATHENA", + ]: + if ( + self.base_engine.engine.name.upper() == "TRINO" + or temp_view is not None + ): + self._drop_temp_table(batch) + # if we are not on Trino then we only drop table if temp table variable was set def _get_ge_dataset( self, @@ -1299,6 +1375,40 @@ def _get_column_types_to_ignore(dialect_name: str) -> List[str]: return [] +def create_athena_temp_table( + instance: Union[DatahubGEProfiler, _SingleDatasetProfiler], + sql: str, + table_pretty_name: str, + raw_connection: Any, +) -> Optional[str]: + try: + cursor: "Cursor" = cast("Cursor", raw_connection.cursor()) + logger.debug(f"Creating view for {table_pretty_name}: {sql}") + temp_view = f"ge_{uuid.uuid4()}" + if "." 
in table_pretty_name: + schema_part = table_pretty_name.split(".")[-1] + schema_part_quoted = ".".join( + [f'"{part}"' for part in str(schema_part).split(".")] + ) + temp_view = f"{schema_part_quoted}_{temp_view}" + + temp_view = f"ge_{uuid.uuid4()}" + cursor.execute(f'create or replace view "{temp_view}" as {sql}') + except Exception as e: + if not instance.config.catch_exceptions: + raise e + logger.exception(f"Encountered exception while profiling {table_pretty_name}") + instance.report.report_warning( + table_pretty_name, + f"Profiling exception {e} when running custom sql {sql}", + ) + return None + finally: + raw_connection.close() + + return temp_view + + def create_bigquery_temp_table( instance: Union[DatahubGEProfiler, _SingleDatasetProfiler], bq_sql: str, diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py index 2ca6828e0bdfac..2a9068d3d49d8b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_profiling_config.py @@ -147,7 +147,7 @@ class GEProfilingConfig(ConfigModel): partition_profiling_enabled: bool = Field( default=True, - description="Whether to profile partitioned tables. Only BigQuery supports this. " + description="Whether to profile partitioned tables. Only BigQuery and Aws Athena supports this. " "If enabled, latest partition data is used for profiling.", ) partition_datetime: Optional[datetime.datetime] = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 9ddc671e211335..398adc3708ef2b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -1,7 +1,9 @@ +import datetime import json import logging import re import typing +from dataclasses import dataclass, field from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast import pydantic @@ -27,6 +29,7 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import make_s3_urn from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes +from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig from datahub.ingestion.source.sql.sql_common import ( SQLAlchemySource, register_custom_type, @@ -52,6 +55,14 @@ register_custom_type(MapType, MapTypeClass) +class AthenaProfilingConfig(GEProfilingConfig): + # Overriding default value for partition_profiling + partition_profiling_enabled: bool = pydantic.Field( + default=False, + description="Enable partition profiling. This will profile the latest partition of the table.", + ) + + class CustomAthenaRestDialect(AthenaRestDialect): """Custom definition of the Athena dialect. 
@@ -171,13 +182,17 @@ def _get_column_type( # To extract all of them, we simply iterate over all detected fields and # convert them to SQLalchemy types struct_args = [] - for field in struct_type["fields"]: + for struct_field in struct_type["fields"]: struct_args.append( ( - field["name"], - self._get_column_type(field["type"]["type"]) - if field["type"]["type"] not in ["record", "array"] - else self._get_column_type(field["type"]), + struct_field["name"], + ( + self._get_column_type( + struct_field["type"]["native_data_type"] + ) + if struct_field["type"]["type"] not in ["record", "array"] + else self._get_column_type(struct_field["type"]) + ), ) ) @@ -189,7 +204,7 @@ def _get_column_type( detected_col_type = MapType # the type definition for maps looks like the following: key_type:val_type (e.g., string:string) - key_type_raw, value_type_raw = type_meta_information.split(",") + key_type_raw, value_type_raw = type_meta_information.split(",", 1) # convert both type names to actual SQLalchemy types args = [ @@ -257,6 +272,8 @@ class AthenaConfig(SQLCommonConfig): print_warning=True, ) + profiling: AthenaProfilingConfig = AthenaProfilingConfig() + def get_sql_alchemy_url(self): return make_sqlalchemy_uri( self.scheme, @@ -275,6 +292,12 @@ def get_sql_alchemy_url(self): ) +@dataclass +class Partitionitem: + partitions: List[str] = field(default_factory=list) + max_partition: Optional[Dict[str, str]] = None + + @platform_name("Athena") @support_status(SupportStatus.CERTIFIED) @config_class(AthenaConfig) @@ -294,6 +317,8 @@ class AthenaSource(SQLAlchemySource): - Profiling when enabled. """ + table_partition_cache: Dict[str, Dict[str, Partitionitem]] = {} + def __init__(self, config, ctx): super().__init__(config, ctx, "athena") self.cursor: Optional[BaseCursor] = None @@ -429,12 +454,53 @@ def get_schema_names(self, inspector: Inspector) -> List[str]: return [schema for schema in schemas if schema == athena_config.database] return schemas + # Overwrite to get partitions + def get_partitions( + self, inspector: Inspector, schema: str, table: str + ) -> List[str]: + partitions = [] + + if not self.cursor: + return [] + + metadata: AthenaTableMetadata = self.cursor.get_table_metadata( + table_name=table, schema_name=schema + ) + + if metadata.partition_keys: + for key in metadata.partition_keys: + if key.name: + partitions.append(key.name) + + if not partitions: + return [] + + # We create an artiificaial concatenated partition key to be able to query max partition easier + part_concat = "|| '-' ||".join(partitions) + max_partition_query = f'select {",".join(partitions)} from "{schema}"."{table}$partitions" where {part_concat} = (select max({part_concat}) from "{schema}"."{table}$partitions")' + ret = self.cursor.execute(max_partition_query) + max_partition: Dict[str, str] = {} + if ret: + max_partitons = list(ret) + for idx, row in enumerate([row[0] for row in ret.description]): + max_partition[row] = max_partitons[0][idx] + if self.table_partition_cache.get(schema) is None: + self.table_partition_cache[schema] = {} + self.table_partition_cache[schema][table] = Partitionitem( + partitions=partitions, + max_partition=max_partition, + ) + return partitions + + return [] + # Overwrite to modify the creation of schema fields def get_schema_fields_for_column( self, dataset_name: str, column: Dict, pk_constraints: Optional[dict] = None, + partition_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, ) -> List[SchemaField]: fields = get_schema_fields_for_sqlalchemy_column( @@ -442,17 
+508,45 @@ def get_schema_fields_for_column( column_type=column["type"], description=column.get("comment", None), nullable=column.get("nullable", True), - is_part_of_key=True - if ( - pk_constraints is not None - and isinstance(pk_constraints, dict) - and column["name"] in pk_constraints.get("constrained_columns", []) - ) - else False, + is_part_of_key=( + True + if ( + pk_constraints is not None + and isinstance(pk_constraints, dict) + and column["name"] in pk_constraints.get("constrained_columns", []) + ) + else False + ), + is_partitioning_key=( + True + if (partition_keys is not None and column["name"] in partition_keys) + else False + ), ) return fields + def generate_partition_profiler_query( + self, schema: str, table: str, partition_datetime: Optional[datetime.datetime] + ) -> Tuple[Optional[str], Optional[str]]: + if not self.config.profiling.partition_profiling_enabled: + return None, None + + partition: Optional[Partitionitem] = self.table_partition_cache.get( + schema, {} + ).get(table, None) + + if partition and partition.max_partition: + max_partition_filters = [] + for key, value in partition.max_partition.items(): + max_partition_filters.append(f"CAST({key} as VARCHAR) = '{value}'") + max_partition = str(partition.max_partition) + return ( + max_partition, + f'SELECT * FROM "{schema}"."{table}" WHERE {" AND ".join(max_partition_filters)}', + ) + return None, None + def close(self): if self.cursor: self.cursor.close() diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py index 95ce534968df5d..65f8516fd340a3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive.py @@ -170,6 +170,7 @@ def get_schema_fields_for_column( dataset_name: str, column: Dict[Any, Any], pk_constraints: Optional[Dict[Any, Any]] = None, + partition_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, ) -> List[SchemaField]: fields = super().get_schema_fields_for_column( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py b/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py index 944b8a080cb579..655d1ba68ed79e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/hive_metastore.py @@ -878,6 +878,7 @@ def get_schema_fields_for_column( dataset_name: str, column: Dict[Any, Any], pk_constraints: Optional[Dict[Any, Any]] = None, + partition_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, ) -> List[SchemaField]: return get_schema_fields_for_hive_column( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 2527ca6bc76c1a..1624203c4096b9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -713,7 +713,16 @@ def loop_tables( # noqa: C901 data_reader, ) except Exception as e: - self.warn(logger, f"{schema}.{table}", f"Ingestion error: {e}") + self.warn( + logger, + f"{schema}.{table}", + f"Ingestion error: {e}", + ) + logger.debug( + f"Error processing table {schema}.{table}: Error was: {e} Traceback:", + exc_info=e, + ) + except Exception as e: self.error(logger, f"{schema}", f"Tables error: {e}") @@ -725,6 +734,11 @@ def get_extra_tags( ) -> Optional[Dict[str, List[str]]]: return None + 
def get_partitions( + self, inspector: Inspector, schema: str, table: str + ) -> Optional[List[str]]: + return None + def _process_table( self, dataset_name: str, @@ -769,9 +783,14 @@ def _process_table( extra_tags = self.get_extra_tags(inspector, schema, table) pk_constraints: dict = inspector.get_pk_constraint(table, schema) + partitions: Optional[List[str]] = self.get_partitions(inspector, schema, table) foreign_keys = self._get_foreign_keys(dataset_urn, inspector, schema, table) schema_fields = self.get_schema_fields( - dataset_name, columns, pk_constraints, tags=extra_tags + dataset_name, + columns, + pk_constraints, + tags=extra_tags, + partition_keys=partitions, ) schema_metadata = get_schema_metadata( self.report, @@ -921,6 +940,7 @@ def _get_columns( if len(columns) == 0: self.warn(logger, "missing column information", dataset_name) except Exception as e: + logger.error(traceback.format_exc()) self.warn( logger, dataset_name, @@ -949,6 +969,7 @@ def get_schema_fields( dataset_name: str, columns: List[dict], pk_constraints: Optional[dict] = None, + partition_keys: Optional[List[str]] = None, tags: Optional[Dict[str, List[str]]] = None, ) -> List[SchemaField]: canonical_schema = [] @@ -957,7 +978,11 @@ def get_schema_fields( if tags: column_tags = tags.get(column["name"], []) fields = self.get_schema_fields_for_column( - dataset_name, column, pk_constraints, tags=column_tags + dataset_name, + column, + pk_constraints, + tags=column_tags, + partition_keys=partition_keys, ) canonical_schema.extend(fields) return canonical_schema @@ -967,6 +992,7 @@ def get_schema_fields_for_column( dataset_name: str, column: dict, pk_constraints: Optional[dict] = None, + partition_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, ) -> List[SchemaField]: gtc: Optional[GlobalTagsClass] = None @@ -989,6 +1015,10 @@ def get_schema_fields_for_column( and column["name"] in pk_constraints.get("constrained_columns", []) ): field.isPartOfKey = True + + if partition_keys is not None and column["name"] in partition_keys: + field.isPartitioningKey = True + return [field] def loop_views( @@ -1017,7 +1047,11 @@ def loop_views( sql_config=sql_config, ) except Exception as e: - self.warn(logger, f"{schema}.{view}", f"Ingestion error: {e}") + self.warn( + logger, + f"{schema}.{view}", + f"Ingestion error: {e} {traceback.format_exc()}", + ) except Exception as e: self.error(logger, f"{schema}", f"Views error: {e}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py index c79af147808748..cc0a43bc5e8749 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/trino.py @@ -388,6 +388,7 @@ def get_schema_fields_for_column( dataset_name: str, column: dict, pk_constraints: Optional[dict] = None, + partition_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, ) -> List[SchemaField]: fields = super().get_schema_fields_for_column( diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py index 5d2fc6872c7bd9..41d02646fdb8a0 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py @@ -1,5 +1,6 @@ import json import logging +import traceback import uuid from typing import Any, Dict, List, Optional, Type, Union @@ -46,7 +47,6 @@ def 
get_avro_type( cls, column_type: Union[types.TypeEngine, STRUCT, MapType], nullable: bool ) -> Dict[str, Any]: """Determines the concrete AVRO schema type for a SQLalchemy-typed column""" - if isinstance( column_type, tuple(cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys()) ): @@ -80,21 +80,38 @@ def get_avro_type( } if isinstance(column_type, types.ARRAY): array_type = column_type.item_type + return { "type": "array", "items": cls.get_avro_type(column_type=array_type, nullable=nullable), "native_data_type": f"array<{str(column_type.item_type)}>", } if isinstance(column_type, MapType): - key_type = column_type.types[0] - value_type = column_type.types[1] - return { - "type": "map", - "values": cls.get_avro_type(column_type=value_type, nullable=nullable), - "native_data_type": str(column_type), - "key_type": cls.get_avro_type(column_type=key_type, nullable=nullable), - "key_native_data_type": str(key_type), - } + try: + key_type = column_type.types[0] + value_type = column_type.types[1] + return { + "type": "map", + "values": cls.get_avro_type( + column_type=value_type, nullable=nullable + ), + "native_data_type": str(column_type), + "key_type": cls.get_avro_type( + column_type=key_type, nullable=nullable + ), + "key_native_data_type": str(key_type), + } + except Exception as e: + logger.warning( + f"Unable to parse MapType {column_type} the error was: {e}" + ) + return { + "type": "map", + "values": {"type": "null", "_nullable": True}, + "native_data_type": str(column_type), + "key_type": {"type": "null", "_nullable": True}, + "key_native_data_type": "null", + } if STRUCT and isinstance(column_type, STRUCT): fields = [] for field_def in column_type._STRUCT_fields: @@ -108,14 +125,23 @@ def get_avro_type( } ) struct_name = f"__struct_{str(uuid.uuid4()).replace('-', '')}" - - return { - "type": "record", - "name": struct_name, - "fields": fields, - "native_data_type": str(column_type), - "_nullable": nullable, - } + try: + return { + "type": "record", + "name": struct_name, + "fields": fields, + "native_data_type": str(column_type), + "_nullable": nullable, + } + except Exception: + # This is a workaround for the case when the struct name is not string convertable because SqlAlchemt throws an error + return { + "type": "record", + "name": struct_name, + "fields": fields, + "native_data_type": "map", + "_nullable": nullable, + } return { "type": "null", @@ -153,6 +179,7 @@ def get_schema_fields_for_sqlalchemy_column( description: Optional[str] = None, nullable: Optional[bool] = True, is_part_of_key: Optional[bool] = False, + is_partitioning_key: Optional[bool] = False, ) -> List[SchemaField]: """Creates SchemaFields from a given SQLalchemy column. 
@@ -181,7 +208,7 @@ def get_schema_fields_for_sqlalchemy_column( ) except Exception as e: logger.warning( - f"Unable to parse column {column_name} and type {column_type} the error was: {e}" + f"Unable to parse column {column_name} and type {column_type} the error was: {e} Traceback: {traceback.format_exc()}" ) # fallback description in case any exception occurred @@ -208,4 +235,8 @@ def get_schema_fields_for_sqlalchemy_column( is_part_of_key if is_part_of_key is not None else False ) + schema_fields[0].isPartitioningKey = ( + is_partitioning_key if is_partitioning_key is not None else False + ) + return schema_fields From aa97cba3e80430f4523c3eb30f554669d91a48f6 Mon Sep 17 00:00:00 2001 From: deepgarg-visa <149145061+deepgarg-visa@users.noreply.github.com> Date: Mon, 22 Jul 2024 13:37:37 +0530 Subject: [PATCH 03/17] fix(spark-lineage): default timeout for future responses (#10947) --- .../main/java/datahub/client/MetadataResponseFuture.java | 2 +- .../src/main/java/datahub/client/rest/RestEmitter.java | 4 ++++ .../src/main/java/datahub/spark/DatahubEventEmitter.java | 7 +++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java index 11be10186f1ef3..1734b594064ccb 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/MetadataResponseFuture.java @@ -69,7 +69,7 @@ public MetadataWriteResponse get(long timeout, TimeUnit unit) return mapper.map(response); } else { // We wait for the callback to fill this out - responseLatch.await(); + responseLatch.await(timeout, unit); return responseReference.get(); } } diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java index dd6a7ba98c87df..e1017372be124b 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java @@ -250,6 +250,7 @@ public void completed(SimpleHttpResponse response) { @Override public void failed(Exception ex) { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(ex); @@ -261,6 +262,7 @@ public void failed(Exception ex) { @Override public void cancelled() { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(new RuntimeException("Cancelled")); @@ -344,6 +346,7 @@ public void completed(SimpleHttpResponse response) { @Override public void failed(Exception ex) { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(ex); @@ -355,6 +358,7 @@ public void failed(Exception ex) { @Override public void cancelled() { + responseLatch.countDown(); if (callback != null) { try { callback.onFailure(new RuntimeException("Cancelled")); diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java index 5a3f4bd27b4157..dc274ad7df3b4e 100644 --- a/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java +++ 
b/metadata-integration/java/spark-lineage-beta/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -39,6 +39,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -53,6 +55,7 @@ public class DatahubEventEmitter extends EventEmitter { private final List _datahubJobs = new LinkedList<>(); private final Map schemaMap = new HashMap<>(); private SparkLineageConf datahubConf; + private static final int DEFAULT_TIMEOUT_SEC = 10; private final EventFormatter eventFormatter = new EventFormatter(); @@ -386,8 +389,8 @@ protected void emitMcps(List mcps) { .forEach( future -> { try { - log.info(future.get().toString()); - } catch (InterruptedException | ExecutionException e) { + log.info(future.get(DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS).toString()); + } catch (InterruptedException | ExecutionException | TimeoutException e) { // log error, but don't impact thread log.error("Failed to emit metadata to DataHub", e); } From 9f570a7521a0f18a6fa1a27faa3788f04c80dbe9 Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 22 Jul 2024 16:03:48 +0530 Subject: [PATCH 04/17] feat(datajob/flow): add environment filter using info aspects (#10814) --- docs/how/updating-datahub.md | 1 + .../datahub/api/entities/datajob/dataflow.py | 17 ++++++++++++++ .../datahub/api/entities/datajob/datajob.py | 12 ++++++++++ ...nowflake_empty_connection_user_golden.json | 6 +++-- .../fivetran/fivetran_snowflake_golden.json | 6 +++-- .../com/linkedin/datajob/DataFlowInfo.pdl | 12 ++++++++++ .../com/linkedin/datajob/DataJobInfo.pdl | 12 ++++++++++ .../com.linkedin.entity.aspects.snapshot.json | 22 +++++++++++++++++++ ...com.linkedin.entity.entities.snapshot.json | 22 +++++++++++++++++++ .../com.linkedin.entity.runs.snapshot.json | 22 +++++++++++++++++++ ...nkedin.operations.operations.snapshot.json | 22 +++++++++++++++++++ ...m.linkedin.platform.platform.snapshot.json | 22 +++++++++++++++++++ 12 files changed, 172 insertions(+), 4 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index ef9990ca3804eb..ffceb7a5d1b020 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -22,6 +22,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - Protobuf CLI will no longer create binary encoded protoc custom properties. Flag added `-protocProp` in case this behavior is required. +- #10814 Data flow info and data job info aspect will produce an additional field that will require a corresponding upgrade of server. Otherwise server can reject the aspects. - #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs. 
Example Global Tags Aspect: diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index cb2c536bbab20f..3870e6978ee64f 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -3,6 +3,7 @@ from typing import Callable, Dict, Iterable, List, Optional, Set, cast import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import ALL_ENV_TYPES from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -110,7 +111,20 @@ def generate_tags_aspect(self) -> List[GlobalTagsClass]: ) return [tags] + def _get_env(self) -> Optional[str]: + env: Optional[str] = None + if self.cluster in ALL_ENV_TYPES: + env = self.cluster + elif self.env in ALL_ENV_TYPES: + env = self.env + else: + logger.warning( + f"cluster {self.cluster} and {self.env} is not a valid environment type so Environment filter won't work." + ) + return env + def generate_mce(self) -> MetadataChangeEventClass: + env = self._get_env() flow_mce = MetadataChangeEventClass( proposedSnapshot=DataFlowSnapshotClass( urn=str(self.urn), @@ -120,6 +134,7 @@ def generate_mce(self) -> MetadataChangeEventClass: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), *self.generate_ownership_aspect(), *self.generate_tags_aspect(), @@ -130,6 +145,7 @@ def generate_mce(self) -> MetadataChangeEventClass: return flow_mce def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: + env = self._get_env() mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataFlowInfoClass( @@ -137,6 +153,7 @@ def generate_mcp(self) -> Iterable[MetadataChangeProposalWrapper]: description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), ) yield mcp diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index e56e9f059d724a..514f0a5093aa59 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -1,7 +1,9 @@ +import logging from dataclasses import dataclass, field from typing import Callable, Dict, Iterable, List, Optional, Set import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import ALL_ENV_TYPES from datahub.emitter.generic_emitter import Emitter from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.metadata.schema_classes import ( @@ -22,6 +24,8 @@ from datahub.utilities.urns.data_job_urn import DataJobUrn from datahub.utilities.urns.dataset_urn import DatasetUrn +logger = logging.getLogger(__name__) + @dataclass class DataJob: @@ -103,6 +107,13 @@ def generate_tags_aspect(self) -> Iterable[GlobalTagsClass]: def generate_mcp( self, materialize_iolets: bool = True ) -> Iterable[MetadataChangeProposalWrapper]: + env: Optional[str] = None + if self.flow_urn.cluster in ALL_ENV_TYPES: + env = self.flow_urn.cluster + else: + logger.warning( + f"cluster {self.flow_urn.cluster} is not a valid environment type so Environment filter won't work." 
+ ) mcp = MetadataChangeProposalWrapper( entityUrn=str(self.urn), aspect=DataJobInfoClass( @@ -111,6 +122,7 @@ def generate_mcp( description=self.description, customProperties=self.properties, externalUrl=self.url, + env=env, ), ) yield mcp diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index d2ae437605644f..29b186978a76a5 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -7,7 +7,8 @@ "aspect": { "json": { "customProperties": {}, - "name": "postgres" + "name": "postgres", + "env": "PROD" } }, "systemMetadata": { @@ -68,7 +69,8 @@ "name": "postgres", "type": { "string": "COMMAND" - } + }, + "env": "PROD" } }, "systemMetadata": { diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index 59e545183a4ec0..0cd3bb83f90f52 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -7,7 +7,8 @@ "aspect": { "json": { "customProperties": {}, - "name": "postgres" + "name": "postgres", + "env": "PROD" } }, "systemMetadata": { @@ -68,7 +69,8 @@ "name": "postgres", "type": { "string": "COMMAND" - } + }, + "env": "PROD" } }, "systemMetadata": { diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl index 2ff3e8cd930afc..766181df018099 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataFlowInfo.pdl @@ -4,6 +4,7 @@ import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference import com.linkedin.common.Urn import com.linkedin.common.TimeStamp +import com.linkedin.common.FabricType /** * Information about a Data processing flow @@ -63,4 +64,15 @@ record DataFlowInfo includes CustomProperties, ExternalReference { } } lastModified: optional TimeStamp + + /** + * Environment for this flow + */ + @Searchable = { + "fieldType": "KEYWORD", + "addToFilters": true, + "filterNameOverride": "Environment", + "queryByDefault": false + } + env: optional FabricType } diff --git a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl index 250fb760037776..46879e359e0147 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl @@ -5,6 +5,7 @@ import com.linkedin.common.CustomProperties import com.linkedin.common.ExternalReference import com.linkedin.common.DataFlowUrn import com.linkedin.common.TimeStamp +import com.linkedin.common.FabricType /** * Information about a Data processing job @@ -72,4 +73,15 @@ record DataJobInfo includes CustomProperties, ExternalReference { */ @deprecated = "Use Data Process Instance model, instead" status: optional JobStatus + + /** + * Environment for this job + */ + @Searchable = { + "fieldType": "KEYWORD", + "addToFilters": true, + "filterNameOverride": "Environment", + "queryByDefault": false + } + env: optional 
FabricType } diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json index 72578be8c54d07..eb92cf75a4d4e2 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.aspects.snapshot.json @@ -1491,6 +1491,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1587,6 +1598,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 9b93f1184cd59f..0c983a021d4e73 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -1541,6 +1541,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1637,6 +1648,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json index 18ef55011ed5af..4af65cdb48b502 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json @@ -1228,6 +1228,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1324,6 +1335,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" 
: "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json index cf059788209119..e788c5d28ce71e 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.operations.operations.snapshot.json @@ -1228,6 +1228,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1324,6 +1335,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json index 15f16dd2ea6cd0..dbdba0040d4434 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.platform.platform.snapshot.json @@ -1541,6 +1541,17 @@ "fieldType" : "DATETIME" } } + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this flow", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataFlowInfo" @@ -1637,6 +1648,17 @@ "doc" : "Status of the job - Deprecated for Data Process Instance model.", "optional" : true, "deprecated" : "Use Data Process Instance model, instead" + }, { + "name" : "env", + "type" : "com.linkedin.common.FabricType", + "doc" : "Environment for this job", + "optional" : true, + "Searchable" : { + "addToFilters" : true, + "fieldType" : "KEYWORD", + "filterNameOverride" : "Environment", + "queryByDefault" : false + } } ], "Aspect" : { "name" : "dataJobInfo" From 8b88930fad73151476c4fd6876f77cde137f10ea Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Mon, 22 Jul 2024 20:25:12 +0530 Subject: [PATCH 05/17] fix(ui/ingest): correct privilege used to show tab (#10483) Co-authored-by: Kunal-kankriya <127090035+Kunal-kankriya@users.noreply.github.com> --- .../src/app/ingest/ManageIngestionPage.tsx | 14 +- .../src/app/ingest/secret/SecretsList.tsx | 4 +- .../src/app/shared/admin/HeaderLinks.tsx | 2 +- .../cypress/e2e/mutations/ingestion_source.js | 1 + .../manage_ingestion_secret_privilege.js | 199 ++++++++++++++++++ .../cypress/e2e/mutations/managing_secrets.js | 10 +- 6 files changed, 219 insertions(+), 11 deletions(-) create mode 
100644 smoke-test/tests/cypress/cypress/e2e/mutations/manage_ingestion_secret_privilege.js diff --git a/datahub-web-react/src/app/ingest/ManageIngestionPage.tsx b/datahub-web-react/src/app/ingest/ManageIngestionPage.tsx index 1d04edbac228ac..6af924be99a6a0 100644 --- a/datahub-web-react/src/app/ingest/ManageIngestionPage.tsx +++ b/datahub-web-react/src/app/ingest/ManageIngestionPage.tsx @@ -2,6 +2,8 @@ import { Tabs, Typography } from 'antd'; import React, { useState } from 'react'; import styled from 'styled-components'; import { IngestionSourceList } from './source/IngestionSourceList'; +import { useAppConfig } from '../useAppConfig'; +import { useUserContext } from '../context/useUserContext'; import { SecretsList } from './secret/SecretsList'; import { OnboardingTour } from '../onboarding/OnboardingTour'; import { @@ -48,7 +50,13 @@ export const ManageIngestionPage = () => { /** * Determines which view should be visible: ingestion sources or secrets. */ - const [selectedTab, setSelectedTab] = useState(TabType.Sources); + const me = useUserContext(); + const { config } = useAppConfig(); + const isIngestionEnabled = config?.managedIngestionConfig.enabled; + const showIngestionTab = isIngestionEnabled && me && me.platformPrivileges?.manageIngestion; + const showSecretsTab = isIngestionEnabled && me && me.platformPrivileges?.manageSecrets; + const defaultTab = showIngestionTab ? TabType.Sources : TabType.Secrets; + const [selectedTab, setSelectedTab] = useState(defaultTab); const onClickTab = (newTab: string) => { setSelectedTab(TabType[newTab]); @@ -64,8 +72,8 @@ export const ManageIngestionPage = () => { onClickTab(tab)}> - - + {showIngestionTab && } + {showSecretsTab && } {selectedTab === TabType.Sources ? : } diff --git a/datahub-web-react/src/app/ingest/secret/SecretsList.tsx b/datahub-web-react/src/app/ingest/secret/SecretsList.tsx index 2219b6147d9e06..472dbf7f849dee 100644 --- a/datahub-web-react/src/app/ingest/secret/SecretsList.tsx +++ b/datahub-web-react/src/app/ingest/secret/SecretsList.tsx @@ -118,7 +118,7 @@ export const SecretsList = () => { { urn: res.data?.createSecret || '', name: state.name, - description: state.description, + description: state.description || '', }, client, pageSize, @@ -127,7 +127,7 @@ export const SecretsList = () => { .catch((e) => { message.destroy(); message.error({ - content: `Failed to update ingestion source!: \n ${e.message || ''}`, + content: `Failed to update secret!: \n ${e.message || ''}`, duration: 3, }); }); diff --git a/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx b/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx index e7b0025118ff11..cce2a2336515d6 100644 --- a/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx +++ b/datahub-web-react/src/app/shared/admin/HeaderLinks.tsx @@ -75,7 +75,7 @@ export function HeaderLinks(props: Props) { const showAnalytics = (isAnalyticsEnabled && me && me?.platformPrivileges?.viewAnalytics) || false; const showSettings = true; const showIngestion = - isIngestionEnabled && me && me.platformPrivileges?.manageIngestion && me.platformPrivileges?.manageSecrets; + isIngestionEnabled && me && (me.platformPrivileges?.manageIngestion || me.platformPrivileges?.manageSecrets); useToggleEducationStepIdsAllowList(!!showIngestion, HOME_PAGE_INGESTION_ID); diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js b/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js index 470f9e2eec4617..8707f090acad36 100644 --- 
a/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/ingestion_source.js @@ -11,6 +11,7 @@ describe("ingestion source creation flow", () => { // Go to ingestion page, create a snowflake source cy.loginWithCredentials(); cy.goToIngestionPage(); + cy.clickOptionWithId('[data-node-key="Sources"]'); cy.clickOptionWithTestId("create-ingestion-source-button"); cy.clickOptionWithText("Snowflake"); cy.waitTextVisible("Snowflake Details"); diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/manage_ingestion_secret_privilege.js b/smoke-test/tests/cypress/cypress/e2e/mutations/manage_ingestion_secret_privilege.js new file mode 100644 index 00000000000000..a4d1e6ca375a4a --- /dev/null +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/manage_ingestion_secret_privilege.js @@ -0,0 +1,199 @@ +const test_id = Math.floor(Math.random() * 100000); +const platform_policy_name = `Platform test policy ${test_id}`; +const number = Math.floor(Math.random() * 100000); +const name = `Example Name ${number}`; +const email = `example${number}@example.com`; + +const tryToSignUp = () => { + cy.enterTextInTestId("email", email); + cy.enterTextInTestId("name", name); + cy.enterTextInTestId("password", "Example password"); + cy.enterTextInTestId("confirmPassword", "Example password"); + cy.mouseover("#title").click(); + cy.waitTextVisible("Other").click(); + cy.clickOptionWithId("[type=submit]"); + return { name, email }; +}; + +const signIn = () => { + cy.visit("/login"); + cy.enterTextInTestId("username", email); + cy.enterTextInTestId("password", "Example password"); + cy.clickOptionWithId("[type=submit]"); +}; + +const updateAndSave = (Id, groupName, text) => { + cy.clickOptionWithTestId(Id).type(groupName); + cy.get(".rc-virtual-list").contains(text).click({ force: true }); + cy.focused().blur(); +}; + +const clickFocusAndType = (Id, text) => { + cy.clickOptionWithTestId(Id).focused().clear().type(text); +}; + +const clickOnButton = (saveButton) => { + cy.clickOptionWithId(`#${saveButton}`); +}; + +const createPolicy = (description, policyName) => { + clickFocusAndType("policy-description", description); + clickOnButton("nextButton"); + updateAndSave("privileges", "Ingestion", "Manage Metadata Ingestion"); + cy.wait(1000); + clickOnButton("nextButton"); + updateAndSave("users", "All", "All Users"); + clickOnButton("saveButton"); + cy.waitTextVisible("Successfully saved policy."); + cy.ensureTextNotPresent("Successfully saved policy."); + cy.reload(); + searchAndToggleMetadataPolicyStatus(policyName); + cy.get(".ant-table-row-level-0").contains(policyName); +}; + +const searchAndToggleMetadataPolicyStatus = (metadataPolicyName) => { + cy.get('[data-testid="search-input"]').should("be.visible"); + cy.get('[data-testid="search-input"]').last().type(metadataPolicyName); +}; + +const editPolicy = (policyName, type, select) => { + searchAndToggleMetadataPolicyStatus(policyName); + cy.contains("tr", policyName).as("metadataPolicyRow"); + cy.contains("EDIT").click(); + clickOnButton("nextButton"); + cy.clickOptionWithId(".ant-tag-close-icon"); + updateAndSave("privileges", type, select); + clickOnButton("nextButton"); + cy.clickOptionWithId(".ant-tag-close-icon"); + updateAndSave("users", name, name); + clickOnButton("saveButton"); + cy.waitTextVisible("Successfully saved policy."); +}; + +const deactivateExistingAllUserPolicies = () => { + cy.get(".ant-pagination li") + .its("length") + .then((len) => { + const pageCount = len - 2; + for (let 
page = 1; page <= pageCount; page++) { + cy.get("tbody tr td").should("be.visible"); + cy.get("tbody tr").each(($row) => { + cy.wrap($row) + .find("td") + .eq(3) + .invoke("text") + .then((role) => { + if (role === "All Users") { + cy.wrap($row) + .find("td") + .eq(5) + .find("div button") + .eq(1) + .invoke("text") + .then((buttonText) => { + if (buttonText === "DEACTIVATE") { + cy.wrap($row) + .find("td") + .eq(5) + .find("div button") + .eq(1) + .click(); + cy.waitTextVisible("Successfully deactivated policy."); + } + }); + } + }); + }); + if (page < pageCount) { + cy.contains("li", `${page + 1}`).click(); + cy.ensureTextNotPresent("No Policies"); + } + } + }); +}; + +describe("Manage Ingestion and Secret Privileges", () => { + let registeredEmail = ""; + it("create Metadata Ingestion platform policy and assign privileges to all users", () => { + cy.loginWithCredentials(); + cy.visit("/settings/permissions/policies"); + cy.waitTextVisible("Manage Permissions"); + cy.get(".ant-select-selection-item").should("be.visible").click(); + cy.get(".ant-select-item-option-content").contains("All").click(); + cy.get('[data-icon="delete"]').should("be.visible"); + deactivateExistingAllUserPolicies(); + cy.reload(); + cy.clickOptionWithText("Create new policy"); + clickFocusAndType("policy-name", platform_policy_name); + cy.clickOptionWithId('[data-testid="policy-type"] [title="Metadata"]'); + cy.clickOptionWithTestId("platform"); + createPolicy( + `Platform policy description ${test_id}`, + platform_policy_name, + ); + cy.logout(); + }); + + it("Create user and verify ingestion tab not present", () => { + cy.loginWithCredentials(); + cy.visit("/settings/identities/users"); + cy.waitTextVisible("Invite Users"); + cy.clickOptionWithText("Invite Users"); + cy.waitTextVisible(/signup\?invite_token=\w{32}/).then(($elem) => { + const inviteLink = $elem.text(); + cy.log(inviteLink); + cy.visit("/settings/identities/users"); + cy.logout(); + cy.visit(inviteLink); + const { name, email } = tryToSignUp(); + registeredEmail = email; + cy.waitTextVisible("Welcome to DataHub"); + cy.hideOnboardingTour(); + cy.waitTextVisible(name); + }); + }); + + it("Edit Metadata Ingestion platform policy and assign privileges to the user", () => { + cy.loginWithCredentials(); + cy.visit("/settings/permissions/policies"); + cy.waitTextVisible("Manage Permissions"); + editPolicy(platform_policy_name, "Ingestion", "Manage Metadata Ingestion"); + }); + + it("Verify new user can see ingestion and access Manage Ingestion tab", () => { + cy.clearCookies(); + cy.clearLocalStorage(); + signIn(); + cy.waitTextVisible("Welcome to DataHub"); + cy.hideOnboardingTour(); + cy.waitTextVisible(name); + cy.clickOptionWithText("Ingestion"); + cy.wait(1000); + cy.get("body").click(); + cy.waitTextVisible("Manage Data Sources"); + cy.waitTextVisible("Sources"); + cy.get(".ant-tabs-nav-list").contains("Source").should("be.visible"); + cy.get(".ant-tabs-tab").should("have.length", 1); + }); + + it("Verify new user can see ingestion and access Manage Secret tab", () => { + cy.clearCookies(); + cy.clearLocalStorage(); + cy.loginWithCredentials(); + cy.visit("/settings/permissions/policies"); + cy.waitTextVisible("Manage Permissions"); + editPolicy(platform_policy_name, "Secret", "Manage Secrets"); + cy.logout(); + signIn(); + cy.waitTextVisible("Welcome to DataHub"); + cy.hideOnboardingTour(); + cy.waitTextVisible(name); + cy.clickOptionWithText("Ingestion"); + cy.wait(1000); + cy.clickOptionWithId("body"); + cy.waitTextVisible("Manage Data 
Sources"); + cy.waitTextVisible("Secrets"); + cy.get(".ant-tabs-nav-list").contains("Secrets").should("be.visible"); + cy.get(".ant-tabs-tab").should("have.length", 1); + }); +}); diff --git a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js index 1d95c1533c93c2..6953fe04940520 100644 --- a/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js +++ b/smoke-test/tests/cypress/cypress/e2e/mutations/managing_secrets.js @@ -11,7 +11,6 @@ describe("managing secrets for ingestion creation", () => { // Navigate to the manage ingestion page → secrets cy.loginWithCredentials(); cy.goToIngestionPage(); - cy.openEntityTab("Secrets"); // Create a new secret cy.clickOptionWithTestId("create-secret-button"); @@ -28,6 +27,7 @@ describe("managing secrets for ingestion creation", () => { // Create an ingestion source using a secret cy.goToIngestionPage(); + cy.clickOptionWithId('[data-node-key="Sources"]'); cy.get("#ingestion-create-source").click(); cy.clickOptionWithText("Snowflake"); cy.waitTextVisible("Snowflake Details"); @@ -60,10 +60,10 @@ describe("managing secrets for ingestion creation", () => { // Remove ingestion source cy.goToIngestionPage(); - cy.get('[data-testid="delete-button"]').first().click(); + cy.clickOptionWithId('[data-node-key="Sources"]'); + cy.get('[aria-label="delete"]').first().click(); cy.waitTextVisible("Confirm Ingestion Source Removal"); cy.get("button").contains("Yes").click(); - cy.waitTextVisible("Removed ingestion source."); cy.ensureTextNotPresent(ingestion_source_name); // Verify secret is not present during ingestion source creation for password dropdown @@ -99,10 +99,10 @@ describe("managing secrets for ingestion creation", () => { // Remove ingestion source and secret cy.goToIngestionPage(); - cy.get('[data-testid="delete-button"]').first().click(); + cy.clickOptionWithId('[data-node-key="Sources"]'); + cy.get('[aria-label="delete"]').first().click(); cy.waitTextVisible("Confirm Ingestion Source Removal"); cy.get("button").contains("Yes").click(); - cy.waitTextVisible("Removed ingestion source."); cy.ensureTextNotPresent(ingestion_source_name); cy.clickOptionWithText("Secrets"); cy.waitTextVisible(`secretname${number}`); From a0905180f22bb151d6c1f3c434a701d2c5ef6e76 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 22 Jul 2024 15:26:37 -0700 Subject: [PATCH 06/17] feat(ingest/looker): include dashboard urns in browse v2 (#10955) --- .../ingestion/source/looker/looker_source.py | 9 ++++----- ...olden_test_external_project_view_mces.json | 3 ++- .../looker/golden_test_file_path_ingest.json | 3 ++- .../golden_test_independent_look_ingest.json | 3 ++- .../looker/golden_test_ingest.json | 3 ++- .../looker/golden_test_ingest_joins.json | 3 ++- .../looker_mces_golden_deleted_stateful.json | 19 ++++++++++--------- 7 files changed, 24 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index d951a6dbe7a626..cd050fec35c2c7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -670,11 +670,12 @@ def _make_chart_metadata_events( ) chart_snapshot.aspects.append(browse_path) + dashboard_urn = self.make_dashboard_urn(dashboard) browse_path_v2 = BrowsePathsV2Class( path=[ BrowsePathEntryClass("Folders"), 
*self._get_folder_browse_path_v2_entries(dashboard.folder), - BrowsePathEntryClass(id=dashboard.title), + BrowsePathEntryClass(id=dashboard_urn, urn=dashboard_urn), ], ) elif ( @@ -819,7 +820,7 @@ def _make_dashboard_metadata_events( return proposals - def make_dashboard_urn(self, looker_dashboard): + def make_dashboard_urn(self, looker_dashboard: LookerDashboard) -> str: return builder.make_dashboard_urn( self.source_config.platform_name, looker_dashboard.get_urn_dashboard_id() ) @@ -1202,9 +1203,7 @@ def _input_fields_from_dashboard_element( def _make_metrics_dimensions_dashboard_mcp( self, dashboard: LookerDashboard ) -> MetadataChangeProposalWrapper: - dashboard_urn = builder.make_dashboard_urn( - self.source_config.platform_name, dashboard.get_urn_dashboard_id() - ) + dashboard_urn = self.make_dashboard_urn(dashboard) all_fields = [] for dashboard_element in dashboard.dashboard_elements: all_fields.extend( diff --git a/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json b/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json index 3f11798d0aa5a4..d893a9bcc50b7b 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_external_project_view_mces.json @@ -175,7 +175,8 @@ "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" }, { - "id": "foo" + "id": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,dashboards.1)" } ] } diff --git a/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json index ec2c46c5daf27a..411005fcb6f054 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_file_path_ingest.json @@ -175,7 +175,8 @@ "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" }, { - "id": "foo" + "id": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,dashboards.1)" } ] } diff --git a/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json index bb3c3ccb4e2146..6000332b2bf997 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_independent_look_ingest.json @@ -182,7 +182,8 @@ "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" }, { - "id": "foo" + "id": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,dashboards.1)" } ] } diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json index a7b8abed02da31..639e69a6f82059 100644 --- a/metadata-ingestion/tests/integration/looker/golden_test_ingest.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest.json @@ -175,7 +175,8 @@ "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" }, { - "id": "foo" + "id": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,dashboards.1)" } ] } diff --git a/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json b/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json index a1a7747c741a6b..487a18022cb087 100644 --- 
a/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json +++ b/metadata-ingestion/tests/integration/looker/golden_test_ingest_joins.json @@ -175,7 +175,8 @@ "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" }, { - "id": "foo" + "id": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,dashboards.1)" } ] } diff --git a/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json b/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json index e3cbf43059c7c7..ec5a4f1f801598 100644 --- a/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json +++ b/metadata-ingestion/tests/integration/looker/looker_mces_golden_deleted_stateful.json @@ -182,7 +182,8 @@ "urn": "urn:li:container:691314a7b63628684d62a14861d057a8" }, { - "id": "foo" + "id": "urn:li:dashboard:(looker,dashboards.1)", + "urn": "urn:li:dashboard:(looker,dashboards.1)" } ] } @@ -807,8 +808,8 @@ } }, { - "entityType": "chart", - "entityUrn": "urn:li:chart:(looker,dashboard_elements.10)", + "entityType": "container", + "entityUrn": "urn:li:container:621eb6e00da9abece0f64522f81be0e7", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -824,8 +825,8 @@ } }, { - "entityType": "dashboard", - "entityUrn": "urn:li:dashboard:(looker,dashboards.11)", + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,bogus data.explore.my_view,PROD)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -841,8 +842,8 @@ } }, { - "entityType": "container", - "entityUrn": "urn:li:container:621eb6e00da9abece0f64522f81be0e7", + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(looker,dashboards.11)", "changeType": "UPSERT", "aspectName": "status", "aspect": { @@ -858,8 +859,8 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,bogus data.explore.my_view,PROD)", + "entityType": "chart", + "entityUrn": "urn:li:chart:(looker,dashboard_elements.10)", "changeType": "UPSERT", "aspectName": "status", "aspect": { From ebe687bbc5da8a066275ac1f9d5e37c2ef66e90d Mon Sep 17 00:00:00 2001 From: Kevin Chun Date: Mon, 22 Jul 2024 16:21:06 -0700 Subject: [PATCH 07/17] add a structured type to batchGet in OpenAPI V3 spec (#10956) --- .../controller/GenericEntitiesController.java | 13 +++++--- .../openapi/v3/OpenAPIV3Generator.java | 31 +++++++++++-------- .../v3/controller/EntityController.java | 2 ++ .../openapi/v3/OpenAPIV3GeneratorTest.java | 13 ++++++++ 4 files changed, 41 insertions(+), 18 deletions(-) diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java index c91c8ac987e5c9..de5d2ae1118d4a 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/controller/GenericEntitiesController.java @@ -13,6 +13,7 @@ import com.datahub.authorization.AuthorizerChain; import com.datahub.util.RecordUtils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.linkedin.common.urn.Urn; @@ -508,10 +509,10 @@ public ResponseEntity createAspect( 
@PathVariable("aspectName") String aspectName, @RequestParam(value = "systemMetadata", required = false, defaultValue = "false") Boolean withSystemMetadata, - @RequestParam(value = "createIfNotExists", required = false, defaultValue = "false") + @RequestParam(value = "createIfNotExists", required = false, defaultValue = "true") Boolean createIfNotExists, @RequestBody @Nonnull String jsonAspect) - throws URISyntaxException { + throws URISyntaxException, JsonProcessingException { Urn urn = validatedUrn(entityUrn); EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); @@ -649,8 +650,8 @@ protected Boolean exists( * fixes) * * @param requestedAspectNames requested aspects - * @return updated map * @param map values + * @return updated map */ protected LinkedHashMap> resolveAspectNames( LinkedHashMap> requestedAspectNames, @Nonnull T defaultValue) { @@ -732,7 +733,9 @@ protected ChangeMCP toUpsertItem( Boolean createIfNotExists, String jsonAspect, Actor actor) - throws URISyntaxException { + throws JsonProcessingException { + JsonNode jsonNode = objectMapper.readTree(jsonAspect); + String aspectJson = jsonNode.get("value").toString(); return ChangeItemImpl.builder() .urn(entityUrn) .aspectName(aspectSpec.getName()) @@ -740,7 +743,7 @@ protected ChangeMCP toUpsertItem( .auditStamp(AuditStampUtils.createAuditStamp(actor.toUrnStr())) .recordTemplate( GenericRecordUtils.deserializeAspect( - ByteString.copyString(jsonAspect, StandardCharsets.UTF_8), + ByteString.copyString(aspectJson, StandardCharsets.UTF_8), GenericRecordUtils.JSON, aspectSpec)) .build(aspectRetriever); diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java index 3a93eb304b8f80..f423be82d6e8dc 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java @@ -82,6 +82,20 @@ public static OpenAPI generateOpenApiSpec(EntityRegistry entityRegistry) { "SystemMetadata", new Schema().type(TYPE_OBJECT).additionalProperties(true)); components.addSchemas("SortOrder", new Schema()._enum(List.of("ASCENDING", "DESCENDING"))); components.addSchemas("AspectPatch", buildAspectPatchSchema()); + components.addSchemas( + "BatchGetRequestBody", + new Schema<>() + .type(TYPE_OBJECT) + .description("Request body for batch get aspects.") + .properties( + Map.of( + "headers", + new Schema<>() + .type(TYPE_OBJECT) + .additionalProperties(new Schema<>().type(TYPE_STRING)) + .description("System headers for the operation.") + .nullable(true))) + .nullable(true)); entityRegistry .getAspectSpecs() .values() @@ -645,28 +659,19 @@ private static Schema buildEntityScrollSchema(final EntitySpec entity) { private static Schema buildEntityBatchGetRequestSchema( final EntitySpec entity, Set aspectNames) { - final Schema stringTypeSchema = new Schema<>(); - stringTypeSchema.setType(TYPE_STRING); - final Map headers = - Map.of( - "headers", - new Schema<>() - .type(TYPE_OBJECT) - .additionalProperties(stringTypeSchema) - .description("System headers for the operation.") - .nullable(true)); - final Map properties = entity.getAspectSpecMap().entrySet().stream() .filter(a -> aspectNames.contains(a.getValue().getName())) .collect( Collectors.toMap( - Map.Entry::getKey, a -> new Schema().type(TYPE_OBJECT).properties(headers))); + Map.Entry::getKey, + 
a -> new Schema().$ref("#/components/schemas/BatchGetRequestBody"))); properties.put( PROPERTY_URN, new Schema<>().type(TYPE_STRING).description("Unique id for " + entity.getName())); - properties.put(entity.getKeyAspectName(), new Schema().type(TYPE_OBJECT).properties(headers)); + properties.put( + entity.getKeyAspectName(), new Schema().$ref("#/components/schemas/BatchGetRequestBody")); return new Schema<>() .type(TYPE_OBJECT) diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java index d6feb6cc460c97..9ca34934e4c657 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/controller/EntityController.java @@ -37,6 +37,7 @@ import io.datahubproject.openapi.v3.models.GenericAspectV3; import io.datahubproject.openapi.v3.models.GenericEntityScrollResultV3; import io.datahubproject.openapi.v3.models.GenericEntityV3; +import io.swagger.v3.oas.annotations.Hidden; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; @@ -67,6 +68,7 @@ @RequiredArgsConstructor @RequestMapping("/v3/entity") @Slf4j +@Hidden public class EntityController extends GenericEntitiesController< GenericAspectV3, GenericEntityV3, GenericEntityScrollResultV3> { diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java index 10b75fd7faed37..b0fbbce05a0f8e 100644 --- a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java +++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java @@ -88,5 +88,18 @@ public void testOpenApiSpecBuilder() throws Exception { Schema fabricType = openAPI.getComponents().getSchemas().get("FabricType"); assertEquals("string", fabricType.getType()); assertFalse(fabricType.getEnum().isEmpty()); + + Map batchProperties = + openAPI + .getComponents() + .getSchemas() + .get("BatchGetContainerEntityRequest_v3") + .getProperties(); + batchProperties.entrySet().stream() + .filter(entry -> !entry.getKey().equals("urn")) + .forEach( + entry -> + assertEquals( + "#/components/schemas/BatchGetRequestBody", entry.getValue().get$ref())); } } From c5d3153e83607c9407a13a025df8301060f74d6c Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Tue, 23 Jul 2024 04:14:50 -0400 Subject: [PATCH 08/17] fix(ui): scroll on the domain sidebar to show all domains (#10966) --- datahub-web-react/src/app/shared/sidebar/components.tsx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datahub-web-react/src/app/shared/sidebar/components.tsx b/datahub-web-react/src/app/shared/sidebar/components.tsx index c5e529bd3a91c8..1b53843ee992f2 100644 --- a/datahub-web-react/src/app/shared/sidebar/components.tsx +++ b/datahub-web-react/src/app/shared/sidebar/components.tsx @@ -8,6 +8,8 @@ export const SidebarWrapper = styled.div<{ width: number }>` width: ${(props) => props.width}px; min-width: ${(props) => props.width}px; display: ${(props) => (props.width ? 
'block' : 'none')}; + display: flex; + flex-direction: column; `; export function RotatingTriangle({ isOpen, onClick }: { isOpen: boolean; onClick?: () => void }) { From c5dae7448b0e08d160095a4de704e6b405e80899 Mon Sep 17 00:00:00 2001 From: Tristan Heisler Date: Tue, 23 Jul 2024 06:27:52 -0600 Subject: [PATCH 09/17] fix(ingest/sagemaker): resolve incorrect variable assignment for SageMaker API call (#10965) --- .../datahub/ingestion/source/aws/sagemaker_processors/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py index e82cfc58f75a74..eef2b26ee08f2e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker_processors/models.py @@ -418,7 +418,7 @@ def get_model_wu( model_image_groups: Set[str] = set() if model_image is not None: - model_uri_groups = self.lineage.model_image_to_groups.get( + model_image_groups = self.lineage.model_image_to_groups.get( model_image, set() ) From 123e84cfedf7ffd1d68e12594d131a52800ade30 Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Tue, 23 Jul 2024 21:30:34 +0200 Subject: [PATCH 10/17] fix(airflow/build): Pinning mypy (#10972) --- metadata-ingestion-modules/airflow-plugin/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 2d2f6fbd2b0891..6d5aa74b1d96ff 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -73,7 +73,7 @@ def get_long_description(): "flake8>=3.8.3", "flake8-tidy-imports>=4.3.0", "isort>=5.7.0", - "mypy>=1.4.0", + "mypy==1.10.1", # pydantic 1.8.2 is incompatible with mypy 0.910. # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910. "pydantic>=1.10", From f0f226cbc19ca5280e7bf6a5c6aa13cc1c503ba3 Mon Sep 17 00:00:00 2001 From: Ajoy Majumdar Date: Tue, 23 Jul 2024 17:35:51 -0700 Subject: [PATCH 11/17] Fixed a bug where the OpenAPI V3 spec was incorrect. The bug was introduced in https://github.com/datahub-project/datahub/pull/10939. (#10974) --- .../io/datahubproject/openapi/config/SpringWebConfig.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java index 09a6cc7c1e4b76..61c641e358f090 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/config/SpringWebConfig.java @@ -16,6 +16,7 @@ import io.swagger.v3.oas.models.Components; import io.swagger.v3.oas.models.OpenAPI; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -150,6 +151,11 @@ private Map concat(Supplier> a, Supplier> b) { : b.get() == null ? 
a.get() : Stream.concat(a.get().entrySet().stream(), b.get().entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (v1, v2) -> v2, + LinkedHashMap::new)); } } From 8d874ad1e4bef9d7afbe20fb3cb457566a15c61c Mon Sep 17 00:00:00 2001 From: Tamas Nemeth Date: Wed, 24 Jul 2024 13:11:25 +0200 Subject: [PATCH 12/17] fix(ingest/test): Fix for mssql integration tests (#10978) --- .../tests/integration/sql_server/test_sql_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py index 4e9b4bee8ce6ba..1f418ffbd32ea9 100644 --- a/metadata-ingestion/tests/integration/sql_server/test_sql_server.py +++ b/metadata-ingestion/tests/integration/sql_server/test_sql_server.py @@ -22,7 +22,7 @@ def mssql_runner(docker_compose_runner, pytestconfig): time.sleep(5) # Run the setup.sql file to populate the database. - command = "docker exec testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql" + command = "docker exec testsqlserver /opt/mssql-tools18/bin/sqlcmd -C -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql" ret = subprocess.run(command, shell=True, capture_output=True) assert ret.returncode == 0 yield docker_services From 4921967e439407d9fd7f5715c2760b916abc5535 Mon Sep 17 00:00:00 2001 From: Jay <159848059+jayacryl@users.noreply.github.com> Date: Wed, 24 Jul 2024 11:11:45 -0400 Subject: [PATCH 13/17] fix(entity-service) exist check correctly extracts status (#10973) --- .../java/com/linkedin/metadata/entity/EntityServiceImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 6f2f95b8b115ab..8c83e7f469fe31 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -2176,7 +2176,8 @@ public Set exists( urn -> // key aspect is always returned, make sure to only consider the status aspect statusResult.getOrDefault(urn, List.of()).stream() - .filter(aspect -> STATUS_ASPECT_NAME.equals(aspect.schema().getName())) + .filter( + aspect -> STATUS_ASPECT_NAME.equalsIgnoreCase(aspect.schema().getName())) .noneMatch(aspect -> ((Status) aspect).isRemoved())) .collect(Collectors.toSet()); } From 959d3517ec761568eafda9513d19ff64c26cabd6 Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Wed, 24 Jul 2024 12:30:26 -0400 Subject: [PATCH 14/17] fix(structuredProps) casing bug in StructuredPropertiesValidator (#10982) --- .../validation/StructuredPropertiesValidator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/StructuredPropertiesValidator.java b/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/StructuredPropertiesValidator.java index cdbe2eb95a15d2..25cbfb3a12ab39 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/StructuredPropertiesValidator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/structuredproperties/validation/StructuredPropertiesValidator.java @@ -356,7 +356,7 @@ private static Optional 
validateType( throw new RuntimeException(e); } String allowedEntityName = getValueTypeId(typeUrn); - if (typeValue.getEntityType().equals(allowedEntityName)) { + if (typeValue.getEntityType().equalsIgnoreCase(allowedEntityName)) { matchedAny = true; } } From 725df58893bf17af24c934a5c984b66f11672b39 Mon Sep 17 00:00:00 2001 From: Kevin Chun Date: Wed, 24 Jul 2024 14:51:10 -0700 Subject: [PATCH 15/17] bugfix: use anyOf instead of allOf when creating references in openapi v3 spec (#10986) --- .../openapi/v3/OpenAPIV3Generator.java | 12 ++++++------ .../openapi/v3/OpenAPIV3GeneratorTest.java | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java index f423be82d6e8dc..f26ad6821c5833 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v3/OpenAPIV3Generator.java @@ -547,12 +547,12 @@ private static void addAspectSchemas(final Components components, final AspectSp String $ref = schema.get$ref(); boolean isNameRequired = requiredNames.contains(name); if ($ref != null && !isNameRequired) { - // A non-required $ref property must be wrapped in a { allOf: [ $ref ] } + // A non-required $ref property must be wrapped in a { anyOf: [ $ref ] } // object to allow the // property to be marked as nullable schema.setType(TYPE_OBJECT); schema.set$ref(null); - schema.setAllOf(List.of(new Schema().$ref($ref))); + schema.setAnyOf(List.of(new Schema().$ref($ref))); } schema.setNullable(!isNameRequired); }); @@ -578,7 +578,7 @@ private static Schema buildAspectRefResponseSchema(final String aspectName) { "systemMetadata", new Schema<>() .type(TYPE_OBJECT) - .allOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata"))) + .anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata"))) .description("System metadata for the aspect.") .nullable(true)); return result; @@ -595,7 +595,7 @@ private static Schema buildAspectRefRequestSchema(final String aspectName) { "systemMetadata", new Schema<>() .type(TYPE_OBJECT) - .allOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata"))) + .anyOf(List.of(new Schema().$ref(PATH_DEFINITIONS + "SystemMetadata"))) .description("System metadata for the aspect.") .nullable(true)); @@ -681,7 +681,7 @@ private static Schema buildEntityBatchGetRequestSchema( } private static Schema buildAspectRef(final String aspect, final boolean withSystemMetadata) { - // A non-required $ref property must be wrapped in a { allOf: [ $ref ] } + // A non-required $ref property must be wrapped in a { anyOf: [ $ref ] } // object to allow the // property to be marked as nullable final Schema result = new Schema<>(); @@ -697,7 +697,7 @@ private static Schema buildAspectRef(final String aspect, final boolean withSyst internalRef = String.format(FORMAT_PATH_DEFINITIONS, toUpperFirst(aspect), ASPECT_REQUEST_SUFFIX); } - result.setAllOf(List.of(new Schema().$ref(internalRef))); + result.setAnyOf(List.of(new Schema().$ref(internalRef))); return result; } diff --git a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java index b0fbbce05a0f8e..e1568017156d9b 100644 --- 
a/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java +++ b/metadata-service/openapi-servlet/src/test/java/io/datahubproject/openapi/v3/OpenAPIV3GeneratorTest.java @@ -61,12 +61,12 @@ public void testOpenApiSpecBuilder() throws Exception { assertFalse(requiredNames.contains("name")); assertTrue(name.getNullable()); - // Assert non-required $ref properties are replaced by nullable { allOf: [ $ref ] } objects + // Assert non-required $ref properties are replaced by nullable { anyOf: [ $ref ] } objects Schema created = properties.get("created"); assertFalse(requiredNames.contains("created")); assertEquals("object", created.getType()); assertNull(created.get$ref()); - assertEquals(List.of(new Schema().$ref("#/components/schemas/TimeStamp")), created.getAllOf()); + assertEquals(List.of(new Schema().$ref("#/components/schemas/TimeStamp")), created.getAnyOf()); assertTrue(created.getNullable()); // Assert systemMetadata property on response schema is optional. @@ -81,7 +81,7 @@ public void testOpenApiSpecBuilder() throws Exception { assertNull(systemMetadata.get$ref()); assertEquals( List.of(new Schema().$ref("#/components/schemas/SystemMetadata")), - systemMetadata.getAllOf()); + systemMetadata.getAnyOf()); assertTrue(systemMetadata.getNullable()); // Assert enum property is string. From 66f8930164a65d5b973d626e6ccd47b510c9850d Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 24 Jul 2024 17:35:42 -0700 Subject: [PATCH 16/17] fix(ui): Remove ant less imports (#10988) --- .../ermodelrelationships/preview/ERModelRelationshipAction.less | 2 -- .../styled/ERModelRelationship/CreateERModelRelationModal.less | 2 -- .../styled/ERModelRelationship/ERModelRelationPreview.less | 2 -- .../shared/tabs/Dataset/Relationship/RelationshipsTab.less | 2 -- .../shared/tabs/ERModelRelationship/ERModelRelationshipTab.less | 2 -- 5 files changed, 10 deletions(-) diff --git a/datahub-web-react/src/app/entity/ermodelrelationships/preview/ERModelRelationshipAction.less b/datahub-web-react/src/app/entity/ermodelrelationships/preview/ERModelRelationshipAction.less index 0f63ee197fecb2..41e201585f3b8e 100644 --- a/datahub-web-react/src/app/entity/ermodelrelationships/preview/ERModelRelationshipAction.less +++ b/datahub-web-react/src/app/entity/ermodelrelationships/preview/ERModelRelationshipAction.less @@ -1,5 +1,3 @@ -@import '../../../../../node_modules/antd/dist/antd.less'; - .joinName { width: 385px; height: 24px; diff --git a/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/CreateERModelRelationModal.less b/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/CreateERModelRelationModal.less index 8c1f29aa7fc77c..363c5080efffcc 100644 --- a/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/CreateERModelRelationModal.less +++ b/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/CreateERModelRelationModal.less @@ -1,5 +1,3 @@ -@import '../../../../../../../node_modules/antd/dist/antd.less'; - .CreateERModelRelationModal { .ermodelrelation-name { padding: 8px 16px; diff --git a/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/ERModelRelationPreview.less b/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/ERModelRelationPreview.less index fa55efd7730077..6f598bb3b457a2 100644 --- a/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/ERModelRelationPreview.less +++ 
b/datahub-web-react/src/app/entity/shared/components/styled/ERModelRelationship/ERModelRelationPreview.less @@ -1,5 +1,3 @@ -@import '../../../../../../../node_modules/antd/dist/antd.less'; - .ERModelRelationPreview { .preview-main-div { display: flex; diff --git a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Relationship/RelationshipsTab.less b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Relationship/RelationshipsTab.less index a25a450f468cbc..71d13d64e86c95 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Dataset/Relationship/RelationshipsTab.less +++ b/datahub-web-react/src/app/entity/shared/tabs/Dataset/Relationship/RelationshipsTab.less @@ -1,5 +1,3 @@ -@import '../../../../../../../node_modules/antd/dist/antd.less'; - .RelationshipsTab { .add-btn-link { height: 56px !important; diff --git a/datahub-web-react/src/app/entity/shared/tabs/ERModelRelationship/ERModelRelationshipTab.less b/datahub-web-react/src/app/entity/shared/tabs/ERModelRelationship/ERModelRelationshipTab.less index 897dbf251330bf..afab46b958706a 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/ERModelRelationship/ERModelRelationshipTab.less +++ b/datahub-web-react/src/app/entity/shared/tabs/ERModelRelationship/ERModelRelationshipTab.less @@ -1,5 +1,3 @@ -@import '../../../../../../node_modules/antd/dist/antd.less'; - .ERModelRelationTab { .add-btn-link { padding-left: 1155px !important; From fde71d450024f10b129be99bcadc4458ec771de3 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 24 Jul 2024 17:49:41 -0700 Subject: [PATCH 17/17] feat(ingest/graph): Add get_results_by_filter to DataHubGraph (#10987) --- .../src/datahub/ingestion/graph/client.py | 150 +++++++++++++++++- .../src/datahub/ingestion/graph/filters.py | 20 +++ 2 files changed, 169 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 1d6097da231f8f..5ce2fe9941d988 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -171,6 +171,22 @@ def test_connection(self) -> None: self.server_id = _MISSING_SERVER_ID logger.debug(f"Failed to get server id due to {e}") + @property + def frontend_base_url(self) -> str: + """Get the public-facing base url of the frontend + + This url can be used to construct links to the frontend. The url will not include a trailing slash. + Note: Only supported with Acryl Cloud. + """ + + if not self.server_config: + self.test_connection() + + base_url = self.server_config.get("baseUrl") + if not base_url: + raise ValueError("baseUrl not found in server config") + return base_url + @classmethod def from_emitter(cls, emitter: DatahubRestEmitter) -> "DataHubGraph": return cls( @@ -812,6 +828,7 @@ def get_urns_by_filter( status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, batch_size: int = 10000, extraFilters: Optional[List[SearchFilterRule]] = None, + extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None, ) -> Iterable[str]: """Fetch all urns that match all of the given filters. @@ -841,7 +858,13 @@ def get_urns_by_filter( # Env filter. 
orFilters = generate_filter( - platform, platform_instance, env, container, status, extraFilters + platform, + platform_instance, + env, + container, + status, + extraFilters, + extra_or_filters=extra_or_filters, ) graphql_query = textwrap.dedent( @@ -885,6 +908,131 @@ def get_urns_by_filter( for entity in self._scroll_across_entities(graphql_query, variables): yield entity["urn"] + def get_results_by_filter( + self, + *, + entity_types: Optional[List[str]] = None, + platform: Optional[str] = None, + platform_instance: Optional[str] = None, + env: Optional[str] = None, + query: Optional[str] = None, + container: Optional[str] = None, + status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED, + batch_size: int = 10000, + extra_and_filters: Optional[List[SearchFilterRule]] = None, + extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None, + extra_source_fields: Optional[List[str]] = None, + skip_cache: bool = False, + ) -> Iterable[dict]: + """Fetch all results that match all of the given filters. + + Filters are combined conjunctively. If multiple filters are specified, the results will match all of them. + Note that specifying a platform filter will automatically exclude all entity types that do not have a platform. + The same goes for the env filter. + + :param entity_types: List of entity types to include. If None, all entity types will be returned. + :param platform: Platform to filter on. If None, all platforms will be returned. + :param platform_instance: Platform instance to filter on. If None, all platform instances will be returned. + :param env: Environment (e.g. PROD, DEV) to filter on. If None, all environments will be returned. + :param query: Query string to filter on. If None, all entities will be returned. + :param container: A container urn that entities must be within. + This works recursively, so it will include entities within sub-containers as well. + If None, all entities will be returned. + Note that this requires browsePathV2 aspects (added in 0.10.4+). + :param status: Filter on the deletion status of the entity. The default is only return non-soft-deleted entities. + :param extra_and_filters: Additional filters to apply. If specified, the + results will match all of the filters. + :param extra_or_filters: Additional filters to apply. If specified, the + results will match any of the filters. + + :return: An iterable of urns that match the filters. + """ + + types = self._get_types(entity_types) + + # Add the query default of * if no query is specified. 
+ query = query or "*" + + or_filters_final = generate_filter( + platform, + platform_instance, + env, + container, + status, + extra_and_filters, + extra_or_filters, + ) + graphql_query = textwrap.dedent( + """ + query scrollUrnsWithFilters( + $types: [EntityType!], + $query: String!, + $orFilters: [AndFilterInput!], + $batchSize: Int!, + $scrollId: String, + $skipCache: Boolean!, + $fetchExtraFields: [String!]) { + + scrollAcrossEntities(input: { + query: $query, + count: $batchSize, + scrollId: $scrollId, + types: $types, + orFilters: $orFilters, + searchFlags: { + skipHighlighting: true + skipAggregates: true + skipCache: $skipCache + fetchExtraFields: $fetchExtraFields + } + }) { + nextScrollId + searchResults { + entity { + urn + } + } + } + } + """ + ) + + variables = { + "types": types, + "query": query, + "orFilters": or_filters_final, + "batchSize": batch_size, + "skipCache": "true" if skip_cache else "false", + "fetchExtraFields": extra_source_fields, + } + + for result in self._scroll_across_entities_results(graphql_query, variables): + yield result + + def _scroll_across_entities_results( + self, graphql_query: str, variables_orig: dict + ) -> Iterable[dict]: + variables = variables_orig.copy() + first_iter = True + scroll_id: Optional[str] = None + while first_iter or scroll_id: + first_iter = False + variables["scrollId"] = scroll_id + + response = self.execute_graphql( + graphql_query, + variables=variables, + ) + data = response["scrollAcrossEntities"] + scroll_id = data["nextScrollId"] + for entry in data["searchResults"]: + yield entry + + if scroll_id: + logger.debug( + f"Scrolling to next scrollAcrossEntities page: {scroll_id}" + ) + def _scroll_across_entities( self, graphql_query: str, variables_orig: dict ) -> Iterable[dict]: diff --git a/metadata-ingestion/src/datahub/ingestion/graph/filters.py b/metadata-ingestion/src/datahub/ingestion/graph/filters.py index 1a63aea8357296..8974f159171d1e 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/filters.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/filters.py @@ -30,7 +30,19 @@ def generate_filter( container: Optional[str], status: RemovedStatusFilter, extra_filters: Optional[List[SearchFilterRule]], + extra_or_filters: Optional[List[SearchFilterRule]] = None, ) -> List[Dict[str, List[SearchFilterRule]]]: + """ + Generate a search filter based on the provided parameters. + :param platform: The platform to filter by. + :param platform_instance: The platform instance to filter by. + :param env: The environment to filter by. + :param container: The container to filter by. + :param status: The status to filter by. + :param extra_filters: Extra AND filters to apply. + :param extra_or_filters: Extra OR filters to apply. These are combined with + the AND filters using an OR at the top level. + """ and_filters: List[SearchFilterRule] = [] # Platform filter. @@ -66,6 +78,14 @@ def generate_filter( for and_filter in or_filters ] + # Extra OR filters are distributed across the top level and lists. + if extra_or_filters: + or_filters = [ + {"and": and_filter["and"] + [extra_or_filter]} + for extra_or_filter in extra_or_filters + for and_filter in or_filters + ] + return or_filters
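
Editor's note: for reference, below is a minimal usage sketch of the new DataHubGraph.get_results_by_filter API added in the final patch above. It is not part of the patch; the server URL, platform, entity type, and batch size are illustrative assumptions, and the import of DatahubClientConfig from the same client module is assumed from the surrounding codebase.

# Minimal sketch, assuming a reachable DataHub instance and the module layout shown in the diff.
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

for result in graph.get_results_by_filter(
    entity_types=["dataset"],   # restrict to datasets (illustrative)
    platform="snowflake",       # platform filter (illustrative)
    batch_size=1000,
):
    # Each yielded result mirrors one GraphQL searchResults entry,
    # e.g. {"entity": {"urn": "urn:li:dataset:(...)"}}, per the query in the diff.
    print(result["entity"]["urn"])

Unlike get_urns_by_filter, the new method yields the raw search result dicts rather than bare urn strings, which is what allows callers to request extra source fields via extra_source_fields.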