Skip to content

Commit

Permalink
Fix PulsarUtils to not share buffer (apache#12671)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Mar 19, 2024
1 parent d12ce88 commit 8c92341
Showing 1 changed file with 19 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,77 +18,59 @@
*/
package org.apache.pinot.plugin.stream.pulsar;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class PulsarUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(PulsarUtils.class);

private static final ByteBuffer LENGTH_BUF = ByteBuffer.allocate(4);

private PulsarUtils() {
}

public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria)
throws IllegalArgumentException {
public static SubscriptionInitialPosition offsetCriteriaToSubscription(OffsetCriteria offsetCriteria) {
if (offsetCriteria.isLargest()) {
return SubscriptionInitialPosition.Latest;
}
if (offsetCriteria.isSmallest()) {
return SubscriptionInitialPosition.Earliest;
}

throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria);
}

public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria)
throws IllegalArgumentException {
public static MessageId offsetCriteriaToMessageId(OffsetCriteria offsetCriteria) {
if (offsetCriteria.isLargest()) {
return MessageId.latest;
}
if (offsetCriteria.isSmallest()) {
return MessageId.earliest;
}

throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
throw new IllegalArgumentException("Unsupported offset criteria: " + offsetCriteria);
}

/**
* Stitch key and value bytes together using a simple format:
* 4 bytes for key length + key bytes + 4 bytes for value length + value bytes
*/
protected static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
int keyLen = keyBytes.length;
int valueLen = valueBytes.length;
int totalByteArrayLength = 8 + keyLen + valueLen;
try (ByteArrayOutputStream bos = new ByteArrayOutputStream(totalByteArrayLength)) {
LENGTH_BUF.clear();
bos.write(LENGTH_BUF.putInt(keyLen).array());
bos.write(keyBytes);
LENGTH_BUF.clear();
bos.write(LENGTH_BUF.putInt(valueLen).array());
bos.write(valueBytes);
return bos.toByteArray();
} catch (Exception e) {
LOGGER.error("Unable to stitch key and value bytes together", e);
}
return null;
public static byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
byte[] stitchedBytes = new byte[8 + keyBytes.length + valueBytes.length];
ByteBuffer buffer = ByteBuffer.wrap(stitchedBytes);
buffer.putInt(keyBytes.length);
buffer.put(keyBytes);
buffer.putInt(valueBytes.length);
buffer.put(valueBytes);
return stitchedBytes;
}

protected static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
public static PulsarStreamMessage buildPulsarStreamMessage(Message<byte[]> message, boolean enableKeyValueStitch,
PulsarMetadataExtractor pulsarMetadataExtractor) {
byte[] key = message.getKeyBytes();
byte[] data = enableKeyValueStitch ? stitchKeyValue(key, message.getData()) : message.getData();
int dataLength = (data != null) ? data.length : 0;
return new PulsarStreamMessage(key, data, message.getMessageId(),
(PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), dataLength);
byte[] value = message.getData();
if (enableKeyValueStitch) {
value = stitchKeyValue(key, value);
}
return new PulsarStreamMessage(key, value, message.getMessageId(),
(PulsarStreamMessageMetadata) pulsarMetadataExtractor.extract(message), value.length);
}
}

0 comments on commit 8c92341

Please sign in to comment.