From a4e597aef4a7a0d784adc65321383c5287f474c5 Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Tue, 10 Sep 2024 11:52:12 -0500
Subject: [PATCH 1/5] docs(oidc): document azure logout uri (#11344)

---
 docs/authentication/guides/sso/configure-oidc-react.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/docs/authentication/guides/sso/configure-oidc-react.md b/docs/authentication/guides/sso/configure-oidc-react.md
index 3676bbdfcc9b8c..4dd882cb9a8642 100644
--- a/docs/authentication/guides/sso/configure-oidc-react.md
+++ b/docs/authentication/guides/sso/configure-oidc-react.md
@@ -79,7 +79,12 @@ At this point, your app registration should look like the following. Finally, cl
 :::note Optional
 Once registration is done, you will land on the app registration **Overview** tab.
-On the left-side navigation bar, click on **Authentication** under **Manage** and add extra redirect URIs if need be (if you want to support both local testing and Azure deployments). Finally, click **Save**.
+On the left-side navigation bar, click on **Authentication** under **Manage** and add extra redirect URIs if needed (for example, to support both local testing and Azure deployments).
+
+For the logout URI, set:
+- **Front-channel logout URL**: `https://your-datahub-domain.com/login`
+
+Finally, click **Save**.
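Note: the front-channel logout URL above works together with the frontend's existing OIDC settings. As a rough sketch only (the AUTH_OIDC_* variable names follow the DataHub OIDC guide; the tenant ID, client credentials, and domain are placeholders), a matching datahub-frontend environment could look like:

    # Placeholders -- substitute your Azure tenant and DataHub domain
    AUTH_OIDC_ENABLED=true
    AUTH_OIDC_CLIENT_ID=<azure-app-client-id>
    AUTH_OIDC_CLIENT_SECRET=<azure-app-client-secret>
    AUTH_OIDC_DISCOVERY_URI=https://login.microsoftonline.com/<tenant-id>/v2.0/.well-known/openid-configuration
    AUTH_OIDC_BASE_URL=https://your-datahub-domain.com

With the front-channel logout URL pointed at /login, Azure returns the browser to the DataHub login page after it ends the session.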

From c63d75e9c555a53b7153bb1a9d3eef7c30f46ab0 Mon Sep 17 00:00:00 2001
From: Nate Bryant
Date: Tue, 10 Sep 2024 13:01:35 -0400
Subject: [PATCH 2/5] feat(logging): add option to log slow GraphQL queries (#11308)

---
 .../app/controllers/Application.java   | 46 +++++++++++++++++++
 datahub-frontend/conf/application.conf |  8 +++-
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/datahub-frontend/app/controllers/Application.java b/datahub-frontend/app/controllers/Application.java
index d17e600aadc072..017847367de053 100644
--- a/datahub-frontend/app/controllers/Application.java
+++ b/datahub-frontend/app/controllers/Application.java
@@ -9,12 +9,15 @@
 import akka.util.ByteString;
 import auth.Authenticator;
 import com.datahub.authentication.AuthenticationConstants;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.linkedin.util.Pair;
 import com.typesafe.config.Config;
 import java.io.InputStream;
 import java.net.URI;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -33,6 +36,7 @@
 import play.libs.ws.StandaloneWSClient;
 import play.libs.ws.ahc.StandaloneAhcWSClient;
 import play.mvc.Controller;
+import play.mvc.Http.Cookie;
 import play.mvc.Http;
 import play.mvc.ResponseHeader;
 import play.mvc.Result;
@@ -132,6 +136,9 @@ public CompletableFuture<Result> proxy(String path, Http.Request request)
       headers.put(Http.HeaderNames.X_FORWARDED_PROTO, List.of(schema));
     }
 
+    // Get the current time to measure the duration of the request
+    Instant start = Instant.now();
+
     return _ws.url(
             String.format(
                 "%s://%s:%s%s", protocol, metadataServiceHost, metadataServicePort, resolvedUri))
@@ -160,6 +167,15 @@ AuthenticationConstants.LEGACY_X_DATAHUB_ACTOR_HEADER, getDataHubActorHeader(req
         .execute()
         .thenApply(
             apiResponse -> {
+              // Log the query if it takes longer than the configured threshold and verbose logging is enabled
+              boolean verboseGraphQLLogging = _config.getBoolean("graphql.verbose.logging");
+              int verboseGraphQLLongQueryMillis = _config.getInt("graphql.verbose.slowQueryMillis");
+              Instant finish = Instant.now();
+              long timeElapsed = Duration.between(start, finish).toMillis();
+              if (verboseGraphQLLogging && timeElapsed >= verboseGraphQLLongQueryMillis) {
+                logSlowQuery(request, resolvedUri, timeElapsed);
+              }
+
               final ResponseHeader header =
                   new ResponseHeader(
                       apiResponse.getStatus(),
@@ -359,4 +375,34 @@ private String mapPath(@Nonnull final String path) {
     // Otherwise, return original path
     return path;
   }
+
+
+  /**
+   * Called if verbose logging is enabled and the request takes longer than the slow query milliseconds defined in the config
+   * @param request GraphQL request that was made
+   * @param resolvedUri URI that was requested
+   * @param duration How long the query took to complete
+   */
+  private void logSlowQuery(Http.Request request, String resolvedUri, float duration) {
+    StringBuilder jsonBody = new StringBuilder();
+    Optional<Cookie> actorCookie = request.getCookie("actor");
+    String actorValue = actorCookie.isPresent() ? actorCookie.get().value() : "N/A";
+
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      JsonNode jsonNode = request.body().asJson();
+      ((ObjectNode) jsonNode).remove("query");
+      jsonBody.append(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode));
+    }
+    catch (Exception e) {
+      _logger.info("GraphQL Request Received: {}, Unable to parse JSON body", resolvedUri);
+    }
+    String jsonBodyStr = jsonBody.toString();
+    _logger.info("Slow GraphQL Request Received: {}, Request query string: {}, Request actor: {}, Request JSON: {}, Request completed in {} ms",
+        resolvedUri,
+        request.queryString(),
+        actorValue,
+        jsonBodyStr,
+        duration);
+  }
 }

diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf
index 63ff2c9166fbc9..be57a33b13564d 100644
--- a/datahub-frontend/conf/application.conf
+++ b/datahub-frontend/conf/application.conf
@@ -298,4 +298,10 @@ entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
 entityClient.restli.get.batchSize = 50
 entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
 entityClient.restli.get.batchConcurrency = 2
-entityClient.restli.get.batchConcurrency = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY}
\ No newline at end of file
+entityClient.restli.get.batchConcurrency = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY}
+
+# Enable verbose logging of slow GraphQL queries
+graphql.verbose.logging = false
+graphql.verbose.logging = ${?GRAPHQL_VERBOSE_LOGGING}
+graphql.verbose.slowQueryMillis = 2500
+graphql.verbose.slowQueryMillis = ${?GRAPHQL_VERBOSE_LONG_QUERY_MILLIS}
\ No newline at end of file
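Note: because application.conf binds the new keys to environment variables, the feature can be enabled without editing the file. A minimal sketch (the threshold value is illustrative; the patch's default is 2500 ms):

    # Log any proxied GraphQL request that takes longer than 1 second
    GRAPHQL_VERBOSE_LOGGING=true
    GRAPHQL_VERBOSE_LONG_QUERY_MILLIS=1000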
From f082287a07870622440456eba27fd6a8ce0c1cea Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Tue, 10 Sep 2024 10:13:25 -0700
Subject: [PATCH 3/5] docs(ingest/dbt): add docs on hiding sources (#11334)

---
 metadata-ingestion/docs/sources/dbt/dbt.md | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/metadata-ingestion/docs/sources/dbt/dbt.md b/metadata-ingestion/docs/sources/dbt/dbt.md
index 2333ddcee677bd..52a19777dd0337 100644
--- a/metadata-ingestion/docs/sources/dbt/dbt.md
+++ b/metadata-ingestion/docs/sources/dbt/dbt.md
@@ -273,16 +273,19 @@ source:
   # ... other configs
 ```
-<details>
-  <summary>[Experimental] Reducing "composed of" sprawl with multiproject setups</summary>
+If you have models with many sources from other projects listed in the "Composed Of" section, it may also make sense to hide sources.
 
-When many dbt projects use a single table as a source, the "Composed Of" relationships can become very large and difficult to navigate.
-To address this, we are experimenting with an alternative approach to handling multiproject setups: not including sources.
+### Reducing "composed of" sprawl by hiding sources
+
+When many dbt projects use a single table as a source, the "Composed Of" relationships can become very large and difficult to navigate,
+and extra source nodes can clutter the lineage graph.
+
+Hiding sources is particularly useful for multi-project setups, but it can help in single-project setups as well.
 
 The benefit is that your entire dbt estate becomes much easier to navigate, and the borders between projects less noticeable.
 The downside is that we will not pick up any documentation or meta mappings applied to dbt sources.
 
-To enable this, set a few additional flags in your dbt source config:
+To enable this, set `entities_enabled.sources: No` and `skip_sources_in_lineage: true` in your dbt source config:
 
 ```yaml
 source:
   # ... other configs
@@ -298,4 +301,4 @@ source:
   skip_sources_in_lineage: true
 ```
 
-</details>
+[Experimental] It's also possible to use `skip_sources_in_lineage: true` without disabling sources entirely. If you do this, sources will not participate in the lineage graph - they'll have upstreams but no downstreams. However, they will still contribute to docs, tags, etc to the warehouse entity. From 852a23b845122f1c2c5739d3fc24f0d135834d86 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware <159135491+sagar-salvi-apptware@users.noreply.github.com> Date: Tue, 10 Sep 2024 22:54:55 +0530 Subject: [PATCH 4/5] feat(mode/ingest): Add support for missing Mode datasets in lineage (#11290) --- .../ingestion/source/common/subtypes.py | 1 + .../src/datahub/ingestion/source/mode.py | 128 +++++++++++---- .../integration/mode/mode_mces_golden.json | 119 ++++++++++++-- .../mode/setup/dataset_24f66e1701b6.json | 149 ++++++++++++++++++ .../setup/dataset_queries_24f66e1701b6.json | 64 ++++++++ .../mode/setup/datasets_157933cc1168.json | 10 ++ .../mode/setup/datasets_75737b70402e.json | 149 ++++++++++++++++++ .../mode/setup/reports_75737b70402e.json | 29 +++- .../tests/integration/mode/test_mode.py | 4 + 9 files changed, 615 insertions(+), 38 deletions(-) create mode 100644 metadata-ingestion/tests/integration/mode/setup/dataset_24f66e1701b6.json create mode 100644 metadata-ingestion/tests/integration/mode/setup/dataset_queries_24f66e1701b6.json create mode 100644 metadata-ingestion/tests/integration/mode/setup/datasets_157933cc1168.json create mode 100644 metadata-ingestion/tests/integration/mode/setup/datasets_75737b70402e.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index fb22f0b6edde26..4bc120fbecf8f1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -72,6 +72,7 @@ class BIAssetSubTypes(StrEnum): # Mode MODE_REPORT = "Report" + MODE_DATASET = "Dataset" MODE_QUERY = "Query" MODE_CHART = "Chart" diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 47475c5825a493..73427d9084dd3c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -106,7 +106,7 @@ infer_output_schema, ) from datahub.utilities import config_clean -from datahub.utilities.lossy_collections import LossyDict, LossyList +from datahub.utilities.lossy_collections import LossyList logger: logging.Logger = logging.getLogger(__name__) @@ -199,10 +199,6 @@ class ModeSourceReport(StaleEntityRemovalSourceReport): num_query_template_render_failures: int = 0 num_query_template_render_success: int = 0 - dropped_imported_datasets: LossyDict[str, LossyList[str]] = dataclasses.field( - default_factory=LossyDict - ) - def report_dropped_space(self, ent_name: str) -> None: self.filtered_spaces.append(ent_name) @@ -429,10 +425,25 @@ def construct_dashboard( # Last refreshed ts. 
last_refreshed_ts = self._parse_last_run_at(report_info) + # Datasets + datasets = [] + for imported_dataset_name in report_info.get("imported_datasets", {}): + mode_dataset = self._get_request_json( + f"{self.workspace_uri}/reports/{imported_dataset_name.get('token')}" + ) + dataset_urn = builder.make_dataset_urn_with_platform_instance( + self.platform, + str(mode_dataset.get("id")), + platform_instance=None, + env=self.config.env, + ) + datasets.append(dataset_urn) + dashboard_info_class = DashboardInfoClass( description=description if description else "", title=title if title else "", charts=self._get_chart_urns(report_token), + datasets=datasets if datasets else None, lastModified=last_modified, lastRefreshed=last_refreshed_ts, dashboardUrl=f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}", @@ -725,6 +736,10 @@ def _get_platform_and_dbname( data_source.get("adapter", ""), data_source.get("name", "") ) database = data_source.get("database", "") + # This is hacky but on bigquery we want to change the database if its default + # For lineage we need project_id.db.table + if platform == "bigquery" and database == "default": + database = data_source.get("host", "") return platform, database else: self.report.report_warning( @@ -900,24 +915,36 @@ def normalize_mode_query(self, query: str) -> str: return rendered_query - def construct_query_from_api_data( + def construct_query_or_dataset( self, report_token: str, query_data: dict, space_token: str, report_info: dict, + is_mode_dataset: bool, ) -> Iterable[MetadataWorkUnit]: - query_urn = self.get_dataset_urn_from_query(query_data) + query_urn = ( + self.get_dataset_urn_from_query(query_data) + if not is_mode_dataset + else self.get_dataset_urn_from_query(report_info) + ) + query_token = query_data.get("token") + externalUrl = ( + f"{self.config.connect_uri}/{self.config.workspace}/datasets/{report_token}" + if is_mode_dataset + else f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}/details/queries/{query_token}" + ) + dataset_props = DatasetPropertiesClass( - name=query_data.get("name"), + name=report_info.get("name") if is_mode_dataset else query_data.get("name"), description=f"""### Source Code ``` sql {query_data.get("raw_query")} ``` """, - externalUrl=f"{self.config.connect_uri}/{self.config.workspace}/reports/{report_token}/details/queries/{query_token}", + externalUrl=externalUrl, customProperties=self.get_custom_props_from_dict( query_data, [ @@ -939,7 +966,22 @@ def construct_query_from_api_data( ).as_workunit() ) - subtypes = SubTypesClass(typeNames=([BIAssetSubTypes.MODE_QUERY])) + if is_mode_dataset: + space_container_key = self.gen_space_key(space_token) + yield from add_dataset_to_container( + container_key=space_container_key, + dataset_urn=query_urn, + ) + + subtypes = SubTypesClass( + typeNames=( + [ + BIAssetSubTypes.MODE_DATASET + if is_mode_dataset + else BIAssetSubTypes.MODE_QUERY + ] + ) + ) yield ( MetadataChangeProposalWrapper( entityUrn=query_urn, @@ -950,7 +992,9 @@ def construct_query_from_api_data( yield MetadataChangeProposalWrapper( entityUrn=query_urn, aspect=BrowsePathsV2Class( - path=self._browse_path_query(space_token, report_info) + path=self._browse_path_dashboard(space_token) + if is_mode_dataset + else self._browse_path_query(space_token, report_info) ), ).as_workunit() @@ -958,7 +1002,6 @@ def construct_query_from_api_data( upstream_warehouse_platform, upstream_warehouse_db_name, ) = self._get_platform_and_dbname(query_data.get("data_source_id")) - if 
upstream_warehouse_platform is None: # this means we can't infer the platform return @@ -1022,7 +1065,7 @@ def construct_query_from_api_data( schema_fields = infer_output_schema(parsed_query_object) if schema_fields: schema_metadata = SchemaMetadataClass( - schemaName="mode_query", + schemaName="mode_dataset" if is_mode_dataset else "mode_query", platform=f"urn:li:dataPlatform:{self.platform}", version=0, fields=schema_fields, @@ -1040,7 +1083,7 @@ def construct_query_from_api_data( ) yield from self.get_upstream_lineage_for_parsed_sql( - query_data, parsed_query_object + query_urn, query_data, parsed_query_object ) operation = OperationClass( @@ -1089,10 +1132,9 @@ def construct_query_from_api_data( ).as_workunit() def get_upstream_lineage_for_parsed_sql( - self, query_data: dict, parsed_query_object: SqlParsingResult + self, query_urn: str, query_data: dict, parsed_query_object: SqlParsingResult ) -> List[MetadataWorkUnit]: wu = [] - query_urn = self.get_dataset_urn_from_query(query_data) if parsed_query_object is None: logger.info( @@ -1350,6 +1392,24 @@ def _get_reports(self, space_token: str) -> List[dict]: ) return reports + @lru_cache(maxsize=None) + def _get_datasets(self, space_token: str) -> List[dict]: + """ + Retrieves datasets for a given space token. + """ + datasets = [] + try: + url = f"{self.workspace_uri}/spaces/{space_token}/datasets" + datasets_json = self._get_request_json(url) + datasets = datasets_json.get("_embedded", {}).get("reports", []) + except HTTPError as http_error: + self.report.report_failure( + title="Failed to Retrieve Datasets for Space", + message=f"Unable to retrieve datasets for space token {space_token}.", + context=f"Error: {str(http_error)}", + ) + return datasets + @lru_cache(maxsize=None) def _get_queries(self, report_token: str) -> list: queries = [] @@ -1523,24 +1583,14 @@ def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]: for report in reports: report_token = report.get("token", "") - if report.get("imported_datasets"): - # The connector doesn't support imported datasets yet. - # For now, we just keep this in the report to track what we're missing. - imported_datasets = [ - imported_dataset.get("name") or str(imported_dataset) - for imported_dataset in report["imported_datasets"] - ] - self.report.dropped_imported_datasets.setdefault( - report_token, LossyList() - ).extend(imported_datasets) - queries = self._get_queries(report_token) for query in queries: - query_mcps = self.construct_query_from_api_data( + query_mcps = self.construct_query_or_dataset( report_token, query, space_token=space_token, report_info=report, + is_mode_dataset=False, ) chart_fields: Dict[str, SchemaFieldClass] = {} for wu in query_mcps: @@ -1566,6 +1616,27 @@ def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]: query_name=query["name"], ) + def emit_dataset_mces(self): + """ + Emits MetadataChangeEvents (MCEs) for datasets within each space. 
+ """ + for space_token, _ in self.space_tokens.items(): + datasets = self._get_datasets(space_token) + + for report in datasets: + report_token = report.get("token", "") + queries = self._get_queries(report_token) + for query in queries: + query_mcps = self.construct_query_or_dataset( + report_token, + query, + space_token=space_token, + report_info=report, + is_mode_dataset=True, + ) + for wu in query_mcps: + yield wu + @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "ModeSource": config: ModeConfig = ModeConfig.parse_obj(config_dict) @@ -1581,6 +1652,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self.emit_dashboard_mces() + yield from self.emit_dataset_mces() yield from self.emit_chart_mces() def get_report(self) -> SourceReport: diff --git a/metadata-ingestion/tests/integration/mode/mode_mces_golden.json b/metadata-ingestion/tests/integration/mode/mode_mces_golden.json index 2fa9f4ee86a860..a6a685672bda00 100644 --- a/metadata-ingestion/tests/integration/mode/mode_mces_golden.json +++ b/metadata-ingestion/tests/integration/mode/mode_mces_golden.json @@ -132,8 +132,8 @@ "json": { "timestampMillis": 1638860400000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "viewsCount": 6 } @@ -173,7 +173,9 @@ "charts": [ "urn:li:chart:(mode,f622b9ee725b)" ], - "datasets": [], + "datasets": [ + "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)" + ], "lastModified": { "created": { "time": 1639169724316, @@ -243,6 +245,89 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "updated_at": "2024-09-02T07:40:44.046Z", + "last_run_id": "3535709679", + "data_source_id": "44763", + "report_imports_count": "2" + }, + "externalUrl": "https://app.mode.com/acryl/datasets/24f66e1701b6", + "name": "Dataset 1", + "description": "### Source Code\n``` sql\n-- Returns first 100 rows from DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY\n SELECT \n\t\tAGE,\n\t\tID,\n\t\tNAME,\n\t\t_FIVETRAN_DELETED,\n\t\t_FIVETRAN_SYNCED\n FROM DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY LIMIT 100;\n\n-- Returns first 100 rows from ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER\n SELECT \n\t\tCOMMUNICATION_ACCOUNT_ID,\n\t\tID,\n\t\tMMS_CAPABLE,\n\t\tPHONE_NUMBER,\n\t\tSMS_CAPABLE,\n\t\tSTATUS,\n\t\tSTATUS_TLM,\n\t\tTLM,\n\t\tVOICE_CAPABLE,\n\t\tWHEN_CREATED\n FROM ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER LIMIT 100;\n \n \n```\n ", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "mode-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a" + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "mode-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 
1638860400000, + "runId": "mode-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "acryl" + }, + { + "id": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a", + "urn": "urn:li:container:800cfcb4cec6ad587cafde11a0b0bb4a" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "mode-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD)", @@ -643,8 +728,8 @@ "json": { "timestampMillis": 1638860400000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "operationType": "UPDATE", "lastUpdatedTimestamp": 1639177973273 @@ -721,9 +806,9 @@ "json": { "fields": [ { - "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),payment_date)", + "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),amount)", "schemaField": { - "fieldPath": "payment_date", + "fieldPath": "amount", "nullable": false, "type": { "type": { @@ -743,9 +828,9 @@ } }, { - "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),amount)", + "schemaFieldUrn": "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mode,10149707,PROD),payment_date)", "schemaField": { - "fieldPath": "amount", + "fieldPath": "payment_date", "nullable": false, "type": { "type": { @@ -943,6 +1028,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mode,5450544,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "mode-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "query", "entityUrn": "urn:li:query:10149707.34499.1897576958", diff --git a/metadata-ingestion/tests/integration/mode/setup/dataset_24f66e1701b6.json b/metadata-ingestion/tests/integration/mode/setup/dataset_24f66e1701b6.json new file mode 100644 index 00000000000000..4e9cb911ab565d --- /dev/null +++ b/metadata-ingestion/tests/integration/mode/setup/dataset_24f66e1701b6.json @@ -0,0 +1,149 @@ +{ + "token": "24f66e1701b6", + "id": 5450544, + "name": "Dataset 1", + "description": "", + "created_at": "2024-09-02T07:38:43.722Z", + "updated_at": "2024-09-02T07:40:44.026Z", + "published_at": null, + "edited_at": "2024-09-02T07:40:32.668Z", + "type": "DatasetReport", + "last_successful_sync_at": null, + "last_saved_at": "2024-09-02T07:40:32.679Z", + "archived": false, + "space_token": "75737b70402e", + "account_id": 751252, + "account_username": "acryltest", + "public": false, + "manual_run_disabled": false, + "drill_anywhere_enabled": false, + "run_privately": true, + "drilldowns_enabled": false, + "expected_runtime": 0.763795, + "last_successfully_run_at": "2024-09-02T07:40:44.009Z", + "last_run_at": "2024-09-02T07:40:43.185Z", + "last_successful_run_token": "29e56ca29a45", + "query_count": 1, + "max_query_count": 160, + "runs_count": 3, + "schedules_count": 0, + "query_preview": "-- Returns first 100 rows from DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY\n SELECT \n\t\tAGE,\n\t\tID,\n\t\tNAME,\n\t\t_FIVETRAN_DELE", + "view_count": 6, + 
"thoughtspot_published_at": null, + "_links": { + "self": { + "href": "/api/acryltest/reports/24f66e1701b6" + }, + "web": { + "href": "https://app.mode.com/acryltest/datasets/24f66e1701b6" + }, + "web_edit": { + "href": "/editor/acryltest/datasets/24f66e1701b6" + }, + "account": { + "href": "/api/acryltest" + }, + "report_run": { + "templated": true, + "href": "/api/acryltest/reports/24f66e1701b6/runs/{id}?embed[result]=1" + }, + "space": { + "href": "/api/acryltest/collections/75737b70402e" + }, + "space_links": { + "href": "/api/acryltest/reports/24f66e1701b6/space_links" + }, + "queries": { + "href": "/api/acryltest/reports/24f66e1701b6/queries" + }, + "report_runs": { + "href": "/api/acryltest/reports/24f66e1701b6/runs" + }, + "report_pins": { + "href": "/api/acryltest/reports/24f66e1701b6/pins" + }, + "report_schedules": { + "href": "/api/acryltest/reports/24f66e1701b6/schedules" + }, + "dataset_dependencies": { + "href": "/api/acryltest/datasets/24f66e1701b6/reports" + }, + "last_run": { + "href": "/api/acryltest/reports/24f66e1701b6/runs/29e56ca29a45" + }, + "last_successful_run": { + "href": "/api/acryltest/reports/24f66e1701b6/runs/29e56ca29a45" + }, + "perspective_email_subscription_memberships": { + "href": "/api/acryltest/reports/24f66e1701b6/perspective_email_report_subscription_memberships" + }, + "creator": { + "href": "/api/modeuser" + }, + "report_index_web": { + "href": "/acryltest/spaces/75737b70402e" + } + }, + "_forms": { + "edit": { + "method": "patch", + "action": "/api/acryltest/reports/24f66e1701b6", + "input": { + "report": { + "name": { + "type": "text", + "value": "Dataset_2" + }, + "description": { + "type": "text", + "value": "" + }, + "account_id": { + "type": "text", + "value": 751252 + }, + "space_token": { + "type": "text", + "value": "75737b70402e" + } + } + } + }, + "destroy": { + "method": "delete", + "action": "/api/acryltest/reports/24f66e1701b6" + }, + "archive": { + "method": "patch", + "action": "/api/acryltest/reports/24f66e1701b6/archive" + }, + "unarchive": { + "method": "patch", + "action": "/api/acryltest/reports/24f66e1701b6/unarchive" + }, + "update_settings": { + "method": "patch", + "action": "/api/acryltest/reports/24f66e1701b6/update_settings", + "input": { + "report": { + "manual_run_disabled": { + "type": "select", + "options": [ + true, + false + ], + "value": false + }, + "drill_anywhere_enabled": { + "type": "select", + "options": [ + true, + false + ], + "value": false + } + } + } + } + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mode/setup/dataset_queries_24f66e1701b6.json b/metadata-ingestion/tests/integration/mode/setup/dataset_queries_24f66e1701b6.json new file mode 100644 index 00000000000000..ba3be157786e6f --- /dev/null +++ b/metadata-ingestion/tests/integration/mode/setup/dataset_queries_24f66e1701b6.json @@ -0,0 +1,64 @@ +{ + "_links": { + "self": { + "href": "/api/acryl/reports/24f66e1701b6/queries" + } + }, + "_embedded": { + "queries": [ + { + "id": 19780522, + "token": "9b2f34343531", + "raw_query": "-- Returns first 100 rows from DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY\n SELECT \n\t\tAGE,\n\t\tID,\n\t\tNAME,\n\t\t_FIVETRAN_DELETED,\n\t\t_FIVETRAN_SYNCED\n FROM DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY LIMIT 100;\n\n-- Returns first 100 rows from ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER\n SELECT \n\t\tCOMMUNICATION_ACCOUNT_ID,\n\t\tID,\n\t\tMMS_CAPABLE,\n\t\tPHONE_NUMBER,\n\t\tSMS_CAPABLE,\n\t\tSTATUS,\n\t\tSTATUS_TLM,\n\t\tTLM,\n\t\tVOICE_CAPABLE,\n\t\tWHEN_CREATED\n FROM 
ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER LIMIT 100;\n \n ", + "created_at": "2024-09-02T07:38:43.755Z", + "updated_at": "2024-09-02T07:40:44.046Z", + "name": "Query 1", + "last_run_id": 3535709679, + "data_source_id": 44763, + "explorations_count": 0, + "report_imports_count": 2, + "dbt_metric_id": null, + "_links": { + "self": { + "href": "/api/acryl/reports/24f66e1701b6/queries/9b2f34343531" + }, + "report": { + "href": "/api/acryl/reports/24f66e1701b6" + }, + "report_runs": { + "href": "/api/acryl/reports/24f66e1701b6/runs" + }, + "query_runs": { + "href": "/api/acryl/reports/24f66e1701b6/queries/9b2f34343531/runs" + }, + "creator": { + "href": "/api/modeuser" + } + }, + "_forms": { + "edit": { + "method": "patch", + "action": "/api/acryl/reports/24f66e1701b6/queries/9b2f34343531", + "content_type": "application/json", + "input": { + "query": { + "raw_query": { + "type": "text", + "value": "-- Returns first 100 rows from DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY\n SELECT \n\t\tAGE,\n\t\tID,\n\t\tNAME,\n\t\t_FIVETRAN_DELETED,\n\t\t_FIVETRAN_SYNCED\n FROM DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY LIMIT 100;\n\n-- Returns first 100 rows from ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER\n SELECT \n\t\tCOMMUNICATION_ACCOUNT_ID,\n\t\tID,\n\t\tMMS_CAPABLE,\n\t\tPHONE_NUMBER,\n\t\tSMS_CAPABLE,\n\t\tSTATUS,\n\t\tSTATUS_TLM,\n\t\tTLM,\n\t\tVOICE_CAPABLE,\n\t\tWHEN_CREATED\n FROM ETHAN_TEST_DB.PUBLIC.ACCOUNT_PHONE_NUMBER LIMIT 100;\n \n " + }, + "name": { + "type": "text", + "value": "Query 1" + }, + "data_source_id": { + "type": "text", + "value": 44763 + } + } + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mode/setup/datasets_157933cc1168.json b/metadata-ingestion/tests/integration/mode/setup/datasets_157933cc1168.json new file mode 100644 index 00000000000000..4ca48a84e9110f --- /dev/null +++ b/metadata-ingestion/tests/integration/mode/setup/datasets_157933cc1168.json @@ -0,0 +1,10 @@ +{ + "_links": { + "self": { + "href": "/api/acryltest/collections/157933cc1168/reports" + } + }, + "_embedded": { + "reports": [] + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mode/setup/datasets_75737b70402e.json b/metadata-ingestion/tests/integration/mode/setup/datasets_75737b70402e.json new file mode 100644 index 00000000000000..ffb1bbf521db76 --- /dev/null +++ b/metadata-ingestion/tests/integration/mode/setup/datasets_75737b70402e.json @@ -0,0 +1,149 @@ +{ + "_links": { + "self": { + "href": "/api/acryltest/collections/75737b70402e/reports" + } + }, + "_embedded": { + "reports": [ + { + "account_id": 751252, + "account_username": "acryltest", + "collection_name": "AcrylTest", + "collection_token": "75737b70402e", + "created_at": "2024-09-02T07:38:43.722Z", + "description": "", + "drilldowns_enabled": false, + "edited_at": "2024-09-02T07:40:32.668Z", + "id": 5450544, + "is_sample": false, + "last_run_at": "2024-09-02T07:40:43.185Z", + "last_saved_at": "2024-09-02T07:40:32.679Z", + "last_successful_run_token": "29e56ca29a45", + "last_successful_sync_at": null, + "last_successfully_run_at": "2024-09-02T07:40:44.009Z", + "manual_run_disabled": false, + "max_query_count": 1, + "name": "Dataset 1", + "public": false, + "query_count": 1, + "query_preview": "-- Returns first 100 rows from DATAHUB_COMMUNITY.POSTGRES_PUBLIC.COMPANY\n SELECT \n\t\tAGE,\n\t\tID,\n\t\tNAME,\n\t\t_FIVETRAN_DELE", + "run_privately": true, + "runs_count": 3, + "schedules_count": 0, + "space_token": "75737b70402e", + "switch_view_token": 
"f213a1bb8f8a", + "token": "24f66e1701b6", + "type": "DatasetReport", + "updated_at": "2024-09-02T07:40:44.026Z", + "view_count": 6, + "thoughtspot_published_at": null, + "_links": { + "account": { + "href": "/api/acryltest" + }, + "creator": { + "href": "/api/modeuser" + }, + "dataset_dependencies": { + "href": "/api/acryltest/datasets/24f66e1701b6/reports" + }, + "last_run": { + "href": "/api/acryltest/reports/24f66e1701b6/runs/29e56ca29a45" + }, + "last_successful_run": { + "href": "/api/acryltest/reports/24f66e1701b6/runs/29e56ca29a45" + }, + "queries": { + "href": "/api/acryltest/reports/24f66e1701b6/queries" + }, + "report_index_web": { + "href": "/acryltest/spaces/75737b70402e" + }, + "report_pins": { + "href": "/api/acryltest/reports/24f66e1701b6/pins" + }, + "report_run": { + "templated": true, + "href": "/api/acryltest/reports/24f66e1701b6/runs/{id}?embed[result]=1" + }, + "report_runs": { + "href": "/api/acryltest/reports/24f66e1701b6/runs" + }, + "report_schedules": { + "href": "/api/acryltest/reports/24f66e1701b6/schedules" + }, + "self": { + "href": "/api/acryltest/reports/24f66e1701b6" + }, + "space": { + "href": "/api/acryltest/collections/75737b70402e" + }, + "space_links": { + "href": "/api/acryltest/reports/24f66e1701b6/space_links" + }, + "web": { + "href": "https://app.mode.com/acryltest/datasets/24f66e1701b6" + }, + "web_edit": { + "href": "/editor/acryltest/datasets/24f66e1701b6" + } + }, + "_forms": { + "destroy": { + "method": "delete", + "action": "/api/acryltest/reports/24f66e1701b6" + }, + "edit": { + "method": "patch", + "action": "/api/acryltest/reports/24f66e1701b6", + "input": { + "report": { + "name": { + "type": "text", + "value": "Dataset_2" + }, + "description": { + "type": "text", + "value": "" + }, + "account_id": { + "type": "text", + "value": 751252 + }, + "space_token": { + "type": "text", + "value": "75737b70402e" + } + } + } + }, + "update_settings": { + "method": "patch", + "action": "/api/acryltest/reports/24f66e1701b6/update_settings", + "input": { + "report": { + "manual_run_disabled": { + "type": "select", + "options": [ + true, + false + ], + "value": false + }, + "drill_anywhere_enabled": { + "type": "select", + "options": [ + true, + false + ], + "value": false + } + } + } + } + } + } + ] + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mode/setup/reports_75737b70402e.json b/metadata-ingestion/tests/integration/mode/setup/reports_75737b70402e.json index 9718967e5e463f..956093a95d8492 100644 --- a/metadata-ingestion/tests/integration/mode/setup/reports_75737b70402e.json +++ b/metadata-ingestion/tests/integration/mode/setup/reports_75737b70402e.json @@ -221,7 +221,34 @@ } } } - } + }, + "imported_datasets": [ + { + "name": "Dataset 1", + "token": "24f66e1701b6", + "_links": { + "report": { + "href": "/api/acryltest/reports/94750a190dc8" + }, + "source_dataset": { + "href": "/api/acryltest/reports/24f66e1701b6" + } + }, + "_forms": { + "refresh": { + "method": "post", + "action": "/api/acryltest/reports/94750a190dc8/runs", + "input": { + "dataset_tokens": [ + { + "token": "24f66e1701b6" + } + ] + } + } + } + } + ] }] } } \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index 0346767b05d253..7ea6597460de20 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -22,6 +22,10 @@ 
"https://app.mode.com/api/acryl/reports/9d2da37fa91e/queries/6e26a9f3d4e2/charts": "charts.json", "https://app.mode.com/api/acryl/data_sources": "data_sources.json", "https://app.mode.com/api/acryl/definitions": "definitions.json", + "https://app.mode.com/api/acryl/spaces/157933cc1168/datasets": "datasets_157933cc1168.json", + "https://app.mode.com/api/acryl/spaces/75737b70402e/datasets": "datasets_75737b70402e.json", + "https://app.mode.com/api/acryl/reports/24f66e1701b6": "dataset_24f66e1701b6.json", + "https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json", } RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"] From fc92d23cc16ab0dd5f675d04c4bb29442b3b1d6a Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 10 Sep 2024 12:41:21 -0500 Subject: [PATCH 5/5] feat(entity-service): fallback logic for aspect version (#11304) --- .../entity/ebean/batch/AspectsBatchImpl.java | 2 +- .../linkedin/metadata/entity/AspectDao.java | 26 ++-- .../metadata/entity/EntityServiceImpl.java | 35 +++-- .../linkedin/metadata/entity/EntityUtils.java | 59 +++++--- .../metadata/entity/TransactionContext.java | 69 +++++++++ .../entity/cassandra/CassandraAspectDao.java | 29 ++-- .../metadata/entity/ebean/EbeanAspectDao.java | 57 +++---- .../entity/EbeanEntityServiceTest.java | 140 ++++++++++++++++++ .../java/entities/EntitiesControllerTest.java | 7 +- 9 files changed, 325 insertions(+), 99 deletions(-) create mode 100644 metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java diff --git a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java index 3ec090a3db3a45..1fba8426317209 100644 --- a/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java +++ b/metadata-io/metadata-io-api/src/main/java/com/linkedin/metadata/entity/ebean/batch/AspectsBatchImpl.java @@ -181,7 +181,7 @@ public AspectsBatchImplBuilder mcps( mcp, auditStamp, retrieverContext.getAspectRetriever()); } } catch (IllegalArgumentException e) { - log.error("Invalid proposal, skipping and proceeding with batch: " + mcp, e); + log.error("Invalid proposal, skipping and proceeding with batch: {}", mcp, e); return null; } }) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java index 401d40ec177cee..3f0545b6f94a85 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java @@ -6,13 +6,11 @@ import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.utils.metrics.MetricUtils; -import io.ebean.Transaction; import java.sql.Timestamp; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -27,10 +25,10 @@ * aspect is set to 0 for efficient retrieval. In most cases only the latest state of an aspect will * be fetched. See {@link EntityServiceImpl} for more details. * - *

TODO: This interface exposes {@link #runInTransactionWithRetry(Supplier, int)} because {@link - * EntityServiceImpl} concerns itself with batching multiple commands into a single transaction. It - * exposes storage concerns somewhat and it'd be worth looking into ways to move this responsibility - * inside {@link AspectDao} implementations. + *

TODO: This interface exposes {@link #runInTransactionWithRetry(Function, int)} + * (TransactionContext)} because {@link EntityServiceImpl} concerns itself with batching multiple + * commands into a single transaction. It exposes storage concerns somewhat and it'd be worth + * looking into ways to move this responsibility inside {@link AspectDao} implementations. */ public interface AspectDao { String ASPECT_WRITE_COUNT_METRIC_NAME = "aspectWriteCount"; @@ -77,7 +75,7 @@ Map> getLatestAspects( Map> urnAspects, boolean forUpdate); void saveAspect( - @Nullable Transaction tx, + @Nullable TransactionContext txContext, @Nonnull final String urn, @Nonnull final String aspectName, @Nonnull final String aspectMetadata, @@ -89,10 +87,12 @@ void saveAspect( final boolean insert); void saveAspect( - @Nullable Transaction tx, @Nonnull final EntityAspect aspect, final boolean insert); + @Nullable TransactionContext txContext, + @Nonnull final EntityAspect aspect, + final boolean insert); long saveLatestAspect( - @Nullable Transaction tx, + @Nullable TransactionContext txContext, @Nonnull final String urn, @Nonnull final String aspectName, @Nullable final String oldAspectMetadata, @@ -107,7 +107,7 @@ long saveLatestAspect( @Nullable final String newSystemMetadata, final Long nextVersion); - void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect); + void deleteAspect(@Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect); @Nonnull ListResult listUrns( @@ -125,7 +125,7 @@ ListResult listUrns( @Nonnull Stream streamAspects(String entityName, String aspectName); - int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn); + int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn); @Nonnull ListResult listLatestAspectMetadata( @@ -159,11 +159,11 @@ default Map getNextVersions( @Nonnull T runInTransactionWithRetry( - @Nonnull final Function block, final int maxTransactionRetry); + @Nonnull final Function block, final int maxTransactionRetry); @Nonnull default List runInTransactionWithRetry( - @Nonnull final Function block, + @Nonnull final Function block, AspectsBatch batch, final int maxTransactionRetry) { return List.of(runInTransactionWithRetry(block, maxTransactionRetry)); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index fd6ad57c0adf52..4b83ea40f722db 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -6,6 +6,7 @@ import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static com.linkedin.metadata.Constants.SYSTEM_ACTOR; import static com.linkedin.metadata.Constants.UI_SOURCE; +import static com.linkedin.metadata.entity.TransactionContext.DEFAULT_MAX_TRANSACTION_RETRY; import static com.linkedin.metadata.utils.PegasusUtils.constructMCL; import static com.linkedin.metadata.utils.PegasusUtils.getDataTemplateClassFromSchema; import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName; @@ -79,7 +80,6 @@ import com.linkedin.r2.RemoteInvocationException; import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; -import io.ebean.Transaction; import io.opentelemetry.extension.annotations.WithSpan; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; @@ -146,8 +146,6 @@ public class EntityServiceImpl 
implements EntityService { * As described above, the latest version of an aspect should always take the value 0, with * monotonically increasing version incrementing as usual once the latest version is replaced. */ - private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3; - protected final AspectDao aspectDao; @VisibleForTesting @Getter private final EventProducer producer; @@ -837,7 +835,7 @@ private List ingestAspectsToLocalDB( return aspectDao .runInTransactionWithRetry( - (tx) -> { + (txContext) -> { // Generate default aspects within the transaction (they are re-calculated on retry) AspectsBatch batchWithDefaults = DefaultAspectsUtil.withAdditionalChanges( @@ -852,7 +850,8 @@ private List ingestAspectsToLocalDB( aspectDao.getLatestAspects(urnAspects, true)); // read #2 (potentially) final Map> nextVersions = - EntityUtils.calculateNextVersions(aspectDao, latestAspects, urnAspects); + EntityUtils.calculateNextVersions( + txContext, aspectDao, latestAspects, urnAspects); // 1. Convert patches to full upserts // 2. Run any entity/aspect level hooks @@ -872,7 +871,7 @@ private List ingestAspectsToLocalDB( Map> newNextVersions = EntityUtils.calculateNextVersions( - aspectDao, updatedLatestAspects, updatedItems.getFirst()); + txContext, aspectDao, updatedLatestAspects, updatedItems.getFirst()); // merge updatedNextVersions = AspectsBatch.merge(nextVersions, newNextVersions); } else { @@ -939,7 +938,7 @@ private List ingestAspectsToLocalDB( if (overwrite || latest == null) { result = ingestAspectToLocalDB( - tx, + txContext, item.getUrn(), item.getAspectName(), item.getRecordTemplate(), @@ -973,8 +972,8 @@ private List ingestAspectsToLocalDB( .collect(Collectors.toList()); // commit upserts prior to retention or kafka send, if supported by impl - if (tx != null) { - tx.commitAndContinue(); + if (txContext != null) { + txContext.commitAndContinue(); } long took = ingestToLocalDBTimer.stop(); log.info( @@ -2209,7 +2208,7 @@ private RollbackResult deleteAspectWithoutMCL( final RollbackResult result = aspectDao.runInTransactionWithRetry( - (tx) -> { + (txContext) -> { Integer additionalRowsDeleted = 0; // 1. Fetch the latest existing version of the aspect. @@ -2282,7 +2281,7 @@ private RollbackResult deleteAspectWithoutMCL( } // 5. Apply deletes and fix up latest row - aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(tx, aspect)); + aspectsToDelete.forEach(aspect -> aspectDao.deleteAspect(txContext, aspect)); if (survivingAspect != null) { // if there was a surviving aspect, copy its information into the latest row @@ -2300,16 +2299,16 @@ private RollbackResult deleteAspectWithoutMCL( latest .getEntityAspect() .setCreatedFor(survivingAspect.getEntityAspect().getCreatedFor()); - aspectDao.saveAspect(tx, latest.getEntityAspect(), false); + aspectDao.saveAspect(txContext, latest.getEntityAspect(), false); // metrics aspectDao.incrementWriteMetrics( aspectName, 1, latest.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length); - aspectDao.deleteAspect(tx, survivingAspect.getEntityAspect()); + aspectDao.deleteAspect(txContext, survivingAspect.getEntityAspect()); } else { if (isKeyAspect) { if (hardDelete) { // If this is the key aspect, delete the entity entirely. 
- additionalRowsDeleted = aspectDao.deleteUrn(tx, urn); + additionalRowsDeleted = aspectDao.deleteUrn(txContext, urn); } else if (deleteItem.getEntitySpec().hasAspect(Constants.STATUS_ASPECT_NAME)) { // soft delete by setting status.removed=true (if applicable) final Status statusAspect = new Status(); @@ -2326,7 +2325,7 @@ private RollbackResult deleteAspectWithoutMCL( } } else { // Else, only delete the specific aspect. - aspectDao.deleteAspect(tx, latest.getEntityAspect()); + aspectDao.deleteAspect(txContext, latest.getEntityAspect()); } } @@ -2466,7 +2465,7 @@ private Map getEnvelopedAspects( @Nonnull private UpdateAspectResult ingestAspectToLocalDB( - @Nullable Transaction tx, + @Nullable TransactionContext txContext, @Nonnull final Urn urn, @Nonnull final String aspectName, @Nonnull final RecordTemplate newValue, @@ -2495,7 +2494,7 @@ private UpdateAspectResult ingestAspectToLocalDB( latest.getEntityAspect().setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata)); log.info("Ingesting aspect with name {}, urn {}", aspectName, urn); - aspectDao.saveAspect(tx, latest.getEntityAspect(), false); + aspectDao.saveAspect(txContext, latest.getEntityAspect(), false); // metrics aspectDao.incrementWriteMetrics( @@ -2518,7 +2517,7 @@ private UpdateAspectResult ingestAspectToLocalDB( String newValueStr = EntityApiUtils.toJsonAspect(newValue); long versionOfOld = aspectDao.saveLatestAspect( - tx, + txContext, urn.toString(), aspectName, latest == null ? null : EntityApiUtils.toJsonAspect(oldValue), diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java index 7842365ce429be..3c4109970e9d0b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityUtils.java @@ -285,38 +285,51 @@ public static List toSystemAspects( * Use the precalculated next version from system metadata if it exists, otherwise lookup the next * version the normal way from the database * + * @param txContext * @param aspectDao database access * @param latestAspects aspect version 0 with system metadata * @param urnAspects urn/aspects which we need next version information for * @return map of the urn/aspect to the next aspect version */ public static Map> calculateNextVersions( + TransactionContext txContext, AspectDao aspectDao, Map> latestAspects, Map> urnAspects) { - Map> precalculatedVersions = - latestAspects.entrySet().stream() - .map( - entry -> - Map.entry( - entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue()))) - .filter(entry -> !entry.getValue().isEmpty()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - Map> missingAspectVersions = - urnAspects.entrySet().stream() - .flatMap( - entry -> - entry.getValue().stream() - .map(aspectName -> Pair.of(entry.getKey(), aspectName))) - .filter( - urnAspectName -> - !precalculatedVersions - .getOrDefault(urnAspectName.getKey(), Map.of()) - .containsKey(urnAspectName.getValue())) - .collect( - Collectors.groupingBy( - Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet()))); + + final Map> precalculatedVersions; + final Map> missingAspectVersions; + if (txContext.getFailedAttempts() > 2 && txContext.lastExceptionIsDuplicateKey()) { + log.warn( + "Multiple exceptions detected, last exception detected as DuplicateKey, fallback to database max(version)+1"); + precalculatedVersions = Map.of(); + missingAspectVersions = 
urnAspects; + } else { + precalculatedVersions = + latestAspects.entrySet().stream() + .map( + entry -> + Map.entry( + entry.getKey(), convertSystemAspectToNextVersionMap(entry.getValue()))) + .filter(entry -> !entry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + missingAspectVersions = + urnAspects.entrySet().stream() + .flatMap( + entry -> + entry.getValue().stream() + .map(aspectName -> Pair.of(entry.getKey(), aspectName))) + .filter( + urnAspectName -> + !precalculatedVersions + .getOrDefault(urnAspectName.getKey(), Map.of()) + .containsKey(urnAspectName.getValue())) + .collect( + Collectors.groupingBy( + Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toSet()))); + } + Map> databaseVersions = missingAspectVersions.isEmpty() ? Map.of() diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java new file mode 100644 index 00000000000000..69f2f1c8981c03 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/TransactionContext.java @@ -0,0 +1,69 @@ +package com.linkedin.metadata.entity; + +import io.ebean.DuplicateKeyException; +import io.ebean.Transaction; +import java.util.ArrayList; +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NonNull; +import lombok.experimental.Accessors; +import org.springframework.lang.Nullable; + +/** Wrap the transaction with additional information about the exceptions during retry. */ +@Data +@AllArgsConstructor +@Accessors(fluent = true) +public class TransactionContext { + public static final int DEFAULT_MAX_TRANSACTION_RETRY = 3; + + public static TransactionContext empty() { + return empty(DEFAULT_MAX_TRANSACTION_RETRY); + } + + public static TransactionContext empty(@Nullable Integer maxRetries) { + return empty(null, maxRetries == null ? DEFAULT_MAX_TRANSACTION_RETRY : maxRetries); + } + + public static TransactionContext empty(Transaction tx, int maxRetries) { + return new TransactionContext(tx, maxRetries, new ArrayList<>()); + } + + @Nullable private Transaction tx; + private int maxRetries; + @NonNull private List exceptions; + + public TransactionContext success() { + exceptions.clear(); + return this; + } + + public TransactionContext addException(RuntimeException e) { + exceptions.add(e); + return this; + } + + public int getFailedAttempts() { + return exceptions.size(); + } + + @Nullable + public RuntimeException lastException() { + return exceptions.isEmpty() ? 
null : exceptions.get(exceptions.size() - 1); + } + + public boolean lastExceptionIsDuplicateKey() { + return lastException() instanceof DuplicateKeyException; + } + + public boolean shouldAttemptRetry() { + return exceptions.size() <= maxRetries; + } + + public void commitAndContinue() { + if (tx != null) { + tx.commitAndContinue(); + } + success(); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java index 51f898d3122af3..9e7387947a9547 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java @@ -29,13 +29,13 @@ import com.linkedin.metadata.entity.EntityAspect; import com.linkedin.metadata.entity.EntityAspectIdentifier; import com.linkedin.metadata.entity.ListResult; +import com.linkedin.metadata.entity.TransactionContext; import com.linkedin.metadata.entity.ebean.EbeanAspectV2; import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.metadata.query.ExtraInfoArray; import com.linkedin.metadata.query.ListResultMetadata; -import io.ebean.Transaction; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -187,7 +187,7 @@ private Map getMaxVersions( @Override public void saveAspect( - @Nullable Transaction tx, @Nonnull EntityAspect aspect, final boolean insert) { + @Nullable TransactionContext txContext, @Nonnull EntityAspect aspect, final boolean insert) { validateConnection(); SimpleStatement statement = generateSaveStatement(aspect, insert); _cqlSession.execute(statement); @@ -287,23 +287,21 @@ public ListResult listAspectMetadata( @Override @Nonnull public T runInTransactionWithRetry( - @Nonnull final Function block, final int maxTransactionRetry) { + @Nonnull final Function block, final int maxTransactionRetry) { validateConnection(); - int retryCount = 0; - Exception lastException; - + TransactionContext txContext = TransactionContext.empty(maxTransactionRetry); do { try { // TODO: Try to bend this code to make use of Cassandra batches. 
This method is called from // single-urn operations, so perf should not suffer much - return block.apply(null); + return block.apply(txContext); } catch (DriverException exception) { - lastException = exception; + txContext.addException(exception); } - } while (++retryCount <= maxTransactionRetry); + } while (txContext.shouldAttemptRetry()); throw new RetryLimitReached( - "Failed to add after " + maxTransactionRetry + " retries", lastException); + "Failed to add after " + maxTransactionRetry + " retries", txContext.lastException()); } private ListResult toListResult( @@ -368,7 +366,8 @@ private static AuditStamp toAuditStamp(@Nonnull final EntityAspect aspect) { } @Override - public void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect) { + public void deleteAspect( + @Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect) { validateConnection(); SimpleStatement ss = deleteFrom(CassandraAspect.TABLE_NAME) @@ -385,7 +384,7 @@ public void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect a } @Override - public int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn) { + public int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn) { validateConnection(); SimpleStatement ss = deleteFrom(CassandraAspect.TABLE_NAME) @@ -569,7 +568,7 @@ public Map> getNextVersions(Map> u @Override public long saveLatestAspect( - @Nullable Transaction tx, + @Nullable TransactionContext txContext, @Nonnull final String urn, @Nonnull final String aspectName, @Nullable final String oldAspectMetadata, @@ -675,7 +674,7 @@ public void setWritable(boolean canWrite) { @Override public void saveAspect( - @Nullable Transaction tx, + @Nullable TransactionContext txContext, @Nonnull final String urn, @Nonnull final String aspectName, @Nonnull final String aspectMetadata, @@ -698,7 +697,7 @@ public void saveAspect( actor, impersonator); - saveAspect(tx, aspect, insert); + saveAspect(txContext, aspect, insert); // metrics incrementWriteMetrics(aspectName, 1, aspectMetadata.getBytes(StandardCharsets.UTF_8).length); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 93c06b9236d501..4304be1aa2a00a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -19,6 +19,7 @@ import com.linkedin.metadata.entity.EntityAspect; import com.linkedin.metadata.entity.EntityAspectIdentifier; import com.linkedin.metadata.entity.ListResult; +import com.linkedin.metadata.entity.TransactionContext; import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.models.AspectSpec; @@ -143,7 +144,7 @@ private boolean validateConnection() { @Override public long saveLatestAspect( - @Nullable Transaction tx, + @Nullable TransactionContext txContext, @Nonnull final String urn, @Nonnull final String aspectName, @Nullable final String oldAspectMetadata, @@ -167,7 +168,7 @@ public long saveLatestAspect( if (oldAspectMetadata != null && oldTime != null) { largestVersion = nextVersion; saveAspect( - tx, + txContext, urn, aspectName, oldAspectMetadata, @@ -181,7 +182,7 @@ public long saveLatestAspect( // Save newValue as the latest version (v0) saveAspect( - tx, + txContext, urn, aspectName, newAspectMetadata, 
@@ -197,7 +198,7 @@ public long saveLatestAspect(
 
   @Override
   public void saveAspect(
-      @Nullable Transaction tx,
+      @Nullable TransactionContext txContext,
       @Nonnull final String urn,
       @Nonnull final String aspectName,
       @Nonnull final String aspectMetadata,
@@ -220,23 +221,27 @@ public void saveAspect(
       aspect.setCreatedFor(impersonator);
     }
 
-    saveEbeanAspect(tx, aspect, insert);
+    saveEbeanAspect(txContext, aspect, insert);
   }
 
   @Override
   public void saveAspect(
-      @Nullable Transaction tx, @Nonnull final EntityAspect aspect, final boolean insert) {
+      @Nullable TransactionContext txContext,
+      @Nonnull final EntityAspect aspect,
+      final boolean insert) {
     EbeanAspectV2 ebeanAspect = EbeanAspectV2.fromEntityAspect(aspect);
-    saveEbeanAspect(tx, ebeanAspect, insert);
+    saveEbeanAspect(txContext, ebeanAspect, insert);
   }
 
   private void saveEbeanAspect(
-      @Nullable Transaction tx, @Nonnull final EbeanAspectV2 ebeanAspect, final boolean insert) {
+      @Nullable TransactionContext txContext,
+      @Nonnull final EbeanAspectV2 ebeanAspect,
+      final boolean insert) {
     validateConnection();
     if (insert) {
-      _server.insert(ebeanAspect, tx);
+      _server.insert(ebeanAspect, txContext.tx());
     } else {
-      _server.update(ebeanAspect, tx);
+      _server.update(ebeanAspect, txContext.tx());
     }
   }
 
@@ -304,20 +309,21 @@ public EntityAspect getAspect(@Nonnull final EntityAspectIdentifier key) {
   }
 
   @Override
-  public void deleteAspect(@Nullable Transaction tx, @Nonnull final EntityAspect aspect) {
+  public void deleteAspect(
+      @Nullable TransactionContext txContext, @Nonnull final EntityAspect aspect) {
     validateConnection();
     EbeanAspectV2 ebeanAspect = EbeanAspectV2.fromEntityAspect(aspect);
-    _server.delete(ebeanAspect, tx);
+    _server.delete(ebeanAspect, txContext.tx());
   }
 
   @Override
-  public int deleteUrn(@Nullable Transaction tx, @Nonnull final String urn) {
+  public int deleteUrn(@Nullable TransactionContext txContext, @Nonnull final String urn) {
     validateConnection();
     return _server
         .createQuery(EbeanAspectV2.class)
        .where()
        .eq(EbeanAspectV2.URN_COLUMN, urn)
-        .delete(tx);
+        .delete(txContext.tx());
   }
 
   @Override
@@ -658,14 +664,14 @@ public ListResult listLatestAspectMetadata(
 
   @Override
   @Nonnull
   public <T> T runInTransactionWithRetry(
-      @Nonnull final Function<Transaction, T> block, final int maxTransactionRetry) {
+      @Nonnull final Function<TransactionContext, T> block, final int maxTransactionRetry) {
     return runInTransactionWithRetry(block, null, maxTransactionRetry).get(0);
   }
 
   @Override
   @Nonnull
   public <T> List<T> runInTransactionWithRetry(
-      @Nonnull final Function<Transaction, T> block,
+      @Nonnull final Function<TransactionContext, T> block,
       @Nullable AspectsBatch batch,
       final int maxTransactionRetry) {
 
@@ -720,13 +726,12 @@ public <T> List<T> runInTransactionWithRetry(
 
   @Nonnull
   public <T> T runInTransactionWithRetryUnlocked(
-      @Nonnull final Function<Transaction, T> block,
+      @Nonnull final Function<TransactionContext, T> block,
       @Nullable AspectsBatch batch,
       final int maxTransactionRetry) {
 
     validateConnection();
-    int retryCount = 0;
-    Exception lastException = null;
+    TransactionContext transactionContext = TransactionContext.empty(maxTransactionRetry);
 
     T result = null;
     do {
@@ -734,9 +739,8 @@ public <T> T runInTransactionWithRetryUnlocked(
           _server.beginTransaction(
               TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
         transaction.setBatchMode(true);
-        result = block.apply(transaction);
+        result = block.apply(transactionContext.tx(transaction));
         transaction.commit();
-        lastException = null;
         break;
       } catch (PersistenceException exception) {
         if (exception instanceof DuplicateKeyException) {
@@ -749,20 +753,21 @@
              log.warn(
"Skipping DuplicateKeyException retry since aspect is the key aspect. {}", batch.getUrnAspectsMap().keySet()); - continue; + break; } } MetricUtils.counter(MetricRegistry.name(this.getClass(), "txFailed")).inc(); log.warn("Retryable PersistenceException: {}", exception.getMessage()); - lastException = exception; + transactionContext.addException(exception); } - } while (++retryCount <= maxTransactionRetry); + } while (transactionContext.shouldAttemptRetry()); - if (lastException != null) { + if (transactionContext.lastException() != null) { MetricUtils.counter(MetricRegistry.name(this.getClass(), "txFailedAfterRetries")).inc(); throw new RetryLimitReached( - "Failed to add after " + maxTransactionRetry + " retries", lastException); + "Failed to add after " + maxTransactionRetry + " retries", + transactionContext.lastException()); } return result; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index b9f5984e576678..e8d3c654f6f639 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -1,14 +1,19 @@ package com.linkedin.metadata.entity; +import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME; +import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import com.linkedin.common.AuditStamp; +import com.linkedin.common.Status; import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.entity.EnvelopedAspect; import com.linkedin.identity.CorpUserInfo; import com.linkedin.metadata.AspectGenerationUtils; import com.linkedin.metadata.Constants; @@ -36,6 +41,8 @@ import io.ebean.TxScope; import io.ebean.annotation.TxIsolation; import java.net.URISyntaxException; +import java.sql.Timestamp; +import java.time.Instant; import java.util.Collection; import java.util.List; import java.util.Map; @@ -292,6 +299,139 @@ public void testNestedTransactions() throws AssertionError { System.out.println("done"); } + @Test + public void testSystemMetadataDuplicateKey() throws Exception { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:duplicateKeyTest"); + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + ChangeItemImpl item = + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(STATUS_ASPECT_NAME) + .recordTemplate(new Status().setRemoved(true)) + .systemMetadata(systemMetadata) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyAspectRetriever(null)); + _entityServiceImpl.ingestAspects( + opContext, + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext().get()) + .items(List.of(item)) + .build(), + false, + true); + + // List aspects urns + EnvelopedAspect envelopedAspect = + _entityServiceImpl.getLatestEnvelopedAspect( + opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME); + + assertNotNull(envelopedAspect); + assertEquals(envelopedAspect.getVersion(), 0L, "Expected version 0"); + assertEquals( + envelopedAspect.getSystemMetadata().getVersion(), + "1", + "Expected version 0 with systemMeta version 1"); + + // Corrupt the 
+    try (Transaction transaction =
+        ((EbeanAspectDao) _entityServiceImpl.aspectDao)
+            .getServer()
+            .beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
+      TransactionContext transactionContext = TransactionContext.empty(transaction, 3);
+      _entityServiceImpl.aspectDao.saveAspect(
+          transactionContext,
+          entityUrn.toString(),
+          STATUS_ASPECT_NAME,
+          new Status().setRemoved(false).toString(),
+          entityUrn.toString(),
+          null,
+          Timestamp.from(Instant.now()),
+          systemMetadata.toString(),
+          1,
+          true);
+      transaction.commit();
+    }
+
+    // Run another update
+    _entityServiceImpl.ingestAspects(
+        opContext,
+        AspectsBatchImpl.builder()
+            .retrieverContext(opContext.getRetrieverContext().get())
+            .items(
+                List.of(
+                    ChangeItemImpl.builder()
+                        .urn(entityUrn)
+                        .aspectName(STATUS_ASPECT_NAME)
+                        .recordTemplate(new Status().setRemoved(false))
+                        .systemMetadata(systemMetadata)
+                        .auditStamp(TEST_AUDIT_STAMP)
+                        .build(TestOperationContexts.emptyAspectRetriever(null))))
+            .build(),
+        false,
+        true);
+    EnvelopedAspect envelopedAspect2 =
+        _entityServiceImpl.getLatestEnvelopedAspect(
+            opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
+
+    assertNotNull(envelopedAspect2);
+    assertEquals(envelopedAspect2.getVersion(), 0L, "Expected version 0");
+    assertEquals(
+        envelopedAspect2.getSystemMetadata().getVersion(),
+        "3",
+        "Expected version 0 with systemMeta version 3 accounting for the collision");
+  }
+
+  @Test
+  public void testBatchDuplicate() throws Exception {
+    Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:batchDuplicateTest");
+    SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata();
+    ChangeItemImpl item1 =
+        ChangeItemImpl.builder()
+            .urn(entityUrn)
+            .aspectName(STATUS_ASPECT_NAME)
+            .recordTemplate(new Status().setRemoved(true))
+            .systemMetadata(systemMetadata.copy())
+            .auditStamp(TEST_AUDIT_STAMP)
+            .build(TestOperationContexts.emptyAspectRetriever(null));
+    ChangeItemImpl item2 =
+        ChangeItemImpl.builder()
+            .urn(entityUrn)
+            .aspectName(STATUS_ASPECT_NAME)
+            .recordTemplate(new Status().setRemoved(false))
+            .systemMetadata(systemMetadata.copy())
+            .auditStamp(TEST_AUDIT_STAMP)
+            .build(TestOperationContexts.emptyAspectRetriever(null));
+    _entityServiceImpl.ingestAspects(
+        opContext,
+        AspectsBatchImpl.builder()
+            .retrieverContext(opContext.getRetrieverContext().get())
+            .items(List.of(item1, item2))
+            .build(),
+        false,
+        true);
+
+    // List aspect urns
+    ListUrnsResult batch = _entityServiceImpl.listUrns(opContext, entityUrn.getEntityType(), 0, 2);
+
+    assertEquals(batch.getStart().intValue(), 0);
+    assertEquals(batch.getCount().intValue(), 1);
+    assertEquals(batch.getTotal().intValue(), 1);
+    assertEquals(batch.getEntities().size(), 1);
+    assertEquals(entityUrn.toString(), batch.getEntities().get(0).toString());
+
+    EnvelopedAspect envelopedAspect =
+        _entityServiceImpl.getLatestEnvelopedAspect(
+            opContext, CORP_USER_ENTITY_NAME, entityUrn, STATUS_ASPECT_NAME);
+    assertEquals(
+        envelopedAspect.getSystemMetadata().getVersion(),
+        "2",
+        "Expected version 2 accounting for duplicates");
+    assertEquals(
+        envelopedAspect.getValue().toString(),
+        "{removed=false}",
+        "Expected 2nd item to be the latest");
+  }
+
   @Test
   public void dataGeneratorThreadingTest() {
     DataGenerator dataGenerator = new DataGenerator(opContext, _entityServiceImpl);
diff --git a/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java b/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java
index 3e352403c88bca..8b530b218532d0 100644
--- a/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java
+++ b/metadata-service/openapi-servlet/src/test/java/entities/EntitiesControllerTest.java
@@ -14,6 +14,7 @@
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
 import com.linkedin.metadata.config.PreProcessHooks;
 import com.linkedin.metadata.entity.AspectDao;
+import com.linkedin.metadata.entity.TransactionContext;
 import com.linkedin.metadata.entity.UpdateAspectResult;
 import com.linkedin.metadata.event.EventProducer;
 import io.datahubproject.metadata.context.OperationContext;
@@ -69,14 +70,14 @@ public void setup()
     OperationContext opContext = TestOperationContexts.systemContextNoSearchAuthorization();
     AspectDao aspectDao = Mockito.mock(AspectDao.class);
     when(aspectDao.runInTransactionWithRetry(
-            ArgumentMatchers.<Function<Transaction, List<UpdateAspectResult>>>any(),
+            ArgumentMatchers.<Function<TransactionContext, List<UpdateAspectResult>>>any(),
             any(AspectsBatch.class),
             anyInt()))
         .thenAnswer(
             i ->
                 List.of(
-                    ((Function<Transaction, List<UpdateAspectResult>>) i.getArgument(0))
-                        .apply(Mockito.mock(Transaction.class))));
+                    ((Function<TransactionContext, List<UpdateAspectResult>>) i.getArgument(0))
+                        .apply(TransactionContext.empty(Mockito.mock(Transaction.class), 0))));
 
     EventProducer mockEntityEventProducer = Mockito.mock(EventProducer.class);
     PreProcessHooks preProcessHooks = new PreProcessHooks();
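Reviewer note: every hunk above leans on the new com.linkedin.metadata.entity.TransactionContext helper, whose source file is not included in this excerpt. As a reading aid only, here is a minimal sketch reconstructed from the call sites in the diffs (empty, tx, addException, shouldAttemptRetry, lastException); the field names, the exception list, and the exact retry arithmetic are inferred assumptions, not code copied from the PR.

// Sketch only: inferred from the call sites in this patch, not the shipped source.
package com.linkedin.metadata.entity;

import io.ebean.Transaction;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;

public class TransactionContext {
  private final int maxTransactionRetry; // retry budget beyond the first attempt (assumed name)
  private final List<Exception> exceptions = new ArrayList<>(); // one entry per failed attempt
  @Nullable private Transaction tx; // null for stores without a native transaction (Cassandra)

  public static TransactionContext empty(int maxRetries) {
    return new TransactionContext(null, maxRetries);
  }

  public static TransactionContext empty(@Nullable Transaction tx, int maxRetries) {
    return new TransactionContext(tx, maxRetries);
  }

  private TransactionContext(@Nullable Transaction tx, int maxRetries) {
    this.tx = tx;
    this.maxTransactionRetry = maxRetries;
  }

  // Rebind the context to the transaction opened for the current attempt,
  // matching block.apply(transactionContext.tx(transaction)) in EbeanAspectDao.
  public TransactionContext tx(Transaction tx) {
    this.tx = tx;
    return this;
  }

  @Nullable
  public Transaction tx() {
    return tx;
  }

  // Record a failed attempt; the loop condition below consumes the count.
  public void addException(Exception e) {
    exceptions.add(e);
  }

  @Nullable
  public Exception lastException() {
    return exceptions.isEmpty() ? null : exceptions.get(exceptions.size() - 1);
  }

  // True while at least one attempt failed and the retry budget is not exhausted;
  // mirrors the old "++retryCount <= maxTransactionRetry" loop condition.
  public boolean shouldAttemptRetry() {
    return !exceptions.isEmpty() && exceptions.size() <= maxTransactionRetry;
  }
}

Under these assumptions the rewritten loops keep the old retry budget while also retaining every attempt's exception: the DuplicateKeyException branch can break out of the loop early, and the final RetryLimitReached surfaces the most recent failure rather than a possibly stale one.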