Skip to content

Commit

Permalink
Fix broker enable dedup cause client publish failed (#1530)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Nov 13, 2024
1 parent c2431a6 commit e78bcfa
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class MessagePublishContext implements PublishContext {
private Topic topic;
private long startTimeNs;
private CompletableFuture<Position> positionFuture;
private long sequenceId;

/**
* Executed from managed ledger thread when the message is persisted.
Expand All @@ -58,11 +59,12 @@ public void completed(Exception exception, long ledgerId, long entryId) {

// recycler
public static MessagePublishContext get(CompletableFuture<Position> positionFuture, String producerName,
Topic topic, long startTimeNs) {
Topic topic, long sequenceId, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.positionFuture = positionFuture;
callback.producerName = producerName;
callback.topic = topic;
callback.sequenceId = sequenceId;
callback.startTimeNs = startTimeNs;
return callback;
}
Expand All @@ -77,6 +79,12 @@ public String getProducerName() {
return producerName;
}

@Override
public long getSequenceId() {
return this.sequenceId;
}


private static final Recycler<MessagePublishContext> RECYCLER = new Recycler<MessagePublishContext>() {
protected MessagePublishContext newObject(Handle<MessagePublishContext> handle) {
return new MessagePublishContext(handle);
Expand All @@ -87,19 +95,20 @@ public void recycle() {
positionFuture = null;
topic = null;
startTimeNs = -1;
sequenceId = -1;
recyclerHandle.recycle(this);
}

/**
* publish mqtt message to pulsar topic, no batch.
*/
public static CompletableFuture<Position> publishMessages(String producerName, Message<byte[]> message,
Topic topic) {
long sequenceId, Topic topic) {
CompletableFuture<Position> future = new CompletableFuture<>();

ByteBuf headerAndPayload = messageToByteBuf(message);
topic.publishMessage(headerAndPayload,
MessagePublishContext.get(future, producerName, topic, System.nanoTime()));
MessagePublishContext.get(future, producerName, topic, sequenceId, System.nanoTime()));
headerAndPayload.release();
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import io.streamnative.pulsar.handlers.mqtt.common.utils.PulsarTopicUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.util.FutureUtil;

Expand All @@ -45,6 +47,8 @@ public abstract class AbstractQosPublishHandler implements QosPublishHandler {
protected final PulsarService pulsarService;
protected final RetainedMessageHandler retainedMessageHandler;
protected final MQTTServerConfiguration configuration;
private final ConcurrentHashMap<String, Long> sequenceIdMap = new ConcurrentHashMap<>();


protected AbstractQosPublishHandler(MQTTService mqttService) {
this.pulsarService = mqttService.getPulsarService();
Expand Down Expand Up @@ -104,9 +108,23 @@ protected CompletableFuture<Position> writeToPulsarTopic(Connection connection,
mqttTopicName = msg.variableHeader().topicName();
}
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
long lastPublishedSequenceId = -1;
if (topic instanceof PersistentTopic) {
final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> {
long id;
if (v == null) {
id = lastPublishedId + 1;
} else {
id = Math.max(v, lastPublishedId) + 1;
}
return id;
});
}
MessageImpl<byte[]> message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(),
msg.payload().nioBuffer());
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message, topic);
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message,
lastPublishedSequenceId, topic);
message.recycle();
return ret.thenApply(position -> {
if (checkSubscription && topic.getSubscriptions().isEmpty()) {
Expand Down
6 changes: 6 additions & 0 deletions tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-mqtt-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>testmocks</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,22 @@ public void testSendAndConsume() throws Exception {
received.ack();
connection.disconnect();
}

@Test
public void testDedup() throws Exception {
MQTT mqtt = createMQTTClient();
String topicName = "testDedup";
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = { new Topic(topicName, QoS.AT_MOST_ONCE) };
connection.subscribe(topics);
String message = "Hello MQTT";
for (int i = 1; i <= 10; i++) {
connection.publish(topicName, (message + i).getBytes(), QoS.AT_MOST_ONCE, false);
Message received = connection.receive();
Assert.assertEquals(received.getTopic(), topicName);
Assert.assertEquals(new String(received.getPayload()), message + i);
}
connection.disconnect();
}
}

0 comments on commit e78bcfa

Please sign in to comment.