From 21466a00e9dde6a9b170611bc9f9d9c27964592c Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Thu, 8 Jun 2023 00:31:12 +0800 Subject: [PATCH] add no_upload flag --- src/storage/src/hummock/store/state_store.rs | 11 ++++++--- src/storage/src/store.rs | 26 ++++++++++++++++++++ src/stream/src/common/table/state_table.rs | 10 ++++---- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/storage/src/hummock/store/state_store.rs b/src/storage/src/hummock/store/state_store.rs index 7c06b7fb3e790..acb45a8615268 100644 --- a/src/storage/src/hummock/store/state_store.rs +++ b/src/storage/src/hummock/store/state_store.rs @@ -66,6 +66,8 @@ pub struct LocalHummockStorage { /// Read handle. read_version: Arc>, + no_upload: bool, + /// Event sender. event_sender: mpsc::UnboundedSender, @@ -404,9 +406,11 @@ impl LocalHummockStorage { self.update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); // insert imm to uploader - self.event_sender - .send(HummockEvent::ImmToUploader(imm)) - .unwrap(); + if self.no_upload { + self.event_sender + .send(HummockEvent::ImmToUploader(imm)) + .unwrap(); + } timer.observe_duration(); @@ -437,6 +441,7 @@ impl LocalHummockStorage { table_id: option.table_id, is_consistent_op: option.is_consistent_op, table_option: option.table_option, + no_upload: option.no_upload, instance_guard, read_version, event_sender, diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 9950d76a13d42..802a97962bf67 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -375,9 +375,34 @@ pub struct NewLocalOptions { /// `update` and `delete` should match the original stored value. pub is_consistent_op: bool, pub table_option: TableOption, + + /// Used to indicate that we should not upload the data. + pub no_upload: bool, } impl NewLocalOptions { + pub fn new(table_id: TableId, is_consistent_op: bool, table_option: TableOption) -> Self { + NewLocalOptions { + table_id, + is_consistent_op, + table_option, + no_upload: true, + } + } + + pub fn new_temporary( + table_id: TableId, + is_consistent_op: bool, + table_option: TableOption, + ) -> Self { + NewLocalOptions { + table_id, + is_consistent_op, + table_option, + no_upload: false, + } + } + pub fn for_test(table_id: TableId) -> Self { Self { table_id, @@ -385,6 +410,7 @@ impl NewLocalOptions { table_option: TableOption { retention_seconds: None, }, + no_upload: false, } } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 93ef6898acffa..9ffc788a1c38f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -189,11 +189,11 @@ where let table_option = TableOption::build_table_option(table_catalog.get_properties()); let local_state_store = store - .new_local(NewLocalOptions { + .new_local(NewLocalOptions::new( table_id, is_consistent_op, table_option, - }) + )) .await; let pk_data_types = pk_indices @@ -384,11 +384,11 @@ where is_consistent_op: bool, ) -> Self { let local_state_store = store - .new_local(NewLocalOptions { + .new_local(NewLocalOptions::new( table_id, is_consistent_op, - table_option: TableOption::default(), - }) + TableOption::default(), + )) .await; let pk_data_types = pk_indices