Rebased to latest
Signed-off-by: Krishna Kondaka <[email protected]>
Krishna Kondaka committed Jan 18, 2024
1 parent 41eab73 commit 3f6c09b
Showing 9 changed files with 66 additions and 44 deletions.
@@ -15,11 +15,14 @@
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.AbstractBuffer;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.opensearch.dataprepper.plugins.kafka.admin.KafkaAdminAccessor;
import org.opensearch.dataprepper.plugins.kafka.buffer.serialization.BufferSerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.CommonSerializationFactory;
@@ -69,7 +72,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
this.byteDecoder = byteDecoder;
final String metricPrefixName = kafkaBufferConfig.getCustomMetricPrefix().orElse(pluginSetting.getName());
final PluginMetrics producerMetrics = PluginMetrics.fromNames(metricPrefixName + WRITE, pluginSetting.getPipelineName());
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null, producerMetrics, false);
producer = kafkaCustomProducerFactory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting, null, null, producerMetrics, false, false);
final KafkaCustomConsumerFactory kafkaCustomConsumerFactory = new KafkaCustomConsumerFactory(serializationFactory, awsCredentialsSupplier);
innerBuffer = new BlockingBuffer<>(INNER_BUFFER_CAPACITY, INNER_BUFFER_BATCH_SIZE, pluginSetting.getPipelineName());
this.shutdownInProgress = new AtomicBoolean(false);
@@ -89,7 +92,15 @@ public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis
producer.produceRawData(bytes, key);
} catch (final Exception e) {
LOG.error(e.getMessage(), e);
throw new RuntimeException(e);
if (e.getCause() == null || e instanceof TimeoutException) {
throw e;
}
if (e.getCause() instanceof RecordTooLargeException ||
e.getCause() instanceof RecordBatchTooLargeException) {
throw new SizeOverflowException(e.getMessage());
} else {
throw new RuntimeException(e);
}
}
}
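A minimal caller-side sketch (hypothetical, not part of this commit) of what the new behavior enables: size violations now surface as SizeOverflowException, so a caller can stop retrying oversized payloads while letting other failures propagate as before. The package of KafkaBuffer below is assumed from the surrounding imports.

import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBuffer;

// Hypothetical helper; class, method, and package names are illustrative only.
class KafkaBufferWriteExample {
    static void writeWithSizeHandling(final KafkaBuffer buffer, final byte[] bytes,
                                      final String key, final int timeoutInMillis) throws Exception {
        try {
            buffer.writeBytes(bytes, key, timeoutInMillis);
        } catch (final SizeOverflowException e) {
            // The payload exceeds the broker's record/batch size limit; retrying the
            // same bytes cannot succeed, so split or drop the payload here instead.
            // Other exceptions (including timeouts) still propagate unchanged.
        }
    }
}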

@@ -98,17 +98,18 @@ KafkaTopicProducerMetrics getTopicMetrics() {
return topicMetrics;
}

public void produceRawData(final byte[] bytes, final String key) {
public void produceRawData(final byte[] bytes, final String key) throws Exception{
try {
send(topicName, key, bytes).get();
topicMetrics.update(producer);
} catch (Exception e) {
topicMetrics.getNumberOfRawDataSendErrors().increment();
LOG.error("Error occurred while publishing raw data", e);
throw e;
}
}

public void produceRecords(final Record<Event> record) {
public void produceRecords(final Record<Event> record) throws Exception {
bufferedEventHandles.add(record.getData().getEventHandle());
Event event = getEvent(record);
final String key = event.formatString(kafkaProducerConfig.getPartitionKey(), expressionEvaluator);
@@ -126,21 +127,21 @@ public void produceRecords(final Record<Event> record) {
} catch (Exception e) {
LOG.error("Error occurred while publishing record {}", e.getMessage());
topicMetrics.getNumberOfRecordSendErrors().increment();
releaseEventHandles(false);
if (dlqSink != null) {
JsonNode dataNode = record.getData().getJsonNode();
dlqSink.perform(dataNode, e);
} else {
releaseEventHandles(false);
throw e;
}
}

}

private void publishJsonMessageAsBytes(Record<Event> record, String key) {
private void publishJsonMessageAsBytes(Record<Event> record, String key) throws Exception {
JsonNode dataNode = record.getData().getJsonNode();

try {
byte[] bytes = objectMapper.writeValueAsBytes(dataNode);
send(topicName, key, bytes);
}
catch (Throwable ex) {
dlqSink.perform(dataNode, ex);
}
byte[] bytes = objectMapper.writeValueAsBytes(dataNode);
send(topicName, key, bytes);
}

private Event getEvent(final Record<Event> record) {
@@ -154,11 +155,11 @@ private Event getEvent(final Record<Event> record) {
}


private void publishPlaintextMessage(final Record<Event> record, final String key) {
private void publishPlaintextMessage(final Record<Event> record, final String key) throws Exception {
send(topicName, key, record.getData().toJsonString());
}

private void publishAvroMessage(final Record<Event> record, final String key) {
private void publishAvroMessage(final Record<Event> record, final String key) throws Exception {
final Schema avroSchema = schemaService.getSchema(topicName);
if (avroSchema == null) {
throw new RuntimeException("Schema definition is mandatory in case of type avro");
@@ -167,22 +168,17 @@ private void publishAvroMessage(final Record<Event> record, final String key) {
send(topicName, key, genericRecord);
}

private Future send(final String topicName, String key, final Object record) {
private Future send(final String topicName, String key, final Object record) throws Exception {
if (Objects.isNull(key)) {
return producer.send(new ProducerRecord(topicName, record), callBack(record));
}

return producer.send(new ProducerRecord(topicName, key, record), callBack(record));
}

private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException {
private void publishJsonMessage(final Record<Event> record, final String key) throws IOException, ProcessingException, Exception {
JsonNode dataNode = record.getData().getJsonNode();
try {
send(topicName, key, dataNode);
}
catch (Throwable ex) {
dlqSink.perform(dataNode, ex);
}
send(topicName, key, dataNode);
}

public boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException {
@@ -200,8 +196,6 @@ private Callback callBack(final Object dataForDlq) {
if (null != exception) {
LOG.error("Error occurred while publishing {}", exception.getMessage());
topicMetrics.getNumberOfRecordProcessingErrors().increment();
releaseEventHandles(false);
dlqSink.perform(dataForDlq, exception);
} else {
releaseEventHandles(true);
}
@@ -51,7 +51,7 @@ public KafkaCustomProducerFactory(final SerializationFactory serializationFactor

public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProducerConfig, final PluginFactory pluginFactory, final PluginSetting pluginSetting,
final ExpressionEvaluator expressionEvaluator, final SinkContext sinkContext, final PluginMetrics pluginMetrics,
final boolean topicNameInMetrics) {
final boolean topicNameInMetrics, final boolean configureDlq) {
AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier);
KeyFactory keyFactory = new KeyFactory(awsContext);
// If either or both of Producer's max_request_size or
@@ -80,7 +80,7 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
final String topicName = ObjectUtils.isEmpty(kafkaProducerConfig.getTopic()) ? null : kafkaProducerConfig.getTopic().getName();
final SchemaService schemaService = new SchemaService.SchemaServiceBuilder().getFetchSchemaService(topicName, kafkaProducerConfig.getSchemaConfig()).build();
return new KafkaCustomProducer(producer,
kafkaProducerConfig, dlqSink,
kafkaProducerConfig, configureDlq ? dlqSink : null,
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService);
}
private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {
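For reference, the two trailing booleans in the updated createProducer signature are topicNameInMetrics and configureDlq. A sketch of how the two call sites in this commit use them, with the buffer path disabling the DLQ and the sink path enabling it; the package names and wrapper class below are assumptions for illustration, not code from the repository.

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.sink.SinkContext;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducer;
import org.opensearch.dataprepper.plugins.kafka.producer.KafkaCustomProducerFactory;

// Illustrative wrapper only; the factory, configs, and metrics come from the plugin wiring.
class CreateProducerCallSites {
    // Kafka buffer write path: topic name kept out of metric names, DLQ disabled.
    static KafkaCustomProducer forBuffer(final KafkaCustomProducerFactory factory,
                                         final KafkaProducerConfig kafkaBufferConfig,
                                         final PluginFactory pluginFactory,
                                         final PluginSetting pluginSetting,
                                         final PluginMetrics producerMetrics) {
        return factory.createProducer(kafkaBufferConfig, pluginFactory, pluginSetting,
                null, null, producerMetrics,
                false /* topicNameInMetrics */, false /* configureDlq */);
    }

    // Kafka sink path: topic name included in metric names, DLQ enabled.
    static KafkaCustomProducer forSink(final KafkaCustomProducerFactory factory,
                                       final KafkaProducerConfig kafkaSinkConfig,
                                       final PluginFactory pluginFactory,
                                       final PluginSetting pluginSetting,
                                       final ExpressionEvaluator expressionEvaluator,
                                       final SinkContext sinkContext,
                                       final PluginMetrics pluginMetrics) {
        return factory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting,
                expressionEvaluator, sinkContext, pluginMetrics,
                true /* topicNameInMetrics */, true /* configureDlq */);
    }
}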
@@ -8,13 +8,17 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* * A Multithreaded helper class which helps to produce the records to multiple topics in an
* asynchronous way.
*/

public class ProducerWorker implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(ProducerWorker.class);
private final Record<Event> record;
private final KafkaCustomProducer producer;

@@ -27,7 +31,11 @@ public ProducerWorker(final KafkaCustomProducer producer,

@Override
public void run() {
producer.produceRecords(record);
try {
producer.produceRecords(record);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

}
@@ -44,6 +44,9 @@ public DLQSink(final PluginFactory pluginFactory, final KafkaProducerConfig kafk

public void perform(final Object failedData, final Throwable e) {
final DlqWriter dlqWriter = getDlqWriter();
if (dlqWriter == null) {
throw new RuntimeException("DLQ not configured");
}
final DlqObject dlqObject = DlqObject.builder()
.withPluginId(randomUUID().toString())
.withPluginName(pluginSetting.getName())
@@ -54,6 +57,9 @@ public void perform(final Object failedData, final Throwable e) {
}

private DlqWriter getDlqWriter() {
if (dlqProvider == null) {
return null;
}
final Optional<DlqWriter> potentialDlq = dlqProvider.getDlqWriter(new StringJoiner(MetricNames.DELIMITER)
.add(pluginSetting.getPipelineName())
.add(pluginSetting.getName()).toString());
@@ -162,8 +162,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() {
}

private KafkaCustomProducer createProducer() {
// TODO: Add the DLQSink here. new DLQSink(pluginFactory, kafkaSinkConfig, pluginSetting)
return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext, pluginMetrics, true);
return kafkaCustomProducerFactory.createProducer(kafkaSinkConfig, pluginFactory, pluginSetting, expressionEvaluator, sinkContext, pluginMetrics, true, true);
}


@@ -137,7 +137,7 @@ public KafkaBuffer createObjectUnderTest(final List<KafkaCustomConsumer> consume
final MockedConstruction<KafkaCustomProducerFactory> producerFactoryMock =
mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
producerFactory = mock;
when(producerFactory.createProducer(any() ,any(), any(), isNull(), isNull(), any(), anyBoolean())).thenReturn(producer);
when(producerFactory.createProducer(any() ,any(), any(), isNull(), isNull(), any(), anyBoolean(), anyBoolean())).thenReturn(producer);
});
final MockedConstruction<KafkaCustomConsumerFactory> consumerFactoryMock =
mockConstruction(KafkaCustomConsumerFactory.class, (mock, context) -> {
@@ -197,7 +197,7 @@ void setUp() {
}

@Test
void test_kafkaBuffer_basicFunctionality() throws TimeoutException {
void test_kafkaBuffer_basicFunctionality() throws TimeoutException, Exception {
kafkaBuffer = createObjectUnderTest();
assertTrue(Objects.nonNull(kafkaBuffer));

@@ -209,7 +209,7 @@ void test_kafkaBuffer_basicFunctionality() throws TimeoutException {
}

@Test
void test_kafkaBuffer_producerThrows() throws TimeoutException {
void test_kafkaBuffer_producerThrows() throws TimeoutException, Exception {

kafkaBuffer = createObjectUnderTest();
Record<Event> record = new Record<Event>(JacksonEvent.fromMessage(UUID.randomUUID().toString()));
@@ -103,8 +103,10 @@ public void produceRawDataTest() {
sinkProducer = spy(producer);
final String key = UUID.randomUUID().toString();
final byte[] byteData = record.getData().toJsonString().getBytes();
sinkProducer.produceRawData(byteData, key);
verify(sinkProducer).produceRawData(record.getData().toJsonString().getBytes(), key);
try {
sinkProducer.produceRawData(byteData, key);
verify(sinkProducer).produceRawData(record.getData().toJsonString().getBytes(), key);
} catch (Exception e){}
final ArgumentCaptor<ProducerRecord> recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class));
assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName());
@@ -124,8 +126,10 @@ public void produceRawData_sendError() {
sinkProducer = spy(producer);
final String key = UUID.randomUUID().toString();
final byte[] byteData = record.getData().toJsonString().getBytes();
sinkProducer.produceRawData(byteData, key);
verify(sinkProducer).produceRawData(record.getData().toJsonString().getBytes(), key);
try {
sinkProducer.produceRawData(byteData, key);
verify(sinkProducer).produceRawData(record.getData().toJsonString().getBytes(), key);
} catch (Exception e){}
final ArgumentCaptor<ProducerRecord> recordArgumentCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
verify(kafkaProducer).send(recordArgumentCaptor.capture(), any(Callback.class));
assertEquals(recordArgumentCaptor.getValue().topic(), kafkaSinkConfig.getTopic().getName());
@@ -135,7 +139,7 @@ public void produceRawData_sendError() {
}

@Test
public void producePlainTextRecords() {
public void producePlainTextRecords() throws Exception {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext");
KafkaProducer kafkaProducer = mock(KafkaProducer.class);
producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class),
@@ -151,7 +155,7 @@ public void producePlainTextRecords() {
}

@Test
public void producePlainTextRecords_sendError() {
public void producePlainTextRecords_sendError() throws Exception {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext");
KafkaProducer kafkaProducer = mock(KafkaProducer.class);
producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class),
@@ -169,7 +173,7 @@ public void producePlainTextRecords_sendError() {
}

@Test
public void producePlainTextRecords_callbackException() {
public void producePlainTextRecords_callbackException() throws Exception {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn("plaintext");
KafkaProducer kafkaProducer = mock(KafkaProducer.class);
producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class),
@@ -189,7 +193,7 @@ public void producePlainTextRecords_callbackException() {
}

@Test
public void produceJsonRecordsTest() {
public void produceJsonRecordsTest() throws Exception {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn("JSON");
KafkaProducer kafkaProducer = mock(KafkaProducer.class);
producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class),
@@ -217,7 +221,7 @@ public void produceJsonRecordsTest() {
}

@Test
public void produceAvroRecordsTest() {
public void produceAvroRecordsTest() throws Exception {
when(kafkaSinkConfig.getSerdeFormat()).thenReturn("AVRO");
KafkaProducer kafkaProducer = mock(KafkaProducer.class);
producer = new KafkaCustomProducer(kafkaProducer, kafkaSinkConfig, dlqSink, mock(ExpressionEvaluator.class),
@@ -134,7 +134,7 @@ public void after() {
private KafkaSink createObjectUnderTest() {
final KafkaSink objectUnderTest;
try(final MockedConstruction<KafkaCustomProducerFactory> ignored = mockConstruction(KafkaCustomProducerFactory.class, (mock, context) -> {
when(mock.createProducer(any(), any(), any(), any(), any(), any(), anyBoolean())).thenReturn(kafkaCustomProducer);
when(mock.createProducer(any(), any(), any(), any(), any(), any(), anyBoolean(), anyBoolean())).thenReturn(kafkaCustomProducer);
})) {
objectUnderTest = new KafkaSink(pluginSetting, kafkaSinkConfig, pluginFactoryMock, pluginMetrics, mock(ExpressionEvaluator.class), sinkContext, awsCredentialsSupplier);
}
