Skip to content

Commit

Permalink
feat: make create view procedure simple as others (#4001)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored May 21, 2024
1 parent 7a8222d commit 0aa523c
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 74 deletions.
78 changes: 31 additions & 47 deletions src/common/meta/src/ddl/create_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock};
use crate::rpc::ddl::CreateViewTask;
use crate::{metrics, ClusterId};

// The proceudure to execute `[CreateViewTask]`.
// The procedure to execute `[CreateViewTask]`.
pub struct CreateViewProcedure {
pub context: DdlContext,
pub creator: ViewCreator,
pub data: CreateViewData,
}

impl CreateViewProcedure {
Expand All @@ -42,24 +42,27 @@ impl CreateViewProcedure {
pub fn new(cluster_id: ClusterId, task: CreateViewTask, context: DdlContext) -> Self {
Self {
context,
creator: ViewCreator::new(cluster_id, task),
data: CreateViewData {
state: CreateViewState::Prepare,
cluster_id,
task,
need_update: false,
},
}
}

pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;

let creator = ViewCreator { data };

Ok(CreateViewProcedure { context, creator })
Ok(CreateViewProcedure { context, data })
}

fn view_info(&self) -> &RawTableInfo {
&self.creator.data.task.view_info
&self.data.task.view_info
}

fn need_update(&self) -> bool {
self.creator.data.need_update
self.data.need_update
}

pub(crate) fn view_id(&self) -> TableId {
Expand All @@ -68,7 +71,7 @@ impl CreateViewProcedure {

#[cfg(any(test, feature = "testing"))]
pub fn set_allocated_metadata(&mut self, view_id: TableId) {
self.creator.set_allocated_metadata(view_id, false)
self.data.set_allocated_metadata(view_id, false)
}

/// On the prepare step, it performs:
Expand All @@ -79,7 +82,7 @@ impl CreateViewProcedure {
/// - ViewName exists and `create_if_not_exists` is false.
/// - Failed to allocate [ViewMetadata].
pub(crate) async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_view;
let expr = &self.data.task.create_view;
let view_name_value = self
.context
.table_metadata_manager
Expand All @@ -102,7 +105,7 @@ impl CreateViewProcedure {
ensure!(
expr.create_if_not_exists || expr.or_replace,
error::ViewAlreadyExistsSnafu {
view_name: self.creator.data.table_ref().to_string(),
view_name: self.data.table_ref().to_string(),
}
);

Expand All @@ -122,34 +125,34 @@ impl CreateViewProcedure {
.get(view_id)
.await?
.with_context(|| error::TableInfoNotFoundSnafu {
table: self.creator.data.table_ref().to_string(),
table: self.data.table_ref().to_string(),
})?;

// Ensure the exists one is view, we can't replace a table.
ensure!(
view_info_value.table_info.table_type == TableType::View,
error::TableAlreadyExistsSnafu {
table_name: self.creator.data.table_ref().to_string(),
table_name: self.data.table_ref().to_string(),
}
);

self.creator.set_allocated_metadata(view_id, true);
self.data.set_allocated_metadata(view_id, true);
} else {
// Allocate the new `view_id`.
let TableMetadata { table_id, .. } = self
.context
.table_metadata_allocator
.create_view(
&TableMetadataAllocatorContext {
cluster_id: self.creator.data.cluster_id,
cluster_id: self.data.cluster_id,
},
&None,
)
.await?;
self.creator.set_allocated_metadata(table_id, false);
self.data.set_allocated_metadata(table_id, false);
}

self.creator.data.state = CreateViewState::CreateMetadata;
self.data.state = CreateViewState::CreateMetadata;

Ok(Status::executing(true))
}
Expand All @@ -169,9 +172,9 @@ impl CreateViewProcedure {
.get(view_id)
.await?
.with_context(|| error::ViewNotFoundSnafu {
view_name: self.creator.data.table_ref().to_string(),
view_name: self.data.table_ref().to_string(),
})?;
let new_logical_plan = self.creator.data.task.raw_logical_plan().clone();
let new_logical_plan = self.data.task.raw_logical_plan().clone();
manager
.update_view_info(view_id, &current_view_info, new_logical_plan)
.await?;
Expand All @@ -180,7 +183,7 @@ impl CreateViewProcedure {
} else {
let raw_view_info = self.view_info().clone();
manager
.create_view_metadata(raw_view_info, self.creator.data.task.raw_logical_plan())
.create_view_metadata(raw_view_info, self.data.task.raw_logical_plan())
.await?;

info!(
Expand All @@ -200,7 +203,7 @@ impl Procedure for CreateViewProcedure {
}

async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;
let state = &self.data.state;

let _timer = metrics::METRIC_META_PROCEDURE_CREATE_VIEW
.with_label_values(&[state.as_ref()])
Expand All @@ -214,11 +217,11 @@ impl Procedure for CreateViewProcedure {
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.creator.data).context(ToJsonSnafu)
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let table_ref = &self.creator.data.table_ref();
let table_ref = &self.data.table_ref();

LockKey::new(vec![
CatalogLock::Read(table_ref.catalog).into(),
Expand All @@ -228,30 +231,6 @@ impl Procedure for CreateViewProcedure {
}
}

/// The VIEW creator
pub struct ViewCreator {
/// The serializable data.
pub data: CreateViewData,
}

impl ViewCreator {
pub fn new(cluster_id: u64, task: CreateViewTask) -> Self {
Self {
data: CreateViewData {
state: CreateViewState::Prepare,
cluster_id,
task,
need_update: false,
},
}
}

fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) {
self.data.task.view_info.ident.table_id = view_id;
self.data.need_update = need_update;
}
}

#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateViewState {
/// Prepares to create the table
Expand All @@ -270,6 +249,11 @@ pub struct CreateViewData {
}

impl CreateViewData {
fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) {
self.task.view_info.ident.table_id = view_id;
self.need_update = need_update;
}

fn table_ref(&self) -> TableReference<'_> {
self.task.table_ref()
}
Expand Down
9 changes: 6 additions & 3 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::error::{
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState};
use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Watcher,
Expand Down Expand Up @@ -534,8 +534,11 @@ impl LocalManager {
info!("LocalManager start to recover");
let recover_start = Instant::now();

let (messages, rollback_messages, finished_ids) =
self.procedure_store.load_messages().await?;
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = self.procedure_store.load_messages().await?;
// Submits recovered messages first.
self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
self.submit_recovered_messages(messages, InitProcedureState::Running);
Expand Down
77 changes: 53 additions & 24 deletions src/common/procedure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ pub struct ProcedureMessage {
pub error: Option<String>,
}

/// A collection of all procedures' messages.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessages {
/// A map of uncommitted procedures
pub messages: HashMap<ProcedureId, ProcedureMessage>,
/// A map of rolling back procedures
pub rollback_messages: HashMap<ProcedureId, ProcedureMessage>,
/// A list of finished procedures' ids
pub finished_ids: Vec<ProcedureId>,
}

/// Procedure storage layer.
pub(crate) struct ProcedureStore {
proc_path: String,
Expand Down Expand Up @@ -182,17 +193,7 @@ impl ProcedureStore {
}

/// Load procedures from the storage.
/// Returns:
/// - a map of uncommitted procedures
/// - a map of rolling back procedures
/// - a list of finished procedures' ids
pub(crate) async fn load_messages(
&self,
) -> Result<(
HashMap<ProcedureId, ProcedureMessage>,
HashMap<ProcedureId, ProcedureMessage>,
Vec<ProcedureId>,
)> {
pub(crate) async fn load_messages(&self) -> Result<ProcedureMessages> {
// Track the key-value pair by procedure id.
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();

Expand Down Expand Up @@ -242,7 +243,11 @@ impl ProcedureStore {
}
}

Ok((messages, rollback_messages, finished_ids))
Ok(ProcedureMessages {
messages,
rollback_messages,
finished_ids,
})
}

fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
Expand Down Expand Up @@ -515,10 +520,14 @@ mod tests {
.await
.unwrap();

let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = store.load_messages().await.unwrap();
assert_eq!(1, messages.len());
assert!(rollback_messages.is_empty());
assert!(finished.is_empty());
assert!(finished_ids.is_empty());
let msg = messages.get(&procedure_id).unwrap();
let expect = ProcedureMessage {
type_name: "MockProcedure".to_string(),
Expand All @@ -545,10 +554,14 @@ mod tests {
.unwrap();
store.commit_procedure(procedure_id, 1).await.unwrap();

let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert!(rollback_messages.is_empty());
assert_eq!(&[procedure_id], &finished[..]);
assert_eq!(&[procedure_id], &finished_ids[..]);
}

#[tokio::test]
Expand Down Expand Up @@ -582,10 +595,14 @@ mod tests {
.await
.unwrap();

let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert_eq!(1, rollback_messages.len());
assert!(finished.is_empty());
assert!(finished_ids.is_empty());
assert!(rollback_messages.contains_key(&procedure_id));
}

Expand All @@ -611,10 +628,14 @@ mod tests {

store.delete_procedure(procedure_id).await.unwrap();

let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert!(rollback_messages.is_empty());
assert!(finished.is_empty());
assert!(finished_ids.is_empty());
}

#[tokio::test]
Expand Down Expand Up @@ -642,10 +663,14 @@ mod tests {

store.delete_procedure(procedure_id).await.unwrap();

let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert!(rollback_messages.is_empty());
assert!(finished.is_empty());
assert!(finished_ids.is_empty());
}

#[tokio::test]
Expand Down Expand Up @@ -705,10 +730,14 @@ mod tests {
.await
.unwrap();

let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
let ProcedureMessages {
messages,
rollback_messages,
finished_ids,
} = store.load_messages().await.unwrap();
assert_eq!(2, messages.len());
assert!(rollback_messages.is_empty());
assert_eq!(1, finished.len());
assert_eq!(1, finished_ids.len());

let msg = messages.get(&id0).unwrap();
assert_eq!("id0-2", msg.data);
Expand Down

0 comments on commit 0aa523c

Please sign in to comment.