diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java index 6ebc6e8ec6a3b..4cc3edff3eb52 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java @@ -54,6 +54,7 @@ static List generateSteps( .getBootstrap() .getTemplates() .stream() + .map(cfg -> cfg.withOverride(opContext.getObjectMapper())) .filter(cfg -> cfg.isBlocking() == isBlocking) .map(cfg -> new BootstrapMCPStep(opContext, entityService, cfg)) .collect(Collectors.toList()); diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/model/BootstrapMCPConfigFile.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/model/BootstrapMCPConfigFile.java index 8fd3dd7c7d897..009d19e453b6a 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/model/BootstrapMCPConfigFile.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/model/BootstrapMCPConfigFile.java @@ -1,5 +1,7 @@ package com.linkedin.datahub.upgrade.system.bootstrapmcps.model; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; import java.util.List; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -7,6 +9,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; @AllArgsConstructor @NoArgsConstructor @@ -23,6 +26,7 @@ public static class Bootstrap { private List templates; } + @Slf4j @AllArgsConstructor @NoArgsConstructor @Data @@ -36,5 +40,19 @@ public static class MCPTemplate { @Builder.Default private boolean optional = false; @Nonnull private String mcps_location; @Nullable private String values_env; + @Nullable private String revision_env; + + public MCPTemplate withOverride(ObjectMapper objectMapper) { + if (revision_env != null) { + String overrideJson = System.getenv().getOrDefault(revision_env, "{}"); + try { + return objectMapper.readerForUpdating(this).readValue(overrideJson); + } catch (IOException e) { + log.error("Error applying override {} to {}", overrideJson, this); + throw new RuntimeException(e); + } + } + return this; + } } } diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtilTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtilTest.java index b471e366e906f..f914b355fe780 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtilTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtilTest.java @@ -4,6 +4,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.UrnUtils; @@ -17,6 +18,7 @@ import io.datahubproject.test.metadata.context.TestOperationContexts; import java.io.IOException; import java.util.List; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Listeners; import org.testng.annotations.Test; import uk.org.webcompere.systemstubs.environment.EnvironmentVariables; @@ -28,10 +30,17 @@ public class BootstrapMCPUtilTest { static final OperationContext OP_CONTEXT = TestOperationContexts.systemContextNoSearchAuthorization(); private static final String DATAHUB_TEST_VALUES_ENV = "DATAHUB_TEST_VALUES_ENV"; + private static final String DATAHUB_TEST_REVISION_ENV = "DATAHUB_TEST_REVISION_ENV"; private static final AuditStamp TEST_AUDIT_STAMP = AuditStampUtils.createDefaultAuditStamp(); @SystemStub private EnvironmentVariables environmentVariables; + @BeforeMethod + private void resetEnvironment() { + environmentVariables.remove(DATAHUB_TEST_VALUES_ENV); + environmentVariables.remove(DATAHUB_TEST_REVISION_ENV); + } + @Test public void testResolveYamlConf() throws IOException { BootstrapMCPConfigFile initConfig = @@ -51,9 +60,28 @@ public void testResolveYamlConf() throws IOException { } @Test - public void testResolveMCPTemplateDefaults() throws IOException { - environmentVariables.remove(DATAHUB_TEST_VALUES_ENV); + public void testResolveYamlConfOverride() throws IOException { + environmentVariables.set(DATAHUB_TEST_REVISION_ENV, "{\"version\":\"2024110600\"}"); + + BootstrapMCPConfigFile initConfig = + BootstrapMCPUtil.resolveYamlConf( + OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class); + assertEquals(initConfig.getBootstrap().getTemplates().size(), 1); + + BootstrapMCPConfigFile.MCPTemplate template = + initConfig.getBootstrap().getTemplates().get(0).withOverride(new ObjectMapper()); + assertEquals(template.getName(), "datahub-test"); + assertEquals(template.getVersion(), "2024110600"); + assertFalse(template.isForce()); + assertFalse(template.isBlocking()); + assertTrue(template.isAsync()); + assertFalse(template.isOptional()); + assertEquals(template.getMcps_location(), "bootstrapmcp/datahub-test-mcp.yaml"); + assertEquals(template.getValues_env(), "DATAHUB_TEST_VALUES_ENV"); + } + @Test + public void testResolveMCPTemplateDefaults() throws IOException { BootstrapMCPConfigFile.MCPTemplate template = BootstrapMCPUtil.resolveYamlConf( OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class) @@ -186,8 +214,6 @@ public void testResolveMCPTemplateOverride() throws IOException { @Test public void testMCPBatch() throws IOException { - environmentVariables.remove(DATAHUB_TEST_VALUES_ENV); - BootstrapMCPConfigFile.MCPTemplate template = BootstrapMCPUtil.resolveYamlConf( OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class) diff --git a/datahub-upgrade/src/test/resources/bootstrapmcp/test.yaml b/datahub-upgrade/src/test/resources/bootstrapmcp/test.yaml index 649cc09632fc2..5718ea3ac0e04 100644 --- a/datahub-upgrade/src/test/resources/bootstrapmcp/test.yaml +++ b/datahub-upgrade/src/test/resources/bootstrapmcp/test.yaml @@ -6,4 +6,5 @@ bootstrap: # blocking: false # async: true mcps_location: "bootstrapmcp/datahub-test-mcp.yaml" - values_env: "DATAHUB_TEST_VALUES_ENV" \ No newline at end of file + values_env: "DATAHUB_TEST_VALUES_ENV" + revision_env: "DATAHUB_TEST_REVISION_ENV" \ No newline at end of file diff --git a/docs/advanced/bootstrap-mcps.md b/docs/advanced/bootstrap-mcps.md index 0aa4b7608740f..c3ad7db2016db 100644 --- a/docs/advanced/bootstrap-mcps.md +++ b/docs/advanced/bootstrap-mcps.md @@ -149,6 +149,28 @@ to the required json structure and stored as a string. executorId: default ``` +## `bootstrap_mcps.yaml` Override + +Additionally, the `bootstrap_mcps.yaml` can be overridden. +This might be useful for applying changes to the version when using helm defined template values. + +```yaml +bootstrap: + templates: + - name: myMCPTemplate + version: v1 + mcps_location: + values_env: + revision_env: REVISION_ENV +``` + +In the above example, we've added a `revision_env` which allows overriding the MCP bootstrap definition itself (excluding `revision_env`). + +In this example we could configure `REVISION_ENV` to contain a timestamp or hash: `{"version":"2024060600"}` +This value can be changed/incremented each time the helm supplied template values change. This ensures the MCP is updated +with the latest values during deployment. + + ## Known Limitations * Supported change types: diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index b7fb1fd56891c..293157f8a1ed0 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -213,6 +213,17 @@ def gen_containers( ) container_urn = container_key.as_urn() + + if parent_container_key: # Yield Container aspect first for auto_browse_path_v2 + parent_container_urn = make_container_urn(guid=parent_container_key.guid()) + + # Set database container + parent_container_mcp = MetadataChangeProposalWrapper( + entityUrn=f"{container_urn}", + aspect=ContainerClass(container=parent_container_urn), + ) + yield parent_container_mcp.as_workunit() + yield MetadataChangeProposalWrapper( entityUrn=f"{container_urn}", aspect=ContainerProperties( @@ -276,18 +287,6 @@ def gen_containers( tags=sorted(tags), ) - if parent_container_key: - parent_container_urn = make_container_urn( - guid=parent_container_key.guid(), - ) - - # Set database container - parent_container_mcp = MetadataChangeProposalWrapper( - entityUrn=f"{container_urn}", - aspect=ContainerClass(container=parent_container_urn), - ) - yield parent_container_mcp.as_workunit() - def add_dataset_to_container( container_key: KeyType, dataset_urn: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 907e5c12e99a1..345467ab76c86 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -523,7 +523,6 @@ def _process_schema( logger.warning( f"Could not create table ref for {table_item.path}: {e}" ) - yield from [] return if self.config.include_tables: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py index afaaaf51964f8..497947abe4ef9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py @@ -276,18 +276,23 @@ def get_workunits_internal( logger.info(f"Found {self.report.num_unique_queries} unique queries") with self.report.audit_log_load_timer, queries_deduped: - i = 0 - for _, query_instances in queries_deduped.items(): + last_log_time = datetime.now() + last_report_time = datetime.now() + for i, (_, query_instances) in enumerate(queries_deduped.items()): for query in query_instances.values(): - if i > 0 and i % 10000 == 0: + now = datetime.now() + if (now - last_log_time).total_seconds() >= 60: logger.info( - f"Added {i} query log equeries_dedupedntries to SQL aggregator" + f"Added {i} deduplicated query log entries to SQL aggregator" ) + last_log_time = now + + if (now - last_report_time).total_seconds() >= 300: if self.report.sql_aggregator: logger.info(self.report.sql_aggregator.as_string()) + last_report_time = now self.aggregator.add(query) - i += 1 yield from auto_workunit(self.aggregator.gen_metadata()) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_sql_queries.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_sql_queries.py index ce82b35f7819a..1c247c7d1f7bc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_sql_queries.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_sql_queries.py @@ -242,7 +242,7 @@ class DremioSQLQueries: SYS.JOBS_RECENT WHERE STATUS = 'COMPLETED' - AND LENGTH(queried_datasets)>0 + AND ARRAY_SIZE(queried_datasets)>0 AND user_name != '$dremio$' AND query_type not like '%INTERNAL%' """ @@ -251,10 +251,10 @@ class DremioSQLQueries: SELECT * FROM - SYS.PROJECT.HISTORY.JOBS + sys.project.history.jobs WHERE STATUS = 'COMPLETED' - AND LENGTH(queried_datasets)>0 + AND ARRAY_SIZE(queried_datasets)>0 AND user_name != '$dremio$' AND query_type not like '%INTERNAL%' """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 21c967e162891..adbfc48692db9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -7,7 +7,6 @@ DataProcessInstance, InstanceRunResult, ) -from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SourceCapability, @@ -43,7 +42,6 @@ FineGrainedLineageDownstreamType, FineGrainedLineageUpstreamType, ) -from datahub.metadata.schema_classes import StatusClass from datahub.utilities.urns.data_flow_urn import DataFlowUrn from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -281,15 +279,6 @@ def _get_connector_workunits( for mcp in datajob.generate_mcp(materialize_iolets=False): yield mcp.as_workunit() - # Materialize the upstream referenced datasets. - # We assume that the downstreams are materialized by other ingestion sources. - for iolet in datajob.inlets: - # We don't want these to be tracked by stateful ingestion. - yield MetadataChangeProposalWrapper( - entityUrn=str(iolet), - aspect=StatusClass(removed=False), - ).as_workunit(is_primary_source=False) - # Map Fivetran's job/sync history entity with Datahub's data process entity if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR: self.report.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index fc5cd76458a51..70e4ee68f5351 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -229,7 +229,7 @@ class User: groupUserAccessRight: Optional[str] = None def get_urn_part(self, use_email: bool, remove_email_suffix: bool) -> str: - if use_email: + if use_email and self.emailAddress: if remove_email_suffix: return self.emailAddress.split("@")[0] else: diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index f0496379c45b8..e8a0369597d53 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -1571,7 +1571,10 @@ def _recurse_into_query( if upstream_query_ids: for upstream_query_id in upstream_query_ids: upstream_query = self._query_map.get(upstream_query_id) - if upstream_query: + if ( + upstream_query + and upstream_query.query_id not in composed_of_queries + ): temp_query_lineage_info = _recurse_into_query( upstream_query, recursion_path ) diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index 0f8f4cc64e7ca..7934153051c60 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -226,38 +226,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index 22933f3483e76..c46bd8fb65d87 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -234,38 +234,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", diff --git a/metadata-ingestion/tests/unit/api/__init__.py b/metadata-ingestion/tests/unit/api/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/unit/api/source_helpers/__init__.py b/metadata-ingestion/tests/unit/api/source_helpers/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_auto_browse_path_v2.py b/metadata-ingestion/tests/unit/api/source_helpers/test_auto_browse_path_v2.py new file mode 100644 index 0000000000000..0ad777c577d70 --- /dev/null +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_auto_browse_path_v2.py @@ -0,0 +1,448 @@ +from itertools import zip_longest +from typing import Any, Dict, Iterable, List +from unittest.mock import patch + +import datahub.metadata.schema_classes as models +from datahub.emitter.mce_builder import ( + make_container_urn, + make_dataplatform_instance_urn, + make_dataset_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey +from datahub.ingestion.api.source_helpers import ( + _prepend_platform_instance, + auto_browse_path_v2, + auto_status_aspect, +) +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.sql.sql_utils import gen_schema_container +from datahub.metadata.schema_classes import BrowsePathEntryClass, BrowsePathsV2Class + + +def test_auto_browse_path_v2_gen_containers_threaded(): + database_key = DatabaseKey(platform="snowflake", database="db") + schema_keys = [ + SchemaKey(platform="snowflake", database="db", schema=f"schema_{i}") + for i in range(10) + ] + + wus_per_schema = [ + gen_schema_container( + schema=key.db_schema, + database=key.database, + sub_types=[], + database_container_key=database_key, + schema_container_key=key, + ) + for key in schema_keys + ] + for wu in auto_browse_path_v2(_iterate_wus_round_robin(wus_per_schema)): + aspect = wu.get_aspect_of_type(BrowsePathsV2Class) + if aspect: + assert aspect.path == [ + BrowsePathEntryClass( + id=database_key.as_urn(), urn=database_key.as_urn() + ) + ] + + +def _iterate_wus_round_robin( + mcps_per_schema: List[Iterable[MetadataWorkUnit]], +) -> Iterable[MetadataWorkUnit]: + # Simulate a potential ordering of MCPs when using thread pool to generate MCPs + for wus in zip_longest(*mcps_per_schema, fillvalue=None): + for wu in wus: + if wu is not None: + yield wu + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_by_container_hierarchy(telemetry_ping_mock): + structure = { + "one": { + "a": {"i": ["1", "2", "3"], "ii": ["4"]}, + "b": {"iii": ["5", "6"]}, + }, + "two": { + "c": {"iv": [], "v": ["7", "8"]}, + }, + "three": {"d": {}}, + "four": {}, + } + + wus = list(auto_status_aspect(_create_container_aspects(structure))) + assert ( # Sanity check + sum(bool(wu.get_aspect_of_type(models.StatusClass)) for wu in wus) == 21 + ) + + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 21 + ) + + paths = _get_browse_paths_from_wu(new_wus) + assert paths["one"] == [] + assert ( + paths["7"] + == paths["8"] + == _make_container_browse_path_entries(["two", "c", "v"]) + ) + assert paths["d"] == _make_container_browse_path_entries(["three"]) + assert paths["i"] == _make_container_browse_path_entries(["one", "a"]) + + # Check urns emitted on demand -- not all at end + for urn in {wu.get_urn() for wu in new_wus}: + try: + idx = next( + i + for i, wu in enumerate(new_wus) + if wu.get_aspect_of_type(models.ContainerClass) and wu.get_urn() == urn + ) + except StopIteration: + idx = next( + i + for i, wu in enumerate(new_wus) + if wu.get_aspect_of_type(models.StatusClass) and wu.get_urn() == urn + ) + assert new_wus[idx + 1].get_aspect_of_type( + models.BrowsePathsV2Class + ) or new_wus[idx + 2].get_aspect_of_type(models.BrowsePathsV2Class) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_ignores_urns_already_with(telemetry_ping_mock): + structure = {"a": {"b": {"c": {"d": ["e"]}}}} + + wus = [ + *auto_status_aspect( + _create_container_aspects( + structure, + other_aspects={ + "f": [ + models.BrowsePathsClass(paths=["/one/two"]), + models.BrowsePathsV2Class( + path=_make_browse_path_entries(["my", "path"]) + ), + ], + "c": [ + models.BrowsePathsV2Class( + path=_make_container_browse_path_entries(["custom", "path"]) + ) + ], + }, + ), + ) + ] + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 6 + ) + + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == [] + assert paths["c"] == _make_container_browse_path_entries(["custom", "path"]) + assert paths["f"] == _make_browse_path_entries(["my", "path"]) + assert paths["d"] == _make_container_browse_path_entries(["custom", "path", "c"]) + assert paths["e"] == _make_container_browse_path_entries( + ["custom", "path", "c", "d"] + ) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_with_platform_instance_and_source_browse_path_v2( + telemetry_ping_mock, +): + structure = {"a": {"b": {"c": {"d": ["e"]}}}} + + platform = "platform" + instance = "instance" + + wus = [ + *auto_status_aspect( + _create_container_aspects( + structure, + other_aspects={ + "a": [ + models.BrowsePathsV2Class( + path=_make_browse_path_entries(["my", "path"]), + ), + ], + }, + ), + ) + ] + new_wus = list( + auto_browse_path_v2(wus, platform=platform, platform_instance=instance) + ) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 5 + ) + + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == _with_platform_instance( + _make_browse_path_entries(["my", "path"]), + ) + assert paths["b"] == _with_platform_instance( + [ + *_make_browse_path_entries(["my", "path"]), + *_make_container_browse_path_entries(["a"]), + ], + ) + assert paths["c"] == _with_platform_instance( + [ + *_make_browse_path_entries(["my", "path"]), + *_make_container_browse_path_entries(["a", "b"]), + ], + ) + assert paths["d"] == _with_platform_instance( + [ + *_make_browse_path_entries(["my", "path"]), + *_make_container_browse_path_entries(["a", "b", "c"]), + ], + ) + assert paths["e"] == _with_platform_instance( + [ + *_make_browse_path_entries(["my", "path"]), + *_make_container_browse_path_entries(["a", "b", "c", "d"]), + ], + ) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): + platform = "platform" + env = "PROD" + wus = [ + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn(platform, "dataset-1", env), + aspect=models.BrowsePathsClass(["/one/two"]), + ).as_workunit(), + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn(platform, "dataset-2", env), + aspect=models.BrowsePathsClass([f"/{platform}/{env}/something"]), + ).as_workunit(), + MetadataChangeProposalWrapper( + entityUrn=make_dataset_urn(platform, "dataset-3", env), + aspect=models.BrowsePathsClass([f"/{platform}/one/two"]), + ).as_workunit(), + ] + new_wus = list(auto_browse_path_v2(wus, drop_dirs=["platform", "PROD", "unused"])) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list + assert len(new_wus) == 6 + paths = _get_browse_paths_from_wu(new_wus) + assert ( + paths["platform,dataset-1,PROD)"] + == paths["platform,dataset-3,PROD)"] + == _make_browse_path_entries(["one", "two"]) + ) + assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list( + auto_status_aspect( + _create_container_aspects( + structure, + other_aspects={"b": [models.BrowsePathsClass(paths=["/one/two"])]}, + ), + ) + ) + new_wus = list(auto_browse_path_v2(wus)) + assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 3 + ) + + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == [] + assert paths["b"] == _make_container_browse_path_entries(["a"]) + assert paths["c"] == _make_container_browse_path_entries(["a", "b"]) + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_with_platform_instance(telemetry_ping_mock): + platform = "my_platform" + platform_instance = "my_instance" + platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance) + platform_instance_entry = models.BrowsePathEntryClass( + platform_instance_urn, platform_instance_urn + ) + + structure = {"a": {"b": ["c"]}} + wus = list(auto_status_aspect(_create_container_aspects(structure))) + + new_wus = list( + auto_browse_path_v2( + wus, + platform=platform, + platform_instance=platform_instance, + ) + ) + assert telemetry_ping_mock.call_count == 0 + + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 3 + ) + paths = _get_browse_paths_from_wu(new_wus) + assert paths["a"] == [platform_instance_entry] + assert paths["b"] == [ + platform_instance_entry, + *_make_container_browse_path_entries(["a"]), + ] + assert paths["c"] == [ + platform_instance_entry, + *_make_container_browse_path_entries(["a", "b"]), + ] + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_invalid_batch_telemetry(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + b_urn = make_container_urn("b") + wus = [ + *_create_container_aspects(structure), + MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect + entityUrn=b_urn, + aspect=models.BrowsePathsClass(paths=["/one/two"]), + ).as_workunit(), + ] + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 1 + assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 0 + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 1 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_no_invalid_batch_telemetry_for_unrelated_aspects( + telemetry_ping_mock, +): + structure = {"a": {"b": ["c"]}} + b_urn = make_container_urn("b") + wus = [ + *_create_container_aspects(structure), + MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect + entityUrn=b_urn, + aspect=models.ContainerPropertiesClass("container name"), + ).as_workunit(), + ] + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + _ = list(auto_browse_path_v2(wus)) + assert telemetry_ping_mock.call_count == 0 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list(reversed(list(_create_container_aspects(structure)))) + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + new_wus = list(auto_browse_path_v2(wus)) + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + > 0 + ) + assert telemetry_ping_mock.call_count == 1 + assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 1 + assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 0 + + +@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") +def test_auto_browse_path_v2_dry_run(telemetry_ping_mock): + structure = {"a": {"b": ["c"]}} + wus = list(reversed(list(_create_container_aspects(structure)))) + wus = list(auto_status_aspect(wus)) + + assert telemetry_ping_mock.call_count == 0 + new_wus = list(auto_browse_path_v2(wus, dry_run=True)) + assert wus == new_wus + assert ( + sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) + == 0 + ) + assert telemetry_ping_mock.call_count == 1 + + +def _with_platform_instance( + path: List[models.BrowsePathEntryClass], +) -> List[models.BrowsePathEntryClass]: + platform = "platform" + instance = "instance" + return _prepend_platform_instance(path, platform, instance) + + +def _get_browse_paths_from_wu( + stream: Iterable[MetadataWorkUnit], +) -> Dict[str, List[models.BrowsePathEntryClass]]: + paths = {} + for wu in stream: + browse_path_v2 = wu.get_aspect_of_type(models.BrowsePathsV2Class) + if browse_path_v2: + name = wu.get_urn().split(":")[-1] + paths[name] = browse_path_v2.path + return paths + + +def _create_container_aspects( + d: Dict[str, Any], + other_aspects: Dict[str, List[models._Aspect]] = {}, + root: bool = True, +) -> Iterable[MetadataWorkUnit]: + for k, v in d.items(): + urn = make_container_urn(k) + yield MetadataChangeProposalWrapper( + entityUrn=urn, aspect=models.StatusClass(removed=False) + ).as_workunit() + + for aspect in other_aspects.pop(k, []): + yield MetadataChangeProposalWrapper( + entityUrn=urn, aspect=aspect + ).as_workunit() + + for child in list(v): + yield MetadataChangeProposalWrapper( + entityUrn=make_container_urn(child), + aspect=models.ContainerClass(container=urn), + ).as_workunit() + if isinstance(v, dict): + yield from _create_container_aspects( + v, other_aspects=other_aspects, root=False + ) + + if root: + for k, v in other_aspects.items(): + for aspect in v: + yield MetadataChangeProposalWrapper( + entityUrn=make_container_urn(k), aspect=aspect + ).as_workunit() + + +def _make_container_browse_path_entries( + path: List[str], +) -> List[models.BrowsePathEntryClass]: + return [ + models.BrowsePathEntryClass(id=make_container_urn(s), urn=make_container_urn(s)) + for s in path + ] + + +def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]: + return [models.BrowsePathEntryClass(id=s, urn=None) for s in path] diff --git a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py index 97f65f1bd6a5b..cdfd24554e5e5 100644 --- a/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py +++ b/metadata-ingestion/tests/unit/api/source_helpers/test_source_helpers.py @@ -1,25 +1,18 @@ import logging from datetime import datetime -from typing import Any, Dict, Iterable, List, Union -from unittest.mock import patch +from typing import List, Union import pytest from freezegun import freeze_time import datahub.metadata.schema_classes as models from datahub.configuration.time_window_config import BaseTimeWindowConfig -from datahub.emitter.mce_builder import ( - make_container_urn, - make_dataplatform_instance_urn, - make_dataset_urn, -) +from datahub.emitter.mce_builder import make_dataset_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import ( auto_patch_last_modified, ) from datahub.ingestion.api.source_helpers import ( - _prepend_platform_instance, - auto_browse_path_v2, auto_empty_dataset_usage_statistics, auto_lowercase_urns, auto_status_aspect, @@ -106,261 +99,6 @@ def test_auto_status_aspect(): assert list(auto_status_aspect(initial_wu)) == expected -def _create_container_aspects( - d: Dict[str, Any], - other_aspects: Dict[str, List[models._Aspect]] = {}, - root: bool = True, -) -> Iterable[MetadataWorkUnit]: - for k, v in d.items(): - urn = make_container_urn(k) - yield MetadataChangeProposalWrapper( - entityUrn=urn, aspect=models.StatusClass(removed=False) - ).as_workunit() - - for aspect in other_aspects.pop(k, []): - yield MetadataChangeProposalWrapper( - entityUrn=urn, aspect=aspect - ).as_workunit() - - for child in list(v): - yield MetadataChangeProposalWrapper( - entityUrn=make_container_urn(child), - aspect=models.ContainerClass(container=urn), - ).as_workunit() - if isinstance(v, dict): - yield from _create_container_aspects( - v, other_aspects=other_aspects, root=False - ) - - if root: - for k, v in other_aspects.items(): - for aspect in v: - yield MetadataChangeProposalWrapper( - entityUrn=make_container_urn(k), aspect=aspect - ).as_workunit() - - -def _make_container_browse_path_entries( - path: List[str], -) -> List[models.BrowsePathEntryClass]: - return [ - models.BrowsePathEntryClass(id=make_container_urn(s), urn=make_container_urn(s)) - for s in path - ] - - -def _make_browse_path_entries(path: List[str]) -> List[models.BrowsePathEntryClass]: - return [models.BrowsePathEntryClass(id=s, urn=None) for s in path] - - -def prepend_platform_instance( - path: List[models.BrowsePathEntryClass], -) -> List[models.BrowsePathEntryClass]: - platform = "platform" - instance = "instance" - return _prepend_platform_instance(path, platform, instance) - - -def _get_browse_paths_from_wu( - stream: Iterable[MetadataWorkUnit], -) -> Dict[str, List[models.BrowsePathEntryClass]]: - paths = {} - for wu in stream: - browse_path_v2 = wu.get_aspect_of_type(models.BrowsePathsV2Class) - if browse_path_v2: - name = wu.get_urn().split(":")[-1] - paths[name] = browse_path_v2.path - return paths - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_by_container_hierarchy(telemetry_ping_mock): - structure = { - "one": { - "a": {"i": ["1", "2", "3"], "ii": ["4"]}, - "b": {"iii": ["5", "6"]}, - }, - "two": { - "c": {"iv": [], "v": ["7", "8"]}, - }, - "three": {"d": {}}, - "four": {}, - } - - wus = list(auto_status_aspect(_create_container_aspects(structure))) - assert ( # Sanity check - sum(bool(wu.get_aspect_of_type(models.StatusClass)) for wu in wus) == 21 - ) - - new_wus = list(auto_browse_path_v2(wus)) - assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - == 21 - ) - - paths = _get_browse_paths_from_wu(new_wus) - assert paths["one"] == [] - assert ( - paths["7"] - == paths["8"] - == _make_container_browse_path_entries(["two", "c", "v"]) - ) - assert paths["d"] == _make_container_browse_path_entries(["three"]) - assert paths["i"] == _make_container_browse_path_entries(["one", "a"]) - - # Check urns emitted on demand -- not all at end - for urn in {wu.get_urn() for wu in new_wus}: - try: - idx = next( - i - for i, wu in enumerate(new_wus) - if wu.get_aspect_of_type(models.ContainerClass) and wu.get_urn() == urn - ) - except StopIteration: - idx = next( - i - for i, wu in enumerate(new_wus) - if wu.get_aspect_of_type(models.StatusClass) and wu.get_urn() == urn - ) - assert new_wus[idx + 1].get_aspect_of_type( - models.BrowsePathsV2Class - ) or new_wus[idx + 2].get_aspect_of_type(models.BrowsePathsV2Class) - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_ignores_urns_already_with(telemetry_ping_mock): - structure = {"a": {"b": {"c": {"d": ["e"]}}}} - - wus = [ - *auto_status_aspect( - _create_container_aspects( - structure, - other_aspects={ - "f": [ - models.BrowsePathsClass(paths=["/one/two"]), - models.BrowsePathsV2Class( - path=_make_browse_path_entries(["my", "path"]) - ), - ], - "c": [ - models.BrowsePathsV2Class( - path=_make_container_browse_path_entries(["custom", "path"]) - ) - ], - }, - ), - ) - ] - new_wus = list(auto_browse_path_v2(wus)) - assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - == 6 - ) - - paths = _get_browse_paths_from_wu(new_wus) - assert paths["a"] == [] - assert paths["c"] == _make_container_browse_path_entries(["custom", "path"]) - assert paths["f"] == _make_browse_path_entries(["my", "path"]) - assert paths["d"] == _make_container_browse_path_entries(["custom", "path", "c"]) - assert paths["e"] == _make_container_browse_path_entries( - ["custom", "path", "c", "d"] - ) - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_with_platform_instance_and_source_browse_path_v2( - telemetry_ping_mock, -): - structure = {"a": {"b": {"c": {"d": ["e"]}}}} - - platform = "platform" - instance = "instance" - - wus = [ - *auto_status_aspect( - _create_container_aspects( - structure, - other_aspects={ - "a": [ - models.BrowsePathsV2Class( - path=_make_browse_path_entries(["my", "path"]), - ), - ], - }, - ), - ) - ] - new_wus = list( - auto_browse_path_v2(wus, platform=platform, platform_instance=instance) - ) - assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - == 5 - ) - - paths = _get_browse_paths_from_wu(new_wus) - assert paths["a"] == prepend_platform_instance( - _make_browse_path_entries(["my", "path"]), - ) - assert paths["b"] == prepend_platform_instance( - [ - *_make_browse_path_entries(["my", "path"]), - *_make_container_browse_path_entries(["a"]), - ], - ) - assert paths["c"] == prepend_platform_instance( - [ - *_make_browse_path_entries(["my", "path"]), - *_make_container_browse_path_entries(["a", "b"]), - ], - ) - assert paths["d"] == prepend_platform_instance( - [ - *_make_browse_path_entries(["my", "path"]), - *_make_container_browse_path_entries(["a", "b", "c"]), - ], - ) - assert paths["e"] == prepend_platform_instance( - [ - *_make_browse_path_entries(["my", "path"]), - *_make_container_browse_path_entries(["a", "b", "c", "d"]), - ], - ) - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock): - platform = "platform" - env = "PROD" - wus = [ - MetadataChangeProposalWrapper( - entityUrn=make_dataset_urn(platform, "dataset-1", env), - aspect=models.BrowsePathsClass(["/one/two"]), - ).as_workunit(), - MetadataChangeProposalWrapper( - entityUrn=make_dataset_urn(platform, "dataset-2", env), - aspect=models.BrowsePathsClass([f"/{platform}/{env}/something"]), - ).as_workunit(), - MetadataChangeProposalWrapper( - entityUrn=make_dataset_urn(platform, "dataset-3", env), - aspect=models.BrowsePathsClass([f"/{platform}/one/two"]), - ).as_workunit(), - ] - new_wus = list(auto_browse_path_v2(wus, drop_dirs=["platform", "PROD", "unused"])) - assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list - assert len(new_wus) == 6 - paths = _get_browse_paths_from_wu(new_wus) - assert ( - paths["platform,dataset-1,PROD)"] - == paths["platform,dataset-3,PROD)"] - == _make_browse_path_entries(["one", "two"]) - ) - assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"]) - - def test_auto_lowercase_aspects(): mcws = auto_workunit( [ @@ -430,142 +168,6 @@ def test_auto_lowercase_aspects(): assert list(auto_lowercase_urns(mcws)) == expected -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock): - structure = {"a": {"b": ["c"]}} - wus = list( - auto_status_aspect( - _create_container_aspects( - structure, - other_aspects={"b": [models.BrowsePathsClass(paths=["/one/two"])]}, - ), - ) - ) - new_wus = list(auto_browse_path_v2(wus)) - assert not telemetry_ping_mock.call_count, telemetry_ping_mock.call_args_list - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - == 3 - ) - - paths = _get_browse_paths_from_wu(new_wus) - assert paths["a"] == [] - assert paths["b"] == _make_container_browse_path_entries(["a"]) - assert paths["c"] == _make_container_browse_path_entries(["a", "b"]) - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_with_platform_instance(telemetry_ping_mock): - platform = "my_platform" - platform_instance = "my_instance" - platform_instance_urn = make_dataplatform_instance_urn(platform, platform_instance) - platform_instance_entry = models.BrowsePathEntryClass( - platform_instance_urn, platform_instance_urn - ) - - structure = {"a": {"b": ["c"]}} - wus = list(auto_status_aspect(_create_container_aspects(structure))) - - new_wus = list( - auto_browse_path_v2( - wus, - platform=platform, - platform_instance=platform_instance, - ) - ) - assert telemetry_ping_mock.call_count == 0 - - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - == 3 - ) - paths = _get_browse_paths_from_wu(new_wus) - assert paths["a"] == [platform_instance_entry] - assert paths["b"] == [ - platform_instance_entry, - *_make_container_browse_path_entries(["a"]), - ] - assert paths["c"] == [ - platform_instance_entry, - *_make_container_browse_path_entries(["a", "b"]), - ] - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_invalid_batch_telemetry(telemetry_ping_mock): - structure = {"a": {"b": ["c"]}} - b_urn = make_container_urn("b") - wus = [ - *_create_container_aspects(structure), - MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect - entityUrn=b_urn, - aspect=models.BrowsePathsClass(paths=["/one/two"]), - ).as_workunit(), - ] - wus = list(auto_status_aspect(wus)) - - assert telemetry_ping_mock.call_count == 0 - _ = list(auto_browse_path_v2(wus)) - assert telemetry_ping_mock.call_count == 1 - assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" - assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 0 - assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 1 - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_no_invalid_batch_telemetry_for_unrelated_aspects( - telemetry_ping_mock, -): - structure = {"a": {"b": ["c"]}} - b_urn = make_container_urn("b") - wus = [ - *_create_container_aspects(structure), - MetadataChangeProposalWrapper( # Browse path for b separate from its Container aspect - entityUrn=b_urn, - aspect=models.ContainerPropertiesClass("container name"), - ).as_workunit(), - ] - wus = list(auto_status_aspect(wus)) - - assert telemetry_ping_mock.call_count == 0 - _ = list(auto_browse_path_v2(wus)) - assert telemetry_ping_mock.call_count == 0 - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_invalid_order_telemetry(telemetry_ping_mock): - structure = {"a": {"b": ["c"]}} - wus = list(reversed(list(_create_container_aspects(structure)))) - wus = list(auto_status_aspect(wus)) - - assert telemetry_ping_mock.call_count == 0 - new_wus = list(auto_browse_path_v2(wus)) - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - > 0 - ) - assert telemetry_ping_mock.call_count == 1 - assert telemetry_ping_mock.call_args_list[0][0][0] == "incorrect_browse_path_v2" - assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_order"] == 1 - assert telemetry_ping_mock.call_args_list[0][0][1]["num_out_of_batch"] == 0 - - -@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping") -def test_auto_browse_path_v2_dry_run(telemetry_ping_mock): - structure = {"a": {"b": ["c"]}} - wus = list(reversed(list(_create_container_aspects(structure)))) - wus = list(auto_status_aspect(wus)) - - assert telemetry_ping_mock.call_count == 0 - new_wus = list(auto_browse_path_v2(wus, dry_run=True)) - assert wus == new_wus - assert ( - sum(bool(wu.get_aspect_of_type(models.BrowsePathsV2Class)) for wu in new_wus) - == 0 - ) - assert telemetry_ping_mock.call_count == 1 - - @freeze_time("2023-01-02 00:00:00") def test_auto_empty_dataset_usage_statistics(caplog: pytest.LogCaptureFixture) -> None: has_urn = make_dataset_urn("my_platform", "has_aspect") diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java index f72b5fc1f6d22..9698a1c10d8b5 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java @@ -100,7 +100,7 @@ public class ESUtils { // top-level properties // to field level properties public static final Map> FIELDS_TO_EXPANDED_FIELDS_LIST = - new HashMap>() { + new HashMap<>() { { put("tags", ImmutableList.of("tags", "fieldTags", "editedFieldTags")); put( @@ -117,6 +117,8 @@ public class ESUtils { put( "businessAttribute", ImmutableList.of("businessAttributeRef", "businessAttributeRef.urn")); + put("origin", ImmutableList.of("origin", "env")); + put("env", ImmutableList.of("env", "origin")); } }; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java index 53f757d8d6c6b..c40fa49173627 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java @@ -515,8 +515,8 @@ private static List getPrimaryKeyChangeEvents( SchemaMetadata targetSchema, Urn datasetUrn, AuditStamp auditStamp) { + List primaryKeyChangeEvents = new ArrayList<>(); if (changeCategories != null && changeCategories.contains(ChangeCategory.TECHNICAL_SCHEMA)) { - List primaryKeyChangeEvents = new ArrayList<>(); Set basePrimaryKeys = (baseSchema != null && baseSchema.getPrimaryKeys() != null) ? new HashSet<>(baseSchema.getPrimaryKeys()) @@ -529,51 +529,53 @@ private static List getPrimaryKeyChangeEvents( basePrimaryKeys.stream() .filter(key -> !targetPrimaryKeys.contains(key)) .collect(Collectors.toSet()); + for (String removedBaseKeyField : removedBaseKeys) { + Urn schemaFieldUrn = getSchemaFieldUrn(datasetUrn.toString(), removedBaseKeyField); + primaryKeyChangeEvents.add( + DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder() + .category(ChangeCategory.TECHNICAL_SCHEMA) + .modifier(schemaFieldUrn.toString()) + .fieldUrn(schemaFieldUrn) + .fieldPath(removedBaseKeyField) + .entityUrn(datasetUrn.toString()) + .operation(ChangeOperation.MODIFY) + .semVerChange(SemanticChangeType.MAJOR) + .description( + BACKWARDS_INCOMPATIBLE_DESC + + " removal of the primary key field '" + + removedBaseKeyField + + "'") + .auditStamp(auditStamp) + .modificationCategory(SchemaFieldModificationCategory.OTHER) + .build()); + } Set addedTargetKeys = targetPrimaryKeys.stream() .filter(key -> !basePrimaryKeys.contains(key)) .collect(Collectors.toSet()); - if (!removedBaseKeys.isEmpty() || !addedTargetKeys.isEmpty()) { - String keyChangeTarget; - // Just pick the first schema field we can find for the change event - if (!removedBaseKeys.isEmpty()) { - keyChangeTarget = removedBaseKeys.stream().findFirst().get(); - } else { - keyChangeTarget = addedTargetKeys.stream().findFirst().get(); - } - - StringBuilder description = - new StringBuilder(BACKWARDS_INCOMPATIBLE_DESC + " a primary key constraint change."); - if (!removedBaseKeys.isEmpty()) { - description.append(" The following fields were removed:"); - removedBaseKeys.forEach( - removedBaseKey -> description.append(" '").append(removedBaseKey).append("'")); - description.append("."); - } - if (!addedTargetKeys.isEmpty()) { - description.append(" The following fields were added:"); - addedTargetKeys.forEach( - addedTargetKey -> description.append(" '").append(addedTargetKey).append("'")); - description.append("."); - } + for (String addedTargetKeyField : addedTargetKeys) { + Urn schemaFieldUrn = getSchemaFieldUrn(datasetUrn.toString(), addedTargetKeyField); primaryKeyChangeEvents.add( DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder() .category(ChangeCategory.TECHNICAL_SCHEMA) - .fieldUrn(getSchemaFieldUrn(datasetUrn, keyChangeTarget)) - .fieldPath(keyChangeTarget) - .modifier(getSchemaFieldUrn(datasetUrn, keyChangeTarget).toString()) + .modifier(getSchemaFieldUrn(datasetUrn, addedTargetKeyField).toString()) + .fieldUrn(schemaFieldUrn) + .fieldPath(addedTargetKeyField) .entityUrn(datasetUrn.toString()) .operation(ChangeOperation.MODIFY) .semVerChange(SemanticChangeType.MAJOR) - .description(description.toString()) - .modificationCategory(SchemaFieldModificationCategory.OTHER) + .description( + BACKWARDS_INCOMPATIBLE_DESC + + " addition of the primary key field '" + + addedTargetKeyField + + "'") .auditStamp(auditStamp) + .modificationCategory(SchemaFieldModificationCategory.OTHER) .build()); - return primaryKeyChangeEvents; } } - return Collections.emptyList(); + return primaryKeyChangeEvents; } @Override diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java index 892f7088e7f61..54a9e7d8b47bd 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java @@ -707,6 +707,86 @@ public void testGetQueryBuilderFromCriterionFieldToExpand() { + " }\n" + "}"; Assert.assertEquals(result.toString(), expected); + + final Criterion originCriterion = buildCriterion("origin", Condition.EQUAL, "PROD"); + + // Ensure that the query is expanded! + QueryBuilder originExpanded = + ESUtils.getQueryBuilderFromCriterion( + originCriterion, + false, + new HashMap<>(), + mock(OperationContext.class), + QueryFilterRewriteChain.EMPTY); + String originExpected = + "{\n" + + " \"bool\" : {\n" + + " \"should\" : [\n" + + " {\n" + + " \"terms\" : {\n" + + " \"origin.keyword\" : [\n" + + " \"PROD\"\n" + + " ],\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"origin\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"terms\" : {\n" + + " \"env.keyword\" : [\n" + + " \"PROD\"\n" + + " ],\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"env\"\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"adjust_pure_negative\" : true,\n" + + " \"minimum_should_match\" : \"1\",\n" + + " \"boost\" : 1.0\n" + + " }\n" + + "}"; + Assert.assertEquals(originExpanded.toString(), originExpected); + + final Criterion envCriterion = buildCriterion("env", Condition.EQUAL, "PROD"); + + // Ensure that the query is expanded! + QueryBuilder envExpanded = + ESUtils.getQueryBuilderFromCriterion( + envCriterion, + false, + new HashMap<>(), + mock(OperationContext.class), + QueryFilterRewriteChain.EMPTY); + String envExpected = + "{\n" + + " \"bool\" : {\n" + + " \"should\" : [\n" + + " {\n" + + " \"terms\" : {\n" + + " \"env.keyword\" : [\n" + + " \"PROD\"\n" + + " ],\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"env\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"terms\" : {\n" + + " \"origin.keyword\" : [\n" + + " \"PROD\"\n" + + " ],\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"origin\"\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"adjust_pure_negative\" : true,\n" + + " \"minimum_should_match\" : \"1\",\n" + + " \"boost\" : 1.0\n" + + " }\n" + + "}"; + Assert.assertEquals(envExpanded.toString(), envExpected); } @Test diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java index 88dd81d953947..22dc3162c1e86 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java @@ -204,10 +204,10 @@ public void testSchemaFieldPrimaryKeyChange() throws Exception { List actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp); compareDescriptions( Set.of( - "A backwards incompatible change due to a primary key constraint change. " - + "The following fields were removed: 'ID'. The following fields were added: 'ID2'."), + "A backwards incompatible change due to addition of the primary key field 'ID2'", + "A backwards incompatible change due to removal of the primary key field 'ID'"), actual); - assertEquals(1, actual.size()); + assertEquals(actual.size(), 2); compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual); } @@ -274,10 +274,10 @@ public void testSchemaFieldPrimaryKeyChangeRenameAdd() throws Exception { List actual = test.getChangeEvents(urn, entity, aspect, from, to3, auditStamp); compareDescriptions( Set.of( - "A backwards incompatible change due to a primary key constraint change. " - + "The following fields were removed: 'ID'. The following fields were added: 'ID2'."), + "A backwards incompatible change due to addition of the primary key field 'ID2'", + "A backwards incompatible change due to removal of the primary key field 'ID'"), actual); - assertEquals(1, actual.size()); + assertEquals(actual.size(), 2); compareModificationCategories(Set.of(SchemaFieldModificationCategory.OTHER.toString()), actual); Aspect to4 = diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml index a33fad1058962..f9497258c384f 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml @@ -42,3 +42,4 @@ bootstrap: optional: false mcps_location: "bootstrap_mcps/ingestion-datahub-gc.yaml" values_env: "DATAHUB_GC_BOOTSTRAP_VALUES" + revision_env: "DATAHUB_GC_BOOTSTRAP_REVISION" diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml index e78b709ad6cea..395eb5db53424 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml @@ -12,7 +12,7 @@ timezone: '{{schedule.timezone}}{{^schedule.timezone}}UTC{{/schedule.timezone}}' interval: '{{schedule.interval}}{{^schedule.interval}}0 1 * * *{{/schedule.interval}}' config: - version: '{{&ingestion.version}}{{^ingestion.version}}0.14.1.6{{/ingestion.version}}' + version: '{{&ingestion.version}}{{^ingestion.version}}0.14.1.7rc2{{/ingestion.version}}' recipe: source: type: 'datahub-gc'