Skip to content

Commit

Permalink
feat: support multiple messages (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tenischev authored May 23, 2022
1 parent db6344e commit e5c94fd
Show file tree
Hide file tree
Showing 12 changed files with 874 additions and 71 deletions.
13 changes: 10 additions & 3 deletions partials/KafkaPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
import {{params['userJavaPackage']}}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}};
{%- if channel.hasSubscribe() %}
{%- for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor -%}
{% endif -%}
{% endfor %}

Expand All @@ -18,7 +20,12 @@ public class PublisherService {
@Autowired
private KafkaTemplate<Integer, Object> kafkaTemplate;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %} {% set varName = channel.subscribe().message().payload().uid() | camelCase %}
{%- if channel.hasSubscribe() %}
{%- if channel.subscribe().hasMultipleMessages() %}
{%- set varName = "object" %}
{%- else %}
{%- set varName = channel.subscribe().message().payload().uid() | camelCase %}
{%- endif %}
{% if channel.description() or channel.subscribe().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.subscribe().description() | splitByLines %}
* {{line | safe}}{% endfor %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public void run(String... args) {

{%- for channelName, channel in asyncapi.channels() %}
{%- if channel.hasSubscribe() %}
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %});
{%- for message in channel.subscribe().messages() %}
publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %});
{% endfor -%}
{% endif -%}
{%- endfor %}
System.out.println("Message sent");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.springframework.messaging.handler.annotation.Payload;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
import {{ params['userJavaPackage'] }}.model.{{channel.publish().message().payload().uid() | camelCase | upperFirst}};
{% endif -%}
{% endfor -%}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
@Service
public class MessageHandlerService {
Expand All @@ -32,13 +34,17 @@ public class MessageHandlerService {
{% if asyncapi | isProtocol('kafka') %}
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}

{%- if channel.publish().hasMultipleMessages() %}
{%- set typeName = "Object" %}
{%- else %}
{%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- endif %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
@KafkaListener(topics = "{{channelName}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %})
public void {{channel.publish().id() | camelCase}}(@Payload {{channel.publish().message().payload().uid() | camelCase | upperFirst}} payload,
public void {{channel.publish().id() | camelCase}}(@Payload {{typeName}} payload,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
Expand Down
36 changes: 20 additions & 16 deletions template/src/test/java/com/asyncapi/SimpleKafkaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
{%- endfor -%}
package {{ params['userJavaPackage'] }};

{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
import {{ params['userJavaPackage'] }}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}};
{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %}
import {{ params['userJavaPackage'] }}.model.{{channel.publish().message().payload().uid() | camelCase | upperFirst}};
{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %} {% for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{% endfor %} {% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %} {% for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{% endfor %} {% endif %} {% endfor %}
{% if hasSubscribe %}import {{ params['userJavaPackage'] }}.service.PublisherService;{% endif %}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -55,12 +55,12 @@
@SpringBootTest
public class SimpleKafkaTest {
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
private static final String {{channel.subscribe().id() | upper-}}_TOPIC = "{{channelName}}";
private static final String {{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC = "{{channelName}}";
{% endif %} {% if channel.hasPublish() %}
private static final String {{channel.publish().id() | upper-}}_TOPIC = "{{channelName}}";
private static final String {{channel.publish().id() | upper-}}_PUBLISH_TOPIC = "{{channelName}}";
{% endif %} {% endfor %}
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 1{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}, {{channel.subscribe().id() | upper-}}_TOPIC{% endif %}{% endfor %});
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 1{% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %}, {{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC{% endif %}{% endfor %});

private static EmbeddedKafkaBroker embeddedKafkaBroker = embeddedKafka.getEmbeddedKafka();

Expand All @@ -73,7 +73,8 @@ public static void kafkaProperties(DynamicPropertyRegistry registry) {
@Autowired
private PublisherService publisherService;
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
Consumer<Integer, {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}> consumer{{ channelName | camelCase | upperFirst}};
{%- if channel.subscribe().hasMultipleMessages() %} {% set typeName = "Object" %} {% else %} {% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} {% endif %}
Consumer<Integer, {{typeName}}> consumer{{ channelName | camelCase | upperFirst}};
{% endif %} {% endfor %} {% endif %} {% if hasPublish %}
Producer<Integer, Object> producer;
{% endif %}
Expand All @@ -83,8 +84,9 @@ public void init() {
Map<String, Object> consumerConfigs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker));
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
consumer{{ channelName | camelCase | upperFirst}} = new DefaultKafkaConsumerFactory<>(consumerConfigs, new IntegerDeserializer(), new JsonDeserializer<>({{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}.class)).createConsumer();
consumer{{ channelName | camelCase | upperFirst}}.subscribe(Collections.singleton({{channel.subscribe().id() | upper-}}_TOPIC));
{%- if channel.subscribe().hasMultipleMessages() %} {% set typeName = "Object" %} {% else %} {% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} {% endif %}
consumer{{ channelName | camelCase | upperFirst}} = new DefaultKafkaConsumerFactory<>(consumerConfigs, new IntegerDeserializer(), new JsonDeserializer<>({{typeName}}.class)).createConsumer();
consumer{{ channelName | camelCase | upperFirst}}.subscribe(Collections.singleton({{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC));
consumer{{ channelName | camelCase | upperFirst}}.poll(Duration.ZERO);
{% endif %} {% endfor %} {% endif %} {% if hasPublish %}
Map<String, Object> producerConfigs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Expand All @@ -94,24 +96,26 @@ public void init() {
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
@Test
public void {{channel.subscribe().id() | camelCase}}ProducerTest() {
{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}();
{%- if channel.subscribe().hasMultipleMessages() %} {% set typeName = "Object" %} {% else %} {% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} {% endif %}
{{typeName}} payload = new {{typeName}}();
Integer key = 1;

KafkaTestUtils.getRecords(consumer{{ channelName | camelCase | upperFirst}});

publisherService.{{channel.subscribe().id() | camelCase}}(key, payload);

ConsumerRecord<Integer, {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}> singleRecord = KafkaTestUtils.getSingleRecord(consumer{{ channelName | camelCase | upperFirst}}, {{channel.subscribe().id() | upper-}}_TOPIC);
ConsumerRecord<Integer, {{typeName}}> singleRecord = KafkaTestUtils.getSingleRecord(consumer{{ channelName | camelCase | upperFirst}}, {{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC);

assertEquals("Key is wrong", key, singleRecord.key());
}
{% endif %} {% if channel.hasPublish() %}
@Test
public void {{channel.publish().id() | camelCase}}ConsumerTest() throws InterruptedException {
Integer key = 1;
{{channel.publish().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.publish().message().payload().uid() | camelCase | upperFirst}}();
{%- if channel.publish().hasMultipleMessages() %} {% set typeName = "Object" %} {% else %} {% set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %} {% endif %}
{{typeName}} payload = new {{typeName}}();

ProducerRecord<Integer, Object> producerRecord = new ProducerRecord<>({{channel.publish().id() | upper-}}_TOPIC, key, payload);
ProducerRecord<Integer, Object> producerRecord = new ProducerRecord<>({{channel.publish().id() | upper-}}_PUBLISH_TOPIC, key, payload);
producer.send(producerRecord);
producer.flush();
Thread.sleep(1_000);
Expand Down
28 changes: 15 additions & 13 deletions template/src/test/java/com/asyncapi/TestcontainerKafkaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
{%- endfor -%}
package {{ params['userJavaPackage'] }};

{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
import {{ params['userJavaPackage'] }}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}};
{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %}
import {{ params['userJavaPackage'] }}.model.{{channel.publish().message().payload().uid() | camelCase | upperFirst}};
{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}{% for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{% endfor %}{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %}{% for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{% endfor %}{% endif %} {% endfor %}
{% if hasSubscribe %}import {{ params['userJavaPackage'] }}.service.PublisherService;{% endif %}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -56,9 +56,9 @@
public class TestcontainerKafkaTest {

{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
private static final String {{channel.subscribe().id() | upper-}}_TOPIC = "{{channelName}}";
private static final String {{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC = "{{channelName}}";
{% endif %} {% if channel.hasPublish() %}
private static final String {{channel.publish().id() | upper-}}_TOPIC = "{{channelName}}";
private static final String {{channel.publish().id() | upper-}}_PUBLISH_TOPIC = "{{channelName}}";
{% endif %} {% endfor %}
@ClassRule
public static KafkaContainer kafka = new KafkaContainer();
Expand All @@ -71,17 +71,18 @@ public static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
{%- if channel.subscribe().hasMultipleMessages() %}{% set typeName = "Object" %}{% else %}{% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}{% endif %}
@Test
public void {{channel.subscribe().id() | camelCase}}ProducerTestcontainers() {
{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}();
{{typeName}} payload = new {{typeName}}();
Integer key = 1;
Integer wrongKey = key + 1;

consumeMessages({{channel.subscribe().id() | upper-}}_TOPIC);
consumeMessages({{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC);

publisherService.{{channel.subscribe().id() | camelCase}}(key, payload);

ConsumerRecord<Integer, Object> consumedMessage = consumeMessage({{channel.subscribe().id() | upper-}}_TOPIC);
ConsumerRecord<Integer, Object> consumedMessage = consumeMessage({{channel.subscribe().id() | upper-}}_SUBSCRIBE_TOPIC);

assertEquals("Key is wrong", key, consumedMessage.key());
assertNotEquals("Key is wrong", wrongKey, consumedMessage.key());
Expand All @@ -90,9 +91,10 @@ public static void kafkaProperties(DynamicPropertyRegistry registry) {
@Test
public void {{channel.publish().id() | camelCase}}ConsumerTestcontainers() throws Exception {
Integer key = 1;
{{channel.publish().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.publish().message().payload().uid() | camelCase | upperFirst}}();
{%- if channel.publish().hasMultipleMessages() %}{% set typeName = "Object" %}{% else %}{% set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}{% endif %}
{{typeName}} payload = new {{typeName}}();

ProducerRecord<Integer, Object> producerRecord = new ProducerRecord<>({{channel.publish().id() | upper-}}_TOPIC, key, payload);
ProducerRecord<Integer, Object> producerRecord = new ProducerRecord<>({{channel.publish().id() | upper-}}_PUBLISH_TOPIC, key, payload);

sendMessage(producerRecord);

Expand Down
26 changes: 14 additions & 12 deletions template/src/test/java/com/asyncapi/TestcontainerMqttTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
{%- endfor -%}
package {{ params['userJavaPackage'] }};

{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
import {{ params['userJavaPackage'] }}.model.{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}};
{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %}
import {{ params['userJavaPackage'] }}.model.{{channel.publish().message().payload().uid() | camelCase | upperFirst}};
{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}{% for message in channel.subscribe().messages() %}
import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}};
{% endfor %}{% endif %} {% endfor %}
{% for channelName, channel in asyncapi.channels() %} {% if channel.hasPublish() %}{% for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{% endfor %}{% endif %} {% endfor %}
{% if hasSubscribe %}import {{ params['userJavaPackage'] }}.service.PublisherService;{% endif %}
import org.eclipse.paho.client.mqttv3.*;
import org.junit.ClassRule;
Expand Down Expand Up @@ -46,10 +46,10 @@ public class TestcontainerMqttTest {

{% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %}
@Value("${mqtt.topic.{{-channel.publish().id() | camelCase-}}}")
private String {{channel.publish().id() | camelCase-}}Topic;
private String {{channel.publish().id() | camelCase-}}PublishTopic;
{% elif channel.hasSubscribe() %}
@Value("${mqtt.topic.{{-channel.subscribe().id() | camelCase-}}}")
private String {{channel.subscribe().id() | camelCase-}}Topic;
private String {{channel.subscribe().id() | camelCase-}}SubscribeTopic;
{% endif %}{% endfor %}

@ClassRule
Expand Down Expand Up @@ -79,12 +79,13 @@ public void after() throws MqttException {
}

{% for channelName, channel in asyncapi.channels() %} {% if channel.hasSubscribe() %}
{% if channel.subscribe().hasMultipleMessages() %}{% set typeName = "Object" %}{% else %}{% set typeName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %}{% endif -%}
@Test
public void {{channel.subscribe().id() | camelCase}}ProducerTestcontainers() throws MqttException {
{{channel.subscribe().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.subscribe().message().payload().uid() | camelCase | upperFirst}}();
{{typeName}} payload = new {{typeName}}();

List<MqttMessage> receivedMessages = new ArrayList<>();
publisher.subscribe({{channel.subscribe().id() | camelCase-}}Topic, (topic, message) -> {
publisher.subscribe({{channel.subscribe().id() | camelCase-}}SubscribeTopic, (topic, message) -> {
receivedMessages.add(message);
});

Expand All @@ -97,9 +98,10 @@ public void after() throws MqttException {
{% endif %} {% if channel.hasPublish() %}
@Test
public void {{channel.publish().id() | camelCase}}ConsumerTestcontainers() throws Exception {
{{channel.publish().message().payload().uid() | camelCase | upperFirst}} payload = new {{channel.publish().message().payload().uid() | camelCase | upperFirst}}();
{% if channel.publish().hasMultipleMessages() %}{% set typeName = "Object" %}{% else %}{% set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}{% endif -%}
{{typeName}} payload = new {{typeName}}();

sendMessage({{channel.publish().id() | camelCase-}}Topic, payload.toString().getBytes());
sendMessage({{channel.publish().id() | camelCase-}}PublishTopic, payload.toString().getBytes());

Thread.sleep(1_000);
}
Expand Down
Loading

0 comments on commit e5c94fd

Please sign in to comment.