Skip to content

Commit

Permalink
feat: allow stop offset for kafka connector (risingwavelabs#6572)
Browse files Browse the repository at this point in the history
* add end offset

* yield before exit

Signed-off-by: tabVersion <[email protected]>

Signed-off-by: tabVersion <[email protected]>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Nov 28, 2022
1 parent 560b456 commit fb57c23
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result};
Expand All @@ -21,16 +20,15 @@ use futures::StreamExt;
use futures_async_stream::try_stream;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::{ClientConfig, Offset, TopicPartitionList};
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

use crate::source::base::{SourceMessage, SplitReader, MAX_CHUNK_SIZE};
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::KafkaProperties;
use crate::source::{BoxSourceStream, Column, ConnectorState, SplitImpl};

/// Split reader for the Kafka connector: wraps an rdkafka `StreamConsumer` and
/// exposes the consumed messages as a stream of `SourceMessage` batches via
/// `into_stream`.
///
/// NOTE(review): this listing is a diff rendering without +/- markers; it looks
/// like `assigned_splits` is the field being removed and `stop_offset` the one
/// replacing it, so presumably only one of the two exists in any single
/// revision — confirm against the actual file.
pub struct KafkaSplitReader {
// Consumer whose `stream()` is drained in chunks by `into_stream`.
consumer: StreamConsumer<DefaultConsumerContext>,
// String-keyed lists of splits; the constructor shown only ever initialises
// this to an empty map.
assigned_splits: HashMap<String, Vec<KafkaSplit>>,
// Optional stop offset copied from the split's `stop_offset` in the constructor.
// When `Some`, `into_stream` yields what it has batched so far and ends the
// stream at the first message whose offset is >= this value (that message is
// not emitted). When `None`, no stop check is performed.
stop_offset: Option<i64>,
}

#[async_trait]
Expand Down Expand Up @@ -73,7 +71,9 @@ impl SplitReader for KafkaSplitReader {
.await
.context("failed to create kafka consumer")?;

let mut stop_offset = None;
if let Some(splits) = state {
assert_eq!(splits.len(), 1);
let mut tpl = TopicPartitionList::with_capacity(splits.len());

for split in &splits {
Expand All @@ -87,6 +87,7 @@ impl SplitReader for KafkaSplitReader {
} else {
tpl.add_partition(k.topic.as_str(), k.partition);
}
stop_offset = k.stop_offset;
}
}

Expand All @@ -95,7 +96,7 @@ impl SplitReader for KafkaSplitReader {

Ok(Self {
consumer,
assigned_splits: HashMap::new(),
stop_offset,
})
}

Expand All @@ -108,10 +109,22 @@ impl KafkaSplitReader {
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
pub async fn into_stream(self) {
#[for_await]
for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(MAX_CHUNK_SIZE) {
let mut res = Vec::with_capacity(msgs.len());
for msg in msgs {
res.push(SourceMessage::from(msg?));
let msg = msg?;
if let Some(stop_offset) = self.stop_offset {
if msg.offset() >= stop_offset {
tracing::debug!(
"stop offset reached, stop reading, offset: {}, stop offset: {}",
msg.offset(),
stop_offset
);
yield res;
break 'for_outer_loop;
}
}
res.push(SourceMessage::from(msg));
}
yield res;
}
Expand Down

0 comments on commit fb57c23

Please sign in to comment.