From acbdc9fddbad75e7cb2d8e148301239628a5ba48 Mon Sep 17 00:00:00 2001 From: Gianluca Finocchiaro Date: Wed, 30 Oct 2024 11:21:47 +0100 Subject: [PATCH] Revise some devault settings for the internal Kafka consumer (#25) --- .../adapters/config/ConnectorConfig.java | 4 +- .../adapters/config/ConnectorConfigTest.java | 54 ++++++++++--------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java index 5153e1f3..80c27525 100644 --- a/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java +++ b/kafka-connector-project/kafka-connector/src/main/java/com/lightstreamer/kafka/adapters/config/ConnectorConfig.java @@ -279,7 +279,7 @@ public final class ConnectorConfig extends AbstractConfig { false, INT, false, - defaultValue("1000")) + defaultValue("30000")) .add(CONSUMER_RETRIES, false, false, INT, false, defaultValue("0")) .add( CONSUMER_DEFAULT_API_TIMEOUT_MS_CONFIG, @@ -287,7 +287,7 @@ public final class ConnectorConfig extends AbstractConfig { false, INT, false, - defaultValue("15000")) + defaultValue("60000")) .withEnabledChildConfigs(EncryptionConfigs.spec(), ENCYRPTION_ENABLE) .withEnabledChildConfigs( BrokerAuthenticationConfigs.spec(), AUTHENTICATION_ENABLE) diff --git a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java index 484359fb..78d9a4ba 100644 --- a/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java +++ b/kafka-connector-project/kafka-connector/src/test/java/com/lightstreamer/kafka/adapters/config/ConnectorConfigTest.java @@ -20,10 +20,31 @@ import static com.google.common.truth.Truth.assertThat; import static com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.SslProtocol.TLSv12; import static com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.SslProtocol.TLSv13; - import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.junit.function.ThrowingRunnable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + import com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.EvaluatorType; import com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.RecordComsumeFrom; import com.lightstreamer.kafka.adapters.config.specs.ConfigTypes.RecordErrorHandlingStrategy; @@ -40,28 +61,6 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.junit.function.ThrowingRunnable; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Stream; - public class ConnectorConfigTest { private Path adapterDir; @@ -312,6 +311,8 @@ private Map standardParameters() { standardParams.put(ConnectorConfig.CONSUMER_SESSION_TIMEOUT_MS, "800"); standardParams.put(ConnectorConfig.CONSUMER_MAX_POLL_INTERVAL_MS, "2000"); // Unmodifiable standardParams.put(ConnectorConfig.CONSUMER_METADATA_MAX_AGE_CONFIG, "250"); // Unmodifiable + standardParams.put(ConnectorConfig.CONSUMER_DEFAULT_API_TIMEOUT_MS_CONFIG, "1000"); // Unmodifiable + standardParams.put(ConnectorConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG, "15000"); // Unmodifiable standardParams.put("item-template.template1", "template1-#{v=VALUE}"); standardParams.put("item-template.template2", "template2-#{v=OFFSET}"); standardParams.put("map.topic1.to", "template1"); @@ -499,7 +500,12 @@ public void shouldRetrieveBaseConsumerProperties() { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000", ConsumerConfig.METADATA_MAX_AGE_CONFIG, - "250"); + "250", + ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, + "60000", + ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, + "30000" + ); assertThat(baseConsumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG)) .startsWith("KAFKA-CONNECTOR-"); }