diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index bedf97f..11ae25d 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -243,7 +243,7 @@ public void open(Configuration parameters) throws Exception { if (restored) { initOffsetTableFromRestoredOffsets(messageQueues); } else { - initOffsets(messageQueues); + initOffsets(messageQueues, false); } } @@ -370,10 +370,12 @@ private void awaitTermination() throws InterruptedException { * @param messageQueues * @throws MQClientException */ - private void initOffsets(List messageQueues) throws MQClientException { + private void initOffsets(List messageQueues, boolean added) throws MQClientException { + StartupMode startupMode = startMode; + if(added) startupMode = StartupMode.EARLIEST; for (MessageQueue mq : messageQueues) { long offset; - switch (startMode) { + switch (startupMode) { case LATEST: consumer.seekToEnd(mq); offset = consumer.committed(mq); @@ -554,7 +556,7 @@ public void initOffsetTableFromRestoredOffsets(List messageQueues) } if (extMessageQueue.size() != 0) { log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue); - initOffsets(extMessageQueue); + initOffsets(extMessageQueue, true); } log.info("init offset table [{}] from restoredOffsets successful.", offsetTable); }