Skip to content

Commit

Permalink
add snapshot test for global state store + local state store (replica…
Browse files Browse the repository at this point in the history
…ted) iteration
  • Loading branch information
kwannoel committed Jun 12, 2023
1 parent 921a76c commit 95f9337
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/storage/hummock_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
criterion = { version = "0.4", features = ["async_futures"] }
expect-test = "1"
futures = { version = "0.3", default-features = false, features = [
"alloc",
"executor",
] }

futures-async-stream = "0.2"
risingwave_test_runner = { path = "../../test_runner" }
serial_test = "0.9"
sync-point = { path = "../../utils/sync-point" }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(custom_test_frameworks)]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(bound_map)]
Expand Down
196 changes: 195 additions & 1 deletion src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use std::ops::Bound::Unbounded;
use std::sync::Arc;

use bytes::Bytes;
use expect_test::expect;
use futures::{pin_mut, TryStreamExt};
use futures_async_stream::for_await;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::{
Expand Down Expand Up @@ -1482,3 +1484,195 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
HummockSstableObjectId::MAX
);
}

/// Test the following behaviours:
/// 1. LocalStateStore can read replicated ReadVersion.
/// 2. GlobalStateStore cannot read replicated ReadVersion.
#[tokio::test]
async fn test_replicated_local_hummock_storage() {
const TEST_TABLE_ID: TableId = TableId { table_id: 233 };

let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await;

let read_options = ReadOptions {
prefix_hint: None,
ignore_range_tombstone: false,
retention_seconds: None,
table_id: TableId {
table_id: TEST_TABLE_ID.table_id,
},
read_version_from_backup: false,
prefetch_options: Default::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
};

let mut local_hummock_storage = hummock_storage
.new_local(NewLocalOptions::new_replicated(
TEST_TABLE_ID,
false,
TableOption {
retention_seconds: None,
},
))
.await;

let epoch0 = local_hummock_storage
.read_version()
.read()
.committed()
.max_committed_epoch();

let epoch1 = epoch0 + 1;

local_hummock_storage.init(epoch1);
// ingest 16B batch
let mut batch1 = vec![
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaaa"].concat()),
StorageValue::new_put("1111"),
),
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"bbbb"].concat()),
StorageValue::new_put("2222"),
),
];

batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
local_hummock_storage
.ingest_batch(
batch1,
vec![],
WriteOptions {
epoch: epoch1,
table_id: TEST_TABLE_ID,
},
)
.await
.unwrap();

// Test local state store read for replicated data.
{
assert!(local_hummock_storage.read_version().read().is_replicated());

let s = risingwave_storage::store::LocalStateStore::iter(
&local_hummock_storage,
(Unbounded, Unbounded),
read_options.clone(),
)
.await
.unwrap();

let mut actual = vec![];

#[for_await]
for v in s {
actual.push(v)
}

let expected = expect![[r#"
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 },
b"1111",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 },
b"2222",
),
),
]
"#]];
expected.assert_debug_eq(&actual);
}

let epoch2 = epoch1 + 1;

let mut local_hummock_storage_2 = hummock_storage
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

local_hummock_storage_2.init(epoch2);

// ingest 16B batch
let mut batch2 = vec![
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"cccc"].concat()),
StorageValue::new_put("3333"),
),
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"dddd"].concat()),
StorageValue::new_put("4444"),
),
];
batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

local_hummock_storage_2
.ingest_batch(
batch2,
vec![],
WriteOptions {
epoch: epoch2,
table_id: TEST_TABLE_ID,
},
)
.await
.unwrap();

// Test Global State Store iter, epoch2
{
let iter = hummock_storage
.iter((Unbounded, Unbounded), epoch2, read_options.clone())
.await
.unwrap();
pin_mut!(iter);

let mut actual = vec![];

#[for_await]
for v in iter {
actual.push(v);
}

let expected = expect![[r#"
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 },
b"3333",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 },
b"4444",
),
),
]
"#]];
expected.assert_debug_eq(&actual);
}

// Test Global State Store iter, epoch1
{
let iter = hummock_storage
.iter((Unbounded, Unbounded), epoch1, read_options)
.await
.unwrap();
pin_mut!(iter);

let mut actual = vec![];

#[for_await]
for v in iter {
actual.push(v);
}

let expected = expect![[r#"
[]
"#]];
expected.assert_debug_eq(&actual);
}
}

0 comments on commit 95f9337

Please sign in to comment.