Skip to content

Commit

Permalink
feat(test): support sync point for integration test (#4855)
Browse files Browse the repository at this point in the history
* feat(test): support sync point in test and add demo tests.

* add comments

* add comments

* move sync to dedicated folder

* add license header

* rename sync* to sync_point*

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
zwang28 and mergify[bot] authored Aug 25, 2022
1 parent 698d4c8 commit 2c62ae9
Show file tree
Hide file tree
Showing 26 changed files with 842 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ risedev-compose.yml
ft.txt
# generated output from regress tests
src/tests/regress/output/*
src/tests/sync_point/slt/e2e_test
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ members = [
"src/tests/regress",
"src/tests/simulation",
"src/tests/sqlsmith",
"src/tests/sync_point",
"src/tracing",
"src/utils/async_stack_trace",
"src/utils/memcomparable",
Expand Down
22 changes: 22 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ risedev:
- use: compute-node
- use: frontend

playground-test:
- use: meta-node
enable-dashboard-v2: false
unsafe-disable-recovery: true
max-idle-secs-to-exit: 1800
vacuum-interval-sec: 1
collect-gc-watermark-spin-interval-sec: 1
min-sst-retention-time-sec: 0
- use: compute-node
- use: frontend
- use: compactor

playground-3cn:
- use: meta-node
enable-dashboard-v2: false
Expand Down Expand Up @@ -516,6 +528,16 @@ template:
# Whether to disable recovery mode
unsafe-disable-recovery: false

# Interval (in seconds) at which stale metadata and SSTs are garbage collected
vacuum-interval-sec: 30

# The spin interval when collecting global GC watermark in hummock
collect-gc-watermark-spin-interval-sec: 5

# Threshold used by worker node to filter out new SSTs when scanning object store, during full SST GC.
# 7 days by default.
min-sst-retention-time-sec: 604800

prometheus:
# Advertise address of Prometheus
address: "127.0.0.1"
Expand Down
1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1"
clap = { version = "3", features = ["derive"] }
log = { version = "0.4", features = ["release_max_level_info"] }
risedev = { path = "../risedevtool" }
risingwave_common = { path = "../common" }
risingwave_compactor = { path = "../storage/compactor" }
risingwave_compute = { path = "../compute" }
risingwave_ctl = { path = "../ctl" }
Expand Down
7 changes: 6 additions & 1 deletion src/cmd_all/src/playground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn playground() -> Result<()> {
} else {
"playground".to_string()
};
let force_shared_hummock_in_mem = std::env::var("FORCE_SHARED_HUMMOCK_IN_MEM").is_ok();

// TODO: may allow specifying the config file for the playground.
let apply_config_file = |cmd: &mut Command| {
Expand Down Expand Up @@ -94,7 +95,7 @@ pub async fn playground() -> Result<()> {
ComputeNodeService::apply_command_args(
&mut command,
c,
if compute_node_count > 1 {
if force_shared_hummock_in_mem || compute_node_count > 1 {
HummockInMemoryStrategy::Shared
} else {
HummockInMemoryStrategy::Isolated
Expand Down Expand Up @@ -188,6 +189,10 @@ pub async fn playground() -> Result<()> {
}
}

risingwave_common::util::sync_point::on_sync_point("CLUSTER_READY")
.await
.unwrap();

// TODO: should we join all handles?
// Currently, not all services can be shutdown gracefully, just quit on Ctrl-C now.
signal::ctrl_c().await.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub mod try_match;
pub mod epoch;
mod future_utils;
pub mod scan_range;
pub mod sync_point;

pub mod value_encoding;
pub mod worker_util;

Expand Down
31 changes: 31 additions & 0 deletions src/common/src/util/sync_point/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#[cfg(sync_point_test)]
mod utils;
#[cfg(sync_point_test)]
pub use utils::*;

/// No-op fallback used when the `sync_point_test` cfg is disabled.
///
/// Always returns `Ok(())` immediately, so sync points compiled into
/// production binaries have no runtime behavior and, being
/// `#[inline(always)]`, essentially no cost.
#[cfg(not(sync_point_test))]
#[inline(always)]
#[expect(clippy::unused_async)]
pub async fn on_sync_point(_sync_point: &str) -> Result<(), Error> {
    Ok(())
}

/// Errors returned by sync point operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// Waiting for the named signal exceeded the configured timeout.
    #[error("Wait for signal {0} timeout")]
    WaitForSignalTimeout(String),
}
169 changes: 169 additions & 0 deletions src/common/src/util/sync_point/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::Duration;

use crate::util::sync_point::Error;

/// Name of a sync point, i.e. a named location in the code where actions can
/// be attached.
pub type SyncPoint = String;
/// Name of a signal used to coordinate between sync points.
pub type Signal = String;

/// An action to execute when a sync point is triggered.
#[derive(Clone)]
pub enum Action {
    /// Block until the given signal is received (or times out).
    WaitForSignal(WaitForSignal),
    /// Emit the given signal, waking one waiter.
    EmitSignal(Signal),
}

/// Parameters for a wait-for-signal action attached to a sync point.
#[derive(Clone)]
pub struct WaitForSignal {
    /// The signal being waited for.
    pub signal: Signal,
    /// Whether to stop the signal from further propagation after receiving one.
    ///
    /// If true, the signal is relayed and another waiter is signalled right away.
    ///
    /// If false, other waiter needs to wait for another signal.
    pub relay_signal: bool,
    /// Max duration to wait for. Exceeding it yields `Error::WaitForSignalTimeout`.
    pub timeout: Duration,
}

lazy_static::lazy_static! {
    /// Process-wide singleton through which all sync point and signal
    /// operations are routed.
    static ref SYNC_FACILITY: SyncFacility = {
        SyncFacility::new()
    };
}

/// A `SyncPoint` is activated by attaching a `SyncPointInfo` to it.
#[derive(Clone)]
struct SyncPointInfo {
    /// `Action`s to be executed, in order, each time the `SyncPoint` is triggered.
    actions: Vec<Action>,
    /// Remaining trigger count; the `SyncPoint` is deactivated after being
    /// triggered `execute_times` times.
    execute_times: u64,
}

/// Shared state backing the sync point mechanism: the registry of signals and
/// of activated sync points.
struct SyncFacility {
    /// `Notify` for each `Signal`. Created lazily on first wait or emit.
    signals: parking_lot::Mutex<HashMap<Signal, Arc<tokio::sync::Notify>>>,
    /// `SyncPointInfo` for each active `SyncPoint`.
    sync_points: parking_lot::Mutex<HashMap<SyncPoint, SyncPointInfo>>,
}

impl SyncFacility {
    /// Creates an empty facility with no registered signals or sync points.
    fn new() -> Self {
        Self {
            signals: Default::default(),
            sync_points: Default::default(),
        }
    }

    /// Waits until `wait_for_signal.signal` is emitted, or returns
    /// `Error::WaitForSignalTimeout` if `timeout` elapses first.
    ///
    /// If `relay_signal` is set, the received signal is re-emitted right away
    /// so that another pending waiter is woken as well.
    async fn wait_for_signal(&self, wait_for_signal: WaitForSignal) -> Result<(), Error> {
        // Grab (or lazily create) the Notify for this signal; the lock is
        // released before awaiting.
        let entry = self
            .signals
            .lock()
            .entry(wait_for_signal.signal.to_owned())
            .or_insert_with(|| Arc::new(tokio::sync::Notify::new()))
            .clone();
        match tokio::time::timeout(wait_for_signal.timeout, entry.notified()).await {
            Ok(_) => {
                if wait_for_signal.relay_signal {
                    // Pass the signal on so the next waiter is woken too.
                    entry.notify_one();
                }
            }
            Err(_) => {
                return Err(Error::WaitForSignalTimeout(wait_for_signal.signal));
            }
        }
        Ok(())
    }

    /// Emits `signal`, waking one waiter. `Notify::notify_one` stores a permit
    /// when nobody is waiting yet, so a signal emitted before the waiter
    /// arrives is not lost.
    fn emit_signal(&self, signal: Signal) {
        let entry = self
            .signals
            .lock()
            .entry(signal)
            .or_insert_with(|| Arc::new(tokio::sync::Notify::new()))
            .clone();
        entry.notify_one();
    }

    /// Activates `sync_point` so its `actions` run the next `execute_times`
    /// times it is hit. `execute_times == 0` is a no-op; a repeated call
    /// replaces any previous activation.
    fn set_actions(&self, sync_point: &str, actions: Vec<Action>, execute_times: u64) {
        if execute_times == 0 {
            return;
        }
        let mut guard = self.sync_points.lock();
        let sync_points = guard.deref_mut();
        sync_points.insert(
            sync_point.to_owned(),
            SyncPointInfo {
                actions,
                execute_times,
            },
        );
    }

    /// Deactivates `sync_point`, discarding any remaining executions.
    fn reset_actions(&self, sync_point: &str) {
        self.sync_points.lock().remove(sync_point);
    }

    /// Runs the actions attached to `sync_point`, if any. Inactive sync
    /// points return `Ok(())` immediately.
    ///
    /// The `sync_points` lock is released before the (potentially blocking)
    /// actions run, so actions may themselves trigger other sync points.
    async fn on_sync_point(&self, sync_point: &str) -> Result<(), Error> {
        let actions = {
            let mut guard = self.sync_points.lock();
            match guard.entry(sync_point.to_owned()) {
                Entry::Occupied(mut o) => {
                    if o.get().execute_times == 1 {
                        // Deactivate the sync point and execute its actions for
                        // the last time. `o.remove()` avoids a second map lookup.
                        o.remove().actions
                    } else {
                        o.get_mut().execute_times -= 1;
                        o.get().actions.clone()
                    }
                }
                Entry::Vacant(_) => {
                    // Sync point not activated: nothing to do.
                    return Ok(());
                }
            }
        };
        for action in actions {
            match action {
                Action::WaitForSignal(w) => {
                    // `w` is already owned here; no clone needed.
                    self.wait_for_signal(w).await?;
                }
                Action::EmitSignal(s) => {
                    self.emit_signal(s);
                }
            }
        }
        Ok(())
    }
}

/// Activates `sync_point` so that `actions` are executed each time the point
/// is hit. The activation is automatically reset after it has executed
/// `execute_times` times.
pub fn activate_sync_point(sync_point: &str, actions: Vec<Action>, execute_times: u64) {
    SYNC_FACILITY.set_actions(sync_point, actions, execute_times);
}

/// Deactivates `sync_point`, discarding any remaining executions of its
/// attached actions. No-op if the point is not currently activated.
pub fn deactivate_sync_point(sync_point: &str) {
    SYNC_FACILITY.reset_actions(sync_point);
}

/// Called when execution reaches the sync point named `sync_point`: runs any
/// actions previously attached via [`activate_sync_point`], and returns
/// immediately with `Ok(())` if the point is not active.
pub async fn on_sync_point(sync_point: &str) -> Result<(), Error> {
    SYNC_FACILITY.on_sync_point(sync_point).await
}
4 changes: 4 additions & 0 deletions src/meta/src/hummock/compaction_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use parking_lot::Mutex;
use risingwave_common::util::sync_point::on_sync_point;
use risingwave_hummock_sdk::compact::compact_task_to_string;
use risingwave_hummock_sdk::CompactionGroupId;
use risingwave_pb::hummock::subscribe_compact_tasks_response::Task;
Expand Down Expand Up @@ -112,6 +113,9 @@ where
break 'compaction_trigger;
}
};
on_sync_point("BEFORE_SCHEDULE_COMPACTION_TASK")
.await
.unwrap();
self.pick_and_assign(compaction_group, request_channel.clone())
.await;
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub use compaction_scheduler::CompactionScheduler;
pub use compactor_manager::*;
#[cfg(any(test, feature = "test"))]
pub use mock_hummock_meta_client::MockHummockMetaClient;
use risingwave_common::util::sync_point::on_sync_point;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -172,6 +173,7 @@ where
if let Err(err) = vacuum.vacuum_sst_data().await {
tracing::warn!("Vacuum SST error {:#?}", err);
}
on_sync_point("AFTER_SCHEDULE_VACUUM").await.unwrap();
}
});
(join_handle, shutdown_tx)
Expand Down
Loading

0 comments on commit 2c62ae9

Please sign in to comment.