diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 890c3b9ae0a91..1675c9bc762ce 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -19,7 +19,8 @@ use itertools::{EitherOrBoth, Itertools}; use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; use risingwave_sqlparser::ast::{ - Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias, TableFactor, + AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias, + TableFactor, }; use thiserror::Error; use thiserror_ext::AsReport; @@ -333,7 +334,7 @@ impl Binder { &mut self, name: ObjectName, alias: Option, - for_system_time_as_of_proctime: bool, + as_of: Option, ) -> Result { let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; if schema_name.is_none() @@ -376,12 +377,7 @@ impl Binder { })); Ok(share_relation) } else { - self.bind_relation_by_name_inner( - schema_name.as_deref(), - &table_name, - alias, - for_system_time_as_of_proctime, - ) + self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, as_of) } } @@ -401,7 +397,7 @@ impl Binder { }?; Ok(( - self.bind_relation_by_name(table_name.clone(), None, false)?, + self.bind_relation_by_name(table_name.clone(), None, None)?, table_name, )) } @@ -451,16 +447,14 @@ impl Binder { .map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string()); let table_name = self.catalog.get_table_name_by_id(table_id)?; - self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, false) + self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, None) } pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result { match table_factor { - TableFactor::Table { - name, - alias, - for_system_time_as_of_proctime, - } => self.bind_relation_by_name(name, alias, for_system_time_as_of_proctime), + TableFactor::Table { name, alias, as_of } => { + self.bind_relation_by_name(name, alias, as_of) + } TableFactor::TableFunction { name, alias, diff --git a/src/frontend/src/binder/relation/table_function.rs b/src/frontend/src/binder/relation/table_function.rs index a0c70f58f1cb0..9189e176e0d55 100644 --- a/src/frontend/src/binder/relation/table_function.rs +++ b/src/frontend/src/binder/relation/table_function.rs @@ -72,7 +72,7 @@ impl Binder { Some(PG_CATALOG_SCHEMA_NAME), PG_KEYWORDS_TABLE_NAME, alias, - false, + None, ); } } diff --git a/src/frontend/src/binder/relation/table_or_source.rs b/src/frontend/src/binder/relation/table_or_source.rs index 39d34113663b7..ef54c65cb5fef 100644 --- a/src/frontend/src/binder/relation/table_or_source.rs +++ b/src/frontend/src/binder/relation/table_or_source.rs @@ -19,7 +19,7 @@ use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{is_system_schema, Field}; use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_connector::WithPropertiesExt; -use risingwave_sqlparser::ast::{Statement, TableAlias}; +use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias}; use risingwave_sqlparser::parser::Parser; use thiserror_ext::AsReport; @@ -39,7 +39,7 @@ pub struct BoundBaseTable { pub table_id: TableId, pub table_catalog: Arc, pub table_indexes: Vec>, - pub for_system_time_as_of_proctime: bool, + pub as_of: Option, } #[derive(Debug, Clone)] @@ -51,6 +51,7 @@ pub struct BoundSystemTable { #[derive(Debug, Clone)] pub struct BoundSource { pub catalog: SourceCatalog, + pub as_of: Option, } impl BoundSource { @@ -59,12 +60,6 @@ impl BoundSource { } } -impl From<&SourceCatalog> for BoundSource { - fn from(s: &SourceCatalog) -> Self { - Self { catalog: s.clone() } - } -} - impl Binder { /// Binds table or source, or logical view according to what we get from the catalog. pub fn bind_relation_by_name_inner( @@ -72,7 +67,7 @@ impl Binder { schema_name: Option<&str>, table_name: &str, alias: Option, - for_system_time_as_of_proctime: bool, + as_of: Option, ) -> Result { // define some helper functions converting catalog to bound relation let resolve_sys_table_relation = |sys_table_catalog: &Arc| { @@ -124,16 +119,12 @@ impl Binder { self.catalog .get_table_by_name(&self.db_name, schema_path, table_name) { - self.resolve_table_relation( - table_catalog.clone(), - schema_name, - for_system_time_as_of_proctime, - )? + self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)? } else if let Ok((source_catalog, _)) = self.catalog .get_source_by_name(&self.db_name, schema_path, table_name) { - self.resolve_source_relation(&source_catalog.clone()) + self.resolve_source_relation(&source_catalog.clone(), as_of) } else if let Ok((view_catalog, _)) = self.catalog .get_view_by_name(&self.db_name, schema_path, table_name) @@ -171,14 +162,13 @@ impl Binder { return self.resolve_table_relation( table_catalog.clone(), &schema_name.clone(), - for_system_time_as_of_proctime, + as_of, ); } else if let Some(source_catalog) = schema.get_source_by_name(table_name) { - return Ok( - self.resolve_source_relation(&source_catalog.clone()) - ); + return Ok(self + .resolve_source_relation(&source_catalog.clone(), as_of)); } else if let Some(view_catalog) = schema.get_view_by_name(table_name) { @@ -201,7 +191,7 @@ impl Binder { &mut self, table_catalog: Arc, schema_name: &str, - for_system_time_as_of_proctime: bool, + as_of: Option, ) -> Result<(Relation, Vec<(bool, Field)>)> { let table_id = table_catalog.id(); let columns = table_catalog @@ -216,7 +206,7 @@ impl Binder { table_id, table_catalog, table_indexes, - for_system_time_as_of_proctime, + as_of, }; Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns)) @@ -225,10 +215,14 @@ impl Binder { fn resolve_source_relation( &mut self, source_catalog: &SourceCatalog, + as_of: Option, ) -> (Relation, Vec<(bool, Field)>) { self.included_relations.insert(source_catalog.id.into()); ( - Relation::Source(Box::new(source_catalog.into())), + Relation::Source(Box::new(BoundSource { + catalog: source_catalog.clone(), + as_of, + })), source_catalog .columns .iter() @@ -334,7 +328,7 @@ impl Binder { table_id, table_catalog, table_indexes, - for_system_time_as_of_proctime: false, + as_of: None, }) } diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs index e1b263a64db18..5543e8e221193 100644 --- a/src/frontend/src/handler/create_index.rs +++ b/src/frontend/src/handler/create_index.rs @@ -332,7 +332,7 @@ fn assemble_materialize( // Index table has no indexes. vec![], context, - false, + None, cardinality, ); diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 4ba3e8af66572..3968e5eb827b5 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -70,7 +70,7 @@ pub fn gen_sink_subscription_query_from_name(from_name: ObjectName) -> Result Re // Vec, Vec, Vec, Vec>, String, Option let (columns, pk_columns, dist_columns, indices, relname, description) = - if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, false) { + if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None) { match relation { Relation::Source(s) => { let pk_column_catalogs = s diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 1e07abd9b09de..dfeeb0965ad64 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -43,7 +43,7 @@ pub fn get_columns_from_table( table_name: ObjectName, ) -> Result> { let mut binder = Binder::new_for_system(session); - let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; + let relation = binder.bind_relation_by_name(table_name.clone(), None, None)?; let column_catalogs = match relation { Relation::Source(s) => s.catalog.columns, Relation::BaseTable(t) => t.table_catalog.columns.clone(), @@ -89,7 +89,7 @@ pub fn get_indexes_from_table( table_name: ObjectName, ) -> Result>> { let mut binder = Binder::new_for_system(session); - let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; + let relation = binder.bind_relation_by_name(table_name.clone(), None, None)?; let indexes = match relation { Relation::BaseTable(t) => t.table_indexes, _ => { diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index d5f1eb236f97b..89bccfdfed162 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -633,6 +633,7 @@ impl PlanRoot { row_id_index, SourceNodeKind::CreateTable, context.clone(), + None, ) .and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?; diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 406a3654def24..fa347375bf2bf 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -21,6 +21,7 @@ use risingwave_common::catalog::{ColumnCatalog, Field, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::WithPropertiesExt; +use risingwave_sqlparser::ast::AsOf; use super::super::utils::TableCatalogBuilder; use super::GenericPlanNode; @@ -66,6 +67,8 @@ pub struct Source { /// Kafka timestamp range, currently we only support kafka, so we just leave it like this. pub(crate) kafka_timestamp_range: (Bound, Bound), + + pub as_of: Option, } impl GenericPlanNode for Source { diff --git a/src/frontend/src/optimizer/plan_node/generic/table_scan.rs b/src/frontend/src/optimizer/plan_node/generic/table_scan.rs index 8b203d66dbc33..e8ac52cb7fa8e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/table_scan.rs @@ -22,6 +22,7 @@ use pretty_xmlish::Pretty; use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc}; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_sqlparser::ast::AsOf; use super::GenericPlanNode; use crate::catalog::table_catalog::TableType; @@ -54,7 +55,10 @@ pub struct TableScan { /// The pushed down predicates. It refers to column indexes of the table. pub predicate: Condition, /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join. - pub for_system_time_as_of_proctime: bool, + /// syntax `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg. + /// syntax `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg. + /// syntax `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg. + pub as_of: Option, /// The cardinality of the table **without** applying the predicate. pub table_cardinality: Cardinality, #[educe(PartialEq(ignore))] @@ -235,7 +239,7 @@ impl TableScan { vec![], self.ctx.clone(), new_predicate, - self.for_system_time_as_of_proctime, + self.as_of.clone(), self.table_cardinality, ) } @@ -249,7 +253,7 @@ impl TableScan { indexes: Vec>, ctx: OptimizerContextRef, predicate: Condition, // refers to column indexes of the table - for_system_time_as_of_proctime: bool, + as_of: Option, table_cardinality: Cardinality, ) -> Self { Self::new_inner( @@ -259,7 +263,7 @@ impl TableScan { indexes, ctx, predicate, - for_system_time_as_of_proctime, + as_of, table_cardinality, ) } @@ -272,7 +276,7 @@ impl TableScan { indexes: Vec>, ctx: OptimizerContextRef, predicate: Condition, // refers to column indexes of the table - for_system_time_as_of_proctime: bool, + as_of: Option, table_cardinality: Cardinality, ) -> Self { // here we have 3 concepts @@ -301,7 +305,7 @@ impl TableScan { table_desc, indexes, predicate, - for_system_time_as_of_proctime, + as_of, table_cardinality, ctx, } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 62fb521fa9d0f..47c5238bde2ba 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -19,6 +19,7 @@ use itertools::{EitherOrBoth, Itertools}; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::StreamScanType; +use risingwave_sqlparser::ast::AsOf; use super::generic::{ push_down_into_join, push_down_join_condition, GenericPlanNode, GenericPlanRef, @@ -932,7 +933,7 @@ impl LogicalJoin { fn should_be_temporal_join(&self) -> bool { let right = self.right(); if let Some(logical_scan) = right.as_logical_scan() { - logical_scan.for_system_time_as_of_proctime() + matches!(logical_scan.as_of(), Some(AsOf::ProcessTime)) } else { false } @@ -999,7 +1000,7 @@ impl LogicalJoin { ))); }; - if !logical_scan.for_system_time_as_of_proctime() { + if !matches!(logical_scan.as_of(), Some(AsOf::ProcessTime)) { return Err(RwError::from(ErrorCode::NotSupported( "Temporal join requires a table defined as temporal table".into(), "Please use FOR SYSTEM_TIME AS OF PROCTIME() syntax".into(), diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 1fa666e2255a0..77f26d0adc45e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -21,6 +21,7 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnDesc, TableDesc}; use risingwave_common::util::sort_util::ColumnOrder; +use risingwave_sqlparser::ast::AsOf; use super::generic::{GenericPlanNode, GenericPlanRef}; use super::utils::{childless_record, Distill}; @@ -69,7 +70,7 @@ impl LogicalScan { table_catalog: Arc, indexes: Vec>, ctx: OptimizerContextRef, - for_system_time_as_of_proctime: bool, + as_of: Option, table_cardinality: Cardinality, ) -> Self { let output_col_idx: Vec = (0..table_catalog.columns().len()).collect(); @@ -80,7 +81,7 @@ impl LogicalScan { indexes, ctx, Condition::true_cond(), - for_system_time_as_of_proctime, + as_of, table_cardinality, ) .into() @@ -90,8 +91,8 @@ impl LogicalScan { &self.core.table_name } - pub fn for_system_time_as_of_proctime(&self) -> bool { - self.core.for_system_time_as_of_proctime + pub fn as_of(&self) -> Option { + self.core.as_of.clone() } /// The cardinality of the table **without** applying the predicate. @@ -247,7 +248,7 @@ impl LogicalScan { self.indexes().to_vec(), self.ctx(), Condition::true_cond(), - self.for_system_time_as_of_proctime(), + self.as_of(), self.table_cardinality(), ); let project_expr = if self.required_col_idx() != self.output_col_idx() { @@ -266,7 +267,7 @@ impl LogicalScan { self.indexes().to_vec(), self.base.ctx().clone(), predicate, - self.for_system_time_as_of_proctime(), + self.as_of(), self.table_cardinality(), ) .into() @@ -280,7 +281,7 @@ impl LogicalScan { self.indexes().to_vec(), self.base.ctx().clone(), self.predicate().clone(), - self.for_system_time_as_of_proctime(), + self.as_of(), self.table_cardinality(), ) .into() diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 13cd420b61523..54d3680778817 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -26,6 +26,7 @@ use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR; use risingwave_connector::source::{DataType, UPSTREAM_SOURCE_KEY}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; +use risingwave_sqlparser::ast::AsOf; use super::generic::{GenericPlanRef, SourceNodeKind}; use super::stream_watermark_filter::StreamWatermarkFilter; @@ -71,6 +72,7 @@ impl LogicalSource { row_id_index: Option, kind: SourceNodeKind, ctx: OptimizerContextRef, + as_of: Option, ) -> Result { let kafka_timestamp_range = (Bound::Unbounded, Bound::Unbounded); let core = generic::Source { @@ -80,6 +82,7 @@ impl LogicalSource { kind, ctx, kafka_timestamp_range, + as_of, }; let base = PlanBase::new_logical_with_core(&core); @@ -99,6 +102,7 @@ impl LogicalSource { source_catalog: Rc, kind: SourceNodeKind, ctx: OptimizerContextRef, + as_of: Option, ) -> Result { let column_catalogs = source_catalog.columns.clone(); let row_id_index = source_catalog.row_id_index; @@ -112,6 +116,7 @@ impl LogicalSource { row_id_index, kind, ctx, + as_of, ) } diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index d919f3a968419..ecbdba1b32265 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::TemporalJoinNode; +use risingwave_sqlparser::ast::AsOf; use super::generic::GenericPlanRef; use super::stream::prelude::*; @@ -54,7 +55,7 @@ impl StreamTemporalJoin { let scan: &StreamTableScan = exchange_input .as_stream_table_scan() .expect("should be a stream table scan"); - assert!(scan.core().for_system_time_as_of_proctime); + assert!(matches!(scan.core().as_of, Some(AsOf::ProcessTime))); let l2o = core.l2i_col_mapping().composite(&core.i2o_col_mapping()); let dist = l2o.rewrite_provided_distribution(core.left.distribution()); diff --git a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs index 6459d4925dc1c..2f8d6b3fc89b2 100644 --- a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs +++ b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_sqlparser::ast::AsOf; + use super::{DefaultBehavior, Merge}; use crate::optimizer::plan_node::{ BatchSeqScan, LogicalScan, PlanTreeNodeBinary, StreamTableScan, StreamTemporalJoin, @@ -39,15 +41,15 @@ impl PlanVisitor for TemporalJoinValidator { } fn visit_stream_table_scan(&mut self, stream_table_scan: &StreamTableScan) -> bool { - stream_table_scan.core().for_system_time_as_of_proctime + matches!(stream_table_scan.core().as_of, Some(AsOf::ProcessTime)) } fn visit_batch_seq_scan(&mut self, batch_seq_scan: &BatchSeqScan) -> bool { - batch_seq_scan.core().for_system_time_as_of_proctime + matches!(batch_seq_scan.core().as_of, Some(AsOf::ProcessTime)) } fn visit_logical_scan(&mut self, logical_scan: &LogicalScan) -> bool { - logical_scan.for_system_time_as_of_proctime() + matches!(logical_scan.as_of(), Some(AsOf::ProcessTime)) } fn visit_stream_temporal_join(&mut self, stream_temporal_join: &StreamTemporalJoin) -> bool { diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs index 6c345088b0b67..d47ad04de2763 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -95,7 +95,7 @@ impl Rule for IndexSelectionRule { if indexes.is_empty() { return None; } - if logical_scan.for_system_time_as_of_proctime() { + if logical_scan.as_of().is_some() { return None; } let primary_table_row_size = TableScanIoEstimator::estimate_row_size(logical_scan); @@ -230,7 +230,7 @@ impl IndexSelectionRule { index.index_table.clone(), vec![], logical_scan.ctx(), - false, + None, index.index_table.cardinality, ); @@ -239,7 +239,7 @@ impl IndexSelectionRule { index.primary_table.clone(), vec![], logical_scan.ctx(), - false, + None, index.primary_table.cardinality, ); @@ -338,7 +338,7 @@ impl IndexSelectionRule { logical_scan.table_catalog(), vec![], logical_scan.ctx(), - false, + None, logical_scan.table_cardinality(), ); @@ -573,7 +573,7 @@ impl IndexSelectionRule { Condition { conjunctions: conjunctions.to_vec(), }, - false, + None, logical_scan.table_cardinality(), ); @@ -613,7 +613,7 @@ impl IndexSelectionRule { vec![], ctx, new_predicate, - false, + None, index.index_table.cardinality, ) .into(), diff --git a/src/frontend/src/planner/relation.rs b/src/frontend/src/planner/relation.rs index 997f4923c6c51..16440e97c15bb 100644 --- a/src/frontend/src/planner/relation.rs +++ b/src/frontend/src/planner/relation.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use risingwave_common::bail_not_implemented; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::{DataType, Interval, ScalarImpl}; +use risingwave_sqlparser::ast::AsOf; use crate::binder::{ BoundBaseTable, BoundJoin, BoundShare, BoundSource, BoundSystemTable, BoundWatermark, @@ -68,7 +69,16 @@ impl Planner { } pub(super) fn plan_base_table(&mut self, base_table: &BoundBaseTable) -> Result { - let for_system_time_as_of_proctime = base_table.for_system_time_as_of_proctime; + let as_of = base_table.as_of.clone(); + match as_of { + None | Some(AsOf::ProcessTime) => {} + Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => { + bail_not_implemented!("As Of Timestamp is not supported yet.") + } + Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => { + bail_not_implemented!("As Of Version is not supported yet.") + } + } let table_cardinality = base_table.table_catalog.cardinality; Ok(LogicalScan::create( base_table.table_catalog.name().to_string(), @@ -79,7 +89,7 @@ impl Planner { .map(|x| x.as_ref().clone().into()) .collect(), self.ctx(), - for_system_time_as_of_proctime, + as_of, table_cardinality, ) .into()) @@ -92,10 +102,24 @@ impl Planner { ) .into()) } else { + let as_of = source.as_of.clone(); + match as_of { + None => {} + Some(AsOf::ProcessTime) => { + bail_not_implemented!("As Of ProcessTime() is not supported yet.") + } + Some(AsOf::TimestampString(_)) | Some(AsOf::TimestampNum(_)) => { + bail_not_implemented!("As Of Timestamp is not supported yet.") + } + Some(AsOf::VersionNum(_)) | Some(AsOf::VersionString(_)) => { + bail_not_implemented!("As Of Version is not supported yet.") + } + } Ok(LogicalSource::with_catalog( Rc::new(source.catalog), SourceNodeKind::CreateMViewOrBatch, self.ctx(), + as_of, )? .into()) } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index 2c1044a2039e1..1d96dc9f123a1 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -598,7 +598,7 @@ pub(crate) mod tests { table_catalog.into(), vec![], ctx, - false, + None, Cardinality::unknown(), ) .to_batch() diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 558d271624dd9..be76b5b8c96ef 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -2892,6 +2892,30 @@ impl fmt::Display for SetVariableValueSingle { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum AsOf { + ProcessTime, + // the number of seconds that have elapsed since the Unix epoch, which is January 1, 1970 at 00:00:00 Coordinated Universal Time (UTC). + TimestampNum(i64), + TimestampString(String), + VersionNum(i64), + VersionString(String), +} + +impl fmt::Display for AsOf { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use AsOf::*; + match self { + ProcessTime => write!(f, " FOR SYSTEM_TIME AS OF PROCTIME()"), + TimestampNum(ts) => write!(f, " FOR SYSTEM_TIME AS OF {}", ts), + TimestampString(ts) => write!(f, " FOR SYSTEM_TIME AS OF '{}'", ts), + VersionNum(v) => write!(f, " FOR SYSTEM_VERSION AS OF {}", v), + VersionString(v) => write!(f, " FOR SYSTEM_VERSION AS OF '{}'", v), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/sqlparser/src/ast/query.rs b/src/sqlparser/src/ast/query.rs index adbf6971967a9..5425864bf4e5c 100644 --- a/src/sqlparser/src/ast/query.rs +++ b/src/sqlparser/src/ast/query.rs @@ -380,8 +380,7 @@ pub enum TableFactor { Table { name: ObjectName, alias: Option, - /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join. - for_system_time_as_of_proctime: bool, + as_of: Option, }, Derived { lateral: bool, @@ -409,14 +408,11 @@ pub enum TableFactor { impl fmt::Display for TableFactor { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - TableFactor::Table { - name, - alias, - for_system_time_as_of_proctime, - } => { + TableFactor::Table { name, alias, as_of } => { write!(f, "{}", name)?; - if *for_system_time_as_of_proctime { - write!(f, " FOR SYSTEM_TIME AS OF PROCTIME()")?; + match as_of { + Some(as_of) => write!(f, "{}", as_of)?, + None => (), } if let Some(alias) = alias { write!(f, " AS {}", alias)?; diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 68d4b3e0a366a..4d99482558bcd 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -494,6 +494,7 @@ define_keywords!( SYSTEM, SYSTEM_TIME, SYSTEM_USER, + SYSTEM_VERSION, TABLE, TABLES, TABLESAMPLE, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 20b25136b15f6..ab851f3a75ec4 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3779,21 +3779,61 @@ impl Parser { } /// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join. - pub fn parse_for_system_time_as_of_proctime(&mut self) -> Result { + pub fn parse_as_of(&mut self) -> Result, ParserError> { let after_for = self.parse_keyword(Keyword::FOR); if after_for { - self.expect_keywords(&[Keyword::SYSTEM_TIME, Keyword::AS, Keyword::OF])?; - let ident = self.parse_identifier()?; - // Backward compatibility for now. - if ident.real_value() != "proctime" && ident.real_value() != "now" { - return parser_err!(format!("Expected proctime, found: {}", ident.real_value())); + if self.peek_nth_any_of_keywords(0, &[Keyword::SYSTEM_TIME]) { + self.expect_keywords(&[Keyword::SYSTEM_TIME, Keyword::AS, Keyword::OF])?; + let token = self.next_token(); + match token.token { + Token::Word(w) => { + let ident = w.to_ident()?; + // Backward compatibility for now. + if ident.real_value() == "proctime" || ident.real_value() == "now" { + self.expect_token(&Token::LParen)?; + self.expect_token(&Token::RParen)?; + Ok(Some(AsOf::ProcessTime)) + } else { + parser_err!(format!("Expected proctime, found: {}", ident.real_value())) + } + } + Token::Number(s) => { + let num = s.parse::().map_err(|e| { + ParserError::ParserError(format!( + "Could not parse '{}' as i64: {}", + s, e + )) + }); + Ok(Some(AsOf::TimestampNum(num?))) + } + Token::SingleQuotedString(s) => Ok(Some(AsOf::TimestampString(s))), + unexpected => self.expected( + "Proctime(), Number or SingleQuotedString", + unexpected.with_location(token.location), + ), + } + } else { + self.expect_keywords(&[Keyword::SYSTEM_VERSION, Keyword::AS, Keyword::OF])?; + let token = self.next_token(); + match token.token { + Token::Number(s) => { + let num = s.parse::().map_err(|e| { + ParserError::ParserError(format!( + "Could not parse '{}' as i64: {}", + s, e + )) + }); + Ok(Some(AsOf::VersionNum(num?))) + } + Token::SingleQuotedString(s) => Ok(Some(AsOf::VersionString(s))), + unexpected => self.expected( + "Number or SingleQuotedString", + unexpected.with_location(token.location), + ), + } } - - self.expect_token(&Token::LParen)?; - self.expect_token(&Token::RParen)?; - Ok(true) } else { - Ok(false) + Ok(None) } } @@ -4728,13 +4768,9 @@ impl Parser { with_ordinality, }) } else { - let for_system_time_as_of_proctime = self.parse_for_system_time_as_of_proctime()?; + let as_of = self.parse_as_of()?; let alias = self.parse_optional_table_alias(keywords::RESERVED_FOR_TABLE_ALIAS)?; - Ok(TableFactor::Table { - name, - alias, - for_system_time_as_of_proctime, - }) + Ok(TableFactor::Table { name, alias, as_of }) } } } diff --git a/src/sqlparser/src/test_utils.rs b/src/sqlparser/src/test_utils.rs index 7519b200ccd66..57ff2d0f0efb8 100644 --- a/src/sqlparser/src/test_utils.rs +++ b/src/sqlparser/src/test_utils.rs @@ -138,7 +138,7 @@ pub fn table_alias(name: impl Into) -> Option { pub fn table(name: impl Into) -> TableFactor { TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(name.into())]), - for_system_time_as_of_proctime: false, + as_of: None, alias: None, } } diff --git a/src/sqlparser/tests/sqlparser_common.rs b/src/sqlparser/tests/sqlparser_common.rs index b447dce37d31b..76b41ed5fd25f 100644 --- a/src/sqlparser/tests/sqlparser_common.rs +++ b/src/sqlparser/tests/sqlparser_common.rs @@ -2179,17 +2179,13 @@ fn parse_delimited_identifiers() { ); // check FROM match only(select.from).relation { - TableFactor::Table { - name, - alias, - for_system_time_as_of_proctime, - } => { + TableFactor::Table { name, alias, as_of } => { assert_eq!(vec![Ident::with_quote_unchecked('"', "a table")], name.0); assert_eq!( Ident::with_quote_unchecked('"', "alias"), alias.unwrap().name ); - assert!(!for_system_time_as_of_proctime); + assert!(as_of.is_none()); } _ => panic!("Expecting TableFactor::Table"), } @@ -2318,7 +2314,7 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t1".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, joins: vec![], }, @@ -2326,7 +2322,7 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t2".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, joins: vec![], } @@ -2342,13 +2338,13 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t1a".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, joins: vec![Join { relation: TableFactor::Table { name: ObjectName(vec!["t1b".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: JoinOperator::Inner(JoinConstraint::Natural), }] @@ -2357,13 +2353,13 @@ fn parse_implicit_join() { relation: TableFactor::Table { name: ObjectName(vec!["t2a".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, joins: vec![Join { relation: TableFactor::Table { name: ObjectName(vec!["t2b".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: JoinOperator::Inner(JoinConstraint::Natural), }] @@ -2382,7 +2378,7 @@ fn parse_cross_join() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked("t2")]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: JoinOperator::CrossJoin }, @@ -2399,7 +2395,7 @@ fn parse_temporal_join() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked("t2")]), alias: None, - for_system_time_as_of_proctime: true, + as_of: Some(AsOf::ProcessTime), }, join_operator: Inner(JoinConstraint::On(Expr::BinaryOp { left: Box::new(Expr::Identifier("c1".into())), @@ -2422,7 +2418,7 @@ fn parse_joins_on() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(relation.into())]), alias, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: f(JoinConstraint::On(Expr::BinaryOp { left: Box::new(Expr::Identifier("c1".into())), @@ -2474,7 +2470,7 @@ fn parse_joins_using() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(relation.into())]), alias, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: f(JoinConstraint::Using(vec!["c1".into()])), } @@ -2518,7 +2514,7 @@ fn parse_natural_join() { relation: TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked("t2")]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: f(JoinConstraint::Natural), } @@ -2745,7 +2741,7 @@ fn parse_derived_tables() { relation: TableFactor::Table { name: ObjectName(vec!["t2".into()]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, }, join_operator: JoinOperator::Inner(JoinConstraint::Natural), }], diff --git a/src/sqlparser/tests/testdata/qualified_operator.yaml b/src/sqlparser/tests/testdata/qualified_operator.yaml index 814113edbe5bb..854e6b5302e7f 100644 --- a/src/sqlparser/tests/testdata/qualified_operator.yaml +++ b/src/sqlparser/tests/testdata/qualified_operator.yaml @@ -22,7 +22,7 @@ formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), args: [Unnamed(Expr(CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "bar", quote_style: None }])))], variadic: false, over: None, distinct: false, order_by: [], filter: None, within_group: None }))], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select operator operator(+) operator(+) "operator"(9) operator from operator; formatted_sql: SELECT operator OPERATOR(+) OPERATOR(+) "operator"(9) AS operator FROM operator - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [ExprWithAlias { expr: BinaryOp { left: Identifier(Ident { value: "operator", quote_style: None }), op: PGQualified(QualifiedOperator { schema: None, name: "+" }), right: UnaryOp { op: PGQualified(QualifiedOperator { schema: None, name: "+" }), expr: Function(Function { name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), args: [Unnamed(Expr(Value(Number("9"))))], variadic: false, over: None, distinct: false, order_by: [], filter: None, within_group: None }) } }, alias: Ident { value: "operator", quote_style: None } }], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "operator", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [ExprWithAlias { expr: BinaryOp { left: Identifier(Ident { value: "operator", quote_style: None }), op: PGQualified(QualifiedOperator { schema: None, name: "+" }), right: UnaryOp { op: PGQualified(QualifiedOperator { schema: None, name: "+" }), expr: Function(Function { name: ObjectName([Ident { value: "operator", quote_style: Some(''"'') }]), args: [Unnamed(Expr(Value(Number("9"))))], variadic: false, over: None, distinct: false, order_by: [], filter: None, within_group: None }) } }, alias: Ident { value: "operator", quote_style: None } }], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "operator", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select 3 operator(-) 2 - 1; formatted_sql: SELECT 3 OPERATOR(-) 2 - 1 formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(BinaryOp { left: Value(Number("3")), op: PGQualified(QualifiedOperator { schema: None, name: "-" }), right: BinaryOp { left: Value(Number("2")), op: Minus, right: Value(Number("1")) } })], from: [], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml index 5321abd469ee2..3544be3eb3f5e 100644 --- a/src/sqlparser/tests/testdata/select.yaml +++ b/src/sqlparser/tests/testdata/select.yaml @@ -1,14 +1,14 @@ # This file is automatically generated. See `src/sqlparser/test_runner/src/bin/apply.rs` for more information. - input: SELECT sqrt(id) FROM foo formatted_sql: SELECT sqrt(id) FROM foo - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "sqrt", quote_style: None }]), args: [Unnamed(Expr(Identifier(Ident { value: "id", quote_style: None })))], variadic: false, over: None, distinct: false, order_by: [], filter: None, within_group: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "sqrt", quote_style: None }]), args: [Unnamed(Expr(Identifier(Ident { value: "id", quote_style: None })))], variadic: false, over: None, distinct: false, order_by: [], filter: None, within_group: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT INT '1' formatted_sql: SELECT INT '1' - input: SELECT (foo).v1.v2 FROM foo formatted_sql: SELECT (foo).v1.v2 FROM foo - input: SELECT ((((foo).v1)).v2) FROM foo formatted_sql: SELECT (((foo).v1).v2) FROM foo - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Nested(FieldIdentifier(FieldIdentifier(Identifier(Ident { value: "foo", quote_style: None }), [Ident { value: "v1", quote_style: None }]), [Ident { value: "v2", quote_style: None }])))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Nested(FieldIdentifier(FieldIdentifier(Identifier(Ident { value: "foo", quote_style: None }), [Ident { value: "v1", quote_style: None }]), [Ident { value: "v2", quote_style: None }])))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT (foo.v1).v2 FROM foo formatted_sql: SELECT (foo.v1).v2 FROM foo - input: SELECT (v1).v2 FROM foo @@ -39,13 +39,13 @@ formatted_sql: SELECT id FROM customer WHERE NOT salary = '' - input: SELECT * EXCEPT (v1,v2) FROM foo formatted_sql: SELECT * EXCEPT (v1, v2) FROM foo - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(Some([Identifier(Ident { value: "v1", quote_style: None }), Identifier(Ident { value: "v2", quote_style: None })]))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(Some([Identifier(Ident { value: "v1", quote_style: None }), Identifier(Ident { value: "v2", quote_style: None })]))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT foo.* EXCEPT (foo.v1, bar.v2) FROM foo, bar formatted_sql: SELECT foo.* EXCEPT (foo.v1, bar.v2) FROM foo, bar - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [QualifiedWildcard(ObjectName([Ident { value: "foo", quote_style: None }]), Some([CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "v1", quote_style: None }]), CompoundIdentifier([Ident { value: "bar", quote_style: None }, Ident { value: "v2", quote_style: None }])]))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }, TableWithJoins { relation: Table { name: ObjectName([Ident { value: "bar", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [QualifiedWildcard(ObjectName([Ident { value: "foo", quote_style: None }]), Some([CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "v1", quote_style: None }]), CompoundIdentifier([Ident { value: "bar", quote_style: None }, Ident { value: "v2", quote_style: None }])]))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }, TableWithJoins { relation: Table { name: ObjectName([Ident { value: "bar", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT * EXCEPT (v1), bar.* EXCEPT (foo.v2) FROM foo, bar formatted_sql: SELECT * EXCEPT (v1), bar.* EXCEPT (foo.v2) FROM foo, bar - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(Some([Identifier(Ident { value: "v1", quote_style: None })])), QualifiedWildcard(ObjectName([Ident { value: "bar", quote_style: None }]), Some([CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "v2", quote_style: None }])]))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }, TableWithJoins { relation: Table { name: ObjectName([Ident { value: "bar", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [Wildcard(Some([Identifier(Ident { value: "v1", quote_style: None })])), QualifiedWildcard(ObjectName([Ident { value: "bar", quote_style: None }]), Some([CompoundIdentifier([Ident { value: "foo", quote_style: None }, Ident { value: "v2", quote_style: None }])]))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "foo", quote_style: None }]), alias: None, as_of: None }, joins: [] }, TableWithJoins { relation: Table { name: ObjectName([Ident { value: "bar", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: SELECT v3 EXCEPT (v1, v2) FROM foo error_msg: |- sql parser error: Expected SELECT, VALUES, or a subquery in the query body, found: v1 at line:1, column:21 @@ -129,7 +129,7 @@ Near "SELECT 1::int" - input: select id1, a1, id2, a2 from stream as S join version FOR SYSTEM_TIME AS OF PROCTIME() AS V on id1= id2 formatted_sql: SELECT id1, a1, id2, a2 FROM stream AS S JOIN version FOR SYSTEM_TIME AS OF PROCTIME() AS V ON id1 = id2 - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_proctime: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_proctime: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), as_of: None }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), as_of: Some(ProcessTime) }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select percentile_cont(0.3) within group (order by x desc) from unnest(array[1,2,4,5,10]) as x formatted_sql: SELECT percentile_cont(0.3) FROM unnest(ARRAY[1, 2, 4, 5, 10]) AS x formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Function(Function { name: ObjectName([Ident { value: "percentile_cont", quote_style: None }]), args: [Unnamed(Expr(Value(Number("0.3"))))], variadic: false, over: None, distinct: false, order_by: [], filter: None, within_group: Some(OrderByExpr { expr: Identifier(Ident { value: "x", quote_style: None }), asc: Some(false), nulls_first: None }) }))], from: [TableWithJoins { relation: TableFunction { name: ObjectName([Ident { value: "unnest", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "x", quote_style: None }, columns: [] }), args: [Unnamed(Expr(Array(Array { elem: [Value(Number("1")), Value(Number("2")), Value(Number("4")), Value(Number("5")), Value(Number("10"))], named: true })))], with_ordinality: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' diff --git a/src/sqlparser/tests/testdata/subquery.yaml b/src/sqlparser/tests/testdata/subquery.yaml index bc3dbc59de77f..a79acbd7a317a 100644 --- a/src/sqlparser/tests/testdata/subquery.yaml +++ b/src/sqlparser/tests/testdata/subquery.yaml @@ -1,10 +1,10 @@ # This file is automatically generated. See `src/sqlparser/test_runner/src/bin/apply.rs` for more information. - input: select a1 from a where exists (select 1 from b where a1 = b1); formatted_sql: SELECT a1 FROM a WHERE EXISTS (SELECT 1 FROM b WHERE a1 = b1) - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "a", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: Some(Exists(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Value(Number("1")))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "b", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: Some(BinaryOp { left: Identifier(Ident { value: "a1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "b1", quote_style: None }) }), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "a", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: Some(Exists(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Value(Number("1")))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "b", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: Some(BinaryOp { left: Identifier(Ident { value: "a1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "b1", quote_style: None }) }), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select a1 from a where a1 NOT IN (select b1 from b); formatted_sql: SELECT a1 FROM a WHERE a1 NOT IN (SELECT b1 FROM b) - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "a", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: Some(InSubquery { expr: Identifier(Ident { value: "a1", quote_style: None }), subquery: Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "b1", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "b", quote_style: None }]), alias: None, for_system_time_as_of_proctime: false }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None }, negated: true }), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "a", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: Some(InSubquery { expr: Identifier(Ident { value: "a1", quote_style: None }), subquery: Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "b1", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "b", quote_style: None }]), alias: None, as_of: None }, joins: [] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None }, negated: true }), group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' - input: select a1 from a where a1 < ALL (select b1 from b); error_msg: |- sql parser error: Expected ), found: b1 at line:1, column:43 diff --git a/src/tests/sqlsmith/src/sql_gen/relation.rs b/src/tests/sqlsmith/src/sql_gen/relation.rs index a5c6b7d27545d..6e6db4e40493d 100644 --- a/src/tests/sqlsmith/src/sql_gen/relation.rs +++ b/src/tests/sqlsmith/src/sql_gen/relation.rs @@ -67,7 +67,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { name: alias.as_str().into(), columns: vec![], }), - for_system_time_as_of_proctime: false, + as_of: None, }; table.name = alias; // Rename the table. (table_factor, table) diff --git a/src/tests/sqlsmith/src/sql_gen/utils.rs b/src/tests/sqlsmith/src/sql_gen/utils.rs index 96eeadef7831b..0e36507e86169 100644 --- a/src/tests/sqlsmith/src/sql_gen/utils.rs +++ b/src/tests/sqlsmith/src/sql_gen/utils.rs @@ -74,7 +74,7 @@ pub(crate) fn create_table_factor_from_table(table: &Table) -> TableFactor { TableFactor::Table { name: ObjectName(vec![Ident::new_unchecked(&table.name)]), alias: None, - for_system_time_as_of_proctime: false, + as_of: None, } }