Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): remove enable_test_epoch and modify epochs used by all unit tests to be realistic #14557

Merged
merged 52 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c558c9b
save work, remain about 30-40 unit tests
wcy-fdu Jan 14, 2024
db99819
modified a lot, remain 30
wcy-fdu Jan 15, 2024
404be29
remain 26
wcy-fdu Jan 15, 2024
e95c965
remain 8
wcy-fdu Jan 15, 2024
30821a3
remain 7, help wanted
wcy-fdu Jan 15, 2024
104c2a1
remain 6, help wanted
wcy-fdu Jan 16, 2024
ffbbb65
rolved another one
wcy-fdu Jan 16, 2024
9a1a0d0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Jan 16, 2024
e5a6fd6
find a bug, only remain two ut
wcy-fdu Jan 16, 2024
8d5c0a7
all ut pass
wcy-fdu Jan 17, 2024
5277575
refactor 1/3
wcy-fdu Jan 22, 2024
e151d9b
save work
wcy-fdu Jan 23, 2024
302601b
minor
wcy-fdu Jan 24, 2024
2e3c58a
finish 99%
wcy-fdu Jan 24, 2024
2244612
fmt
wcy-fdu Jan 24, 2024
2379d70
fix one ut
wcy-fdu Jan 25, 2024
98558a2
update comments
wcy-fdu Jan 30, 2024
8b9302e
refactor, remove TestEpoch and TestEpochWithGap
wcy-fdu Jan 31, 2024
47fd634
resolve some commntes
wcy-fdu Feb 7, 2024
4e72501
remove 90% magic number
wcy-fdu Feb 7, 2024
7e74b13
refactor agg uts
wcy-fdu Feb 8, 2024
2e8b83e
minor
wcy-fdu Feb 18, 2024
a76514d
resolve comments
wcy-fdu Feb 22, 2024
4fe88e0
fmt toml file
wcy-fdu Feb 22, 2024
0a1dfbb
fix stest
wcy-fdu Feb 22, 2024
2816fa9
timeout 25mins
wcy-fdu Feb 22, 2024
00ec751
minor
wcy-fdu Feb 29, 2024
e338c45
remove usage of EpochWithGap::new_for_test(magic_number).as_u64_for_t…
wcy-fdu Feb 29, 2024
8f99665
add EpochExt for u64
wcy-fdu Feb 29, 2024
2eebf85
add doc
wcy-fdu Mar 1, 2024
8f28f85
avoid leak the concept of EpochWithGap
wcy-fdu Mar 1, 2024
ace01c4
clippy happy
wcy-fdu Mar 1, 2024
d6b9211
do not change timeout minutes
wcy-fdu Mar 1, 2024
6fde6b7
make clippy happy
wcy-fdu Mar 1, 2024
a548190
make clippy happy
wcy-fdu Mar 1, 2024
aef803d
rollback compaction test
wcy-fdu Mar 1, 2024
64319f6
resolve all comments
wcy-fdu Mar 1, 2024
b427f4d
timeout 25mins
wcy-fdu Mar 4, 2024
dc63913
resolve some comments
wcy-fdu Mar 4, 2024
60b4a2a
resolve comments
wcy-fdu Mar 6, 2024
2ead893
resolve comments
wcy-fdu Mar 6, 2024
7e02709
resolve all conflicts
wcy-fdu Mar 6, 2024
5930b19
save work
wcy-fdu Mar 6, 2024
fd9672a
timeout 22
wcy-fdu Mar 7, 2024
339f9d4
minor
wcy-fdu Mar 7, 2024
2398adf
go ahead resolve some comments
wcy-fdu Mar 7, 2024
4dde294
rebase main
wcy-fdu Mar 7, 2024
e2eff55
fix merger
wcy-fdu Mar 7, 2024
6c4aff9
resolve some comments
wcy-fdu Mar 7, 2024
e271f7a
add comments
wcy-fdu Mar 7, 2024
2d0a335
fix merger
wcy-fdu Mar 7, 2024
6db3372
resolve comments, can merge
wcy-fdu Mar 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/scripts/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sccache --show-stats
sccache --zero-stats

echo "--- Run clippy check (release)"
cargo clippy --release --all-targets --features "rw-static-link,enable_test_epoch_in_release" --locked -- -D warnings
cargo clippy --release --all-targets --features "rw-static-link" --locked -- -D warnings

echo "--- Run cargo check on building the release binary (release)"
cargo check -p risingwave_cmd_all --features "rw-static-link" --profile release
Expand Down
2 changes: 1 addition & 1 deletion src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ workspace-hack = { path = "../workspace-hack" }
criterion = { workspace = true, features = ["async_tokio", "async"] }
rand = "0.8"
risingwave_expr_impl = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }
tempfile = "3"
tikv-jemallocator = { workspace = true }

Expand Down
47 changes: 38 additions & 9 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::LazyLock;
use std::time::{Duration, SystemTime};

use easy_ext::ext;
use parse_display::Display;

use crate::types::{ScalarImpl, Timestamptz};
Expand Down Expand Up @@ -122,6 +123,11 @@ pub const EPOCH_SPILL_TIME_MASK: u64 = (1 << EPOCH_AVAILABLE_BITS) - 1;
const EPOCH_MASK: u64 = !EPOCH_SPILL_TIME_MASK;
pub const MAX_EPOCH: u64 = u64::MAX & EPOCH_MASK;

// EPOCH_INC_MIN_STEP_FOR_TEST is the minimum increment step for epoch in unit tests.
// We need to keep the lower 16 bits of the epoch unchanged during each increment,
// and only increase the upper 48 bits.
const EPOCH_INC_MIN_STEP_FOR_TEST: u64 = test_epoch(1);

pub fn is_max_epoch(epoch: u64) -> bool {
// Since we have write `MAX_EPOCH` as max epoch to sstable in some previous version,
// it means that there may be two value in our system which represent infinite. We must check
Expand Down Expand Up @@ -150,20 +156,43 @@ impl EpochPair {
Self { curr, prev }
}

pub fn inc(&mut self) {
self.curr += 1;
self.prev += 1;
pub fn inc_for_test(&mut self) {
self.prev = self.curr;
self.curr += EPOCH_INC_MIN_STEP_FOR_TEST;
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn inc_for_test(&mut self, inc_by: u64) {
self.prev = self.curr;
pub fn new_test_epoch(curr: u64) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to test the real scenarios if we only use test epoch in UT ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand what you mean, after this pr, we can test mem table spill in unit test.

if !is_max_epoch(curr) {
assert!(curr >= EPOCH_INC_MIN_STEP_FOR_TEST);
assert!((curr & EPOCH_SPILL_TIME_MASK) == 0);
}
Self::new(curr, curr - EPOCH_INC_MIN_STEP_FOR_TEST)
}
}
/// As most unit tests initializ a new epoch from a random value (e.g. 1, 2, 233 etc.), but the correct epoch in the system is a u64 with the last `EPOCH_AVAILABLE_BITS` bits set to 0.
/// This method is to turn a a random epoch into a well shifted value.
pub const fn test_epoch(value: u64) -> u64 {
value << EPOCH_AVAILABLE_BITS
}

self.curr += inc_by;
/// There are numerous operations in our system's unit tests that involve incrementing or decrementing the epoch.
/// These extensions for u64 type are specifically used within the unit tests.
#[ext(EpochExt)]
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
pub impl u64 {
fn inc_epoch(&mut self) {
*self += EPOCH_INC_MIN_STEP_FOR_TEST;
}

pub fn new_test_epoch(curr: u64) -> Self {
assert!(curr > 0);
Self::new(curr, curr - 1)
fn dec_epoch(&mut self) {
*self -= EPOCH_INC_MIN_STEP_FOR_TEST;
}

fn next_epoch(self) -> u64 {
self + EPOCH_INC_MIN_STEP_FOR_TEST
}

fn prev_epoch(self) -> u64 {
self - EPOCH_INC_MIN_STEP_FOR_TEST
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
futures-async-stream = { workspace = true }
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }
tempfile = "3"

[lints]
Expand Down
13 changes: 8 additions & 5 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor,
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder};
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::{
Expand Down Expand Up @@ -279,7 +280,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
let stream_chunk2 = create_stream_chunk(chunk2_datums, &chunk_schema);

// The first barrier
let curr_epoch = 11;
let mut curr_epoch = test_epoch(11);
let mut splits = HashMap::new();
splits.insert(
actor_id,
Expand Down Expand Up @@ -311,17 +312,19 @@ async fn test_cdc_backfill() -> StreamResult<()> {
// ingest data and barrier
let interval = Duration::from_millis(10);
tx.push_chunk(stream_chunk1);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 1, false);
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

tx.push_chunk(stream_chunk2);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 2, false);
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, false);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 3, true);
curr_epoch.inc_epoch();
tx.push_barrier(curr_epoch, true);

// scan the final result of the mv table
let column_descs = vec![
Expand Down
12 changes: 6 additions & 6 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::epoch::{test_epoch, EpochExt, EpochPair};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::reader::desc::test_utils::create_source_desc_builder;
Expand Down Expand Up @@ -275,7 +275,7 @@ async fn test_table_materialize() -> StreamResult<()> {
assert!(result.is_none());

// Send a barrier to start materialized view.
let mut curr_epoch = 1919;
let mut curr_epoch = test_epoch(1919);
barrier_tx
.send(Barrier::new_test_barrier(curr_epoch))
.unwrap();
Expand All @@ -289,7 +289,7 @@ async fn test_table_materialize() -> StreamResult<()> {
}) if epoch.curr == curr_epoch
));

curr_epoch += 1;
curr_epoch.inc_epoch();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = insert.execute();
Expand Down Expand Up @@ -371,7 +371,7 @@ async fn test_table_materialize() -> StreamResult<()> {
0,
));

curr_epoch += 1;
curr_epoch.inc_epoch();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = delete.execute();
Expand Down Expand Up @@ -464,7 +464,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
vec![0, 1, 2],
);

let mut epoch = EpochPair::new_test_epoch(1);
let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
state.init_epoch(epoch);
state.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Expand All @@ -477,7 +477,7 @@ async fn test_row_seq_scan() -> StreamResult<()> {
Some(8_i64.into()),
]));

epoch.inc();
epoch.inc_for_test();
state.commit(epoch).await.unwrap();

let executor = Box::new(RowSeqScanExecutor::new(
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ mod tests {
use std::task::Poll;

use futures::{FutureExt, TryFuture};
use risingwave_common::util::epoch::test_epoch;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

Expand Down Expand Up @@ -640,14 +641,14 @@ mod tests {
#[tokio::test]
async fn test_future_delivery_manager_compress_chunk() {
let mut manager = DeliveryFutureManager::new(10);
let epoch1 = 233;
let epoch1 = test_epoch(233);
let chunk_id1 = 1;
let chunk_id2 = chunk_id1 + 1;
let chunk_id3 = chunk_id2 + 1;
let (tx1_1, rx1_1) = oneshot::channel();
let (tx1_2, rx1_2) = oneshot::channel();
let (tx1_3, rx1_3) = oneshot::channel();
let epoch2 = epoch1 + 1;
let epoch2 = test_epoch(234);
let (tx2_1, rx2_1) = oneshot::channel();
assert!(!manager
.start_write_chunk(epoch1, chunk_id1)
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ uuid = { version = "1", features = ["v4"] }
workspace-hack = { path = "../workspace-hack" }

[dev-dependencies]
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }

[lints]
workspace = true
Expand Down
2 changes: 1 addition & 1 deletion src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ tracing = "0.1"
[dev-dependencies]
expect-test = "1"
risingwave_expr = { workspace = true }
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }

[lints]
workspace = true
2 changes: 1 addition & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ workspace-hack = { path = "../workspace-hack" }
assert_matches = "1"
expect-test = "1.4"
rand = "0.8"
risingwave_hummock_sdk = { workspace = true, features = ["enable_test_epoch"] }
risingwave_hummock_sdk = { workspace = true }
risingwave_test_runner = { workspace = true }

[features]
Expand Down
Loading
Loading