Initial support for AVRO consumer #5

Open · wants to merge 1 commit into base: master
pom.xml (16 additions & 1 deletion)
@@ -10,7 +10,7 @@
<packaging>jar</packaging>

<properties>
<revision>0.1.2</revision>
<revision>0.1.3</revision>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka-clients.version>2.4.0</kafka-clients.version>
@@ -29,6 +29,13 @@
</repository>
</distributionManagement>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
@@ -74,6 +81,14 @@
<version>2.4.0</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.5.0</version>
<!-- Avoid including it for everybody -->
<optional>true</optional>
</dependency>

</dependencies>

<build>
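Marking the Confluent serializer optional keeps it out of downstream builds by default, so a project that wants the Avro consumer has to declare kafka-avro-serializer itself and point the consumer at Confluent's deserializer. A minimal sketch of the consumer properties such a project might pass in, assuming a local broker and Schema Registry (the addresses and group id are placeholders):

import java.util.HashMap;
import java.util.Map;

public class AvroConsumerConfigSketch {
    static Map<String, String> consumerProperties() {
        Map<String, String> props = new HashMap<>();
        props.put("bootstrap.servers", "127.0.0.1:9092");   // placeholder broker address
        props.put("group.id", "karate-kafka-avro-test");    // hypothetical group id
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://127.0.0.1:8081"); // placeholder registry
        return props;
    }
}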
src/main/java/karate/kafka/AbstractKarateKafkaConsumer.java (287 additions & 0 deletions)
@@ -0,0 +1,287 @@
package karate.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.JsonPathException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static com.jayway.jsonpath.internal.Utils.isEmpty;
import static java.util.Objects.isNull;

abstract class AbstractKarateKafkaConsumer implements Runnable {

static Logger logger = LoggerFactory.getLogger(AbstractKarateKafkaConsumer.class.getName());
private KafkaConsumer<Object, Object> kafka;
private CountDownLatch startupLatch = new CountDownLatch(1);
private CountDownLatch shutdownLatch = new CountDownLatch(1);
private boolean partitionsAssigned = false;
private Pattern keyFilter; // Java regular expression
private String valueFilter; // Json path expression

private BlockingQueue<String> outputList = new LinkedBlockingQueue<>();

AbstractKarateKafkaConsumer(String kafkaTopic, Map<String, String> consumerProperties) {
this(kafkaTopic, consumerProperties, null, null);
}

AbstractKarateKafkaConsumer(String kafkaTopic) {
this(kafkaTopic, null, null);
}

AbstractKarateKafkaConsumer(String kafkaTopic, String keyFilterExpression, String valueFilterExpression) {
Properties cp = getDefaultProperties();
setKeyValueFilters(keyFilterExpression, valueFilterExpression);
create(kafkaTopic, cp);
}

AbstractKarateKafkaConsumer(
String kafkaTopic,
Map<String, String> consumerProperties,
String keyFilterExpression,
String valueFilterExpression) {

setKeyValueFilters(keyFilterExpression, valueFilterExpression);
Properties cp = new Properties();
for (String key : consumerProperties.keySet()) {
String value = consumerProperties.get(key);
cp.setProperty(key, value);
}
create(kafkaTopic, cp);
}

// All constructors eventually call this ....
private void create(String kafkaTopic, Properties cp) {

// Create the consumer and subscribe to the topic
kafka = new KafkaConsumer<Object, Object>(cp);
kafka.subscribe(Collections.singleton(kafkaTopic));
// Start the thread which will poll the topic for data.
Thread t = new Thread(this);
t.start();

// Wait until partitions have been assigned to this consumer. The polling thread
// counts down the latch once the assignment has happened.
logger.debug("Waiting for consumer to be ready ...");
try {
startupLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.debug("consumer is ready");
}

/**
* Sets the predicates used to filter Kafka records by key and/or value
*
* @param keyFilterExpression Java regular expression pattern
* @param valueFilterExpression <a href="https://github.com/json-path/JsonPath">JsonPath</a>
* expression
*/
private void setKeyValueFilters(String keyFilterExpression, String valueFilterExpression) {
if (!isEmpty(keyFilterExpression)) {
this.keyFilter = Pattern.compile(keyFilterExpression);
}
if (!isEmpty(valueFilterExpression)) {
this.valueFilter = valueFilterExpression;
}
}

static Properties getDefaultProperties() {
// Consumer Configuration
Properties cp = new Properties();
cp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

cp.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cp.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
cp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "karate-kafka-default-consumer-group");

return cp;
}

public void close() {
logger.debug("asking consumer to shutdown ...");
kafka.wakeup();
try {
shutdownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void signalWhenReady() {

if (!partitionsAssigned) {
logger.debug("checking partition assignment");
Set<TopicPartition> partitions = kafka.assignment();
if (partitions.size() > 0) {
partitionsAssigned = true;
logger.debug("partitions assigned to consumer ...");
startupLatch.countDown();
}
}
}

public void run() {

// Until consumer.poll() is called, the consumer is just idling.
// Only after poll() is invoked does it initiate a connection to the cluster, get
// partitions assigned, and attempt to fetch messages.
// So we call poll() and then wait until partitions are assigned, at which point
// we raise a signal (the countdown latch) so that the constructor can return.

try {
while (true) {
// Poll for records and handle them
ConsumerRecords<Object, Object> records = kafka.poll(Duration.ofMillis(500));
signalWhenReady();
if (records != null) {
for (ConsumerRecord<Object, Object> record : records) {
logger.debug("*** Consumer got data ****");

Object key = record.key();
Object value = record.value();
Headers recordHeaders = record.headers();

logger.debug("Partition : " + record.partition() + " Offset : " + record.offset());
if (key == null) logger.debug("Key : null");
else logger.debug("Key : " + key + " Type: " + key.getClass().getName());
if (value == null) logger.debug("Value : null");
else logger.debug("Value : " + value + " Type: " + value.getClass().getName());

if (!isNull(keyFilter) && !filterByKey(key)) {
continue;
}
if (!isNull(valueFilter) && !filterByValue(value)) {
continue;
}
// We want to return a String that Karate can interpret as JSON
String str = convertToJsonString(key, value, recordHeaders);
logger.debug("Consuming record. key: " + key + ", value: " + value);
outputList.put(str);
}
}
}
} catch (WakeupException e) {
logger.debug("Got WakeupException");
// nothing to do here
} catch (Exception e) {
e.printStackTrace();
} finally {
logger.debug("consumer is shutting down ...");
kafka.close();
logger.debug("consumer is now shut down.");
shutdownLatch.countDown();
}
}

private String convertToJsonString(Object key, Object value, Headers recordHeaders) {
ObjectMapper objectMapper = new ObjectMapper();
Map<String,String> map = new HashMap<>();
for(Header h : recordHeaders) {
String headerKey = h.key();
String headerValue = new String(h.value());
map.put(headerKey,headerValue);
}

if (map.isEmpty()) {
// There were no headers
return "{key: " + key + ", value: " + convertValue(value) + "}";
}
else {
// headers are present ...
String headers;
try {
headers = objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
headers = "error";
logger.error("Unable to parse header");
}
return "{key: " + key + ", value: " + convertValue(value) + ", headers: " + headers + "}";
}
}

abstract Object convertValue(Object value);

/**
* @param value The Kafka record value
* @return true if the value matches the valueFilter JsonPath expression, false otherwise
*/
private boolean filterByValue(Object value) {
try {
return !isNull(value)
&& !JsonPath.parse(value.toString()).read(valueFilter, List.class).isEmpty();
} catch (JsonPathException e) {
logger.error("Exception while trying to filter value", e);
}
return false;
}

/**
* Checks whether the given string matches the keyFilter regular expression
*
* @param key String to be checked for pattern matching
* @return true if the key matches the keyFilter pattern. False otherwise
*/
private boolean filterByKey(Object key) {
return !isNull(key) && keyFilter.matcher(key.toString()).find();
}

/**
* @return The next available kafka record in the Queue (head of the queue). If no record is
* available, then the call is blocked.
* @throws InterruptedException - if interrupted while waiting
*/
public synchronized String take() throws InterruptedException {
logger.debug("take() called");
return outputList.take(); // wait if necessary for data to become available
}

/**
* @param n The number of records to read
* @return A JSON-array-like String containing the next n records from the queue. The call
* blocks until n records are available.
* @throws InterruptedException - if interrupted while waiting
*/
public synchronized String take(int n) throws InterruptedException {
logger.debug("take(n) called");
List<String> list = new ArrayList<>();
for(int i=0; i<n; i++){
list.add(outputList.take()); // wait if necessary for data to become available
}
// We want to return a String that can be interpreted by Karate as a JSON
String str = list.toString();
return str;
}

/**
* @param timeout maximum time in milliseconds to wait for a record
* @return The next available kafka record in the Queue (head of the queue). If no record is
* available for timeout milliseconds, then return null
* @throws InterruptedException - if interrupted while waiting
*/
public synchronized String poll(long timeout) throws InterruptedException {
logger.debug("poll() called");
return outputList.poll(timeout, TimeUnit.MILLISECONDS);
}

}
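The blocking-queue semantics above are easiest to see from the caller's side. A hypothetical usage sketch via the Avro subclass defined below; the topic name and both filter expressions are placeholders:

import karate.kafka.KarateKafkaAvroConsumer;

public class ConsumerUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        // The constructor blocks until partitions are assigned.
        KarateKafkaAvroConsumer consumer = new KarateKafkaAvroConsumer(
            "test-topic",                   // hypothetical topic
            "^order-",                      // keep only keys starting with "order-"
            "$[?(@.status == 'NEW')]");     // keep only values whose status is NEW

        String first = consumer.take();     // blocks until one record arrives
        String batch = consumer.take(3);    // blocks until three records arrive
        String maybe = consumer.poll(5000); // waits up to 5 s, returns null on timeout
        System.out.println(first + "\n" + batch + "\n" + maybe);

        consumer.close();                   // wakes the poll loop, waits for shutdown
    }
}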
src/main/java/karate/kafka/KarateKafkaAvroConsumer.java (57 additions & 0 deletions)
@@ -0,0 +1,57 @@
package karate.kafka;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class KarateKafkaAvroConsumer extends AbstractKarateKafkaConsumer {

public KarateKafkaAvroConsumer(String kafkaTopic, Map<String, String> consumerProperties) {
super(kafkaTopic, consumerProperties, null, null);
}

public KarateKafkaAvroConsumer(String kafkaTopic) {
super(kafkaTopic, null, null);
}

public KarateKafkaAvroConsumer(String kafkaTopic, String keyFilterExpression, String valueFilterExpression) {
super(kafkaTopic, keyFilterExpression, valueFilterExpression);
}

public KarateKafkaAvroConsumer(
String kafkaTopic,
Map<String, String> consumerProperties,
String keyFilterExpression,
String valueFilterExpression) {
super(kafkaTopic, consumerProperties, keyFilterExpression, valueFilterExpression);
}

public static Properties getDefaultProperties() {
return AbstractKarateKafkaConsumer.getDefaultProperties();
}


@Override
Object convertValue(Object value) {
GenericRecord record = (GenericRecord) value;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), baos);
writer.write(record, encoder);
encoder.flush();
return baos.toString();
} catch (IOException e) {
logger.error("Unable serialize the AVRO record", e);
}
return null;
}

}
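convertValue renders the Avro GenericRecord as JSON via Avro's JsonEncoder. A self-contained sketch of the same technique, with a made-up schema purely for illustration:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroJsonDemo {
    public static void main(String[] args) throws IOException {
        // A made-up record schema, purely for illustration.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":["
            + "{\"name\":\"message\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("message", "hello");

        // Same encoding steps as convertValue() above.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos);
        writer.write(record, encoder);
        encoder.flush();
        System.out.println(baos); // prints {"message":"hello"}
    }
}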