Skip to content

Commit

Permalink
refactor(storage): remove enable_test_epoch and modify epochs used …
Browse files Browse the repository at this point in the history
…by all unit tests to be realistic (#14557)
  • Loading branch information
wcy-fdu authored Mar 11, 2024
1 parent 474b1c3 commit cd28ff0
Show file tree
Hide file tree
Showing 108 changed files with 1,337 additions and 1,563 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sccache --show-stats
sccache --zero-stats

echo "--- Run clippy check (release)"
cargo clippy --release --all-targets --features "rw-static-link,enable_test_epoch_in_release" --locked -- -D warnings
cargo clippy --release --all-targets --features "rw-static-link" --locked -- -D warnings

echo "--- Run cargo check on building the release binary (release)"
cargo check -p risingwave_cmd_all --features "rw-static-link" --profile release
Expand Down
2 changes: 1 addition & 1 deletion src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ workspace-hack = { path = "../workspace-hack" }
criterion = { workspace = true, features = ["async_tokio", "async"] }
rand = "0.8"
risingwave_expr_impl = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }
tempfile = "3"
tikv-jemallocator = { workspace = true }

Expand Down
47 changes: 38 additions & 9 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::LazyLock;
use std::time::{Duration, SystemTime};

use easy_ext::ext;
use parse_display::Display;

use crate::types::{ScalarImpl, Timestamptz};
Expand Down Expand Up @@ -122,6 +123,11 @@ pub const EPOCH_SPILL_TIME_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
const EPOCH_MASK: u64 = !EPOCH_SPILL_TIME_MASK;
pub const MAX_EPOCH: u64 = u64::MAX & EPOCH_MASK;

// EPOCH_INC_MIN_STEP_FOR_TEST is the minimum increment step for epoch in unit tests.
// We need to keep the lower 16 bits of the epoch unchanged during each increment,
// and only increase the upper 48 bits.
const EPOCH_INC_MIN_STEP_FOR_TEST: u64 = test_epoch(1);

pub fn is_max_epoch(epoch: u64) -> bool {
// Since we have write `MAX_EPOCH` as max epoch to sstable in some previous version,
// it means that there may be two value in our system which represent infinite. We must check
Expand Down Expand Up @@ -150,20 +156,43 @@ impl EpochPair {
Self { curr, prev }
}

pub fn inc(&mut self) {
self.curr += 1;
self.prev += 1;
pub fn inc_for_test(&mut self) {
self.prev = self.curr;
self.curr += EPOCH_INC_MIN_STEP_FOR_TEST;
}

pub fn inc_for_test(&mut self, inc_by: u64) {
self.prev = self.curr;
pub fn new_test_epoch(curr: u64) -> Self {
if !is_max_epoch(curr) {
assert!(curr >= EPOCH_INC_MIN_STEP_FOR_TEST);
assert!((curr & EPOCH_SPILL_TIME_MASK) == 0);
}
Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
}
}
/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.
/// This method is to turn a a random epoch into a well shifted value.
pub const fn test_epoch(value: u64) -> u64 {
value << EPOCH_AVAILABLE_BITS
}

self.curr += inc_by;
/// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch.
/// These extensions for u64 type are specifically used within the unit tests.
#[ext(EpochExt)]
pub impl u64 {
fn inc_epoch(&mut self) {
*self += EPOCH_INC_MIN_STEP_FOR_TEST;
}

pub fn new_test_epoch(curr: u64) -> Self {
assert!(curr > 0);
Self::new(curr, curr - 1)
fn dec_epoch(&mut self) {
*self -= EPOCH_INC_MIN_STEP_FOR_TEST;
}

fn next_epoch(self) -> u64 {
self + EPOCH_INC_MIN_STEP_FOR_TEST
}

fn prev_epoch(self) -> u64 {
self - EPOCH_INC_MIN_STEP_FOR_TEST
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
futures-async-stream = { workspace = true }
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }
tempfile = "3"

[lints]
Expand Down
13 changes: 8 additions & 5 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor,
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder};
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::{
Expand Down Expand Up @@ -279,7 +280,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
let stream_chunk2 = create_stream_chunk(chunk2_datums, &chunk_schema);

// The first barrier
let curr_epoch = 11;
let mut curr_epoch = test_epoch(11);
let mut splits = HashMap::new();
splits.insert(
actor_id,
Expand Down Expand Up @@ -311,17 +312,19 @@ async fn test_cdc_backfill() -> StreamResult<()> {
// ingest data and barrier
let interval = Duration::from_millis(10);
tx.push_chunk(stream_chunk1);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 1, false);
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

tx.push_chunk(stream_chunk2);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 2, false);
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 3, true);
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, true);

// scan the final result of the mv table
let column_descs = vec![
Expand Down
12 changes: 6 additions & 6 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::epoch::{test_epoch, EpochExt, EpochPair};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
Expand Down Expand Up @@ -275,7 +275,7 @@ async fn test_table_materialize() -> StreamResult<()> {
assert!(result.is_none());

// Send a barrier to start materialized view.
let mut curr_epoch = 1919;
let mut curr_epoch = test_epoch(1919);
barrier_tx
.send(Barrier::new_test_barrier(curr_epoch))
.unwrap();
Expand All @@ -289,7 +289,7 @@ async fn test_table_materialize() -> StreamResult<()> {
}) if epoch.curr == curr_epoch
));

curr_epoch += 1;
curr_epoch.inc_epoch();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = insert.execute();
Expand Down Expand Up @@ -371,7 +371,7 @@ async fn test_table_materialize() -> StreamResult<()> {
0,
));

curr_epoch += 1;
curr_epoch.inc_epoch();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = delete.execute();
Expand Down Expand Up @@ -464,7 +464,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
vec![0, 1, 2],
);

let mut epoch = EpochPair::new_test_epoch(1);
let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
state.init_epoch(epoch);
state.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Expand All @@ -477,7 +477,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
Some(8_i64.into()),
]));

epoch.inc();
epoch.inc_for_test();
state.commit(epoch).await.unwrap();

let executor = Box::new(RowSeqScanExecutor::new(
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ mod tests {
use std::task::Poll;

use futures::{FutureExt, TryFuture};
use risingwave_common::util::epoch::test_epoch;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

Expand Down Expand Up @@ -640,14 +641,14 @@ mod tests {
#[tokio::test]
async fn test_future_delivery_manager_compress_chunk() {
let mut manager = DeliveryFutureManager::new(10);
let epoch1 = 233;
let epoch1 = test_epoch(233);
let chunk_id1 = 1;
let chunk_id2 = chunk_id1 + 1;
let chunk_id3 = chunk_id2 + 1;
let (tx1_1, rx1_1) = oneshot::channel();
let (tx1_2, rx1_2) = oneshot::channel();
let (tx1_3, rx1_3) = oneshot::channel();
let epoch2 = epoch1 + 1;
let epoch2 = test_epoch(234);
let (tx2_1, rx2_1) = oneshot::channel();
assert!(!manager
.start_write_chunk(epoch1, chunk_id1)
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }

[lints]
workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ tracing = "0.1"
[dev-dependencies]
expect-test = "1"
risingwave_expr = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }

[lints]
workspace = true
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ workspace-hack = { path = "../workspace-hack" }
assert_matches = "1"
expect-test = "1.4"
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }
risingwave_test_runner = { workspace = true }

[features]
Expand Down
Loading

0 comments on commit cd28ff0

Please sign in to comment.