Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change EXPIRE WHEN to EXPIRE AFTER #4002

Merged
merged 8 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3cd71167ee067c5679a7fb17cf58bdfbb5487a0d" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "902f75fdd170c572e90b1f640161d90995f20218" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/cache/flow/table_flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ mod tests {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
flow_name: "my_flow".to_string(),
raw_sql: "sql".to_string(),
expire_when: "expire".to_string(),
expire_after: Some(300),
comment: "comment".to_string(),
options: Default::default(),
},
Expand Down
7 changes: 4 additions & 3 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::collections::BTreeMap;

use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
use api::v1::ExpireAfter;
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
Expand Down Expand Up @@ -283,7 +284,7 @@ impl From<&CreateFlowData> for CreateRequest {
sink_table_name: Some(value.task.sink_table_name.clone().into()),
// Always be true
create_if_not_exists: true,
expire_when: value.task.expire_when.clone(),
expire_after: value.task.expire_after.map(|value| ExpireAfter { value }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
flow_options: value.task.flow_options.clone(),
Expand All @@ -297,7 +298,7 @@ impl From<&CreateFlowData> for FlowInfoValue {
catalog_name,
flow_name,
sink_table_name,
expire_when,
expire_after,
comment,
sql,
flow_options: options,
Expand All @@ -318,7 +319,7 @@ impl From<&CreateFlowData> for FlowInfoValue {
catalog_name,
flow_name,
raw_sql: sql,
expire_when,
expire_after,
comment,
options,
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/tests/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(crate) fn test_create_flow_task(
sink_table_name,
or_replace: false,
create_if_not_exists,
expire_when: "".to_string(),
expire_after: Some(300),
comment: "".to_string(),
sql: "raw_sql".to_string(),
flow_options: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/key/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ mod tests {
sink_table_name,
flownode_ids,
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
}
Expand Down Expand Up @@ -420,7 +420,7 @@ mod tests {
sink_table_name: another_sink_table_name,
flownode_ids: [(0, 1u64)].into(),
raw_sql: "raw".to_string(),
expire_when: "expr".to_string(),
expire_after: Some(300),
comment: "hi".to_string(),
options: Default::default(),
};
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/key/flow/flow_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ pub struct FlowInfoValue {
/// The raw sql.
pub(crate) raw_sql: String,
/// The expr of expire.
pub(crate) expire_when: String,
/// Duration in second as `i64`.
waynexia marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) expire_after: Option<i64>,
/// The comment.
pub(crate) comment: String,
/// The options.
Expand Down
15 changes: 8 additions & 7 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use api::v1::meta::{
};
use api::v1::{
AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, QueryContext as PbQueryContext,
TruncateTableExpr,
DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, ExpireAfter,
QueryContext as PbQueryContext, TruncateTableExpr,
};
use base64::engine::general_purpose;
use base64::Engine as _;
Expand Down Expand Up @@ -898,7 +898,8 @@ pub struct CreateFlowTask {
pub sink_table_name: TableName,
pub or_replace: bool,
pub create_if_not_exists: bool,
pub expire_when: String,
/// Duration in seconds. Data older than this duration will not be used.
pub expire_after: Option<i64>,
pub comment: String,
pub sql: String,
pub flow_options: HashMap<String, String>,
Expand All @@ -915,7 +916,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
expire_after,
comment,
sql,
flow_options,
Expand All @@ -934,7 +935,7 @@ impl TryFrom<PbCreateFlowTask> for CreateFlowTask {
.into(),
or_replace,
create_if_not_exists,
expire_when,
expire_after: expire_after.map(|e| e.value),
comment,
sql,
flow_options,
Expand All @@ -951,7 +952,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
sink_table_name,
or_replace,
create_if_not_exists,
expire_when,
expire_after,
comment,
sql,
flow_options,
Expand All @@ -965,7 +966,7 @@ impl From<CreateFlowTask> for PbCreateFlowTask {
sink_table_name: Some(sink_table_name.into()),
or_replace,
create_if_not_exists,
expire_when,
expire_after: expire_after.map(|value| ExpireAfter { value }),
comment,
sql,
flow_options,
Expand Down
29 changes: 6 additions & 23 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tokio::sync::{oneshot, watch, Mutex, RwLock};

use crate::adapter::error::{ExternalSnafu, TableNotFoundSnafu, UnexpectedSnafu};
pub(crate) use crate::adapter::node_context::FlownodeContext;
use crate::adapter::parse_expr::parse_fixed;
use crate::adapter::table_source::TableSource;
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
Expand Down Expand Up @@ -565,22 +564,22 @@ impl FlownodeManager {
/// Return task id if a new task is created, otherwise return None
///
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_when expr)
/// 1. parse query into typed plan(and optional parse expire_after expr)
/// 2. render source/sink with output table id and used input table id
#[allow(clippy::too_many_arguments)]
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exist: bool,
expire_when: Option<String>,
create_if_not_exists: bool,
expire_after: Option<i64>,
comment: Option<String>,
sql: String,
flow_options: HashMap<String, String>,
query_ctx: Option<QueryContext>,
) -> Result<Option<FlowId>, Error> {
if create_if_not_exist {
if create_if_not_exists {
// check if the task already exists
for handle in self.worker_handles.iter() {
if handle.lock().await.contains_flow(flow_id).await? {
Expand Down Expand Up @@ -608,22 +607,6 @@ impl FlownodeManager {
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;

let expire_when = expire_when
.and_then(|s| {
if s.is_empty() || s.split_whitespace().join("").is_empty() {
None
} else {
Some(s)
}
})
.map(|d| {
let d = d.as_ref();
parse_fixed(d)
.map(|(_, n)| n)
.map_err(|err| err.to_string())
})
.transpose()
.map_err(|err| UnexpectedSnafu { reason: err }.build())?;
let _ = comment;
let _ = flow_options;

Expand Down Expand Up @@ -656,8 +639,8 @@ impl FlownodeManager {
sink_sender,
source_ids,
src_recvs: source_receivers,
expire_when,
create_if_not_exist,
expire_after,
create_if_not_exists,
err_collector,
};
handle.create_flow(create_request).await?;
Expand Down
5 changes: 3 additions & 2 deletions src/flow/src/adapter/flownode_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Flownode for FlownodeManager {
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_when,
expire_after,
comment,
sql,
flow_options,
Expand All @@ -56,13 +56,14 @@ impl Flownode for FlownodeManager {
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let ret = self
.create_flow(
task_id.id as u64,
sink_table_name,
&source_table_ids,
create_if_not_exists,
Some(expire_when),
expire_after,
Some(comment),
sql,
flow_options,
Expand Down
24 changes: 12 additions & 12 deletions src/flow/src/adapter/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ impl<'s> Worker<'s> {
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
// TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
err_collector: ErrCollector,
) -> Result<Option<FlowId>, Error> {
let already_exist = self.task_states.contains_key(&flow_id);
waynexia marked this conversation as resolved.
Show resolved Hide resolved
match (already_exist, create_if_not_exist) {
match (already_exist, create_if_not_exists) {
waynexia marked this conversation as resolved.
Show resolved Hide resolved
(true, true) => return Ok(None),
(true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
(false, _) => (),
Expand All @@ -247,7 +247,7 @@ impl<'s> Worker<'s> {
err_collector,
..Default::default()
};
cur_task_state.state.set_expire_after(expire_when);
cur_task_state.state.set_expire_after(expire_after);

{
let mut ctx = cur_task_state.new_ctx(sink_id);
Expand Down Expand Up @@ -319,8 +319,8 @@ impl<'s> Worker<'s> {
sink_sender,
source_ids,
src_recvs,
expire_when,
create_if_not_exist,
expire_after,
create_if_not_exists,
err_collector,
} => {
let task_create_result = self.create_flow(
Expand All @@ -330,8 +330,8 @@ impl<'s> Worker<'s> {
sink_sender,
&source_ids,
src_recvs,
expire_when,
create_if_not_exist,
expire_after,
create_if_not_exists,
err_collector,
);
Some((
Expand Down Expand Up @@ -368,8 +368,8 @@ pub enum Request {
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
expire_after: Option<repr::Duration>,
create_if_not_exists: bool,
err_collector: ErrCollector,
},
Remove {
Expand Down Expand Up @@ -524,8 +524,8 @@ mod test {
sink_sender: sink_tx,
source_ids: src_ids,
src_recvs: vec![rx],
expire_when: None,
create_if_not_exist: true,
expire_after: None,
create_if_not_exists: true,
err_collector: ErrCollector::default(),
};
handle.create_flow(create_reqs).await.unwrap();
Expand Down
7 changes: 2 additions & 5 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api::v1::alter_expr::Kind;
use api::v1::{
AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType,
ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn,
DropColumns, RenameTable, SemanticType, TableName,
DropColumns, ExpireAfter, RenameTable, SemanticType, TableName,
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
Expand Down Expand Up @@ -591,10 +591,7 @@ pub fn to_create_flow_task_expr(
sink_table_name: Some(sink_table_name),
or_replace: create_flow.or_replace,
create_if_not_exists: create_flow.if_not_exists,
expire_when: create_flow
.expire_when
.map(|e| e.to_string())
.unwrap_or_default(),
expire_after: create_flow.expire_after.map(|value| ExpireAfter { value }),
comment: create_flow.comment.unwrap_or_default(),
sql: create_flow.query.to_string(),
flow_options: HashMap::new(),
Expand Down
27 changes: 27 additions & 0 deletions src/sql/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::DataFusionError;
use datatypes::prelude::{ConcreteDataType, Value};
use snafu::{Location, Snafu};
use sqlparser::ast::Ident;
Expand Down Expand Up @@ -123,6 +124,13 @@ pub enum Error {
#[snafu(display("Invalid database name: {}", name))]
InvalidDatabaseName { name: String },

#[snafu(display("Invalid interval provided: {}", reason))]
InvalidInterval {
reason: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unrecognized database option key: {}", key))]
InvalidDatabaseOption {
key: String,
Expand Down Expand Up @@ -214,6 +222,22 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to convert to logical TQL expression"))]
ConvertToLogicalExpression {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to simplify TQL expression"))]
Simplification {
#[snafu(source)]
error: DataFusionError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Permission denied while operating catalog {} from current catalog {}",
target,
Expand Down Expand Up @@ -254,6 +278,9 @@ impl ErrorExt for Error {
| TimestampOverflow { .. }
| InvalidTableOption { .. }
| InvalidCast { .. }
| ConvertToLogicalExpression { .. }
| Simplification { .. }
| InvalidInterval { .. }
| PermissionDenied { .. } => StatusCode::InvalidArguments,

SerializeColumnDefaultConstraint { source, .. } => source.status_code(),
Expand Down
1 change: 1 addition & 0 deletions src/sql/src/parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ pub(crate) mod set_var_parser;
pub(crate) mod show_parser;
pub(crate) mod tql_parser;
pub(crate) mod truncate_parser;
pub(crate) mod utils;
Loading
Loading