From fb57c23cfbb663c57d326038eea6a6243f1911cd Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Mon, 28 Nov 2022 17:14:49 +0800
Subject: [PATCH] feat: allow stop offset for kafka connector (#6572)

* add end offset

* yield before exit

Signed-off-by: tabVersion
Signed-off-by: tabVersion
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
---
 .../src/source/kafka/source/reader.rs | 27 ++++++++++++++-----
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index f695b12e89b99..e5cd6c3d6486b 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use anyhow::{Context, Result};
@@ -21,16 +20,15 @@ use futures::StreamExt;
 use futures_async_stream::try_stream;
 use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
-use rdkafka::{ClientConfig, Offset, TopicPartitionList};
+use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
 
 use crate::source::base::{SourceMessage, SplitReader, MAX_CHUNK_SIZE};
-use crate::source::kafka::split::KafkaSplit;
 use crate::source::kafka::KafkaProperties;
 use crate::source::{BoxSourceStream, Column, ConnectorState, SplitImpl};
 
 pub struct KafkaSplitReader {
     consumer: StreamConsumer,
-    assigned_splits: HashMap>,
+    stop_offset: Option,
 }
 
 #[async_trait]
@@ -73,7 +71,9 @@ impl SplitReader for KafkaSplitReader {
             .await
             .context("failed to create kafka consumer")?;
 
+        let mut stop_offset = None;
         if let Some(splits) = state {
+            assert_eq!(splits.len(), 1);
             let mut tpl = TopicPartitionList::with_capacity(splits.len());
 
             for split in &splits {
@@ -87,6 +87,7 @@ impl SplitReader for KafkaSplitReader {
                     } else {
                         tpl.add_partition(k.topic.as_str(), k.partition);
                     }
+                    stop_offset = k.stop_offset;
                 }
             }
 
@@ -95,7 +96,7 @@ impl SplitReader for KafkaSplitReader {
 
         Ok(Self {
             consumer,
-            assigned_splits: HashMap::new(),
+            stop_offset,
         })
     }
 
@@ -108,10 +109,22 @@ impl KafkaSplitReader {
     #[try_stream(boxed, ok = Vec, error = anyhow::Error)]
     pub async fn into_stream(self) {
         #[for_await]
-        for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
+        'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
             let mut res = Vec::with_capacity(msgs.len());
             for msg in msgs {
-                res.push(SourceMessage::from(msg?));
+                let msg = msg?;
+                if let Some(stop_offset) = self.stop_offset {
+                    if msg.offset() >= stop_offset {
+                        tracing::debug!(
+                            "stop offset reached, stop reading, offset: {}, stop offset: {}",
+                            msg.offset(),
+                            stop_offset
+                        );
+                        yield res;
+                        break 'for_outer_loop;
+                    }
+                }
+                res.push(SourceMessage::from(msg));
             }
             yield res;
         }