From edb87ff2ea896abdfaeba483a573e20f25917bd7 Mon Sep 17 00:00:00 2001 From: ronybony1990 <33382997+ronybony1990@users.noreply.github.com> Date: Tue, 5 Nov 2024 03:18:15 -0700 Subject: [PATCH 1/3] feat(spark-plugin): user should be able to pass custom mcp kafka topic (#11767) Co-authored-by: Neelab Chaudhuri --- .../datahub/spark/DatahubEventEmitter.java | 5 ++++- .../datahub/spark/DatahubSparkListener.java | 8 ++++++-- .../spark/conf/KafkaDatahubEmitterConfig.java | 8 ++++++++ .../datahub/spark/conf/SparkConfigParser.java | 1 + .../datahub/client/kafka/AvroSerializer.java | 2 +- .../datahub/client/kafka/KafkaEmitter.java | 19 +++++++++++++++---- 6 files changed, 35 insertions(+), 8 deletions(-) diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java index 7a5fdeaeb8e0d..0bcc7db9e8740 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubEventEmitter.java @@ -82,7 +82,10 @@ private Optional getEmitter() { (KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig(); try { emitter = - Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig())); + Optional.of( + new KafkaEmitter( + datahubKafkaEmitterConfig.getKafkaEmitterConfig(), + datahubKafkaEmitterConfig.getMcpTopic())); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 52507a682a1f8..ee0938edb5045 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -188,8 +188,12 @@ public Optional initializeEmitter(Config sparkConf) { }); kafkaEmitterConfig.producerConfig(kafkaConfig); } - - return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build())); + if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) { + String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC); + return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic)); + } else { + return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build())); + } case "file": log.info("File Emitter Configuration: File emitter will be used"); FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder(); diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java index 6ed66dbc9230f..a5f9b59f70846 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/KafkaDatahubEmitterConfig.java @@ -1,5 +1,6 @@ package datahub.spark.conf; +import datahub.client.kafka.KafkaEmitter; import datahub.client.kafka.KafkaEmitterConfig; import lombok.Getter; import lombok.Setter; @@ -11,8 +12,15 @@ public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig { final String type = "kafka"; KafkaEmitterConfig kafkaEmitterConfig; + String mcpTopic; public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) { this.kafkaEmitterConfig = kafkaEmitterConfig; + this.mcpTopic = KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC; + } + + public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) { + this.kafkaEmitterConfig = kafkaEmitterConfig; + this.mcpTopic = mcpTopic; } } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 630f10b08b411..45ec5365d09b3 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -32,6 +32,7 @@ public class SparkConfigParser { public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; + public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic"; public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap"; public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url"; public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config"; diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java index 0d0341562e7dd..e8ceeab696321 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/AvroSerializer.java @@ -9,7 +9,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -class AvroSerializer { +public class AvroSerializer { private final Schema _recordSchema; private final Schema _genericAspectSchema; diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java index d00dc09669045..777d2d5f301d7 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/kafka/KafkaEmitter.java @@ -31,14 +31,25 @@ public class KafkaEmitter implements Emitter { private final Properties kafkaConfigProperties; private AvroSerializer _avroSerializer; private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000; + private final String mcpKafkaTopic; /** * The default constructor * - * @param config - * @throws IOException + * @param config KafkaEmitterConfig + * @throws IOException when Avro Serialization fails */ public KafkaEmitter(KafkaEmitterConfig config) throws IOException { + this(config, DEFAULT_MCP_KAFKA_TOPIC); + } + + /** + * Constructor that takes in KafkaEmitterConfig and mcp Kafka Topic Name + * + * @param config KafkaEmitterConfig + * @throws IOException when Avro Serialization fails + */ + public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException { this.config = config; kafkaConfigProperties = new Properties(); kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap()); @@ -54,6 +65,7 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException { producer = new KafkaProducer<>(kafkaConfigProperties); _avroSerializer = new AvroSerializer(); + this.mcpKafkaTopic = mcpKafkaTopic; } @Override @@ -73,8 +85,7 @@ public Future emit(MetadataChangeProposal mcp, Callback d throws IOException { GenericRecord genricRecord = _avroSerializer.serialize(mcp); ProducerRecord record = - new ProducerRecord<>( - KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genricRecord); + new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genricRecord); org.apache.kafka.clients.producer.Callback callback = new org.apache.kafka.clients.producer.Callback() { From 26529f2b059df025771998bf1fb8e9d9b103b178 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 5 Nov 2024 09:42:06 -0600 Subject: [PATCH 2/3] logging(template-mcp): adding more logging around templating (#11786) --- .../bootstrapmcps/BootstrapMCPUtil.java | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java index b8b7e828c16c6..6ebc6e8ec6a3b 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/bootstrapmcps/BootstrapMCPUtil.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.IOUtils; import org.springframework.core.io.ClassPathResource; @@ -109,13 +110,29 @@ static List resolveMCPTemplate( AuditStamp auditStamp) throws IOException { - String template = loadTemplate(mcpTemplate.getMcps_location()); - Mustache mustache = MUSTACHE_FACTORY.compile(new StringReader(template), mcpTemplate.getName()); + final String template = loadTemplate(mcpTemplate.getMcps_location()); Map scopeValues = resolveValues(opContext, mcpTemplate, auditStamp); + StringWriter writer = new StringWriter(); - mustache.execute(writer, scopeValues); + try { + Mustache mustache = + MUSTACHE_FACTORY.compile(new StringReader(template), mcpTemplate.getName()); + mustache.execute(writer, scopeValues); + } catch (Exception e) { + log.error( + "Failed to apply mustache template. Template: {} Values: {}", + template, + resolveEnv(mcpTemplate)); + throw e; + } - return opContext.getYamlMapper().readValue(writer.toString(), new TypeReference<>() {}); + final String yaml = writer.toString(); + try { + return opContext.getYamlMapper().readValue(yaml, new TypeReference<>() {}); + } catch (Exception e) { + log.error("Failed to parse rendered MCP bootstrap yaml: {}", yaml); + throw e; + } } static Map resolveValues( @@ -128,13 +145,21 @@ static Map resolveValues( // built-in scopeValues.put("auditStamp", RecordUtils.toJsonString(auditStamp)); + String envValue = resolveEnv(mcpTemplate); + if (envValue != null) { + scopeValues.putAll(opContext.getObjectMapper().readValue(envValue, new TypeReference<>() {})); + } + return scopeValues; + } + + @Nullable + private static String resolveEnv(BootstrapMCPConfigFile.MCPTemplate mcpTemplate) { if (mcpTemplate.getValues_env() != null && !mcpTemplate.getValues_env().isEmpty() && System.getenv().containsKey(mcpTemplate.getValues_env())) { - String envValue = System.getenv(mcpTemplate.getValues_env()); - scopeValues.putAll(opContext.getObjectMapper().readValue(envValue, new TypeReference<>() {})); + return System.getenv(mcpTemplate.getValues_env()); } - return scopeValues; + return null; } private static String loadTemplate(String source) throws IOException { From 5c5812804b38c1122b68de4b20a49d66c523a401 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Tue, 5 Nov 2024 09:42:21 -0600 Subject: [PATCH 3/3] feat(entity-client): batch entity-client ingestProposals (#11787) --- datahub-frontend/app/auth/AuthModule.java | 13 +- .../java/com/linkedin/metadata/Constants.java | 2 + .../kafka/hook/spring/MCLMAESpringTest.java | 7 + .../MCLSpringCommonTestConfiguration.java | 6 + ...eConsumerApplicationTestConfiguration.java | 22 +- .../src/main/resources/application.yaml | 9 + .../EntityClientConfigFactory.java | 29 ++ .../entityclient/JavaEntityClientFactory.java | 10 +- .../RestliEntityClientFactory.java | 27 +- .../linkedin/common/client/BaseClient.java | 25 +- .../entity/client/EntityClientConfig.java | 46 +++ .../entity/client/RestliEntityClient.java | 321 ++++++++++-------- .../client/SystemRestliEntityClient.java | 10 +- .../com/linkedin/usage/RestliUsageClient.java | 5 +- .../common/client/BaseClientTest.java | 28 +- .../client/SystemRestliEntityClientTest.java | 54 ++- .../resources/entity/AspectResource.java | 3 +- 17 files changed, 411 insertions(+), 206 deletions(-) create mode 100644 metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClientConfig.java diff --git a/datahub-frontend/app/auth/AuthModule.java b/datahub-frontend/app/auth/AuthModule.java index d51795330f5ce..7fa99ab3cb262 100644 --- a/datahub-frontend/app/auth/AuthModule.java +++ b/datahub-frontend/app/auth/AuthModule.java @@ -13,6 +13,7 @@ import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.name.Named; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.SystemEntityClient; import com.linkedin.entity.client.SystemRestliEntityClient; import com.linkedin.metadata.models.registry.EmptyEntityRegistry; @@ -213,11 +214,13 @@ protected SystemEntityClient provideEntityClient( return new SystemRestliEntityClient( buildRestliClient(), - new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)), - configs.getInt(ENTITY_CLIENT_NUM_RETRIES), - configurationProvider.getCache().getClient().getEntityClient(), - Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)), - Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY))); + EntityClientConfig.builder() + .backoffPolicy(new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL))) + .retryCount(configs.getInt(ENTITY_CLIENT_NUM_RETRIES)) + .batchGetV2Size(configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)) + .batchGetV2Concurrency(2) + .build(), + configurationProvider.getCache().getClient().getEntityClient()); } @Provides diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index b29741b732224..f1f096640bc21 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -464,5 +464,7 @@ public class Constants { public static final String MDC_ENTITY_TYPE = "entityType"; public static final String MDC_CHANGE_TYPE = "changeType"; + public static final String RESTLI_SUCCESS = "success"; + private Constants() {} } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java index d613c7d4af879..8ee3560a0cb74 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLMAESpringTest.java @@ -4,6 +4,7 @@ import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertTrue; +import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor; import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar; import com.linkedin.metadata.kafka.hook.UpdateIndicesHook; @@ -36,6 +37,12 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests { @Autowired private UpdateIndicesService updateIndicesService; + static { + PathSpecBasedSchemaAnnotationVisitor.class + .getClassLoader() + .setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false); + } + @Test public void testHooks() { MCLKafkaListenerRegistrar registrar = diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java index c5f08fa8dcc8b..cf9d73dfa729b 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/spring/MCLSpringCommonTestConfiguration.java @@ -5,6 +5,7 @@ import com.datahub.authentication.Authentication; import com.datahub.metadata.ingestion.IngestionScheduler; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.SystemEntityClient; import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration; import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener; @@ -58,6 +59,11 @@ public class MCLSpringCommonTestConfiguration { @MockBean public IngestionScheduler ingestionScheduler; + @Bean + public EntityClientConfig entityClientConfig() { + return EntityClientConfig.builder().build(); + } + @MockBean(name = "systemEntityClient") public SystemEntityClient systemEntityClient; diff --git a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java index ba650c25a6117..f65e803a499b0 100644 --- a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java +++ b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java @@ -1,5 +1,6 @@ package com.linkedin.metadata.kafka; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.SystemEntityClient; import com.linkedin.entity.client.SystemRestliEntityClient; import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; @@ -39,16 +40,25 @@ public class MceConsumerApplicationTestConfiguration { @Bean @Primary public SystemEntityClient systemEntityClient( - @Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) { + @Qualifier("configurationProvider") final ConfigurationProvider configurationProvider, + final EntityClientConfig entityClientConfig) { String selfUri = restTemplate.getRootUri(); final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null); return new SystemRestliEntityClient( restClient, - new ExponentialBackoff(1), - 1, - configurationProvider.getCache().getClient().getEntityClient(), - 1, - 2); + entityClientConfig, + configurationProvider.getCache().getClient().getEntityClient()); + } + + @Bean + @Primary + public EntityClientConfig entityClientConfig() { + return EntityClientConfig.builder() + .backoffPolicy(new ExponentialBackoff(1)) + .retryCount(1) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(); } @MockBean public Database ebeanServer; diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 2a8971b445fbe..eb7bb2869584b 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -457,10 +457,19 @@ entityClient: java: get: batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size + ingest: + batchSize: ${ENTITY_CLIENT_JAVA_INGEST_BATCH_SIZE:375} restli: get: batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # parallel threads + batchQueueSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_QUEUE_SIZE:500} + batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_GET_BATCH_THREAD_KEEP_ALIVE:60} + ingest: + batchSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE:50} # limited to prevent exceeding restli timeouts + batchConcurrency: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_CONCURRENCY:2} # parallel threads + batchQueueSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_QUEUE_SIZE:500} + batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_THREAD_KEEP_ALIVE:60} usageClient: retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/EntityClientConfigFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/EntityClientConfigFactory.java index c6ed9c625d6b5..4b87914012f40 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/EntityClientConfigFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/EntityClientConfigFactory.java @@ -1,8 +1,11 @@ package com.linkedin.gms.factory.entityclient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.gms.factory.config.ConfigurationProvider; import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig; +import com.linkedin.parseq.retry.backoff.ExponentialBackoff; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,4 +17,30 @@ public EntityClientCacheConfig entityClientCacheConfig( @Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) { return configurationProvider.getCache().getClient().getEntityClient(); } + + @Bean + public EntityClientConfig entityClientConfig( + final @Value("${entityClient.retryInterval:2}") int retryInterval, + final @Value("${entityClient.numRetries:3}") int numRetries, + final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size, + final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency, + final @Value("${entityClient.restli.get.batchQueueSize}") int batchGetV2QueueSize, + final @Value("${entityClient.restli.get.batchThreadKeepAlive}") int batchGetV2KeepAlive, + final @Value("${entityClient.restli.ingest.batchSize}") int batchIngestSize, + final @Value("${entityClient.restli.ingest.batchConcurrency}") int batchIngestConcurrency, + final @Value("${entityClient.restli.ingest.batchQueueSize}") int batchIngestQueueSize, + final @Value("${entityClient.restli.ingest.batchThreadKeepAlive}") int batchIngestKeepAlive) { + return EntityClientConfig.builder() + .backoffPolicy(new ExponentialBackoff(retryInterval)) + .retryCount(numRetries) + .batchGetV2Size(batchGetV2Size) + .batchGetV2Concurrency(batchGetV2Concurrency) + .batchGetV2QueueSize(batchGetV2QueueSize) + .batchGetV2KeepAlive(batchGetV2KeepAlive) + .batchIngestSize(batchIngestSize) + .batchIngestConcurrency(batchIngestConcurrency) + .batchIngestQueueSize(batchIngestQueueSize) + .batchIngestKeepAlive(batchIngestKeepAlive) + .build(); + } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java index fc35e6d045d0c..e99978a26d6cf 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/JavaEntityClientFactory.java @@ -1,6 +1,7 @@ package com.linkedin.gms.factory.entityclient; import com.linkedin.entity.client.EntityClient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.SystemEntityClient; import com.linkedin.metadata.client.JavaEntityClient; import com.linkedin.metadata.client.SystemJavaEntityClient; @@ -16,7 +17,6 @@ import com.linkedin.metadata.timeseries.TimeseriesAspectService; import javax.inject.Singleton; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -39,7 +39,7 @@ public EntityClient entityClient( final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService, final @Qualifier("kafkaEventProducer") EventProducer _eventProducer, final RollbackService rollbackService, - final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) { + final EntityClientConfig entityClientConfig) { return new JavaEntityClient( _entityService, _deleteEntityService, @@ -50,7 +50,7 @@ public EntityClient entityClient( _timeseriesAspectService, rollbackService, _eventProducer, - batchGetV2Size); + entityClientConfig.getBatchGetV2Size()); } @Bean("systemEntityClient") @@ -67,7 +67,7 @@ public SystemEntityClient systemEntityClient( final @Qualifier("kafkaEventProducer") EventProducer _eventProducer, final RollbackService rollbackService, final EntityClientCacheConfig entityClientCacheConfig, - final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) { + final EntityClientConfig entityClientConfig) { return new SystemJavaEntityClient( _entityService, _deleteEntityService, @@ -79,6 +79,6 @@ public SystemEntityClient systemEntityClient( rollbackService, _eventProducer, entityClientCacheConfig, - batchGetV2Size); + entityClientConfig.getBatchGetV2Size()); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java index 9e7255bf43a34..727dc9f6abc5b 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java @@ -1,12 +1,12 @@ package com.linkedin.gms.factory.entityclient; import com.linkedin.entity.client.EntityClient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.RestliEntityClient; import com.linkedin.entity.client.SystemEntityClient; import com.linkedin.entity.client.SystemRestliEntityClient; import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig; import com.linkedin.metadata.restli.DefaultRestliClientFactory; -import com.linkedin.parseq.retry.backoff.ExponentialBackoff; import com.linkedin.restli.client.Client; import java.net.URI; import javax.inject.Singleton; @@ -28,10 +28,7 @@ public EntityClient entityClient( @Value("${datahub.gms.useSSL}") boolean gmsUseSSL, @Value("${datahub.gms.uri}") String gmsUri, @Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol, - @Value("${entityClient.retryInterval:2}") int retryInterval, - @Value("${entityClient.numRetries:3}") int numRetries, - final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size, - final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) { + final EntityClientConfig entityClientConfig) { final Client restClient; if (gmsUri != null) { restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol); @@ -39,12 +36,7 @@ public EntityClient entityClient( restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); } - return new RestliEntityClient( - restClient, - new ExponentialBackoff(retryInterval), - numRetries, - batchGetV2Size, - batchGetV2Concurrency); + return new RestliEntityClient(restClient, entityClientConfig); } @Bean("systemEntityClient") @@ -55,11 +47,8 @@ public SystemEntityClient systemEntityClient( @Value("${datahub.gms.useSSL}") boolean gmsUseSSL, @Value("${datahub.gms.uri}") String gmsUri, @Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol, - @Value("${entityClient.retryInterval:2}") int retryInterval, - @Value("${entityClient.numRetries:3}") int numRetries, final EntityClientCacheConfig entityClientCacheConfig, - final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size, - final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) { + final EntityClientConfig entityClientConfig) { final Client restClient; if (gmsUri != null) { @@ -68,12 +57,6 @@ public SystemEntityClient systemEntityClient( restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); } - return new SystemRestliEntityClient( - restClient, - new ExponentialBackoff(retryInterval), - numRetries, - entityClientCacheConfig, - batchGetV2Size, - batchGetV2Concurrency); + return new SystemRestliEntityClient(restClient, entityClientConfig, entityClientCacheConfig); } } diff --git a/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/BaseClient.java b/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/BaseClient.java index 4474fd5ce67ec..6cc4cf294d270 100644 --- a/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/BaseClient.java +++ b/metadata-service/restli-client-api/src/main/java/com/linkedin/common/client/BaseClient.java @@ -2,8 +2,8 @@ import com.datahub.authentication.Authentication; import com.linkedin.common.callback.FutureCallback; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.metadata.utils.metrics.MetricUtils; -import com.linkedin.parseq.retry.backoff.BackoffPolicy; import com.linkedin.r2.RemoteInvocationException; import com.linkedin.restli.client.AbstractRequestBuilder; import com.linkedin.restli.client.Client; @@ -19,17 +19,15 @@ @Slf4j public abstract class BaseClient implements AutoCloseable { - protected final Client _client; - protected final BackoffPolicy _backoffPolicy; - protected final int _retryCount; + protected final Client client; + protected final EntityClientConfig entityClientConfig; protected static final Set NON_RETRYABLE = Set.of("com.linkedin.data.template.RequiredFieldNotPresentException"); - protected BaseClient(@Nonnull Client restliClient, BackoffPolicy backoffPolicy, int retryCount) { - _client = Objects.requireNonNull(restliClient); - _backoffPolicy = backoffPolicy; - _retryCount = retryCount; + protected BaseClient(@Nonnull Client restliClient, EntityClientConfig entityClientConfig) { + client = Objects.requireNonNull(restliClient); + this.entityClientConfig = entityClientConfig; } protected Response sendClientRequest( @@ -52,9 +50,9 @@ protected Response sendClientRequest( int attemptCount = 0; - while (attemptCount < _retryCount + 1) { + while (attemptCount < entityClientConfig.getRetryCount() + 1) { try { - return _client.sendRequest(requestBuilder.build()).getResponse(); + return client.sendRequest(requestBuilder.build()).getResponse(); } catch (Throwable ex) { MetricUtils.counter( BaseClient.class, @@ -66,12 +64,13 @@ protected Response sendClientRequest( || (ex.getCause() != null && NON_RETRYABLE.contains(ex.getCause().getClass().getCanonicalName())); - if (attemptCount == _retryCount || skipRetry) { + if (attemptCount == entityClientConfig.getRetryCount() || skipRetry) { throw ex; } else { attemptCount = attemptCount + 1; try { - Thread.sleep(_backoffPolicy.nextBackoff(attemptCount, ex) * 1000); + Thread.sleep( + entityClientConfig.getBackoffPolicy().nextBackoff(attemptCount, ex) * 1000); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -84,6 +83,6 @@ protected Response sendClientRequest( @Override public void close() { - _client.shutdown(new FutureCallback<>()); + client.shutdown(new FutureCallback<>()); } } diff --git a/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClientConfig.java b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClientConfig.java new file mode 100644 index 0000000000000..dab368f506d48 --- /dev/null +++ b/metadata-service/restli-client-api/src/main/java/com/linkedin/entity/client/EntityClientConfig.java @@ -0,0 +1,46 @@ +package com.linkedin.entity.client; + +import com.linkedin.parseq.retry.backoff.BackoffPolicy; +import com.linkedin.parseq.retry.backoff.ExponentialBackoff; +import javax.annotation.Nonnull; +import lombok.Builder; +import lombok.Value; + +@Builder(toBuilder = true) +@Value +public class EntityClientConfig { + @Nonnull @Builder.Default BackoffPolicy backoffPolicy = new ExponentialBackoff(2); + @Builder.Default int retryCount = 3; + @Builder.Default int batchGetV2Size = 5; + @Builder.Default int batchGetV2Concurrency = 1; + @Builder.Default int batchGetV2QueueSize = 100; + @Builder.Default int batchGetV2KeepAlive = 60; + @Builder.Default int batchIngestSize = 5; + @Builder.Default int batchIngestConcurrency = 1; + @Builder.Default int batchIngestQueueSize = 100; + @Builder.Default int batchIngestKeepAlive = 60; + + public int getBatchGetV2Size() { + return Math.max(1, batchGetV2Size); + } + + public int getBatchGetV2Concurrency() { + return Math.max(1, batchGetV2Concurrency); + } + + public int getBatchIngestSize() { + return Math.max(1, batchIngestSize); + } + + public int getBatchIngestConcurrency() { + return Math.max(1, batchIngestConcurrency); + } + + public int getBatchIngestQueueSize() { + return Math.max(1, batchIngestQueueSize); + } + + public int getBatchGetV2QueueSize() { + return Math.max(1, batchGetV2QueueSize); + } +} diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index 824460b8a1a50..e78df209bd9e3 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -1,5 +1,6 @@ package com.linkedin.entity.client; +import static com.linkedin.metadata.Constants.RESTLI_SUCCESS; import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion; import com.datahub.plugins.auth.authorization.Authorizer; @@ -74,7 +75,6 @@ import com.linkedin.mxe.MetadataChangeProposalArray; import com.linkedin.mxe.PlatformEvent; import com.linkedin.mxe.SystemMetadata; -import com.linkedin.parseq.retry.backoff.BackoffPolicy; import com.linkedin.platform.PlatformDoProducePlatformEventRequestBuilder; import com.linkedin.platform.PlatformRequestBuilders; import com.linkedin.r2.RemoteInvocationException; @@ -83,17 +83,21 @@ import com.linkedin.restli.common.HttpStatus; import io.datahubproject.metadata.context.OperationContext; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -120,18 +124,32 @@ public class RestliEntityClient extends BaseClient implements EntityClient { new PlatformRequestBuilders(); private static final RunsRequestBuilders RUNS_REQUEST_BUILDERS = new RunsRequestBuilders(); - private final int batchGetV2Size; - private final int batchGetV2Concurrency; + private final ExecutorService batchGetV2Pool; + private final ExecutorService batchIngestPool; public RestliEntityClient( - @Nonnull final Client restliClient, - @Nonnull final BackoffPolicy backoffPolicy, - int retryCount, - int batchGetV2Size, - int batchGetV2Concurrency) { - super(restliClient, backoffPolicy, retryCount); - this.batchGetV2Size = Math.max(1, batchGetV2Size); - this.batchGetV2Concurrency = batchGetV2Concurrency; + @Nonnull final Client restliClient, EntityClientConfig entityClientConfig) { + super(restliClient, entityClientConfig); + this.batchGetV2Pool = + new ThreadPoolExecutor( + entityClientConfig.getBatchGetV2Concurrency(), // core threads + entityClientConfig.getBatchGetV2Concurrency(), // max threads + entityClientConfig.getBatchGetV2KeepAlive(), + TimeUnit.SECONDS, // thread keep-alive time + new ArrayBlockingQueue<>( + entityClientConfig.getBatchGetV2QueueSize()), // fixed size queue + new ThreadPoolExecutor.AbortPolicy() // optional - this is the default + ); + this.batchIngestPool = + new ThreadPoolExecutor( + entityClientConfig.getBatchIngestConcurrency(), // core threads + entityClientConfig.getBatchIngestConcurrency(), // max threads + entityClientConfig.getBatchIngestKeepAlive(), + TimeUnit.SECONDS, // thread keep-alive time + new ArrayBlockingQueue<>( + entityClientConfig.getBatchIngestQueueSize()), // fixed size queue + new ThreadPoolExecutor.AbortPolicy() // optional - this is the default + ); } @Override @@ -229,54 +247,50 @@ public Map batchGetV2( throws RemoteInvocationException, URISyntaxException { Map responseMap = new HashMap<>(); - ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency)); - try { - Iterable> iterable = () -> Iterators.partition(urns.iterator(), batchGetV2Size); - List>> futures = - StreamSupport.stream(iterable.spliterator(), false) - .map( - batch -> - executor.submit( - () -> { - try { - log.debug("Executing batchGetV2 with batch size: {}", batch.size()); - final EntitiesV2BatchGetRequestBuilder requestBuilder = - ENTITIES_V2_REQUEST_BUILDERS - .batchGet() - .aspectsParam(aspectNames) - .ids( - batch.stream() - .map(Urn::toString) - .collect(Collectors.toList())); - - return sendClientRequest( - requestBuilder, opContext.getSessionAuthentication()) - .getEntity() - .getResults() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> UrnUtils.getUrn(entry.getKey()), - entry -> entry.getValue().getEntity())); - } catch (RemoteInvocationException e) { - throw new RuntimeException(e); - } - })) - .collect(Collectors.toList()); - - futures.forEach( - result -> { - try { - responseMap.putAll(result.get()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } finally { - executor.shutdown(); - } + Iterable> iterable = + () -> Iterators.partition(urns.iterator(), entityClientConfig.getBatchGetV2Size()); + List>> futures = + StreamSupport.stream(iterable.spliterator(), false) + .map( + batch -> + batchGetV2Pool.submit( + () -> { + try { + log.debug("Executing batchGetV2 with batch size: {}", batch.size()); + final EntitiesV2BatchGetRequestBuilder requestBuilder = + ENTITIES_V2_REQUEST_BUILDERS + .batchGet() + .aspectsParam(aspectNames) + .ids( + batch.stream() + .map(Urn::toString) + .collect(Collectors.toList())); + + return sendClientRequest( + requestBuilder, opContext.getSessionAuthentication()) + .getEntity() + .getResults() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> UrnUtils.getUrn(entry.getKey()), + entry -> entry.getValue().getEntity())); + } catch (RemoteInvocationException e) { + throw new RuntimeException(e); + } + })) + .collect(Collectors.toList()); + + futures.forEach( + result -> { + try { + responseMap.putAll(result.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); return responseMap; } @@ -298,62 +312,56 @@ public Map batchGetVersionedV2( @Nullable final Set aspectNames) { Map responseMap = new HashMap<>(); - ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency)); - try { - Iterable> iterable = - () -> Iterators.partition(versionedUrns.iterator(), batchGetV2Size); - List>> futures = - StreamSupport.stream(iterable.spliterator(), false) - .map( - batch -> - executor.submit( - () -> { - try { - log.debug( - "Executing batchGetVersionedV2 with batch size: {}", - batch.size()); - final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder = - ENTITIES_VERSIONED_V2_REQUEST_BUILDERS - .batchGet() - .aspectsParam(aspectNames) - .entityTypeParam(entityName) - .ids( - batch.stream() - .map( - versionedUrn -> - com.linkedin.common.urn.VersionedUrn.of( - versionedUrn.getUrn().toString(), - versionedUrn.getVersionStamp())) - .collect(Collectors.toSet())); - - return sendClientRequest( - requestBuilder, opContext.getSessionAuthentication()) - .getEntity() - .getResults() - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> UrnUtils.getUrn(entry.getKey().getUrn()), - entry -> entry.getValue().getEntity())); - } catch (RemoteInvocationException e) { - throw new RuntimeException(e); - } - })) - .collect(Collectors.toList()); - - futures.forEach( - result -> { - try { - responseMap.putAll(result.get()); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); - } finally { - executor.shutdown(); - } + Iterable> iterable = + () -> Iterators.partition(versionedUrns.iterator(), entityClientConfig.getBatchGetV2Size()); + List>> futures = + StreamSupport.stream(iterable.spliterator(), false) + .map( + batch -> + batchGetV2Pool.submit( + () -> { + try { + log.debug( + "Executing batchGetVersionedV2 with batch size: {}", batch.size()); + final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder = + ENTITIES_VERSIONED_V2_REQUEST_BUILDERS + .batchGet() + .aspectsParam(aspectNames) + .entityTypeParam(entityName) + .ids( + batch.stream() + .map( + versionedUrn -> + com.linkedin.common.urn.VersionedUrn.of( + versionedUrn.getUrn().toString(), + versionedUrn.getVersionStamp())) + .collect(Collectors.toSet())); + + return sendClientRequest( + requestBuilder, opContext.getSessionAuthentication()) + .getEntity() + .getResults() + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> UrnUtils.getUrn(entry.getKey().getUrn()), + entry -> entry.getValue().getEntity())); + } catch (RemoteInvocationException e) { + throw new RuntimeException(e); + } + })) + .collect(Collectors.toList()); + + futures.forEach( + result -> { + try { + responseMap.putAll(result.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); return responseMap; } @@ -1075,29 +1083,68 @@ public List batchIngestProposals( @Nonnull Collection metadataChangeProposals, boolean async) throws RemoteInvocationException { - final AspectsDoIngestProposalBatchRequestBuilder requestBuilder = - ASPECTS_REQUEST_BUILDERS - .actionIngestProposalBatch() - .proposalsParam(new MetadataChangeProposalArray(metadataChangeProposals)) - .asyncParam(String.valueOf(async)); - String result = - sendClientRequest(requestBuilder, opContext.getSessionAuthentication()).getEntity(); - return metadataChangeProposals.stream() - .map( - proposal -> { - if ("success".equals(result)) { - if (proposal.getEntityUrn() != null) { - return proposal.getEntityUrn().toString(); - } else { - EntitySpec entitySpec = - opContext.getEntityRegistry().getEntitySpec(proposal.getEntityType()); - return EntityKeyUtils.getUrnFromProposal(proposal, entitySpec.getKeyAspectSpec()) - .toString(); - } - } - return null; - }) - .collect(Collectors.toList()); + + List response = new ArrayList<>(); + + Iterable> iterable = + () -> + Iterators.partition( + metadataChangeProposals.iterator(), entityClientConfig.getBatchIngestSize()); + List>> futures = + StreamSupport.stream(iterable.spliterator(), false) + .map( + batch -> + batchIngestPool.submit( + () -> { + try { + log.debug( + "Executing batchIngestProposals with batch size: {}", batch.size()); + final AspectsDoIngestProposalBatchRequestBuilder requestBuilder = + ASPECTS_REQUEST_BUILDERS + .actionIngestProposalBatch() + .proposalsParam( + new MetadataChangeProposalArray(metadataChangeProposals)) + .asyncParam(String.valueOf(async)); + String result = + sendClientRequest( + requestBuilder, opContext.getSessionAuthentication()) + .getEntity(); + + if (RESTLI_SUCCESS.equals(result)) { + return batch.stream() + .map( + mcp -> { + if (mcp.getEntityUrn() != null) { + return mcp.getEntityUrn().toString(); + } else { + EntitySpec entitySpec = + opContext + .getEntityRegistry() + .getEntitySpec(mcp.getEntityType()); + return EntityKeyUtils.getUrnFromProposal( + mcp, entitySpec.getKeyAspectSpec()) + .toString(); + } + }) + .collect(Collectors.toList()); + } + return Collections.emptyList(); + } catch (RemoteInvocationException e) { + throw new RuntimeException(e); + } + })) + .collect(Collectors.toList()); + + futures.forEach( + result -> { + try { + response.addAll(result.get()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + return response; } @Override diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java index 7546d1f0a3b54..2637e2d067c6d 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java @@ -5,7 +5,6 @@ import com.linkedin.common.urn.Urn; import com.linkedin.entity.EntityResponse; import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig; -import com.linkedin.parseq.retry.backoff.BackoffPolicy; import com.linkedin.r2.RemoteInvocationException; import com.linkedin.restli.client.Client; import io.datahubproject.metadata.context.OperationContext; @@ -24,12 +23,9 @@ public class SystemRestliEntityClient extends RestliEntityClient implements Syst public SystemRestliEntityClient( @Nonnull final Client restliClient, - @Nonnull final BackoffPolicy backoffPolicy, - int retryCount, - EntityClientCacheConfig cacheConfig, - int batchGetV2Size, - int batchGetV2Concurrency) { - super(restliClient, backoffPolicy, retryCount, batchGetV2Size, batchGetV2Concurrency); + @Nonnull EntityClientConfig clientConfig, + EntityClientCacheConfig cacheConfig) { + super(restliClient, clientConfig); this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build(); this.entityClientCache = buildEntityClientCache(SystemRestliEntityClient.class, cacheConfig); } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/usage/RestliUsageClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/usage/RestliUsageClient.java index e55d9b29f1d2f..e50c7badb5b9f 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/usage/RestliUsageClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/usage/RestliUsageClient.java @@ -5,6 +5,7 @@ import com.linkedin.common.EntityRelationships; import com.linkedin.common.WindowDuration; import com.linkedin.common.client.BaseClient; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.metadata.config.cache.client.UsageClientCacheConfig; import com.linkedin.parseq.retry.backoff.BackoffPolicy; import com.linkedin.r2.RemoteInvocationException; @@ -26,7 +27,9 @@ public RestliUsageClient( @Nonnull final BackoffPolicy backoffPolicy, int retryCount, UsageClientCacheConfig cacheConfig) { - super(restliClient, backoffPolicy, retryCount); + super( + restliClient, + EntityClientConfig.builder().backoffPolicy(backoffPolicy).retryCount(retryCount).build()); this.operationContextMap = Caffeine.newBuilder().maximumSize(500).build(); this.usageClientCache = UsageClientCache.builder() diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java index 797ead10c1a66..285b57972ece4 100644 --- a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java +++ b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java @@ -13,6 +13,7 @@ import com.linkedin.data.template.RequiredFieldNotPresentException; import com.linkedin.entity.AspectsDoIngestProposalRequestBuilder; import com.linkedin.entity.AspectsRequestBuilders; +import com.linkedin.entity.client.EntityClientConfig; import com.linkedin.entity.client.RestliEntityClient; import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.parseq.retry.backoff.ExponentialBackoff; @@ -37,7 +38,14 @@ public void testZeroRetry() throws RemoteInvocationException { when(mockRestliClient.sendRequest(any(ActionRequest.class))).thenReturn(mockFuture); RestliEntityClient testClient = - new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10, 2); + new RestliEntityClient( + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ExponentialBackoff(1)) + .retryCount(0) + .batchGetV2Size(10) + .batchGetV2Concurrency(2) + .build()); testClient.sendClientRequest(testRequestBuilder, AUTH); // Expected 1 actual try and 0 retries verify(mockRestliClient).sendRequest(any(ActionRequest.class)); @@ -56,7 +64,14 @@ public void testMultipleRetries() throws RemoteInvocationException { .thenReturn(mockFuture); RestliEntityClient testClient = - new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2); + new RestliEntityClient( + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ExponentialBackoff(1)) + .retryCount(1) + .batchGetV2Size(10) + .batchGetV2Concurrency(2) + .build()); testClient.sendClientRequest(testRequestBuilder, AUTH); // Expected 1 actual try and 1 retries verify(mockRestliClient, times(2)).sendRequest(any(ActionRequest.class)); @@ -73,7 +88,14 @@ public void testNonRetry() { .thenThrow(new RuntimeException(new RequiredFieldNotPresentException("value"))); RestliEntityClient testClient = - new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2); + new RestliEntityClient( + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ExponentialBackoff(1)) + .retryCount(1) + .batchGetV2Size(10) + .batchGetV2Concurrency(2) + .build()); assertThrows( RuntimeException.class, () -> testClient.sendClientRequest(testRequestBuilder, AUTH)); } diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java index 817b76f74268c..0449b3dfa523b 100644 --- a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java +++ b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java @@ -47,7 +47,14 @@ public void testCache() throws RemoteInvocationException, URISyntaxException { SystemRestliEntityClient noCacheTest = new SystemRestliEntityClient( - mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2); + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ConstantBackoff(0)) + .retryCount(0) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(), + noCacheConfig); com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true); com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false); @@ -86,7 +93,14 @@ public void testCache() throws RemoteInvocationException, URISyntaxException { SystemRestliEntityClient cacheTest = new SystemRestliEntityClient( - mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2); + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ConstantBackoff(0)) + .retryCount(0) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(), + cacheConfig); mockResponse(mockRestliClient, responseStatusTrue); assertEquals( @@ -121,7 +135,14 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio SystemRestliEntityClient noCacheTest = new SystemRestliEntityClient( - mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2); + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ConstantBackoff(0)) + .retryCount(0) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(), + noCacheConfig); com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true); com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false); @@ -160,7 +181,14 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio SystemRestliEntityClient cacheTest = new SystemRestliEntityClient( - mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2); + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ConstantBackoff(0)) + .retryCount(0) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(), + cacheConfig); mockResponse(mockRestliClient, responseStatusTrue); assertEquals( @@ -195,7 +223,14 @@ public void testCacheNullValue() throws RemoteInvocationException, URISyntaxExce SystemRestliEntityClient noCacheTest = new SystemRestliEntityClient( - mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2); + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ConstantBackoff(0)) + .retryCount(0) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(), + noCacheConfig); com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true); com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false); @@ -238,7 +273,14 @@ public void testCacheNullValue() throws RemoteInvocationException, URISyntaxExce SystemRestliEntityClient cacheTest = new SystemRestliEntityClient( - mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2); + mockRestliClient, + EntityClientConfig.builder() + .backoffPolicy(new ConstantBackoff(0)) + .retryCount(0) + .batchGetV2Size(1) + .batchGetV2Concurrency(2) + .build(), + cacheConfig); mockResponse(mockRestliClient, responseStatusTrue); assertEquals( diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java index 63b607f8c9967..676232d5b5c04 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/AspectResource.java @@ -4,6 +4,7 @@ import static com.datahub.authorization.AuthUtil.isAPIAuthorizedEntityUrns; import static com.datahub.authorization.AuthUtil.isAPIAuthorizedUrns; import static com.datahub.authorization.AuthUtil.isAPIOperationsAuthorized; +import static com.linkedin.metadata.Constants.RESTLI_SUCCESS; import static com.linkedin.metadata.authorization.ApiGroup.COUNTS; import static com.linkedin.metadata.authorization.ApiGroup.ENTITY; import static com.linkedin.metadata.authorization.ApiGroup.TIMESERIES; @@ -330,7 +331,7 @@ private Task ingestProposals( } // TODO: We don't actually use this return value anywhere. Maybe we should just stop returning it altogether? - return "success"; + return RESTLI_SUCCESS; } catch (ValidationException e) { throw new RestLiServiceException(HttpStatus.S_422_UNPROCESSABLE_ENTITY, e.getMessage()); }