Skip to content

Commit

Permalink
feat: change EXPIRE WHEN to EXPIRE AFTER (#4002)
Browse files Browse the repository at this point in the history
* feat: change EXPIRE WHEN to EXPIRE AFTER

Signed-off-by: Ruihang Xia <[email protected]>

* change remaining

Signed-off-by: Ruihang Xia <[email protected]>

* rename create_if_not_exist to create_if_not_exists

Signed-off-by: Ruihang Xia <[email protected]>

* parse interval expr

Signed-off-by: Ruihang Xia <[email protected]>

* update comment

Signed-off-by: Ruihang Xia <[email protected]>

* Apply suggestions from code review

Co-authored-by: Jeremyhi <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Jeremyhi <[email protected]>
  • Loading branch information
waynexia and fengjiachun authored May 27, 2024
1 parent 389ded9 commit 1de17ae
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 231 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3cd71167ee067c5679a7fb17cf58bdfbb5487a0d" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "902f75fdd170c572e90b1f640161d90995f20218" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/cache/flow/table_flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ mod tests {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
flow_name: "my_flow".to_string(),
raw_sql: "sql".to_string(),
expire_when: "expire".to_string(),
expire_after: Some(300),
comment: "comment".to_string(),
options: Default::default(),
},
Expand Down
7 changes: 4 additions & 3 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::BTreeMap;

use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
use api::v1::ExpireAfter;
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
Expand Down Expand Up @@ -283,7 +284,7 @@ impl From<&CreateFlowData> for CreateRequest {
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
create_if_not_exists: true,
expire_when: value.task.expire_when.clone(),
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
flow_options: value.task.flow_options.clone(),
Expand All @@ -297,7 +298,7 @@ impl From<&CreateFlowData> for FlowInfoValue {
catalog_name,
flow_name,
sink_table_name,
expire_when,
expire_after,
comment,
sql,
flow_options: options,
Expand All @@ -318,7 +319,7 @@ impl From<&CreateFlowData> for FlowInfoValue {
catalog_name,
flow_name,
raw_sql: sql,
expire_when,
expire_after,
comment,
options,
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/tests/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(crate) fn test_create_flow_task(
sink_table_name,
or_replace: false,
create_if_not_exists,
expire_when: "".to_string(),
expire_after: Some(300),
comment: "".to_string(),
sql: "raw_sql".to_string(),
flow_options: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ mod tests {
sink_table_name,
flownode_ids,
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
}
Expand Down Expand Up @@ -420,7 +420,7 @@ mod tests {
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
};
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ pub struct FlowInfoValue {
/// The raw sql.
pub(crate) raw_sql: String,
/// The expr of expire.
pub(crate) expire_when: String,
/// Duration in seconds as `i64`.
pub(crate) expire_after: Option<i64>,
/// The comment.
pub(crate) comment: String,
/// The options.
Expand Down
15 changes: 8 additions & 7 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use api::v1::meta::{
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, QueryContext as PbQueryContext,
TruncateTableExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
Expand Down Expand Up @@ -898,7 +898,8 @@ pub struct CreateFlowTask {
pub sink_table_name: TableName,
pub or_replace: bool,
pub create_if_not_exists: bool,
pub expire_when: String,
/// Duration in seconds. Data older than this duration will not be used.
pub expire_after: Option<i64>,
pub comment: String,
pub sql: String,
pub flow_options: HashMap<String, String>,
Expand All @@ -915,7 +916,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
expire_after,
comment,
sql,
flow_options,
Expand All @@ -934,7 +935,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
.into(),
or_replace,
create_if_not_exists,
expire_when,
expire_after: expire_after.map(|e| e.value),
comment,
sql,
flow_options,
Expand All @@ -951,7 +952,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
expire_after,
comment,
sql,
flow_options,
Expand All @@ -965,7 +966,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
sink_table_name: Some(sink_table_name.into()),
or_replace,
create_if_not_exists,
expire_when,
expire_after: expire_after.map(|value| ExpireAfter { value }),
comment,
sql,
flow_options,
Expand Down
29 changes: 6 additions & 23 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tokio::sync::{oneshot, watch, Mutex, RwLock};

use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::parse_expr::parse_fixed;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
Expand Down Expand Up @@ -565,22 +564,22 @@ impl FlownodeManager {
/// Return task id if a new task is created, otherwise return None
///
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_when expr)
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exist: bool,
expire_when: Option<String>,
create_if_not_exists: bool,
expire_after: Option<i64>,
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
if create_if_not_exist {
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
Expand Down Expand Up @@ -608,22 +607,6 @@ impl FlownodeManager {
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;

let expire_when = expire_when
.and_then(|s| {
if s.is_empty() || s.split_whitespace().join("").is_empty() {
None
} else {
Some(s)
}
})
.map(|d| {
let d = d.as_ref();
parse_fixed(d)
.map(|(_, n)| n)
.map_err(|err| err.to_string())
})
.transpose()
.map_err(|err| UnexpectedSnafu { reason: err }.build())?;
let _ = comment;
let _ = flow_options;

Expand Down Expand Up @@ -656,8 +639,8 @@ impl FlownodeManager {
sink_sender,
source_ids,
src_recvs: source_receivers,
expire_when,
create_if_not_exist,
expire_after,
create_if_not_exists,
err_collector,
};
handle.create_flow(create_request).await?;
Expand Down
5 changes: 3 additions & 2 deletions src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Flownode for FlownodeManager {
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_when,
expire_after,
comment,
sql,
flow_options,
Expand All @@ -56,13 +56,14 @@ impl Flownode for FlownodeManager {
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let ret = self
.create_flow(
task_id.id as u64,
sink_table_name,
&source_table_ids,
create_if_not_exists,
Some(expire_when),
expire_after,
Some(comment),
sql,
flow_options,
Expand Down
26 changes: 13 additions & 13 deletions src/flow/src/adapter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ impl<'s> Worker<'s> {
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let already_exist = self.task_states.contains_key(&flow_id);
match (already_exist, create_if_not_exist) {
let already_exists = self.task_states.contains_key(&flow_id);
match (already_exists, create_if_not_exists) {
(true, true) => return Ok(None),
(true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
(false, _) => (),
Expand All @@ -247,7 +247,7 @@ impl<'s> Worker<'s> {
err_collector,
..Default::default()
};
cur_task_state.state.set_expire_after(expire_when);
cur_task_state.state.set_expire_after(expire_after);

{
let mut ctx = cur_task_state.new_ctx(sink_id);
Expand Down Expand Up @@ -319,8 +319,8 @@ impl<'s> Worker<'s> {
sink_sender,
source_ids,
src_recvs,
expire_when,
create_if_not_exist,
expire_after,
create_if_not_exists,
err_collector,
} => {
let task_create_result = self.create_flow(
Expand All @@ -330,8 +330,8 @@ impl<'s> Worker<'s> {
sink_sender,
&source_ids,
src_recvs,
expire_when,
create_if_not_exist,
expire_after,
create_if_not_exists,
err_collector,
);
Some((
Expand Down Expand Up @@ -368,8 +368,8 @@ pub enum Request {
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
err_collector: ErrCollector,
},
Remove {
Expand Down Expand Up @@ -524,8 +524,8 @@ mod test {
sink_sender: sink_tx,
source_ids: src_ids,
src_recvs: vec![rx],
expire_when: None,
create_if_not_exist: true,
expire_after: None,
create_if_not_exists: true,
err_collector: ErrCollector::default(),
};
handle.create_flow(create_reqs).await.unwrap();
Expand Down
7 changes: 2 additions & 5 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
DropColumns, RenameTable, SemanticType, TableName,
DropColumns, ExpireAfter, RenameTable, SemanticType, TableName,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
Expand Down Expand Up @@ -591,10 +591,7 @@ pub fn to_create_flow_task_expr(
sink_table_name: Some(sink_table_name),
or_replace: create_flow.or_replace,
create_if_not_exists: create_flow.if_not_exists,
expire_when: create_flow
.expire_when
.map(|e| e.to_string())
.unwrap_or_default(),
expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: HashMap::new(),
Expand Down
27 changes: 27 additions & 0 deletions src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::DataFusionError;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
use sqlparser::ast::Ident;
Expand Down Expand Up @@ -123,6 +124,13 @@ pub enum Error {
#[snafu(display("Invalid database name: {}", name))]
InvalidDatabaseName { name: String },

#[snafu(display("Invalid interval provided: {}", reason))]
InvalidInterval {
reason: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unrecognized database option key: {}", key))]
InvalidDatabaseOption {
key: String,
Expand Down Expand Up @@ -214,6 +222,22 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to convert to logical TQL expression"))]
ConvertToLogicalExpression {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to simplify TQL expression"))]
Simplification {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Permission denied while operating catalog {} from current catalog {}",
target,
Expand Down Expand Up @@ -254,6 +278,9 @@ impl ErrorExt for Error {
| TimestampOverflow { .. }
| InvalidTableOption { .. }
| InvalidCast { .. }
| ConvertToLogicalExpression { .. }
| Simplification { .. }
| InvalidInterval { .. }
| PermissionDenied { .. } => StatusCode::InvalidArguments,

SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
Expand Down
Loading

0 comments on commit 1de17ae

Please sign in to comment.