diff --git a/Cargo.lock b/Cargo.lock
index a72416c30c3e0..09dc9060e6306 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4714,7 +4714,7 @@ dependencies = [
  "log",
  "murmur3",
  "once_cell",
- "opendal 0.43.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "opendal 0.43.0",
  "ordered-float 3.9.1",
  "parquet 49.0.0",
  "prometheus",
@@ -6216,8 +6216,9 @@ dependencies = [

 [[package]]
 name = "opendal"
-version = "0.43.0"
-source = "git+https://github.com/apache/incubator-opendal?rev=9a222e4d72b328a24d5775b1565292f4636bbe69#9a222e4d72b328a24d5775b1565292f4636bbe69"
+version = "0.44.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66"
 dependencies = [
  "anyhow",
  "async-compat",
@@ -6228,6 +6229,7 @@ dependencies = [
  "chrono",
  "flagset",
  "futures",
+ "getrandom",
  "http 0.2.9",
  "log",
  "md-5",
@@ -7826,15 +7828,16 @@ dependencies = [

 [[package]]
 name = "reqsign"
-version = "0.14.5"
+version = "0.14.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f52b6eef6975eb2decff7d7e95744c8a6b6bb8558bc9b4230c0a3431a74f59c"
+checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd"
 dependencies = [
  "anyhow",
  "async-trait",
  "base64 0.21.4",
  "chrono",
  "form_urlencoded",
+ "getrandom",
  "hex",
  "hmac",
  "home",
@@ -8476,7 +8479,7 @@ dependencies = [
  "mysql_common",
  "nexmark",
  "num-bigint",
- "opendal 0.43.0 (git+https://github.com/apache/incubator-opendal?rev=9a222e4d72b328a24d5775b1565292f4636bbe69)",
+ "opendal 0.44.0",
  "parking_lot 0.12.1",
  "paste",
  "pretty_assertions",
@@ -9044,7 +9047,7 @@ dependencies = [
  "itertools 0.12.0",
  "madsim-aws-sdk-s3",
  "madsim-tokio",
- "opendal 0.43.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "opendal 0.44.0",
  "prometheus",
  "risingwave_common",
  "rustls",
@@ -12457,7 +12460,6 @@ dependencies = [
  "futures-task",
  "futures-util",
  "generic-array",
- "getrandom",
  "governor",
  "hashbrown 0.12.3",
  "hashbrown 0.14.0",
diff --git a/src/batch/src/executor/update.rs b/src/batch/src/executor/update.rs
index e75fca228e1e7..b0e1b4750cfcf 100644
--- a/src/batch/src/executor/update.rs
+++ b/src/batch/src/executor/update.rs
@@ -150,10 +150,7 @@ impl UpdateExecutor {
             #[cfg(debug_assertions)]
             table_dml_handle.check_chunk_schema(&stream_chunk);

-            let cardinality = stream_chunk.cardinality();
-            write_handle.write_chunk(stream_chunk).await?;
-
-            Result::Ok(cardinality / 2)
+            write_handle.write_chunk(stream_chunk).await
         };

         let mut rows_updated = 0;
@@ -181,17 +178,21 @@ impl UpdateExecutor {
                 .rows()
                 .zip_eq_debug(updated_data_chunk.rows())
             {
-                let None = builder.append_one_row(row_delete) else {
-                    unreachable!("no chunk should be yielded when appending the deleted row as the chunk size is always even");
-                };
-                if let Some(chunk) = builder.append_one_row(row_insert) {
-                    rows_updated += write_txn_data(chunk).await?;
+                rows_updated += 1;
+                // If row_delete == row_insert, we don't need to do an actual update
+                if row_delete != row_insert {
+                    let None = builder.append_one_row(row_delete) else {
+                        unreachable!("no chunk should be yielded when appending the deleted row as the chunk size is always even");
+                    };
+                    if let Some(chunk) = builder.append_one_row(row_insert) {
+                        write_txn_data(chunk).await?;
+                    }
                 }
             }
         }

         if let Some(chunk) = builder.consume_all() {
-            rows_updated += write_txn_data(chunk).await?;
+            write_txn_data(chunk).await?;
         }
         write_handle.end().await?;
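The update.rs change above stops emitting a delete/insert pair when the old and new rows are identical, while still counting the matched row toward `rows_updated`. For illustration, a minimal standalone sketch of that filtering idea, using a hypothetical `plan_updates` helper and plain `Vec<i64>` rows rather than the executor's real chunk types:

/// Sketch: count every matched row as "updated", but only plan a
/// delete+insert write when the row actually changed.
fn plan_updates(pairs: Vec<(Vec<i64>, Vec<i64>)>) -> (u64, Vec<(Vec<i64>, Vec<i64>)>) {
    let mut rows_updated: u64 = 0;
    let mut to_write = Vec::new();
    for (old_row, new_row) in pairs {
        rows_updated += 1; // reported to the user regardless
        if old_row != new_row {
            to_write.push((old_row, new_row)); // only real changes hit storage
        }
    }
    (rows_updated, to_write)
}

fn main() {
    let (n, writes) = plan_updates(vec![
        (vec![1, 10], vec![1, 10]), // unchanged: skipped
        (vec![2, 20], vec![2, 21]), // changed: written
    ]);
    assert_eq!(n, 2);
    assert_eq!(writes.len(), 1);
}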
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index ac3f35ad0b8a3..22da99e58a414 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -165,7 +165,7 @@ impl StreamService for StreamServiceImpl {
         }

         self.mgr
-            .send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
+            .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect)
             .await?;

         Ok(Response::new(InjectBarrierResponse {
diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml
index 82303fd620f29..5d74d749f929d 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -79,7 +79,7 @@ mysql_common = { version = "0.31", default-features = false, features = [
 ] }
 nexmark = { version = "0.2", features = ["serde"] }
 num-bigint = "0.4"
-opendal = { git = "https://github.com/apache/incubator-opendal", rev = "9a222e4d72b328a24d5775b1565292f4636bbe69" }
+opendal = "0.44"
 parking_lot = "0.12"
 paste = "1"
 prometheus = { version = "0.13", features = ["process"] }
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index 229907ab3cee8..3f6e9907c8c4a 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -36,6 +36,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
 hex = "0.4"
 hyper = "0.14"
 itertools = "0.12"
+maplit = "1.0.2"
 memcomparable = { version = "0.2" }
 mime_guess = "2"
 num-integer = "0.1"
@@ -95,7 +96,6 @@ workspace-hack = { path = "../workspace-hack" }
 [dev-dependencies]
 assert_matches = "1"
 expect-test = "1.4"
-maplit = "1.0.2"
 rand = "0.8"
 risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
 risingwave_test_runner = { workspace = true }
diff --git a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
index af5745eeadb5c..f65c396eba3b8 100644
--- a/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
+++ b/src/meta/model_v2/migration/src/m20231008_020431_hummock.rs
@@ -173,6 +173,24 @@ impl MigrationTrait for Migration {
                     .to_owned(),
             )
             .await?;
+        manager
+            .create_table(
+                Table::create()
+                    .table(HummockSequence::Table)
+                    .col(
+                        ColumnDef::new(HummockSequence::Name)
+                            .string()
+                            .not_null()
+                            .primary_key(),
+                    )
+                    .col(
+                        ColumnDef::new(HummockSequence::Seq)
+                            .big_integer()
+                            .not_null(),
+                    )
+                    .to_owned(),
+            )
+            .await?;
         Ok(())
     }

@@ -201,7 +219,8 @@ impl MigrationTrait for Migration {
             HummockPinnedVersion,
             HummockPinnedSnapshot,
             HummockVersionDelta,
-            HummockVersionStats
+            HummockVersionStats,
+            HummockSequence
         );
         Ok(())
     }
@@ -260,3 +279,10 @@ enum HummockVersionStats {
     Id,
     Stats,
 }
+
+#[derive(DeriveIden)]
+enum HummockSequence {
+    Table,
+    Name,
+    Seq,
+}
diff --git a/src/meta/model_v2/src/hummock_sequence.rs b/src/meta/model_v2/src/hummock_sequence.rs
new file mode 100644
index 0000000000000..5f4ed6cb367a1
--- /dev/null
+++ b/src/meta/model_v2/src/hummock_sequence.rs
@@ -0,0 +1,28 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Default)]
+#[sea_orm(table_name = "hummock_sequence")]
+pub struct Model {
+    #[sea_orm(primary_key, auto_increment = false)]
+    pub name: String,
+    pub seq: i64,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {}
+
+impl ActiveModelBehavior for ActiveModel {}
diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs
index 4c0973d176505..b7716c769ddc6 100644
--- a/src/meta/model_v2/src/lib.rs
+++ b/src/meta/model_v2/src/lib.rs
@@ -33,6 +33,7 @@ pub mod fragment;
 pub mod function;
 pub mod hummock_pinned_snapshot;
 pub mod hummock_pinned_version;
+pub mod hummock_sequence;
 pub mod hummock_version_delta;
 pub mod hummock_version_stats;
 pub mod index;
diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs
index 2709e05053574..25854428c6950 100644
--- a/src/meta/src/hummock/manager/checkpoint.rs
+++ b/src/meta/src/hummock/manager/checkpoint.rs
@@ -30,9 +30,6 @@ use crate::hummock::error::Result;
 use crate::hummock::manager::{read_lock, write_lock};
 use crate::hummock::metrics_utils::trigger_gc_stat;
 use crate::hummock::HummockManager;
-use crate::storage::{MetaStore, MetaStoreError, DEFAULT_COLUMN_FAMILY};
-
-const HUMMOCK_INIT_FLAG_KEY: &[u8] = b"hummock_init_flag";

 #[derive(Default)]
 pub struct HummockVersionCheckpoint {
@@ -63,9 +60,8 @@ impl HummockVersionCheckpoint {
 /// A hummock version checkpoint compacts previous hummock version delta logs, and stores stale
 /// objects from those delta logs.
 impl HummockManager {
-    /// # Panics
-    /// if checkpoint is not found.
-    pub async fn read_checkpoint(&self) -> Result<HummockVersionCheckpoint> {
+    /// Returns `Ok(None)` if the checkpoint is not found.
+    pub async fn try_read_checkpoint(&self) -> Result<Option<HummockVersionCheckpoint>> {
         use prost::Message;
         let data = match self
             .object_store
@@ -75,16 +71,13 @@ impl HummockManager {
             Ok(data) => data,
             Err(e) => {
                 if e.is_object_not_found_error() {
-                    panic!(
-                        "Hummock version checkpoints do not exist in object store, path: {}",
-                        self.version_checkpoint_path
-                    );
+                    return Ok(None);
                 }
                 return Err(e.into());
             }
         };
         let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
-        Ok(HummockVersionCheckpoint::from_protobuf(&ckpt))
+        Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
     }

     pub(super) async fn write_checkpoint(
@@ -173,31 +166,6 @@ impl HummockManager {
         Ok(new_checkpoint_id - old_checkpoint_id)
     }

-    pub(super) async fn need_init(&self) -> Result<bool> {
-        match self
-            .env
-            .meta_store()
-            .get_cf(DEFAULT_COLUMN_FAMILY, HUMMOCK_INIT_FLAG_KEY)
-            .await
-        {
-            Ok(_) => Ok(false),
-            Err(MetaStoreError::ItemNotFound(_)) => Ok(true),
-            Err(e) => Err(e.into()),
-        }
-    }
-
-    pub(super) async fn mark_init(&self) -> Result<()> {
-        self.env
-            .meta_store()
-            .put_cf(
-                DEFAULT_COLUMN_FAMILY,
-                HUMMOCK_INIT_FLAG_KEY.to_vec(),
-                memcomparable::to_vec(&0).unwrap(),
-            )
-            .await
-            .map_err(Into::into)
-    }
-
     pub fn pause_version_checkpoint(&self) {
         self.pause_version_checkpoint.store(true, Ordering::Relaxed);
         tracing::info!("hummock version checkpoint is paused.");
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 4ea7d7e8b98a6..af17174c0247b 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -101,6 +101,7 @@ pub use versioning::HummockVersionSafePoint;
 use versioning::*;
 pub(crate) mod checkpoint;
 mod compaction;
+mod sequence;
 mod worker;

 use compaction::*;
@@ -479,7 +480,11 @@ impl HummockManager {
             .map(|version_delta| (version_delta.id, version_delta))
             .collect();

-        let mut redo_state = if self.need_init().await? {
+        let checkpoint = self.try_read_checkpoint().await?;
+        let mut redo_state = if let Some(c) = checkpoint {
+            versioning_guard.checkpoint = c;
+            versioning_guard.checkpoint.version.clone()
+        } else {
             let default_compaction_config = self
                 .compaction_group_manager
                 .read()
@@ -487,6 +492,7 @@ impl HummockManager {
                 .default_compaction_config();
             let checkpoint_version = create_init_version(default_compaction_config);
             tracing::info!("init hummock version checkpoint");
+            // This write to the meta store is idempotent, so if `write_checkpoint` fails, restarting the meta node is fine.
             HummockVersionStats::default()
                 .insert(self.env.meta_store())
                 .await?;
@@ -495,13 +501,9 @@ impl HummockManager {
                 stale_objects: Default::default(),
             };
             self.write_checkpoint(&versioning_guard.checkpoint).await?;
-            self.mark_init().await?;
             checkpoint_version
-        } else {
-            // Read checkpoint from object store.
-            versioning_guard.checkpoint = self.read_checkpoint().await?;
-            versioning_guard.checkpoint.version.clone()
         };
+
         versioning_guard.version_stats = HummockVersionStats::list(self.env.meta_store())
             .await?
             .into_iter()
diff --git a/src/meta/src/hummock/manager/sequence.rs b/src/meta/src/hummock/manager/sequence.rs
new file mode 100644
index 0000000000000..50ee756de7f0f
--- /dev/null
+++ b/src/meta/src/hummock/manager/sequence.rs
@@ -0,0 +1,121 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![allow(dead_code)]
+
+use std::collections::HashMap;
+use std::num::NonZeroU32;
+use std::sync::LazyLock;
+
+use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
+use risingwave_meta_model_v2::hummock_sequence;
+use sea_orm::{ActiveModelTrait, ActiveValue, DatabaseConnection, EntityTrait, TransactionTrait};
+use tokio::sync::Mutex;
+
+use crate::MetaResult;
+
+const COMPACTION_TASK_ID: &str = "compaction_task";
+const COMPACTION_GROUP_ID: &str = "compaction_group";
+const SSTABLE_OBJECT_ID: &str = "sstable_object";
+const META_BACKUP_ID: &str = "meta_backup";
+
+static SEQ_INIT: LazyLock<HashMap<String, i64>> = LazyLock::new(|| {
+    maplit::hashmap! {
+        COMPACTION_TASK_ID.into() => 1,
+        COMPACTION_GROUP_ID.into() => StaticCompactionGroupId::End as i64 + 1,
+        SSTABLE_OBJECT_ID.into() => 1,
+        META_BACKUP_ID.into() => 1,
+    }
+});
+
+pub struct SequenceGenerator {
+    db: Mutex<DatabaseConnection>,
+}
+
+impl SequenceGenerator {
+    pub fn new(db: DatabaseConnection) -> Self {
+        Self { db: Mutex::new(db) }
+    }
+
+    /// Returns `start`, which indicates the allocated range `[start, start + num)`.
+    ///
+    /// Although this function is serial, it is invoked infrequently, so its performance is acceptable.
+    pub async fn next_interval(&self, ident: &str, num: NonZeroU32) -> MetaResult<u64> {
+        // TODO: add pre-allocation if necessary
+        let guard = self.db.lock().await;
+        let txn = guard.begin().await?;
+        let model: Option<hummock_sequence::Model> =
+            hummock_sequence::Entity::find_by_id(ident.to_string())
+                .one(&txn)
+                .await
+                .unwrap();
+        let start_seq = match model {
+            None => {
+                let init = SEQ_INIT
+                    .get(ident)
+                    .copied()
+                    .unwrap_or_else(|| panic!("seq {ident} not found"));
+                let active_model = hummock_sequence::ActiveModel {
+                    name: ActiveValue::set(ident.into()),
+                    seq: ActiveValue::set(init + num.get() as i64),
+                };
+                active_model.insert(&txn).await?;
+                init
+            }
+            Some(model) => {
+                let start_seq = model.seq;
+                let mut active_model: hummock_sequence::ActiveModel = model.into();
+                active_model.seq = ActiveValue::set(start_seq + num.get() as i64);
+                active_model.update(&txn).await?;
+                start_seq
+            }
+        };
+        txn.commit().await?;
+        Ok(u64::try_from(start_seq).unwrap_or_else(|_| panic!("seq {ident} overflow")))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::num::NonZeroU32;
+
+    use crate::controller::SqlMetaStore;
+    use crate::hummock::manager::sequence::{SequenceGenerator, COMPACTION_TASK_ID};
+
+    #[cfg(not(madsim))]
+    #[tokio::test]
+    async fn test_seq_gen() {
+        let store = SqlMetaStore::for_test().await;
+        let conn = store.conn.clone();
+        let s = SequenceGenerator::new(conn);
+        assert_eq!(
+            1,
+            s.next_interval(COMPACTION_TASK_ID, NonZeroU32::new(1).unwrap())
+                .await
+                .unwrap()
+        );
+        assert_eq!(
+            2,
+            s.next_interval(COMPACTION_TASK_ID, NonZeroU32::new(10).unwrap())
+                .await
+                .unwrap()
+        );
+        assert_eq!(
+            12,
+            s.next_interval(COMPACTION_TASK_ID, NonZeroU32::new(10).unwrap())
+                .await
+                .unwrap()
+        );
+    }
+}
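For illustration, the allocation contract of `next_interval` reduces to a few lines over a map: read the current `seq`, advance it by `num`, and hand the caller the old value as the start of an exclusive range. A minimal in-memory sketch of the same contract — the hypothetical `InMemorySequence` stands in for the `hummock_sequence` table and omits the SeaORM transaction that makes the real version durable and atomic:

use std::collections::HashMap;

/// Sketch of the allocation contract: returns `start` such that the caller
/// owns ids in `[start, start + num)`; the stored value is the next free id.
struct InMemorySequence {
    table: HashMap<String, i64>, // name -> next unallocated seq
}

impl InMemorySequence {
    fn next_interval(&mut self, ident: &str, num: u32, init: i64) -> i64 {
        let seq = self.table.entry(ident.to_string()).or_insert(init);
        let start = *seq;
        *seq += num as i64; // persisted in the same transaction in the real version
        start
    }
}

fn main() {
    let mut s = InMemorySequence { table: HashMap::new() };
    // Mirrors the values asserted in `test_seq_gen` above.
    assert_eq!(s.next_interval("compaction_task", 1, 1), 1);
    assert_eq!(s.next_interval("compaction_task", 10, 1), 2);
    assert_eq!(s.next_interval("compaction_task", 10, 1), 12);
}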
diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs
index 0a749eaf98cb0..d8ded0fb1dbd1 100644
--- a/src/meta/src/stream/source_manager.rs
+++ b/src/meta/src/stream/source_manager.rs
@@ -396,7 +396,7 @@ impl<T: SplitMetaData> Eq for ActorSplitsAssignment<T> {}

 impl<T: SplitMetaData> PartialEq for ActorSplitsAssignment<T> {
     fn eq(&self, other: &Self) -> bool {
-        self.splits.len() == other.splits.len() && self.actor_id == other.actor_id
+        self.splits.len() == other.splits.len()
     }
 }

@@ -409,12 +409,7 @@ impl<T: SplitMetaData> PartialOrd for ActorSplitsAssignment<T> {
 impl<T: SplitMetaData> Ord for ActorSplitsAssignment<T> {
     fn cmp(&self, other: &Self) -> Ordering {
         // Note: this is reversed order, to make BinaryHeap a min heap.
-        other
-            .splits
-            .len()
-            .cmp(&self.splits.len())
-            // To make the BinaryHeap have a deterministic order
-            .then(other.actor_id.cmp(&self.actor_id))
+        other.splits.len().cmp(&self.splits.len())
     }
 }

@@ -510,6 +505,12 @@ where
         for split_id in new_discovered_splits {
             // ActorSplitsAssignment's Ord is reversed, so this is min heap, i.e.,
             // we get the assignment with the least splits here.
+
+            // Note: if multiple actors have the same number of splits, one of them is picked arbitrarily.
+            // When the number of source actors is larger than the number of splits,
+            // it's possible that the assignment is uneven.
+            // e.g., https://github.com/risingwavelabs/risingwave/issues/14324#issuecomment-1875033158
+            // TODO: We should make the assignment rack-aware to make sure it's even.
             let mut peek_ref = heap.peek_mut().unwrap();
             peek_ref
                 .splits
@@ -1076,46 +1077,29 @@ mod tests {

     #[test]
     fn test_reassign_splits() {
-        fn check(
-            actor_splits: HashMap<ActorId, Vec<TestSplit>>,
-            discovered_splits: BTreeMap<SplitId, TestSplit>,
-            expected: expect_test::Expect,
-        ) {
-            let diff = reassign_splits(
-                FragmentId::default(),
-                actor_splits,
-                &discovered_splits,
-                Default::default(),
-            )
-            .map(BTreeMap::from_iter); // ensure deterministic debug string
-            expected.assert_debug_eq(&diff);
-        }
-
         let actor_splits = HashMap::new();
         let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
-        check(
+        assert!(reassign_splits(
+            FragmentId::default(),
             actor_splits,
-            discovered_splits,
-            expect_test::expect![[r#"
-                None
-            "#]],
-        );
+            &discovered_splits,
+            Default::default()
+        )
+        .is_none());

         let actor_splits = (0..3).map(|i| (i, vec![])).collect();
         let discovered_splits: BTreeMap<SplitId, TestSplit> = BTreeMap::new();
-        check(
+        let diff = reassign_splits(
+            FragmentId::default(),
             actor_splits,
-            discovered_splits,
-            expect_test::expect![[r#"
-                Some(
-                    {
-                        0: [],
-                        1: [],
-                        2: [],
-                    },
-                )
-            "#]],
-        );
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 3);
+        for splits in diff.values() {
+            assert!(splits.is_empty())
+        }

@@ -1124,31 +1108,20 @@ mod tests {
         let actor_splits = (0..3).map(|i| (i, vec![])).collect();
         let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..3)
             .map(|i| {
                 let split = TestSplit { id: i };
                 (split.id(), split)
             })
             .collect();
-        check(
+
+        let diff = reassign_splits(
+            FragmentId::default(),
             actor_splits,
-            discovered_splits,
-            expect_test::expect![[r#"
-                Some(
-                    {
-                        0: [
-                            TestSplit {
-                                id: 0,
-                            },
-                        ],
-                        1: [
-                            TestSplit {
-                                id: 1,
-                            },
-                        ],
-                        2: [
-                            TestSplit {
-                                id: 2,
-                            },
-                        ],
-                    },
-                )
-            "#]],
-        );
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 3);
+        for splits in diff.values() {
+            assert_eq!(splits.len(), 1);
+        }
+
+        check_all_splits(&discovered_splits, &diff);

@@ -1157,82 +1130,46 @@ mod tests {
         let actor_splits = (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
         let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
             .map(|i| {
                 let split = TestSplit { id: i };
                 (split.id(), split)
             })
             .collect();
-        check(
+
+        let diff = reassign_splits(
+            FragmentId::default(),
             actor_splits,
-            discovered_splits,
-            expect_test::expect![[r#"
-                Some(
-                    {
-                        0: [
-                            TestSplit {
-                                id: 0,
-                            },
-                            TestSplit {
-                                id: 3,
-                            },
-                        ],
-                        1: [
-                            TestSplit {
-                                id: 1,
-                            },
-                            TestSplit {
-                                id: 4,
-                            },
-                        ],
-                        2: [
-                            TestSplit {
-                                id: 2,
-                            },
-                        ],
-                    },
-                )
-            "#]],
-        );
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 3);
+        for splits in diff.values() {
+            let len = splits.len();
+            assert!(len == 1 || len == 2);
+        }
+
+        check_all_splits(&discovered_splits, &diff);

         let mut actor_splits: HashMap<ActorId, Vec<TestSplit>> =
             (0..3).map(|i| (i, vec![TestSplit { id: i }])).collect();
         actor_splits.insert(3, vec![]);
         actor_splits.insert(4, vec![]);
+
         let discovered_splits: BTreeMap<SplitId, TestSplit> = (0..5)
             .map(|i| {
                 let split = TestSplit { id: i };
                 (split.id(), split)
             })
             .collect();
-        check(
+
+        let diff = reassign_splits(
+            FragmentId::default(),
             actor_splits,
-            discovered_splits,
-            expect_test::expect![[r#"
-                Some(
-                    {
-                        0: [
-                            TestSplit {
-                                id: 0,
-                            },
-                        ],
-                        1: [
-                            TestSplit {
-                                id: 1,
-                            },
-                        ],
-                        2: [
-                            TestSplit {
-                                id: 2,
-                            },
-                        ],
-                        3: [
-                            TestSplit {
-                                id: 3,
-                            },
-                        ],
-                        4: [
-                            TestSplit {
-                                id: 4,
-                            },
-                        ],
-                    },
-                )
-            "#]],
-        );
+            &discovered_splits,
+            Default::default(),
+        )
+        .unwrap();
+        assert_eq!(diff.len(), 5);
+        for splits in diff.values() {
+            assert_eq!(splits.len(), 1);
+        }
+
+        check_all_splits(&discovered_splits, &diff);
     }
 }
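The assignment loop above relies on `std::collections::BinaryHeap` being a max-heap over a deliberately reversed `Ord`, so `peek_mut()` always yields the actor with the fewest splits; dropping the `actor_id` tie-breaker is what makes ties (and thus the exact assignment) nondeterministic. A self-contained sketch of the pattern, with a simplified `Assignment` type standing in for `ActorSplitsAssignment`:

use std::cmp::Ordering;
use std::collections::BinaryHeap;

struct Assignment {
    actor_id: u32,
    splits: Vec<u64>,
}

impl PartialEq for Assignment {
    fn eq(&self, other: &Self) -> bool {
        // Mirrors the patch: equality only considers the load.
        self.splits.len() == other.splits.len()
    }
}

impl Eq for Assignment {}

impl PartialOrd for Assignment {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Assignment {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed comparison: fewer splits ranks "greater", so the
        // max-heap surfaces the least-loaded actor at the top.
        other.splits.len().cmp(&self.splits.len())
    }
}

fn main() {
    let mut heap: BinaryHeap<Assignment> = (0..3u32)
        .map(|actor_id| Assignment { actor_id, splits: vec![] })
        .collect();
    for split in 0..5u64 {
        // peek_mut() yields the least-loaded actor; the heap re-sifts
        // when the PeekMut guard is dropped at the end of the statement.
        heap.peek_mut().unwrap().splits.push(split);
    }
    let sizes: Vec<usize> = heap.iter().map(|a| a.splits.len()).collect();
    assert!(sizes.iter().all(|&n| n == 1 || n == 2)); // 5 splits over 3 actors
}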
diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml
index f0d22ffad44cc..95354557dd943 100644
--- a/src/object_store/Cargo.toml
+++ b/src/object_store/Cargo.toml
@@ -26,7 +26,7 @@ hyper = { version = "0.14", features = ["tcp", "client"] }
 hyper-rustls = { version = "0.24.2", features = ["webpki-roots"] }
 hyper-tls = "0.5.0"
 itertools = "0.12"
-opendal = "0.43"
+opendal = "0.44"
 prometheus = { version = "0.13", features = ["process"] }
 risingwave_common = { workspace = true }
 rustls = "0.21.8"
diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
index 1a7f1c62e87e7..9cf89c65f5581 100644
--- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs
+++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs
@@ -199,35 +199,6 @@ impl ObjectStore for OpendalObjectStore {
     }
 }

-impl OpendalObjectStore {
-    // This function is only used in unit test, as list api will spawn the thread to stat Metakey::ContentLength,
-    // which will panic in deterministic test.
-    #[cfg(test)]
-    async fn list_for_test(&self, prefix: &str) -> ObjectResult<ObjectMetadataIter> {
-        let object_lister = self.op.lister_with(prefix).recursive(true).await?;
-
-        let stream = stream::unfold(object_lister, |mut object_lister| async move {
-            match object_lister.next().await {
-                Some(Ok(object)) => {
-                    let key = object.path().to_string();
-                    let last_modified = 0_f64;
-                    let total_size = 0_usize;
-                    let metadata = ObjectMetadata {
-                        key,
-                        last_modified,
-                        total_size,
-                    };
-                    Some((Ok(metadata), object_lister))
-                }
-                Some(Err(err)) => Some((Err(err.into()), object_lister)),
-                None => None,
-            }
-        });
-
-        Ok(stream.boxed())
-    }
-}
-
 /// Store multiple parts in a map, and concatenate them on finish.
 pub struct OpendalStreamingUploader {
     writer: Writer,
@@ -273,7 +244,7 @@ mod tests {

     async fn list_all(prefix: &str, store: &OpendalObjectStore) -> Vec<ObjectMetadata> {
         store
-            .list_for_test(prefix)
+            .list(prefix)
             .await
             .unwrap()
             .try_collect::<Vec<_>>()
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index 4008ac1bf3b6a..ffc29c2d25dac 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -219,7 +219,7 @@ where
             .into()));

         // Collect barriers to local barrier manager
-        self.context.lock_barrier_manager().collect(id, &barrier);
+        self.context.barrier_manager().collect(id, &barrier);

         // Then stop this actor if asked
         if barrier.is_stop(id) {
diff --git a/src/stream/src/executor/chain.rs b/src/stream/src/executor/chain.rs
index a4e03cf295641..0228f826a4bac 100644
--- a/src/stream/src/executor/chain.rs
+++ b/src/stream/src/executor/chain.rs
@@ -125,7 +125,6 @@ impl Executor for ChainExecutor {
 #[cfg(test)]
 mod test {
     use std::default::Default;
-    use std::sync::Arc;

     use futures::StreamExt;
     use risingwave_common::array::stream_chunk::StreamChunkTestExt;
@@ -144,8 +143,7 @@ mod test {
     #[tokio::test]
     async fn test_basic() {
         let barrier_manager = LocalBarrierManager::for_test();
-        let progress =
-            CreateMviewProgress::for_test(Arc::new(parking_lot::Mutex::new(barrier_manager)));
+        let progress = CreateMviewProgress::for_test(barrier_manager);
         let actor_id = progress.actor_id();

         let schema = Schema::new(vec![Field::unnamed(DataType::Int64)]);
diff --git a/src/stream/src/executor/values.rs b/src/stream/src/executor/values.rs
index cc55d720083a6..1ce6f27ded573 100644
--- a/src/stream/src/executor/values.rs
+++ b/src/stream/src/executor/values.rs
@@ -155,7 +155,6 @@ impl Executor for ValuesExecutor {

 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;

     use futures::StreamExt;
     use risingwave_common::array::{
@@ -174,8 +173,7 @@ mod tests {
     #[tokio::test]
     async fn test_values() {
         let barrier_manager = LocalBarrierManager::for_test();
-        let progress =
-            CreateMviewProgress::for_test(Arc::new(parking_lot::Mutex::new(barrier_manager)));
+        let progress = CreateMviewProgress::for_test(barrier_manager);
         let actor_id = progress.actor_id();
         let (tx, barrier_receiver) = unbounded_channel();
         let value = StructValue::new(vec![Some(1.into()), Some(2.into()), Some(3.into())]);
diff --git a/src/stream/src/from_proto/barrier_recv.rs b/src/stream/src/from_proto/barrier_recv.rs
index 4fd779105a319..8d834642147f7 100644
--- a/src/stream/src/from_proto/barrier_recv.rs
+++ b/src/stream/src/from_proto/barrier_recv.rs
@@ -37,7 +37,7 @@ impl ExecutorBuilder for BarrierRecvExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);

         Ok(BarrierRecvExecutor::new(params.actor_context, params.info, barrier_receiver).boxed())
diff --git a/src/stream/src/from_proto/now.rs b/src/stream/src/from_proto/now.rs
index 5ed8145178d85..601c1ec3ad585 100644
--- a/src/stream/src/from_proto/now.rs
+++ b/src/stream/src/from_proto/now.rs
@@ -36,7 +36,7 @@ impl ExecutorBuilder for NowExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);

         let state_table =
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 8234188e9a4fc..7927ed3aa1827 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -44,7 +44,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);

         let system_params = params.env.system_params_manager_ref().get_params();
diff --git a/src/stream/src/from_proto/values.rs b/src/stream/src/from_proto/values.rs
index 4518e575e8602..a45393e200a5e 100644
--- a/src/stream/src/from_proto/values.rs
+++ b/src/stream/src/from_proto/values.rs
@@ -39,7 +39,7 @@ impl ExecutorBuilder for ValuesExecutorBuilder {
         let (sender, barrier_receiver) = unbounded_channel();
         stream
             .context
-            .lock_barrier_manager()
+            .barrier_manager()
             .register_sender(params.actor_context.id, sender);
         let progress = stream
             .context
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 03b044f74ef74..0a8eede9e172a 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -13,18 +13,18 @@
 // limitations under the License.

 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;

 use anyhow::anyhow;
 use prometheus::HistogramTimer;
 use risingwave_pb::stream_plan::barrier::BarrierKind;
 use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
-use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot;
 use tokio::sync::oneshot::Receiver;

 use self::managed_state::ManagedBarrierState;
 use crate::error::{StreamError, StreamResult};
-use crate::executor::*;
 use crate::task::ActorId;

 mod managed_state;
@@ -35,6 +35,11 @@ mod tests;
 pub use progress::CreateMviewProgress;
 use risingwave_storage::StateStoreImpl;

+use crate::executor::monitor::StreamingMetrics;
+use crate::executor::Barrier;
+use crate::task::barrier_manager::progress::BackfillState;
+use crate::task::barrier_manager::LocalBarrierEvent::{ReportActorCollected, ReportActorFailure};
+
 /// If enabled, all actors will be grouped in the same tracing span within one epoch.
 /// Note that this option will significantly increase the overhead of tracing.
 pub const ENABLE_BARRIER_AGGREGATION: bool = false;
@@ -49,29 +54,53 @@ pub struct CollectResult {
     pub kind: BarrierKind,
 }

-enum BarrierState {
-    /// `Local` mode should be only used for tests. In this mode, barriers are not managed or
-    /// collected, and there's no way to know whether or when a barrier is finished.
+enum LocalBarrierEvent {
+    RegisterSender {
+        actor_id: ActorId,
+        sender: UnboundedSender<Barrier>,
+    },
+    InjectBarrier {
+        barrier: Barrier,
+        actor_ids_to_send: Vec<ActorId>,
+        actor_ids_to_collect: Vec<ActorId>,
+        result_sender: oneshot::Sender<StreamResult<()>>,
+    },
+    Reset,
+    ReportActorCollected {
+        actor_id: ActorId,
+        barrier: Barrier,
+    },
+    ReportActorFailure {
+        actor_id: ActorId,
+        err: StreamError,
+    },
+    CollectEpoch {
+        epoch: u64,
+        result_sender: oneshot::Sender<StreamResult<CompleteReceiver>>,
+    },
+    ReportCreateProgress {
+        current_epoch: u64,
+        actor: ActorId,
+        state: BackfillState,
+    },
     #[cfg(test)]
-    Local,
-
-    /// In `Managed` mode, barriers are sent and collected according to the request from meta
-    /// service. When the barrier is finished, the caller can be notified about this.
-    Managed(ManagedBarrierState),
+    Flush(oneshot::Sender<()>),
 }

-/// [`LocalBarrierManager`] manages barrier control flow, used by local stream manager.
-/// Specifically, [`LocalBarrierManager`] serve barrier injection from meta server, send the
-/// barriers to and collect them from all actors, and finally report the progress.
-pub struct LocalBarrierManager {
+/// [`LocalBarrierWorker`] manages barrier control flow, used by the local stream manager.
+/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
+/// barriers to and collects them from all actors, and finally reports the progress.
+struct LocalBarrierWorker {
     /// Stores all streaming job source sender.
     senders: HashMap<ActorId, Vec<UnboundedSender<Barrier>>>,

     /// Current barrier collection state.
-    state: BarrierState,
+    state: ManagedBarrierState,

     /// Save collect `CompleteReceiver`.
     collect_complete_receiver: HashMap<u64, CompleteReceiver>,
+
+    streaming_metrics: Arc<StreamingMetrics>,
 }

 /// Information used after collection.
@@ -79,27 +108,77 @@ pub struct CompleteReceiver {
     /// Notify all actors of completion of collection.
     pub complete_receiver: Option<Receiver<StreamResult<CollectResult>>>,
     /// `barrier_inflight_timer`'s metrics.
-    pub barrier_inflight_timer: Option<HistogramTimer>,
+    pub barrier_inflight_timer: HistogramTimer,
     /// The kind of barrier.
     pub kind: BarrierKind,
 }

-impl LocalBarrierManager {
-    fn with_state(state: BarrierState) -> Self {
+impl LocalBarrierWorker {
+    fn new(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
         Self {
             senders: HashMap::new(),
-            state,
+            state: ManagedBarrierState::new(state_store),
             collect_complete_receiver: HashMap::default(),
+            streaming_metrics,
         }
     }

-    /// Create a [`LocalBarrierManager`] with managed mode.
-    pub fn new(state_store: StateStoreImpl) -> Self {
-        Self::with_state(BarrierState::Managed(ManagedBarrierState::new(state_store)))
+    async fn run(mut self, mut event_rx: UnboundedReceiver<LocalBarrierEvent>) {
+        while let Some(event) = event_rx.recv().await {
+            match event {
+                LocalBarrierEvent::RegisterSender { actor_id, sender } => {
+                    self.register_sender(actor_id, sender);
+                }
+                LocalBarrierEvent::InjectBarrier {
+                    barrier,
+                    actor_ids_to_send,
+                    actor_ids_to_collect,
+                    result_sender,
+                } => {
+                    let timer = self
+                        .streaming_metrics
+                        .barrier_inflight_latency
+                        .start_timer();
+                    let result =
+                        self.send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect, timer);
+                    let _ = result_sender.send(result).inspect_err(|e| {
+                        warn!(err=?e, "fail to send inject barrier result");
+                    });
+                }
+                LocalBarrierEvent::Reset => {
+                    self.reset();
+                }
+                ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier),
+                ReportActorFailure { actor_id, err } => {
+                    self.notify_failure(actor_id, err);
+                }
+                LocalBarrierEvent::CollectEpoch {
+                    epoch,
+                    result_sender,
+                } => {
+                    let result = self.remove_collect_rx(epoch);
+                    let _ = result_sender.send(result).inspect_err(|e| {
+                        warn!(err=?e.as_ref().map(|_|()), "fail to send collect epoch result");
+                    });
+                }
+                LocalBarrierEvent::ReportCreateProgress {
+                    current_epoch,
+                    actor,
+                    state,
+                } => {
+                    self.update_create_mview_progress(current_epoch, actor, state);
+                }
+                #[cfg(test)]
+                LocalBarrierEvent::Flush(sender) => sender.send(()).unwrap(),
+            }
+        }
     }
+}

+// event handler
+impl LocalBarrierWorker {
     /// Register sender for source actors, used to send barriers.
-    pub fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
+    fn register_sender(&mut self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
         tracing::debug!(
             target: "events::stream::barrier::manager",
             actor_id = actor_id,
@@ -108,28 +187,16 @@ impl LocalBarrierManager {
         self.senders.entry(actor_id).or_default().push(sender);
     }

-    /// Return all senders.
-    pub fn all_senders(&self) -> HashSet<ActorId> {
-        self.senders.keys().cloned().collect()
-    }
-
     /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
     /// barrier is finished, in managed mode.
-    pub fn send_barrier(
+    fn send_barrier(
         &mut self,
         barrier: &Barrier,
         actor_ids_to_send: impl IntoIterator<Item = ActorId>,
         actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
-        timer: Option<HistogramTimer>,
+        timer: HistogramTimer,
     ) -> StreamResult<()> {
-        let to_send = {
-            let to_send: HashSet<ActorId> = actor_ids_to_send.into_iter().collect();
-            match &self.state {
-                #[cfg(test)]
-                BarrierState::Local if to_send.is_empty() => self.senders.keys().cloned().collect(),
-                _ => to_send,
-            }
-        };
+        let to_send: HashSet<ActorId> = actor_ids_to_send.into_iter().collect();
         let to_collect: HashSet<ActorId> = actor_ids_to_collect.into_iter().collect();
         debug!(
             target: "events::stream::barrier::manager::send",
@@ -139,19 +206,11 @@ impl LocalBarrierManager {
             to_collect
         );

-        let rx = match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => None,
-
-            BarrierState::Managed(state) => {
-                // There must be some actors to collect from.
-                assert!(!to_collect.is_empty());
+        // There must be some actors to collect from.
+        assert!(!to_collect.is_empty());

-                let (tx, rx) = oneshot::channel();
-                state.transform_to_issued(barrier, to_collect, tx)?;
-                Some(rx)
-            }
-        };
+        let (tx, rx) = oneshot::channel();
+        self.state.transform_to_issued(barrier, to_collect, tx)?;

         for actor_id in to_send {
             match self.senders.get(&actor_id) {
@@ -192,7 +251,7 @@ impl LocalBarrierWorker {
         self.collect_complete_receiver.insert(
             barrier.epoch.prev,
             CompleteReceiver {
-                complete_receiver: rx,
+                complete_receiver: Some(rx),
                 barrier_inflight_timer: timer,
                 kind: barrier.kind,
             },
@@ -201,7 +260,7 @@ impl LocalBarrierManager {
     }

     /// Use `prev_epoch` to remove collect rx and return rx.
-    pub fn remove_collect_rx(&mut self, prev_epoch: u64) -> StreamResult<CompleteReceiver> {
+    fn remove_collect_rx(&mut self, prev_epoch: u64) -> StreamResult<CompleteReceiver> {
         // It's still possible that `collect_complete_receiver` does not contain the target epoch
         // when receiving collect_barrier request. Because `collect_complete_receiver` could
         // be cleared when CN is under recovering. We should return error rather than panic.
@@ -217,55 +276,116 @@ impl LocalBarrierManager {
     }

     /// Reset all internal states.
-    pub fn reset(&mut self) {
+    fn reset(&mut self) {
         self.senders.clear();
         self.collect_complete_receiver.clear();
-
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
-
-            BarrierState::Managed(managed_state) => {
-                managed_state.clear_all_states();
-            }
-        }
+        self.state.clear_all_states();
     }

-    /// When a [`StreamConsumer`] (typically [`DispatchExecutor`]) get a barrier, it should report
+    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) gets a barrier, it should report
     /// and collect this barrier with its own `actor_id` using this function.
-    pub fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
-
-            BarrierState::Managed(managed_state) => {
-                managed_state.collect(actor_id, barrier);
-            }
-        }
+    fn collect(&mut self, actor_id: ActorId, barrier: &Barrier) {
+        self.state.collect(actor_id, barrier)
     }

-    /// When an actor exits unexpectedly, it should report this event using this function, so meta
-    /// will notice the actor's exit while collecting.
-    pub fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
-
-            BarrierState::Managed(managed_state) => {
-                managed_state.notify_failure(actor_id, err);
-            }
-        }
+    /// When an actor exits unexpectedly, it should report this event using this function, so meta
+    /// will notice the actor's exit while collecting.
+    fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
+        self.state.notify_failure(actor_id, err)
+    }
+}
+
+#[derive(Clone)]
+pub struct LocalBarrierManager {
+    barrier_event_sender: UnboundedSender<LocalBarrierEvent>,
+}
+
+impl LocalBarrierManager {
+    /// Create a [`LocalBarrierWorker`] with managed mode.
+    pub fn new(state_store: StateStoreImpl, streaming_metrics: Arc<StreamingMetrics>) -> Self {
+        let (tx, rx) = unbounded_channel();
+        let worker = LocalBarrierWorker::new(state_store, streaming_metrics);
+        let _join_handle = tokio::spawn(worker.run(rx));
+        Self {
+            barrier_event_sender: tx,
+        }
+    }
+
+    fn send_event(&self, event: LocalBarrierEvent) {
+        self.barrier_event_sender
+            .send(event)
+            .expect("should be able to send event")
+    }
+}
+
+impl LocalBarrierManager {
+    /// Register sender for source actors, used to send barriers.
+    pub fn register_sender(&self, actor_id: ActorId, sender: UnboundedSender<Barrier>) {
+        self.send_event(LocalBarrierEvent::RegisterSender { actor_id, sender });
+    }
+
+    /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
+    /// barrier is finished, in managed mode.
+    pub async fn send_barrier(
+        &self,
+        barrier: Barrier,
+        actor_ids_to_send: impl IntoIterator<Item = ActorId>,
+        actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
+    ) -> StreamResult<()> {
+        let (tx, rx) = oneshot::channel();
+        self.send_event(LocalBarrierEvent::InjectBarrier {
+            barrier,
+            actor_ids_to_send: actor_ids_to_send.into_iter().collect(),
+            actor_ids_to_collect: actor_ids_to_collect.into_iter().collect(),
+            result_sender: tx,
+        });
+        rx.await.expect("should receive response")
+    }
+
+    /// Use `prev_epoch` to remove collect rx and return rx.
+    pub async fn remove_collect_rx(&self, prev_epoch: u64) -> StreamResult<CompleteReceiver> {
+        let (tx, rx) = oneshot::channel();
+        self.send_event(LocalBarrierEvent::CollectEpoch {
+            epoch: prev_epoch,
+            result_sender: tx,
+        });
+        rx.await.expect("should receive response")
+    }
+
+    /// Reset all internal states.
+    pub fn reset(&self) {
+        self.send_event(LocalBarrierEvent::Reset)
+    }
+
+    /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) gets a barrier, it should report
+    /// and collect this barrier with its own `actor_id` using this function.
+    pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) {
+        self.send_event(ReportActorCollected {
+            actor_id,
+            barrier: barrier.clone(),
+        })
+    }
+
+    /// When an actor exits unexpectedly, it should report this event using this function, so meta
+    /// will notice the actor's exit while collecting.
+    pub fn notify_failure(&self, actor_id: ActorId, err: StreamError) {
+        self.send_event(ReportActorFailure { actor_id, err })
+    }
 }

 #[cfg(test)]
 impl LocalBarrierManager {
     pub fn for_test() -> Self {
-        Self::with_state(BarrierState::Local)
+        Self::new(
+            StateStoreImpl::for_test(),
+            Arc::new(StreamingMetrics::unused()),
+        )
     }

-    /// Returns whether [`BarrierState`] is `Local`.
-    pub fn is_local_mode(&self) -> bool {
-        !matches!(self.state, BarrierState::Managed(_))
+    pub async fn flush_all_events(&self) {
+        let (tx, rx) = oneshot::channel();
+        self.send_event(LocalBarrierEvent::Flush(tx));
+        rx.await.unwrap()
     }
 }
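The rewrite above replaces the shared `Arc<parking_lot::Mutex<LocalBarrierManager>>` with an actor-style event loop: a spawned `LocalBarrierWorker` exclusively owns the state, callers hold a cheaply cloneable handle that pushes `LocalBarrierEvent`s down an unbounded mpsc channel, and request/response calls attach a `oneshot` sender for the reply. Because the channel serializes events, a test can flush all pending events by round-tripping one (the `Flush` variant above). A stripped-down sketch of the same pattern with a hypothetical counter service (tokio only; the `Ping` event and `Handle` type are illustrative, not part of the patch):

use tokio::sync::{mpsc, oneshot};

enum Event {
    // A request carrying a oneshot sender for the reply, mirroring
    // LocalBarrierEvent::InjectBarrier { result_sender, .. }.
    Ping { result_sender: oneshot::Sender<u64> },
}

struct Worker {
    count: u64, // exclusively owned state: no Mutex needed
}

impl Worker {
    async fn run(mut self, mut rx: mpsc::UnboundedReceiver<Event>) {
        // Events are processed strictly in arrival order.
        while let Some(event) = rx.recv().await {
            match event {
                Event::Ping { result_sender } => {
                    self.count += 1;
                    let _ = result_sender.send(self.count);
                }
            }
        }
    }
}

#[derive(Clone)]
struct Handle {
    tx: mpsc::UnboundedSender<Event>,
}

impl Handle {
    fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        tokio::spawn(Worker { count: 0 }.run(rx));
        Self { tx }
    }

    async fn ping(&self) -> u64 {
        let (tx, rx) = oneshot::channel();
        self.tx.send(Event::Ping { result_sender: tx }).expect("worker alive");
        rx.await.expect("worker replied")
    }
}

#[tokio::main]
async fn main() {
    let handle = Handle::new();
    assert_eq!(handle.ping().await, 1);
    assert_eq!(handle.ping().await, 2);
}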
diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs
index e3ce3873146ed..2b83050a57d9a 100644
--- a/src/stream/src/task/barrier_manager/progress.rs
+++ b/src/stream/src/task/barrier_manager/progress.rs
@@ -12,9 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::sync::Arc;
-
-use super::{BarrierState, LocalBarrierManager};
+use super::LocalBarrierManager;
+use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
+use crate::task::barrier_manager::LocalBarrierWorker;
 use crate::task::{ActorId, SharedContext};

 type ConsumedEpoch = u64;
@@ -26,25 +26,33 @@ pub(super) enum BackfillState {
     Done(ConsumedRows),
 }

-impl LocalBarrierManager {
-    fn update_create_mview_progress(
+impl LocalBarrierWorker {
+    pub(crate) fn update_create_mview_progress(
         &mut self,
         current_epoch: u64,
         actor: ActorId,
         state: BackfillState,
     ) {
-        match &mut self.state {
-            #[cfg(test)]
-            BarrierState::Local => {}
+        self.state
+            .create_mview_progress
+            .entry(current_epoch)
+            .or_default()
+            .insert(actor, state);
+    }
+}

-            BarrierState::Managed(managed_state) => {
-                managed_state
-                    .create_mview_progress
-                    .entry(current_epoch)
-                    .or_default()
-                    .insert(actor, state);
-            }
-        }
+impl LocalBarrierManager {
+    fn update_create_mview_progress(
+        &self,
+        current_epoch: u64,
+        actor: ActorId,
+        state: BackfillState,
+    ) {
+        self.send_event(ReportCreateProgress {
+            current_epoch,
+            actor,
+            state,
+        })
     }
 }

@@ -79,7 +87,7 @@ impl LocalBarrierManager {
 /// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
 /// `row_count` state for it.
 pub struct CreateMviewProgress {
-    barrier_manager: Arc<parking_lot::Mutex<LocalBarrierManager>>,
+    barrier_manager: LocalBarrierManager,

     /// The id of the actor containing the backfill executors.
     backfill_actor_id: ActorId,
@@ -88,10 +96,7 @@ pub struct CreateMviewProgress {
 }

 impl CreateMviewProgress {
-    pub fn new(
-        barrier_manager: Arc<parking_lot::Mutex<LocalBarrierManager>>,
-        backfill_actor_id: ActorId,
-    ) -> Self {
+    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
         Self {
             barrier_manager,
             backfill_actor_id,
@@ -100,7 +105,7 @@ impl CreateMviewProgress {
     }

     #[cfg(test)]
-    pub fn for_test(barrier_manager: Arc<parking_lot::Mutex<LocalBarrierManager>>) -> Self {
+    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
         Self::new(barrier_manager, 0)
     }

@@ -110,7 +115,7 @@ impl CreateMviewProgress {
     fn update_inner(&mut self, current_epoch: u64, state: BackfillState) {
         self.state = Some(state);
-        self.barrier_manager.lock().update_create_mview_progress(
+        self.barrier_manager.update_create_mview_progress(
             current_epoch,
             self.backfill_actor_id,
             state,
diff --git a/src/stream/src/task/barrier_manager/tests.rs b/src/stream/src/task/barrier_manager/tests.rs
index fa129b411b0f2..49e8f0819f7fb 100644
--- a/src/stream/src/task/barrier_manager/tests.rs
+++ b/src/stream/src/task/barrier_manager/tests.rs
@@ -21,8 +21,7 @@ use super::*;

 #[tokio::test]
 async fn test_managed_barrier_collection() -> StreamResult<()> {
-    let mut manager = LocalBarrierManager::new(StateStoreImpl::for_test());
-    assert!(!manager.is_local_mode());
+    let manager = LocalBarrierManager::for_test();

     let register_sender = |actor_id: u32| {
         let (barrier_tx, barrier_rx) = unbounded_channel();
@@ -43,9 +42,10 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
     let epoch = 114514;
     let barrier = Barrier::new_test_barrier(epoch);
     manager
-        .send_barrier(&barrier, actor_ids.clone(), actor_ids, None)
+        .send_barrier(barrier.clone(), actor_ids.clone(), actor_ids)
+        .await
         .unwrap();
-    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev)?;
+    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev).await?;
     // Collect barriers from actors
     let collected_barriers = rxs
         .iter_mut()
@@ -59,6 +59,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {
     // Report to local barrier manager
     for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() {
         manager.collect(actor_id, &barrier);
+        manager.flush_all_events().await;
         let notified = complete_receiver
             .complete_receiver
             .as_mut()
@@ -73,8 +74,7 @@ async fn test_managed_barrier_collection() -> StreamResult<()> {

 #[tokio::test]
 async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {
-    let mut manager = LocalBarrierManager::new(StateStoreImpl::for_test());
-    assert!(!manager.is_local_mode());
+    let manager = LocalBarrierManager::for_test();

     let register_sender = |actor_id: u32| {
         let (barrier_tx, barrier_rx) = unbounded_channel();
@@ -107,9 +107,10 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {

     // Send the barrier to all actors
     manager
-        .send_barrier(&barrier, actor_ids_to_send, actor_ids_to_collect, None)
+        .send_barrier(barrier.clone(), actor_ids_to_send, actor_ids_to_collect)
+        .await
         .unwrap();
-    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev)?;
+    let mut complete_receiver = manager.remove_collect_rx(barrier.epoch.prev).await?;

     // Collect barriers from actors
     let collected_barriers = rxs
@@ -124,6 +125,7 @@ async fn test_managed_barrier_collection_before_send_request() -> StreamResult<()> {
     // Report to local barrier manager
     for (i, (actor_id, barrier)) in collected_barriers.into_iter().enumerate() {
         manager.collect(actor_id, &barrier);
+        manager.flush_all_events().await;
         let notified = complete_receiver
             .complete_receiver
             .as_mut()
diff --git a/src/stream/src/task/mod.rs b/src/stream/src/task/mod.rs
index 42ad57aa86d79..b1f1da526bcc5 100644
--- a/src/stream/src/task/mod.rs
+++ b/src/stream/src/task/mod.rs
@@ -34,6 +34,8 @@ pub use env::*;
 use risingwave_storage::StateStoreImpl;
 pub use stream_manager::*;

+use crate::executor::monitor::StreamingMetrics;
+
 pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
 pub type ActorId = u32;
 pub type FragmentId = u32;
@@ -77,7 +79,7 @@ pub struct SharedContext {
     // disconnected.
     pub(crate) compute_client_pool: ComputeClientPool,

-    pub(crate) barrier_manager: Arc<Mutex<LocalBarrierManager>>,
+    pub(crate) barrier_manager: LocalBarrierManager,

     pub(crate) config: StreamingConfig,
 }
@@ -91,13 +93,18 @@ impl std::fmt::Debug for SharedContext {
 }

 impl SharedContext {
-    pub fn new(addr: HostAddr, state_store: StateStoreImpl, config: &StreamingConfig) -> Self {
+    pub fn new(
+        addr: HostAddr,
+        state_store: StateStoreImpl,
+        config: &StreamingConfig,
+        streaming_metrics: Arc<StreamingMetrics>,
+    ) -> Self {
         Self {
             channel_map: Default::default(),
             actor_infos: Default::default(),
             addr,
             compute_client_pool: ComputeClientPool::default(),
-            barrier_manager: Arc::new(Mutex::new(LocalBarrierManager::new(state_store))),
+            barrier_manager: LocalBarrierManager::new(state_store, streaming_metrics),
             config: config.clone(),
         }
     }
@@ -111,9 +118,7 @@ impl SharedContext {
             actor_infos: Default::default(),
             addr: LOCAL_TEST_ADDR.clone(),
             compute_client_pool: ComputeClientPool::default(),
-            barrier_manager: Arc::new(Mutex::new(LocalBarrierManager::new(
-                StateStoreImpl::for_test(),
-            ))),
+            barrier_manager: LocalBarrierManager::for_test(),
             config: StreamingConfig {
                 developer: StreamingDeveloperConfig {
                     exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
@@ -126,8 +131,8 @@ impl SharedContext {
         }
     }

-    pub fn lock_barrier_manager(&self) -> MutexGuard<'_, LocalBarrierManager> {
-        self.barrier_manager.lock()
+    pub fn barrier_manager(&self) -> &LocalBarrierManager {
+        &self.barrier_manager
     }

     /// Get the channel pair for the given actor ids. If the channel pair does not exist, create one
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index a984f2cee58a3..57c5f12612aae 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -101,7 +101,6 @@ pub struct LocalStreamManager {
     // Maintain a copy of the core to reduce async locks
     state_store: StateStoreImpl,
     context: Arc<SharedContext>,
-    streaming_metrics: Arc<StreamingMetrics>,

     total_mem_val: Arc<TrAdder<i64>>,
 }
@@ -174,7 +173,6 @@ impl LocalStreamManager {
         Self {
             state_store: core.state_store.clone(),
             context: core.context.clone(),
-            streaming_metrics: core.streaming_metrics.clone(),
             total_mem_val: core.total_mem_val.clone(),
             core: Mutex::new(core),
         }
@@ -238,40 +236,33 @@ impl LocalStreamManager {
     /// Broadcast a barrier to all senders. Save a receiver in barrier manager
     pub async fn send_barrier(
         &self,
-        barrier: &Barrier,
+        barrier: Barrier,
         actor_ids_to_send: impl IntoIterator<Item = ActorId>,
         actor_ids_to_collect: impl IntoIterator<Item = ActorId>,
     ) -> StreamResult<()> {
-        let timer = self
-            .streaming_metrics
-            .barrier_inflight_latency
-            .start_timer();
         if barrier.kind == BarrierKind::Initial {
             let core = self.core.lock().await;
             core.get_watermark_epoch()
                 .store(barrier.epoch.curr, std::sync::atomic::Ordering::SeqCst);
         }
-        let mut barrier_manager = self.context.lock_barrier_manager();
-        barrier_manager.send_barrier(
-            barrier,
-            actor_ids_to_send,
-            actor_ids_to_collect,
-            Some(timer),
-        )?;
+        let barrier_manager = self.context.barrier_manager();
+        barrier_manager
+            .send_barrier(barrier, actor_ids_to_send, actor_ids_to_collect)
+            .await?;
         Ok(())
     }

     /// Reset the state of the barrier manager.
     pub fn reset_barrier_manager(&self) {
-        self.context.lock_barrier_manager().reset();
+        self.context.barrier_manager().reset();
     }

     /// Use `epoch` to find collect rx. And wait for all actors to be collected before
     /// returning.
     pub async fn collect_barrier(&self, epoch: u64) -> StreamResult<CollectResult> {
         let complete_receiver = {
-            let mut barrier_manager = self.context.lock_barrier_manager();
-            barrier_manager.remove_collect_rx(epoch)?
+            let barrier_manager = self.context.barrier_manager();
+            barrier_manager.remove_collect_rx(epoch).await?
         };
         // Wait for all actors finishing this barrier.
         let result = complete_receiver
             .complete_receiver
             .expect("no rx for local mode")
             .await
             .context("failed to collect barrier")??;
-        complete_receiver
-            .barrier_inflight_timer
-            .expect("no timer for test")
-            .observe_duration();
+        complete_receiver.barrier_inflight_timer.observe_duration();
         Ok(result)
     }

@@ -314,23 +302,6 @@ impl LocalStreamManager {
         });
     }

-    /// Broadcast a barrier to all senders. Returns immediately, and caller won't be notified when
-    /// this barrier is finished.
-    #[cfg(test)]
-    pub fn send_barrier_for_test(&self, barrier: &Barrier) -> StreamResult<()> {
-        use std::iter::empty;
-
-        let mut barrier_manager = self.context.lock_barrier_manager();
-        assert!(barrier_manager.is_local_mode());
-        let timer = self
-            .streaming_metrics
-            .barrier_inflight_latency
-            .start_timer();
-        barrier_manager.send_barrier(barrier, empty(), empty(), Some(timer))?;
-        barrier_manager.remove_collect_rx(barrier.epoch.prev)?;
-        Ok(())
-    }
-
     /// Drop the resources of the given actors.
     pub async fn drop_actors(&self, actors: &[ActorId]) -> StreamResult<()> {
         let mut core = self.core.lock().await;
@@ -404,7 +375,12 @@ impl LocalStreamManagerCore {
         config: StreamingConfig,
         await_tree_config: Option<await_tree::Config>,
     ) -> Self {
-        let context = SharedContext::new(addr, state_store.clone(), &config);
+        let context = SharedContext::new(
+            addr,
+            state_store.clone(),
+            &config,
+            streaming_metrics.clone(),
+        );
         Self::new_inner(
             state_store,
             context,
@@ -693,7 +669,7 @@ impl LocalStreamManagerCore {
                 // TODO: check error type and panic if it's unexpected.
                 // Intentionally use `?` on the report to also include the backtrace.
                 tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
-                context.lock_barrier_manager().notify_failure(actor_id, err);
+                context.barrier_manager().notify_failure(actor_id, err);
             }
         };
         let traced = match &mut self.await_tree_reg {
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 7893c3086d776..70f9c75efb9fa 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -55,7 +55,6 @@ futures-sink = { version = "0.3" }
 futures-task = { version = "0.3" }
 futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
 generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
-getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["std"] }
 governor = { version = "0.6", default-features = false, features = ["dashmap", "jitter", "std"] }
 hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] }
 hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] }
@@ -104,7 +103,7 @@ redis = { version = "0.24", features = ["async-std-comp", "tokio-comp"] }
 regex = { version = "1" }
 regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] }
 regex-syntax = { version = "0.8" }
-reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
+reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] }
 ring = { version = "0.16", features = ["std"] }
 rust_decimal = { version = "1", features = ["db-postgres", "maths"] }
 scopeguard = { version = "1" }
@@ -156,7 +155,6 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt",
 either = { version = "1", features = ["serde"] }
 fixedbitset = { version = "0.4" }
 frunk_core = { version = "0.4", default-features = false, features = ["std"] }
-getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae", default-features = false, features = ["std"] }
 hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] }
 itertools = { version = "0.11" }
 lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }