Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): remove enable_test_epoch and modify epochs used by all unit tests to be realistic #14557

Merged
merged 52 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c558c9b
save work, remain about 30-40 unit tests
wcy-fdu Jan 14, 2024
db99819
modified a lot, remain 30
wcy-fdu Jan 15, 2024
404be29
remain 26
wcy-fdu Jan 15, 2024
e95c965
remain 8
wcy-fdu Jan 15, 2024
30821a3
remain 7, help wanted
wcy-fdu Jan 15, 2024
104c2a1
remain 6, help wanted
wcy-fdu Jan 16, 2024
ffbbb65
resolved another one
wcy-fdu Jan 16, 2024
9a1a0d0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Jan 16, 2024
e5a6fd6
find a bug, only remain two ut
wcy-fdu Jan 16, 2024
8d5c0a7
all ut pass
wcy-fdu Jan 17, 2024
5277575
refactor 1/3
wcy-fdu Jan 22, 2024
e151d9b
save work
wcy-fdu Jan 23, 2024
302601b
minor
wcy-fdu Jan 24, 2024
2e3c58a
finish 99%
wcy-fdu Jan 24, 2024
2244612
fmt
wcy-fdu Jan 24, 2024
2379d70
fix one ut
wcy-fdu Jan 25, 2024
98558a2
update comments
wcy-fdu Jan 30, 2024
8b9302e
refactor, remove TestEpoch and TestEpochWithGap
wcy-fdu Jan 31, 2024
47fd634
resolve some comments
wcy-fdu Feb 7, 2024
4e72501
remove 90% magic number
wcy-fdu Feb 7, 2024
7e74b13
refactor agg uts
wcy-fdu Feb 8, 2024
2e8b83e
minor
wcy-fdu Feb 18, 2024
a76514d
resolve comments
wcy-fdu Feb 22, 2024
4fe88e0
fmt toml file
wcy-fdu Feb 22, 2024
0a1dfbb
fix stest
wcy-fdu Feb 22, 2024
2816fa9
timeout 25mins
wcy-fdu Feb 22, 2024
00ec751
minor
wcy-fdu Feb 29, 2024
e338c45
remove usage of EpochWithGap::new_for_test(magic_number).as_u64_for_t…
wcy-fdu Feb 29, 2024
8f99665
add EpochExt for u64
wcy-fdu Feb 29, 2024
2eebf85
add doc
wcy-fdu Mar 1, 2024
8f28f85
avoid leak the concept of EpochWithGap
wcy-fdu Mar 1, 2024
ace01c4
clippy happy
wcy-fdu Mar 1, 2024
d6b9211
do not change timeout minutes
wcy-fdu Mar 1, 2024
6fde6b7
make clippy happy
wcy-fdu Mar 1, 2024
a548190
make clippy happy
wcy-fdu Mar 1, 2024
aef803d
rollback compaction test
wcy-fdu Mar 1, 2024
64319f6
resolve all comments
wcy-fdu Mar 1, 2024
b427f4d
timeout 25mins
wcy-fdu Mar 4, 2024
dc63913
resolve some comments
wcy-fdu Mar 4, 2024
60b4a2a
resolve comments
wcy-fdu Mar 6, 2024
2ead893
resolve comments
wcy-fdu Mar 6, 2024
7e02709
resolve all conflicts
wcy-fdu Mar 6, 2024
5930b19
save work
wcy-fdu Mar 6, 2024
fd9672a
timeout 22
wcy-fdu Mar 7, 2024
339f9d4
minor
wcy-fdu Mar 7, 2024
2398adf
go ahead resolve some comments
wcy-fdu Mar 7, 2024
4dde294
rebase main
wcy-fdu Mar 7, 2024
e2eff55
fix merger
wcy-fdu Mar 7, 2024
6c4aff9
resolve some comments
wcy-fdu Mar 7, 2024
e271f7a
add comments
wcy-fdu Mar 7, 2024
2d0a335
fix merger
wcy-fdu Mar 7, 2024
6db3372
resolve comments, can merge
wcy-fdu Mar 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 127 additions & 4 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ impl EpochPair {
}

pub fn inc(&mut self) {
self.curr += 1;
self.prev += 1;
self.prev = self.curr;
self.curr += 1 << 16;
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn inc_for_test(&mut self, inc_by: u64) {
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -162,8 +162,131 @@ impl EpochPair {
}

pub fn new_test_epoch(curr: u64) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we test the real scenarios if we only use test epochs in unit tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand what you mean. After this PR, we can test mem-table spill in unit tests.

assert!(curr > 0);
Self::new(curr, curr - 1)
assert!(curr > 65535);
Self::new(curr, curr - TestEpoch::new_without_offset(1).as_u64())
}
}

/// The `TestEpoch` struct is used in unit tests to build epochs with the same bit layout as a normal epoch.
/// The epoch number is stored in the upper 48 bits and the lower 16 bits hold a spill offset, which is zero unless one is passed to `new`.
/// Addition and subtraction operations are only applied to the upper 48 bits.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct TestEpoch {
epoch_with_gap: TestEpochWithGap,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

impl TestEpoch {
    /// Builds a test epoch from the logical epoch number `epoch` with a spill offset of zero.
    ///
    /// Equivalent to `TestEpoch::new(epoch, 0)`.
    pub fn new_without_offset(epoch: u64) -> Self {
        Self::new(epoch, 0)
    }

    /// Builds a test epoch from the logical epoch number `epoch` and a `spill_offset`.
    ///
    /// `epoch` is scaled by `1 << EPOCH_PHYSICAL_SHIFT_BITS` so that it occupies the high bits,
    /// and `spill_offset` is stored in the low bits.
    pub fn new(epoch: u64, spill_offset: u16) -> Self {
        Self {
            epoch_with_gap: TestEpochWithGap::new(
                epoch * (1 << EPOCH_PHYSICAL_SHIFT_BITS),
                spill_offset,
            ),
        }
    }

    /// Advances this epoch in place by one epoch step; the spill offset bits are left untouched.
    pub fn inc(&mut self) {
        self.epoch_with_gap.inc();
    }

    /// Returns the epoch one step after this one, with a spill offset of zero.
    ///
    /// The result is derived from the pure epoch (spill offset stripped). Using the raw value
    /// here would trip the alignment `debug_assert!` in `TestEpochWithGap::new` whenever this
    /// epoch carries a non-zero spill offset.
    pub fn next_epoch(&self) -> TestEpoch {
        Self {
            epoch_with_gap: TestEpochWithGap::new(
                self.pure_epoch() + (1 << EPOCH_PHYSICAL_SHIFT_BITS),
                0,
            ),
        }
    }

    /// Returns the epoch one step before this one, with a spill offset of zero.
    ///
    /// Like `next_epoch`, the result is derived from the pure epoch so that a non-zero spill
    /// offset does not violate the alignment check in `TestEpochWithGap::new`.
    ///
    /// The subtraction is unchecked: calling this on an epoch smaller than one epoch step
    /// underflows (a panic in debug builds).
    pub fn prev_epoch(&self) -> TestEpoch {
        Self {
            epoch_with_gap: TestEpochWithGap::new(
                self.pure_epoch() - (1 << EPOCH_PHYSICAL_SHIFT_BITS),
                0,
            ),
        }
    }

    /// Moves this epoch back in place by one epoch step.
    ///
    /// Delegates to `TestEpochWithGap::sub`, which leaves the value unchanged when it is not
    /// strictly greater than one epoch step.
    pub fn sub(&mut self) {
        self.epoch_with_gap.sub();
    }

    /// Advances this epoch in place by `e` epoch steps.
    pub fn inc_by(&mut self, e: u64) {
        self.epoch_with_gap.inc_by(e);
    }

    /// Returns the epoch value with the spill offset bits cleared.
    pub fn pure_epoch(&self) -> u64 {
        self.epoch_with_gap.pure_epoch()
    }

    /// Returns the raw `u64` representation (epoch bits plus spill offset bits).
    pub fn as_u64(&self) -> u64 {
        self.epoch_with_gap._as_u64()
    }
}

/// Test-only mirror of `EpochWithGap` from the storage crate.
///
/// It is mocked here in the common crate, rather than reused from storage, to prevent a
/// circular dependency between the common and storage crates, since all unit tests need it.
///
/// The wrapped `u64` holds both the epoch and the spill offset; see `pure_epoch` and `offset`
/// for how the two parts are separated with `EPOCH_SPILL_TIME_MASK`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Default, Debug, PartialOrd, Ord)]
pub struct TestEpochWithGap(u64);
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved

impl TestEpochWithGap {
#[allow(unused_variables)]
pub fn new(epoch: u64, spill_offset: u16) -> Self {
// We only use 48 high bit to store epoch and use 16 low bit to store spill offset. But for MAX epoch,
// we still keep `u64::MAX` because we have use it in delete range and persist this value to sstable files.
// So for compatibility, we must skip checking it for u64::MAX. See bug description in https://github.com/risingwavelabs/risingwave/issues/13717
if is_max_epoch(epoch) {
TestEpochWithGap::new_max_epoch()
} else {
debug_assert!((epoch & EPOCH_SPILL_TIME_MASK) == 0);
TestEpochWithGap(epoch + spill_offset as u64)
}
}

pub fn new_from_epoch(epoch: u64) -> Self {
TestEpochWithGap::new(epoch, 0)
}

pub fn new_min_epoch() -> Self {
TestEpochWithGap(0)
}

pub fn new_max_epoch() -> Self {
TestEpochWithGap(u64::MAX)
}

// return the epoch_with_gap(epoch + spill_offset)
pub(crate) fn _as_u64(&self) -> u64 {
self.0
}

// return the pure epoch without spill offset
pub fn pure_epoch(&self) -> u64 {
self.0 & !EPOCH_SPILL_TIME_MASK
}

pub fn offset(&self) -> u64 {
self.0 & EPOCH_SPILL_TIME_MASK
}

pub fn inc(&mut self) {
self.0 += 1 << EPOCH_PHYSICAL_SHIFT_BITS;
}

pub fn sub(&mut self) {
if self.0 > (1 << EPOCH_PHYSICAL_SHIFT_BITS) {
self.0 -= 1 << EPOCH_PHYSICAL_SHIFT_BITS;
}
}

pub fn inc_by(&mut self, e: u64) {
self.0 += e << EPOCH_PHYSICAL_SHIFT_BITS;
}
}

Expand Down
17 changes: 10 additions & 7 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use risingwave_batch::executor::{Executor as BatchExecutor, RowSeqScanExecutor,
use risingwave_common::array::{Array, ArrayBuilder, DataChunk, Op, StreamChunk, Utf8ArrayBuilder};
use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::TestEpoch;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::{
Expand Down Expand Up @@ -290,7 +291,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
let stream_chunk2 = create_stream_chunk(chunk2_datums, &chunk_schema);

// The first barrier
let curr_epoch = 11;
let mut curr_epoch = TestEpoch::new_without_offset(11);
let mut splits = HashMap::new();
splits.insert(
actor_id,
Expand All @@ -307,7 +308,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
})],
);
let init_barrier =
Barrier::new_test_barrier(curr_epoch).with_mutation(Mutation::Add(AddMutation {
Barrier::new_test_barrier(curr_epoch.as_u64()).with_mutation(Mutation::Add(AddMutation {
adds: HashMap::new(),
added_actors: HashSet::new(),
splits,
Expand All @@ -323,7 +324,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
epoch,
mutation: Some(_),
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64()
));

// start the stream pipeline src -> backfill -> mview
Expand All @@ -332,17 +333,19 @@ async fn test_cdc_backfill() -> StreamResult<()> {
// ingest data and barrier
let interval = Duration::from_millis(10);
tx.push_chunk(stream_chunk1);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 1, false);
curr_epoch.inc();
tx.push_barrier(curr_epoch.as_u64(), false);

tx.push_chunk(stream_chunk2);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 2, false);
curr_epoch.inc();
tx.push_barrier(curr_epoch.as_u64(), false);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 3, true);
curr_epoch.inc();
tx.push_barrier(curr_epoch.as_u64(), true);

// scan the final result of the mv table
let column_descs = vec![
Expand Down
22 changes: 11 additions & 11 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use risingwave_common::row::OwnedRow;
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::{DataType, IntoOrdered};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::epoch::{EpochPair, TestEpoch};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::SourceCtrlOpts;
Expand Down Expand Up @@ -274,9 +274,9 @@ async fn test_table_materialize() -> StreamResult<()> {
assert!(result.is_none());

// Send a barrier to start materialized view.
let mut curr_epoch = 1919;
let mut curr_epoch = TestEpoch::new_without_offset(1919);
barrier_tx
.send(Barrier::new_test_barrier(curr_epoch))
.send(Barrier::new_test_barrier(curr_epoch.as_u64()))
.unwrap();

assert!(matches!(
Expand All @@ -285,17 +285,17 @@ async fn test_table_materialize() -> StreamResult<()> {
epoch,
mutation: None,
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64()
));

curr_epoch += 1;
curr_epoch.inc();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = insert.execute();
let _ = stream.next().await.unwrap()?;
// Send a barrier and poll again, should write changes to storage.
barrier_tx_clone
.send(Barrier::new_test_barrier(curr_epoch))
.send(Barrier::new_test_barrier(curr_epoch.as_u64()))
.unwrap();
Ok::<_, RwError>(())
});
Expand Down Expand Up @@ -325,7 +325,7 @@ async fn test_table_materialize() -> StreamResult<()> {
epoch,
mutation: None,
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64()
));

// Scan the table again, we are able to get the data now!
Expand Down Expand Up @@ -370,14 +370,14 @@ async fn test_table_materialize() -> StreamResult<()> {
0,
));

curr_epoch += 1;
curr_epoch.inc();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = delete.execute();
let _ = stream.next().await.unwrap()?;
// Send a barrier and poll again, should write changes to storage.
barrier_tx_clone
.send(Barrier::new_test_barrier(curr_epoch))
.send(Barrier::new_test_barrier(curr_epoch.as_u64()))
.unwrap();
Ok::<_, RwError>(())
});
Expand All @@ -404,7 +404,7 @@ async fn test_table_materialize() -> StreamResult<()> {
epoch,
mutation: None,
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64()
));

// Scan the table again, we are able to see the deletion now!
Expand Down Expand Up @@ -463,7 +463,7 @@ async fn test_row_seq_scan() -> Result<()> {
vec![0, 1, 2],
);

let mut epoch = EpochPair::new_test_epoch(1);
let mut epoch = EpochPair::new_test_epoch(TestEpoch::new_without_offset(1).as_u64());
state.init_epoch(epoch);
state.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Expand Down
23 changes: 13 additions & 10 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ mod tests {
use std::task::Poll;

use futures::{FutureExt, TryFuture};
use risingwave_common::util::epoch::TestEpoch;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

Expand Down Expand Up @@ -639,33 +640,33 @@ mod tests {
#[tokio::test]
async fn test_future_delivery_manager_compress_chunk() {
let mut manager = DeliveryFutureManager::new(10);
let epoch1 = 233;
let epoch1 = TestEpoch::new_without_offset(233);
let chunk_id1 = 1;
let chunk_id2 = chunk_id1 + 1;
let chunk_id3 = chunk_id2 + 1;
let (tx1_1, rx1_1) = oneshot::channel();
let (tx1_2, rx1_2) = oneshot::channel();
let (tx1_3, rx1_3) = oneshot::channel();
let epoch2 = epoch1 + 1;
let epoch2 = TestEpoch::new_without_offset(234);
let (tx2_1, rx2_1) = oneshot::channel();
assert!(!manager
.start_write_chunk(epoch1, chunk_id1)
.start_write_chunk(epoch1.as_u64(), chunk_id1)
.add_future_may_await(to_test_future(rx1_1))
.await
.unwrap());
assert!(!manager
.start_write_chunk(epoch1, chunk_id2)
.start_write_chunk(epoch1.as_u64(), chunk_id2)
.add_future_may_await(to_test_future(rx1_2))
.await
.unwrap());
assert!(!manager
.start_write_chunk(epoch1, chunk_id3)
.start_write_chunk(epoch1.as_u64(), chunk_id3)
.add_future_may_await(to_test_future(rx1_3))
.await
.unwrap());
manager.add_barrier(epoch1);
manager.add_barrier(epoch1.as_u64());
assert!(!manager
.start_write_chunk(epoch2, chunk_id1)
.start_write_chunk(epoch2.as_u64(), chunk_id1)
.add_future_may_await(to_test_future(rx2_1))
.await
.unwrap());
Expand All @@ -688,7 +689,7 @@ mod tests {
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch1,
epoch: epoch1.as_u64(),
chunk_id: chunk_id2
}
);
Expand All @@ -706,14 +707,16 @@ mod tests {
// Emit barrier though later chunk has finished.
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Barrier { epoch: epoch1 }
TruncateOffset::Barrier {
epoch: epoch1.as_u64()
}
);
}
assert_eq!(manager.future_count, 1);
assert_eq!(
manager.next_truncate_offset().await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch2,
epoch: epoch2.as_u64(),
chunk_id: chunk_id1
}
);
Expand Down
Loading