Skip to content

Commit

Permalink
feat(storage): support replicated LocalHummockStorage (#10226)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Jun 14, 2023
1 parent ede3278 commit 02a110c
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/storage/hummock_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
criterion = { version = "0.4", features = ["async_futures"] }
expect-test = "1"
futures = { version = "0.3", default-features = false, features = [
"alloc",
"executor",
] }

futures-async-stream = "0.2"
risingwave_test_runner = { path = "../../test_runner" }
serial_test = "0.9"
sync-point = { path = "../../utils/sync-point" }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(custom_test_frameworks)]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(bound_map)]
Expand Down
196 changes: 195 additions & 1 deletion src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use std::ops::Bound::Unbounded;
use std::sync::Arc;

use bytes::Bytes;
use expect_test::expect;
use futures::{pin_mut, TryStreamExt};
use futures_async_stream::for_await;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::{
Expand Down Expand Up @@ -1484,3 +1486,195 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
HummockSstableObjectId::MAX
);
}

/// Test the following behaviours:
/// 1. LocalStateStore can read replicated ReadVersion.
/// 2. GlobalStateStore cannot read replicated ReadVersion.
#[tokio::test]
async fn test_replicated_local_hummock_storage() {
const TEST_TABLE_ID: TableId = TableId { table_id: 233 };

let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await;

let read_options = ReadOptions {
prefix_hint: None,
ignore_range_tombstone: false,
retention_seconds: None,
table_id: TableId {
table_id: TEST_TABLE_ID.table_id,
},
read_version_from_backup: false,
prefetch_options: Default::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
};

let mut local_hummock_storage = hummock_storage
.new_local(NewLocalOptions::new_replicated(
TEST_TABLE_ID,
false,
TableOption {
retention_seconds: None,
},
))
.await;

let epoch0 = local_hummock_storage
.read_version()
.read()
.committed()
.max_committed_epoch();

let epoch1 = epoch0 + 1;

local_hummock_storage.init(epoch1);
// ingest 16B batch
let mut batch1 = vec![
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaaa"].concat()),
StorageValue::new_put("1111"),
),
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"bbbb"].concat()),
StorageValue::new_put("2222"),
),
];

batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
local_hummock_storage
.ingest_batch(
batch1,
vec![],
WriteOptions {
epoch: epoch1,
table_id: TEST_TABLE_ID,
},
)
.await
.unwrap();

// Test local state store read for replicated data.
{
assert!(local_hummock_storage.read_version().read().is_replicated());

let s = risingwave_storage::store::LocalStateStore::iter(
&local_hummock_storage,
(Unbounded, Unbounded),
read_options.clone(),
)
.await
.unwrap();

let mut actual = vec![];

#[for_await]
for v in s {
actual.push(v)
}

let expected = expect![[r#"
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 },
b"1111",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 },
b"2222",
),
),
]
"#]];
expected.assert_debug_eq(&actual);
}

let epoch2 = epoch1 + 1;

let mut local_hummock_storage_2 = hummock_storage
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

local_hummock_storage_2.init(epoch2);

// ingest 16B batch
let mut batch2 = vec![
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"cccc"].concat()),
StorageValue::new_put("3333"),
),
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"dddd"].concat()),
StorageValue::new_put("4444"),
),
];
batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

local_hummock_storage_2
.ingest_batch(
batch2,
vec![],
WriteOptions {
epoch: epoch2,
table_id: TEST_TABLE_ID,
},
)
.await
.unwrap();

// Test Global State Store iter, epoch2
{
let iter = hummock_storage
.iter((Unbounded, Unbounded), epoch2, read_options.clone())
.await
.unwrap();
pin_mut!(iter);

let mut actual = vec![];

#[for_await]
for v in iter {
actual.push(v);
}

let expected = expect![[r#"
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 },
b"3333",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 },
b"4444",
),
),
]
"#]];
expected.assert_debug_eq(&actual);
}

// Test Global State Store iter, epoch1
{
let iter = hummock_storage
.iter((Unbounded, Unbounded), epoch1, read_options)
.await
.unwrap();
pin_mut!(iter);

let mut actual = vec![];

#[for_await]
for v in iter {
actual.push(v);
}

let expected = expect![[r#"
[]
"#]];
expected.assert_debug_eq(&actual);
}
}
1 change: 1 addition & 0 deletions src/storage/hummock_trace/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ mod tests {
table_option: TracedTableOption {
retention_seconds: None,
},
is_replicated: false,
},
),
);
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub struct TracedNewLocalOptions {
pub table_id: TracedTableId,
pub is_consistent_op: bool,
pub table_option: TracedTableOption,
pub is_replicated: bool,
}

pub type TracedHummockEpoch = u64;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,14 @@ impl HummockEventHandler {
HummockEvent::RegisterReadVersion {
table_id,
new_read_version_sender,
is_replicated,
} => {
let pinned_version = self.pinned_version.load();
let basic_read_version = Arc::new(RwLock::new(
HummockReadVersion::new((**pinned_version).clone()),
HummockReadVersion::new_with_replication_option(
(**pinned_version).clone(),
is_replicated,
),
));

let instance_id = self.generate_instance_id();
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum HummockEvent {
table_id: TableId,
new_read_version_sender:
oneshot::Sender<(Arc<RwLock<HummockReadVersion>>, LocalInstanceGuard)>,
is_replicated: bool,
},

DestroyReadVersion {
Expand Down Expand Up @@ -116,7 +117,11 @@ impl HummockEvent {
HummockEvent::RegisterReadVersion {
table_id,
new_read_version_sender: _,
} => format!("RegisterReadVersion table_id {:?}", table_id,),
is_replicated,
} => format!(
"RegisterReadVersion table_id {:?}, is_replicated: {:?}",
table_id, is_replicated
),

HummockEvent::DestroyReadVersion {
table_id,
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl HummockStorage {
.send(HummockEvent::RegisterReadVersion {
table_id: option.table_id,
new_read_version_sender: tx,
is_replicated: option.is_replicated,
})
.unwrap();

Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ impl HummockStorage {
let read_guard = self.read_version_mapping.read();
read_guard
.get(&table_id)
.map(|v| v.values().cloned().collect_vec())
.map(|v| {
v.values()
.filter(|v| !v.read_arc().is_replicated())
.cloned()
.collect_vec()
})
.unwrap_or(Vec::new())
};

Expand Down
21 changes: 18 additions & 3 deletions src/storage/src/hummock/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ pub struct LocalHummockStorage {
/// Read handle.
read_version: Arc<RwLock<HummockReadVersion>>,

/// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
/// It's used by executors in different CNs to synchronize states.
///
/// Within `LocalHummockStorage` we use this flag to avoid uploading local state to be
/// persisted, so we won't have duplicate data.
///
/// This also handles a corner case where an executor doing replication
/// is scheduled to the same CN as its Upstream executor.
/// In that case, we use this flag to avoid reading the same data twice,
/// by ignoring the replicated ReadVersion.
is_replicated: bool,

/// Event sender.
event_sender: mpsc::UnboundedSender<HummockEvent>,

Expand Down Expand Up @@ -401,9 +413,11 @@ impl LocalHummockStorage {
self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

// insert imm to uploader
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.unwrap();
if !self.is_replicated {
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.unwrap();
}

timer.observe_duration();

Expand Down Expand Up @@ -433,6 +447,7 @@ impl LocalHummockStorage {
table_id: option.table_id,
is_consistent_op: option.is_consistent_op,
table_option: option.table_option,
is_replicated: option.is_replicated,
instance_guard,
read_version,
event_sender,
Expand Down
Loading

0 comments on commit 02a110c

Please sign in to comment.