From ce7fc912f4e9d53b7aa3f8caeec711bbd026c36a Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Thu, 28 Oct 2021 10:34:25 +0100 Subject: [PATCH] feat: add support for security schemes in Kafka config (#158) This commit adds support for the new security schemes introduced in AsyncAPI 2.1.0, for code generated for Kafka servers. When provided with AsyncAPI specification documents that describe Kafka clusters requiring authentication, the generator will now be able to generate the correct configuration for client code. Signed-off-by: Dale Lane --- hooks/02_removeNotRelevantParts.js | 2 +- partials/KafkaConfig.java | 68 ++++++++++++++++++- .../com/asyncapi/infrastructure/Config.java | 2 +- 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/hooks/02_removeNotRelevantParts.js b/hooks/02_removeNotRelevantParts.js index de44549ef..54d0b9e90 100644 --- a/hooks/02_removeNotRelevantParts.js +++ b/hooks/02_removeNotRelevantParts.js @@ -8,7 +8,7 @@ module.exports = { for (let server of Object.values(asyncapi.servers())) { hasAmqp = hasAmqp || server.protocol() === 'amqp'; hasMqtt = hasMqtt || server.protocol() === 'mqtt'; - hasKafka = hasKafka || server.protocol() === 'kafka'; + hasKafka = hasKafka || server.protocol() === 'kafka' || server.protocol() === 'kafka-secure'; } if (!hasKafka) { // remove filers from template related only to Kafka diff --git a/partials/KafkaConfig.java b/partials/KafkaConfig.java index cbcb3eb94..242833284 100644 --- a/partials/KafkaConfig.java +++ b/partials/KafkaConfig.java @@ -10,10 +10,60 @@ {%- endif -%} {%- endfor %} +{%- set securityProtocol = "PLAINTEXT" -%} +{%- set saslMechanism = null -%} +{%- set saslJaasConfig = null -%} +{%- for serverName, server in asyncapi.servers() -%} + {%- if server.protocol() == "kafka" -%} + {%- if server.security() -%} + {%- set securityProtocol = "SASL_PLAINTEXT" -%} + {%- else -%} + {%- set securityProtocol = "PLAINTEXT" -%} + {%- endif -%} + {%- elif server.protocol() == "kafka-secure" -%} + {%- if server.security() -%} + {%- set securityProtocol = "SASL_SSL" -%} + {%- else -%} + {%- set securityProtocol = "SSL" -%} + {%- endif -%} + {%- endif -%} + + {%- if asyncapi.hasComponents() and asyncapi.components().hasSecuritySchemes() and server.security() -%} + {%- for securityRef in server.security() -%} + {%- for securityName, securityObj in securityRef -%} + {%- for securityRefName, securityRefObj in securityObj -%} + {%- for securitySchemeRef, securityScheme in asyncapi.components().securitySchemes()[securityRefName] -%} + {%- if securityScheme.type == "plain" -%} + {%- set saslMechanism = "PLAIN" -%} + {%- set saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username='USERNAME' password='PASSWORD';" -%} + {%- elif securityScheme.type == "scramSha256" -%} + {%- set saslMechanism = "SCRAM-SHA-256" -%} + {%- set saslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username='USERNAME' password='PASSWORD';" -%} + {%- elif securityScheme.type == "scramSha512" -%} + {%- set saslMechanism = "SCRAM-SHA-512" -%} + {%- set saslJaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username='USERNAME' password='PASSWORD';" -%} + {%- elif securityScheme.type == "oauth2" -%} + {%- set saslMechanism = "OAUTHBEARER" -%} + {%- set saslJaasConfig = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub='LOGINSTRING';" -%} + {%- elif securityScheme.type == "gssapi" -%} + {%- set saslMechanism = "GSSAPI" -%} + {%- set saslJaasConfig = "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab='CLIENT.KEYTAB' principal='EMAIL@DOMAIN.COM';" -%} + {%- elif securityScheme.type == "X509" -%} + {%- set securityProtocol = "SSL" -%} + {%- endif -%} + {%- endfor -%} + {%- endfor -%} + {%- endfor -%} + {%- endfor -%} + {%- endif -%} +{%- endfor %} + +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.config.SaslConfigs; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -57,9 +107,16 @@ public Map producerConfigs() { props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - {% if params.addTypeInfoHeader === 'false' %} - props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "{{ securityProtocol }}"); + {%- if saslMechanism %} + props.put(SaslConfigs.SASL_MECHANISM, "{{ saslMechanism }}"); + {%- endif -%} + {%- if saslJaasConfig %} + props.put(SaslConfigs.SASL_JAAS_CONFIG, "{{ saslJaasConfig | safe }}"); {% endif -%} + {%- if params.addTypeInfoHeader === 'false' %} + props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + {% endif %} props.put(JsonSerializer.TYPE_MAPPINGS, {%- for schema in asyncapi.allSchemas().values() | isObjectType %} {%- if schema.uid() | first !== '<' and schema.type() === 'object' %} @@ -94,6 +151,13 @@ public Map consumerConfigs() { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "{{ securityProtocol }}"); + {%- if saslMechanism %} + props.put(SaslConfigs.SASL_MECHANISM, "{{ saslMechanism }}"); + {%- endif -%} + {%- if saslJaasConfig %} + props.put(SaslConfigs.SASL_JAAS_CONFIG, "{{ saslJaasConfig | safe }}"); + {% endif -%} props.put(JsonDeserializer.TYPE_MAPPINGS, {%- for schema in asyncapi.allSchemas().values() | isObjectType %} {%- if schema.uid() | first !== '<' and schema.type() === 'object' %} diff --git a/template/src/main/java/com/asyncapi/infrastructure/Config.java b/template/src/main/java/com/asyncapi/infrastructure/Config.java index c10bc507e..3f530fec8 100644 --- a/template/src/main/java/com/asyncapi/infrastructure/Config.java +++ b/template/src/main/java/com/asyncapi/infrastructure/Config.java @@ -10,6 +10,6 @@ {%- if asyncapi | isProtocol('mqtt') -%} {{- mqttConfig(asyncapi, params) -}} {%- endif -%} -{%- if asyncapi | isProtocol('kafka') -%} +{%- if (asyncapi | isProtocol('kafka')) or (asyncapi | isProtocol('kafka-secure')) -%} {{- kafkaConfig(asyncapi, params) -}} {%- endif -%} \ No newline at end of file