Skip to content

Commit

Permalink
feat: display source name along with table (risingwavelabs#6610)
Browse files Browse the repository at this point in the history
* display source name along with table

* update ts proto

* add dep

* fix source exec v2

* remove protoc-gen-ts as dep

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tabVersion and mergify[bot] authored Nov 28, 2022
1 parent 465b238 commit 66df2f7
Show file tree
Hide file tree
Showing 11 changed files with 45 additions and 10 deletions.
5 changes: 5 additions & 0 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ def section_streaming(panels):
[
panels.target(
f"rate({metric('stream_source_output_rows_counts')}[$__rate_interval])",
"source={{source_id}} @ {{instance}}",
"source={{source_name}} {{source_id}} @ {{instance}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ message SourceNode {
repeated int32 pk_column_ids = 5;
map<string, string> properties = 6;
catalog.SourceInfo info = 7;
string source_name = 8;
}

message SinkNode {
Expand Down
3 changes: 3 additions & 0 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ use risingwave_stream::executor::{
};
use tokio::sync::mpsc::unbounded_channel;

const MOCK_SOURCE_NAME: &str = "mock_source";

struct SingleChunkExecutor {
chunk: Option<DataChunk>,
schema: Schema,
Expand Down Expand Up @@ -138,6 +140,7 @@ async fn test_table_materialize() -> StreamResult<()> {
ActorContext::create(0x3f3f3f),
source_builder,
source_table_id,
MOCK_SOURCE_NAME.to_string(),
vnodes,
state_table,
all_column_ids.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ pub fn to_stream_prost_body(
let me = &me.core.catalog;
ProstNode::Source(SourceNode {
source_id: me.id,
source_name: me.name.clone(),
state_table: Some(
generic::Source::infer_internal_table_catalog(base)
.with_id(state.gen_table_id_wrapped())
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl StreamNode for StreamSource {
let source_catalog = self.logical.source_catalog();
ProstStreamNode::Source(SourceNode {
source_id: source_catalog.id,
source_name: source_catalog.name.clone(),
state_table: Some(
self.logical
.infer_internal_table_catalog()
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl StreamingMetrics {
let source_output_row_count = register_int_counter_vec_with_registry!(
"stream_source_output_rows_counts",
"Total number of rows that have been output from source",
&["source_id"],
&["source_id", "source_name"],
registry
)
.unwrap();
Expand Down
13 changes: 12 additions & 1 deletion src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct SourceExecutor<S: StateStore> {
stream_source_splits: Vec<SplitImpl>,

source_identify: String,
source_name: String,

split_state_store: SourceStateTableHandler<S>,

Expand All @@ -81,6 +82,7 @@ impl<S: StateStore> SourceExecutor<S> {
ctx: ActorContextRef,
source_desc_builder: SourceDescBuilder,
source_id: TableId,
source_name: String,
vnodes: Bitmap,
state_table: SourceStateTableHandler<S>,
column_ids: Vec<ColumnId>,
Expand All @@ -98,6 +100,7 @@ impl<S: StateStore> SourceExecutor<S> {
Ok(Self {
ctx,
source_id,
source_name,
source_desc_builder,
row_id_generator: RowIdGenerator::with_epoch(
vnode_id as u32,
Expand Down Expand Up @@ -415,7 +418,10 @@ impl<S: StateStore> SourceExecutor<S> {

self.metrics
.source_output_row_count
.with_label_values(&[self.source_identify.as_str()])
.with_label_values(&[
self.source_identify.as_str(),
self.source_name.as_str(),
])
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
}
Expand Down Expand Up @@ -533,6 +539,8 @@ mod tests {

use super::*;

const MOCK_SOURCE_NAME: &str = "mock_source";

#[tokio::test]
async fn test_table_source() {
let table_id = TableId::default();
Expand Down Expand Up @@ -584,6 +592,7 @@ mod tests {
ActorContext::create(0x3f3f3f),
source_builder,
table_id,
MOCK_SOURCE_NAME.to_string(),
vnodes,
state_table,
column_ids,
Expand Down Expand Up @@ -687,6 +696,7 @@ mod tests {
ActorContext::create(0x3f3f3f),
source_builder,
table_id,
MOCK_SOURCE_NAME.to_string(),
vnodes,
state_table,
column_ids,
Expand Down Expand Up @@ -816,6 +826,7 @@ mod tests {
ActorContext::create(0),
source_builder,
source_table_id,
MOCK_SOURCE_NAME.to_string(),
vnodes,
source_state_handler,
column_ids.clone(),
Expand Down
23 changes: 17 additions & 6 deletions src/stream/src/executor/source/source_executor_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const WAIT_BARRIER_MULTIPLE_TIMES: u128 = 5;
/// external connector.
pub struct StreamSourceCore<S: StateStore> {
table_id: TableId,
source_name: String,

column_ids: Vec<ColumnId>,

Expand Down Expand Up @@ -396,12 +397,18 @@ impl<S: StateStore> SourceExecutorV2<S> {

self.metrics
.source_output_row_count
.with_label_values(&[self
.stream_source_core
.as_ref()
.unwrap()
.source_identify
.as_str()])
.with_label_values(&[
self.stream_source_core
.as_ref()
.unwrap()
.source_identify
.as_str(),
self.stream_source_core
.as_ref()
.unwrap()
.source_name
.as_ref(),
])
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
}
Expand Down Expand Up @@ -489,6 +496,8 @@ mod tests {
use super::*;
use crate::executor::ActorContext;

const MOCK_SOURCE_NAME: &str = "mock_source";

#[tokio::test]
async fn test_source_executor() {
let table_id = TableId::default();
Expand Down Expand Up @@ -536,6 +545,7 @@ mod tests {
stream_source_splits: vec![],
split_state_store,
state_cache: HashMap::new(),
source_name: MOCK_SOURCE_NAME.to_string(),
};

let executor = SourceExecutorV2::new(
Expand Down Expand Up @@ -627,6 +637,7 @@ mod tests {
stream_source_splits: vec![],
split_state_store,
state_cache: HashMap::new(),
source_name: MOCK_SOURCE_NAME.to_string(),
};

let executor = SourceExecutorV2::new(
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/from_proto/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
.register_sender(params.actor_context.id, sender);

let source_id = TableId::new(node.source_id);
let source_name = node.source_name.clone();

let source_builder = SourceDescBuilder::new(
source_id,
Expand Down Expand Up @@ -81,6 +82,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
params.actor_context,
source_builder,
source_id,
source_name,
vnodes,
state_table_handler,
column_ids,
Expand Down

0 comments on commit 66df2f7

Please sign in to comment.