From b2b0073e8a6dce9c096399cc63f0c985651858ae Mon Sep 17 00:00:00 2001 From: WenyXu Date: Sun, 28 Apr 2024 13:36:30 +0000 Subject: [PATCH] refactor: rename task suffix --- src/api/src/helper.rs | 2 +- src/cmd/src/standalone.rs | 4 +- src/common/meta/src/ddl.rs | 4 +- src/common/meta/src/ddl/create_flow.rs | 12 +- src/common/meta/src/key.rs | 2 +- src/common/meta/src/key/flow.rs | 132 +++++++++--------- src/common/meta/src/key/flow/flow_info.rs | 29 ++-- src/common/meta/src/key/flow/flownode_flow.rs | 4 +- src/common/meta/src/key/flow/table_flow.rs | 4 +- src/common/meta/src/lock_key.rs | 4 +- src/common/meta/src/metrics.rs | 4 +- 11 files changed, 98 insertions(+), 103 deletions(-) diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 3f74d2ccc77c..f8d13e00250d 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -519,7 +519,7 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::DropTable(_)) => "ddl.drop_table", Some(Expr::TruncateTable(_)) => "ddl.truncate_table", Some(Expr::CreateFlowTask(_)) => "ddl.create_flow", - Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task", + Some(Expr::DropFlowTask(_)) => "ddl.drop_flow", None => "ddl.empty", } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index c535e2e24ec2..695e55516d99 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -25,7 +25,7 @@ use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRe use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; -use common_meta::key::flow::{FlowMetadataManager, FlowTaskMetadataManagerRef}; +use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; @@ -480,7 +480,7 @@ impl StartCommand { cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, - flow_metadata_manager: FlowTaskMetadataManagerRef, + flow_metadata_manager: FlowMetadataManagerRef, flow_metadata_allocator: FlowMetadataAllocatorRef, ) -> Result { let procedure_executor: ProcedureExecutorRef = Arc::new( diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index e8f86f1ce281..bc4563b2f567 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -22,7 +22,7 @@ use crate::cache_invalidator::CacheInvalidatorRef; use crate::ddl::flow_meta::FlowMetadataAllocatorRef; use crate::ddl::table_meta::TableMetadataAllocatorRef; use crate::error::Result; -use crate::key::flow::FlowTaskMetadataManagerRef; +use crate::key::flow::FlowMetadataManagerRef; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::TableMetadataManagerRef; use crate::node_manager::NodeManagerRef; @@ -111,7 +111,7 @@ pub struct DdlContext { /// Allocator for table metadata. pub table_metadata_allocator: TableMetadataAllocatorRef, /// Flow metadata manager. - pub flow_metadata_manager: FlowTaskMetadataManagerRef, + pub flow_metadata_manager: FlowMetadataManagerRef, /// Allocator for flow metadata. pub flow_metadata_allocator: FlowMetadataAllocatorRef, } diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 40a2b12afce3..663f547b71a7 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -84,13 +84,13 @@ impl CreateFlowProcedure { async fn on_flownode_create_flow(&mut self) -> Result { // Safety: must be allocated. - let mut create_flow_task = Vec::with_capacity(self.data.peers.len()); + let mut create_flow = Vec::with_capacity(self.data.peers.len()); for peer in &self.data.peers { let requester = self.context.node_manager.flownode(peer).await; let request = FlowRequest { body: Some(PbFlowRequest::Create(self.data.to_create_flow_request())), }; - create_flow_task.push(async move { + create_flow.push(async move { requester .handle(request) .await @@ -98,7 +98,7 @@ impl CreateFlowProcedure { }); } - join_all(create_flow_task) + join_all(create_flow) .await .into_iter() .collect::>>()?; @@ -117,7 +117,7 @@ impl CreateFlowProcedure { // TODO(weny): Support `or_replace`. self.context .flow_metadata_manager - .create_flow_task_metadata(flow_id, self.data.to_flow_task_info_value()) + .create_flow_metadata(flow_id, self.data.to_flow_info_value()) .await?; info!("Created flow metadata for flow {flow_id}"); Ok(Status::done_with_output(flow_id)) @@ -133,7 +133,7 @@ impl Procedure for CreateFlowProcedure { async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { let state = &self.data.state; - let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW_TASK + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_FLOW .with_label_values(&[state.as_ref()]) .start_timer(); @@ -225,7 +225,7 @@ impl CreateFlowTaskData { } /// Converts to [FlowTaskValue]. - fn to_flow_task_info_value(&self) -> FlowTaskValue { + fn to_flow_info_value(&self) -> FlowTaskValue { let CreateFlowTask { catalog_name, flow_name, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 4bdbb48797be..bc5410120955 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -54,7 +54,7 @@ //! //! To simplify the managers used in struct fields and function parameters, we define "unify" //! table metadata manager: [TableMetadataManager] -//! and flow metadata manager: [FlowTaskMetadataManager](crate::key::flow_task::FlowTaskMetadataManager). +//! and flow metadata manager: [FlowTaskMetadataManager](crate::key::flow::FlowMetadataManager). //! It contains all the managers defined above. It's recommended to just use this manager only. //! //! The whole picture of flow keys will be like this: diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index dad2517e390f..5cdeecb6c329 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -28,8 +28,8 @@ use crate::ensure_values; use crate::error::{self, Result}; use crate::key::flow::flow_info::FlowTaskManager; use crate::key::flow::flow_name::FlowNameManager; -use crate::key::flow::flownode_flow::FlownodeTaskManager; -use crate::key::flow::table_flow::TableTaskManager; +use crate::key::flow::flownode_flow::FlownodeFlowManager; +use crate::key::flow::table_flow::TableFlowManager; use crate::key::scope::MetaKey; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::key::FlowTaskId; @@ -83,17 +83,17 @@ impl> MetaKey> for FlowTaskScoped { } } -pub type FlowTaskMetadataManagerRef = Arc; +pub type FlowMetadataManagerRef = Arc; /// The manager of metadata, provides ability to: /// - Create metadata of the task. /// - Retrieve metadata of the task. /// - Delete metadata of the task. pub struct FlowMetadataManager { - flow_task_info_manager: FlowTaskManager, - flownode_task_manager: FlownodeTaskManager, - table_task_manager: TableTaskManager, - flow_flow_name_manager: FlowNameManager, + flow_info_manager: FlowTaskManager, + flownode_flow_manager: FlownodeFlowManager, + table_flow_manager: TableFlowManager, + flow_name_manager: FlowNameManager, kv_backend: KvBackendRef, } @@ -101,74 +101,73 @@ impl FlowMetadataManager { /// Returns a new [FlowTaskMetadataManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { - flow_task_info_manager: FlowTaskManager::new(kv_backend.clone()), - flow_flow_name_manager: FlowNameManager::new(kv_backend.clone()), - flownode_task_manager: FlownodeTaskManager::new(kv_backend.clone()), - table_task_manager: TableTaskManager::new(kv_backend.clone()), + flow_info_manager: FlowTaskManager::new(kv_backend.clone()), + flow_name_manager: FlowNameManager::new(kv_backend.clone()), + flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()), + table_flow_manager: TableFlowManager::new(kv_backend.clone()), kv_backend, } } /// Returns the [FlowNameManager]. pub fn flow_name_manager(&self) -> &FlowNameManager { - &self.flow_flow_name_manager + &self.flow_name_manager } /// Returns the [FlowTaskManager]. - pub fn flow_task_info_manager(&self) -> &FlowTaskManager { - &self.flow_task_info_manager + pub fn flow_info_manager(&self) -> &FlowTaskManager { + &self.flow_info_manager } - /// Returns the [FlownodeTaskManager]. - pub fn flownode_task_manager(&self) -> &FlownodeTaskManager { - &self.flownode_task_manager + /// Returns the [FlownodeFlowManager]. + pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager { + &self.flownode_flow_manager } - /// Returns the [TableTaskManager]. - pub fn table_task_manager(&self) -> &TableTaskManager { - &self.table_task_manager + /// Returns the [TableFlowManager]. + pub fn table_flow_manager(&self) -> &TableFlowManager { + &self.table_flow_manager } - /// Creates metadata for task and returns an error if different metadata exists. - pub async fn create_flow_task_metadata( + /// Creates metadata for flow and returns an error if different metadata exists. + pub async fn create_flow_metadata( &self, flow_id: FlowTaskId, - flow_task_value: FlowTaskValue, + flow_value: FlowTaskValue, ) -> Result<()> { - let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = - self.flow_flow_name_manager.build_create_txn( - &flow_task_value.catalog_name, - &flow_task_value.flow_name, - flow_id, - )?; - - let (create_flow_task_txn, on_create_flow_task_failure) = self - .flow_task_info_manager - .build_create_txn(&flow_task_value.catalog_name, flow_id, &flow_task_value)?; - - let create_flownode_task_txn = self.flownode_task_manager.build_create_txn( - &flow_task_value.catalog_name, + let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self + .flow_name_manager + .build_create_txn(&flow_value.catalog_name, &flow_value.flow_name, flow_id)?; + + let (create_flow_txn, on_create_flow_failure) = self.flow_info_manager.build_create_txn( + &flow_value.catalog_name, + flow_id, + &flow_value, + )?; + + let create_flownode_task_txn = self.flownode_flow_manager.build_create_txn( + &flow_value.catalog_name, flow_id, - flow_task_value.flownode_ids().clone(), + flow_value.flownode_ids().clone(), ); - let create_table_task_txn = self.table_task_manager.build_create_txn( - &flow_task_value.catalog_name, + let create_table_task_txn = self.table_flow_manager.build_create_txn( + &flow_value.catalog_name, flow_id, - flow_task_value.flownode_ids().clone(), - flow_task_value.source_table_ids(), + flow_value.flownode_ids().clone(), + flow_value.source_table_ids(), ); let txn = Txn::merge_all(vec![ create_flow_flow_name_txn, - create_flow_task_txn, + create_flow_txn, create_flownode_task_txn, create_table_task_txn, ]); info!( "Creating flow {}.{}({}), with {} txn operations", - flow_task_value.catalog_name, - flow_task_value.flow_name, + flow_value.catalog_name, + flow_value.flow_name, flow_id, txn.max_operations() ); @@ -188,29 +187,26 @@ impl FlowMetadataManager { if remote_flow_flow_name.flow_id() != flow_id { info!( "Trying to create flow {}.{}({}), but flow({}) already exists", - flow_task_value.catalog_name, - flow_task_value.flow_name, + flow_value.catalog_name, + flow_value.flow_name, flow_id, remote_flow_flow_name.flow_id() ); return error::TaskAlreadyExistsSnafu { - flow_name: format!( - "{}.{}", - flow_task_value.catalog_name, flow_task_value.flow_name - ), + flow_name: format!("{}.{}", flow_value.catalog_name, flow_value.flow_name), } .fail(); } - let remote_flow_task = - on_create_flow_task_failure(&mut set)?.with_context(|| error::UnexpectedSnafu { + let remote_flow = + on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu { err_msg: format!( "Reads the empty flow during the creating flow, flow_id: {flow_id}" ), })?; let op_name = "creating flow"; - ensure_values!(*remote_flow_task, flow_task_value, op_name); + ensure_values!(*remote_flow, flow_value, op_name); } Ok(()) @@ -284,7 +280,7 @@ mod tests { schema_name: "my_schema".to_string(), table_name: "sink_table".to_string(), }; - let flow_task_value = FlowTaskValue { + let flow_value = FlowTaskValue { catalog_name: catalog_name.to_string(), flow_name: "task".to_string(), source_table_ids: vec![1024, 1025, 1026], @@ -296,23 +292,23 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(task_id, flow_value.clone()) .await .unwrap(); // Creates again. flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(task_id, flow_value.clone()) .await .unwrap(); let got = flow_metadata_manager - .flow_task_info_manager() + .flow_info_manager() .get(catalog_name, task_id) .await .unwrap() .unwrap(); - assert_eq!(got, flow_task_value); + assert_eq!(got, flow_value); let tasks = flow_metadata_manager - .flownode_task_manager() + .flownode_flow_manager() .tasks(catalog_name, 1) .try_collect::>() .await @@ -320,7 +316,7 @@ mod tests { assert_eq!(tasks, vec![(task_id, 0)]); for table_id in [1024, 1025, 1026] { let nodes = flow_metadata_manager - .table_task_manager() + .table_flow_manager() .nodes(catalog_name, table_id) .try_collect::>() .await @@ -349,7 +345,7 @@ mod tests { schema_name: "my_schema".to_string(), table_name: "sink_table".to_string(), }; - let flow_task_value = FlowTaskValue { + let flow_value = FlowTaskValue { catalog_name: "greptime".to_string(), flow_name: "task".to_string(), source_table_ids: vec![1024, 1025, 1026], @@ -361,11 +357,11 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(task_id, flow_value.clone()) .await .unwrap(); // Creates again. - let flow_task_value = FlowTaskValue { + let flow_value = FlowTaskValue { catalog_name: catalog_name.to_string(), flow_name: "task".to_string(), source_table_ids: vec![1024, 1025, 1026], @@ -377,7 +373,7 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_task_metadata(task_id + 1, flow_task_value) + .create_flow_metadata(task_id + 1, flow_value) .await .unwrap_err(); assert_matches!(err, error::Error::TaskAlreadyExists { .. }); @@ -394,7 +390,7 @@ mod tests { schema_name: "my_schema".to_string(), table_name: "sink_table".to_string(), }; - let flow_task_value = FlowTaskValue { + let flow_value = FlowTaskValue { catalog_name: "greptime".to_string(), flow_name: "task".to_string(), source_table_ids: vec![1024, 1025, 1026], @@ -406,7 +402,7 @@ mod tests { options: Default::default(), }; flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value.clone()) + .create_flow_metadata(task_id, flow_value.clone()) .await .unwrap(); // Creates again. @@ -415,7 +411,7 @@ mod tests { schema_name: "my_schema".to_string(), table_name: "another_sink_table".to_string(), }; - let flow_task_value = FlowTaskValue { + let flow_value = FlowTaskValue { catalog_name: "greptime".to_string(), flow_name: "task".to_string(), source_table_ids: vec![1024, 1025, 1026], @@ -427,7 +423,7 @@ mod tests { options: Default::default(), }; let err = flow_metadata_manager - .create_flow_task_metadata(task_id, flow_task_value) + .create_flow_metadata(task_id, flow_value) .await .unwrap_err(); assert!(err.to_string().contains("Reads the different value")); diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index 999844d56787..92b81b4c35f7 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -42,25 +42,25 @@ lazy_static! { /// The key stores the metadata of the task. /// /// The layout: `__flow/{catalog}/info/{flow_id}`. -pub struct FlowTaskKey(FlowTaskScoped>); +pub struct FlowInfoKey(FlowTaskScoped>); -impl MetaKey for FlowTaskKey { +impl MetaKey for FlowInfoKey { fn to_bytes(&self) -> Vec { self.0.to_bytes() } - fn from_bytes(bytes: &[u8]) -> Result { - Ok(FlowTaskKey(FlowTaskScoped::< + fn from_bytes(bytes: &[u8]) -> Result { + Ok(FlowInfoKey(FlowTaskScoped::< CatalogScoped, >::from_bytes(bytes)?)) } } -impl FlowTaskKey { +impl FlowInfoKey { /// Returns the [FlowTaskKey]. - pub fn new(catalog: String, flow_id: FlowTaskId) -> FlowTaskKey { + pub fn new(catalog: String, flow_id: FlowTaskId) -> FlowInfoKey { let inner = FlowTaskKeyInner::new(flow_id); - FlowTaskKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) + FlowInfoKey(FlowTaskScoped::new(CatalogScoped::new(catalog, inner))) } /// Returns the catalog. @@ -162,7 +162,7 @@ impl FlowTaskManager { /// Returns the [FlowTaskValue] of specified `flow_id`. pub async fn get(&self, catalog: &str, flow_id: FlowTaskId) -> Result> { - let key = FlowTaskKey::new(catalog.to_string(), flow_id).to_bytes(); + let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); self.kv_backend .get(&key) .await? @@ -177,16 +177,15 @@ impl FlowTaskManager { &self, catalog: &str, flow_id: FlowTaskId, - flow_task_value: &FlowTaskValue, + flow_value: &FlowTaskValue, ) -> Result<( Txn, impl FnOnce( &mut TxnOpGetResponseSet, ) -> Result>>, )> { - let key = FlowTaskKey::new(catalog.to_string(), flow_id).to_bytes(); - let txn = - txn_helper::build_put_if_absent_txn(key.clone(), flow_task_value.try_as_raw_value()?); + let key = FlowInfoKey::new(catalog.to_string(), flow_id).to_bytes(); + let txn = txn_helper::build_put_if_absent_txn(key.clone(), flow_value.try_as_raw_value()?); Ok(( txn, @@ -201,14 +200,14 @@ mod tests { #[test] fn test_key_serialization() { - let flow_task = FlowTaskKey::new("my_catalog".to_string(), 2); - assert_eq!(b"__flow/my_catalog/info/2".to_vec(), flow_task.to_bytes()); + let flow_info = FlowInfoKey::new("my_catalog".to_string(), 2); + assert_eq!(b"__flow/my_catalog/info/2".to_vec(), flow_info.to_bytes()); } #[test] fn test_key_deserialization() { let bytes = b"__flow/my_catalog/info/2".to_vec(); - let key = FlowTaskKey::from_bytes(&bytes).unwrap(); + let key = FlowInfoKey::from_bytes(&bytes).unwrap(); assert_eq!(key.catalog(), "my_catalog"); assert_eq!(key.flow_id(), 2); } diff --git a/src/common/meta/src/key/flow/flownode_flow.rs b/src/common/meta/src/key/flow/flownode_flow.rs index e3d20650a475..1118399ea954 100644 --- a/src/common/meta/src/key/flow/flownode_flow.rs +++ b/src/common/meta/src/key/flow/flownode_flow.rs @@ -170,7 +170,7 @@ impl MetaKey for FlownodeFlowKeyInner { } /// The manager of [FlownodeFlowKey]. -pub struct FlownodeTaskManager { +pub struct FlownodeFlowManager { kv_backend: KvBackendRef, } @@ -179,7 +179,7 @@ pub fn flownode_task_key_decoder(kv: KeyValue) -> Result { FlownodeFlowKey::from_bytes(&kv.key) } -impl FlownodeTaskManager { +impl FlownodeFlowManager { /// Returns a new [FlownodeTaskManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend } diff --git a/src/common/meta/src/key/flow/table_flow.rs b/src/common/meta/src/key/flow/table_flow.rs index 05583b21f949..cabda29302f8 100644 --- a/src/common/meta/src/key/flow/table_flow.rs +++ b/src/common/meta/src/key/flow/table_flow.rs @@ -187,11 +187,11 @@ pub fn table_task_decoder(kv: KeyValue) -> Result { } /// The manager of [TableFlowKey]. -pub struct TableTaskManager { +pub struct TableFlowManager { kv_backend: KvBackendRef, } -impl TableTaskManager { +impl TableFlowManager { /// Returns a new [TableTaskManager]. pub fn new(kv_backend: KvBackendRef) -> Self { Self { kv_backend } diff --git a/src/common/meta/src/lock_key.rs b/src/common/meta/src/lock_key.rs index 9203fa5ad8f2..98d870ce2f68 100644 --- a/src/common/meta/src/lock_key.rs +++ b/src/common/meta/src/lock_key.rs @@ -22,7 +22,7 @@ const CATALOG_LOCK_PREFIX: &str = "__catalog_lock"; const SCHEMA_LOCK_PREFIX: &str = "__schema_lock"; const TABLE_LOCK_PREFIX: &str = "__table_lock"; const TABLE_NAME_LOCK_PREFIX: &str = "__table_name_lock"; -const FLOW_TASK_NAME_LOCK_PREFIX: &str = "__flow_name_lock"; +const FLOW_NAME_LOCK_PREFIX: &str = "__flow_name_lock"; const REGION_LOCK_PREFIX: &str = "__region_lock"; /// [CatalogLock] acquires the lock on the tenant level. @@ -134,7 +134,7 @@ impl FlowNameLock { impl Display for FlowNameLock { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let FlowNameLock::Write(name) = self; - write!(f, "{}/{}", FLOW_TASK_NAME_LOCK_PREFIX, name) + write!(f, "{}/{}", FLOW_NAME_LOCK_PREFIX, name) } } diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index a38ebe93341a..34bb95dc0cb7 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,8 +39,8 @@ lazy_static! { &["step"] ) .unwrap(); - pub static ref METRIC_META_PROCEDURE_CREATE_FLOW_TASK: HistogramVec = register_histogram_vec!( - "greptime_meta_procedure_create_flow_task", + pub static ref METRIC_META_PROCEDURE_CREATE_FLOW: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_flow", "meta procedure create flow", &["step"] )