From 95f9337e2c79e6022d365654a4c94d027ef239f2 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 12 Jun 2023 17:08:58 +0800 Subject: [PATCH] add snapshot test for global state store + local state store (replicated) iteration --- Cargo.lock | 2 + src/storage/hummock_test/Cargo.toml | 3 + src/storage/hummock_test/src/lib.rs | 2 +- .../hummock_test/src/state_store_tests.rs | 196 +++++++++++++++++- 4 files changed, 201 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f4eac68794a1..382025a137bb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6331,8 +6331,10 @@ dependencies = [ "async-trait", "bytes", "criterion", + "expect-test", "fail", "futures", + "futures-async-stream", "itertools", "madsim-tokio", "parking_lot 0.12.1", diff --git a/src/storage/hummock_test/Cargo.toml b/src/storage/hummock_test/Cargo.toml index bc7ff418946a0..7e0d82d80b8a3 100644 --- a/src/storage/hummock_test/Cargo.toml +++ b/src/storage/hummock_test/Cargo.toml @@ -39,10 +39,13 @@ workspace-hack = { path = "../../workspace-hack" } [dev-dependencies] criterion = { version = "0.4", features = ["async_futures"] } +expect-test = "1" futures = { version = "0.3", default-features = false, features = [ "alloc", "executor", ] } + +futures-async-stream = "0.2" risingwave_test_runner = { path = "../../test_runner" } serial_test = "0.9" sync-point = { path = "../../utils/sync-point" } diff --git a/src/storage/hummock_test/src/lib.rs b/src/storage/hummock_test/src/lib.rs index 12bbe52ee8743..8826c58c21fe4 100644 --- a/src/storage/hummock_test/src/lib.rs +++ b/src/storage/hummock_test/src/lib.rs @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +#![feature(proc_macro_hygiene, stmt_expr_attributes)] #![feature(custom_test_frameworks)] #![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)] #![feature(bound_map)] diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 6e2b2a76a6c1f..c34dadc2395a7 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -17,9 +17,11 @@ use std::ops::Bound::Unbounded; use std::sync::Arc; use bytes::Bytes; +use expect_test::expect; use futures::{pin_mut, TryStreamExt}; +use futures_async_stream::for_await; use risingwave_common::cache::CachePriority; -use risingwave_common::catalog::TableId; +use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::{ @@ -1482,3 +1484,195 @@ async fn test_gc_watermark_and_clear_shared_buffer() { HummockSstableObjectId::MAX ); } + +/// Test the following behaviours: +/// 1. LocalStateStore can read replicated ReadVersion. +/// 2. GlobalStateStore cannot read replicated ReadVersion. +#[tokio::test] +async fn test_replicated_local_hummock_storage() { + const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + + let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await; + + let read_options = ReadOptions { + prefix_hint: None, + ignore_range_tombstone: false, + retention_seconds: None, + table_id: TableId { + table_id: TEST_TABLE_ID.table_id, + }, + read_version_from_backup: false, + prefetch_options: Default::default(), + cache_policy: CachePolicy::Fill(CachePriority::High), + }; + + let mut local_hummock_storage = hummock_storage + .new_local(NewLocalOptions::new_replicated( + TEST_TABLE_ID, + false, + TableOption { + retention_seconds: None, + }, + )) + .await; + + let epoch0 = local_hummock_storage + .read_version() + .read() + .committed() + .max_committed_epoch(); + + let epoch1 = epoch0 + 1; + + local_hummock_storage.init(epoch1); + // ingest 16B batch + let mut batch1 = vec![ + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaaa"].concat()), + StorageValue::new_put("1111"), + ), + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"bbbb"].concat()), + StorageValue::new_put("2222"), + ), + ]; + + batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + local_hummock_storage + .ingest_batch( + batch1, + vec![], + WriteOptions { + epoch: epoch1, + table_id: TEST_TABLE_ID, + }, + ) + .await + .unwrap(); + + // Test local state store read for replicated data. + { + assert!(local_hummock_storage.read_version().read().is_replicated()); + + let s = risingwave_storage::store::LocalStateStore::iter( + &local_hummock_storage, + (Unbounded, Unbounded), + read_options.clone(), + ) + .await + .unwrap(); + + let mut actual = vec![]; + + #[for_await] + for v in s { + actual.push(v) + } + + let expected = expect![[r#" + [ + Ok( + ( + FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 }, + b"1111", + ), + ), + Ok( + ( + FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 }, + b"2222", + ), + ), + ] + "#]]; + expected.assert_debug_eq(&actual); + } + + let epoch2 = epoch1 + 1; + + let mut local_hummock_storage_2 = hummock_storage + .new_local(NewLocalOptions::for_test(TEST_TABLE_ID)) + .await; + + local_hummock_storage_2.init(epoch2); + + // ingest 16B batch + let mut batch2 = vec![ + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"cccc"].concat()), + StorageValue::new_put("3333"), + ), + ( + Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"dddd"].concat()), + StorageValue::new_put("4444"), + ), + ]; + batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2)); + + local_hummock_storage_2 + .ingest_batch( + batch2, + vec![], + WriteOptions { + epoch: epoch2, + table_id: TEST_TABLE_ID, + }, + ) + .await + .unwrap(); + + // Test Global State Store iter, epoch2 + { + let iter = hummock_storage + .iter((Unbounded, Unbounded), epoch2, read_options.clone()) + .await + .unwrap(); + pin_mut!(iter); + + let mut actual = vec![]; + + #[for_await] + for v in iter { + actual.push(v); + } + + let expected = expect![[r#" + [ + Ok( + ( + FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 }, + b"3333", + ), + ), + Ok( + ( + FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 }, + b"4444", + ), + ), + ] + "#]]; + expected.assert_debug_eq(&actual); + } + + // Test Global State Store iter, epoch1 + { + let iter = hummock_storage + .iter((Unbounded, Unbounded), epoch1, read_options) + .await + .unwrap(); + pin_mut!(iter); + + let mut actual = vec![]; + + #[for_await] + for v in iter { + actual.push(v); + } + + let expected = expect![[r#" + [] + "#]]; + expected.assert_debug_eq(&actual); + } +}