Skip to content

Commit

Permalink
feat(stream): add kafka backill executor
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 19, 2024
1 parent 26bebdf commit b8516e5
Show file tree
Hide file tree
Showing 13 changed files with 764 additions and 22 deletions.
5 changes: 5 additions & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ impl DataChunk {
self.visibility.next_set_bit(row_idx)
}

/// Return the prev visible row index ~~on or~~ before `row_idx`.
pub fn prev_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
self.visibility.next_set_bit(row_idx)
}

pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
(self.columns.to_vec(), self.visibility)
}
Expand Down
18 changes: 18 additions & 0 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ impl<'a> Iterator for DataChunkRefIter<'a> {
}
}

impl<'a> DoubleEndedIterator for DataChunkRefIter<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.idx.start == self.idx.end {
return None;
}
match self.chunk.prev_visible_row_idx(self.idx.end) {
Some(idx) if idx >= self.idx.start => {
self.idx.end = idx;
Some(RowRef::new(self.chunk, idx))
}
_ => {
self.idx.end = self.idx.start;
None
}
}
}
}

impl<'a> FusedIterator for DataChunkRefIter<'a> {}

pub struct DataChunkRefIterWithHoles<'a> {
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/array/stream_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ impl StreamChunk {
self.rows_in(0..self.capacity())
}

/// # Panics
/// If the chunk is empty.
pub fn last_row(&self) -> RowRef<'_> {
self.data_chunk().rows().next_back().unwrap()
}

/// Return an iterator on rows of this stream chunk in a range.
pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
self.data_chunk().rows_in(range).map(|row| {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/buffer/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ impl Bitmap {
(bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) })
}

/// Return the prev set bit index ~~on or~~ before `bit_idx`.
pub fn prev_set_bit(&self, bit_idx: usize) -> Option<usize> {
(0..bit_idx)
.rev()
.find(|&idx| unsafe { self.is_set_unchecked(idx) })
}

/// Counts the number of bits set to 1.
pub fn count_ones(&self) -> usize {
self.count_ones
Expand Down
7 changes: 6 additions & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,13 +680,17 @@ macro_rules! impl_convert {

paste! {
impl ScalarImpl {
/// # Panics
/// If the scalar is not of the expected type.
pub fn [<as_ $suffix_name>](&self) -> &$scalar {
match self {
Self::$variant_name(ref scalar) => scalar,
other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
}
}

/// # Panics
/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar {
match self {
Self::$variant_name(scalar) => scalar,
Expand All @@ -696,7 +700,8 @@ macro_rules! impl_convert {
}

impl <'scalar> ScalarRefImpl<'scalar> {
// Note that this conversion consume self.
/// # Panics
/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
match self {
Self::$variant_name(inner) => inner,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ pub enum ByteStreamSourceParserImpl {
pub type ParsedStreamImpl = impl SourceWithStateStream + Unpin;

impl ByteStreamSourceParserImpl {
/// Converts this parser into a stream of [`StreamChunk`].
/// Converts this `SourceMessage` stream into a stream of [`StreamChunk`].
pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl {
#[auto_enum(futures03::Stream)]
let stream = match self {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ pub type BoxTryStream<M> = BoxStream<'static, Result<M, RwError>>;
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
/// The latest offsets for each split in this chunk.
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
}

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ async fn extract_json_table_schema(
}
}

/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
/// May also look for the usage of `SourceColumnType`.
pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> {
let columns = vec![
ColumnCatalog {
Expand Down
4 changes: 4 additions & 0 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl ConnectorSource {
}
}

/// If `state` is `None`, returns a pending stream.
///
/// If the connector supports multiple splits natively (currently only Kafka - see `TopicPartitionList`),
/// use the built-in support. Otherwise, `select_all` the streams from each split.
pub async fn stream_reader(
&self,
state: ConnectorState,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ pub(crate) fn get_cdc_chunk_last_offset(
table_reader: &ExternalTableReaderImpl,
chunk: &StreamChunk,
) -> StreamExecutorResult<Option<CdcOffset>> {
let row = chunk.rows().last().unwrap().1;
let row = chunk.last_row();
let offset_col = row.iter().last().unwrap();
let output = offset_col
.map(|scalar| Ok::<_, ConnectorError>(table_reader.parse_cdc_offset(scalar.into_utf8()))?);
Expand Down
Loading

0 comments on commit b8516e5

Please sign in to comment.