diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java index d22f8b0b5fbc..772357aa6baf 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarUtils.java @@ -18,77 +18,59 @@ */ package org.apache.pinot.plugin.stream.pulsar; -import java.io.ByteArrayOutputStream; import java.nio.ByteBuffer; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PulsarUtils { - - private static final Logger LOGGER = LoggerFactory.getLogger(PulsarUtils.class); - - private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4); - private PulsarUtils() { } - public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) - throws IllegalArgumentException { + public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) { if (offsetCriteria.isLargest()) { return SubscriptionInitialPosition.Latest; } if (offsetCriteria.isSmallest()) { return SubscriptionInitialPosition.Earliest; } - - throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); + throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria); } - public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria) - throws IllegalArgumentException { + public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria) { if (offsetCriteria.isLargest()) { return MessageId.latest; } if (offsetCriteria.isSmallest()) { return MessageId.earliest; } - - throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria); + throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria); } /** * Stitch key and value bytes together using a simple format: * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes */ - protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) { - int keyLen = keyBytes.length; - int valueLen = valueBytes.length; - int totalByteArrayLength = 8 + keyLen + valueLen; - try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) { - LENGTH_BUF.clear(); - bos.write(LENGTH_BUF.putInt(keyLen).array()); - bos.write(keyBytes); - LENGTH_BUF.clear(); - bos.write(LENGTH_BUF.putInt(valueLen).array()); - bos.write(valueBytes); - return bos.toByteArray(); - } catch (Exception e) { - LOGGER.error("Unable to stitch key and value bytes together", e); - } - return null; + public static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) { + byte[] stitchedBytes = new byte[8 + keyBytes.length + valueBytes.length]; + ByteBuffer buffer = ByteBuffer.wrap(stitchedBytes); + buffer.putInt(keyBytes.length); + buffer.put(keyBytes); + buffer.putInt(valueBytes.length); + buffer.put(valueBytes); + return stitchedBytes; } - protected static PulsarStreamMessage buildPulsarStreamMessage(Message message, boolean enableKeyValueStitch, + public static PulsarStreamMessage buildPulsarStreamMessage(Message message, boolean enableKeyValueStitch, PulsarMetadataExtractor pulsarMetadataExtractor) { byte[] key = message.getKeyBytes(); - byte[] data = enableKeyValueStitch ? stitchKeyValue(key, message.getData()) : message.getData(); - int dataLength = (data != null) ? data.length : 0; - return new PulsarStreamMessage(key, data, message.getMessageId(), - (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), dataLength); + byte[] value = message.getData(); + if (enableKeyValueStitch) { + value = stitchKeyValue(key, value); + } + return new PulsarStreamMessage(key, value, message.getMessageId(), + (PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), value.length); } }