Skip to content

Commit

Permalink
feat(frontend): support AS OF syntax (#15849)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 22, 2024
1 parent 4b312cf commit bd9dcfc
Show file tree
Hide file tree
Showing 31 changed files with 213 additions and 128 deletions.
24 changes: 9 additions & 15 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use itertools::{EitherOrBoth, Itertools};
use risingwave_common::bail;
use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME};
use risingwave_sqlparser::ast::{
Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias, TableFactor,
AsOf, Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias,
TableFactor,
};
use thiserror::Error;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -333,7 +334,7 @@ impl Binder {
&mut self,
name: ObjectName,
alias: Option<TableAlias>,
for_system_time_as_of_proctime: bool,
as_of: Option<AsOf>,
) -> Result<Relation> {
let (schema_name, table_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?;
if schema_name.is_none()
Expand Down Expand Up @@ -376,12 +377,7 @@ impl Binder {
}));
Ok(share_relation)
} else {
self.bind_relation_by_name_inner(
schema_name.as_deref(),
&table_name,
alias,
for_system_time_as_of_proctime,
)
self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, as_of)
}
}

Expand All @@ -401,7 +397,7 @@ impl Binder {
}?;

Ok((
self.bind_relation_by_name(table_name.clone(), None, false)?,
self.bind_relation_by_name(table_name.clone(), None, None)?,
table_name,
))
}
Expand Down Expand Up @@ -451,16 +447,14 @@ impl Binder {
.map_or(DEFAULT_SCHEMA_NAME.to_string(), |arg| arg.to_string());

let table_name = self.catalog.get_table_name_by_id(table_id)?;
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, false)
self.bind_relation_by_name_inner(Some(&schema), &table_name, alias, None)
}

pub(super) fn bind_table_factor(&mut self, table_factor: TableFactor) -> Result<Relation> {
match table_factor {
TableFactor::Table {
name,
alias,
for_system_time_as_of_proctime,
} => self.bind_relation_by_name(name, alias, for_system_time_as_of_proctime),
TableFactor::Table { name, alias, as_of } => {
self.bind_relation_by_name(name, alias, as_of)
}
TableFactor::TableFunction {
name,
alias,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/relation/table_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Binder {
Some(PG_CATALOG_SCHEMA_NAME),
PG_KEYWORDS_TABLE_NAME,
alias,
false,
None,
);
}
}
Expand Down
40 changes: 17 additions & 23 deletions src/frontend/src/binder/relation/table_or_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::{is_system_schema, Field};
use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::{Statement, TableAlias};
use risingwave_sqlparser::ast::{AsOf, Statement, TableAlias};
use risingwave_sqlparser::parser::Parser;
use thiserror_ext::AsReport;

Expand All @@ -39,7 +39,7 @@ pub struct BoundBaseTable {
pub table_id: TableId,
pub table_catalog: Arc<TableCatalog>,
pub table_indexes: Vec<Arc<IndexCatalog>>,
pub for_system_time_as_of_proctime: bool,
pub as_of: Option<AsOf>,
}

#[derive(Debug, Clone)]
Expand All @@ -51,6 +51,7 @@ pub struct BoundSystemTable {
#[derive(Debug, Clone)]
pub struct BoundSource {
pub catalog: SourceCatalog,
pub as_of: Option<AsOf>,
}

impl BoundSource {
Expand All @@ -59,20 +60,14 @@ impl BoundSource {
}
}

impl From<&SourceCatalog> for BoundSource {
fn from(s: &SourceCatalog) -> Self {
Self { catalog: s.clone() }
}
}

impl Binder {
/// Binds table or source, or logical view according to what we get from the catalog.
pub fn bind_relation_by_name_inner(
&mut self,
schema_name: Option<&str>,
table_name: &str,
alias: Option<TableAlias>,
for_system_time_as_of_proctime: bool,
as_of: Option<AsOf>,
) -> Result<Relation> {
// define some helper functions converting catalog to bound relation
let resolve_sys_table_relation = |sys_table_catalog: &Arc<SystemTableCatalog>| {
Expand Down Expand Up @@ -124,16 +119,12 @@ impl Binder {
self.catalog
.get_table_by_name(&self.db_name, schema_path, table_name)
{
self.resolve_table_relation(
table_catalog.clone(),
schema_name,
for_system_time_as_of_proctime,
)?
self.resolve_table_relation(table_catalog.clone(), schema_name, as_of)?
} else if let Ok((source_catalog, _)) =
self.catalog
.get_source_by_name(&self.db_name, schema_path, table_name)
{
self.resolve_source_relation(&source_catalog.clone())
self.resolve_source_relation(&source_catalog.clone(), as_of)
} else if let Ok((view_catalog, _)) =
self.catalog
.get_view_by_name(&self.db_name, schema_path, table_name)
Expand Down Expand Up @@ -171,14 +162,13 @@ impl Binder {
return self.resolve_table_relation(
table_catalog.clone(),
&schema_name.clone(),
for_system_time_as_of_proctime,
as_of,
);
} else if let Some(source_catalog) =
schema.get_source_by_name(table_name)
{
return Ok(
self.resolve_source_relation(&source_catalog.clone())
);
return Ok(self
.resolve_source_relation(&source_catalog.clone(), as_of));
} else if let Some(view_catalog) =
schema.get_view_by_name(table_name)
{
Expand All @@ -201,7 +191,7 @@ impl Binder {
&mut self,
table_catalog: Arc<TableCatalog>,
schema_name: &str,
for_system_time_as_of_proctime: bool,
as_of: Option<AsOf>,
) -> Result<(Relation, Vec<(bool, Field)>)> {
let table_id = table_catalog.id();
let columns = table_catalog
Expand All @@ -216,7 +206,7 @@ impl Binder {
table_id,
table_catalog,
table_indexes,
for_system_time_as_of_proctime,
as_of,
};

Ok::<_, RwError>((Relation::BaseTable(Box::new(table)), columns))
Expand All @@ -225,10 +215,14 @@ impl Binder {
fn resolve_source_relation(
&mut self,
source_catalog: &SourceCatalog,
as_of: Option<AsOf>,
) -> (Relation, Vec<(bool, Field)>) {
self.included_relations.insert(source_catalog.id.into());
(
Relation::Source(Box::new(source_catalog.into())),
Relation::Source(Box::new(BoundSource {
catalog: source_catalog.clone(),
as_of,
})),
source_catalog
.columns
.iter()
Expand Down Expand Up @@ -334,7 +328,7 @@ impl Binder {
table_id,
table_catalog,
table_indexes,
for_system_time_as_of_proctime: false,
as_of: None,
})
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ fn assemble_materialize(
// Index table has no indexes.
vec![],
context,
false,
None,
cardinality,
);

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub fn gen_sink_subscription_query_from_name(from_name: ObjectName) -> Result<Qu
let table_factor = TableFactor::Table {
name: from_name,
alias: None,
for_system_time_as_of_proctime: false,
as_of: None,
};
let from = vec![TableWithJoins {
relation: table_factor,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,7 @@ pub async fn handle_create_source(
Rc::new(SourceCatalog::from(&source)),
SourceNodeKind::CreateSourceWithStreamjob,
context.into(),
None,
)?;

// generate stream graph for cdc source job
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ fn gen_table_plan_inner(
row_id_index,
SourceNodeKind::CreateTable,
context.clone(),
None,
)?
.into();

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn handle_describe(handler_args: HandlerArgs, object_name: ObjectName) -> Re

// Vec<ColumnCatalog>, Vec<ColumnDesc>, Vec<ColumnDesc>, Vec<Arc<IndexCatalog>>, String, Option<String>
let (columns, pk_columns, dist_columns, indices, relname, description) =
if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, false) {
if let Ok(relation) = binder.bind_relation_by_name(object_name.clone(), None, None) {
match relation {
Relation::Source(s) => {
let pk_column_catalogs = s
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub fn get_columns_from_table(
table_name: ObjectName,
) -> Result<Vec<ColumnCatalog>> {
let mut binder = Binder::new_for_system(session);
let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?;
let relation = binder.bind_relation_by_name(table_name.clone(), None, None)?;
let column_catalogs = match relation {
Relation::Source(s) => s.catalog.columns,
Relation::BaseTable(t) => t.table_catalog.columns.clone(),
Expand Down Expand Up @@ -89,7 +89,7 @@ pub fn get_indexes_from_table(
table_name: ObjectName,
) -> Result<Vec<Arc<IndexCatalog>>> {
let mut binder = Binder::new_for_system(session);
let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?;
let relation = binder.bind_relation_by_name(table_name.clone(), None, None)?;
let indexes = match relation {
Relation::BaseTable(t) => t.table_indexes,
_ => {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ impl PlanRoot {
row_id_index,
SourceNodeKind::CreateTable,
context.clone(),
None,
)
.and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?;

Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::catalog::{ColumnCatalog, Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::WithPropertiesExt;
use risingwave_sqlparser::ast::AsOf;

use super::super::utils::TableCatalogBuilder;
use super::GenericPlanNode;
Expand Down Expand Up @@ -66,6 +67,8 @@ pub struct Source {

/// Kafka timestamp range, currently we only support kafka, so we just leave it like this.
pub(crate) kafka_timestamp_range: (Bound<i64>, Bound<i64>),

pub as_of: Option<AsOf>,
}

impl GenericPlanNode for Source {
Expand Down
16 changes: 10 additions & 6 deletions src/frontend/src/optimizer/plan_node/generic/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use pretty_xmlish::Pretty;
use risingwave_common::catalog::{ColumnDesc, Field, Schema, TableDesc};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_sqlparser::ast::AsOf;

use super::GenericPlanNode;
use crate::catalog::table_catalog::TableType;
Expand Down Expand Up @@ -54,7 +55,10 @@ pub struct TableScan {
/// The pushed down predicates. It refers to column indexes of the table.
pub predicate: Condition,
/// syntax `FOR SYSTEM_TIME AS OF PROCTIME()` is used for temporal join.
pub for_system_time_as_of_proctime: bool,
/// syntax `FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00'` is used for iceberg.
/// syntax `FOR SYSTEM_TIME AS OF 499162860` is used for iceberg.
/// syntax `FOR SYSTEM_VERSION AS OF 10963874102873;` is used for iceberg.
pub as_of: Option<AsOf>,
/// The cardinality of the table **without** applying the predicate.
pub table_cardinality: Cardinality,
#[educe(PartialEq(ignore))]
Expand Down Expand Up @@ -235,7 +239,7 @@ impl TableScan {
vec![],
self.ctx.clone(),
new_predicate,
self.for_system_time_as_of_proctime,
self.as_of.clone(),
self.table_cardinality,
)
}
Expand All @@ -249,7 +253,7 @@ impl TableScan {
indexes: Vec<Rc<IndexCatalog>>,
ctx: OptimizerContextRef,
predicate: Condition, // refers to column indexes of the table
for_system_time_as_of_proctime: bool,
as_of: Option<AsOf>,
table_cardinality: Cardinality,
) -> Self {
Self::new_inner(
Expand All @@ -259,7 +263,7 @@ impl TableScan {
indexes,
ctx,
predicate,
for_system_time_as_of_proctime,
as_of,
table_cardinality,
)
}
Expand All @@ -272,7 +276,7 @@ impl TableScan {
indexes: Vec<Rc<IndexCatalog>>,
ctx: OptimizerContextRef,
predicate: Condition, // refers to column indexes of the table
for_system_time_as_of_proctime: bool,
as_of: Option<AsOf>,
table_cardinality: Cardinality,
) -> Self {
// here we have 3 concepts
Expand Down Expand Up @@ -301,7 +305,7 @@ impl TableScan {
table_desc,
indexes,
predicate,
for_system_time_as_of_proctime,
as_of,
table_cardinality,
ctx,
}
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use itertools::{EitherOrBoth, Itertools};
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::StreamScanType;
use risingwave_sqlparser::ast::AsOf;

use super::generic::{
push_down_into_join, push_down_join_condition, GenericPlanNode, GenericPlanRef,
Expand Down Expand Up @@ -932,7 +933,7 @@ impl LogicalJoin {
fn should_be_temporal_join(&self) -> bool {
let right = self.right();
if let Some(logical_scan) = right.as_logical_scan() {
logical_scan.for_system_time_as_of_proctime()
matches!(logical_scan.as_of(), Some(AsOf::ProcessTime))
} else {
false
}
Expand Down Expand Up @@ -999,7 +1000,7 @@ impl LogicalJoin {
)));
};

if !logical_scan.for_system_time_as_of_proctime() {
if !matches!(logical_scan.as_of(), Some(AsOf::ProcessTime)) {
return Err(RwError::from(ErrorCode::NotSupported(
"Temporal join requires a table defined as temporal table".into(),
"Please use FOR SYSTEM_TIME AS OF PROCTIME() syntax".into(),
Expand Down
Loading

0 comments on commit bd9dcfc

Please sign in to comment.