Skip to content

Commit

Permalink
feat(stream): add kafka backill executor
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jan 22, 2024
1 parent 3cbc2f2 commit 6a7f538
Show file tree
Hide file tree
Showing 18 changed files with 877 additions and 6 deletions.
19 changes: 19 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,23 @@ message StreamFsFetchNode {
StreamFsFetch node_inner = 1;
}

message SourceBackfillNode {
uint32 source_id = 1;
optional uint32 row_id_index = 3;
// XXX: is this all columns or only required columns?
repeated plan_common.ColumnCatalog columns = 4;
catalog.StreamSourceInfo info = 7;
string source_name = 8;
map<string, string> with_properties = 6;
// Streaming rate limit
// optional uint32 rate_limit = 9;

// fields above are the same as StreamSource

// `| partition_id | backfill_progress |`
catalog.Table state_table = 2;
}

message SinkDesc {
reserved 4;
reserved "columns";
Expand Down Expand Up @@ -758,6 +775,7 @@ message StreamNode {
StreamFsFetchNode stream_fs_fetch = 138;
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SourceBackfillNode source_backfill = 141;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down Expand Up @@ -852,6 +870,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_VALUES = 64;
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 512;
}

// The streaming context associated with a stream plan
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ impl DataChunk {
self.visibility.next_set_bit(row_idx)
}

/// Return the prev visible row index ~~on or~~ before `row_idx`.
pub fn prev_visible_row_idx(&self, row_idx: usize) -> Option<usize> {
self.visibility.prev_set_bit(row_idx)
}

pub fn into_parts(self) -> (Vec<ArrayRef>, Bitmap) {
(self.columns.to_vec(), self.visibility)
}
Expand Down
18 changes: 18 additions & 0 deletions src/common/src/array/data_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ impl<'a> Iterator for DataChunkRefIter<'a> {
}
}

impl<'a> DoubleEndedIterator for DataChunkRefIter<'a> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.idx.start == self.idx.end {
return None;
}
match self.chunk.prev_visible_row_idx(self.idx.end) {
Some(idx) if idx >= self.idx.start => {
self.idx.end = idx;
Some(RowRef::new(self.chunk, idx))
}
_ => {
self.idx.end = self.idx.start;
None
}
}
}
}

impl<'a> FusedIterator for DataChunkRefIter<'a> {}

pub struct DataChunkRefIterWithHoles<'a> {
Expand Down
6 changes: 6 additions & 0 deletions src/common/src/array/stream_chunk_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ impl StreamChunk {
self.rows_in(0..self.capacity())
}

/// # Panics
/// If the chunk is empty.
pub fn last_row(&self) -> RowRef<'_> {
self.data_chunk().rows().next_back().unwrap()
}

/// Return an iterator on rows of this stream chunk in a range.
pub fn rows_in(&self, range: Range<usize>) -> impl Iterator<Item = (Op, RowRef<'_>)> {
self.data_chunk().rows_in(range).map(|row| {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/buffer/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ impl Bitmap {
(bit_idx..self.len()).find(|&idx| unsafe { self.is_set_unchecked(idx) })
}

/// Return the prev set bit index ~~on or~~ before `bit_idx`.
pub fn prev_set_bit(&self, bit_idx: usize) -> Option<usize> {
(0..bit_idx)
.rev()
.find(|&idx| unsafe { self.is_set_unchecked(idx) })
}

/// Counts the number of bits set to 1.
pub fn count_ones(&self) -> usize {
self.count_ones
Expand Down
7 changes: 6 additions & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,13 +680,17 @@ macro_rules! impl_convert {

paste! {
impl ScalarImpl {
/// # Panics
/// If the scalar is not of the expected type.
pub fn [<as_ $suffix_name>](&self) -> &$scalar {
match self {
Self::$variant_name(ref scalar) => scalar,
other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
}
}

/// # Panics
/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar {
match self {
Self::$variant_name(scalar) => scalar,
Expand All @@ -696,7 +700,8 @@ macro_rules! impl_convert {
}

impl <'scalar> ScalarRefImpl<'scalar> {
// Note that this conversion consume self.
/// # Panics
/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
match self {
Self::$variant_name(inner) => inner,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ pub enum ByteStreamSourceParserImpl {
pub type ParsedStreamImpl = impl SourceWithStateStream + Unpin;

impl ByteStreamSourceParserImpl {
/// Converts this parser into a stream of [`StreamChunk`].
/// Converts this `SourceMessage` stream into a stream of [`StreamChunk`].
pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl {
#[auto_enum(futures03::Stream)]
let stream = match self {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ pub type BoxTryStream<M> = BoxStream<'static, Result<M, RwError>>;
#[derive(Clone, Debug, PartialEq)]
pub struct StreamChunkWithState {
pub chunk: StreamChunk,
/// The latest offsets for each split in this chunk.
pub split_offset_mapping: Option<HashMap<SplitId, String>>,
}

Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ async fn extract_json_table_schema(
}
}

/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
/// May also look for the usage of `SourceColumnType`.
pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> {
let columns = vec![
ColumnCatalog {
Expand Down
4 changes: 4 additions & 0 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl ConnectorSource {
}
}

/// If `state` is `None`, returns a pending stream.
///
/// If the connector supports multiple splits natively (currently only Kafka - see `TopicPartitionList`),
/// use the built-in support. Otherwise, `select_all` the streams from each split.
pub async fn stream_reader(
&self,
state: ConnectorState,
Expand Down
2 changes: 1 addition & 1 deletion src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
smallvec = "1"
static_assertions = "1"
Expand Down Expand Up @@ -91,7 +92,6 @@ risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_test = { path = "../storage/hummock_test", features = [
"test",
] }
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
tracing-test = "0.2"

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ pub(crate) fn get_cdc_chunk_last_offset(
table_reader: &ExternalTableReaderImpl,
chunk: &StreamChunk,
) -> StreamExecutorResult<Option<CdcOffset>> {
let row = chunk.rows().last().unwrap().1;
let row = chunk.last_row();
let offset_col = row.iter().last().unwrap();
let output = offset_col
.map(|scalar| Ok::<_, ConnectorError>(table_reader.parse_cdc_offset(scalar.into_utf8()))?);
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/exchange/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl Output for LocalOutput {
.await
.map_err(|SendError(message)| {
anyhow!(
"failed to send message to actor {}: {:?}",
"failed to send message to actor {}, message: {:?}",
self.actor_id,
message
)
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Output for RemoteOutput {
.await
.map_err(|SendError(message)| {
anyhow!(
"failed to send message to actor {}: {:#?}",
"failed to send message to actor {}, message: {:?}",
self.actor_id,
message
)
Expand Down
Loading

0 comments on commit 6a7f538

Please sign in to comment.