diff --git a/.github/workflows/pr-labeler.yml b/.github/workflows/pr-labeler.yml index 7da20ece44f6d6..de7ad21b3e67bb 100644 --- a/.github/workflows/pr-labeler.yml +++ b/.github/workflows/pr-labeler.yml @@ -29,7 +29,6 @@ jobs: "swaroopjagadish", "treff7es", "yoonhyejin", - "eboneil", "gabe-lyons", "hsheth2", "jjoyce0510", @@ -37,16 +36,17 @@ jobs: "pedro93", "RyanHolstien", "sakethvarma397", - "Kunal-kankriya", "purnimagarg1", - "dushayntAW", "sagar-salvi-apptware", "kushagra-apptware", "Salman-Apptware", "mayurinehate", "noggi", "skrydal", - "kevinkarchacryl" + "kevinkarchacryl", + "sgomezvillamor", + "acrylJonny", + "chakru-r" ]'), github.actor ) diff --git a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx index d15f30bc03211c..1cd4349f37d949 100644 --- a/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx +++ b/datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx @@ -16,6 +16,7 @@ const StyledCollapse = styled(Collapse)<{ color: string }>` .ant-collapse-header { display: flex; align-items: center; + overflow: auto; } .ant-collapse-item { diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java b/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java index 41ef9c25a0f3eb..e9ee7789550c6c 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java @@ -178,7 +178,7 @@ public static String toElasticsearchFieldName( /** * Return an elasticsearch type from structured property type * - * @param fieldName filter or facet field name + * @param fieldName filter or facet field name - must match actual FQN of structured prop * @param aspectRetriever aspect retriever * @return elasticsearch type */ diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index c6d55fb5bcc56e..5ae5438e212c5b 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -142,6 +142,15 @@ # datahub does not depend on traitlets directly but great expectations does. # https://github.com/ipython/traitlets/issues/741 "traitlets!=5.2.2", + # GE depends on IPython - we have no direct dependency on it. + # IPython 8.22.0 added a dependency on traitlets 5.13.x, but only declared a + # version requirement of traitlets>5. + # See https://github.com/ipython/ipython/issues/14352. + # This issue was fixed by https://github.com/ipython/ipython/pull/14353, + # which first appeared in IPython 8.22.1. + # As such, we just need to avoid that version in order to get the + # dependencies that we need. IPython probably should've yanked 8.22.0. 
+ "IPython!=8.22.0", "greenlet", *cachetools_lib, } diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 8b852513e03c0f..a640f941b75276 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -214,14 +214,47 @@ def references(urn: str, dry_run: bool, force: bool) -> None: @delete.command() -@click.option("--urn", required=True, type=str, help="the urn of the entity") -def undo_by_filter(urn: str) -> None: +@click.option("--urn", required=False, type=str, help="the urn of the entity") +@click.option( + "-p", + "--platform", + required=False, + type=str, + help="Platform filter (e.g. snowflake)", +) +@click.option( + "-b", + "--batch-size", + required=False, + default=3000, + type=int, + help="Batch size when querying for entities to un-soft delete." + "Maximum 10000. Large batch sizes may cause timeouts.", +) +def undo_by_filter( + urn: Optional[str], platform: Optional[str], batch_size: int +) -> None: """ - Undo a soft deletion of an entity + Undo soft deletion by filters """ graph = get_default_graph() logger.info(f"Using {graph}") - graph.set_soft_delete_status(urn=urn, delete=False) + if urn: + graph.set_soft_delete_status(urn=urn, delete=False) + else: + urns = list( + graph.get_urns_by_filter( + platform=platform, + query="*", + status=RemovedStatusFilter.ONLY_SOFT_DELETED, + batch_size=batch_size, + ) + ) + logger.info(f"Going to un-soft delete {len(urns)} urns") + urns_iter = progressbar.progressbar(urns, redirect_stdout=True) + for urn in urns_iter: + assert urn + graph.set_soft_delete_status(urn=urn, delete=False) @delete.command(no_args_is_help=True) diff --git a/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py b/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py index cac6bb4996391f..f08c78cadc0b2b 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py +++ b/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py @@ -1,3 +1,4 @@ +import inspect import logging from typing import Any, Dict, Optional @@ -34,5 +35,34 @@ def _resolve_oauth_callback(self) -> None: "oauth_cb must be a string representing python function reference " "in the format :." ) + + call_back_fn = import_path(call_back) + self._validate_call_back_fn_signature(call_back_fn) + # Set the callback - self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = import_path(call_back) + self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = call_back_fn + + def _validate_call_back_fn_signature(self, call_back_fn: Any) -> None: + sig = inspect.signature(call_back_fn) + + num_positional_args = len( + [ + param + for param in sig.parameters.values() + if param.kind + in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ) + and param.default == inspect.Parameter.empty + ] + ) + + has_variadic_args = any( + param.kind == inspect.Parameter.VAR_POSITIONAL + for param in sig.parameters.values() + ) + + assert num_positional_args == 1 or ( + has_variadic_args and num_positional_args <= 1 + ), "oauth_cb function must accept single positional argument." 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py index 90641b7059ca40..3e51b7da9e8be1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py @@ -208,22 +208,28 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]: dpis = [] start = 0 while True: - job_query_result = self.ctx.graph.execute_graphql( - DATA_PROCESS_INSTANCES_QUERY, - {"dataJobUrn": job_urn, "start": start, "count": batch_size}, - ) - job_data = job_query_result.get("dataJob") - if not job_data: - raise ValueError(f"Error getting job {job_urn}") - - runs_data = job_data.get("runs") - if not runs_data: - raise ValueError(f"Error getting runs for {job_urn}") - - runs = runs_data.get("runs") - dpis.extend(runs) - start += batch_size - if len(runs) < batch_size: + try: + job_query_result = self.ctx.graph.execute_graphql( + DATA_PROCESS_INSTANCES_QUERY, + {"dataJobUrn": job_urn, "start": start, "count": batch_size}, + ) + job_data = job_query_result.get("dataJob") + if not job_data: + logger.error(f"Error getting job {job_urn}") + break + + runs_data = job_data.get("runs") + if not runs_data: + logger.error(f"Error getting runs for {job_urn}") + break + + runs = runs_data.get("runs") + dpis.extend(runs) + start += batch_size + if len(runs) < batch_size: + break + except Exception as e: + logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}") break return dpis @@ -243,8 +249,12 @@ def keep_last_n_dpi( futures[future] = dpi for future in as_completed(futures): - deleted_count_last_n += 1 - futures[future]["deleted"] = True + try: + future.result() + deleted_count_last_n += 1 + futures[future]["deleted"] = True + except Exception as e: + logger.error(f"Exception while deleting DPI: {e}") if deleted_count_last_n % self.config.batch_size == 0: logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}") @@ -279,7 +289,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None: dpis = self.fetch_dpis(job.urn, self.config.batch_size) dpis.sort( key=lambda x: x["created"]["time"] - if x["created"] and x["created"]["time"] + if "created" in x and "time" in x["created"] else 0, reverse=True, ) @@ -314,15 +324,23 @@ def remove_old_dpis( if dpi.get("deleted"): continue - if dpi["created"]["time"] < retention_time * 1000: + if ( + "created" not in dpi + or "time" not in dpi["created"] + or dpi["created"]["time"] < retention_time * 1000 + ): future = executor.submit( self.delete_entity, dpi["urn"], "dataprocessInstance" ) futures[future] = dpi for future in as_completed(futures): - deleted_count_retention += 1 - futures[future]["deleted"] = True + try: + future.result() + deleted_count_retention += 1 + futures[future]["deleted"] = True + except Exception as e: + logger.error(f"Exception while deleting DPI: {e}") if deleted_count_retention % self.config.batch_size == 0: logger.info( @@ -378,8 +396,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: dataFlows[flow.urn] = flow scroll_id: Optional[str] = None + previous_scroll_id: Optional[str] = None + dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list) deleted_jobs: int = 0 + while True: result = self.ctx.graph.execute_graphql( DATAJOB_QUERY, @@ -426,9 +447,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: else: dataJobs[datajob_entity.flow_urn].append(datajob_entity) - if not 
scroll_id: + if not scroll_id or previous_scroll_id == scroll_id: break + previous_scroll_id = scroll_id + logger.info(f"Deleted {deleted_jobs} DataJobs") # Delete empty dataflows if needed if self.config.delete_empty_data_flows: @@ -443,4 +466,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if deleted_jobs % self.config.batch_size == 0: logger.info(f"Deleted {deleted_data_flows} DataFlows") logger.info(f"Deleted {deleted_data_flows} DataFlows") + return [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py index 790c1f918cdfd2..15ee995b2d5fdc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -78,7 +78,17 @@ class PulsarSchema: def __init__(self, schema): self.schema_version = schema.get("version") - avro_schema = json.loads(schema.get("data")) + schema_data = schema.get("data") + if not schema_data: + logger.warning("Schema data is empty or None. Using default empty schema.") + schema_data = "{}" + + try: + avro_schema = json.loads(schema_data) + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON schema: {schema_data}. Error: {str(e)}") + avro_schema = {} + self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name") self.schema_description = avro_schema.get("doc") self.schema_type = schema.get("type") diff --git a/metadata-ingestion/tests/integration/kafka/oauth.py b/metadata-ingestion/tests/integration/kafka/oauth.py index 28cfee521d6c0f..81a91fcd5e4069 100644 --- a/metadata-ingestion/tests/integration/kafka/oauth.py +++ b/metadata-ingestion/tests/integration/kafka/oauth.py @@ -12,3 +12,19 @@ def create_token(*args: Any, **kwargs: Any) -> Tuple[str, int]: "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456", 3600, ) + + +def create_token_no_args() -> Tuple[str, int]: + logger.warning(MESSAGE) + return ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456", + 3600, + ) + + +def create_token_only_kwargs(**kwargs: Any) -> Tuple[str, int]: + logger.warning(MESSAGE) + return ( + "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJjbGllbnRfaWQiOiJrYWZrYV9jbGllbnQiLCJleHAiOjE2OTg3NjYwMDB9.dummy_sig_abcdef123456", + 3600, + ) diff --git a/metadata-ingestion/tests/integration/kafka/test_kafka.py b/metadata-ingestion/tests/integration/kafka/test_kafka.py index 7462f177684b7e..bf0ec1845a66c2 100644 --- a/metadata-ingestion/tests/integration/kafka/test_kafka.py +++ b/metadata-ingestion/tests/integration/kafka/test_kafka.py @@ -5,9 +5,10 @@ import yaml from freezegun import freeze_time +from datahub.configuration.common import ConfigurationError from datahub.ingestion.api.source import SourceCapability from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.source.kafka.kafka import KafkaSource +from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig from tests.integration.kafka import oauth # type: ignore from tests.test_helpers import mce_helpers, test_connection_helpers from tests.test_helpers.click_helpers import run_datahub_cmd @@ -157,3 +158,31 @@ def test_kafka_oauth_callback( assert checks["consumer_oauth_callback"], "Consumer oauth callback not found" assert checks["admin_polling"], "Admin polling was not initiated" assert checks["admin_oauth_callback"], "Admin oauth callback not found" + + +def 
test_kafka_source_oauth_cb_signature(): + with pytest.raises( + ConfigurationError, + match=("oauth_cb function must accept single positional argument."), + ): + KafkaSourceConfig.parse_obj( + { + "connection": { + "bootstrap": "foobar:9092", + "consumer_config": {"oauth_cb": "oauth:create_token_no_args"}, + } + } + ) + + with pytest.raises( + ConfigurationError, + match=("oauth_cb function must accept single positional argument."), + ): + KafkaSourceConfig.parse_obj( + { + "connection": { + "bootstrap": "foobar:9092", + "consumer_config": {"oauth_cb": "oauth:create_token_only_kwargs"}, + } + } + ) diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py new file mode 100644 index 00000000000000..5429c85dd608dc --- /dev/null +++ b/metadata-ingestion/tests/unit/test_gc.py @@ -0,0 +1,109 @@ +import unittest +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.gc.dataprocess_cleanup import ( + DataJobEntity, + DataProcessCleanup, + DataProcessCleanupConfig, + DataProcessCleanupReport, +) + + +class TestDataProcessCleanup(unittest.TestCase): + def setUp(self): + self.ctx = PipelineContext(run_id="test_run") + self.ctx.graph = MagicMock() + self.config = DataProcessCleanupConfig() + self.report = DataProcessCleanupReport() + self.cleanup = DataProcessCleanup( + self.ctx, self.config, self.report, dry_run=True + ) + + @patch( + "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" + ) + def test_delete_dpi_from_datajobs(self, mock_fetch_dpis): + job = DataJobEntity( + urn="urn:li:dataJob:1", + flow_urn="urn:li:dataFlow:1", + lastIngested=int(datetime.now(timezone.utc).timestamp()), + jobId="job1", + dataPlatformInstance="urn:li:dataPlatformInstance:1", + total_runs=10, + ) + mock_fetch_dpis.return_value = [ + { + "urn": f"urn:li:dataprocessInstance:{i}", + "created": { + "time": int(datetime.now(timezone.utc).timestamp() + i) * 1000 + }, + } + for i in range(10) + ] + self.cleanup.delete_dpi_from_datajobs(job) + self.assertEqual(5, self.report.num_aspects_removed) + + @patch( + "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" + ) + def test_delete_dpi_from_datajobs_without_dpis(self, mock_fetch_dpis): + job = DataJobEntity( + urn="urn:li:dataJob:1", + flow_urn="urn:li:dataFlow:1", + lastIngested=int(datetime.now(timezone.utc).timestamp()), + jobId="job1", + dataPlatformInstance="urn:li:dataPlatformInstance:1", + total_runs=10, + ) + mock_fetch_dpis.return_value = [] + self.cleanup.delete_dpi_from_datajobs(job) + self.assertEqual(0, self.report.num_aspects_removed) + + @patch( + "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis" + ) + def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis): + job = DataJobEntity( + urn="urn:li:dataJob:1", + flow_urn="urn:li:dataFlow:1", + lastIngested=int(datetime.now(timezone.utc).timestamp()), + jobId="job1", + dataPlatformInstance="urn:li:dataPlatformInstance:1", + total_runs=10, + ) + mock_fetch_dpis.return_value = [ + {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10) + ] + [ + { + "urn": "urn:li:dataprocessInstance:11", + "created": {"time": int(datetime.now(timezone.utc).timestamp() * 1000)}, + } + ] + self.cleanup.delete_dpi_from_datajobs(job) + self.assertEqual(10, self.report.num_aspects_removed) + + def test_fetch_dpis(self): + assert self.cleanup.ctx.graph + 
self.cleanup.ctx.graph = MagicMock() + self.cleanup.ctx.graph.execute_graphql.return_value = { + "dataJob": { + "runs": { + "runs": [ + { + "urn": "urn:li:dataprocessInstance:1", + "created": { + "time": int(datetime.now(timezone.utc).timestamp()) + }, + } + ] + } + } + } + dpis = self.cleanup.fetch_dpis("urn:li:dataJob:1", 10) + self.assertEqual(len(dpis), 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index fa9109689caad4..29faa3955ea662 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -18,6 +18,7 @@ import com.linkedin.entity.Entity; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.client.EntityClient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.EnvelopedAspect; import com.linkedin.metadata.aspect.EnvelopedAspectArray; @@ -97,7 +98,7 @@ public class JavaEntityClient implements EntityClient { private final TimeseriesAspectService timeseriesAspectService; private final RollbackService rollbackService; private final EventProducer eventProducer; - private final int batchGetV2Size; + private final EntityClientConfig entityClientConfig; @Override @Nullable @@ -132,7 +133,7 @@ public Map batchGetV2( Map responseMap = new HashMap<>(); - Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size)) + Iterators.partition(urns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size())) .forEachRemaining( batch -> { try { @@ -159,7 +160,8 @@ public Map batchGetVersionedV2( Map responseMap = new HashMap<>(); - Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size)) + Iterators.partition( + versionedUrns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size())) .forEachRemaining( batch -> { try { @@ -760,48 +762,62 @@ public List batchIngestProposals( : Constants.UNKNOWN_ACTOR; final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr); - AspectsBatch batch = - AspectsBatchImpl.builder() - .mcps( - metadataChangeProposals, - auditStamp, - opContext.getRetrieverContext().get(), - opContext.getValidationContext().isAlternateValidation()) - .build(); - - List results = entityService.ingestProposal(opContext, batch, async); - entitySearchService.appendRunId(opContext, results); - - Map, List> resultMap = - results.stream() - .collect( - Collectors.groupingBy( - result -> - Pair.of( - result.getRequest().getUrn(), result.getRequest().getAspectName()))); - - // Preserve ordering - return batch.getItems().stream() - .map( - requestItem -> { - // Urns generated - List urnsForRequest = - resultMap - .getOrDefault( - Pair.of(requestItem.getUrn(), requestItem.getAspectName()), List.of()) - .stream() - .map(IngestResult::getUrn) - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.toList()); - - // Update runIds - urnsForRequest.forEach( - urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata())); - - return urnsForRequest.isEmpty() ? 
null : urnsForRequest.get(0).toString(); - }) - .collect(Collectors.toList()); + List updatedUrns = new ArrayList<>(); + Iterators.partition( + metadataChangeProposals.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size())) + .forEachRemaining( + batch -> { + AspectsBatch aspectsBatch = + AspectsBatchImpl.builder() + .mcps( + batch, + auditStamp, + opContext.getRetrieverContext().get(), + opContext.getValidationContext().isAlternateValidation()) + .build(); + + List results = + entityService.ingestProposal(opContext, aspectsBatch, async); + entitySearchService.appendRunId(opContext, results); + + Map, List> resultMap = + results.stream() + .collect( + Collectors.groupingBy( + result -> + Pair.of( + result.getRequest().getUrn(), + result.getRequest().getAspectName()))); + + // Preserve ordering + updatedUrns.addAll( + aspectsBatch.getItems().stream() + .map( + requestItem -> { + // Urns generated + List urnsForRequest = + resultMap + .getOrDefault( + Pair.of(requestItem.getUrn(), requestItem.getAspectName()), + List.of()) + .stream() + .map(IngestResult::getUrn) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.toList()); + + // Update runIds + urnsForRequest.forEach( + urn -> + tryIndexRunId(opContext, urn, requestItem.getSystemMetadata())); + + return urnsForRequest.isEmpty() + ? null + : urnsForRequest.get(0).toString(); + }) + .collect(Collectors.toList())); + }); + return updatedUrns; } @SneakyThrows diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java index ab68abc69bce7c..eda9b3a880228f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/SystemJavaEntityClient.java @@ -5,6 +5,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.entity.EntityResponse; import com.linkedin.entity.client.EntityClientCache; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.SystemEntityClient; import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig; import com.linkedin.metadata.entity.DeleteEntityService; @@ -43,7 +44,7 @@ public SystemJavaEntityClient( RollbackService rollbackService, EventProducer eventProducer, EntityClientCacheConfig cacheConfig, - int batchGetV2Size) { + EntityClientConfig entityClientConfig) { super( entityService, deleteEntityService, @@ -54,7 +55,7 @@ public SystemJavaEntityClient( timeseriesAspectService, rollbackService, eventProducer, - batchGetV2Size); + entityClientConfig); this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build(); this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AggregationQueryBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AggregationQueryBuilder.java index 39f69ed1716abd..60ca7649331a00 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AggregationQueryBuilder.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AggregationQueryBuilder.java @@ -379,7 +379,7 @@ private void addCriteriaFiltersToAggregationMetadata( } } - private void addCriterionFiltersToAggregationMetadata( + public void addCriterionFiltersToAggregationMetadata( @Nonnull final 
Criterion criterion, @Nonnull final List aggregationMetadata, @Nullable AspectRetriever aspectRetriever) { @@ -422,6 +422,17 @@ private void addCriterionFiltersToAggregationMetadata( value -> addMissingAggregationValueToAggregationMetadata(value, originalAggMetadata)); } + } else if (aggregationMetadataMap.containsKey(criterion.getField())) { + /* + * If we already have aggregations for the facet field (original field name), simply inject any missing values counts into the set. + * If there are no results for a particular facet value, it will NOT be in the original aggregation set returned by + * Elasticsearch. + */ + AggregationMetadata originalAggMetadata = aggregationMetadataMap.get(criterion.getField()); + criterion + .getValues() + .forEach( + value -> addMissingAggregationValueToAggregationMetadata(value, originalAggMetadata)); } else { /* * If we do not have ANY aggregation for the facet field, then inject a new aggregation metadata object for the @@ -429,10 +440,14 @@ private void addCriterionFiltersToAggregationMetadata( * If there are no results for a particular facet, it will NOT be in the original aggregation set returned by * Elasticsearch. */ + // Simply replace suffix from original field when there are no aggregations for it. Prevents + // bug where ES mappings for field are different from how we map the field back to UI + // (ie. Structured Properties with dots in them) + String facetField = ESUtils.replaceSuffix(criterion.getField()); aggregationMetadata.add( buildAggregationMetadata( - finalFacetField, - getFacetToDisplayNames().getOrDefault(finalFacetField, finalFacetField), + facetField, + getFacetToDisplayNames().getOrDefault(facetField, facetField), new LongMap( criterion.getValues().stream().collect(Collectors.toMap(i -> i, i -> 0L))), new FilterValueArray( diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java index 9698a1c10d8b54..17bbbaf059dec4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java @@ -448,9 +448,20 @@ public static String toParentField( urnDefinition.getFirst(), urnDefinition.getSecond())) .orElse(filterField); + return replaceSuffix(fieldName); + } + + /** + * Strip subfields from filter field + * + * @param fieldName name of the field + * @return normalized field name without subfields + */ + @Nonnull + public static String replaceSuffix(@Nonnull final String fieldName) { for (String subfield : SUBFIELDS) { String SUFFIX = "." 
+ subfield; - if (filterField.endsWith(SUFFIX)) { + if (fieldName.endsWith(SUFFIX)) { return fieldName.replace(SUFFIX, ""); } } @@ -710,7 +721,8 @@ private static QueryBuilder buildEqualsConditionFromCriterionWithValues( final Map> searchableFieldTypes, @Nonnull AspectRetriever aspectRetriever, boolean enableCaseInsensitiveSearch) { - Set fieldTypes = getFieldTypes(searchableFieldTypes, fieldName, aspectRetriever); + Set fieldTypes = + getFieldTypes(searchableFieldTypes, fieldName, criterion, aspectRetriever); if (fieldTypes.size() > 1) { log.warn( "Multiple field types for field name {}, determining best fit for set: {}", @@ -753,12 +765,16 @@ private static QueryBuilder buildEqualsConditionFromCriterionWithValues( private static Set getFieldTypes( Map> searchableFields, String fieldName, + @Nonnull final Criterion criterion, @Nullable AspectRetriever aspectRetriever) { final Set finalFieldTypes; if (fieldName.startsWith(STRUCTURED_PROPERTY_MAPPING_FIELD_PREFIX)) { + // use criterion field here for structured props since fieldName has dots replaced with + // underscores finalFieldTypes = - StructuredPropertyUtils.toElasticsearchFieldType(fieldName, aspectRetriever); + StructuredPropertyUtils.toElasticsearchFieldType( + replaceSuffix(criterion.getField()), aspectRetriever); } else { Set fieldTypes = searchableFields.getOrDefault(fieldName.split("\\.")[0], Collections.emptySet()); @@ -782,7 +798,8 @@ private static RangeQueryBuilder buildRangeQueryFromCriterion( Condition condition, boolean isTimeseries, AspectRetriever aspectRetriever) { - Set fieldTypes = getFieldTypes(searchableFieldTypes, fieldName, aspectRetriever); + Set fieldTypes = + getFieldTypes(searchableFieldTypes, fieldName, criterion, aspectRetriever); // Determine criterion value, range query only accepts single value so take first value in // values if multiple diff --git a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java index 7b1fccafbb9e63..4d977d179f91e4 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/client/JavaEntityClientTest.java @@ -12,6 +12,7 @@ import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RequiredFieldNotPresentException; import com.linkedin.domain.Domains; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.events.metadata.ChangeType; import com.linkedin.metadata.Constants; import com.linkedin.metadata.aspect.batch.AspectsBatch; @@ -90,7 +91,7 @@ private JavaEntityClient getJavaEntityClient() { _timeseriesAspectService, rollbackService, _eventProducer, - 1); + EntityClientConfig.builder().batchGetV2Size(1).build()); } @Test diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java index cef463802a6b14..3969223981ec3f 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java @@ -3,6 +3,7 @@ import static com.linkedin.metadata.Constants.DATA_TYPE_URN_PREFIX; import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME; import static com.linkedin.metadata.utils.SearchUtil.*; +import static 
org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -12,23 +13,36 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.DataMap; +import com.linkedin.data.template.LongMap; import com.linkedin.data.template.SetMode; +import com.linkedin.data.template.StringArray; import com.linkedin.entity.Aspect; import com.linkedin.metadata.aspect.AspectRetriever; import com.linkedin.metadata.config.search.SearchConfiguration; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.annotation.SearchableAnnotation; +import com.linkedin.metadata.query.filter.Condition; +import com.linkedin.metadata.query.filter.Criterion; +import com.linkedin.metadata.search.AggregationMetadata; +import com.linkedin.metadata.search.FilterValue; +import com.linkedin.metadata.search.FilterValueArray; import com.linkedin.metadata.search.elasticsearch.query.request.AggregationQueryBuilder; import com.linkedin.r2.RemoteInvocationException; import com.linkedin.structured.StructuredPropertyDefinition; import io.datahubproject.test.metadata.context.TestOperationContexts; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.mockito.Mockito; import org.opensearch.search.aggregations.AggregationBuilder; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.testng.Assert; @@ -598,4 +612,94 @@ public void testMissingAggregation() { .equals( MISSING_SPECIAL_TYPE + AGGREGATION_SPECIAL_TYPE_DELIMITER + "test"))); } + + @Test + public void testAddFiltersToMetadataWithStructuredPropsNoResults() { + final Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:test_me.one"); + + SearchConfiguration config = new SearchConfiguration(); + config.setMaxTermBucketSize(25); + + AggregationQueryBuilder builder = + new AggregationQueryBuilder( + config, ImmutableMap.of(mock(EntitySpec.class), ImmutableList.of())); + + Criterion criterion = + new Criterion() + .setField("structuredProperties.test_me.one") + .setValues(new StringArray("test123")) + .setCondition(Condition.EQUAL); + + AspectRetriever mockAspectRetriever = getMockAspectRetriever(propertyUrn); + + final List aggregationMetadataList = new ArrayList<>(); + builder.addCriterionFiltersToAggregationMetadata( + criterion, aggregationMetadataList, mockAspectRetriever); + + // ensure we add the correct structured prop aggregation here + Assert.assertEquals(aggregationMetadataList.size(), 1); + // Assert.assertEquals(aggregationMetadataList.get(0).getEntity(), propertyUrn); + Assert.assertEquals( + aggregationMetadataList.get(0).getName(), "structuredProperties.test_me.one"); + Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().size(), 1); + Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().get("test123"), 0); + } + + @Test + public void testAddFiltersToMetadataWithStructuredPropsWithAggregations() { + final Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:test_me.one"); + + final AggregationMetadata aggregationMetadata = new AggregationMetadata(); + 
aggregationMetadata.setName("structuredProperties.test_me.one"); + FilterValue filterValue = + new FilterValue().setValue("test123").setFiltered(false).setFacetCount(1); + aggregationMetadata.setFilterValues(new FilterValueArray(filterValue)); + LongMap aggregations = new LongMap(); + aggregations.put("test123", 1L); + aggregationMetadata.setAggregations(aggregations); + + SearchConfiguration config = new SearchConfiguration(); + config.setMaxTermBucketSize(25); + + AggregationQueryBuilder builder = + new AggregationQueryBuilder( + config, ImmutableMap.of(mock(EntitySpec.class), ImmutableList.of())); + + Criterion criterion = + new Criterion() + .setField("structuredProperties.test_me.one") + .setValues(new StringArray("test123")) + .setCondition(Condition.EQUAL); + + AspectRetriever mockAspectRetriever = getMockAspectRetriever(propertyUrn); + + final List aggregationMetadataList = new ArrayList<>(); + aggregationMetadataList.add(aggregationMetadata); + builder.addCriterionFiltersToAggregationMetadata( + criterion, aggregationMetadataList, mockAspectRetriever); + + Assert.assertEquals(aggregationMetadataList.size(), 1); + Assert.assertEquals( + aggregationMetadataList.get(0).getName(), "structuredProperties.test_me.one"); + Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().size(), 1); + Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().get("test123"), 1); + } + + private AspectRetriever getMockAspectRetriever(Urn propertyUrn) { + AspectRetriever mockAspectRetriever = Mockito.mock(AspectRetriever.class); + Map> mockResult = new HashMap<>(); + Map aspectMap = new HashMap<>(); + DataMap definition = new DataMap(); + definition.put("qualifiedName", "test_me.one"); + definition.put("valueType", "urn:li:dataType:datahub.string"); + Aspect definitionAspect = new Aspect(definition); + aspectMap.put(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, definitionAspect); + mockResult.put(propertyUrn, aspectMap); + Set urns = new HashSet<>(); + urns.add(propertyUrn); + Mockito.when(mockAspectRetriever.getLatestAspectObjects(eq(urns), any())) + .thenReturn(mockResult); + + return mockAspectRetriever; + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java index 54a9e7d8b47bda..4f2bda39ad2117 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java @@ -45,6 +45,7 @@ public static void setup() throws RemoteInvocationException, URISyntaxException Urn abFghTenUrn = Urn.createFromString("urn:li:structuredProperty:ab.fgh.ten"); Urn underscoresAndDotsUrn = Urn.createFromString("urn:li:structuredProperty:under.scores.and.dots_make_a_mess"); + Urn dateWithDotsUrn = Urn.createFromString("urn:li:structuredProperty:date_here.with_dot"); // legacy aspectRetriever = mock(AspectRetriever.class); @@ -64,6 +65,18 @@ public static void setup() throws RemoteInvocationException, URISyntaxException STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, new Aspect(structPropAbFghTenDefinition.data())))); + StructuredPropertyDefinition dateWithDotsDefinition = new StructuredPropertyDefinition(); + dateWithDotsDefinition.setVersion(null, SetMode.REMOVE_IF_NULL); + dateWithDotsDefinition.setValueType(Urn.createFromString(DATA_TYPE_URN_PREFIX + "date")); + dateWithDotsDefinition.setQualifiedName("date_here.with_dot"); + 
when(aspectRetriever.getLatestAspectObjects(eq(Set.of(dateWithDotsUrn)), anySet())) + .thenReturn( + Map.of( + dateWithDotsUrn, + Map.of( + STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, + new Aspect(dateWithDotsDefinition.data())))); + StructuredPropertyDefinition structPropUnderscoresAndDotsDefinition = new StructuredPropertyDefinition(); structPropUnderscoresAndDotsDefinition.setVersion(null, SetMode.REMOVE_IF_NULL); @@ -895,6 +908,36 @@ public void testGetQueryBuilderFromNamespacedStructPropEqualsValueV1() { Assert.assertEquals(result.toString(), expected); } + @Test + public void testGetQueryBuilderFromDatesWithDots() { + + final Criterion singleValueCriterion = + buildCriterion( + "structuredProperties.date_here.with_dot", Condition.GREATER_THAN, "1731974400000"); + + OperationContext opContext = mock(OperationContext.class); + when(opContext.getAspectRetriever()).thenReturn(aspectRetriever); + QueryBuilder result = + ESUtils.getQueryBuilderFromCriterion( + singleValueCriterion, false, new HashMap<>(), opContext, QueryFilterRewriteChain.EMPTY); + // structuredProperties.date_here_with_dot should not have .keyword at the end since this field + // type is type long for dates + String expected = + "{\n" + + " \"range\" : {\n" + + " \"structuredProperties.date_here_with_dot\" : {\n" + + " \"from\" : 1731974400000,\n" + + " \"to\" : null,\n" + + " \"include_lower\" : false,\n" + + " \"include_upper\" : true,\n" + + " \"boost\" : 1.0,\n" + + " \"_name\" : \"structuredProperties.date_here.with_dot\"\n" + + " }\n" + + " }\n" + + "}"; + Assert.assertEquals(result.toString(), expected); + } + @Test public void testGetQueryBuilderFromStructPropExists() { final Criterion singleValueCriterion = buildExistsCriterion("structuredProperties.ab.fgh.ten"); diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java index e47cdf80281c9a..d5aa7e9c51983a 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SampleDataFixtureConfiguration.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.when; import com.linkedin.entity.client.EntityClient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.metadata.client.JavaEntityClient; import com.linkedin.metadata.config.PreProcessHooks; import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration; @@ -330,6 +331,6 @@ private EntityClient entityClientHelper( null, null, null, - 1); + EntityClientConfig.builder().batchGetV2Size(1).build()); } } diff --git a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java index 889473d32d1a35..b7b698c73ddac3 100644 --- a/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java +++ b/metadata-io/src/test/java/io/datahubproject/test/fixtures/search/SearchLineageFixtureConfiguration.java @@ -4,6 +4,7 @@ import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration; import com.linkedin.entity.client.EntityClient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.metadata.client.JavaEntityClient; import com.linkedin.metadata.config.DataHubAppConfiguration; import 
com.linkedin.metadata.config.MetadataChangeProposalConfig; @@ -276,6 +277,6 @@ protected EntityClient entityClient( null, null, null, - 1); + EntityClientConfig.builder().batchGetV2Size(1).build()); } } diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index ef87afdef46cb7..4e356f5fb3670a 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -15,26 +15,21 @@ import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory; import com.linkedin.metadata.EventUtils; -import com.linkedin.metadata.dao.throttle.ThrottleControl; import com.linkedin.metadata.dao.throttle.ThrottleSensor; import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition; +import com.linkedin.metadata.kafka.util.KafkaListenerUtil; import com.linkedin.metadata.utils.metrics.MetricUtils; -import com.linkedin.mxe.FailedMetadataChangeProposal; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.Topics; import io.datahubproject.metadata.context.OperationContext; -import java.io.IOException; import java.util.Optional; -import javax.annotation.Nonnull; import javax.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -43,7 +38,6 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Component; @Slf4j @@ -80,38 +74,7 @@ public class MetadataChangeProposalsProcessor { @PostConstruct public void registerConsumerThrottle() { - if (kafkaThrottle != null - && provider - .getMetadataChangeProposal() - .getThrottle() - .getComponents() - .getMceConsumer() - .isEnabled()) { - log.info("MCE Consumer Throttle Enabled"); - kafkaThrottle.addCallback( - (throttleEvent) -> { - Optional container = - Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId)); - if (container.isEmpty()) { - log.warn( - "Expected container was missing: {} throttle is not possible.", - mceConsumerGroupId); - } else { - if (throttleEvent.isThrottled()) { - container.ifPresent(MessageListenerContainer::pause); - return ThrottleControl.builder() - // resume consumer after sleep - .callback( - (resumeEvent) -> container.ifPresent(MessageListenerContainer::resume)) - .build(); - } - } - - return ThrottleControl.NONE; - }); - } else { - log.info("MCE Consumer Throttle Disabled"); - } + KafkaListenerUtil.registerThrottle(kafkaThrottle, provider, registry, mceConsumerGroupId); } @KafkaListener( @@ -132,7 +95,9 @@ public void consume(final ConsumerRecord 
consumerRecord) consumerRecord.serializedValueSize(), consumerRecord.timestamp()); - log.debug("Record {}", record); + if (log.isDebugEnabled()) { + log.debug("Record {}", record); + } MetadataChangeProposal event = new MetadataChangeProposal(); try { @@ -148,45 +113,18 @@ public void consume(final ConsumerRecord consumerRecord) MDC.put( MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse("")); - log.debug("MetadataChangeProposal {}", event); - // TODO: Get this from the event itself. + if (log.isDebugEnabled()) { + log.debug("MetadataChangeProposal {}", event); + } String urn = entityClient.ingestProposal(systemOperationContext, event, false); log.info("Successfully processed MCP event urn: {}", urn); } catch (Throwable throwable) { log.error("MCP Processor Error", throwable); log.error("Message: {}", record); - sendFailedMCP(event, throwable); + KafkaListenerUtil.sendFailedMCP(event, throwable, fmcpTopicName, kafkaProducer); } } finally { MDC.clear(); } } - - private void sendFailedMCP(@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) { - final FailedMetadataChangeProposal failedMetadataChangeProposal = - createFailedMCPEvent(event, throwable); - try { - final GenericRecord genericFailedMCERecord = - EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal); - log.debug("Sending FailedMessages to topic - {}", fmcpTopicName); - log.info( - "Error while processing FMCP: FailedMetadataChangeProposal - {}", - failedMetadataChangeProposal); - kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord)); - } catch (IOException e) { - log.error( - "Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}", - e.getStackTrace(), - failedMetadataChangeProposal); - } - } - - @Nonnull - private FailedMetadataChangeProposal createFailedMCPEvent( - @Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) { - final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal(); - fmcp.setError(ExceptionUtils.getStackTrace(throwable)); - fmcp.setMetadataChangeProposal(event); - return fmcp; - } } diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java new file mode 100644 index 00000000000000..fed93628fe4d79 --- /dev/null +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/batch/BatchMetadataChangeProposalsProcessor.java @@ -0,0 +1,116 @@ +package com.linkedin.metadata.kafka.batch; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.linkedin.entity.client.SystemEntityClient; +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory; +import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.dao.throttle.ThrottleSensor; +import com.linkedin.metadata.kafka.config.batch.BatchMetadataChangeProposalProcessorCondition; +import com.linkedin.metadata.kafka.util.KafkaListenerUtil; +import com.linkedin.metadata.utils.metrics.MetricUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.mxe.Topics; +import io.datahubproject.metadata.context.OperationContext; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.PostConstruct; +import 
lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.Import; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@Import({RestliEntityClientFactory.class}) +@Conditional(BatchMetadataChangeProposalProcessorCondition.class) +@EnableKafka +@RequiredArgsConstructor +public class BatchMetadataChangeProposalsProcessor { + private static final String CONSUMER_GROUP_ID_VALUE = + "${METADATA_CHANGE_PROPOSAL_KAFKA_CONSUMER_GROUP_ID:generic-mce-consumer-job-client}"; + + private final OperationContext systemOperationContext; + private final SystemEntityClient entityClient; + private final Producer kafkaProducer; + + @Qualifier("kafkaThrottle") + private final ThrottleSensor kafkaThrottle; + + private final KafkaListenerEndpointRegistry registry; + private final ConfigurationProvider provider; + + private final Histogram kafkaLagStats = + MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag")); + + @Value( + "${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + + Topics.FAILED_METADATA_CHANGE_PROPOSAL + + "}") + private String fmcpTopicName; + + @Value(CONSUMER_GROUP_ID_VALUE) + private String mceConsumerGroupId; + + @PostConstruct + public void registerConsumerThrottle() { + KafkaListenerUtil.registerThrottle(kafkaThrottle, provider, registry, mceConsumerGroupId); + } + + @KafkaListener( + id = CONSUMER_GROUP_ID_VALUE, + topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}", + containerFactory = "kafkaEventConsumer", + batch = "true") + public void consume(final List> consumerRecords) { + try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) { + List metadataChangeProposals = + new ArrayList<>(consumerRecords.size()); + for (ConsumerRecord consumerRecord : consumerRecords) { + kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); + final GenericRecord record = consumerRecord.value(); + + log.info( + "Got MCP event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}", + consumerRecord.key(), + consumerRecord.topic(), + consumerRecord.partition(), + consumerRecord.offset(), + consumerRecord.serializedValueSize(), + consumerRecord.timestamp()); + + MetadataChangeProposal event = new MetadataChangeProposal(); + try { + event = EventUtils.avroToPegasusMCP(record); + } catch (Throwable throwable) { + log.error("MCP Processor Error", throwable); + log.error("Message: {}", record); + KafkaListenerUtil.sendFailedMCP(event, throwable, fmcpTopicName, kafkaProducer); + } + metadataChangeProposals.add(event); + } + + try { + List urns = + entityClient.batchIngestProposals( + systemOperationContext, metadataChangeProposals, false); + log.info("Successfully processed MCP event urns: {}", urns); + } catch (Exception e) { + // Java client should never throw this + log.error("Exception in batch ingest", e); + } + } + } +} diff --git 
a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java index 1cdb05b04e0ac9..554684d5e8fe77 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/MetadataChangeProposalProcessorCondition.java @@ -9,7 +9,8 @@ public class MetadataChangeProposalProcessorCondition implements Condition { @Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { Environment env = context.getEnvironment(); - return "true".equals(env.getProperty("MCE_CONSUMER_ENABLED")) - || "true".equals(env.getProperty("MCP_CONSUMER_ENABLED")); + return ("true".equals(env.getProperty("MCE_CONSUMER_ENABLED")) + || "true".equals(env.getProperty("MCP_CONSUMER_ENABLED"))) + && !Boolean.parseBoolean(env.getProperty("MCP_CONSUMER_BATCH_ENABLED")); } } diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/batch/BatchMetadataChangeProposalProcessorCondition.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/batch/BatchMetadataChangeProposalProcessorCondition.java new file mode 100644 index 00000000000000..296e37c7a90695 --- /dev/null +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/config/batch/BatchMetadataChangeProposalProcessorCondition.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.kafka.config.batch; + +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.env.Environment; +import org.springframework.core.type.AnnotatedTypeMetadata; + +public class BatchMetadataChangeProposalProcessorCondition implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + Environment env = context.getEnvironment(); + return ("true".equals(env.getProperty("MCE_CONSUMER_ENABLED")) + || "true".equals(env.getProperty("MCP_CONSUMER_ENABLED"))) + && Boolean.parseBoolean(env.getProperty("MCP_CONSUMER_BATCH_ENABLED")); + } +} diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/util/KafkaListenerUtil.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/util/KafkaListenerUtil.java new file mode 100644 index 00000000000000..874a45c995e911 --- /dev/null +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/util/KafkaListenerUtil.java @@ -0,0 +1,96 @@ +package com.linkedin.metadata.kafka.util; + +import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.EventUtils; +import com.linkedin.metadata.dao.throttle.ThrottleControl; +import com.linkedin.metadata.dao.throttle.ThrottleSensor; +import com.linkedin.mxe.FailedMetadataChangeProposal; +import com.linkedin.mxe.MetadataChangeProposal; +import java.io.IOException; +import java.util.Optional; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import 
org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; + +@Slf4j +public class KafkaListenerUtil { + + private KafkaListenerUtil() {} + + public static void registerThrottle( + ThrottleSensor kafkaThrottle, + ConfigurationProvider provider, + KafkaListenerEndpointRegistry registry, + String mceConsumerGroupId) { + if (kafkaThrottle != null + && provider + .getMetadataChangeProposal() + .getThrottle() + .getComponents() + .getMceConsumer() + .isEnabled()) { + log.info("MCE Consumer Throttle Enabled"); + kafkaThrottle.addCallback( + (throttleEvent) -> { + Optional container = + Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId)); + if (container.isEmpty()) { + log.warn( + "Expected container was missing: {} throttle is not possible.", + mceConsumerGroupId); + } else { + if (throttleEvent.isThrottled()) { + container.ifPresent(MessageListenerContainer::pause); + return ThrottleControl.builder() + // resume consumer after sleep + .callback( + (resumeEvent) -> container.ifPresent(MessageListenerContainer::resume)) + .build(); + } + } + + return ThrottleControl.NONE; + }); + } else { + log.info("MCE Consumer Throttle Disabled"); + } + } + + public static void sendFailedMCP( + @Nonnull MetadataChangeProposal event, + @Nonnull Throwable throwable, + String fmcpTopicName, + Producer kafkaProducer) { + final FailedMetadataChangeProposal failedMetadataChangeProposal = + createFailedMCPEvent(event, throwable); + try { + final GenericRecord genericFailedMCERecord = + EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal); + log.debug("Sending FailedMessages to topic - {}", fmcpTopicName); + log.info( + "Error while processing FMCP: FailedMetadataChangeProposal - {}", + failedMetadataChangeProposal); + kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord)); + } catch (IOException e) { + log.error( + "Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}", + e.getStackTrace(), + failedMetadataChangeProposal); + } + } + + @Nonnull + public static FailedMetadataChangeProposal createFailedMCPEvent( + @Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) { + final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal(); + fmcp.setError(ExceptionUtils.getStackTrace(throwable)); + fmcp.setMetadataChangeProposal(event); + return fmcp; + } +} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java index e99978a26d6cf5..e783b4e1963d0a 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java @@ -50,7 +50,7 @@ public EntityClient entityClient( _timeseriesAspectService, rollbackService, _eventProducer, - entityClientConfig.getBatchGetV2Size()); + entityClientConfig); } @Bean("systemEntityClient") @@ -79,6 +79,6 @@ public SystemEntityClient systemEntityClient( rollbackService, _eventProducer, entityClientCacheConfig, - entityClientConfig.getBatchGetV2Size()); + entityClientConfig); } }
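In combination with the delete CLI change earlier in this patch, un-soft-deleting by platform can also be driven programmatically through the graph client. The sketch below mirrors what undo_by_filter now does when --urn is omitted; the platform value and batch size are illustrative, and the exact import paths are assumptions based on the modules the CLI uses rather than something this diff introduces.

# Rough programmatic equivalent of the new platform-wide un-soft-delete path.
from datahub.ingestion.graph.client import RemovedStatusFilter, get_default_graph

graph = get_default_graph()

# Collect soft-deleted entity urns for a single platform, then flip the
# Status aspect back, the same two calls the CLI loop performs per urn.
urns = list(
    graph.get_urns_by_filter(
        platform="snowflake",  # illustrative platform filter
        query="*",
        status=RemovedStatusFilter.ONLY_SOFT_DELETED,
        batch_size=3000,  # default of the new --batch-size option (max 10000)
    )
)
for urn in urns:
    graph.set_soft_delete_status(urn=urn, delete=False)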