feat(storage): pass epoch and table id before barrier (#17635)
wenym1 authored Jul 13, 2024
1 parent 5c52c7d commit 4c4ada1
Showing 16 changed files with 545 additions and 69 deletions.
8 changes: 4 additions & 4 deletions src/meta/src/barrier/mod.rs
@@ -767,10 +767,10 @@ impl GlobalBarrierManager

send_latency_timer.observe_duration();

let node_to_collect = match self
.control_stream_manager
.inject_barrier(command_ctx.clone())
{
let node_to_collect = match self.control_stream_manager.inject_barrier(
command_ctx.clone(),
self.state.inflight_actor_infos.existing_table_ids(),
) {
Ok(node_to_collect) => node_to_collect,
Err(err) => {
for notifier in notifiers {
4 changes: 2 additions & 2 deletions src/meta/src/barrier/recovery.rs
@@ -477,8 +477,8 @@ impl GlobalBarrierManager
tracing::Span::current(), // recovery span
));

let mut node_to_collect =
control_stream_manager.inject_barrier(command_ctx.clone())?;
let mut node_to_collect = control_stream_manager
.inject_barrier(command_ctx.clone(), info.existing_table_ids())?;
while !node_to_collect.is_empty() {
let (worker_id, prev_epoch, _) = control_stream_manager
.next_complete_barrier_response()
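Both call sites above now compute the set of table IDs to sync themselves and hand it to inject_barrier, instead of letting the RPC layer derive it from the command context (see the rpc.rs change below). A minimal caller-side sketch, assuming the ControlStreamManager, CommandContext, TableId, WorkerId, and MetaResult types from this commit; the wrapper function itself is hypothetical:

    use std::collections::HashSet;
    use std::sync::Arc;

    use risingwave_common::catalog::TableId;

    // Hypothetical helper: the caller (the barrier manager on the normal path,
    // or the recovery path) decides which tables the barrier must sync and
    // passes that set along with the command context.
    fn inject_with_tables(
        control_stream_manager: &mut ControlStreamManager,
        command_ctx: Arc<CommandContext>,
        table_ids_to_sync: HashSet<TableId>,
    ) -> MetaResult<HashSet<WorkerId>> {
        control_stream_manager.inject_barrier(command_ctx, table_ids_to_sync)
    }
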
14 changes: 9 additions & 5 deletions src/meta/src/barrier/rpc.rs
@@ -24,6 +24,7 @@ use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::common::{ActorInfo, WorkerNode};
@@ -247,6 +248,7 @@ impl ControlStreamManager {
pub(super) fn inject_barrier(
&mut self,
command_context: Arc<CommandContext>,
table_ids_to_sync: HashSet<TableId>,
) -> MetaResult<HashSet<WorkerId>> {
fail_point!("inject_barrier_err", |_| risingwave_common::bail!(
"inject_barrier_err"
@@ -263,9 +265,13 @@
if actor_ids_to_collect.is_empty() {
// No need to send or collect barrier for this node.
assert!(actor_ids_to_send.is_empty());
Ok(())
} else {
}
{
let Some(node) = self.nodes.get_mut(node_id) else {
if actor_ids_to_collect.is_empty() {
// Worker node got disconnected but has no actor to collect. Simply skip it.
return Ok(());
}
return Err(
anyhow!("unconnected worker node: {:?}", worker_node.host).into()
);
@@ -294,9 +300,7 @@
barrier: Some(barrier),
actor_ids_to_send,
actor_ids_to_collect,
table_ids_to_sync: command_context
.info
.existing_table_ids()
table_ids_to_sync: table_ids_to_sync
.iter()
.map(|table_id| table_id.table_id)
.collect(),
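Inside inject_barrier, the explicit table set replaces the lookup through command_context.info and is flattened into the plain u32 IDs carried by the injected barrier request. A rough sketch of that mapping, assuming TableId from risingwave_common::catalog with its public table_id: u32 field; the field name table_ids_to_sync is taken from the diff above:

    use std::collections::HashSet;

    use risingwave_common::catalog::TableId;

    // Sketch: turn the typed table set into the raw u32 ids that the injected
    // barrier request carries in its table_ids_to_sync field.
    fn table_ids_for_request(table_ids_to_sync: &HashSet<TableId>) -> Vec<u32> {
        table_ids_to_sync
            .iter()
            .map(|table_id| table_id.table_id)
            .collect()
    }
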
40 changes: 28 additions & 12 deletions src/storage/hummock_test/src/compactor_tests.rs
@@ -15,7 +15,7 @@
#[cfg(test)]
pub(crate) mod tests {

use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::collections::{BTreeMap, BTreeSet, HashSet, VecDeque};
use std::ops::Bound;
use std::sync::Arc;

@@ -156,6 +156,9 @@ pub(crate) mod tests {
value_size: usize,
epochs: Vec<u64>,
) {
for epoch in &epochs {
storage.start_epoch(*epoch, HashSet::from_iter([Default::default()]));
}
let mut local = storage
.new_local(NewLocalOptions::for_test(TableId::default()))
.await;
@@ -534,17 +537,16 @@ pub(crate) mod tests {
existing_table_id: u32,
keys_per_epoch: usize,
) {
let table_id = existing_table_id.into();
let kv_count: u16 = 128;
let mut epoch = test_epoch(1);
let mut local = storage
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
.await;
let mut local = storage.new_local(NewLocalOptions::for_test(table_id)).await;

storage.start_epoch(epoch, HashSet::from_iter([table_id]));

// 1. add sstables
let val = Bytes::from(b"0"[..].repeat(1 << 10)); // 1024 Byte value
for idx in 0..kv_count {
epoch.inc_epoch();

if idx == 0 {
local.init_for_test(epoch).await.unwrap();
}
@@ -559,9 +561,11 @@
}
local.flush().await.unwrap();
let next_epoch = epoch.next_epoch();
storage.start_epoch(next_epoch, HashSet::from_iter([table_id]));
local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());

flush_and_commit(&hummock_meta_client, storage, epoch).await;
epoch.inc_epoch();
}
}

@@ -727,9 +731,10 @@ pub(crate) mod tests {
.await;

let vnode = VirtualNode::from_index(1);
global_storage.start_epoch(epoch, HashSet::from_iter([1.into(), 2.into()]));
for index in 0..kv_count {
epoch.inc_epoch();
let next_epoch = epoch.next_epoch();
global_storage.start_epoch(next_epoch, HashSet::from_iter([1.into(), 2.into()]));
if index == 0 {
storage_1.init_for_test(epoch).await.unwrap();
storage_2.init_for_test(epoch).await.unwrap();
@@ -755,6 +760,7 @@

let res = global_storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
epoch.inc_epoch();
}

// Mimic dropping table
@@ -838,7 +844,6 @@ pub(crate) mod tests {
.unwrap();
assert!(compact_task.is_none());

epoch.inc_epoch();
// to update version for hummock_storage
global_storage.wait_version(version).await;

@@ -921,12 +926,14 @@ pub(crate) mod tests {
let vnode = VirtualNode::from_index(1);
let mut epoch_set = BTreeSet::new();

storage.start_epoch(epoch, HashSet::from_iter([existing_table_id.into()]));

let mut local = storage
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
.await;
for i in 0..kv_count {
epoch += millisec_interval_epoch;
let next_epoch = epoch + millisec_interval_epoch;
storage.start_epoch(next_epoch, HashSet::from_iter([existing_table_id.into()]));
if i == 0 {
local.init_for_test(epoch).await.unwrap();
}
@@ -944,6 +951,7 @@

let res = storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
epoch += millisec_interval_epoch;
}

let manual_compcation_option = ManualCompactionOption {
@@ -969,7 +977,10 @@
retention_seconds: Some(retention_seconds_expire_second),
},
)]);
compact_task.current_epoch_time = epoch;
compact_task.current_epoch_time = hummock_manager_ref
.get_current_version()
.await
.max_committed_epoch;

// assert compact_task
assert_eq!(
@@ -1123,12 +1134,13 @@ pub(crate) mod tests {
let mut local = storage
.new_local(NewLocalOptions::for_test(existing_table_id.into()))
.await;
storage.start_epoch(epoch, HashSet::from_iter([existing_table_id.into()]));
for i in 0..kv_count {
epoch += millisec_interval_epoch;
if i == 0 {
local.init_for_test(epoch).await.unwrap();
}
let next_epoch = epoch + millisec_interval_epoch;
storage.start_epoch(next_epoch, HashSet::from_iter([existing_table_id.into()]));
epoch_set.insert(epoch);

let ramdom_key = [key_prefix.as_ref(), &rand::thread_rng().gen::<[u8; 32]>()].concat();
@@ -1139,6 +1151,7 @@
local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());
let res = storage.seal_and_sync_epoch(epoch).await.unwrap();
hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
epoch += millisec_interval_epoch;
}

let manual_compcation_option = ManualCompactionOption {
@@ -1166,7 +1179,10 @@

let compaction_filter_flag = CompactionFilterFlag::STATE_CLEAN | CompactionFilterFlag::TTL;
compact_task.compaction_filter_mask = compaction_filter_flag.bits();
compact_task.current_epoch_time = epoch;
compact_task.current_epoch_time = hummock_manager_ref
.get_current_version()
.await
.max_committed_epoch;

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
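The test-side changes all follow the same lifecycle: every epoch a local state store will write into must first be announced to the storage layer with start_epoch, together with the table IDs that will be synced for it, and the next epoch must be announced before the current one is sealed. A condensed sketch of the first pass through that lifecycle, assuming the same test fixtures as above (storage, hummock_meta_client, the next_epoch/inc_epoch epoch helpers, and the for_test option constructors); the actual key writes are elided:

    // Inside an async test body like the ones above.
    let table_id = TableId::default();
    storage.start_epoch(epoch, HashSet::from_iter([table_id]));

    let mut local = storage.new_local(NewLocalOptions::for_test(table_id)).await;
    local.init_for_test(epoch).await.unwrap();

    // ... insert keys for `epoch` ...
    local.flush().await.unwrap();

    // Announce the next epoch before sealing the current one on the local store.
    let next_epoch = epoch.next_epoch();
    storage.start_epoch(next_epoch, HashSet::from_iter([table_id]));
    local.seal_current_epoch(next_epoch, SealCurrentEpochOptions::for_test());

    // Sync and commit the finished epoch, then advance.
    let res = storage.seal_and_sync_epoch(epoch).await.unwrap();
    hummock_meta_client.commit_epoch(epoch, res).await.unwrap();
    epoch.inc_epoch();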