Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): avoid uploader depending on seal_epoch #16985

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,9 @@ async fn test_state_store_sync() {
.await
.unwrap();

let epoch3 = epoch2.next_epoch();
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());

let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap();
test_env
.meta_client
Expand Down Expand Up @@ -829,14 +832,15 @@ async fn test_delete_get() {
.await
.unwrap();

let epoch2 = epoch1.next_epoch();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
let res = test_env.storage.seal_and_sync_epoch(epoch1).await.unwrap();
test_env
.meta_client
.commit_epoch(epoch1, res)
.await
.unwrap();
let epoch2 = epoch1.next_epoch();
hummock_storage.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());

let batch2 = vec![(
gen_key_from_str(VirtualNode::ZERO, "bb"),
StorageValue::new_delete(),
Expand All @@ -851,6 +855,7 @@ async fn test_delete_get() {
)
.await
.unwrap();
hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());
let res = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap();
test_env
.meta_client
Expand Down Expand Up @@ -1005,6 +1010,8 @@ async fn test_multiple_epoch_sync() {
};
test_get().await;

let epoch4 = epoch3.next_epoch();
hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test());
test_env.storage.seal_epoch(epoch1, false);
let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap();
let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap();
Expand Down Expand Up @@ -1079,6 +1086,9 @@ async fn test_iter_with_min_epoch() {
.await
.unwrap();

let epoch3 = (33 * 1000) << 16;
hummock_storage.seal_current_epoch(epoch3, SealCurrentEpochOptions::for_test());

{
// test before sync
{
Expand Down Expand Up @@ -1329,6 +1339,9 @@ async fn test_hummock_version_reader() {
.await
.unwrap();

let epoch4 = (34 * 1000) << 16;
hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test());

{
// test before sync
{
Expand Down Expand Up @@ -1739,6 +1752,8 @@ async fn test_get_with_min_epoch() {
.await
.unwrap();

hummock_storage.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test());

{
// test before sync
let k = gen_key(0);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,11 +1046,12 @@ async fn test_delete_get_v2() {
)
.await
.unwrap();
let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap();
meta_client.commit_epoch(epoch1, res).await.unwrap();
let epoch2 = epoch1.next_epoch();

local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test());
let res = hummock_storage.seal_and_sync_epoch(epoch1).await.unwrap();
meta_client.commit_epoch(epoch1, res).await.unwrap();

let batch2 = vec![(
gen_key_from_str(VirtualNode::ZERO, "bb"),
StorageValue::new_delete(),
Expand Down
94 changes: 48 additions & 46 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ pub struct HummockEventHandler {
version_update_rx: UnboundedReceiver<HummockVersionUpdate>,
read_version_mapping: Arc<RwLock<ReadVersionMappingType>>,
/// A copy of `read_version_mapping` but owned by event handler
local_read_version_mapping: HashMap<LocalInstanceId, HummockReadVersionRef>,
local_read_version_mapping: HashMap<LocalInstanceId, (TableId, HummockReadVersionRef)>,

version_update_notifier_tx: Arc<tokio::sync::watch::Sender<HummockEpoch>>,
pinned_version: Arc<ArcSwap<PinnedVersion>>,
Expand Down Expand Up @@ -455,7 +455,7 @@ impl HummockEventHandler {
let mut pending = VecDeque::new();
let mut total_count = 0;
for instance_id in instances {
let Some(read_version) = self.local_read_version_mapping.get(&instance_id) else {
let Some((_, read_version)) = self.local_read_version_mapping.get(&instance_id) else {
continue;
};
total_count += 1;
Expand All @@ -475,7 +475,7 @@ impl HummockEventHandler {
const TRY_LOCK_TIMEOUT: Duration = Duration::from_millis(1);

while let Some(instance_id) = pending.pop_front() {
let read_version = self
let (_, read_version) = self
.local_read_version_mapping
.get(&instance_id)
.expect("have checked exist before");
Expand Down Expand Up @@ -520,7 +520,6 @@ impl HummockEventHandler {
prev_epoch,
max_committed_epoch = self.uploader.max_committed_epoch(),
max_synced_epoch = self.uploader.max_synced_epoch(),
max_sealed_epoch = self.uploader.max_sealed_epoch(),
"handle clear event"
);

Expand Down Expand Up @@ -588,7 +587,7 @@ impl HummockEventHandler {
"read version mapping not empty when clear. remaining tables: {:?}",
self.local_read_version_mapping
.values()
.map(|read_version| read_version.read().table_id())
.map(|(_, read_version)| read_version.read().table_id())
.collect_vec()
);

Expand Down Expand Up @@ -784,6 +783,18 @@ impl HummockEventHandler {
HummockEvent::Shutdown => {
unreachable!("shutdown is handled specially")
}
HummockEvent::InitEpoch {
instance_id,
init_epoch,
} => {
let table_id = self
.local_read_version_mapping
.get(&instance_id)
.expect("should exist")
.0;
self.uploader
.init_instance(instance_id, table_id, init_epoch);
}
HummockEvent::ImmToUploader { instance_id, imm } => {
assert!(
self.local_read_version_mapping.contains_key(&instance_id),
Expand All @@ -795,29 +806,13 @@ impl HummockEventHandler {
self.uploader.may_flush();
}

HummockEvent::SealEpoch {
epoch,
is_checkpoint: _,
} => {
self.uploader.seal_epoch(epoch);
}

HummockEvent::LocalSealEpoch {
epoch,
next_epoch,
opts,
table_id,
instance_id,
} => {
assert!(
self.local_read_version_mapping
.contains_key(&instance_id),
"seal epoch from non-existing read version instance: instance_id: {}, table_id: {}, epoch: {}",
instance_id, table_id, epoch,
);
if let Some((direction, watermarks)) = opts.table_watermarks {
self.uploader
.add_table_watermarks(epoch, table_id, watermarks, direction)
}
self.uploader
.local_seal_epoch(instance_id, next_epoch, opts);
}

#[cfg(any(test, feature = "test"))]
Expand Down Expand Up @@ -852,7 +847,7 @@ impl HummockEventHandler {

{
self.local_read_version_mapping
.insert(instance_id, basic_read_version.clone());
.insert(instance_id, (table_id, basic_read_version.clone()));
let mut read_version_mapping_guard = self.read_version_mapping.write();

read_version_mapping_guard
Expand All @@ -876,33 +871,29 @@ impl HummockEventHandler {
table_id, instance_id
);
guard.event_sender.take().expect("sender is just set");
self.destroy_read_version(table_id, instance_id);
self.destroy_read_version(instance_id);
}
}
}

HummockEvent::DestroyReadVersion {
table_id,
instance_id,
} => {
self.destroy_read_version(table_id, instance_id);
HummockEvent::DestroyReadVersion { instance_id } => {
self.uploader.may_destroy_instance(instance_id);
self.destroy_read_version(instance_id);
}
}
}

fn destroy_read_version(&mut self, table_id: TableId, instance_id: LocalInstanceId) {
fn destroy_read_version(&mut self, instance_id: LocalInstanceId) {
{
{
debug!(
"read version deregister: table_id: {}, instance_id: {}",
table_id, instance_id
);
self.local_read_version_mapping
debug!("read version deregister: instance_id: {}", instance_id);
let (table_id, _) = self
.local_read_version_mapping
.remove(&instance_id)
.unwrap_or_else(|| {
panic!(
"DestroyHummockInstance inexist instance table_id {} instance_id {}",
table_id, instance_id
"DestroyHummockInstance inexist instance instance_id {}",
instance_id
)
});
let mut read_version_mapping_guard = self.read_version_mapping.write();
Expand Down Expand Up @@ -994,6 +985,7 @@ mod tests {
use crate::hummock::test_utils::default_opts_for_test;
use crate::hummock::HummockError;
use crate::monitor::HummockStateStoreMetrics;
use crate::store::SealCurrentEpochOptions;

#[tokio::test]
async fn test_clear_shared_buffer() {
Expand Down Expand Up @@ -1197,6 +1189,11 @@ mod tests {
rx.await.unwrap()
};

send_event(HummockEvent::InitEpoch {
instance_id: guard.instance_id,
init_epoch: epoch1,
});

let imm1 = gen_imm(epoch1).await;
read_version
.write()
Expand All @@ -1207,6 +1204,12 @@ mod tests {
imm: imm1,
});

send_event(HummockEvent::LocalSealEpoch {
instance_id: guard.instance_id,
next_epoch: epoch2,
opts: SealCurrentEpochOptions::for_test(),
});

let imm2 = gen_imm(epoch2).await;
read_version
.write()
Expand All @@ -1217,20 +1220,19 @@ mod tests {
imm: imm2,
});

send_event(HummockEvent::SealEpoch {
epoch: epoch1,
is_checkpoint: true,
let epoch3 = epoch2.next_epoch();
send_event(HummockEvent::LocalSealEpoch {
instance_id: guard.instance_id,
next_epoch: epoch3,
opts: SealCurrentEpochOptions::for_test(),
});

let (tx1, mut rx1) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch1,
sync_result_sender: tx1,
});
assert!(poll_fn(|cx| Poll::Ready(rx1.poll_unpin(cx).is_pending())).await);
send_event(HummockEvent::SealEpoch {
epoch: epoch2,
is_checkpoint: true,
});
let (tx2, mut rx2) = oneshot::channel();
send_event(HummockEvent::SyncEpoch {
new_sync_epoch: epoch2,
Expand Down
43 changes: 17 additions & 26 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,14 @@ pub enum HummockEvent {
imm: ImmutableMemtable,
},

SealEpoch {
epoch: HummockEpoch,
is_checkpoint: bool,
InitEpoch {
instance_id: LocalInstanceId,
init_epoch: HummockEpoch,
},

LocalSealEpoch {
instance_id: LocalInstanceId,
table_id: TableId,
epoch: HummockEpoch,
next_epoch: HummockEpoch,
opts: SealCurrentEpochOptions,
},

Expand All @@ -97,7 +96,6 @@ pub enum HummockEvent {
},

DestroyReadVersion {
table_id: TableId,
instance_id: LocalInstanceId,
},
}
Expand All @@ -116,27 +114,25 @@ impl HummockEvent {

HummockEvent::Shutdown => "Shutdown".to_string(),

HummockEvent::InitEpoch {
instance_id,
init_epoch,
} => {
format!("InitEpoch {} {}", instance_id, init_epoch)
}

HummockEvent::ImmToUploader { instance_id, imm } => {
format!("ImmToUploader {} {}", instance_id, imm.batch_id())
}

HummockEvent::SealEpoch {
epoch,
is_checkpoint,
} => format!(
"SealEpoch epoch {:?} is_checkpoint {:?}",
epoch, is_checkpoint
),

HummockEvent::LocalSealEpoch {
epoch,
instance_id,
table_id,
next_epoch,
opts,
} => {
format!(
"LocalSealEpoch epoch: {}, table_id: {}, instance_id: {}, opts: {:?}",
epoch, table_id.table_id, instance_id, opts
"LocalSealEpoch next_epoch: {}, instance_id: {}, opts: {:?}",
next_epoch, instance_id, opts
)
}

Expand All @@ -150,13 +146,9 @@ impl HummockEvent {
table_id, is_replicated
),

HummockEvent::DestroyReadVersion {
table_id,
instance_id,
} => format!(
"DestroyReadVersion table_id {:?} instance_id {:?}",
table_id, instance_id
),
HummockEvent::DestroyReadVersion { instance_id } => {
format!("DestroyReadVersion instance_id {:?}", instance_id)
}

#[cfg(any(test, feature = "test"))]
HummockEvent::FlushEvent(_) => "FlushEvent".to_string(),
Expand Down Expand Up @@ -210,7 +202,6 @@ impl Drop for LocalInstanceGuard {
// need to handle failure
sender
.send(HummockEvent::DestroyReadVersion {
table_id: self.table_id,
instance_id: self.instance_id,
})
.unwrap_or_else(|err| {
Expand Down
Loading
Loading