Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage): remove enable_test_epoch and modify epochs used by all unit tests to be realistic #14557

Merged
merged 52 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c558c9b
save work, remain about 30-40 unit tests
wcy-fdu Jan 14, 2024
db99819
modified a lot, remain 30
wcy-fdu Jan 15, 2024
404be29
remain 26
wcy-fdu Jan 15, 2024
e95c965
remain 8
wcy-fdu Jan 15, 2024
30821a3
remain 7, help wanted
wcy-fdu Jan 15, 2024
104c2a1
remain 6, help wanted
wcy-fdu Jan 16, 2024
ffbbb65
rolved another one
wcy-fdu Jan 16, 2024
9a1a0d0
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
wcy-fdu Jan 16, 2024
e5a6fd6
find a bug, only remain two ut
wcy-fdu Jan 16, 2024
8d5c0a7
all ut pass
wcy-fdu Jan 17, 2024
5277575
refactor 1/3
wcy-fdu Jan 22, 2024
e151d9b
save work
wcy-fdu Jan 23, 2024
302601b
minor
wcy-fdu Jan 24, 2024
2e3c58a
finish 99%
wcy-fdu Jan 24, 2024
2244612
fmt
wcy-fdu Jan 24, 2024
2379d70
fix one ut
wcy-fdu Jan 25, 2024
98558a2
update comments
wcy-fdu Jan 30, 2024
8b9302e
refactor, remove TestEpoch and TestEpochWithGap
wcy-fdu Jan 31, 2024
47fd634
resolve some commntes
wcy-fdu Feb 7, 2024
4e72501
remove 90% magic number
wcy-fdu Feb 7, 2024
7e74b13
refactor agg uts
wcy-fdu Feb 8, 2024
2e8b83e
minor
wcy-fdu Feb 18, 2024
a76514d
resolve comments
wcy-fdu Feb 22, 2024
4fe88e0
fmt toml file
wcy-fdu Feb 22, 2024
0a1dfbb
fix stest
wcy-fdu Feb 22, 2024
2816fa9
timeout 25mins
wcy-fdu Feb 22, 2024
00ec751
minor
wcy-fdu Feb 29, 2024
e338c45
remove usage of EpochWithGap::new_for_test(magic_number).as_u64_for_t…
wcy-fdu Feb 29, 2024
8f99665
add EpochExt for u64
wcy-fdu Feb 29, 2024
2eebf85
add doc
wcy-fdu Mar 1, 2024
8f28f85
avoid leak the concept of EpochWithGap
wcy-fdu Mar 1, 2024
ace01c4
clippy happy
wcy-fdu Mar 1, 2024
d6b9211
do not change timeout minutes
wcy-fdu Mar 1, 2024
6fde6b7
make clippy happy
wcy-fdu Mar 1, 2024
a548190
make clippy happy
wcy-fdu Mar 1, 2024
aef803d
rollback compaction test
wcy-fdu Mar 1, 2024
64319f6
resolve all comments
wcy-fdu Mar 1, 2024
b427f4d
timeout 25mins
wcy-fdu Mar 4, 2024
dc63913
resolve some comments
wcy-fdu Mar 4, 2024
60b4a2a
resolve comments
wcy-fdu Mar 6, 2024
2ead893
resolve comments
wcy-fdu Mar 6, 2024
7e02709
resolve all conflicts
wcy-fdu Mar 6, 2024
5930b19
save work
wcy-fdu Mar 6, 2024
fd9672a
timeout 22
wcy-fdu Mar 7, 2024
339f9d4
minor
wcy-fdu Mar 7, 2024
2398adf
go ahead resolve some comments
wcy-fdu Mar 7, 2024
4dde294
rebase main
wcy-fdu Mar 7, 2024
e2eff55
fix merger
wcy-fdu Mar 7, 2024
6c4aff9
resolve some comments
wcy-fdu Mar 7, 2024
e271f7a
add comments
wcy-fdu Mar 7, 2024
2d0a335
fix merger
wcy-fdu Mar 7, 2024
6db3372
resolve comments, can merge
wcy-fdu Mar 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions src/common/src/util/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ impl EpochPair {
}

pub fn inc(&mut self) {
self.curr += 1;
self.prev += 1;
self.prev = self.curr;
self.curr += 1 << 16;
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn inc_for_test(&mut self, inc_by: u64) {
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -162,8 +162,8 @@ impl EpochPair {
}

pub fn new_test_epoch(curr: u64) -> Self {
assert!(curr > 0);
Self::new(curr, curr - 1)
assert!(curr > 65535);
Self::new(curr, curr - 65536)
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
23 changes: 13 additions & 10 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_connector::source::cdc::external::{
};
use risingwave_connector::source::cdc::{CdcSplitBase, DebeziumCdcSplit, MySqlCdcSplit};
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::{to_committed_batch_query_epoch, EpochWithGap};
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_stream::common::table::state_table::StateTable;
Expand Down Expand Up @@ -290,7 +290,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
let stream_chunk2 = create_stream_chunk(chunk2_datums, &chunk_schema);

// The first barrier
let curr_epoch = 11;
let mut curr_epoch = EpochWithGap::new_without_offset(11);
let mut splits = HashMap::new();
splits.insert(
actor_id,
Expand All @@ -306,13 +306,14 @@ async fn test_cdc_backfill() -> StreamResult<()> {
_phantom: PhantomData,
})],
);
let init_barrier =
Barrier::new_test_barrier(curr_epoch).with_mutation(Mutation::Add(AddMutation {
let init_barrier = Barrier::new_test_barrier(curr_epoch.as_u64_for_test()).with_mutation(
Mutation::Add(AddMutation {
adds: HashMap::new(),
added_actors: HashSet::new(),
splits,
pause: false,
}));
}),
);

tx.send_barrier(init_barrier);

Expand All @@ -323,7 +324,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
epoch,
mutation: Some(_),
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64_for_test()
));

// start the stream pipeline src -> backfill -> mview
Expand All @@ -332,17 +333,19 @@ async fn test_cdc_backfill() -> StreamResult<()> {
// ingest data and barrier
let interval = Duration::from_millis(10);
tx.push_chunk(stream_chunk1);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 1, false);
curr_epoch.inc();
tx.push_barrier(curr_epoch.as_u64_for_test(), false);

tx.push_chunk(stream_chunk2);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 2, false);
curr_epoch.inc();
tx.push_barrier(curr_epoch.as_u64_for_test(), false);

tokio::time::sleep(interval).await;
tx.push_barrier(curr_epoch + 3, true);
curr_epoch.inc();
tx.push_barrier(curr_epoch.as_u64_for_test(), true);

// scan the final result of the mv table
let column_descs = vec![
Expand Down
23 changes: 12 additions & 11 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::SourceCtrlOpts;
use risingwave_connector::ConnectorParams;
use risingwave_hummock_sdk::to_committed_batch_query_epoch;
use risingwave_hummock_sdk::{to_committed_batch_query_epoch, EpochWithGap};
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_source::connector_test_utils::create_source_desc_builder;
Expand Down Expand Up @@ -274,9 +274,9 @@ async fn test_table_materialize() -> StreamResult<()> {
assert!(result.is_none());

// Send a barrier to start materialized view.
let mut curr_epoch = 1919;
let mut curr_epoch = EpochWithGap::new_without_offset(1919);
barrier_tx
.send(Barrier::new_test_barrier(curr_epoch))
.send(Barrier::new_test_barrier(curr_epoch.as_u64_for_test()))
.unwrap();

assert!(matches!(
Expand All @@ -285,17 +285,17 @@ async fn test_table_materialize() -> StreamResult<()> {
epoch,
mutation: None,
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64_for_test()
));

curr_epoch += 1;
curr_epoch.inc();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = insert.execute();
let _ = stream.next().await.unwrap()?;
// Send a barrier and poll again, should write changes to storage.
barrier_tx_clone
.send(Barrier::new_test_barrier(curr_epoch))
.send(Barrier::new_test_barrier(curr_epoch.as_u64_for_test()))
.unwrap();
Ok::<_, RwError>(())
});
Expand Down Expand Up @@ -325,7 +325,7 @@ async fn test_table_materialize() -> StreamResult<()> {
epoch,
mutation: None,
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64_for_test()
));

// Scan the table again, we are able to get the data now!
Expand Down Expand Up @@ -370,14 +370,14 @@ async fn test_table_materialize() -> StreamResult<()> {
0,
));

curr_epoch += 1;
curr_epoch.inc();
let barrier_tx_clone = barrier_tx.clone();
tokio::spawn(async move {
let mut stream = delete.execute();
let _ = stream.next().await.unwrap()?;
// Send a barrier and poll again, should write changes to storage.
barrier_tx_clone
.send(Barrier::new_test_barrier(curr_epoch))
.send(Barrier::new_test_barrier(curr_epoch.as_u64_for_test()))
.unwrap();
Ok::<_, RwError>(())
});
Expand All @@ -404,7 +404,7 @@ async fn test_table_materialize() -> StreamResult<()> {
epoch,
mutation: None,
..
}) if epoch.curr == curr_epoch
}) if epoch.curr == curr_epoch.as_u64_for_test()
));

// Scan the table again, we are able to see the deletion now!
Expand Down Expand Up @@ -463,7 +463,8 @@ async fn test_row_seq_scan() -> Result<()> {
vec![0, 1, 2],
);

let mut epoch = EpochPair::new_test_epoch(1);
let mut epoch =
EpochPair::new_test_epoch(EpochWithGap::new_without_offset(1).as_u64_for_test());
state.init_epoch(epoch);
state.insert(OwnedRow::new(vec![
Some(1_i32.into()),
Expand Down
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ regex = "1.4"
reqwest = { version = "0.11", features = ["json"] }
risingwave_common = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
Expand Down
23 changes: 13 additions & 10 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ mod tests {
use std::task::Poll;

use futures::{FutureExt, TryFuture};
use risingwave_hummock_sdk::EpochWithGap;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

Expand Down Expand Up @@ -639,33 +640,33 @@ mod tests {
#[tokio::test]
async fn test_future_delivery_manager_compress_chunk() {
let mut manager = DeliveryFutureManager::new(10);
let epoch1 = 233;
let epoch1 = EpochWithGap::new_without_offset(233);
let chunk_id1 = 1;
let chunk_id2 = chunk_id1 + 1;
let chunk_id3 = chunk_id2 + 1;
let (tx1_1, rx1_1) = oneshot::channel();
let (tx1_2, rx1_2) = oneshot::channel();
let (tx1_3, rx1_3) = oneshot::channel();
let epoch2 = epoch1 + 1;
let epoch2 = EpochWithGap::new_without_offset(234);
let (tx2_1, rx2_1) = oneshot::channel();
assert!(!manager
.start_write_chunk(epoch1, chunk_id1)
.start_write_chunk(epoch1.as_u64_for_test(), chunk_id1)
.add_future_may_await(to_test_future(rx1_1))
.await
.unwrap());
assert!(!manager
.start_write_chunk(epoch1, chunk_id2)
.start_write_chunk(epoch1.as_u64_for_test(), chunk_id2)
.add_future_may_await(to_test_future(rx1_2))
.await
.unwrap());
assert!(!manager
.start_write_chunk(epoch1, chunk_id3)
.start_write_chunk(epoch1.as_u64_for_test(), chunk_id3)
.add_future_may_await(to_test_future(rx1_3))
.await
.unwrap());
manager.add_barrier(epoch1);
manager.add_barrier(epoch1.as_u64_for_test());
assert!(!manager
.start_write_chunk(epoch2, chunk_id1)
.start_write_chunk(epoch2.as_u64_for_test(), chunk_id1)
.add_future_may_await(to_test_future(rx2_1))
.await
.unwrap());
Expand All @@ -688,7 +689,7 @@ mod tests {
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch1,
epoch: epoch1.as_u64_for_test(),
chunk_id: chunk_id2
}
);
Expand All @@ -706,14 +707,16 @@ mod tests {
// Emit barrier though later chunk has finished.
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Barrier { epoch: epoch1 }
TruncateOffset::Barrier {
epoch: epoch1.as_u64_for_test()
}
);
}
assert_eq!(manager.future_count, 1);
assert_eq!(
manager.next_truncate_offset().await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch2,
epoch: epoch2.as_u64_for_test(),
chunk_id: chunk_id1
}
);
Expand Down
Loading