From 2c62ae9528ad63e222f68a136d91032b123f1992 Mon Sep 17 00:00:00 2001 From: zwang28 <70626450+zwang28@users.noreply.github.com> Date: Thu, 25 Aug 2022 16:51:39 +0800 Subject: [PATCH] feat(test): support sync point for integration test (#4855) * feat(test): support sync point in test and add demo tests. * add comments * add comments * move sync to dedicated folder * add license header * rename sync* to sync_point* Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .gitignore | 1 + Cargo.lock | 42 +++ Cargo.toml | 1 + risedev.yml | 22 ++ src/cmd_all/Cargo.toml | 1 + src/cmd_all/src/playground.rs | 7 +- src/common/src/util/mod.rs | 2 + src/common/src/util/sync_point/mod.rs | 31 ++ src/common/src/util/sync_point/utils.rs | 169 +++++++++++ src/meta/src/hummock/compaction_scheduler.rs | 4 + src/meta/src/hummock/mod.rs | 2 + src/meta/src/hummock/vacuum.rs | 5 +- src/meta/src/lib.rs | 4 +- src/meta/src/manager/env.rs | 4 +- src/meta/src/rpc/service/hummock_service.rs | 2 + src/object_store/src/object/mod.rs | 2 +- src/risedevtool/src/service_config.rs | 3 + src/risedevtool/src/task/meta_node_service.rs | 7 + src/storage/src/hummock/compactor/mod.rs | 3 + src/tests/sync_point/Cargo.toml | 24 ++ src/tests/sync_point/README.md | 9 + src/tests/sync_point/run_sync_point_test.sh | 15 + .../sync_point/slt/tpch_snapshot_no_drop.slt | 33 +++ src/tests/sync_point/src/lib.rs | 18 ++ src/tests/sync_point/src/test_utils.rs | 159 ++++++++++ src/tests/sync_point/src/tests.rs | 279 ++++++++++++++++++ 26 files changed, 842 insertions(+), 7 deletions(-) create mode 100644 src/common/src/util/sync_point/mod.rs create mode 100644 src/common/src/util/sync_point/utils.rs create mode 100644 src/tests/sync_point/Cargo.toml create mode 100644 src/tests/sync_point/README.md create mode 100644 src/tests/sync_point/run_sync_point_test.sh create mode 100644 src/tests/sync_point/slt/tpch_snapshot_no_drop.slt create mode 100644 src/tests/sync_point/src/lib.rs create mode 100644 src/tests/sync_point/src/test_utils.rs create mode 100644 src/tests/sync_point/src/tests.rs diff --git a/.gitignore b/.gitignore index 26004e8c0b00a..3d778ca6e5cf8 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,4 @@ risedev-compose.yml ft.txt # generated output from regress tests src/tests/regress/output/* +src/tests/sync_point/slt/e2e_test diff --git a/Cargo.lock b/Cargo.lock index 6d79d3d04d767..e88d01639afbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4219,6 +4219,7 @@ dependencies = [ "log", "madsim-tokio", "risedev", + "risingwave_common", "risingwave_compactor", "risingwave_compute", "risingwave_ctl", @@ -5029,6 +5030,21 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "risingwave_sync_point_test" +version = "0.1.0" +dependencies = [ + "bytes", + "itertools", + "madsim-tokio", + "risingwave_cmd_all", + "risingwave_common", + "risingwave_object_store", + "risingwave_pb", + "risingwave_rpc_client", + "serial_test", +] + [[package]] name = "risingwave_test_runner" version = "0.1.11" @@ -5333,6 +5349,32 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "serial_test" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot 0.12.1", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4b6f5d1c3087fb119617cff2966fe3808a80e5eb59a8c1601d5994d66f4346a5" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "sha-1" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index 2e28712713b3b..7ffa2ed6ca9ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "src/tests/regress", "src/tests/simulation", "src/tests/sqlsmith", + "src/tests/sync_point", "src/tracing", "src/utils/async_stack_trace", "src/utils/memcomparable", diff --git a/risedev.yml b/risedev.yml index 7bc2b62482ac1..de4e344a5d1a9 100644 --- a/risedev.yml +++ b/risedev.yml @@ -121,6 +121,18 @@ risedev: - use: compute-node - use: frontend + playground-test: + - use: meta-node + enable-dashboard-v2: false + unsafe-disable-recovery: true + max-idle-secs-to-exit: 1800 + vacuum-interval-sec: 1 + collect-gc-watermark-spin-interval-sec: 1 + min-sst-retention-time-sec: 0 + - use: compute-node + - use: frontend + - use: compactor + playground-3cn: - use: meta-node enable-dashboard-v2: false @@ -516,6 +528,16 @@ template: # Whether to disable recovery mode unsafe-disable-recovery: false + # Interval of GC stale metadata and SST + vacuum-interval-sec: 30 + + # The spin interval when collecting global GC watermark in hummock + collect-gc-watermark-spin-interval-sec: 5 + + # Threshold used by worker node to filter out new SSTs when scanning object store, during full SST GC. + # 7 days by default. + min-sst-retention-time-sec: 604800 + prometheus: # Advertise address of Prometheus address: "127.0.0.1" diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index 6b4df71390d98..9e8d5545ef0ea 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -11,6 +11,7 @@ anyhow = "1" clap = { version = "3", features = ["derive"] } log = { version = "0.4", features = ["release_max_level_info"] } risedev = { path = "../risedevtool" } +risingwave_common = { path = "../common" } risingwave_compactor = { path = "../storage/compactor" } risingwave_compute = { path = "../compute" } risingwave_ctl = { path = "../ctl" } diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index 99bd34266bb5f..21767a47b8782 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -59,6 +59,7 @@ pub async fn playground() -> Result<()> { } else { "playground".to_string() }; + let force_shared_hummock_in_mem = std::env::var("FORCE_SHARED_HUMMOCK_IN_MEM").is_ok(); // TODO: may allow specifying the config file for the playground. let apply_config_file = |cmd: &mut Command| { @@ -94,7 +95,7 @@ pub async fn playground() -> Result<()> { ComputeNodeService::apply_command_args( &mut command, c, - if compute_node_count > 1 { + if force_shared_hummock_in_mem || compute_node_count > 1 { HummockInMemoryStrategy::Shared } else { HummockInMemoryStrategy::Isolated @@ -188,6 +189,10 @@ pub async fn playground() -> Result<()> { } } + risingwave_common::util::sync_point::on_sync_point("CLUSTER_READY") + .await + .unwrap(); + // TODO: should we join all handles? // Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now. 
    signal::ctrl_c().await.unwrap();
diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs
index 660162836d20c..1c6cb7f2155a2 100644
--- a/src/common/src/util/mod.rs
+++ b/src/common/src/util/mod.rs
@@ -34,6 +34,8 @@ pub mod try_match;
 pub mod epoch;
 mod future_utils;
 pub mod scan_range;
+pub mod sync_point;
+
 pub mod value_encoding;
 pub mod worker_util;
diff --git a/src/common/src/util/sync_point/mod.rs b/src/common/src/util/sync_point/mod.rs
new file mode 100644
index 0000000000000..85e9b8cf6f079
--- /dev/null
+++ b/src/common/src/util/sync_point/mod.rs
@@ -0,0 +1,31 @@
+// Copyright 2022 Singularity Data
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#[cfg(sync_point_test)]
+mod utils;
+#[cfg(sync_point_test)]
+pub use utils::*;
+
+#[cfg(not(sync_point_test))]
+#[inline(always)]
+#[expect(clippy::unused_async)]
+pub async fn on_sync_point(_sync_point: &str) -> Result<(), Error> {
+    Ok(())
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("Waiting for signal {0} timed out")]
+    WaitForSignalTimeout(String),
+}
diff --git a/src/common/src/util/sync_point/utils.rs b/src/common/src/util/sync_point/utils.rs
new file mode 100644
index 0000000000000..379dc65dd45e1
--- /dev/null
+++ b/src/common/src/util/sync_point/utils.rs
@@ -0,0 +1,169 @@
+// Copyright 2022 Singularity Data
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::ops::DerefMut;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::util::sync_point::Error;
+
+pub type SyncPoint = String;
+pub type Signal = String;
+
+#[derive(Clone)]
+pub enum Action {
+    WaitForSignal(WaitForSignal),
+    EmitSignal(Signal),
+}
+
+#[derive(Clone)]
+pub struct WaitForSignal {
+    /// The signal being waited for.
+    pub signal: Signal,
+    /// Whether to relay the signal to other waiters after receiving it.
+    ///
+    /// If true, the signal is relayed and another waiter is signalled right away.
+    ///
+    /// If false, other waiters need to wait for another signal.
+    pub relay_signal: bool,
+    /// Max duration to wait for.
+    pub timeout: Duration,
+}
+
+lazy_static::lazy_static! {
+    static ref SYNC_FACILITY: SyncFacility = {
+        SyncFacility::new()
+    };
+}
+
+/// A `SyncPoint` is activated by attaching a `SyncPointInfo` to it.
+#[derive(Clone)]
+struct SyncPointInfo {
+    /// `Action`s to be executed when the `SyncPoint` is triggered.
+    actions: Vec<Action>,
+    /// The `SyncPoint` is deactivated after being triggered `execute_times` times.
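+    ///
+    /// For example, the tests in this PR pass `1` (via `activate_sync_point`)
+    /// for one-shot choreography, and `u64::MAX` to keep a point active for
+    /// the whole test run.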
+    execute_times: u64,
+}
+
+struct SyncFacility {
+    /// `Notify` for each `Signal`.
+    signals: parking_lot::Mutex<HashMap<Signal, Arc<tokio::sync::Notify>>>,
+    /// `SyncPointInfo` for active `SyncPoint`.
+    sync_points: parking_lot::Mutex<HashMap<SyncPoint, SyncPointInfo>>,
+}
+
+impl SyncFacility {
+    fn new() -> Self {
+        Self {
+            signals: Default::default(),
+            sync_points: Default::default(),
+        }
+    }
+
+    async fn wait_for_signal(&self, wait_for_signal: WaitForSignal) -> Result<(), Error> {
+        let entry = self
+            .signals
+            .lock()
+            .entry(wait_for_signal.signal.to_owned())
+            .or_insert_with(|| Arc::new(tokio::sync::Notify::new()))
+            .clone();
+        match tokio::time::timeout(wait_for_signal.timeout, entry.notified()).await {
+            Ok(_) => {
+                if wait_for_signal.relay_signal {
+                    entry.notify_one();
+                }
+            }
+            Err(_) => {
+                return Err(Error::WaitForSignalTimeout(wait_for_signal.signal));
+            }
+        }
+        Ok(())
+    }
+
+    fn emit_signal(&self, signal: Signal) {
+        let entry = self
+            .signals
+            .lock()
+            .entry(signal)
+            .or_insert_with(|| Arc::new(tokio::sync::Notify::new()))
+            .clone();
+        entry.notify_one();
+    }
+
+    fn set_actions(&self, sync_point: &str, actions: Vec<Action>, execute_times: u64) {
+        if execute_times == 0 {
+            return;
+        }
+        let mut guard = self.sync_points.lock();
+        let sync_points = guard.deref_mut();
+        sync_points.insert(
+            sync_point.to_owned(),
+            SyncPointInfo {
+                actions,
+                execute_times,
+            },
+        );
+    }
+
+    fn reset_actions(&self, sync_point: &str) {
+        self.sync_points.lock().remove(sync_point);
+    }
+
+    async fn on_sync_point(&self, sync_point: &str) -> Result<(), Error> {
+        let actions = {
+            let mut guard = self.sync_points.lock();
+            match guard.entry(sync_point.to_owned()) {
+                Entry::Occupied(mut o) => {
+                    if o.get().execute_times == 1 {
+                        // Deactivate the sync point and execute its actions for the last time.
+                        guard.remove(sync_point).unwrap().actions
+                    } else {
+                        o.get_mut().execute_times -= 1;
+                        o.get().actions.clone()
+                    }
+                }
+                Entry::Vacant(_) => {
+                    return Ok(());
+                }
+            }
+        };
+        for action in actions {
+            match action {
+                Action::WaitForSignal(w) => {
+                    self.wait_for_signal(w.to_owned()).await?;
+                }
+                Action::EmitSignal(s) => {
+                    self.emit_signal(s.to_owned());
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+/// The activation is reset after the sync point has been triggered `execute_times` times.
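+///
+/// A minimal sketch of a one-shot activation (the point and signal names are
+/// illustrative, not predefined):
+///
+/// ```ignore
+/// activate_sync_point(
+///     "POINT_A",
+///     vec![Action::WaitForSignal(WaitForSignal {
+///         signal: "SIG_GO".to_owned(),
+///         relay_signal: false,
+///         timeout: std::time::Duration::from_secs(10),
+///     })],
+///     1, // deactivated after the first trigger
+/// );
+/// ```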
+pub fn activate_sync_point(sync_point: &str, actions: Vec<Action>, execute_times: u64) {
+    SYNC_FACILITY.set_actions(sync_point, actions, execute_times);
+}
+
+pub fn deactivate_sync_point(sync_point: &str) {
+    SYNC_FACILITY.reset_actions(sync_point);
+}
+
+/// Triggers the sync point, executing any actions currently attached to it.
+pub async fn on_sync_point(sync_point: &str) -> Result<(), Error> {
+    SYNC_FACILITY.on_sync_point(sync_point).await
+}
diff --git a/src/meta/src/hummock/compaction_scheduler.rs b/src/meta/src/hummock/compaction_scheduler.rs
index d9d79d0221dc3..87073891fbf4d 100644
--- a/src/meta/src/hummock/compaction_scheduler.rs
+++ b/src/meta/src/hummock/compaction_scheduler.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use parking_lot::Mutex;
+use risingwave_common::util::sync_point::on_sync_point;
 use risingwave_hummock_sdk::compact::compact_task_to_string;
 use risingwave_hummock_sdk::CompactionGroupId;
 use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
@@ -112,6 +113,9 @@
                     break 'compaction_trigger;
                 }
             };
+            on_sync_point("BEFORE_SCHEDULE_COMPACTION_TASK")
+                .await
+                .unwrap();
             self.pick_and_assign(compaction_group, request_channel.clone())
                 .await;
         }
diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs
index cc8936614e2ca..4e88f59629c65 100644
--- a/src/meta/src/hummock/mod.rs
+++ b/src/meta/src/hummock/mod.rs
@@ -37,6 +37,7 @@ pub use compaction_scheduler::CompactionScheduler;
 pub use compactor_manager::*;
 #[cfg(any(test, feature = "test"))]
 pub use mock_hummock_meta_client::MockHummockMetaClient;
+use risingwave_common::util::sync_point::on_sync_point;
 use tokio::sync::oneshot::Sender;
 use tokio::task::JoinHandle;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
@@ -172,6 +173,7 @@
             if let Err(err) = vacuum.vacuum_sst_data().await {
                 tracing::warn!("Vacuum SST error {:#?}", err);
             }
+            on_sync_point("AFTER_SCHEDULE_VACUUM").await.unwrap();
         }
     });
     (join_handle, shutdown_tx)
diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs
index ff06e9a70aeec..719a65b6d70f0 100644
--- a/src/meta/src/hummock/vacuum.rs
+++ b/src/meta/src/hummock/vacuum.rs
@@ -181,7 +181,10 @@
     /// 3. Meta node decides which SSTs to delete. See `VacuumManager::complete_full_gc`.
     pub async fn start_full_gc(&self, sst_retention_time: Duration) -> Result<()> {
         // Set a minimum sst_retention_time to avoid deleting SSTs of on-going write op.
-        let sst_retention_time = std::cmp::max(sst_retention_time, Duration::from_secs(3600 * 3));
+        let sst_retention_time = cmp::max(
+            sst_retention_time,
+            Duration::from_secs(self.env.opts.min_sst_retention_time_sec),
+        );
         tracing::info!(
             "run full GC with sst_retention_time = {} secs",
             sst_retention_time.as_secs()
diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs
index 1f3dbef82d852..a130c508f9f43 100644
--- a/src/meta/src/lib.rs
+++ b/src/meta/src/lib.rs
@@ -134,7 +134,7 @@ pub struct MetaNodeOpts {
     /// Threshold used by worker node to filter out new SSTs when scanning object store, during
     /// full SST GC.
     #[clap(long, default_value = "604800")]
-    sst_retention_time_sec: u64,
+    min_sst_retention_time_sec: u64,
 
     /// Compaction scheduler retries compactor selection with this interval.
    #[clap(long, default_value = "5")]
@@ -202,7 +202,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
             max_idle_ms,
             in_flight_barrier_nums,
             vacuum_interval_sec: opts.vacuum_interval_sec,
-            sst_retention_time_sec: opts.sst_retention_time_sec,
+            min_sst_retention_time_sec: opts.min_sst_retention_time_sec,
             compactor_selection_retry_interval_sec: opts.compactor_selection_retry_interval_sec,
             collect_gc_watermark_spin_interval_sec: opts.collect_gc_watermark_spin_interval_sec,
         },
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index 06d3b4fe537c2..b33b014870b49 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -74,7 +74,7 @@ pub struct MetaOpts {
     /// Interval of GC metadata in meta store and stale SSTs in object store.
     pub vacuum_interval_sec: u64,
     /// Threshold used by worker node to filter out new SSTs when scanning object store.
-    pub sst_retention_time_sec: u64,
+    pub min_sst_retention_time_sec: u64,
     /// Compaction scheduler retries compactor selection with this interval.
     pub compactor_selection_retry_interval_sec: u64,
     /// The spin interval when collecting global GC watermark in hummock
     pub collect_gc_watermark_spin_interval_sec: u64,
@@ -89,7 +89,7 @@ impl Default for MetaOpts {
             max_idle_ms: 0,
             in_flight_barrier_nums: 40,
             vacuum_interval_sec: 30,
-            sst_retention_time_sec: 3600 * 24 * 7,
+            min_sst_retention_time_sec: 3600 * 24 * 7,
             compactor_selection_retry_interval_sec: 5,
             collect_gc_watermark_spin_interval_sec: 5,
         }
diff --git a/src/meta/src/rpc/service/hummock_service.rs b/src/meta/src/rpc/service/hummock_service.rs
index df53ac7034414..d5b2b2e7f67b4 100644
--- a/src/meta/src/rpc/service/hummock_service.rs
+++ b/src/meta/src/rpc/service/hummock_service.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use risingwave_common::catalog::{TableId, NON_RESERVED_PG_CATALOG_TABLE_ID};
+use risingwave_common::util::sync_point::on_sync_point;
 use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService;
 use risingwave_pb::hummock::*;
 use tonic::{Request, Response, Status};
@@ -214,6 +215,7 @@
                 .await
                 .map_err(meta_error_to_tonic)?;
         }
+        on_sync_point("AFTER_REPORT_VACUUM").await.unwrap();
         Ok(Response::new(ReportVacuumTaskResponse { status: None }))
     }
 
diff --git a/src/object_store/src/object/mod.rs b/src/object_store/src/object/mod.rs
index d0a15569cf4f8..39e8248532010 100644
--- a/src/object_store/src/object/mod.rs
+++ b/src/object_store/src/object/mod.rs
@@ -83,7 +83,7 @@ pub struct BlockLocation {
     pub size: usize,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct ObjectMetadata {
     // Full path
     pub key: String,
diff --git a/src/risedevtool/src/service_config.rs b/src/risedevtool/src/service_config.rs
index 713fc0a34ebcd..c1f5b0b9c3ade 100644
--- a/src/risedevtool/src/service_config.rs
+++ b/src/risedevtool/src/service_config.rs
@@ -61,6 +61,9 @@ pub struct MetaNodeConfig {
     pub enable_dashboard_v2: bool,
     pub unsafe_disable_recovery: bool,
     pub max_idle_secs_to_exit: Option<u64>,
+    pub vacuum_interval_sec: u64,
+    pub collect_gc_watermark_spin_interval_sec: u64,
+    pub min_sst_retention_time_sec: u64,
 }
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs
index 78da77fdf6175..ef698e74a617f 100644
--- a/src/risedevtool/src/task/meta_node_service.rs
+++ b/src/risedevtool/src/task/meta_node_service.rs
@@ -90,6 +90,13 @@ impl MetaNodeService {
             }
         }
 
+        cmd.arg("--vacuum-interval-sec")
+            .arg(format!("{}", config.vacuum_interval_sec))
.arg("--collect-gc-watermark-spin-interval-sec") + .arg(format!("{}", config.collect_gc_watermark_spin_interval_sec)) + .arg("--min-sst-retention-time-sec") + .arg(format!("{}", config.min_sst_retention_time_sec)); + Ok(()) } } diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 5195e152b6cb2..1d7e66c3304a6 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -37,6 +37,7 @@ use futures::{stream, FutureExt, StreamExt}; pub use iterator::ConcatSstableIterator; use itertools::Itertools; use risingwave_common::config::constant::hummock::CompactionFilterFlag; +use risingwave_common::util::sync_point::on_sync_point; use risingwave_hummock_sdk::compact::compact_task_to_string; use risingwave_hummock_sdk::filter_key_extractor::FilterKeyExtractorImpl; use risingwave_hummock_sdk::key::{get_epoch, FullKey}; @@ -255,6 +256,7 @@ impl Compactor { // Sort by split/key range index. output_ssts.sort_by_key(|(split_index, _)| *split_index); + on_sync_point("BEFORE_COMPACT_REPORT").await.unwrap(); // After a compaction is done, mutate the compaction task. Self::compact_done( &mut compact_task, @@ -263,6 +265,7 @@ impl Compactor { compact_success, ) .await; + on_sync_point("AFTER_COMPACT_REPORT").await.unwrap(); let cost_time = timer.stop_and_record() * 1000.0; tracing::info!( "Finished compaction task in {:?}ms: \n{}", diff --git a/src/tests/sync_point/Cargo.toml b/src/tests/sync_point/Cargo.toml new file mode 100644 index 0000000000000..29925966805c8 --- /dev/null +++ b/src/tests/sync_point/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "risingwave_sync_point_test" +version = "0.1.0" +edition = "2021" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = { version = "1" } +itertools = "0.10" +risingwave_cmd_all = { path = "../../cmd_all" } +risingwave_common = { path = "../../common" } +risingwave_object_store = { path = "../../object_store" } +risingwave_pb = { path = "../../prost" } +risingwave_rpc_client = { path = "../../rpc_client" } +serial_test = "0.9" +tokio = { version = "0.2", package = "madsim-tokio", features = [ + "rt", + "rt-multi-thread", + "sync", + "macros", + "time", + "signal", + "fs", +] } diff --git a/src/tests/sync_point/README.md b/src/tests/sync_point/README.md new file mode 100644 index 0000000000000..4ba8752b69431 --- /dev/null +++ b/src/tests/sync_point/README.md @@ -0,0 +1,9 @@ +Integration tests based on risedev playground and sync point support. + +# How to run + +bash run_sync_point_test.sh + +# How to write a test + +Refer to test_gc_watermark in src/tests/sync/src/tests.rs. \ No newline at end of file diff --git a/src/tests/sync_point/run_sync_point_test.sh b/src/tests/sync_point/run_sync_point_test.sh new file mode 100644 index 0000000000000..ea3d0fe40af0e --- /dev/null +++ b/src/tests/sync_point/run_sync_point_test.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -e + +SCRIPT_PATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" +RW_WORKSPACE=$(realpath "${SCRIPT_PATH}"/../../../) +export RW_WORKSPACE=${RW_WORKSPACE} +RUSTFLAGS='--cfg tokio_unstable --cfg sync_point_test' +export RUSTFLAGS=${RUSTFLAGS} + +cp -R -n "${RW_WORKSPACE}"/e2e_test "${SCRIPT_PATH}"/slt/ || : + +# cargo test -p risingwave_sync_point_test +# A workaround before we can wipe object store before each test. Because all tests are sharing the same shared in-mem object store. 
+cargo test -p risingwave_sync_point_test -- --list --format=terse| awk '/: test/{print substr($1, 1, length($1)-1)}' |xargs -I{} cargo test -p risingwave_sync_point_test {} diff --git a/src/tests/sync_point/slt/tpch_snapshot_no_drop.slt b/src/tests/sync_point/slt/tpch_snapshot_no_drop.slt new file mode 100644 index 0000000000000..4b09585087603 --- /dev/null +++ b/src/tests/sync_point/slt/tpch_snapshot_no_drop.slt @@ -0,0 +1,33 @@ +include e2e_test/tpch/create_tables.slt.part + +include e2e_test/tpch/insert_customer.slt.part +include e2e_test/tpch/insert_lineitem.slt.part +include e2e_test/tpch/insert_nation.slt.part +include e2e_test/tpch/insert_orders.slt.part +include e2e_test/tpch/insert_part.slt.part +include e2e_test/tpch/insert_partsupp.slt.part +include e2e_test/tpch/insert_region.slt.part +include e2e_test/tpch/insert_supplier.slt.part + +include e2e_test/streaming/tpch/create_views.slt.part + +include e2e_test/streaming/tpch/q1.slt.part +include e2e_test/streaming/tpch/q2.slt.part +include e2e_test/streaming/tpch/q3.slt.part +include e2e_test/streaming/tpch/q4.slt.part +include e2e_test/streaming/tpch/q5.slt.part +include e2e_test/streaming/tpch/q6.slt.part +include e2e_test/streaming/tpch/q7.slt.part +include e2e_test/streaming/tpch/q8.slt.part +include e2e_test/streaming/tpch/q9.slt.part +include e2e_test/streaming/tpch/q10.slt.part +include e2e_test/streaming/tpch/q11.slt.part +include e2e_test/streaming/tpch/q12.slt.part +include e2e_test/streaming/tpch/q13.slt.part +include e2e_test/streaming/tpch/q14.slt.part +include e2e_test/streaming/tpch/q17.slt.part +include e2e_test/streaming/tpch/q18.slt.part +include e2e_test/streaming/tpch/q19.slt.part +include e2e_test/streaming/tpch/q20.slt.part +include e2e_test/streaming/tpch/q22.slt.part + diff --git a/src/tests/sync_point/src/lib.rs b/src/tests/sync_point/src/lib.rs new file mode 100644 index 0000000000000..b132cb55f252d --- /dev/null +++ b/src/tests/sync_point/src/lib.rs @@ -0,0 +1,18 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[cfg(all(test, sync_point_test))] +mod test_utils; +#[cfg(all(test, sync_point_test))] +mod tests; diff --git a/src/tests/sync_point/src/test_utils.rs b/src/tests/sync_point/src/test_utils.rs new file mode 100644 index 0000000000000..1d76c88c5a476 --- /dev/null +++ b/src/tests/sync_point/src/test_utils.rs @@ -0,0 +1,159 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
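+
+// Shared helpers for the sync point tests: environment setup, playground
+// cluster lifecycle, object store and meta clients, plus the one-shot
+// `wait_now`/`emit_now` helpers built on a throwaway "NOW" sync point.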
+ +use std::process::{Command, Output}; +use std::sync::Arc; +use std::thread::JoinHandle; +use std::time::Duration; + +use risingwave_cmd_all::playground; +use risingwave_common::util::sync_point; +use risingwave_common::util::sync_point::{Signal, WaitForSignal}; +use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; +use risingwave_object_store::object::{parse_remote_object_store, ObjectStoreImpl}; +use risingwave_pb::common::WorkerType; +use risingwave_rpc_client::MetaClient; +use serial_test::serial; + +pub fn setup_env() { + let current_dir = + std::env::var("RW_WORKSPACE").expect("set env RW_WORKSPACE to project root path"); + std::env::set_current_dir(current_dir).unwrap(); + std::env::set_var("RW_META_ADDR", "http://127.0.0.1:5690"); + std::env::set_var("FORCE_SHARED_HUMMOCK_IN_MEM", "1"); + std::env::set_var("PLAYGROUND_PROFILE", "playground-test"); + std::env::set_var("OBJECT_STORE_URL", "memory-shared"); + std::env::set_var("OBJECT_STORE_BUCKET", "hummock_001"); +} + +pub async fn start_cluster() -> (JoinHandle<()>, std::sync::mpsc::Sender<()>) { + wipe_object_store().await; + sync_point::activate_sync_point( + "CLUSTER_READY", + vec![sync_point::Action::EmitSignal( + "SIG_CLUSTER_READY".to_owned(), + )], + 1, + ); + + let (tx, rx) = std::sync::mpsc::channel(); + let join_handle = std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + runtime.block_on(async { + tokio::spawn(async { playground().await }); + rx.recv().unwrap(); + }); + }); + // It will find "SIG_CLUSTER_READY" even when it is reached after "SIG_CLUSTER_READY" has been + // emitted. + wait_now("SIG_CLUSTER_READY".to_owned(), Duration::from_secs(30)) + .await + .unwrap(); + (join_handle, tx) +} + +pub async fn stop_cluster(join_handle: JoinHandle<()>, shutdown_tx: std::sync::mpsc::Sender<()>) { + shutdown_tx.send(()).unwrap(); + join_handle.join().unwrap(); + wipe_object_store().await; +} + +pub fn run_slt() -> Output { + Command::new("./risedev") + .args([ + "slt", + "-d", + "dev", + "-p", + "4566", + "src/tests/sync_point/slt/tpch_snapshot_no_drop.slt", + ]) + .spawn() + .unwrap() + .wait_with_output() + .unwrap() +} + +pub async fn get_object_store_client() -> ObjectStoreImpl { + let url = std::env::var("OBJECT_STORE_URL").unwrap(); + parse_remote_object_store(&url, Arc::new(ObjectStoreMetrics::unused())).await +} + +pub fn get_object_store_bucket() -> String { + std::env::var("OBJECT_STORE_BUCKET").unwrap() +} + +async fn wipe_object_store() { + // TODO: wipe content of object store +} + +pub async fn get_meta_client() -> MetaClient { + let meta_addr = std::env::var("RW_META_ADDR").unwrap(); + let mut client = MetaClient::new(&meta_addr).await.unwrap(); + let worker_id = client + .register(WorkerType::Generic, &"127.0.0.1:2333".parse().unwrap(), 0) + .await + .unwrap(); + client.set_worker_id(worker_id); + client +} + +pub async fn wait_now(signal: Signal, timeout: Duration) -> Result<(), sync_point::Error> { + sync_point::activate_sync_point( + "NOW", + vec![sync_point::Action::WaitForSignal(WaitForSignal { + signal, + relay_signal: false, + timeout, + })], + 1, + ); + sync_point::on_sync_point("NOW").await +} + +pub async fn emit_now(signal: Signal) { + sync_point::activate_sync_point("NOW", vec![sync_point::Action::EmitSignal(signal)], 1); + sync_point::on_sync_point("NOW").await.unwrap(); +} + +#[tokio::test] +#[serial] +async fn test_wait_for_signal_timeout() { + 
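+    // Attach a wait action for a signal that is never emitted, with a 1s
+    // timeout; triggering the point must then return
+    // `Error::WaitForSignalTimeout` instead of hanging the test.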
+    sync_point::activate_sync_point(
+        "TEST_SETUP_TIMEOUT",
+        vec![sync_point::Action::WaitForSignal(WaitForSignal {
+            signal: "SIG_NEVER_EMIT".to_owned(),
+            relay_signal: false,
+            timeout: Duration::from_secs(1),
+        })],
+        1,
+    );
+
+    // Expect a timeout error.
+    sync_point::on_sync_point("TEST_SETUP_TIMEOUT")
+        .await
+        .unwrap_err();
+}
+
+#[tokio::test]
+#[serial]
+async fn test_launch_cluster() {
+    setup_env();
+    let (join_handle, tx) = start_cluster().await;
+    stop_cluster(join_handle, tx).await;
+}
diff --git a/src/tests/sync_point/src/tests.rs b/src/tests/sync_point/src/tests.rs
new file mode 100644
index 0000000000000..ab410ee342d26
--- /dev/null
+++ b/src/tests/sync_point/src/tests.rs
@@ -0,0 +1,279 @@
+// Copyright 2022 Singularity Data
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::Duration;
+
+use itertools::Itertools;
+use risingwave_common::util::sync_point;
+use risingwave_common::util::sync_point::WaitForSignal;
+use risingwave_rpc_client::HummockMetaClient;
+use serial_test::serial;
+
+use crate::test_utils::*;
+
+/// With sync point support, this test executes in the following sequential order:
+/// 1. Block the compaction scheduler, so that no compaction is scheduled automatically.
+/// 2. Import data with risedev slt. The slt is modified so that it does not drop MVs at the end.
+/// 3. Schedule exactly one compaction.
+/// 4. Wait until the compactor has uploaded its output to the object store. It doesn't report the
+/// task result to meta until we tell it to do so in step 6.
+/// 5. Verify GC logic.
+/// 6. The compactor reports the task result to meta.
+/// 7. Verify GC logic.
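+///
+/// Sync point wiring used below (sync point: attached actions):
+/// - `BEFORE_SCHEDULE_COMPACTION_TASK` (meta): wait for `SIG_SCHEDULE_COMPACTION_TASK`
+/// - `BEFORE_COMPACT_REPORT` (compactor): emit `SIG_DONE_COMPACT_UPLOAD`, then wait
+///   for `SIG_START_COMPACT_REPORT`
+/// - `AFTER_COMPACT_REPORT` (compactor): emit `SIG_DONE_COMPACT_REPORT`
+/// - `AFTER_SCHEDULE_VACUUM` / `AFTER_REPORT_VACUUM` (meta): emit
+///   `SIG_DONE_SCHEDULE_VACUUM` / `SIG_DONE_REPORT_VACUUM`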
+#[tokio::test] +#[serial] +async fn test_gc_watermark() { + setup_env(); + + let (join_handle, tx) = start_cluster().await; + let object_store_client = get_object_store_client().await; + let meta_client = get_meta_client().await; + + // Activate predefined sync points with customized actions + sync_point::activate_sync_point( + "BEFORE_COMPACT_REPORT", + vec![ + sync_point::Action::EmitSignal("SIG_DONE_COMPACT_UPLOAD".to_owned()), + sync_point::Action::WaitForSignal(WaitForSignal { + signal: "SIG_START_COMPACT_REPORT".to_owned(), + relay_signal: false, + timeout: Duration::from_secs(3600), + }), + ], + 1, + ); + sync_point::activate_sync_point( + "AFTER_COMPACT_REPORT", + vec![sync_point::Action::EmitSignal( + "SIG_DONE_COMPACT_REPORT".to_owned(), + )], + 1, + ); + sync_point::activate_sync_point( + "AFTER_REPORT_VACUUM", + vec![sync_point::Action::EmitSignal( + "SIG_DONE_REPORT_VACUUM".to_owned(), + )], + 2, + ); + sync_point::activate_sync_point( + "AFTER_SCHEDULE_VACUUM", + vec![sync_point::Action::EmitSignal( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + )], + u64::MAX, + ); + // Block compaction scheduler so that we can control scheduling explicitly + sync_point::activate_sync_point( + "BEFORE_SCHEDULE_COMPACTION_TASK", + vec![sync_point::Action::WaitForSignal(WaitForSignal { + signal: "SIG_SCHEDULE_COMPACTION_TASK".to_owned(), + relay_signal: false, + timeout: Duration::from_secs(3600), + })], + u64::MAX, + ); + + // Import data + let run_slt = run_slt(); + assert!(run_slt.status.success()); + + let before_compaction = object_store_client.list("").await.unwrap(); + assert!(!before_compaction.is_empty()); + + // Schedule a compaction task + emit_now("SIG_SCHEDULE_COMPACTION_TASK".to_owned()).await; + + // Wait until SSTs have been written to object store + wait_now( + "SIG_DONE_COMPACT_UPLOAD".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + + let after_compaction_upload = object_store_client.list("").await.unwrap(); + let new_objects = after_compaction_upload + .iter() + .filter(|after| { + !before_compaction + .iter() + .any(|before| before.key == after.key) + }) + .cloned() + .collect_vec(); + assert!(!new_objects.is_empty()); + + // Upload a garbage object + let sst_id = meta_client.get_new_sst_ids(1).await.unwrap().start_id; + object_store_client + .upload( + &format!("{}/{}.data", get_object_store_bucket(), sst_id), + bytes::Bytes::from(vec![1, 2, 3]), + ) + .await + .unwrap(); + let after_garbage_upload = object_store_client.list("").await.unwrap(); + assert_eq!( + after_garbage_upload.len(), + after_compaction_upload.len() + 1 + ); + + meta_client.trigger_full_gc(0).await.unwrap(); + // Wait until VACUUM is scheduled and reported + for _ in 0..2 { + wait_now( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + } + // Expect timeout aka no SST is deleted, because the garbage SST has greater id than watermark, + // which is held by the on-going compaction. 
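+    // (Full GC only deletes SSTs whose ids are below the GC watermark, i.e. the
+    // smallest SST id still tracked by a worker; the unreported compaction task
+    // keeps the watermark below the garbage SST's id.)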
+ wait_now("SIG_DONE_REPORT_VACUUM".to_owned(), Duration::from_secs(10)) + .await + .unwrap_err(); + let after_gc = object_store_client.list("").await.unwrap(); + assert_eq!(after_gc.len(), after_compaction_upload.len() + 1); + + // Signal to continue compaction report + emit_now("SIG_START_COMPACT_REPORT".to_owned()).await; + + // Wait until SSts have been written to hummock version + wait_now( + "SIG_DONE_COMPACT_REPORT".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + + // Wait until VACUUM is scheduled and reported + for _ in 0..2 { + wait_now( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + } + // Expect some stale SSTs as the result of compaction are deleted. + wait_now("SIG_DONE_REPORT_VACUUM".to_owned(), Duration::from_secs(10)) + .await + .unwrap(); + let after_gc = object_store_client.list("").await.unwrap(); + assert!(after_gc.len() < after_compaction_upload.len()); + + meta_client.trigger_full_gc(0).await.unwrap(); + // Wait until VACUUM is scheduled and reported + for _ in 0..2 { + wait_now( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + } + // Expect the garbage SST is deleted. + wait_now("SIG_DONE_REPORT_VACUUM".to_owned(), Duration::from_secs(10)) + .await + .unwrap(); + let after_gc_2 = object_store_client.list("").await.unwrap(); + assert_eq!(after_gc.len(), after_gc_2.len() + 1); + + stop_cluster(join_handle, tx).await; +} + +#[tokio::test] +#[serial] +async fn test_gc_sst_retention_time() { + setup_env(); + + let (join_handle, tx) = start_cluster().await; + let object_store_client = get_object_store_client().await; + let meta_client = get_meta_client().await; + + sync_point::activate_sync_point( + "AFTER_SCHEDULE_VACUUM", + vec![sync_point::Action::EmitSignal( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + )], + u64::MAX, + ); + // Activate predefined sync points with customized actions + sync_point::activate_sync_point( + "AFTER_REPORT_VACUUM", + vec![sync_point::Action::EmitSignal( + "SIG_DONE_REPORT_VACUUM".to_owned(), + )], + 1, + ); + + let before_garbage_upload = object_store_client.list("").await.unwrap(); + assert_eq!(before_garbage_upload.len(), 0); + + // Upload a garbage object + let sst_id = meta_client.get_new_sst_ids(1).await.unwrap().start_id; + object_store_client + .upload( + &format!("{}/{}.data", get_object_store_bucket(), sst_id), + bytes::Bytes::from(vec![1, 2, 3]), + ) + .await + .unwrap(); + let after_garbage_upload = object_store_client.list("").await.unwrap(); + assert_eq!(after_garbage_upload.len(), 1); + + // With large sst_retention_time + meta_client.trigger_full_gc(3600).await.unwrap(); + // Wait until VACUUM is scheduled and reported + for _ in 0..2 { + wait_now( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + } + // Expect timeout aka no SST is deleted, because all SSTs are within sst_retention_time, even + // the garbage one. + wait_now("SIG_DONE_REPORT_VACUUM".to_owned(), Duration::from_secs(10)) + .await + .unwrap_err(); + let after_gc = object_store_client.list("").await.unwrap(); + // Garbage is not deleted. 
+ assert_eq!(after_gc, after_garbage_upload); + + // Ensure SST's last modified is less than now + tokio::time::sleep(Duration::from_secs(1)).await; + // With 0 sst_retention_time + meta_client.trigger_full_gc(0).await.unwrap(); + // Wait until VACUUM is scheduled and reported + for _ in 0..2 { + wait_now( + "SIG_DONE_SCHEDULE_VACUUM".to_owned(), + Duration::from_secs(10), + ) + .await + .unwrap(); + } + wait_now("SIG_DONE_REPORT_VACUUM".to_owned(), Duration::from_secs(10)) + .await + .unwrap(); + let after_gc = object_store_client.list("").await.unwrap(); + // Garbage is deleted. + assert_eq!(after_gc, before_garbage_upload); + + stop_cluster(join_handle, tx).await; +}
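+
+// A minimal skeleton for a new sync point test, following the pattern above
+// (the sync point and signal names are placeholders):
+//
+// #[tokio::test]
+// #[serial]
+// async fn test_my_scenario() {
+//     setup_env();
+//     let (join_handle, tx) = start_cluster().await;
+//     sync_point::activate_sync_point(
+//         "SOME_PREDEFINED_SYNC_POINT",
+//         vec![sync_point::Action::EmitSignal("SIG_DONE".to_owned())],
+//         1,
+//     );
+//     // ... drive the cluster (e.g. via run_slt()), then:
+//     wait_now("SIG_DONE".to_owned(), Duration::from_secs(10))
+//         .await
+//         .unwrap();
+//     stop_cluster(join_handle, tx).await;
+// }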