
Merge branch 'datahub-project:master' into master
jayacryl authored Nov 5, 2024
2 parents 1199cc4 + 5c58128 commit 6ef9fe9
Showing 24 changed files with 478 additions and 221 deletions.
13 changes: 8 additions & 5 deletions datahub-frontend/app/auth/AuthModule.java
@@ -13,6 +13,7 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.models.registry.EmptyEntityRegistry;
@@ -213,11 +214,13 @@ protected SystemEntityClient provideEntityClient(

return new SystemRestliEntityClient(
buildRestliClient(),
-        new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
-        configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
-        configurationProvider.getCache().getClient().getEntityClient(),
-        Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)),
-        Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY)));
+        EntityClientConfig.builder()
+            .backoffPolicy(new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)))
+            .retryCount(configs.getInt(ENTITY_CLIENT_NUM_RETRIES))
+            .batchGetV2Size(configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE))
+            .batchGetV2Concurrency(2)
+            .build(),
+        configurationProvider.getCache().getClient().getEntityClient());
}

@Provides
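The change above replaces SystemRestliEntityClient's positional retry and batching arguments with a single typed EntityClientConfig. A minimal sketch of the new construction pattern, assuming a restli Client and cache config are already in hand (the literal values here are illustrative, not production defaults):

```java
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;

// `restliClient` and `entityClientCacheConfig` are assumed to exist in scope,
// as they do in AuthModule via buildRestliClient() and the ConfigurationProvider.
EntityClientConfig clientConfig =
    EntityClientConfig.builder()
        .backoffPolicy(new ExponentialBackoff(2)) // retry interval, illustrative
        .retryCount(3)
        .batchGetV2Size(100) // kept small to respect restli URI size limits
        .batchGetV2Concurrency(2)
        .build();

SystemEntityClient entityClient =
    new SystemRestliEntityClient(restliClient, clientConfig, entityClientCacheConfig);
```

Grouping these knobs in one builder is what lets later hunks in this commit add ingest-side batching without widening every constructor signature again.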
@@ -31,6 +31,7 @@
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.springframework.core.io.ClassPathResource;
@@ -109,13 +110,29 @@ static List<ObjectNode> resolveMCPTemplate(
AuditStamp auditStamp)
throws IOException {

-    String template = loadTemplate(mcpTemplate.getMcps_location());
-    Mustache mustache = MUSTACHE_FACTORY.compile(new StringReader(template), mcpTemplate.getName());
+    final String template = loadTemplate(mcpTemplate.getMcps_location());
     Map<String, Object> scopeValues = resolveValues(opContext, mcpTemplate, auditStamp);

     StringWriter writer = new StringWriter();
-    mustache.execute(writer, scopeValues);
+    try {
+      Mustache mustache =
+          MUSTACHE_FACTORY.compile(new StringReader(template), mcpTemplate.getName());
+      mustache.execute(writer, scopeValues);
+    } catch (Exception e) {
+      log.error(
+          "Failed to apply mustache template. Template: {} Values: {}",
+          template,
+          resolveEnv(mcpTemplate));
+      throw e;
+    }

-    return opContext.getYamlMapper().readValue(writer.toString(), new TypeReference<>() {});
+    final String yaml = writer.toString();
+    try {
+      return opContext.getYamlMapper().readValue(yaml, new TypeReference<>() {});
+    } catch (Exception e) {
+      log.error("Failed to parse rendered MCP bootstrap yaml: {}", yaml);
+      throw e;
+    }
}

static Map<String, Object> resolveValues(
@@ -128,13 +145,21 @@ static Map<String, Object> resolveValues(
// built-in
scopeValues.put("auditStamp", RecordUtils.toJsonString(auditStamp));

+    String envValue = resolveEnv(mcpTemplate);
+    if (envValue != null) {
+      scopeValues.putAll(opContext.getObjectMapper().readValue(envValue, new TypeReference<>() {}));
+    }
+    return scopeValues;
+  }
+
+  @Nullable
+  private static String resolveEnv(BootstrapMCPConfigFile.MCPTemplate mcpTemplate) {
     if (mcpTemplate.getValues_env() != null
         && !mcpTemplate.getValues_env().isEmpty()
         && System.getenv().containsKey(mcpTemplate.getValues_env())) {
-      String envValue = System.getenv(mcpTemplate.getValues_env());
-      scopeValues.putAll(opContext.getObjectMapper().readValue(envValue, new TypeReference<>() {}));
+      return System.getenv(mcpTemplate.getValues_env());
     }
-    return scopeValues;
+    return null;
}

private static String loadTemplate(String source) throws IOException {
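The try/catch added above wraps mustache.java's compile/execute flow so a bad template or bad values fail loudly. A self-contained sketch of that flow, with a hypothetical template string and scope value standing in for the real MCP template and resolveValues() output:

```java
import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import com.github.mustachejava.MustacheFactory;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Map;

public class MustacheDemo {
  public static void main(String[] args) {
    MustacheFactory factory = new DefaultMustacheFactory();
    // Hypothetical one-line template; real templates load from getMcps_location().
    Mustache template =
        factory.compile(new StringReader("owner: {{ownerUrn}}"), "demo-template");
    StringWriter writer = new StringWriter();
    // The scope map mirrors what resolveValues() builds: built-ins like
    // auditStamp plus any JSON object read from the values_env variable.
    template.execute(writer, Map.of("ownerUrn", "urn:li:corpuser:admin"));
    System.out.println(writer); // owner: urn:li:corpuser:admin
  }
}
```

Extracting resolveEnv as its own helper also lets the error log show which environment-supplied values were in play when rendering fails.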
2 changes: 2 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
@@ -464,5 +464,7 @@ public class Constants {
public static final String MDC_ENTITY_TYPE = "entityType";
public static final String MDC_CHANGE_TYPE = "changeType";

public static final String RESTLI_SUCCESS = "success";

private Constants() {}
}
@@ -82,7 +82,10 @@ private Optional<Emitter> getEmitter() {
(KafkaDatahubEmitterConfig) datahubConf.getDatahubEmitterConfig();
try {
emitter =
-            Optional.of(new KafkaEmitter(datahubKafkaEmitterConfig.getKafkaEmitterConfig()));
+            Optional.of(
+                new KafkaEmitter(
+                    datahubKafkaEmitterConfig.getKafkaEmitterConfig(),
+                    datahubKafkaEmitterConfig.getMcpTopic()));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -188,8 +188,12 @@ public Optional<DatahubEmitterConfig> initializeEmitter(Config sparkConf) {
});
kafkaEmitterConfig.producerConfig(kafkaConfig);
}

-      return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
+      if (sparkConf.hasPath(SparkConfigParser.KAFKA_MCP_TOPIC)) {
+        String mcpTopic = sparkConf.getString(SparkConfigParser.KAFKA_MCP_TOPIC);
+        return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build(), mcpTopic));
+      } else {
+        return Optional.of(new KafkaDatahubEmitterConfig(kafkaEmitterConfig.build()));
+      }
case "file":
log.info("File Emitter Configuration: File emitter will be used");
FileEmitterConfig.FileEmitterConfigBuilder fileEmitterConfig = FileEmitterConfig.builder();
@@ -1,5 +1,6 @@
package datahub.spark.conf;

import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import lombok.Getter;
import lombok.Setter;
@@ -11,8 +12,15 @@
public class KafkaDatahubEmitterConfig implements DatahubEmitterConfig {
final String type = "kafka";
KafkaEmitterConfig kafkaEmitterConfig;
String mcpTopic;

public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig) {
this.kafkaEmitterConfig = kafkaEmitterConfig;
this.mcpTopic = KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC;
}

public KafkaDatahubEmitterConfig(KafkaEmitterConfig kafkaEmitterConfig, String mcpTopic) {
this.kafkaEmitterConfig = kafkaEmitterConfig;
this.mcpTopic = mcpTopic;
}
}
@@ -32,6 +32,7 @@ public class SparkConfigParser {
public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification";
public static final String MAX_RETRIES = "rest.max_retries";
public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec";
public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic";
public static final String KAFKA_EMITTER_BOOTSTRAP = "kafka.bootstrap";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_URL = "kafka.schema_registry_url";
public static final String KAFKA_EMITTER_SCHEMA_REGISTRY_CONFIG = "kafka.schema_registry_config";
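Together, the emitter-config and parser changes let a Spark job route MCPs to a non-default Kafka topic. A hedged sketch of the wiring, assuming the conventional spark.datahub.* prefix for these keys and a hypothetical topic name:

```java
import org.apache.spark.SparkConf;

public class EmitterConfDemo {
  public static void main(String[] args) {
    SparkConf conf =
        new SparkConf()
            .set("spark.datahub.emitter", "kafka") // select the Kafka emitter path
            .set("spark.datahub.kafka.bootstrap", "broker:9092")
            // New key from this commit; maps to SparkConfigParser.KAFKA_MCP_TOPIC.
            .set("spark.datahub.kafka.mcp_topic", "CustomMetadataChangeProposal_v1");
    System.out.println(conf.get("spark.datahub.kafka.mcp_topic"));
  }
}
```

When kafka.mcp_topic is absent, the else branch above preserves the old behavior: KafkaDatahubEmitterConfig's one-arg constructor falls back to KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC.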
@@ -9,7 +9,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

-class AvroSerializer {
+public class AvroSerializer {

private final Schema _recordSchema;
private final Schema _genericAspectSchema;
@@ -31,14 +31,25 @@ public class KafkaEmitter implements Emitter {
private final Properties kafkaConfigProperties;
private AvroSerializer _avroSerializer;
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
private final String mcpKafkaTopic;

/**
* The default constructor
*
-   * @param config
-   * @throws IOException
+   * @param config KafkaEmitterConfig
+   * @throws IOException when Avro serialization fails
    */
   public KafkaEmitter(KafkaEmitterConfig config) throws IOException {
+    this(config, DEFAULT_MCP_KAFKA_TOPIC);
+  }
+
+  /**
+   * Constructor that takes a KafkaEmitterConfig and the MCP Kafka topic name
+   *
+   * @param config KafkaEmitterConfig
+   * @param mcpKafkaTopic the Kafka topic to emit MetadataChangeProposal events to
+   * @throws IOException when Avro serialization fails
+   */
+  public KafkaEmitter(KafkaEmitterConfig config, String mcpKafkaTopic) throws IOException {
this.config = config;
kafkaConfigProperties = new Properties();
kafkaConfigProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getBootstrap());
@@ -54,6 +65,7 @@ public KafkaEmitter(KafkaEmitterConfig config) throws IOException {

producer = new KafkaProducer<>(kafkaConfigProperties);
_avroSerializer = new AvroSerializer();
this.mcpKafkaTopic = mcpKafkaTopic;
}

@Override
@@ -73,8 +85,7 @@ public Future<MetadataWriteResponse> emit(MetadataChangeProposal mcp, Callback d
throws IOException {
     GenericRecord genericRecord = _avroSerializer.serialize(mcp);
     ProducerRecord<Object, Object> record =
-        new ProducerRecord<>(
-            KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC, mcp.getEntityUrn().toString(), genericRecord);
+        new ProducerRecord<>(this.mcpKafkaTopic, mcp.getEntityUrn().toString(), genericRecord);
org.apache.kafka.clients.producer.Callback callback =
new org.apache.kafka.clients.producer.Callback() {

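A sketch of the new two-argument constructor in use. The broker, registry URL, and topic are hypothetical, and the KafkaEmitterConfig builder field names (bootstrap, schemaRegistryUrl) are assumed from their use elsewhere in the client:

```java
import datahub.client.kafka.KafkaEmitter;
import datahub.client.kafka.KafkaEmitterConfig;
import java.io.IOException;

public class KafkaEmitterDemo {
  public static void main(String[] args) throws IOException {
    KafkaEmitterConfig config =
        KafkaEmitterConfig.builder()
            .bootstrap("broker:9092")
            .schemaRegistryUrl("http://schema-registry:8081")
            .build();
    // Emits to a custom topic; the one-arg constructor delegates here
    // with KafkaEmitter.DEFAULT_MCP_KAFKA_TOPIC instead.
    KafkaEmitter emitter = new KafkaEmitter(config, "CustomMetadataChangeProposal_v1");
    System.out.println("Emitter targeting custom MCP topic created: " + emitter);
  }
}
```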
@@ -4,6 +4,7 @@
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;

import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
@@ -36,6 +37,12 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests {

@Autowired private UpdateIndicesService updateIndicesService;

static {
PathSpecBasedSchemaAnnotationVisitor.class
.getClassLoader()
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false);
}

@Test
public void testHooks() {
MCLKafkaListenerRegistrar registrar =
@@ -5,6 +5,7 @@

import com.datahub.authentication.Authentication;
import com.datahub.metadata.ingestion.IngestionScheduler;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
@@ -58,6 +59,11 @@ public class MCLSpringCommonTestConfiguration {

@MockBean public IngestionScheduler ingestionScheduler;

@Bean
public EntityClientConfig entityClientConfig() {
return EntityClientConfig.builder().build();
}

@MockBean(name = "systemEntityClient")
public SystemEntityClient systemEntityClient;

@@ -1,5 +1,6 @@
package com.linkedin.metadata.kafka;

import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
@@ -39,16 +40,25 @@ public class MceConsumerApplicationTestConfiguration {
@Bean
@Primary
public SystemEntityClient systemEntityClient(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider,
final EntityClientConfig entityClientConfig) {
String selfUri = restTemplate.getRootUri();
final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null);
return new SystemRestliEntityClient(
restClient,
-        new ExponentialBackoff(1),
-        1,
-        configurationProvider.getCache().getClient().getEntityClient(),
-        1,
-        2);
+        entityClientConfig,
+        configurationProvider.getCache().getClient().getEntityClient());
   }

+  @Bean
+  @Primary
+  public EntityClientConfig entityClientConfig() {
+    return EntityClientConfig.builder()
+        .backoffPolicy(new ExponentialBackoff(1))
+        .retryCount(1)
+        .batchGetV2Size(1)
+        .batchGetV2Concurrency(2)
+        .build();
+  }

@MockBean public Database ebeanServer;
@@ -457,10 +457,19 @@ entityClient:
java:
get:
batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size
ingest:
batchSize: ${ENTITY_CLIENT_JAVA_INGEST_BATCH_SIZE:375}
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_GET_BATCH_THREAD_KEEP_ALIVE:60}
ingest:
batchSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE:50} # limited to prevent exceeding restli timeouts
batchConcurrency: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_THREAD_KEEP_ALIVE:60}

usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
@@ -1,8 +1,11 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@@ -14,4 +17,30 @@ public EntityClientCacheConfig entityClientCacheConfig(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
return configurationProvider.getCache().getClient().getEntityClient();
}

@Bean
public EntityClientConfig entityClientConfig(
final @Value("${entityClient.retryInterval:2}") int retryInterval,
final @Value("${entityClient.numRetries:3}") int numRetries,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency,
final @Value("${entityClient.restli.get.batchQueueSize}") int batchGetV2QueueSize,
final @Value("${entityClient.restli.get.batchThreadKeepAlive}") int batchGetV2KeepAlive,
final @Value("${entityClient.restli.ingest.batchSize}") int batchIngestSize,
final @Value("${entityClient.restli.ingest.batchConcurrency}") int batchIngestConcurrency,
final @Value("${entityClient.restli.ingest.batchQueueSize}") int batchIngestQueueSize,
final @Value("${entityClient.restli.ingest.batchThreadKeepAlive}") int batchIngestKeepAlive) {
return EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(retryInterval))
.retryCount(numRetries)
.batchGetV2Size(batchGetV2Size)
.batchGetV2Concurrency(batchGetV2Concurrency)
.batchGetV2QueueSize(batchGetV2QueueSize)
.batchGetV2KeepAlive(batchGetV2KeepAlive)
.batchIngestSize(batchIngestSize)
.batchIngestConcurrency(batchIngestConcurrency)
.batchIngestQueueSize(batchIngestQueueSize)
.batchIngestKeepAlive(batchIngestKeepAlive)
.build();
}
}
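Each @Value parameter above resolves through the application.yaml entries added earlier in this commit, so batching can be retuned per deployment via environment variables alone. A small sketch of how the ${ENV_VAR:default} syntax behaves, using Spring's placeholder helper with a stand-in Properties object for the environment:

```java
import java.util.Properties;
import org.springframework.util.PropertyPlaceholderHelper;

public class PlaceholderDemo {
  public static void main(String[] args) {
    // "${" and "}" delimit placeholders; ":" separates the default value.
    PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper("${", "}", ":", true);
    Properties env = new Properties(); // stand-in for real environment variables
    env.setProperty("ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE", "100");
    // A set variable wins over the default...
    System.out.println(
        helper.replacePlaceholders("${ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE:50}", env)); // 100
    // ...an unset variable falls back to the default after ':'.
    System.out.println(
        helper.replacePlaceholders("${ENTITY_CLIENT_RESTLI_INGEST_BATCH_CONCURRENCY:2}", env)); // 2
  }
}
```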