Skip to content

Commit

Permalink
merge chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jul 31, 2024
1 parent 657f85e commit 3115a0e
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,16 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_count(
"Actor Input Chunk Rows",
"",
[
panels.target(
f"sum(irate({metric('stream_actor_in_chunk_rows_sum')}[$__rate_interval])) by (fragment_id, upstream_fragment_id) / sum(irate({metric('stream_actor_in_chunk_rows_count')}[$__rate_interval])) by (fragment_id, upstream_fragment_id) > 0",
"stream_actor_in_chunk_rows - fragment {{fragment_id}}<-{{upstream_fragment_id}}",
),
],
),
panels.timeseries_rowsps(
"Actor Output Throughput (rows/s)",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

45 changes: 35 additions & 10 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub struct MergeExecutor {

/// Streaming metrics.
metrics: Arc<StreamingMetrics>,

/// Schema of inputs
schema: Schema,
}

impl MergeExecutor {
Expand All @@ -62,6 +65,7 @@ impl MergeExecutor {
context: Arc<SharedContext>,
_receiver_id: u64,
metrics: Arc<StreamingMetrics>,
schema: Schema,
) -> Self {
Self {
actor_context: ctx,
Expand All @@ -70,6 +74,7 @@ impl MergeExecutor {
upstream_fragment_id,
context,
metrics,
schema,
}
}

Expand Down Expand Up @@ -120,6 +125,11 @@ impl MergeExecutor {
None
};

let mut chunk_builder = StreamChunkBuilder::new(
self.context.config.developer.chunk_size,
self.schema.data_types(),
);

// Futures of all active upstreams.
let select_all = SelectReceivers::new(
self.actor_context.id,
Expand All @@ -141,16 +151,32 @@ impl MergeExecutor {
metrics
.actor_input_buffer_blocking_duration_ns
.inc_by(start_time.elapsed().as_nanos() as u64);
let mut msg: Message = msg?;

match &mut msg {
Message::Watermark(_) => {
// Do nothing.
let msg: Message = msg?;

match msg {
Message::Watermark(watermark) => {
if let Some(c) = chunk_builder.take() {
metrics.actor_in_record_cnt.inc_by(c.cardinality() as _);
metrics.actor_in_chunk_rows.observe(c.cardinality() as _);
yield Message::Chunk(c);
}
yield Message::Watermark(watermark);
}
Message::Chunk(chunk) => {
metrics.actor_in_record_cnt.inc_by(chunk.cardinality() as _);
for (op, row) in chunk.rows() {
if let Some(c) = chunk_builder.append_row(op, row) {
metrics.actor_in_record_cnt.inc_by(c.cardinality() as _);
metrics.actor_in_chunk_rows.observe(c.cardinality() as _);
yield Message::Chunk(c);
}
}
}
Message::Barrier(barrier) => {
Message::Barrier(mut barrier) => {
if let Some(c) = chunk_builder.take() {
metrics.actor_in_record_cnt.inc_by(c.cardinality() as _);
metrics.actor_in_chunk_rows.observe(c.cardinality() as _);
yield Message::Chunk(c);
}
tracing::debug!(
target: "events::stream::barrier::path",
actor_id = actor_id,
Expand Down Expand Up @@ -220,7 +246,7 @@ impl MergeExecutor {
merge_barrier_align_duration.clone(),
);
let new_barrier = expect_first_barrier(&mut select_new).await?;
assert_eq!(barrier, &new_barrier);
assert_eq!(barrier, new_barrier);

// Add the new upstreams to select.
select_all.add_upstreams_from(select_new);
Expand Down Expand Up @@ -254,10 +280,9 @@ impl MergeExecutor {

select_all.update_actor_ids();
}
yield Message::Barrier(barrier);
}
}

yield msg;
start_time = Instant::now();
}
}
Expand Down
26 changes: 23 additions & 3 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use prometheus::{
use risingwave_common::catalog::TableId;
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::{
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogramVec, LabelGuardedIntCounter,
LabelGuardedIntCounterVec, LabelGuardedIntGauge, LabelGuardedIntGaugeVec,
RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
LabelGuardedGauge, LabelGuardedGaugeVec, LabelGuardedHistogram, LabelGuardedHistogramVec,
LabelGuardedIntCounter, LabelGuardedIntCounterVec, LabelGuardedIntGauge,
LabelGuardedIntGaugeVec, RelabeledGuardedHistogramVec, RelabeledGuardedIntCounterVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::{
Expand Down Expand Up @@ -64,6 +64,7 @@ pub struct StreamingMetrics {
#[expect(dead_code)]
actor_memory_usage: LabelGuardedIntGaugeVec<2>,
actor_in_record_cnt: LabelGuardedIntCounterVec<3>,
pub actor_in_chunk_rows: LabelGuardedHistogramVec<3>,
pub actor_out_record_cnt: LabelGuardedIntCounterVec<2>,

// Source
Expand Down Expand Up @@ -390,6 +391,18 @@ impl StreamingMetrics {
)
.unwrap();

let opts = histogram_opts!(
"stream_actor_in_chunk_rows",
"Input chunk rows of actor received",
exponential_buckets(1.0, 2.0, 10).unwrap(), // 1 to 1024
);
let actor_in_chunk_rows = register_guarded_histogram_vec_with_registry!(
opts,
&["actor_id", "fragment_id", "upstream_fragment_id"],
registry
)
.unwrap();

let actor_out_record_cnt = register_guarded_int_counter_vec_with_registry!(
"stream_actor_out_record_cnt",
"Total number of rows actor sent",
Expand Down Expand Up @@ -1117,6 +1130,7 @@ impl StreamingMetrics {
actor_idle_cnt,
actor_memory_usage,
actor_in_record_cnt,
actor_in_chunk_rows,
actor_out_record_cnt,
source_output_row_count,
source_split_change_count,
Expand Down Expand Up @@ -1349,6 +1363,11 @@ impl StreamingMetrics {
&fragment_id_str,
&upstream_fragment_id_str,
]),
actor_in_chunk_rows: self.actor_in_chunk_rows.with_guarded_label_values(&[
&actor_id_str,
&fragment_id_str,
&upstream_fragment_id_str,
]),
actor_input_buffer_blocking_duration_ns: self
.actor_input_buffer_blocking_duration_ns
.with_guarded_label_values(&[
Expand Down Expand Up @@ -1630,6 +1649,7 @@ impl StreamingMetrics {

pub(crate) struct ActorInputMetrics {
pub(crate) actor_in_record_cnt: LabelGuardedIntCounter<3>,
pub(crate) actor_in_chunk_rows: LabelGuardedHistogram<3>,
pub(crate) actor_input_buffer_blocking_duration_ns: LabelGuardedIntCounter<3>,
}

Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/from_proto/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{Field, Schema};
use risingwave_pb::stream_plan::{DispatcherType, MergeNode};

use super::*;
Expand Down Expand Up @@ -76,6 +77,7 @@ impl ExecutorBuilder for MergeExecutorBuilder {
params.shared_context.clone(),
params.operator_id,
params.executor_stats.clone(),
Schema::from_iter(node.fields.iter().map(|f| Field::from(f))),
)
.boxed()
};
Expand Down

0 comments on commit 3115a0e

Please sign in to comment.