Skip to content

Commit

Permalink
feat(storage): support sync on multiple partial graph (#19431)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Nov 19, 2024
1 parent 7ba6650 commit f12f4e5
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 255 deletions.
11 changes: 9 additions & 2 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,17 @@ impl ReplayRead for GlobalReplayImpl {

#[async_trait::async_trait]
impl ReplayStateStore for GlobalReplayImpl {
async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize> {
async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize> {
let result: SyncResult = self
.store
.sync(id, table_ids.into_iter().map(TableId::new).collect())
.sync(
sync_table_epochs
.into_iter()
.map(|(epoch, table_ids)| {
(epoch, table_ids.into_iter().map(TableId::new).collect())
})
.collect(),
)
.await
.map_err(|e| TraceError::SyncFailed(format!("{e}")))?;
Ok(result.sync_size)
Expand Down
16 changes: 11 additions & 5 deletions src/storage/hummock_trace/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use bincode::{Decode, Encode};
use bytes::Bytes;
use parking_lot::Mutex;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch};
use risingwave_pb::meta::SubscribeResponse;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{
Expand Down Expand Up @@ -281,14 +281,20 @@ impl TraceSpan {
}

pub fn new_sync_span(
epoch: u64,
table_ids: &HashSet<TableId>,
sync_table_epochs: &Vec<(HummockEpoch, HashSet<TableId>)>,
storage_type: StorageType,
) -> MayTraceSpan {
Self::new_global_op(
Operation::Sync(
epoch,
table_ids.iter().map(|table_id| table_id.table_id).collect(),
sync_table_epochs
.iter()
.map(|(epoch, table_ids)| {
(
*epoch,
table_ids.iter().map(|table_id| table_id.table_id).collect(),
)
})
.collect(),
),
storage_type,
)
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_trace/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub enum Operation {
IterNext(RecordId),

/// Sync operation of Hummock.
Sync(u64, Vec<u32>),
Sync(Vec<(u64, Vec<u32>)>),

/// `MetaMessage` operation of Hummock.
MetaMessage(Box<TracedSubResp>),
Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_trace/src/replay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub trait ReplayWrite {
#[cfg_attr(test, automock)]
#[async_trait::async_trait]
pub trait ReplayStateStore {
async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize>;
async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
async fn try_wait_epoch(
Expand Down Expand Up @@ -147,7 +147,7 @@ mock! {
}
#[async_trait::async_trait]
impl ReplayStateStore for GlobalReplayInterface{
async fn sync(&self, id: u64, table_ids: Vec<u32>) -> Result<usize>;
async fn sync(&self, sync_table_epochs: Vec<(u64, Vec<u32>)>) -> Result<usize>;
async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64,
) -> Result<u64>;
async fn new_local(&self, opts: TracedNewLocalOptions) -> Box<dyn LocalReplay>;
Expand Down
6 changes: 3 additions & 3 deletions src/storage/hummock_trace/src/replay/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ mod tests {

let mut non_local: Vec<Result<Record>> = vec![
(12, Operation::Finish),
(13, Operation::Sync(sync_id, vec![1, 2, 3])),
(13, Operation::Sync(vec![(sync_id, vec![1, 2, 3])])),
(
13,
Operation::Result(OperationResult::Sync(TraceResult::Ok(0))),
Expand Down Expand Up @@ -244,9 +244,9 @@ mod tests {

mock_replay
.expect_sync()
.with(predicate::eq(sync_id), predicate::eq(vec![1, 2, 3]))
.with(predicate::eq(vec![(sync_id, vec![1, 2, 3])]))
.times(1)
.returning(|_, _| Ok(0));
.returning(|_| Ok(0));

let mut replay = HummockReplay::new(mock_reader, mock_replay);

Expand Down
4 changes: 2 additions & 2 deletions src/storage/hummock_trace/src/replay/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ impl ReplayWorker {
panic!("expect iter result, but got {:?}", res);
}
}
Operation::Sync(epoch_id, table_ids) => {
Operation::Sync(sync_table_epochs) => {
assert_eq!(storage_type, StorageType::Global);
let sync_result = replay.sync(epoch_id, table_ids).await.unwrap();
let sync_result = replay.sync(sync_table_epochs).await.unwrap();
let res = res_rx.recv().await.expect("recv result failed");
if let OperationResult::Sync(expected) = res {
assert_eq!(TraceResult::Ok(sync_result), expected, "sync failed");
Expand Down
28 changes: 9 additions & 19 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,17 +463,12 @@ impl HummockEventHandler {

fn handle_sync_epoch(
&mut self,
new_sync_epoch: HummockEpoch,
sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
table_ids: HashSet<TableId>,
) {
debug!(
new_sync_epoch,
?table_ids,
"awaiting for epoch to be synced",
);
debug!(?sync_table_epochs, "awaiting for epoch to be synced",);
self.uploader
.start_sync_epoch(new_sync_epoch, sync_result_sender, table_ids);
.start_sync_epoch(sync_result_sender, sync_table_epochs);
}

fn handle_clear(&mut self, notifier: oneshot::Sender<()>, table_ids: Option<HashSet<TableId>>) {
Expand Down Expand Up @@ -641,11 +636,10 @@ impl HummockEventHandler {
self.uploader.may_flush();
}
HummockEvent::SyncEpoch {
new_sync_epoch,
sync_result_sender,
table_ids,
sync_table_epochs,
} => {
self.handle_sync_epoch(new_sync_epoch, sync_result_sender, table_ids);
self.handle_sync_epoch(sync_table_epochs, sync_result_sender);
}
HummockEvent::Clear(notifier, table_ids) => {
self.handle_clear(notifier, table_ids);
Expand Down Expand Up @@ -1013,16 +1007,14 @@ mod tests {

let (tx1, mut rx1) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch1,
sync_result_sender: tx1,
table_ids: HashSet::from_iter([TEST_TABLE_ID]),
sync_table_epochs: vec![(epoch1, HashSet::from_iter([TEST_TABLE_ID]))],
});
assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
let (tx2, mut rx2) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch2,
sync_result_sender: tx2,
table_ids: HashSet::from_iter([TEST_TABLE_ID]),
sync_table_epochs: vec![(epoch2, HashSet::from_iter([TEST_TABLE_ID]))],
});
assert!(poll_fn(|cx| Poll::Ready(rx2.poll_unpin(cx).is_pending())).await);

Expand Down Expand Up @@ -1144,9 +1136,8 @@ mod tests {
let sync_epoch = |table_id, new_sync_epoch| {
let (tx, rx) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch,
sync_result_sender: tx,
table_ids: HashSet::from_iter([table_id]),
sync_table_epochs: vec![(new_sync_epoch, HashSet::from_iter([table_id]))],
});
rx
};
Expand Down Expand Up @@ -1281,9 +1272,8 @@ mod tests {
vec![imm1_2_2.batch_id()],
)]));
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch2,
sync_result_sender: tx2,
table_ids: HashSet::from_iter([table_id1]),
sync_table_epochs: vec![(epoch2, HashSet::from_iter([table_id1]))],
});
wait_task_start.await;
assert!(poll_fn(|cx| Poll::Ready(sync_rx2.poll_unpin(cx).is_pending())).await);
Expand Down
8 changes: 3 additions & 5 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ pub enum HummockEvent {
/// task on this epoch. Previous concurrent flush task join handle will be returned by the join
/// handle sender.
SyncEpoch {
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncedData>>,
table_ids: HashSet<TableId>,
sync_table_epochs: Vec<(HummockEpoch, HashSet<TableId>)>,
},

/// Clear shared buffer and reset all states
Expand Down Expand Up @@ -117,10 +116,9 @@ impl HummockEvent {
HummockEvent::BufferMayFlush => "BufferMayFlush".to_string(),

HummockEvent::SyncEpoch {
new_sync_epoch,
sync_result_sender: _,
table_ids,
} => format!("AwaitSyncEpoch epoch {} {:?}", new_sync_epoch, table_ids),
sync_table_epochs,
} => format!("AwaitSyncEpoch epoch {:?}", sync_table_epochs),

HummockEvent::Clear(_, table_ids) => {
format!("Clear {:?}", table_ids)
Expand Down
Loading

0 comments on commit f12f4e5

Please sign in to comment.