From 0aa523cd8cbe8630f95eff163c640f388b761498 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Tue, 21 May 2024 16:30:57 +0800 Subject: [PATCH] feat: make create view procedure simple as others (#4001) --- src/common/meta/src/ddl/create_view.rs | 78 ++++++++++---------------- src/common/procedure/src/local.rs | 9 ++- src/common/procedure/src/store.rs | 77 +++++++++++++++++-------- 3 files changed, 90 insertions(+), 74 deletions(-) diff --git a/src/common/meta/src/ddl/create_view.rs b/src/common/meta/src/ddl/create_view.rs index c9163dd5cabc..5d364ba77417 100644 --- a/src/common/meta/src/ddl/create_view.rs +++ b/src/common/meta/src/ddl/create_view.rs @@ -30,10 +30,10 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; use crate::rpc::ddl::CreateViewTask; use crate::{metrics, ClusterId}; -// The proceudure to execute `[CreateViewTask]`. +// The procedure to execute `[CreateViewTask]`. pub struct CreateViewProcedure { pub context: DdlContext, - pub creator: ViewCreator, + pub data: CreateViewData, } impl CreateViewProcedure { @@ -42,24 +42,27 @@ impl CreateViewProcedure { pub fn new(cluster_id: ClusterId, task: CreateViewTask, context: DdlContext) -> Self { Self { context, - creator: ViewCreator::new(cluster_id, task), + data: CreateViewData { + state: CreateViewState::Prepare, + cluster_id, + task, + need_update: false, + }, } } pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { let data = serde_json::from_str(json).context(FromJsonSnafu)?; - let creator = ViewCreator { data }; - - Ok(CreateViewProcedure { context, creator }) + Ok(CreateViewProcedure { context, data }) } fn view_info(&self) -> &RawTableInfo { - &self.creator.data.task.view_info + &self.data.task.view_info } fn need_update(&self) -> bool { - self.creator.data.need_update + self.data.need_update } pub(crate) fn view_id(&self) -> TableId { @@ -68,7 +71,7 @@ impl CreateViewProcedure { #[cfg(any(test, feature = "testing"))] pub fn set_allocated_metadata(&mut self, view_id: TableId) { - self.creator.set_allocated_metadata(view_id, false) + self.data.set_allocated_metadata(view_id, false) } /// On the prepare step, it performs: @@ -79,7 +82,7 @@ impl CreateViewProcedure { /// - ViewName exists and `create_if_not_exists` is false. /// - Failed to allocate [ViewMetadata]. pub(crate) async fn on_prepare(&mut self) -> Result { - let expr = &self.creator.data.task.create_view; + let expr = &self.data.task.create_view; let view_name_value = self .context .table_metadata_manager @@ -102,7 +105,7 @@ impl CreateViewProcedure { ensure!( expr.create_if_not_exists || expr.or_replace, error::ViewAlreadyExistsSnafu { - view_name: self.creator.data.table_ref().to_string(), + view_name: self.data.table_ref().to_string(), } ); @@ -122,18 +125,18 @@ impl CreateViewProcedure { .get(view_id) .await? .with_context(|| error::TableInfoNotFoundSnafu { - table: self.creator.data.table_ref().to_string(), + table: self.data.table_ref().to_string(), })?; // Ensure the exists one is view, we can't replace a table. ensure!( view_info_value.table_info.table_type == TableType::View, error::TableAlreadyExistsSnafu { - table_name: self.creator.data.table_ref().to_string(), + table_name: self.data.table_ref().to_string(), } ); - self.creator.set_allocated_metadata(view_id, true); + self.data.set_allocated_metadata(view_id, true); } else { // Allocate the new `view_id`. let TableMetadata { table_id, .. } = self @@ -141,15 +144,15 @@ impl CreateViewProcedure { .table_metadata_allocator .create_view( &TableMetadataAllocatorContext { - cluster_id: self.creator.data.cluster_id, + cluster_id: self.data.cluster_id, }, &None, ) .await?; - self.creator.set_allocated_metadata(table_id, false); + self.data.set_allocated_metadata(table_id, false); } - self.creator.data.state = CreateViewState::CreateMetadata; + self.data.state = CreateViewState::CreateMetadata; Ok(Status::executing(true)) } @@ -169,9 +172,9 @@ impl CreateViewProcedure { .get(view_id) .await? .with_context(|| error::ViewNotFoundSnafu { - view_name: self.creator.data.table_ref().to_string(), + view_name: self.data.table_ref().to_string(), })?; - let new_logical_plan = self.creator.data.task.raw_logical_plan().clone(); + let new_logical_plan = self.data.task.raw_logical_plan().clone(); manager .update_view_info(view_id, ¤t_view_info, new_logical_plan) .await?; @@ -180,7 +183,7 @@ impl CreateViewProcedure { } else { let raw_view_info = self.view_info().clone(); manager - .create_view_metadata(raw_view_info, self.creator.data.task.raw_logical_plan()) + .create_view_metadata(raw_view_info, self.data.task.raw_logical_plan()) .await?; info!( @@ -200,7 +203,7 @@ impl Procedure for CreateViewProcedure { } async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { - let state = &self.creator.data.state; + let state = &self.data.state; let _timer = metrics::METRIC_META_PROCEDURE_CREATE_VIEW .with_label_values(&[state.as_ref()]) @@ -214,11 +217,11 @@ impl Procedure for CreateViewProcedure { } fn dump(&self) -> ProcedureResult { - serde_json::to_string(&self.creator.data).context(ToJsonSnafu) + serde_json::to_string(&self.data).context(ToJsonSnafu) } fn lock_key(&self) -> LockKey { - let table_ref = &self.creator.data.table_ref(); + let table_ref = &self.data.table_ref(); LockKey::new(vec![ CatalogLock::Read(table_ref.catalog).into(), @@ -228,30 +231,6 @@ impl Procedure for CreateViewProcedure { } } -/// The VIEW creator -pub struct ViewCreator { - /// The serializable data. - pub data: CreateViewData, -} - -impl ViewCreator { - pub fn new(cluster_id: u64, task: CreateViewTask) -> Self { - Self { - data: CreateViewData { - state: CreateViewState::Prepare, - cluster_id, - task, - need_update: false, - }, - } - } - - fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) { - self.data.task.view_info.ident.table_id = view_id; - self.data.need_update = need_update; - } -} - #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum CreateViewState { /// Prepares to create the table @@ -270,6 +249,11 @@ pub struct CreateViewData { } impl CreateViewData { + fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) { + self.task.view_info.ident.table_id = view_id; + self.need_update = need_update; + } + fn table_ref(&self) -> TableReference<'_> { self.task.table_ref() } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index eb984373adbe..5fdd81a9562f 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -36,7 +36,7 @@ use crate::error::{ }; use crate::local::runner::Runner; use crate::procedure::{BoxedProcedureLoader, InitProcedureState}; -use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef}; +use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef}; use crate::{ BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, ProcedureWithId, Watcher, @@ -534,8 +534,11 @@ impl LocalManager { info!("LocalManager start to recover"); let recover_start = Instant::now(); - let (messages, rollback_messages, finished_ids) = - self.procedure_store.load_messages().await?; + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = self.procedure_store.load_messages().await?; // Submits recovered messages first. self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack); self.submit_recovered_messages(messages, InitProcedureState::Running); diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index d25df4b66a80..22e5043d306a 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -55,6 +55,17 @@ pub struct ProcedureMessage { pub error: Option, } +/// A collection of all procedures' messages. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ProcedureMessages { + /// A map of uncommitted procedures + pub messages: HashMap, + /// A map of rolling back procedures + pub rollback_messages: HashMap, + /// A list of finished procedures' ids + pub finished_ids: Vec, +} + /// Procedure storage layer. pub(crate) struct ProcedureStore { proc_path: String, @@ -182,17 +193,7 @@ impl ProcedureStore { } /// Load procedures from the storage. - /// Returns: - /// - a map of uncommitted procedures - /// - a map of rolling back procedures - /// - a list of finished procedures' ids - pub(crate) async fn load_messages( - &self, - ) -> Result<( - HashMap, - HashMap, - Vec, - )> { + pub(crate) async fn load_messages(&self) -> Result { // Track the key-value pair by procedure id. let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); @@ -242,7 +243,11 @@ impl ProcedureStore { } } - Ok((messages, rollback_messages, finished_ids)) + Ok(ProcedureMessages { + messages, + rollback_messages, + finished_ids, + }) } fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option { @@ -515,10 +520,14 @@ mod tests { .await .unwrap(); - let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = store.load_messages().await.unwrap(); assert_eq!(1, messages.len()); assert!(rollback_messages.is_empty()); - assert!(finished.is_empty()); + assert!(finished_ids.is_empty()); let msg = messages.get(&procedure_id).unwrap(); let expect = ProcedureMessage { type_name: "MockProcedure".to_string(), @@ -545,10 +554,14 @@ mod tests { .unwrap(); store.commit_procedure(procedure_id, 1).await.unwrap(); - let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = store.load_messages().await.unwrap(); assert!(messages.is_empty()); assert!(rollback_messages.is_empty()); - assert_eq!(&[procedure_id], &finished[..]); + assert_eq!(&[procedure_id], &finished_ids[..]); } #[tokio::test] @@ -582,10 +595,14 @@ mod tests { .await .unwrap(); - let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = store.load_messages().await.unwrap(); assert!(messages.is_empty()); assert_eq!(1, rollback_messages.len()); - assert!(finished.is_empty()); + assert!(finished_ids.is_empty()); assert!(rollback_messages.contains_key(&procedure_id)); } @@ -611,10 +628,14 @@ mod tests { store.delete_procedure(procedure_id).await.unwrap(); - let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = store.load_messages().await.unwrap(); assert!(messages.is_empty()); assert!(rollback_messages.is_empty()); - assert!(finished.is_empty()); + assert!(finished_ids.is_empty()); } #[tokio::test] @@ -642,10 +663,14 @@ mod tests { store.delete_procedure(procedure_id).await.unwrap(); - let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = store.load_messages().await.unwrap(); assert!(messages.is_empty()); assert!(rollback_messages.is_empty()); - assert!(finished.is_empty()); + assert!(finished_ids.is_empty()); } #[tokio::test] @@ -705,10 +730,14 @@ mod tests { .await .unwrap(); - let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); + let ProcedureMessages { + messages, + rollback_messages, + finished_ids, + } = store.load_messages().await.unwrap(); assert_eq!(2, messages.len()); assert!(rollback_messages.is_empty()); - assert_eq!(1, finished.len()); + assert_eq!(1, finished_ids.len()); let msg = messages.get(&id0).unwrap(); assert_eq!("id0-2", msg.data);