Skip to content

Commit

Permalink
feat(storage): avoid uploader depending on seal_epoch (#16985)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jun 12, 2024
1 parent c9c7064 commit d488f65
Show file tree
Hide file tree
Showing 10 changed files with 714 additions and 414 deletions.
19 changes: 17 additions & 2 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ async fn test_state_store_sync() {
.await
.unwrap();

let epoch3 = epoch2.next_epoch();
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());

let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap();
test_env
.meta_client
Expand Down Expand Up @@ -829,14 +832,15 @@ async fn test_delete_get() {
.await
.unwrap();

let epoch2 = epoch1.next_epoch();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap();
test_env
.meta_client
.commit_epoch(epoch1, res)
.await
.unwrap();
let epoch2 = epoch1.next_epoch();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());

let batch2 = vec![(
gen_key_from_str(VirtualNode::ZERO, "bb"),
StorageValue::new_delete(),
Expand All @@ -851,6 +855,7 @@ async fn test_delete_get() {
)
.await
.unwrap();
hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap();
test_env
.meta_client
Expand Down Expand Up @@ -1005,6 +1010,8 @@ async fn test_multiple_epoch_sync() {
};
test_get().await;

let epoch4 = epoch3.next_epoch();
hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test());
test_env.storage.seal_epoch(epoch1, false);
let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap();
let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap();
Expand Down Expand Up @@ -1079,6 +1086,9 @@ async fn test_iter_with_min_epoch() {
.await
.unwrap();

let epoch3 = (33 * 1000) << 16;
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());

{
// test before sync
{
Expand Down Expand Up @@ -1329,6 +1339,9 @@ async fn test_hummock_version_reader() {
.await
.unwrap();

let epoch4 = (34 * 1000) << 16;
hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test());

{
// test before sync
{
Expand Down Expand Up @@ -1739,6 +1752,8 @@ async fn test_get_with_min_epoch() {
.await
.unwrap();

hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());

{
// test before sync
let k = gen_key(0);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,11 +1046,12 @@ async fn test_delete_get_v2() {
)
.await
.unwrap();
let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap();
meta_client.commit_epoch(epoch1, res).await.unwrap();
let epoch2 = epoch1.next_epoch();

local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap();
meta_client.commit_epoch(epoch1, res).await.unwrap();

let batch2 = vec![(
gen_key_from_str(VirtualNode::ZERO, "bb"),
StorageValue::new_delete(),
Expand Down
94 changes: 48 additions & 46 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub struct HummockEventHandler {
version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
/// A copy of `read_version_mapping` but owned by event handler
local_read_version_mapping: HashMap<LocalInstanceId, HummockReadVersionRef>,
local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,
pinned_version: Arc<ArcSwap<PinnedVersion>>,
Expand Down Expand Up @@ -455,7 +455,7 @@ impl HummockEventHandler {
let mut pending = VecDeque::new();
let mut total_count = 0;
for instance_id in instances {
let Some(read_version) = self.local_read_version_mapping.get(&instance_id) else {
let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
continue;
};
total_count += 1;
Expand All @@ -475,7 +475,7 @@ impl HummockEventHandler {
const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

while let Some(instance_id) = pending.pop_front() {
let read_version = self
let (_, read_version) = self
.local_read_version_mapping
.get(&instance_id)
.expect("have checked exist before");
Expand Down Expand Up @@ -520,7 +520,6 @@ impl HummockEventHandler {
prev_epoch,
max_committed_epoch = self.uploader.max_committed_epoch(),
max_synced_epoch = self.uploader.max_synced_epoch(),
max_sealed_epoch = self.uploader.max_sealed_epoch(),
"handle clear event"
);

Expand Down Expand Up @@ -588,7 +587,7 @@ impl HummockEventHandler {
"read version mapping not empty when clear. remaining tables: {:?}",
self.local_read_version_mapping
.values()
.map(|read_version| read_version.read().table_id())
.map(|(_, read_version)| read_version.read().table_id())
.collect_vec()
);

Expand Down Expand Up @@ -784,6 +783,18 @@ impl HummockEventHandler {
HummockEvent::Shutdown => {
unreachable!("shutdown is handled specially")
}
HummockEvent::InitEpoch {
instance_id,
init_epoch,
} => {
let table_id = self
.local_read_version_mapping
.get(&instance_id)
.expect("should exist")
.0;
self.uploader
.init_instance(instance_id, table_id, init_epoch);
}
HummockEvent::ImmToUploader { instance_id, imm } => {
assert!(
self.local_read_version_mapping.contains_key(&instance_id),
Expand All @@ -795,29 +806,13 @@ impl HummockEventHandler {
self.uploader.may_flush();
}

HummockEvent::SealEpoch {
epoch,
is_checkpoint: _,
} => {
self.uploader.seal_epoch(epoch);
}

HummockEvent::LocalSealEpoch {
epoch,
next_epoch,
opts,
table_id,
instance_id,
} => {
assert!(
self.local_read_version_mapping
.contains_key(&instance_id),
"seal epoch from non-existing read version instance: instance_id: {}, table_id: {}, epoch: {}",
instance_id, table_id, epoch,
);
if let Some((direction, watermarks)) = opts.table_watermarks {
self.uploader
.add_table_watermarks(epoch, table_id, watermarks, direction)
}
self.uploader
.local_seal_epoch(instance_id, next_epoch, opts);
}

#[cfg(any(test, feature = "test"))]
Expand Down Expand Up @@ -852,7 +847,7 @@ impl HummockEventHandler {

{
self.local_read_version_mapping
.insert(instance_id, basic_read_version.clone());
.insert(instance_id, (table_id, basic_read_version.clone()));
let mut read_version_mapping_guard = self.read_version_mapping.write();

read_version_mapping_guard
Expand All @@ -876,33 +871,29 @@ impl HummockEventHandler {
table_id, instance_id
);
guard.event_sender.take().expect("sender is just set");
self.destroy_read_version(table_id, instance_id);
self.destroy_read_version(instance_id);
}
}
}

HummockEvent::DestroyReadVersion {
table_id,
instance_id,
} => {
self.destroy_read_version(table_id, instance_id);
HummockEvent::DestroyReadVersion { instance_id } => {
self.uploader.may_destroy_instance(instance_id);
self.destroy_read_version(instance_id);
}
}
}

fn destroy_read_version(&mut self, table_id: TableId, instance_id: LocalInstanceId) {
fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
{
{
debug!(
"read version deregister: table_id: {}, instance_id: {}",
table_id, instance_id
);
self.local_read_version_mapping
debug!("read version deregister: instance_id: {}", instance_id);
let (table_id, _) = self
.local_read_version_mapping
.remove(&instance_id)
.unwrap_or_else(|| {
panic!(
"DestroyHummockInstance inexist instance table_id {} instance_id {}",
table_id, instance_id
"DestroyHummockInstance inexist instance instance_id {}",
instance_id
)
});
let mut read_version_mapping_guard = self.read_version_mapping.write();
Expand Down Expand Up @@ -994,6 +985,7 @@ mod tests {
use crate::hummock::test_utils::default_opts_for_test;
use crate::hummock::HummockError;
use crate::monitor::HummockStateStoreMetrics;
use crate::store::SealCurrentEpochOptions;

#[tokio::test]
async fn test_clear_shared_buffer() {
Expand Down Expand Up @@ -1197,6 +1189,11 @@ mod tests {
rx.await.unwrap()
};

send_event(HummockEvent::InitEpoch {
instance_id: guard.instance_id,
init_epoch: epoch1,
});

let imm1 = gen_imm(epoch1).await;
read_version
.write()
Expand All @@ -1207,6 +1204,12 @@ mod tests {
imm: imm1,
});

send_event(HummockEvent::LocalSealEpoch {
instance_id: guard.instance_id,
next_epoch: epoch2,
opts: SealCurrentEpochOptions::for_test(),
});

let imm2 = gen_imm(epoch2).await;
read_version
.write()
Expand All @@ -1217,20 +1220,19 @@ mod tests {
imm: imm2,
});

send_event(HummockEvent::SealEpoch {
epoch: epoch1,
is_checkpoint: true,
let epoch3 = epoch2.next_epoch();
send_event(HummockEvent::LocalSealEpoch {
instance_id: guard.instance_id,
next_epoch: epoch3,
opts: SealCurrentEpochOptions::for_test(),
});

let (tx1, mut rx1) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch1,
sync_result_sender: tx1,
});
assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
send_event(HummockEvent::SealEpoch {
epoch: epoch2,
is_checkpoint: true,
});
let (tx2, mut rx2) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch2,
Expand Down
43 changes: 17 additions & 26 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ pub enum HummockEvent {
imm: ImmutableMemtable,
},

SealEpoch {
epoch: HummockEpoch,
is_checkpoint: bool,
InitEpoch {
instance_id: LocalInstanceId,
init_epoch: HummockEpoch,
},

LocalSealEpoch {
instance_id: LocalInstanceId,
table_id: TableId,
epoch: HummockEpoch,
next_epoch: HummockEpoch,
opts: SealCurrentEpochOptions,
},

Expand All @@ -97,7 +96,6 @@ pub enum HummockEvent {
},

DestroyReadVersion {
table_id: TableId,
instance_id: LocalInstanceId,
},
}
Expand All @@ -116,27 +114,25 @@ impl HummockEvent {

HummockEvent::Shutdown => "Shutdown".to_string(),

HummockEvent::InitEpoch {
instance_id,
init_epoch,
} => {
format!("InitEpoch {} {}", instance_id, init_epoch)
}

HummockEvent::ImmToUploader { instance_id, imm } => {
format!("ImmToUploader {} {}", instance_id, imm.batch_id())
}

HummockEvent::SealEpoch {
epoch,
is_checkpoint,
} => format!(
"SealEpoch epoch {:?} is_checkpoint {:?}",
epoch, is_checkpoint
),

HummockEvent::LocalSealEpoch {
epoch,
instance_id,
table_id,
next_epoch,
opts,
} => {
format!(
"LocalSealEpoch epoch: {}, table_id: {}, instance_id: {}, opts: {:?}",
epoch, table_id.table_id, instance_id, opts
"LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
next_epoch, instance_id, opts
)
}

Expand All @@ -150,13 +146,9 @@ impl HummockEvent {
table_id, is_replicated
),

HummockEvent::DestroyReadVersion {
table_id,
instance_id,
} => format!(
"DestroyReadVersion table_id {:?} instance_id {:?}",
table_id, instance_id
),
HummockEvent::DestroyReadVersion { instance_id } => {
format!("DestroyReadVersion instance_id {:?}", instance_id)
}

#[cfg(any(test, feature = "test"))]
HummockEvent::FlushEvent(_) => "FlushEvent".to_string(),
Expand Down Expand Up @@ -210,7 +202,6 @@ impl Drop for LocalInstanceGuard {
// need to handle failure
sender
.send(HummockEvent::DestroyReadVersion {
table_id: self.table_id,
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
Expand Down
Loading

0 comments on commit d488f65

Please sign in to comment.