Skip to content

Commit

Permalink
feat(#3357): Add consumer group to Kafka adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
SvenO3 committed Nov 27, 2024
1 parent 18dbe30 commit a45029e
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ private Consumer<byte[], byte[]> createConsumer(KafkaConfig kafkaConfig) throws
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaConfig.getKafkaHost() + ":" + kafkaConfig.getKafkaPort());

props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer" + System.currentTimeMillis());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());

props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 6000);

Expand Down Expand Up @@ -160,6 +159,10 @@ public IAdapterConfiguration declareConfig() {
.requiredTextParameter(KafkaConnectUtils.getHostLabel())
.requiredIntegerParameter(KafkaConnectUtils.getPortLabel())

.requiredAlternatives(KafkaConnectUtils.getConsumerGroupLabel(),
KafkaConnectUtils.getAlternativesRandomGroupId(),
KafkaConnectUtils.getAlternativesGroupId())

.requiredSlideToggle(KafkaConnectUtils.getHideInternalTopicsLabel(), true)

.requiredSingleValueSelectionFromContainer(KafkaConnectUtils.getTopicLabel(), Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ public class KafkaConfig {
private String kafkaHost;
private Integer kafkaPort;
private String topic;
private String groupId;

KafkaSecurityConfig securityConfig;
AutoOffsetResetConfig autoOffsetResetConfig;

public KafkaConfig(String kafkaHost,
Integer kafkaPort,
String topic,
String groupId,
KafkaSecurityConfig securityConfig,
AutoOffsetResetConfig autoOffsetResetConfig) {
this.kafkaHost = kafkaHost;
this.kafkaPort = kafkaPort;
this.topic = topic;
this.groupId = groupId;
this.securityConfig = securityConfig;
this.autoOffsetResetConfig = autoOffsetResetConfig;
}
Expand Down Expand Up @@ -66,6 +69,14 @@ public void setTopic(String topic) {
this.topic = topic;
}

public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}

public KafkaSecurityConfig getSecurityConfig() {
return securityConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public class KafkaConnectUtils {
public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";

public static final String CONSUMER_GROUP = "consumer-group";
public static final String RANDOM_GROUP_ID = "random-group-id";
public static final String GROUP_ID = "group-id";
public static final String GROUP_ID_INPUT = "group-id-input";


private static final String HIDE_INTERNAL_TOPICS = "hide-internal-topics";

Expand Down Expand Up @@ -88,6 +93,10 @@ public static Label getAccessModeLabel() {
return Labels.withId(ACCESS_MODE);
}

public static Label getConsumerGroupLabel() {
return Labels.withId(CONSUMER_GROUP);
}

public static Label getAutoOffsetResetConfigLabel() {
return Labels.withId(AUTO_OFFSET_RESET_CONFIG);
}
Expand Down Expand Up @@ -124,19 +133,26 @@ public static KafkaConfig getConfig(IStaticPropertyExtractor extractor, boolean
new KafkaSecurityUnauthenticatedPlainConfig();
}

String groupId;
if (extractor.selectedAlternativeInternalId(CONSUMER_GROUP).equals(RANDOM_GROUP_ID)){
groupId = "KafkaExampleConsumer" + System.currentTimeMillis();
} else {
groupId = extractor.singleValueParameter(GROUP_ID_INPUT, String.class);
}

StaticPropertyAlternatives alternatives = extractor.getStaticPropertyByName(AUTO_OFFSET_RESET_CONFIG,
StaticPropertyAlternatives.class);

// Set default value if no value is provided.
if (alternatives == null) {
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(KafkaConnectUtils.LATEST);

return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig);
} else {
String auto = extractor.selectedAlternativeInternalId(AUTO_OFFSET_RESET_CONFIG);
AutoOffsetResetConfig autoOffsetResetConfig = new AutoOffsetResetConfig(auto);

return new KafkaConfig(brokerUrl, port, topic, securityConfig, autoOffsetResetConfig);
return new KafkaConfig(brokerUrl, port, topic, groupId, securityConfig, autoOffsetResetConfig);
}
}

Expand Down Expand Up @@ -172,6 +188,14 @@ public static StaticPropertyAlternative getAlternativesSaslSSL() {
StaticProperties.secretValue(Labels.withId(KafkaConnectUtils.PASSWORD_KEY))));
}

public static StaticPropertyAlternative getAlternativesRandomGroupId(){
return Alternatives.from(Labels.withId(RANDOM_GROUP_ID));
}

public static StaticPropertyAlternative getAlternativesGroupId(){
return Alternatives.from(Labels.withId(KafkaConnectUtils.GROUP_ID),
StaticProperties.stringFreeTextProperty(Labels.withId(KafkaConnectUtils.GROUP_ID_INPUT)));
}

public static StaticPropertyAlternative getAlternativesLatest() {
return Alternatives.from(Labels.withId(LATEST));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ sasl-ssl.description=Username and password, with ssl encryption

username-group.title=Username and password

consumer-group.title=Consumer Group
consumer-group.description=Use random group id or insert a specific one

random-group-id.title=Random group id
random-group-id.description=StreamPipes generates a random group id

group-id.title=Insert group id
group-id.description=Insert the group id

group-id-input.title=Group id
group-id-input.description=

key-deserialization.title=Key Deserializer
key-deserialization.description=

Expand Down

0 comments on commit a45029e

Please sign in to comment.