Skip to content

Commit

Permalink
feat: define CreateFlowTask and DropFlowTask (#3801)
Browse files Browse the repository at this point in the history
* chore: bump proto to cefc73f

* feat: add `CreateFlowTask` and `DropFlowTask`

* chore: bump to 87f2b38

* chore: bump to 783682f
  • Loading branch information
WenyXu authored Apr 25, 2024
1 parent 9206f60 commit 1ec5951
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "73ac0207ab71dfea48f30259ffdb611501b5ecb8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "783682fabc38c57b5b9d46bdcfeebe2496e85bbb" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
2 changes: 2 additions & 0 deletions src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str {
Some(Expr::Alter(_)) => "ddl.alter",
Some(Expr::DropTable(_)) => "ddl.drop_table",
Some(Expr::TruncateTable(_)) => "ddl.truncate_table",
Some(Expr::CreateFlowTask(_)) => "ddl.create_flow_task",
Some(Expr::DropFlowTask(_)) => "ddl.drop_flow_task",
None => "ddl.empty",
}
}
Expand Down
136 changes: 131 additions & 5 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ use std::result;
use api::v1::meta::ddl_task_request::Task;
use api::v1::meta::{
AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks,
CreateDatabaseTask as PbCreateDatabaseTask, CreateTableTask as PbCreateTableTask,
CreateTableTasks as PbCreateTableTasks, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask,
CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask,
CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks,
DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse,
DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask,
DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId,
TruncateTableTask as PbTruncateTableTask,
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateTableExpr, DropDatabaseExpr, DropTableExpr,
TruncateTableExpr,
AlterExpr, CreateDatabaseExpr, CreateFlowTaskExpr, CreateTableExpr, DropDatabaseExpr,
DropFlowTaskExpr, DropTableExpr, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
Expand Down Expand Up @@ -181,6 +182,8 @@ impl TryFrom<Task> for DdlTask {
Task::DropDatabaseTask(drop_database) => {
Ok(DdlTask::DropDatabase(drop_database.try_into()?))
}
Task::CreateFlowTask(_) => unimplemented!(),
Task::DropFlowTask(_) => unimplemented!(),
}
}
}
Expand Down Expand Up @@ -720,6 +723,129 @@ impl TryFrom<DropDatabaseTask> for PbDropDatabaseTask {
}
}

/// Create flow task
pub struct CreateFlowTask {
pub catalog_name: String,
pub task_name: String,
pub source_table_names: Vec<TableName>,
pub sink_table_name: TableName,
pub or_replace: bool,
pub create_if_not_exists: bool,
pub expire_when: String,
pub comment: String,
pub sql: String,
pub options: HashMap<String, String>,
}

impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
type Error = error::Error;

fn try_from(pb: PbCreateFlowTask) -> Result<Self> {
let CreateFlowTaskExpr {
catalog_name,
task_name,
source_table_names,
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
task_options,
} = pb.create_flow_task.context(error::InvalidProtoMsgSnafu {
err_msg: "expected create_flow_task",
})?;

Ok(CreateFlowTask {
catalog_name,
task_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: sink_table_name
.context(error::InvalidProtoMsgSnafu {
err_msg: "expected sink_table_name",
})?
.into(),
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
options: task_options,
})
}
}

impl From<CreateFlowTask> for PbCreateFlowTask {
fn from(
CreateFlowTask {
catalog_name,
task_name,
source_table_names,
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
options,
}: CreateFlowTask,
) -> Self {
PbCreateFlowTask {
create_flow_task: Some(CreateFlowTaskExpr {
catalog_name,
task_name,
source_table_names: source_table_names.into_iter().map(Into::into).collect(),
sink_table_name: Some(sink_table_name.into()),
or_replace,
create_if_not_exists,
expire_when,
comment,
sql,
task_options: options,
}),
}
}
}

/// Drop flow task
pub struct DropFlowTask {
pub catalog_name: String,
pub task_name: String,
}

impl TryFrom<PbDropFlowTask> for DropFlowTask {
type Error = error::Error;

fn try_from(pb: PbDropFlowTask) -> Result<Self> {
let DropFlowTaskExpr {
catalog_name,
task_name,
} = pb.drop_flow_task.context(error::InvalidProtoMsgSnafu {
err_msg: "expected sink_table_name",
})?;
Ok(DropFlowTask {
catalog_name,
task_name,
})
}
}

impl From<DropFlowTask> for PbDropFlowTask {
fn from(
DropFlowTask {
catalog_name,
task_name,
}: DropFlowTask,
) -> Self {
PbDropFlowTask {
drop_flow_task: Some(DropFlowTaskExpr {
catalog_name,
task_name,
}),
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/table_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::fmt::{Display, Formatter};

use api::v1::meta::TableName as PbTableName;
use api::v1::TableName as PbTableName;
use serde::{Deserialize, Serialize};
use table::table_reference::TableReference;

Expand Down
17 changes: 16 additions & 1 deletion src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ impl GrpcQueryHandler for Instance {

match expr {
DdlExpr::CreateTable(mut expr) => {
// TODO(weny): supports to create multiple region table.
let _ = self
.statement_executor
.create_table_inner(&mut expr, None, &ctx)
Expand Down Expand Up @@ -138,6 +137,12 @@ impl GrpcQueryHandler for Instance {
TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
self.statement_executor.truncate_table(table_name).await?
}
DdlExpr::CreateFlowTask(_) => {
unimplemented!()
}
DdlExpr::DropFlowTask(_) => {
unimplemented!()
}
}
}
};
Expand Down Expand Up @@ -176,6 +181,16 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte
Expr::TruncateTable(expr) => {
check_and_fill!(expr);
}
Expr::CreateFlowTask(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
Expr::DropFlowTask(expr) => {
if expr.catalog_name.is_empty() {
expr.catalog_name = catalog.to_string();
}
}
}
}

Expand Down

0 comments on commit 1ec5951

Please sign in to comment.