From a46292eedc9a6d1ad10b5fb3e7d89495edfccad2 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Fri, 6 Oct 2023 18:26:16 +0000
Subject: [PATCH 001/107] chore(deps): bump multimap from 0.8.3 to 0.9.0
(#11464)
Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot]
Co-authored-by: xxchan
---
Cargo.lock | 11 ++++++++---
src/stream/Cargo.toml | 2 +-
src/workspace-hack/Cargo.toml | 2 --
3 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 01d09a424b84f..f18d4a3ba81e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4800,6 +4800,12 @@ name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
+
+[[package]]
+name = "multimap"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70db9248a93dc36a36d9a47898caa007a32755c7ad140ec64eeeb50d5a730631"
dependencies = [
"serde",
]
@@ -6261,7 +6267,7 @@ dependencies = [
"itertools 0.10.5",
"lazy_static",
"log",
- "multimap",
+ "multimap 0.8.3",
"petgraph",
"prettyplease",
"prost",
@@ -7985,7 +7991,7 @@ dependencies = [
"madsim-tonic",
"maplit",
"memcomparable",
- "multimap",
+ "multimap 0.9.0",
"num-traits",
"parking_lot 0.12.1",
"parse-display",
@@ -10837,7 +10843,6 @@ dependencies = [
"madsim-tokio",
"md-5",
"mio",
- "multimap",
"nom",
"num-bigint",
"num-integer",
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index 65ad0efff97a4..69398ceeb12dc 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -39,7 +39,7 @@ local_stats_alloc = { path = "../utils/local_stats_alloc" }
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" }
maplit = "1.0.2"
memcomparable = "0.2"
-multimap = "0.8"
+multimap = "0.9"
num-traits = "0.2"
parking_lot = "0.12"
parse-display = "0.8"
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 2e9e0fec03e1e..9ac71917cdcd5 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -71,7 +71,6 @@ madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
md-5 = { version = "0.10" }
mio = { version = "0.8", features = ["net", "os-ext"] }
-multimap = { version = "0.8" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
@@ -169,7 +168,6 @@ log = { version = "0.4", default-features = false, features = ["kv_unstable", "s
madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] }
md-5 = { version = "0.10" }
mio = { version = "0.8", features = ["net", "os-ext"] }
-multimap = { version = "0.8" }
nom = { version = "7" }
num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
From 04f33a1e4bd21baed664c25584dce11485001cee Mon Sep 17 00:00:00 2001
From: Eric Fu
Date: Sat, 7 Oct 2023 10:24:00 +0800
Subject: [PATCH 002/107] feat(metrics): fragment level streaming metrics (part
1) (#12634)
---
src/stream/src/executor/actor.rs | 2 +
.../src/executor/aggregation/distinct.rs | 8 +-
src/stream/src/executor/exchange/input.rs | 2 +-
src/stream/src/executor/hash_agg.rs | 27 +++---
src/stream/src/executor/hash_join.rs | 15 ++--
src/stream/src/executor/lookup/impl_.rs | 7 +-
.../src/executor/managed_state/join/mod.rs | 10 ++-
src/stream/src/executor/merge.rs | 2 +-
.../src/executor/monitor/streaming_stats.rs | 90 +++++++++++--------
src/stream/src/executor/receiver.rs | 2 +-
src/stream/src/executor/temporal_join.rs | 8 +-
src/stream/src/executor/top_n/group_top_n.rs | 7 +-
.../executor/top_n/group_top_n_appendonly.rs | 7 +-
src/stream/src/task/stream_manager.rs | 3 +-
14 files changed, 117 insertions(+), 73 deletions(-)
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index 7a3c951292086..85846557a3c4a 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -37,9 +37,11 @@ pub struct ActorContext {
pub id: ActorId,
pub fragment_id: u32,
+ // TODO(eric): these seem to be useless now?
last_mem_val: Arc,
cur_mem_val: Arc,
total_mem_val: Arc>,
+
pub streaming_metrics: Arc,
pub error_suppressor: Arc>,
}
diff --git a/src/stream/src/executor/aggregation/distinct.rs b/src/stream/src/executor/aggregation/distinct.rs
index bcc12d065169e..9e1d8d66da848 100644
--- a/src/stream/src/executor/aggregation/distinct.rs
+++ b/src/stream/src/executor/aggregation/distinct.rs
@@ -69,6 +69,7 @@ impl ColumnDeduplicater {
.map(|_| BitmapBuilder::zeroed(column.len()))
.collect_vec();
let actor_id_str = ctx.id.to_string();
+ let fragment_id_str = ctx.fragment_id.to_string();
let table_id_str = dedup_table.table_id().to_string();
for (datum_idx, (op, datum)) in ops.iter().zip_eq_fast(column.iter()).enumerate() {
// skip if this item is hidden to all agg calls (this is likely to happen)
@@ -85,7 +86,7 @@ impl ColumnDeduplicater {
self.metrics_info
.metrics
.agg_distinct_total_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
// TODO(yuhao): avoid this `contains`.
// https://github.com/risingwavelabs/risingwave/issues/9233
@@ -95,7 +96,7 @@ impl ColumnDeduplicater {
self.metrics_info
.metrics
.agg_distinct_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
// load from table into the cache
let counts = if let Some(counts_row) =
@@ -190,11 +191,12 @@ impl ColumnDeduplicater {
// WARN: if you want to switch to batching the writes to the table, please remember to change
// `self.cache.evict()` too.
let actor_id_str = ctx.id.to_string();
+ let fragment_id_str = ctx.fragment_id.to_string();
let table_id_str = dedup_table.table_id().to_string();
self.metrics_info
.metrics
.agg_distinct_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.cache.len() as i64);
self.cache.evict();
}
diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs
index 576542ecfcecd..eb2fc97532c16 100644
--- a/src/stream/src/executor/exchange/input.rs
+++ b/src/stream/src/executor/exchange/input.rs
@@ -177,7 +177,7 @@ impl RemoteInput {
let msg_res = Message::from_protobuf(&msg);
metrics
.actor_sampled_deserialize_duration_ns
- .with_label_values(&[&down_actor_id])
+ .with_label_values(&[&down_actor_id, &down_fragment_id])
.inc_by(start_time.elapsed().as_nanos() as u64);
msg_res
} else {
diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs
index e338341156091..6f2e6a490ad9c 100644
--- a/src/stream/src/executor/hash_agg.rs
+++ b/src/stream/src/executor/hash_agg.rs
@@ -405,15 +405,17 @@ impl HashAggExecutor {
// Update the metrics.
let actor_id_str = this.actor_ctx.id.to_string();
+ let fragment_id_str = this.actor_ctx.fragment_id.to_string();
let table_id_str = this.intermediate_state_table.table_id().to_string();
- let metric_dirty_count = this
- .metrics
- .agg_dirty_group_count
- .with_label_values(&[&table_id_str, &actor_id_str]);
+ let metric_dirty_count = this.metrics.agg_dirty_group_count.with_label_values(&[
+ &table_id_str,
+ &actor_id_str,
+ &fragment_id_str,
+ ]);
let metric_dirty_heap_size = this
.metrics
.agg_dirty_group_heap_size
- .with_label_values(&[&table_id_str, &actor_id_str]);
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]);
let new_group_size = agg_group.estimated_size();
if let Some(old_group_size) = old_group_size {
match new_group_size.cmp(&old_group_size) {
@@ -448,29 +450,30 @@ impl HashAggExecutor {
) {
// Update metrics.
let actor_id_str = this.actor_ctx.id.to_string();
+ let fragment_id_str = this.actor_ctx.fragment_id.to_string();
let table_id_str = this.intermediate_state_table.table_id().to_string();
this.metrics
.agg_lookup_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.lookup_miss_count);
vars.stats.lookup_miss_count = 0;
this.metrics
.agg_total_lookup_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.total_lookup_count);
vars.stats.total_lookup_count = 0;
this.metrics
.agg_cached_keys
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(vars.agg_group_cache.len() as i64);
this.metrics
.agg_chunk_lookup_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.chunk_lookup_miss_count);
vars.stats.chunk_lookup_miss_count = 0;
this.metrics
.agg_chunk_total_lookup_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc_by(vars.stats.chunk_total_lookup_count);
vars.stats.chunk_total_lookup_count = 0;
@@ -552,11 +555,11 @@ impl HashAggExecutor {
vars.dirty_groups_heap_size.set(0);
this.metrics
.agg_dirty_group_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(0);
this.metrics
.agg_dirty_group_heap_size
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(0);
// Yield the remaining rows in chunk builder.
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 8c1998811c159..ad380a56b3da0 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -639,6 +639,7 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor {
@@ -761,7 +764,7 @@ impl HashJoinExecutor {
@@ -787,7 +790,7 @@ impl HashJoinExecutor {
@@ -817,17 +820,17 @@ impl HashJoinExecutor LookupExecutor {
.into_owned_row();
let table_id_str = self.arrangement.storage_table.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
self.ctx
.streaming_metrics
.lookup_total_query_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
if let Some(result) = self.lookup_cache.lookup(&lookup_row) {
return Ok(result.iter().cloned().collect_vec());
@@ -384,7 +385,7 @@ impl LookupExecutor {
self.ctx
.streaming_metrics
.lookup_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
tracing::trace!(target: "events::stream::lookup::lookup_row", "{:?}", lookup_row);
@@ -433,7 +434,7 @@ impl LookupExecutor {
self.ctx
.streaming_metrics
.lookup_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.lookup_cache.len() as i64);
Ok(all_rows.into_inner())
diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs
index 7ee23c06a5631..cbb90a7cb5ae6 100644
--- a/src/stream/src/executor/managed_state/join/mod.rs
+++ b/src/stream/src/executor/managed_state/join/mod.rs
@@ -40,7 +40,7 @@ use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorResult;
use crate::executor::monitor::StreamingMetrics;
-use crate::task::{ActorId, AtomicU64Ref};
+use crate::task::{ActorId, AtomicU64Ref, FragmentId};
type DegreeType = u64;
@@ -161,6 +161,7 @@ pub struct JoinHashMapMetrics {
metrics: Arc,
/// Basic information
actor_id: String,
+ fragment_id: String,
join_table_id: String,
degree_table_id: String,
side: &'static str,
@@ -175,6 +176,7 @@ impl JoinHashMapMetrics {
pub fn new(
metrics: Arc,
actor_id: ActorId,
+ fragment_id: FragmentId,
side: &'static str,
join_table_id: u32,
degree_table_id: u32,
@@ -182,6 +184,7 @@ impl JoinHashMapMetrics {
Self {
metrics,
actor_id: actor_id.to_string(),
+ fragment_id: fragment_id.to_string(),
join_table_id: join_table_id.to_string(),
degree_table_id: degree_table_id.to_string(),
side,
@@ -199,6 +202,7 @@ impl JoinHashMapMetrics {
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
+ &self.fragment_id,
])
.inc_by(self.lookup_miss_count as u64);
self.metrics
@@ -208,6 +212,7 @@ impl JoinHashMapMetrics {
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
+ &self.fragment_id,
])
.inc_by(self.total_lookup_count as u64);
self.metrics
@@ -217,6 +222,7 @@ impl JoinHashMapMetrics {
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
+ &self.fragment_id,
])
.inc_by(self.insert_cache_miss_count as u64);
self.total_lookup_count = 0;
@@ -284,6 +290,7 @@ impl JoinHashMap {
pk_contained_in_jk: bool,
metrics: Arc,
actor_id: ActorId,
+ fragment_id: FragmentId,
side: &'static str,
) -> Self {
let alloc = StatsAlloc::new(Global).shared();
@@ -335,6 +342,7 @@ impl JoinHashMap {
metrics: JoinHashMapMetrics::new(
metrics,
actor_id,
+ fragment_id,
side,
join_table_id,
degree_table_id,
diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs
index 4df4ae4d3e302..9c71684f529b8 100644
--- a/src/stream/src/executor/merge.rs
+++ b/src/stream/src/executor/merge.rs
@@ -134,7 +134,7 @@ impl MergeExecutor {
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
- .with_label_values(&[&actor_id_str])
+ .with_label_values(&[&actor_id_str, &fragment_id_str])
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
diff --git a/src/stream/src/executor/monitor/streaming_stats.rs b/src/stream/src/executor/monitor/streaming_stats.rs
index 7737e84c3d0eb..189eb1ae334d1 100644
--- a/src/stream/src/executor/monitor/streaming_stats.rs
+++ b/src/stream/src/executor/monitor/streaming_stats.rs
@@ -33,7 +33,7 @@ pub struct StreamingMetrics {
// Executor metrics (disabled by default)
pub executor_row_count: GenericCounterVec,
- // Actor metrics
+ // Streaming actor metrics from tokio (disabled by default)
pub actor_execution_time: GenericGaugeVec,
pub actor_output_buffer_blocking_duration_ns: GenericCounterVec,
pub actor_input_buffer_blocking_duration_ns: GenericCounterVec,
@@ -47,6 +47,8 @@ pub struct StreamingMetrics {
pub actor_poll_cnt: GenericGaugeVec,
pub actor_idle_duration: GenericGaugeVec,
pub actor_idle_cnt: GenericGaugeVec,
+
+ // Streaming actor
pub actor_memory_usage: GenericGaugeVec,
pub actor_in_record_cnt: GenericCounterVec,
pub actor_out_record_cnt: GenericCounterVec,
@@ -96,7 +98,7 @@ pub struct StreamingMetrics {
pub group_top_n_appendonly_total_query_cache_count: GenericCounterVec,
pub group_top_n_appendonly_cached_entry_count: GenericGaugeVec,
- // look up
+ // Lookup executor
pub lookup_cache_miss_count: GenericCounterVec,
pub lookup_total_query_cache_count: GenericCounterVec,
pub lookup_cached_entry_count: GenericGaugeVec,
@@ -328,7 +330,7 @@ impl StreamingMetrics {
let actor_in_record_cnt = register_int_counter_vec_with_registry!(
"stream_actor_in_record_cnt",
"Total number of rows actor received",
- &["actor_id"],
+ &["actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -344,7 +346,7 @@ impl StreamingMetrics {
let actor_sampled_deserialize_duration_ns = register_int_counter_vec_with_registry!(
"actor_sampled_deserialize_duration_ns",
"Duration (ns) of sampled chunk deserialization",
- &["actor_id"],
+ &["actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -352,7 +354,7 @@ impl StreamingMetrics {
let actor_memory_usage = register_int_gauge_vec_with_registry!(
"actor_memory_usage",
"Memory usage (bytes)",
- &["actor_id"],
+ &["actor_id", "fragment_id"],
registry,
)
.unwrap();
@@ -360,7 +362,13 @@ impl StreamingMetrics {
let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss duration",
- &["side", "join_table_id", "degree_table_id", "actor_id"],
+ &[
+ "side",
+ "join_table_id",
+ "degree_table_id",
+ "actor_id",
+ "fragment_id"
+ ],
registry
)
.unwrap();
@@ -368,7 +376,13 @@ impl StreamingMetrics {
let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total operation",
- &["side", "join_table_id", "degree_table_id", "actor_id"],
+ &[
+ "side",
+ "join_table_id",
+ "degree_table_id",
+ "actor_id",
+ "fragment_id"
+ ],
registry
)
.unwrap();
@@ -376,7 +390,13 @@ impl StreamingMetrics {
let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Join executor cache miss when insert operation",
- &["side", "join_table_id", "degree_table_id", "actor_id"],
+ &[
+ "side",
+ "join_table_id",
+ "degree_table_id",
+ "actor_id",
+ "fragment_id"
+ ],
registry
)
.unwrap();
@@ -384,7 +404,7 @@ impl StreamingMetrics {
let join_actor_input_waiting_duration_ns = register_int_counter_vec_with_registry!(
"stream_join_actor_input_waiting_duration_ns",
"Total waiting duration (ns) of input buffer of join actor",
- &["actor_id"],
+ &["actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -392,7 +412,7 @@ impl StreamingMetrics {
let join_match_duration_ns = register_int_counter_vec_with_registry!(
"stream_join_match_duration_ns",
"Matching duration for each side",
- &["actor_id", "side"],
+ &["actor_id", "fragment_id", "side"],
registry
)
.unwrap();
@@ -419,7 +439,7 @@ impl StreamingMetrics {
let join_cached_entries = register_int_gauge_vec_with_registry!(
"stream_join_cached_entries",
"Number of cached entries in streaming join operators",
- &["actor_id", "side"],
+ &["actor_id", "fragment_id", "side"],
registry
)
.unwrap();
@@ -427,7 +447,7 @@ impl StreamingMetrics {
let join_cached_rows = register_int_gauge_vec_with_registry!(
"stream_join_cached_rows",
"Number of cached rows in streaming join operators",
- &["actor_id", "side"],
+ &["actor_id", "fragment_id", "side"],
registry
)
.unwrap();
@@ -435,7 +455,7 @@ impl StreamingMetrics {
let join_cached_estimated_size = register_int_gauge_vec_with_registry!(
"stream_join_cached_estimated_size",
"Estimated size of all cached entries in streaming join operators",
- &["actor_id", "side"],
+ &["actor_id", "fragment_id", "side"],
registry
)
.unwrap();
@@ -463,7 +483,7 @@ impl StreamingMetrics {
let agg_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_agg_lookup_miss_count",
"Aggregation executor lookup miss duration",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -471,7 +491,7 @@ impl StreamingMetrics {
let agg_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_agg_lookup_total_count",
"Aggregation executor lookup total operation",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -479,7 +499,7 @@ impl StreamingMetrics {
let agg_distinct_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_agg_distinct_cache_miss_count",
"Aggregation executor dinsinct miss duration",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -487,7 +507,7 @@ impl StreamingMetrics {
let agg_distinct_total_cache_count = register_int_counter_vec_with_registry!(
"stream_agg_distinct_total_cache_count",
"Aggregation executor distinct total operation",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -495,7 +515,7 @@ impl StreamingMetrics {
let agg_distinct_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_agg_distinct_cached_entry_count",
"Total entry counts in distinct aggregation executor cache",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -503,7 +523,7 @@ impl StreamingMetrics {
let agg_dirty_group_count = register_int_gauge_vec_with_registry!(
"stream_agg_dirty_group_count",
"Total dirty group counts in aggregation executor",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -511,7 +531,7 @@ impl StreamingMetrics {
let agg_dirty_group_heap_size = register_int_gauge_vec_with_registry!(
"stream_agg_dirty_group_heap_size",
"Total dirty group heap size in aggregation executor",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -519,7 +539,7 @@ impl StreamingMetrics {
let group_top_n_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_group_top_n_cache_miss_count",
"Group top n executor cache miss count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -527,7 +547,7 @@ impl StreamingMetrics {
let group_top_n_total_query_cache_count = register_int_counter_vec_with_registry!(
"stream_group_top_n_total_query_cache_count",
"Group top n executor query cache total count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -535,7 +555,7 @@ impl StreamingMetrics {
let group_top_n_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_group_top_n_cached_entry_count",
"Total entry counts in group top n executor cache",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -543,7 +563,7 @@ impl StreamingMetrics {
let group_top_n_appendonly_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_group_top_n_appendonly_cache_miss_count",
"Group top n appendonly executor cache miss count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -552,7 +572,7 @@ impl StreamingMetrics {
register_int_counter_vec_with_registry!(
"stream_group_top_n_appendonly_total_query_cache_count",
"Group top n appendonly executor total cache count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -560,7 +580,7 @@ impl StreamingMetrics {
let group_top_n_appendonly_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_group_top_n_appendonly_cached_entry_count",
"Total entry counts in group top n appendonly executor cache",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -568,7 +588,7 @@ impl StreamingMetrics {
let lookup_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_lookup_cache_miss_count",
"Lookup executor cache miss count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -576,7 +596,7 @@ impl StreamingMetrics {
let lookup_total_query_cache_count = register_int_counter_vec_with_registry!(
"stream_lookup_total_query_cache_count",
"Lookup executor query cache total count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -584,7 +604,7 @@ impl StreamingMetrics {
let lookup_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_lookup_cached_entry_count",
"Total entry counts in lookup executor cache",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -592,7 +612,7 @@ impl StreamingMetrics {
let temporal_join_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_temporal_join_cache_miss_count",
"Temporal join executor cache miss count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -600,7 +620,7 @@ impl StreamingMetrics {
let temporal_join_total_query_cache_count = register_int_counter_vec_with_registry!(
"stream_temporal_join_total_query_cache_count",
"Temporal join executor query cache total count",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -608,7 +628,7 @@ impl StreamingMetrics {
let temporal_join_cached_entry_count = register_int_gauge_vec_with_registry!(
"stream_temporal_join_cached_entry_count",
"Total entry count in temporal join executor cache",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -616,7 +636,7 @@ impl StreamingMetrics {
let agg_cached_keys = register_int_gauge_vec_with_registry!(
"stream_agg_cached_keys",
"Number of cached keys in streaming aggregation operators",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -624,7 +644,7 @@ impl StreamingMetrics {
let agg_chunk_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_agg_chunk_lookup_miss_count",
"Aggregation executor chunk-level lookup miss duration",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
@@ -632,7 +652,7 @@ impl StreamingMetrics {
let agg_chunk_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_agg_chunk_lookup_total_count",
"Aggregation executor chunk-level lookup total operation",
- &["table_id", "actor_id"],
+ &["table_id", "actor_id", "fragment_id"],
registry
)
.unwrap();
diff --git a/src/stream/src/executor/receiver.rs b/src/stream/src/executor/receiver.rs
index 64c4bfabc1b58..a99bbc2347a03 100644
--- a/src/stream/src/executor/receiver.rs
+++ b/src/stream/src/executor/receiver.rs
@@ -140,7 +140,7 @@ impl Executor for ReceiverExecutor {
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
- .with_label_values(&[&actor_id_str])
+ .with_label_values(&[&actor_id_str, &fragment_id_str])
.inc_by(chunk.cardinality() as _);
}
Message::Barrier(barrier) => {
diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs
index 8267edc3154d5..3c8cde63c4ca9 100644
--- a/src/stream/src/executor/temporal_join.rs
+++ b/src/stream/src/executor/temporal_join.rs
@@ -154,10 +154,11 @@ impl TemporalSide {
async fn lookup(&mut self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult {
let table_id_str = self.source.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
self.ctx
.streaming_metrics
.temporal_join_total_query_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
let res = if self.cache.contains(key) {
@@ -168,7 +169,7 @@ impl TemporalSide {
self.ctx
.streaming_metrics
.temporal_join_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
let pk_prefix = key.deserialize(&self.join_key_data_types)?;
@@ -414,13 +415,14 @@ impl TemporalJoinExecutor
let table_id_str = self.right_table.source.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
#[for_await]
for msg in align_input(self.left, self.right) {
self.right_table.cache.evict();
self.ctx
.streaming_metrics
.temporal_join_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.right_table.cache.len() as i64);
match msg? {
InternalMessage::WaterMark(watermark) => {
diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs
index 78c12ee82f3cd..031517403b0b3 100644
--- a/src/stream/src/executor/top_n/group_top_n.rs
+++ b/src/stream/src/executor/top_n/group_top_n.rs
@@ -184,6 +184,7 @@ where
let keys = K::build(&self.group_by, chunk.data_chunk())?;
let table_id_str = self.managed_state.state_table.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
let Some((op, row_ref)) = r else {
continue;
@@ -196,7 +197,7 @@ where
self.ctx
.streaming_metrics
.group_top_n_total_query_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
// If 'self.caches' does not already have a cache for the current group, create a new
// cache for it and insert it into `self.caches`
@@ -204,7 +205,7 @@ where
self.ctx
.streaming_metrics
.group_top_n_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
let mut topn_cache =
TopNCache::new(self.offset, self.limit, self.schema().data_types());
@@ -241,7 +242,7 @@ where
self.ctx
.streaming_metrics
.group_top_n_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.caches.len() as i64);
generate_output(res_rows, res_ops, self.schema())
}
diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
index 140a06984e586..45e05abfd21dc 100644
--- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs
+++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -181,6 +181,7 @@ where
let row_deserializer = RowDeserializer::new(data_types.clone());
let table_id_str = self.managed_state.state_table.table_id().to_string();
let actor_id_str = self.ctx.id.to_string();
+ let fragment_id_str = self.ctx.fragment_id.to_string();
for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
let Some((op, row_ref)) = r else {
continue;
@@ -193,7 +194,7 @@ where
self.ctx
.streaming_metrics
.group_top_n_appendonly_total_query_cache_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
// If 'self.caches' does not already have a cache for the current group, create a new
// cache for it and insert it into `self.caches`
@@ -201,7 +202,7 @@ where
self.ctx
.streaming_metrics
.group_top_n_appendonly_cache_miss_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.inc();
let mut topn_cache = TopNCache::new(self.offset, self.limit, data_types.clone());
self.managed_state
@@ -224,7 +225,7 @@ where
self.ctx
.streaming_metrics
.group_top_n_appendonly_cached_entry_count
- .with_label_values(&[&table_id_str, &actor_id_str])
+ .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str])
.set(self.caches.len() as i64);
generate_output(res_rows, res_ops, self.schema())
}
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index eee41e62f321a..d1afaeb061809 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -709,13 +709,14 @@ impl LocalStreamManagerCore {
{
let metrics = self.streaming_metrics.clone();
let actor_id_str = actor_id.to_string();
+ let fragment_id_str = actor_context.fragment_id.to_string();
let allocation_stated = task_stats_alloc::allocation_stat(
instrumented,
Duration::from_millis(1000),
move |bytes| {
metrics
.actor_memory_usage
- .with_label_values(&[&actor_id_str])
+ .with_label_values(&[&actor_id_str, &fragment_id_str])
.set(bytes as i64);
actor_context.store_mem_usage(bytes);
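
The recurring change in this patch is mechanical: every per-actor streaming metric gains a `fragment_id` label next to `actor_id`, both at registration time and at each `with_label_values` call site. Below is a minimal, self-contained sketch of that labeling pattern, using the upstream `prometheus` crate's explicit constructor rather than the `register_int_counter_vec_with_registry!` macro shown in the diff; the ids and counts are illustrative.

use prometheus::{IntCounterVec, Opts, Registry};

fn main() {
    let registry = Registry::new();

    // Before this patch the vec would be keyed by ["actor_id"] only;
    // adding "fragment_id" lets series be aggregated per fragment.
    let actor_in_record_cnt = IntCounterVec::new(
        Opts::new(
            "stream_actor_in_record_cnt",
            "Total number of rows actor received",
        ),
        &["actor_id", "fragment_id"],
    )
    .unwrap();
    registry
        .register(Box::new(actor_in_record_cnt.clone()))
        .unwrap();

    // At the call site, both ids are stringified once and reused, as in the diff.
    let actor_id_str = 42.to_string();
    let fragment_id_str = 7.to_string();
    actor_in_record_cnt
        .with_label_values(&[&actor_id_str, &fragment_id_str])
        .inc_by(128);
}
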
From 01c7c2bed08c447cca9a927e626d95498ce271b8 Mon Sep 17 00:00:00 2001
From: congyi wang <58715567+wcy-fdu@users.noreply.github.com>
Date: Sat, 7 Oct 2023 14:21:21 +0800
Subject: [PATCH 003/107] feat(object storage): add is_https_endpoint for minio
(#12517)
---
src/object_store/src/object/s3.rs | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/src/object_store/src/object/s3.rs b/src/object_store/src/object/s3.rs
index 90e419567bceb..a53f1a6825281 100644
--- a/src/object_store/src/object/s3.rs
+++ b/src/object_store/src/object/s3.rs
@@ -616,7 +616,16 @@ impl S3ObjectStore {
pub async fn with_minio(server: &str, metrics: Arc) -> Self {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
- let (secret_access_key, rest) = rest.split_once('@').unwrap();
+ let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
+ let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
+ rest = rest_stripped;
+ "https://"
+ } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
+ rest = rest_stripped;
+ "http://"
+ } else {
+ "http://"
+ };
let (address, bucket) = rest.split_once('/').unwrap();
#[cfg(madsim)]
@@ -626,10 +635,9 @@ impl S3ObjectStore {
aws_sdk_s3::config::Builder::from(&aws_config::ConfigLoader::default().load().await)
.force_path_style(true)
.http_connector(Self::new_http_connector(&S3ObjectStoreConfig::default()));
-
let config = builder
.region(Region::new("custom"))
- .endpoint_url(format!("http://{}", address))
+ .endpoint_url(format!("{}{}", endpoint_prefix, address))
.credentials_provider(Credentials::from_keys(
access_key_id,
secret_access_key,
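
The new parsing keeps an explicit scheme for MinIO endpoints instead of always prepending `http://`. The following is a standalone sketch of that logic under the assumed connection-string format `minio://<key>:<secret>@[http://|https://]<address>/<bucket>`; the helper name and return tuple are illustrative, not the actual `S3ObjectStore` API.

fn parse_minio_endpoint(server: &str) -> (String, String, String, String) {
    let server = server.strip_prefix("minio://").expect("missing minio:// prefix");
    let (access_key_id, rest) = server.split_once(':').expect("missing ':'");
    let (secret_access_key, mut rest) = rest.split_once('@').expect("missing '@'");

    // New in this patch: an https:// prefix is preserved; plain addresses
    // and explicit http:// addresses keep defaulting to http.
    let endpoint_prefix = if let Some(stripped) = rest.strip_prefix("https://") {
        rest = stripped;
        "https://"
    } else if let Some(stripped) = rest.strip_prefix("http://") {
        rest = stripped;
        "http://"
    } else {
        "http://"
    };

    let (address, bucket) = rest.split_once('/').expect("missing '/'");
    (
        access_key_id.to_string(),
        secret_access_key.to_string(),
        format!("{endpoint_prefix}{address}"),
        bucket.to_string(),
    )
}

fn main() {
    let (_key, _secret, endpoint, bucket) =
        parse_minio_endpoint("minio://ak:sk@https://minio.example.com:9000/my-bucket");
    assert_eq!(endpoint, "https://minio.example.com:9000");
    assert_eq!(bucket, "my-bucket");
}
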
From 848b0a117c945c515923d3dd4e19ddea9325fbd1 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Sat, 7 Oct 2023 15:15:21 +0800
Subject: [PATCH 004/107] feat(sink): kafka async truncate log (#12587)
---
src/common/src/util/future_utils.rs | 10 +
src/common/src/util/mod.rs | 2 +-
src/connector/src/sink/kafka.rs | 249 ++++++++---------
src/connector/src/sink/log_store.rs | 410 +++++++++++++++++++++++++++-
src/connector/src/sink/writer.rs | 46 ----
5 files changed, 531 insertions(+), 186 deletions(-)
diff --git a/src/common/src/util/future_utils.rs b/src/common/src/util/future_utils.rs
index 4b077ef76fdbd..75c38488457ac 100644
--- a/src/common/src/util/future_utils.rs
+++ b/src/common/src/util/future_utils.rs
@@ -14,6 +14,7 @@
use std::future::pending;
+use futures::future::Either;
use futures::{Future, FutureExt, Stream};
/// Convert a list of streams into a [`Stream`] of results from the streams.
@@ -33,3 +34,12 @@ pub fn pending_on_none(future: impl Future
-[![Slack](https://badgen.net/badge/Slack/Join%20RisingWave/0abd59?icon=slack)](https://join.slack.com/t/risingwave-community/shared_invite/zt-120rft0mr-d8uGk3d~NZiZAQWPnElOfw)
+[![Slack](https://badgen.net/badge/Slack/Join%20RisingWave/0abd59?icon=slack)](https://risingwave-community.slack.com/join/shared_invite/zt-1afreobhd-5Npy1oIpUWvDA~Od6zPxTA#/shared-invite/email)
[![Build status](https://badge.buildkite.com/9394d2bca0f87e2e97aa78b25f765c92d4207c0b65e7f6648f.svg)](https://buildkite.com/risingwavelabs/main)
[![codecov](https://codecov.io/gh/risingwavelabs/risingwave/branch/main/graph/badge.svg?token=EB44K9K38B)](https://codecov.io/gh/risingwavelabs/risingwave)
@@ -45,7 +45,7 @@ To learn about how to use RisingWave, refer to [RisingWave User Documentation](h
## Community
-Looking for help, discussions, collaboration opportunities, or a casual afternoon chat with our fellow engineers and community members? Join our [Slack workspace](https://join.slack.com/t/risingwave-community/shared_invite/zt-120rft0mr-d8uGk3d~NZiZAQWPnElOfw)!
+Looking for help, discussions, collaboration opportunities, or a casual afternoon chat with our fellow engineers and community members? Join our [Slack workspace](https://risingwave-community.slack.com/join/shared_invite/zt-1afreobhd-5Npy1oIpUWvDA~Od6zPxTA#/shared-invite/email)!
## License
From 456f3e98d7a00e3c383f8b03ee40b5da09af58d9 Mon Sep 17 00:00:00 2001
From: Li0k
Date: Mon, 9 Oct 2023 14:20:22 +0800
Subject: [PATCH 027/107] chore(storage): upgrade write limit (#12658)
---
.../manager/compaction_group_manager.rs | 17 ++++---
src/meta/src/hummock/manager/mod.rs | 44 ++++++++++++++++++-
2 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs
index 20bf9407d1121..b873a55e8d6ce 100644
--- a/src/meta/src/hummock/manager/compaction_group_manager.rs
+++ b/src/meta/src/hummock/manager/compaction_group_manager.rs
@@ -386,8 +386,9 @@ impl HummockManager {
&self,
compaction_group_ids: &[CompactionGroupId],
config_to_update: &[MutableConfig],
- ) -> Result<()> {
- self.compaction_group_manager
+ ) -> Result> {
+ let result = self
+ .compaction_group_manager
.write()
.await
.update_compaction_config(
@@ -402,7 +403,7 @@ impl HummockManager {
{
self.try_update_write_limits(compaction_group_ids).await;
}
- Ok(())
+ Ok(result)
}
/// Gets complete compaction group info.
@@ -801,13 +802,14 @@ impl CompactionGroupManager {
self.default_config.clone()
}
- async fn update_compaction_config(
+ pub async fn update_compaction_config(
&mut self,
compaction_group_ids: &[CompactionGroupId],
config_to_update: &[MutableConfig],
meta_store: &S,
- ) -> Result<()> {
+ ) -> Result> {
let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups);
+ let mut result = Vec::with_capacity(compaction_group_ids.len());
for compaction_group_id in compaction_group_ids.iter().unique() {
let group = compaction_groups.get(compaction_group_id).ok_or_else(|| {
Error::CompactionGroup(format!("invalid group {}", *compaction_group_id))
@@ -819,14 +821,15 @@ impl CompactionGroupManager {
}
let mut new_group = group.clone();
new_group.compaction_config = Arc::new(config);
- compaction_groups.insert(*compaction_group_id, new_group);
+ compaction_groups.insert(*compaction_group_id, new_group.clone());
+ result.push(new_group);
}
let mut trx = Transaction::default();
compaction_groups.apply_to_txn(&mut trx)?;
meta_store.txn(trx).await?;
compaction_groups.commit();
- Ok(())
+ Ok(result)
}
/// Initializes the config for a group.
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index bded17217d360..29dd913df7a31 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -28,6 +28,7 @@ use futures::stream::{BoxStream, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use parking_lot::Mutex;
+use risingwave_common::config::default::compaction_config;
use risingwave_common::monitor::rwlock::MonitoredRwLock;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_common::util::{pending_on_none, select_all};
@@ -44,6 +45,7 @@ use risingwave_hummock_sdk::{
};
use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType};
use risingwave_pb::hummock::group_delta::DeltaType;
+use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config;
use risingwave_pb::hummock::subscribe_compaction_event_request::{
Event as RequestEvent, HeartBeat, PullTask, ReportTask,
};
@@ -511,7 +513,7 @@ impl HummockManager {
versioning_guard.mark_objects_for_deletion();
let all_group_ids = get_compaction_group_ids(&versioning_guard.current_version);
- let configs = self
+ let mut configs = self
.compaction_group_manager
.write()
.await
@@ -520,6 +522,46 @@ impl HummockManager {
self.env.meta_store(),
)
.await?;
+
+ // The default write limit was already lowered in PR-12183. To prevent older clusters from continuing to use the outdated configuration, rewrite it here in a uniform way.
+ let mut rewrite_cg_ids = vec![];
+ for (cg_id, compaction_group_config) in &mut configs {
+ // update write limit
+ let relaxed_default_write_stop_level_count = 1000;
+ if compaction_group_config
+ .compaction_config
+ .level0_sub_level_compact_level_count
+ == relaxed_default_write_stop_level_count
+ {
+ rewrite_cg_ids.push(*cg_id);
+ }
+ }
+
+ if !rewrite_cg_ids.is_empty() {
+ tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids);
+
+ // update meta store
+ let result = self
+ .compaction_group_manager
+ .write()
+ .await
+ .update_compaction_config(
+ &rewrite_cg_ids,
+ &[
+ mutable_config::MutableConfig::Level0StopWriteThresholdSubLevelNumber(
+ compaction_config::level0_stop_write_threshold_sub_level_number(),
+ ),
+ ],
+ self.env.meta_store(),
+ )
+ .await?;
+
+ // update memory
+ for new_config in result {
+ configs.insert(new_config.group_id(), new_config);
+ }
+ }
+
versioning_guard.write_limit =
calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version);
trigger_write_stop_stats(&self.metrics, &versioning_guard.write_limit);
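
The logic above boils down to a one-time config rewrite at startup: collect the compaction groups that still carry the outdated relaxed stop-write value (1000), update them to the current default through `update_compaction_config` (which also persists the change to the meta store), and merge the refreshed configs back into the in-memory map before write limits are recalculated. Below is a minimal sketch of that rewrite pass over a plain map, with stand-in types and an assumed new default in place of the real `CompactionGroup` and `compaction_config::level0_stop_write_threshold_sub_level_number()`.

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct GroupConfig {
    level0_stop_write_threshold_sub_level_number: u64,
}

// Stand-in for the current default returned by
// `compaction_config::level0_stop_write_threshold_sub_level_number()`.
const NEW_STOP_WRITE_THRESHOLD: u64 = 300;

fn rewrite_outdated_write_limits(configs: &mut HashMap<u64, GroupConfig>) -> Vec<u64> {
    // The outdated relaxed value that older clusters may still carry.
    let relaxed_default_write_stop_level_count = 1000;

    // Collect the groups that still use the outdated value...
    let rewrite_cg_ids: Vec<u64> = configs
        .iter()
        .filter(|(_, cfg)| {
            cfg.level0_stop_write_threshold_sub_level_number
                == relaxed_default_write_stop_level_count
        })
        .map(|(id, _)| *id)
        .collect();

    // ...then rewrite them; the real patch routes this through
    // `update_compaction_config`, which also writes the change to the meta store.
    for id in &rewrite_cg_ids {
        if let Some(cfg) = configs.get_mut(id) {
            cfg.level0_stop_write_threshold_sub_level_number = NEW_STOP_WRITE_THRESHOLD;
        }
    }
    rewrite_cg_ids
}

fn main() {
    let mut configs = HashMap::from([
        (2u64, GroupConfig { level0_stop_write_threshold_sub_level_number: 1000 }),
        (3u64, GroupConfig { level0_stop_write_threshold_sub_level_number: 300 }),
    ]);
    let rewritten = rewrite_outdated_write_limits(&mut configs);
    assert_eq!(rewritten, vec![2]);
    assert_eq!(configs[&2].level0_stop_write_threshold_sub_level_number, 300);
}
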
From 2a7ef0d808f17a0e82ee72e6376e7f77c2ba07aa Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Mon, 9 Oct 2023 14:32:32 +0800
Subject: [PATCH 028/107] fix(log-store): handle unaligned epoch after recovery
(#12407)
---
.../common/log_store_impl/kv_log_store/mod.rs | 248 +++++++++--
.../log_store_impl/kv_log_store/reader.rs | 6 +-
.../log_store_impl/kv_log_store/serde.rs | 392 ++++++++++++------
.../log_store_impl/kv_log_store/test_utils.rs | 160 +++++--
src/stream/src/common/table/test_utils.rs | 43 ++
5 files changed, 660 insertions(+), 189 deletions(-)
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
index a84850b04f069..c93cd2e36eae4 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs
@@ -101,6 +101,10 @@ impl LogStoreFactory for KvLogStoreFactory {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
+ use risingwave_common::buffer::{Bitmap, BitmapBuilder};
+ use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::sink::log_store::{
LogReader, LogStoreFactory, LogStoreReadItem, LogWriter, TruncateOffset,
@@ -111,7 +115,8 @@ mod tests {
use risingwave_storage::StateStore;
use crate::common::log_store_impl::kv_log_store::test_utils::{
- gen_stream_chunk, gen_test_log_store_table,
+ calculate_vnode_bitmap, check_rows_eq, check_stream_chunk_eq,
+ gen_multi_vnode_stream_chunks, gen_stream_chunk, gen_test_log_store_table,
};
use crate::common::log_store_impl::kv_log_store::KvLogStoreFactory;
@@ -129,17 +134,18 @@ mod tests {
test_env.register_table(table.clone()).await;
+ let stream_chunk1 = gen_stream_chunk(0);
+ let stream_chunk2 = gen_stream_chunk(10);
+ let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows()));
+
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
- None,
+ Some(Arc::new(bitmap)),
max_stream_chunk_count,
);
let (mut reader, mut writer) = factory.build().await;
- let stream_chunk1 = gen_stream_chunk(0);
- let stream_chunk2 = gen_stream_chunk(10);
-
let epoch1 = test_env
.storage
.get_pinned_version()
@@ -172,7 +178,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch1);
- assert_eq!(stream_chunk1, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -192,7 +198,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch2);
- assert_eq!(stream_chunk2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -219,17 +225,19 @@ mod tests {
test_env.register_table(table.clone()).await;
+ let stream_chunk1 = gen_stream_chunk(0);
+ let stream_chunk2 = gen_stream_chunk(10);
+ let bitmap = calculate_vnode_bitmap(stream_chunk1.rows().chain(stream_chunk2.rows()));
+ let bitmap = Arc::new(bitmap);
+
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
- None,
+ Some(bitmap.clone()),
max_stream_chunk_count,
);
let (mut reader, mut writer) = factory.build().await;
- let stream_chunk1 = gen_stream_chunk(0);
- let stream_chunk2 = gen_stream_chunk(10);
-
let epoch1 = test_env
.storage
.get_pinned_version()
@@ -259,7 +267,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch1);
- assert_eq!(stream_chunk1, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -279,7 +287,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch2);
- assert_eq!(stream_chunk2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -310,7 +318,7 @@ mod tests {
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
- None,
+ Some(bitmap),
max_stream_chunk_count,
);
let (mut reader, mut writer) = factory.build().await;
@@ -328,7 +336,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch1);
- assert_eq!(stream_chunk1, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk1, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -348,7 +356,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch2);
- assert_eq!(stream_chunk2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -375,18 +383,27 @@ mod tests {
test_env.register_table(table.clone()).await;
+ let stream_chunk1_1 = gen_stream_chunk(0);
+ let stream_chunk1_2 = gen_stream_chunk(10);
+ let stream_chunk2 = gen_stream_chunk(20);
+ let stream_chunk3 = gen_stream_chunk(20);
+ let bitmap = calculate_vnode_bitmap(
+ stream_chunk1_1
+ .rows()
+ .chain(stream_chunk1_2.rows())
+ .chain(stream_chunk2.rows())
+ .chain(stream_chunk3.rows()),
+ );
+ let bitmap = Arc::new(bitmap);
+
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
- None,
+ Some(bitmap.clone()),
max_stream_chunk_count,
);
let (mut reader, mut writer) = factory.build().await;
- let stream_chunk1_1 = gen_stream_chunk(0);
- let stream_chunk1_2 = gen_stream_chunk(10);
- let stream_chunk2 = gen_stream_chunk(20);
-
let epoch1 = test_env
.storage
.get_pinned_version()
@@ -415,7 +432,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch1);
- assert_eq!(stream_chunk1_1, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk1_1, &read_stream_chunk));
chunk_id
}
_ => unreachable!(),
@@ -429,7 +446,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch1);
- assert_eq!(stream_chunk1_2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk1_2, &read_stream_chunk));
chunk_id
}
_ => unreachable!(),
@@ -452,7 +469,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch2);
- assert_eq!(stream_chunk2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -491,7 +508,7 @@ mod tests {
let factory = KvLogStoreFactory::new(
test_env.storage.clone(),
table.clone(),
- None,
+ Some(bitmap),
max_stream_chunk_count,
);
let (mut reader, mut writer) = factory.build().await;
@@ -500,7 +517,7 @@ mod tests {
.init(EpochPair::new_test_epoch(epoch3))
.await
.unwrap();
- let stream_chunk3 = gen_stream_chunk(30);
+
writer.write_chunk(stream_chunk3.clone()).await.unwrap();
reader.init().await.unwrap();
@@ -513,7 +530,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch1);
- assert_eq!(stream_chunk1_2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk1_2, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -533,7 +550,7 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch2);
- assert_eq!(stream_chunk2, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk2, &read_stream_chunk));
}
_ => unreachable!(),
}
@@ -553,7 +570,180 @@ mod tests {
},
) => {
assert_eq!(epoch, epoch3);
- assert_eq!(stream_chunk3, read_stream_chunk);
+ assert!(check_stream_chunk_eq(&stream_chunk3, &read_stream_chunk));
+ }
+ _ => unreachable!(),
+ }
+ }
+
+ #[tokio::test]
+ async fn test_update_vnode_recover() {
+ let test_env = prepare_hummock_test_env().await;
+
+ let table = gen_test_log_store_table();
+
+ test_env.register_table(table.clone()).await;
+
+ fn build_bitmap(indexes: impl Iterator- ) -> Arc {
+ let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
+ for i in indexes {
+ builder.set(i, true);
+ }
+ Arc::new(builder.finish())
+ }
+
+ let vnodes1 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 0));
+ let vnodes2 = build_bitmap((0..VirtualNode::COUNT).filter(|i| i % 2 == 1));
+
+ let factory1 =
+ KvLogStoreFactory::new(test_env.storage.clone(), table.clone(), Some(vnodes1), 10);
+ let factory2 =
+ KvLogStoreFactory::new(test_env.storage.clone(), table.clone(), Some(vnodes2), 10);
+ let (mut reader1, mut writer1) = factory1.build().await;
+ let (mut reader2, mut writer2) = factory2.build().await;
+
+ let epoch1 = test_env
+ .storage
+ .get_pinned_version()
+ .version()
+ .max_committed_epoch
+ + 1;
+ writer1
+ .init(EpochPair::new_test_epoch(epoch1))
+ .await
+ .unwrap();
+ writer2
+ .init(EpochPair::new_test_epoch(epoch1))
+ .await
+ .unwrap();
+ reader1.init().await.unwrap();
+ reader2.init().await.unwrap();
+ let [chunk1_1, chunk1_2] = gen_multi_vnode_stream_chunks::<2>(0, 100);
+ writer1.write_chunk(chunk1_1.clone()).await.unwrap();
+ writer2.write_chunk(chunk1_2.clone()).await.unwrap();
+ let epoch2 = epoch1 + 1;
+ writer1.flush_current_epoch(epoch2, false).await.unwrap();
+ writer2.flush_current_epoch(epoch2, false).await.unwrap();
+ let [chunk2_1, chunk2_2] = gen_multi_vnode_stream_chunks::<2>(200, 100);
+ writer1.write_chunk(chunk2_1.clone()).await.unwrap();
+ writer2.write_chunk(chunk2_2.clone()).await.unwrap();
+
+ match reader1.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => {
+ assert_eq!(epoch, epoch1);
+ assert!(check_stream_chunk_eq(&chunk1_1, &chunk));
+ }
+ _ => unreachable!(),
+ };
+ match reader1.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+ assert_eq!(epoch, epoch1);
+ assert!(!is_checkpoint);
+ }
+ _ => unreachable!(),
+ }
+
+ match reader2.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => {
+ assert_eq!(epoch, epoch1);
+ assert!(check_stream_chunk_eq(&chunk1_2, &chunk));
+ }
+ _ => unreachable!(),
+ }
+ match reader2.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+ assert_eq!(epoch, epoch1);
+ assert!(!is_checkpoint);
+ }
+ _ => unreachable!(),
+ }
+
+ // Only reader1 will truncate
+ reader1
+ .truncate(TruncateOffset::Barrier { epoch: epoch1 })
+ .await
+ .unwrap();
+
+ match reader1.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => {
+ assert_eq!(epoch, epoch2);
+ assert!(check_stream_chunk_eq(&chunk2_1, &chunk));
+ }
+ _ => unreachable!(),
+ }
+ match reader2.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => {
+ assert_eq!(epoch, epoch2);
+ assert!(check_stream_chunk_eq(&chunk2_2, &chunk));
+ }
+ _ => unreachable!(),
+ }
+
+ let epoch3 = epoch2 + 1;
+ writer1.flush_current_epoch(epoch3, true).await.unwrap();
+ writer2.flush_current_epoch(epoch3, true).await.unwrap();
+
+ match reader1.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+ assert_eq!(epoch, epoch2);
+ assert!(is_checkpoint);
+ }
+ _ => unreachable!(),
+ }
+ match reader2.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+ assert_eq!(epoch, epoch2);
+ assert!(is_checkpoint);
+ }
+ _ => unreachable!(),
+ }
+
+ // Truncation of reader1 on epoch1 should work because it is before this sync
+ test_env.storage.seal_epoch(epoch1, false);
+ test_env.commit_epoch(epoch2).await;
+ test_env
+ .storage
+ .try_wait_epoch(HummockReadEpoch::Committed(epoch2))
+ .await
+ .unwrap();
+
+ // Recovery
+ test_env.storage.clear_shared_buffer().await.unwrap();
+
+ let vnodes = build_bitmap(0..VirtualNode::COUNT);
+ let factory =
+ KvLogStoreFactory::new(test_env.storage.clone(), table.clone(), Some(vnodes), 10);
+ let (mut reader, mut writer) = factory.build().await;
+ writer.init(EpochPair::new(epoch3, epoch2)).await.unwrap();
+ reader.init().await.unwrap();
+ match reader.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => {
+ assert_eq!(epoch, epoch1);
+ assert!(check_stream_chunk_eq(&chunk1_2, &chunk));
+ }
+ _ => unreachable!(),
+ }
+ match reader.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+ assert_eq!(epoch, epoch1);
+ assert!(!is_checkpoint);
+ }
+ _ => unreachable!(),
+ }
+ match reader.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::StreamChunk { chunk, .. }) => {
+ assert_eq!(epoch, epoch2);
+ assert!(check_rows_eq(
+ chunk2_1.rows().chain(chunk2_2.rows()),
+ chunk.rows()
+ ));
+ }
+ _ => unreachable!(),
+ }
+ match reader.next_item().await.unwrap() {
+ (epoch, LogStoreReadItem::Barrier { is_checkpoint }) => {
+ assert_eq!(epoch, epoch2);
+ assert!(is_checkpoint);
}
_ => unreachable!(),
}
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
index 4336c3d961626..f8dfa5850af9a 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs
@@ -35,7 +35,7 @@ use crate::common::log_store_impl::kv_log_store::buffer::{
LogStoreBufferItem, LogStoreBufferReceiver,
};
use crate::common::log_store_impl::kv_log_store::serde::{
- new_log_store_item_stream, KvLogStoreItem, LogStoreItemStream, LogStoreRowSerde,
+ merge_log_store_item_stream, KvLogStoreItem, LogStoreItemMergeStream, LogStoreRowSerde,
};
pub struct KvLogStoreReader {
@@ -51,7 +51,7 @@ pub struct KvLogStoreReader {
first_write_epoch: Option,
/// `Some` means consuming historical log data
- state_store_stream: Option>>>,
+ state_store_stream: Option>>>,
latest_offset: TruncateOffset,
@@ -108,7 +108,7 @@ impl LogReader for KvLogStoreReader
{
"should not init twice"
);
// TODO: set chunk size by config
- self.state_store_stream = Some(Box::pin(new_log_store_item_stream(
+ self.state_store_stream = Some(Box::pin(merge_log_store_item_stream(
streams,
self.serde.clone(),
1024,
diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs
index 2aa01c42af196..a875206fa29e6 100644
--- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs
+++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use bytes::Bytes;
-use futures::stream::{FuturesUnordered, StreamFuture};
+use futures::stream::{FuturesUnordered, Peekable, StreamFuture};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
@@ -41,6 +41,7 @@ use risingwave_common::util::value_encoding::{
use risingwave_connector::sink::log_store::LogStoreResult;
use risingwave_hummock_sdk::key::{next_key, TableKey};
use risingwave_pb::catalog::Table;
+use risingwave_storage::error::StorageError;
use risingwave_storage::row_serde::row_serde_util::serialize_pk_with_vnode;
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::StateStoreReadIterStream;
@@ -371,14 +372,18 @@ pub(crate) enum KvLogStoreItem {
Barrier { is_checkpoint: bool },
}
+type BoxPeekableLogStoreItemStream = Pin>>>;
+
struct LogStoreRowOpStream {
serde: LogStoreRowSerde,
/// Streams that have not reached a barrier
- row_streams: FuturesUnordered>>>,
+ row_streams: FuturesUnordered>>,
/// Streams that have reached a barrier
- barrier_streams: Vec>>,
+ barrier_streams: Vec>,
+
+ not_started_streams: Vec<(u64, BoxPeekableLogStoreItemStream)>,
stream_state: StreamState,
}
@@ -387,46 +392,17 @@ impl LogStoreRowOpStream {
pub(crate) fn new(streams: Vec, serde: LogStoreRowSerde) -> Self {
assert!(!streams.is_empty());
Self {
- serde,
- barrier_streams: Vec::with_capacity(streams.len()),
- row_streams: streams
+ serde: serde.clone(),
+ barrier_streams: streams
.into_iter()
- .map(|s| Box::pin(s).into_future())
+ .map(|s| Box::pin(deserialize_stream(s, serde.clone()).peekable()))
.collect(),
+ row_streams: FuturesUnordered::new(),
+ not_started_streams: Vec::new(),
stream_state: StreamState::Uninitialized,
}
}
- fn check_epoch(&self, epoch: u64) -> LogStoreResult<()> {
- match &self.stream_state {
- StreamState::Uninitialized => Ok(()),
- StreamState::AllConsumingRow { curr_epoch }
- | StreamState::BarrierAligning { curr_epoch, .. } => {
- if *curr_epoch != epoch {
- Err(anyhow!(
- "epoch {} does not match with current epoch {}",
- epoch,
- curr_epoch
- ))
- } else {
- Ok(())
- }
- }
-
- StreamState::BarrierEmitted { prev_epoch } => {
- if *prev_epoch >= epoch {
- Err(anyhow!(
- "epoch {} should be greater than prev epoch {}",
- epoch,
- prev_epoch
- ))
- } else {
- Ok(())
- }
- }
- }
- }
-
fn check_is_checkpoint(&self, is_checkpoint: bool) -> LogStoreResult<()> {
if let StreamState::BarrierAligning {
is_checkpoint: curr_is_checkpoint,
@@ -448,11 +424,16 @@ impl LogStoreRowOpStream {
}
#[try_stream(ok = (u64, KvLogStoreItem), error = anyhow::Error)]
- async fn into_log_store_item_stream(self, chunk_size: usize) {
+ async fn into_log_store_item_stream(mut self, chunk_size: usize) {
let mut ops = Vec::with_capacity(chunk_size);
let mut data_chunk_builder =
DataChunkBuilder::new(self.serde.payload_schema.clone(), chunk_size);
+ if !self.init().await? {
+ // no data in any stream
+ return Ok(());
+ }
+
let this = self;
pin_mut!(this);
@@ -483,37 +464,145 @@ impl LogStoreRowOpStream {
}
}
-pub(crate) type LogStoreItemStream = impl Stream- >;
-pub(crate) fn new_log_store_item_stream(
+pub(crate) type LogStoreItemMergeStream
=
+ impl Stream- >;
+pub(crate) fn merge_log_store_item_stream(
streams: Vec
,
serde: LogStoreRowSerde,
chunk_size: usize,
-) -> LogStoreItemStream {
+) -> LogStoreItemMergeStream {
LogStoreRowOpStream::new(streams, serde).into_log_store_item_stream(chunk_size)
}
+type LogStoreItemStream =
+ impl Stream- > + Send;
+fn deserialize_stream(
+ stream: S,
+ serde: LogStoreRowSerde,
+) -> LogStoreItemStream
{
+ stream.map(
+ move |result: Result<_, StorageError>| -> LogStoreResult<(u64, LogStoreRowOp)> {
+ match result {
+ Ok((_key, value)) => serde.deserialize(value),
+ Err(e) => Err(e.into()),
+ }
+ },
+ )
+}
+
impl LogStoreRowOpStream {
+ // Returning Ok(false) means all streams have reached the end.
+ async fn init(&mut self) -> LogStoreResult {
+ match &self.stream_state {
+ StreamState::Uninitialized => {}
+ _ => unreachable!("cannot call init for twice"),
+ };
+
+ // before init, all streams are in `barrier_streams`
+ assert!(
+ self.row_streams.is_empty(),
+ "when uninitialized, row_streams should be empty"
+ );
+ assert!(self.not_started_streams.is_empty());
+ assert!(!self.barrier_streams.is_empty());
+
+ for mut stream in self.barrier_streams.drain(..) {
+ match stream.as_mut().peek().await {
+ Some(Ok((epoch, _))) => {
+ self.not_started_streams.push((*epoch, stream));
+ }
+ Some(Err(_)) => match stream.next().await {
+ Some(Err(e)) => {
+ return Err(e);
+ }
+ _ => unreachable!("on peek we have checked it's Some(Err(_))"),
+ },
+ None => {
+ continue;
+ }
+ }
+ }
+
+ if self.not_started_streams.is_empty() {
+ // No stream has data
+ return Ok(false);
+ }
+
+ // sorted by epoch descending. Earlier epoch at the end
+ self.not_started_streams
+ .sort_by_key(|(epoch, _)| u64::MAX - *epoch);
+
+ let (epoch, stream) = self
+ .not_started_streams
+ .pop()
+ .expect("have check non-empty");
+ self.row_streams.push(stream.into_future());
+ while let Some((stream_epoch, _)) = self.not_started_streams.last() && *stream_epoch == epoch {
+ let (_, stream) = self.not_started_streams.pop().expect("should not be empty");
+ self.row_streams.push(stream.into_future());
+ }
+ self.stream_state = StreamState::AllConsumingRow { curr_epoch: epoch };
+ Ok(true)
+ }
+
+ fn may_init_epoch(&mut self, epoch: u64) -> LogStoreResult<()> {
+ let prev_epoch = match &self.stream_state {
+ StreamState::Uninitialized => unreachable!("should have init"),
+ StreamState::BarrierEmitted { prev_epoch } => *prev_epoch,
+ StreamState::AllConsumingRow { curr_epoch }
+ | StreamState::BarrierAligning { curr_epoch, .. } => {
+ return if *curr_epoch != epoch {
+ Err(anyhow!(
+ "epoch {} does not match with current epoch {}",
+ epoch,
+ curr_epoch
+ ))
+ } else {
+ Ok(())
+ };
+ }
+ };
+
+ if prev_epoch >= epoch {
+ return Err(anyhow!(
+ "epoch {} should be greater than prev epoch {}",
+ epoch,
+ prev_epoch
+ ));
+ }
+
+ while let Some((stream_epoch, _)) = self.not_started_streams.last() {
+ if *stream_epoch > epoch {
+ // Current epoch has not reached the first epoch of
+ // the stream. Later streams must also have a greater epoch, so break here.
+ break;
+ }
+ if *stream_epoch < epoch {
+ return Err(anyhow!(
+ "current epoch {} has exceed epoch {} of stream not started",
+ epoch,
+ stream_epoch
+ ));
+ }
+ let (_, stream) = self.not_started_streams.pop().expect("should not be empty");
+ self.row_streams.push(stream.into_future());
+ }
+
+ self.stream_state = StreamState::AllConsumingRow { curr_epoch: epoch };
+ Ok(())
+ }
+
async fn next_op(&mut self) -> LogStoreResult