Skip to content

Commit

Permalink
feat: add support for security schemes in Kafka config (#158)
Browse files Browse the repository at this point in the history
This commit adds support for the new security schemes introduced
in AsyncAPI 2.1.0, for code generated for Kafka servers.

When provided with AsyncAPI specification documents that describe
Kafka clusters requiring authentication, the generator will now
be able to generate the correct configuration for client code.

Signed-off-by: Dale Lane <[email protected]>
  • Loading branch information
dalelane authored Oct 28, 2021
1 parent a5ae56d commit ce7fc91
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 4 deletions.
2 changes: 1 addition & 1 deletion hooks/02_removeNotRelevantParts.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module.exports = {
for (let server of Object.values(asyncapi.servers())) {
hasAmqp = hasAmqp || server.protocol() === 'amqp';
hasMqtt = hasMqtt || server.protocol() === 'mqtt';
hasKafka = hasKafka || server.protocol() === 'kafka';
hasKafka = hasKafka || server.protocol() === 'kafka' || server.protocol() === 'kafka-secure';
}
if (!hasKafka) {
// remove filers from template related only to Kafka
Expand Down
68 changes: 66 additions & 2 deletions partials/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,60 @@
{%- endif -%}
{%- endfor %}

{%- set securityProtocol = "PLAINTEXT" -%}
{%- set saslMechanism = null -%}
{%- set saslJaasConfig = null -%}
{%- for serverName, server in asyncapi.servers() -%}
{%- if server.protocol() == "kafka" -%}
{%- if server.security() -%}
{%- set securityProtocol = "SASL_PLAINTEXT" -%}
{%- else -%}
{%- set securityProtocol = "PLAINTEXT" -%}
{%- endif -%}
{%- elif server.protocol() == "kafka-secure" -%}
{%- if server.security() -%}
{%- set securityProtocol = "SASL_SSL" -%}
{%- else -%}
{%- set securityProtocol = "SSL" -%}
{%- endif -%}
{%- endif -%}

{%- if asyncapi.hasComponents() and asyncapi.components().hasSecuritySchemes() and server.security() -%}
{%- for securityRef in server.security() -%}
{%- for securityName, securityObj in securityRef -%}
{%- for securityRefName, securityRefObj in securityObj -%}
{%- for securitySchemeRef, securityScheme in asyncapi.components().securitySchemes()[securityRefName] -%}
{%- if securityScheme.type == "plain" -%}
{%- set saslMechanism = "PLAIN" -%}
{%- set saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';" -%}
{%- elif securityScheme.type == "scramSha256" -%}
{%- set saslMechanism = "SCRAM-SHA-256" -%}
{%- set saslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username='USERNAME' password='PASSWORD';" -%}
{%- elif securityScheme.type == "scramSha512" -%}
{%- set saslMechanism = "SCRAM-SHA-512" -%}
{%- set saslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username='USERNAME' password='PASSWORD';" -%}
{%- elif securityScheme.type == "oauth2" -%}
{%- set saslMechanism = "OAUTHBEARER" -%}
{%- set saslJaasConfig = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub='LOGINSTRING';" -%}
{%- elif securityScheme.type == "gssapi" -%}
{%- set saslMechanism = "GSSAPI" -%}
{%- set saslJaasConfig = "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab='CLIENT.KEYTAB' principal='[email protected]';" -%}
{%- elif securityScheme.type == "X509" -%}
{%- set securityProtocol = "SSL" -%}
{%- endif -%}
{%- endfor -%}
{%- endfor -%}
{%- endfor -%}
{%- endfor -%}
{%- endif -%}
{%- endfor %}

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.config.SaslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -57,9 +107,16 @@ public Map<String, Object> producerConfigs() {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
{% if params.addTypeInfoHeader === 'false' %}
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "{{ securityProtocol }}");
{%- if saslMechanism %}
props.put(SaslConfigs.SASL_MECHANISM, "{{ saslMechanism }}");
{%- endif -%}
{%- if saslJaasConfig %}
props.put(SaslConfigs.SASL_JAAS_CONFIG, "{{ saslJaasConfig | safe }}");
{% endif -%}
{%- if params.addTypeInfoHeader === 'false' %}
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
{% endif %}
props.put(JsonSerializer.TYPE_MAPPINGS,
{%- for schema in asyncapi.allSchemas().values() | isObjectType %}
{%- if schema.uid() | first !== '<' and schema.type() === 'object' %}
Expand Down Expand Up @@ -94,6 +151,13 @@ public Map<String, Object> consumerConfigs() {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "{{ securityProtocol }}");
{%- if saslMechanism %}
props.put(SaslConfigs.SASL_MECHANISM, "{{ saslMechanism }}");
{%- endif -%}
{%- if saslJaasConfig %}
props.put(SaslConfigs.SASL_JAAS_CONFIG, "{{ saslJaasConfig | safe }}");
{% endif -%}
props.put(JsonDeserializer.TYPE_MAPPINGS,
{%- for schema in asyncapi.allSchemas().values() | isObjectType %}
{%- if schema.uid() | first !== '<' and schema.type() === 'object' %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
{%- if asyncapi | isProtocol('mqtt') -%}
{{- mqttConfig(asyncapi, params) -}}
{%- endif -%}
{%- if asyncapi | isProtocol('kafka') -%}
{%- if (asyncapi | isProtocol('kafka')) or (asyncapi | isProtocol('kafka-secure')) -%}
{{- kafkaConfig(asyncapi, params) -}}
{%- endif -%}

0 comments on commit ce7fc91

Please sign in to comment.