Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support replicated LocalHummockStorage #10226

Merged
merged 15 commits into from
Jun 14, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/storage/hummock_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,13 @@ workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
criterion = { version = "0.4", features = ["async_futures"] }
expect-test = "1"
futures = { version = "0.3", default-features = false, features = [
"alloc",
"executor",
] }

futures-async-stream = "0.2"
risingwave_test_runner = { path = "../../test_runner" }
serial_test = "0.9"
sync-point = { path = "../../utils/sync-point" }
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(custom_test_frameworks)]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(bound_map)]
Expand Down
196 changes: 195 additions & 1 deletion src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use std::ops::Bound::Unbounded;
use std::sync::Arc;

use bytes::Bytes;
use expect_test::expect;
use futures::{pin_mut, TryStreamExt};
use futures_async_stream::for_await;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::{
Expand Down Expand Up @@ -1484,3 +1486,195 @@ async fn test_gc_watermark_and_clear_shared_buffer() {
HummockSstableObjectId::MAX
);
}

/// Test the following behaviours:
/// 1. LocalStateStore can read replicated ReadVersion.
/// 2. GlobalStateStore cannot read replicated ReadVersion.
#[tokio::test]
async fn test_replicated_local_hummock_storage() {
const TEST_TABLE_ID: TableId = TableId { table_id: 233 };

let (hummock_storage, _meta_client) = with_hummock_storage_v2(Default::default()).await;

let read_options = ReadOptions {
prefix_hint: None,
ignore_range_tombstone: false,
retention_seconds: None,
table_id: TableId {
table_id: TEST_TABLE_ID.table_id,
},
read_version_from_backup: false,
prefetch_options: Default::default(),
cache_policy: CachePolicy::Fill(CachePriority::High),
};

let mut local_hummock_storage = hummock_storage
.new_local(NewLocalOptions::new_replicated(
TEST_TABLE_ID,
false,
TableOption {
retention_seconds: None,
},
))
.await;

let epoch0 = local_hummock_storage
.read_version()
.read()
.committed()
.max_committed_epoch();

let epoch1 = epoch0 + 1;

local_hummock_storage.init(epoch1);
// ingest 16B batch
let mut batch1 = vec![
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"aaaa"].concat()),
StorageValue::new_put("1111"),
),
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"bbbb"].concat()),
StorageValue::new_put("2222"),
),
];

batch1.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));
local_hummock_storage
.ingest_batch(
batch1,
vec![],
WriteOptions {
epoch: epoch1,
table_id: TEST_TABLE_ID,
},
)
.await
.unwrap();

// Test local state store read for replicated data.
{
assert!(local_hummock_storage.read_version().read().is_replicated());

let s = risingwave_storage::store::LocalStateStore::iter(
&local_hummock_storage,
(Unbounded, Unbounded),
read_options.clone(),
)
.await
.unwrap();

let mut actual = vec![];

#[for_await]
for v in s {
actual.push(v)
}

let expected = expect![[r#"
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000061616161 } }, 1 },
b"1111",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000062626262 } }, 1 },
b"2222",
),
),
]
"#]];
expected.assert_debug_eq(&actual);
}

let epoch2 = epoch1 + 1;

let mut local_hummock_storage_2 = hummock_storage
.new_local(NewLocalOptions::for_test(TEST_TABLE_ID))
.await;

local_hummock_storage_2.init(epoch2);

// ingest 16B batch
let mut batch2 = vec![
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"cccc"].concat()),
StorageValue::new_put("3333"),
),
(
Bytes::from([VirtualNode::ZERO.to_be_bytes().as_slice(), b"dddd"].concat()),
StorageValue::new_put("4444"),
),
];
batch2.sort_by(|(k1, _), (k2, _)| k1.cmp(k2));

local_hummock_storage_2
.ingest_batch(
batch2,
vec![],
WriteOptions {
epoch: epoch2,
table_id: TEST_TABLE_ID,
},
)
.await
.unwrap();

// Test Global State Store iter, epoch2
{
let iter = hummock_storage
.iter((Unbounded, Unbounded), epoch2, read_options.clone())
.await
.unwrap();
pin_mut!(iter);

let mut actual = vec![];

#[for_await]
for v in iter {
actual.push(v);
}

let expected = expect![[r#"
[
Ok(
(
FullKey { UserKey { 233, TableKey { 000063636363 } }, 2 },
b"3333",
),
),
Ok(
(
FullKey { UserKey { 233, TableKey { 000064646464 } }, 2 },
b"4444",
),
),
]
"#]];
expected.assert_debug_eq(&actual);
}

// Test Global State Store iter, epoch1
{
let iter = hummock_storage
.iter((Unbounded, Unbounded), epoch1, read_options)
.await
.unwrap();
pin_mut!(iter);

let mut actual = vec![];

#[for_await]
for v in iter {
actual.push(v);
}

let expected = expect![[r#"
[]
"#]];
expected.assert_debug_eq(&actual);
}
}
1 change: 1 addition & 0 deletions src/storage/hummock_trace/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ mod tests {
table_option: TracedTableOption {
retention_seconds: None,
},
is_replicated: false,
},
),
);
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub struct TracedNewLocalOptions {
pub table_id: TracedTableId,
pub is_consistent_op: bool,
pub table_option: TracedTableOption,
pub is_replicated: bool,
}

pub type TracedHummockEpoch = u64;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,10 +564,14 @@ impl HummockEventHandler {
HummockEvent::RegisterReadVersion {
table_id,
new_read_version_sender,
is_replicated,
} => {
let pinned_version = self.pinned_version.load();
let basic_read_version = Arc::new(RwLock::new(
HummockReadVersion::new((**pinned_version).clone()),
HummockReadVersion::new_with_replication_option(
(**pinned_version).clone(),
is_replicated,
),
));

let instance_id = self.generate_instance_id();
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum HummockEvent {
table_id: TableId,
new_read_version_sender:
oneshot::Sender<(Arc<RwLock<HummockReadVersion>>, LocalInstanceGuard)>,
is_replicated: bool,
},

DestroyReadVersion {
Expand Down Expand Up @@ -116,7 +117,11 @@ impl HummockEvent {
HummockEvent::RegisterReadVersion {
table_id,
new_read_version_sender: _,
} => format!("RegisterReadVersion table_id {:?}", table_id,),
is_replicated,
} => format!(
"RegisterReadVersion table_id {:?}, is_replicated: {:?}",
table_id, is_replicated
),

HummockEvent::DestroyReadVersion {
table_id,
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ impl HummockStorage {
.send(HummockEvent::RegisterReadVersion {
table_id: option.table_id,
new_read_version_sender: tx,
is_replicated: option.is_replicated,
})
.unwrap();

Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ impl HummockStorage {
let read_guard = self.read_version_mapping.read();
read_guard
.get(&table_id)
.map(|v| v.values().cloned().collect_vec())
.map(|v| {
v.values()
.filter(|v| !v.read_arc().is_replicated())
.cloned()
.collect_vec()
})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filters out replicated read versions, when reading from global state store.

.unwrap_or(Vec::new())
};

Expand Down
21 changes: 18 additions & 3 deletions src/storage/src/hummock/store/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ pub struct LocalHummockStorage {
/// Read handle.
read_version: Arc<RwLock<HummockReadVersion>>,

/// This indicates that this `LocalHummockStorage` replicates another `LocalHummockStorage`.
/// It's used by executors in different CNs to synchronize states.
///
/// Within `LocalHummockStorage` we use this flag to avoid uploading local state to be
/// persisted, so we won't have duplicate data.
///
/// This also handles a corner case where an executor doing replication
/// is scheduled to the same CN as its Upstream executor.
/// In that case, we use this flag to avoid reading the same data twice,
/// by ignoring the replicated ReadVersion.
is_replicated: bool,

/// Event sender.
event_sender: mpsc::UnboundedSender<HummockEvent>,

Expand Down Expand Up @@ -401,9 +413,11 @@ impl LocalHummockStorage {
self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone())));

// insert imm to uploader
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.unwrap();
if !self.is_replicated {
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.unwrap();
}

timer.observe_duration();

Expand Down Expand Up @@ -433,6 +447,7 @@ impl LocalHummockStorage {
table_id: option.table_id,
is_consistent_op: option.is_consistent_op,
table_option: option.table_option,
is_replicated: option.is_replicated,
instance_guard,
read_version,
event_sender,
Expand Down
Loading