feat(stream): add kafka backfill executor (#14172)

xxchan authored Mar 11, 2024
1 parent 0b23c9d commit c7d5d9a

Showing 26 changed files with 1,311 additions and 110 deletions.
2 changes: 1 addition & 1 deletion ci/Dockerfile
@@ -37,7 +37,7 @@ ENV PATH /root/.cargo/bin/:$PATH
RUN rustup show
RUN rustup default `rustup show active-toolchain | awk '{print $1}'`

-RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.4.0/buf-$(uname -s)-$(uname -m).tar.gz" | \
+RUN curl -sSL "https://github.com/bufbuild/buf/releases/download/v1.29.0/buf-$(uname -s)-$(uname -m).tar.gz" | \
tar -xvzf - -C /usr/local --strip-components 1

# install python dependencies
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
@@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
-export BUILD_ENV_VERSION=v20240223
+export BUILD_ENV_VERSION=v20240229

export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"

10 changes: 5 additions & 5 deletions ci/docker-compose.yml
@@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
depends_on:
- mysql
- db
@@ -81,7 +81,7 @@
- ..:/risingwave

sink-test-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
depends_on:
- mysql
- db
@@ -98,12 +98,12 @@


rw-build-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
volumes:
- ..:/risingwave

ci-flamegraph-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
@@ -114,7 +114,7 @@
- ..:/risingwave

regress-test-env:
-image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240223
+image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240229
depends_on:
db:
condition: service_healthy
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -720,6 +720,16 @@ def section_streaming(outer_panels):
)
],
),
+panels.timeseries_rowsps(
+    "Source Backfill Throughput(rows/s)",
+    "The figure shows the number of rows backfilled by each source per second.",
+    [
+        panels.target(
+            f"sum(rate({metric('stream_source_backfill_rows_counts')}[$__rate_interval])) by (source_id, source_name, fragment_id)",
+            "{{source_id}} {{source_name}} (fragment {{fragment_id}})",
+        ),
+    ],
+),
panels.timeseries_count(
"Source Upstream Status",
"Monitor each source upstream, 0 means the upstream is not normal, 1 means the source is ready.",
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-user-dashboard.dashboard.py
@@ -689,6 +689,16 @@ def section_streaming(outer_panels):
)
],
),
+panels.timeseries_rowsps(
+    "Source Backfill Throughput(rows/s)",
+    "The figure shows the number of rows backfilled by each source per second.",
+    [
+        panels.target(
+            f"sum(rate({metric('stream_source_backfill_rows_counts')}[$__rate_interval])) by (source_id, source_name, fragment_id)",
+            "{{source_id}} {{source_name}} (fragment {{fragment_id}})",
+        ),
+    ],
+),
panels.timeseries_rowsps(
"Materialized View Throughput(rows/s)",
"The figure shows the number of rows written into each materialized executor actor per second.",
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions proto/stream_plan.proto
@@ -205,6 +205,22 @@ message StreamFsFetchNode {
StreamFsFetch node_inner = 1;
}

+message SourceBackfillNode {
+  uint32 source_id = 1;
+  optional uint32 row_id_index = 2;
+  repeated plan_common.ColumnCatalog columns = 3;
+  catalog.StreamSourceInfo info = 4;
+  string source_name = 5;
+  map<string, string> with_properties = 6;
+  // Streaming rate limit
+  optional uint32 rate_limit = 7;
+
+  // fields above are the same as StreamSource
+
+  // `| partition_id | backfill_progress |`
+  catalog.Table state_table = 8;
+}

message SinkDesc {
reserved 4;
reserved "columns";
@@ -770,6 +786,7 @@ message StreamNode {
StreamCdcScanNode stream_cdc_scan = 139;
CdcFilterNode cdc_filter = 140;
SubscriptionNode subscription = 141;
+SourceBackfillNode source_backfill = 142;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
@@ -865,6 +882,7 @@ enum FragmentTypeFlag {
FRAGMENT_TYPE_FLAG_DML = 128;
FRAGMENT_TYPE_FLAG_CDC_FILTER = 256;
FRAGMENT_TYPE_FLAG_SUBSCRIPTION = 512;
+FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL = 1024;
}

// The streaming context associated with a stream plan
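Since every `FragmentTypeFlag` value is a power of two, a fragment's type is carried as a bitmask: the new flag composes with existing ones via bitwise OR and is tested with AND. A minimal sketch — the constants mirror the proto above, but the helper itself is illustrative, not code from this commit:

```rust
// Mirror the proto values: 512 = 1 << 9, 1024 = 1 << 10.
const FRAGMENT_TYPE_FLAG_SUBSCRIPTION: u32 = 1 << 9;
const FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL: u32 = 1 << 10;

/// True if the fragment's combined type mask contains the source-backfill flag.
fn is_source_backfill(fragment_type_mask: u32) -> bool {
    fragment_type_mask & FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL != 0
}

fn main() {
    let mask = FRAGMENT_TYPE_FLAG_SUBSCRIPTION | FRAGMENT_TYPE_FLAG_SOURCE_BACKFILL;
    assert!(is_source_backfill(mask));
    assert!(!is_source_backfill(FRAGMENT_TYPE_FLAG_SUBSCRIPTION));
}
```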
7 changes: 6 additions & 1 deletion src/common/src/types/mod.rs
@@ -724,13 +724,17 @@ macro_rules! impl_convert {

paste! {
impl ScalarImpl {
+/// # Panics
+/// If the scalar is not of the expected type.
pub fn [<as_ $suffix_name>](&self) -> &$scalar {
match self {
Self::$variant_name(ref scalar) => scalar,
other_scalar => panic!("cannot convert ScalarImpl::{} to concrete type {}", other_scalar.get_ident(), stringify!($variant_name))
}
}

+/// # Panics
+/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar {
match self {
Self::$variant_name(scalar) => scalar,
@@ -740,7 +744,8 @@ macro_rules! impl_convert {
}

impl <'scalar> ScalarRefImpl<'scalar> {
-// Note that this conversion consume self.
+/// # Panics
+/// If the scalar is not of the expected type.
pub fn [<into_ $suffix_name>](self) -> $scalar_ref {
match self {
Self::$variant_name(inner) => inner,
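For context, the accessors that `impl_convert!` expands to are used like this — a sketch assuming `risingwave_common` as a dependency, with method names following the macro's `as_`/`into_` naming pattern:

```rust
use risingwave_common::types::ScalarImpl;

fn main() {
    let scalar = ScalarImpl::Int32(42);
    // Borrowing accessor: panics if the variant is not `Int32`.
    assert_eq!(*scalar.as_int32(), 42);
    // Consuming accessor: also panics on a variant mismatch.
    assert_eq!(scalar.into_int32(), 42);
}
```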
2 changes: 1 addition & 1 deletion src/connector/src/parser/mod.rs
@@ -837,7 +837,7 @@ pub enum ByteStreamSourceParserImpl {
pub type ParsedStreamImpl = impl ChunkSourceStream + Unpin;

impl ByteStreamSourceParserImpl {
-/// Converts this parser into a stream of [`StreamChunk`].
+/// Converts this `SourceMessage` stream into a stream of [`StreamChunk`].
pub fn into_stream(self, msg_stream: BoxSourceStream) -> ParsedStreamImpl {
#[auto_enum(futures03::Stream)]
let stream = match self {
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
@@ -516,6 +516,7 @@ pub type DataType = risingwave_common::types::DataType;
pub struct Column {
pub name: String,
pub data_type: DataType,
+/// This field is only used by datagen.
pub is_visible: bool,
}

2 changes: 2 additions & 0 deletions src/frontend/src/handler/create_source.rs
@@ -117,6 +117,8 @@ async fn extract_json_table_schema(
}
}

+/// Note: these columns are added in `SourceStreamChunkRowWriter::do_action`.
+/// See also the usage of `SourceColumnType`.
pub fn debezium_cdc_source_schema() -> Vec<ColumnCatalog> {
let columns = vec![
ColumnCatalog {
20 changes: 20 additions & 0 deletions src/meta/src/stream/source_manager.rs
@@ -432,7 +432,27 @@ impl Default for SplitDiffOptions {
/// Reassigns splits if there are new splits or dropped splits,
/// i.e., `actor_splits` and `discovered_splits` differ.
///
+/// Existing splits remain with their currently assigned actors; they are not moved.
+///
+/// - `fragment_id`: just for logging
+///
+/// ## Different connectors' behavior of split change
+///
+/// ### Kafka and Pulsar
+/// They only support increasing the number of splits by adding new, empty splits;
+/// old data is not moved.
+///
+/// ### Kinesis
+/// It supports *pairwise* shard split and merge.
+///
+/// In both cases, old data remain in the old shard(s), which are still available;
+/// new data are routed to the new shard(s).
+/// After the retention period has expired, an old shard becomes `EXPIRED` and is no
+/// longer listed. In other words, the total number of shards first increases and then decreases.
+///
+/// See also:
+/// - [Kinesis resharding doc](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing)
+/// - An example of what the shards can look like: <https://stackoverflow.com/questions/72272034/list-shard-show-more-shards-than-provisioned>
fn reassign_splits<T>(
fragment_id: FragmentId,
actor_splits: HashMap<ActorId, Vec<T>>,
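A self-contained sketch of the invariant stated in the doc comment: existing splits stay with their actors, splits that vanished upstream are dropped, and each newly discovered split goes to the least-loaded actor. This is illustrative only, not the actual `reassign_splits` implementation:

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Keep existing assignments untouched, drop vanished splits, and deal each
/// newly discovered split to the actor that currently owns the fewest splits.
fn reassign<T: Clone + Eq + Hash>(
    actor_splits: &mut HashMap<u32, Vec<T>>,
    discovered: &HashSet<T>,
) {
    // Remove splits that no longer exist upstream.
    for splits in actor_splits.values_mut() {
        splits.retain(|s| discovered.contains(s));
    }
    // Assign each brand-new split to the least-loaded actor.
    let assigned: HashSet<T> = actor_splits.values().flatten().cloned().collect();
    for split in discovered.difference(&assigned) {
        if let Some(splits) = actor_splits.values_mut().min_by_key(|v| v.len()) {
            splits.push(split.clone());
        }
    }
}

fn main() {
    let mut assignment: HashMap<u32, Vec<String>> =
        HashMap::from([(1, vec!["p0".into()]), (2, vec!["p1".into()])]);
    let discovered: HashSet<String> =
        ["p0", "p1", "p2"].map(String::from).into_iter().collect();
    reassign(&mut assignment, &discovered);
    // "p0" and "p1" stay where they were; "p2" lands on one of the actors.
    assert_eq!(assignment.values().map(Vec::len).sum::<usize>(), 3);
}
```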
2 changes: 1 addition & 1 deletion src/stream/Cargo.toml
@@ -56,6 +56,7 @@ risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
+serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
smallvec = "1"
static_assertions = "1"
@@ -93,7 +94,6 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_hummock_test = { path = "../storage/hummock_test", features = [
"test",
] }
-serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9"
tracing-test = "0.2"

4 changes: 2 additions & 2 deletions src/stream/src/executor/exchange/output.rs
@@ -76,7 +76,7 @@ impl Output for LocalOutput {
.await
.map_err(|SendError(message)| {
anyhow!(
"failed to send message to actor {}: {:?}",
"failed to send message to actor {}, message: {:?}",
self.actor_id,
message
)
@@ -130,7 +130,7 @@ impl Output for RemoteOutput {
.await
.map_err(|SendError(message)| {
anyhow!(
"failed to send message to actor {}: {:#?}",
"failed to send message to actor {}, message: {:?}",
self.actor_id,
message
)
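The second hunk also switches `{:#?}` to `{:?}`: pretty-printed Debug output spreads one message across many log lines, while compact Debug keeps it on one. A quick standalone illustration of the difference:

```rust
fn main() {
    let message = vec![("actor", 1), ("fragment", 2)];
    println!("{:?}", message);  // compact: [("actor", 1), ("fragment", 2)]
    println!("{:#?}", message); // pretty: one element per line
}
```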
10 changes: 10 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
@@ -65,6 +65,7 @@ pub struct StreamingMetrics {
// Source
pub source_output_row_count: GenericCounterVec<AtomicU64>,
pub source_split_change_count: GenericCounterVec<AtomicU64>,
+pub source_backfill_row_count: LabelGuardedIntCounterVec<4>,

// Sink & materialized view
pub sink_input_row_count: LabelGuardedIntCounterVec<3>,
@@ -225,6 +226,14 @@ impl StreamingMetrics {
)
.unwrap();

+let source_backfill_row_count = register_guarded_int_counter_vec_with_registry!(
+    "stream_source_backfill_rows_counts",
+    "Total number of rows that have been backfilled for source",
+    &["source_id", "source_name", "actor_id", "fragment_id"],
+    registry
+)
+.unwrap();

let sink_input_row_count = register_guarded_int_counter_vec_with_registry!(
"stream_sink_input_row_count",
"Total number of rows streamed into sink executors",
@@ -1069,6 +1078,7 @@ impl StreamingMetrics {
actor_out_record_cnt,
source_output_row_count,
source_split_change_count,
+source_backfill_row_count,
sink_input_row_count,
mview_input_row_count,
exchange_frag_recv_size,
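For reference, a standalone sketch of an equivalent (unguarded) registration using the plain `prometheus` crate; RisingWave's `register_guarded_int_counter_vec_with_registry!` layers label guarding on top of this, so the snippet is an approximation, not this commit's code:

```rust
use prometheus::{register_int_counter_vec_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    // Same metric name, help text, and label set as the registration above.
    let source_backfill_row_count = register_int_counter_vec_with_registry!(
        "stream_source_backfill_rows_counts",
        "Total number of rows that have been backfilled for source",
        &["source_id", "source_name", "actor_id", "fragment_id"],
        registry
    )
    .unwrap();
    // An executor would bump the counter per emitted chunk; label values here are samples.
    source_backfill_row_count
        .with_label_values(&["1", "kafka_source", "7", "3"])
        .inc_by(1024);
}
```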
3 changes: 3 additions & 0 deletions src/stream/src/executor/source/mod.rs
@@ -31,6 +31,9 @@ pub use state_table_handler::*;
pub mod fetch_executor;
pub use fetch_executor::*;

+pub mod source_backfill_executor;
+pub mod source_backfill_state_table;
+pub use source_backfill_state_table::BackfillStateTableHandler;
pub mod source_executor;

pub mod list_executor;