Skip to content

Commit

Permalink
fix: SQL insertion and column default constraint aware of timezone (#…
Browse files Browse the repository at this point in the history
…3266)

* fix: missing timezone when parsing sql value to greptimedb value

* test: sql_value_to_value

* fix: column default constraint missing timezone

* test: column def default constraint  with timezone

* test: adds sqlness test for default constraint aware of timezone

* fix: typo

* chore: comment
  • Loading branch information
killme2008 authored Jan 30, 2024
1 parent e0e6351 commit 43ef082
Show file tree
Hide file tree
Showing 15 changed files with 563 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ pub fn check_permission(
}

fn validate_param(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<()> {
let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx.clone())
let (catalog, schema, _) = table_idents_to_full_name(name, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl GrpcQueryHandler for Instance {
// TODO(weny): supports to create multiple region table.
let _ = self
.statement_executor
.create_table_inner(&mut expr, None)
.create_table_inner(&mut expr, None, &ctx)
.await?;
Output::AffectedRows(0)
}
Expand Down
128 changes: 119 additions & 9 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use api::v1::{
};
use common_error::ext::BoxedError;
use common_grpc_expr::util::ColumnExpr;
use common_time::Timezone;
use datatypes::schema::{ColumnSchema, COMMENT_KEY};
use file_engine::FileOptions;
use query::sql::{
Expand Down Expand Up @@ -122,7 +123,7 @@ pub(crate) async fn create_external_expr(
query_ctx: QueryContextRef,
) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
table_idents_to_full_name(&create.name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand All @@ -141,7 +142,8 @@ pub(crate) async fn create_external_expr(
// expanded form
let time_index = find_time_index(&create.constraints)?;
let primary_keys = find_primary_keys(&create.columns, &create.constraints)?;
let column_schemas = columns_to_column_schemas(&create.columns, &time_index)?;
let column_schemas =
columns_to_column_schemas(&create.columns, &time_index, Some(&query_ctx.timezone()))?;
(time_index, primary_keys, column_schemas)
} else {
// inferred form
Expand Down Expand Up @@ -182,7 +184,7 @@ pub(crate) async fn create_external_expr(
/// Convert `CreateTable` statement to `CreateExpr` gRPC request.
pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Result<CreateTableExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&create.name, query_ctx)
table_idents_to_full_name(&create.name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand All @@ -199,7 +201,12 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul
schema_name,
table_name,
desc: String::default(),
column_defs: columns_to_expr(&create.columns, &time_index, &primary_keys)?,
column_defs: columns_to_expr(
&create.columns,
&time_index,
&primary_keys,
Some(&query_ctx.timezone()),
)?,
time_index,
primary_keys,
create_if_not_exists: create.if_not_exists,
Expand Down Expand Up @@ -293,18 +300,23 @@ fn columns_to_expr(
column_defs: &[ColumnDef],
time_index: &str,
primary_keys: &[String],
timezone: Option<&Timezone>,
) -> Result<Vec<api::v1::ColumnDef>> {
let column_schemas = columns_to_column_schemas(column_defs, time_index)?;
let column_schemas = columns_to_column_schemas(column_defs, time_index, timezone)?;
column_schemas_to_defs(column_schemas, primary_keys)
}

fn columns_to_column_schemas(
column_defs: &[ColumnDef],
time_index: &str,
timezone: Option<&Timezone>,
) -> Result<Vec<ColumnSchema>> {
column_defs
.iter()
.map(|c| column_def_to_schema(c, c.name.to_string() == time_index).context(ParseSqlSnafu))
.map(|c| {
column_def_to_schema(c, c.name.to_string() == time_index, timezone)
.context(ParseSqlSnafu)
})
.collect::<Result<Vec<ColumnSchema>>>()
}

Expand Down Expand Up @@ -365,7 +377,7 @@ pub(crate) fn to_alter_expr(
query_ctx: QueryContextRef,
) -> Result<AlterExpr> {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(alter_table.table_name(), query_ctx)
table_idents_to_full_name(alter_table.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

Expand All @@ -382,7 +394,7 @@ pub(crate) fn to_alter_expr(
} => Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(column_def)
sql_column_def_to_grpc_column_def(column_def, Some(&query_ctx.timezone()))
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
),
Expand All @@ -409,10 +421,12 @@ pub(crate) fn to_alter_expr(

#[cfg(test)]
mod tests {
use session::context::QueryContext;
use datatypes::value::Value;
use session::context::{QueryContext, QueryContextBuilder};
use sql::dialect::GreptimeDbDialect;
use sql::parser::{ParseOptions, ParserContext};
use sql::statements::statement::Statement;
use store_api::storage::ColumnDefaultConstraint;

use super::*;

Expand All @@ -435,4 +449,100 @@ mod tests {
expr.table_options.get("write_buffer_size").unwrap()
);
}

#[test]
fn test_create_to_expr_with_default_timestamp_value() {
let sql = "CREATE TABLE monitor (v double,ts TIMESTAMP default '2024-01-30T00:01:01',TIME INDEX (ts)) engine=mito;";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::CreateTable(create_table) = stmt else {
unreachable!()
};

// query context with system timezone UTC.
let expr = create_to_expr(&create_table, QueryContext::arc()).unwrap();
let ts_column = &expr.column_defs[1];
let constraint = assert_ts_column(ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
);

// query context with timezone `+08:00`
let ctx = QueryContextBuilder::default()
.timezone(Timezone::from_tz_string("+08:00").unwrap().into())
.build();
let expr = create_to_expr(&create_table, ctx).unwrap();
let ts_column = &expr.column_defs[1];
let constraint = assert_ts_column(ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
);
}

fn assert_ts_column(ts_column: &api::v1::ColumnDef) -> ColumnDefaultConstraint {
assert_eq!("ts", ts_column.name);
assert_eq!(
ColumnDataType::TimestampMillisecond as i32,
ts_column.data_type
);
assert!(!ts_column.default_constraint.is_empty());

ColumnDefaultConstraint::try_from(&ts_column.default_constraint[..]).unwrap()
}

#[test]
fn test_to_alter_expr() {
let sql = "ALTER TABLE monitor add column ts TIMESTAMP default '2024-01-30T00:01:01';";
let stmt =
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
.unwrap()
.pop()
.unwrap();

let Statement::Alter(alter_table) = stmt else {
unreachable!()
};

// query context with system timezone UTC.
let expr = to_alter_expr(alter_table.clone(), QueryContext::arc()).unwrap();
let kind = expr.kind.unwrap();

let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
unreachable!()
};

assert_eq!(1, add_columns.len());
let ts_column = add_columns[0].column_def.clone().unwrap();
let constraint = assert_ts_column(&ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-30 00:01:01+0000")
);

//
// query context with timezone `+08:00`
let ctx = QueryContextBuilder::default()
.timezone(Timezone::from_tz_string("+08:00").unwrap().into())
.build();
let expr = to_alter_expr(alter_table, ctx).unwrap();
let kind = expr.kind.unwrap();

let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
unreachable!()
};

assert_eq!(1, add_columns.len());
let ts_column = add_columns[0].column_def.clone().unwrap();
let constraint = assert_ts_column(&ts_column);
assert!(
matches!(constraint, ColumnDefaultConstraint::Value(Value::Timestamp(ts))
if ts.to_iso8601_string() == "2024-01-29 16:01:01+0000")
);
}
}
6 changes: 3 additions & 3 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl Inserter {
) -> Result<Output> {
let inserts =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert)
.convert(insert, ctx)
.await?;

let affected_rows = self.do_request(inserts, ctx).await?;
Expand Down Expand Up @@ -334,7 +334,7 @@ impl Inserter {

// create physical table
let res = statement_executor
.create_table_inner(create_table_expr, None)
.create_table_inner(create_table_expr, None, ctx)
.await;

match res {
Expand Down Expand Up @@ -431,7 +431,7 @@ impl Inserter {

// TODO(weny): multiple regions table.
let res = statement_executor
.create_table_inner(create_table_expr, None)
.create_table_inner(create_table_expr, None, ctx)
.await;

match res {
Expand Down
23 changes: 18 additions & 5 deletions src/operator/src/req_convert/insert/stmt_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::{ColumnSchema as GrpcColumnSchema, Row, Rows, Value as GrpcValue};
use catalog::CatalogManager;
use common_time::Timezone;
use datatypes::schema::{ColumnSchema, SchemaRef};
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements;
use sql::statements::insert::Insert;
Expand Down Expand Up @@ -54,7 +55,11 @@ impl<'a> StatementToRegion<'a> {
}
}

pub async fn convert(&self, stmt: &Insert) -> Result<RegionInsertRequests> {
pub async fn convert(
&self,
stmt: &Insert,
query_ctx: &QueryContextRef,
) -> Result<RegionInsertRequests> {
let (catalog, schema, table_name) = self.get_full_name(stmt.table_name())?;
let table = self.get_table(&catalog, &schema, &table_name).await?;
let table_schema = table.schema();
Expand Down Expand Up @@ -110,7 +115,11 @@ impl<'a> StatementToRegion<'a> {
schema.push(grpc_column_schema);

for (sql_row, grpc_row) in sql_rows.iter().zip(rows.iter_mut()) {
let value = sql_value_to_grpc_value(column_schema, &sql_row[i])?;
let value = sql_value_to_grpc_value(
column_schema,
&sql_row[i],
Some(&query_ctx.timezone()),
)?;
grpc_row.values.push(value);
}
}
Expand Down Expand Up @@ -169,7 +178,11 @@ fn column_names<'a>(stmt: &'a Insert, table_schema: &'a SchemaRef) -> Vec<&'a St
}
}

fn sql_value_to_grpc_value(column_schema: &ColumnSchema, sql_val: &SqlValue) -> Result<GrpcValue> {
fn sql_value_to_grpc_value(
column_schema: &ColumnSchema,
sql_val: &SqlValue,
timezone: Option<&Timezone>,
) -> Result<GrpcValue> {
let column = &column_schema.name;
let value = if replace_default(sql_val) {
let default_value = column_schema
Expand All @@ -182,7 +195,7 @@ fn sql_value_to_grpc_value(column_schema: &ColumnSchema, sql_val: &SqlValue) ->
column: column.clone(),
})?
} else {
statements::sql_value_to_value(column, &column_schema.data_type, sql_val)
statements::sql_value_to_value(column, &column_schema.data_type, sql_val, timezone)
.context(ParseSqlSnafu)?
};

Expand Down
13 changes: 7 additions & 6 deletions src/operator/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ impl StatementExecutor {
Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await,
Statement::DropTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
table_idents_to_full_name(stmt.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
self.drop_table(table_name, stmt.drop_if_exists()).await
}
Statement::TruncateTable(stmt) => {
let (catalog, schema, table) =
table_idents_to_full_name(stmt.table_name(), query_ctx)
table_idents_to_full_name(stmt.table_name(), &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;
let table_name = TableName::new(catalog, schema, table);
Expand All @@ -186,7 +186,7 @@ impl StatementExecutor {

Statement::ShowCreateTable(show) => {
let (catalog, schema, table) =
table_idents_to_full_name(&show.table_name, query_ctx.clone())
table_idents_to_full_name(&show.table_name, &query_ctx)
.map_err(BoxedError::new)
.context(error::ExternalSnafu)?;

Expand Down Expand Up @@ -298,9 +298,10 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
CopyTable::To(arg) => arg,
CopyTable::From(arg) => arg,
};
let (catalog_name, schema_name, table_name) = table_idents_to_full_name(&table_name, query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&table_name, &query_ctx)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;

let pattern = with
.get(common_datasource::file_format::FILE_PATTERN)
Expand Down
Loading

0 comments on commit 43ef082

Please sign in to comment.