-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathKafkaJavaExample.java
90 lines (80 loc) · 3.83 KB
/
KafkaJavaExample.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import edu.umd.cs.findbugs.annotations.NonNull;
import io.gatling.javaapi.core.ScenarioBuilder;
import io.github.amerousful.kafka.javaapi.KafkaMessageMatcher;
import io.github.amerousful.kafka.javaapi.KafkaProtocolBuilder;
import io.github.amerousful.kafka.protocol.SaslMechanism;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Collections;
import static io.gatling.javaapi.core.CoreDsl.*;
import static io.github.amerousful.kafka.javaapi.KafkaDsl.*;
/**
 * Example usage of the Gatling Kafka plugin Java DSL: a custom message matcher, a protocol
 * configuration, and a scenario that chains a send, a request/reply and a consume-only request.
 */
public class KafkaJavaExample {

    /**
     * Custom {@link KafkaMessageMatcher} whose request and response match ids are both the
     * record key.
     */
    private static final KafkaMessageMatcher customMatcher =
        new KafkaMessageMatcher() {
            // NOTE(review): key() is returned as-is although the method is annotated @NonNull;
            // presumably every record used with this matcher carries a key — confirm.
            @NonNull
            @Override
            public String requestMatchId(@NonNull ProducerRecord<String, ?> msg) {
                return msg.key();
            }

            @NonNull
            @Override
            public String responseMatchId(@NonNull ConsumerRecord<String, ?> msg) {
                return msg.key();
            }
        };

    /**
     * Kafka protocol configuration exercising the builder options: brokers, acks,
     * (de)serializers, extra producer/consumer properties, SASL credentials, reply timeout,
     * message matching and reply consumer name.
     */
    KafkaProtocolBuilder kafkaProtocol = kafka
        .broker(KafkaBroker("kafka.us-east-1.amazonaws.com", 9096))
        .brokers(
            KafkaBroker("kafka.us-east-2.amazonaws.com", 9096),
            KafkaBroker("kafka.us-east-3.amazonaws.com", 9096)
        )
        .acks("1")
        .producerIdenticalSerializer("org.apache.kafka.common.serialization.StringSerializer")
        .consumerIdenticalDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
        .addProducerProperty("retries", "3")
        .addConsumerProperty("heartbeat.interval.ms", "3000")
        // Placeholder credentials for the example only; never hard-code real secrets.
        .credentials("admin", "password", true, SaslMechanism.plain())
        // NOTE(review): the unit of this timeout is not visible here (presumably seconds) — confirm.
        .replyTimeout(10)
        // NOTE(review): three matching options are set in a row; presumably only the last one
        // (customMatcher) takes effect and the others are shown for illustration — confirm.
        .matchByKey()
        .matchByValue()
        .messageMatcher(customMatcher)
        .replyConsumerName("gatling-test-consumer");

    //#simple
    /**
     * Tells whether the consumed record's value equals {@code "myValue"}.
     *
     * <p>A Kafka record may carry a {@code null} value (for example a tombstone); in that case
     * this returns {@code false} instead of throwing a {@link NullPointerException}.
     *
     * @param record the consumed record to inspect
     * @return {@code true} if the value is non-null and equal to {@code "myValue"}
     */
    public boolean checkRecordValue(ConsumerRecord<String, ?> record) {
        Object value = record.value();
        return value != null && value.equals("myValue");
    }

    /**
     * Scenario with three Kafka requests: a fire-and-forget send, a request/reply with several
     * checks on the reply, and a consume-only request.
     */
    ScenarioBuilder scn =
        scenario("scenario")
            .exec(
                kafka("Kafka: fire and forget")
                    .send()
                    .topic("input_topic")
                    .payload("#{payload}")
                    .key("#{key}")
                    .header("k1", "v1")
                    .headers(Collections.singletonMap("key", "value"))
            )
            .exec(
                kafka("Kafka: request with reply")
                    .requestReply()
                    .topic("input_topic")
                    .payload("message")
                    .replyTopic("output_topic")
                    .key("#{key}")
                    .check(jsonPath("$.m").is("#{payload}_1"))
                    .checkIf("#{bool}")
                    .then(jsonPath("$..foo"))
                    .checkIf((message, session) -> true)
                    .then(jsonPath("$").is("hello"))
                    .check(header("header1").in("value1"))
                    // Custom check backed by checkRecordValue above.
                    .check(simpleCheck(this::checkRecordValue))
            )
            .exec(
                kafka("Kafka only consume")
                    .onlyConsume()
                    // Empty strings are placeholders in this example.
                    .readTopic("")
                    .payloadForTracking("")
                    .keyForTracking("")
                    .startTime(session -> 100000000L)
            );
}