Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: specify column when sink into table #16587

Merged
merged 10 commits into from
May 9, 2024
54 changes: 54 additions & 0 deletions e2e_test/sink/sink_into_table/specify_column.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table s (a int, b int, c int) append only;

statement ok
create table t (a int, b int default 900, c int default 9000);

statement error
create sink ss into t(aaa) as select a from s with(type = 'append-only');

statement error
create sink ss into t(a) as select a, b from s with(type = 'append-only');

statement error
create sink ss into t(a, b) as select b from s with(type = 'append-only');

statement error
create sink ss into t(a, b, c, a) as select a, b from s with(type = 'append-only');

statement ok
create sink s1 into t(a,B,c) as select c, b, a from s with(type = 'append-only');

statement ok
create sink s2 into t(a,B) as select 2*c, 2*b from s with(type = 'append-only');

statement ok
create sink s3 into t(c) as select 3*a from s with(type = 'append-only');

statement ok
insert into s values(10, 100, 1000);

query III rowsort
select * from t order by a;
----
1000 100 10
2000 200 9000
NULL 900 30

statement ok
drop sink s1;

statement ok
drop sink s2;

statement ok
drop sink s3;

statement ok
drop table s;

statement ok
drop table t;
14 changes: 14 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,20 @@ impl TableCatalog {
.map(|(i, _)| i)
}

pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) = self
.columns[col_idx]
.column_desc
.generated_or_default_column
.as_ref()
{
ExprImpl::from_expr_proto(expr.as_ref().unwrap())
.expect("expr in default columns corrupted")
} else {
ExprImpl::literal_null(self.columns[col_idx].data_type().clone())
}
}

pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
self.columns.iter().enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
Expand Down
23 changes: 13 additions & 10 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,24 @@ use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;

pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
if columns.is_empty() {
None
} else {
Some(columns.iter().map(|v| v.real_value()).collect())
}
}

/// If columns is empty, it means that the user did not specify the column names.
/// In this case, we extract the column names from the query.
/// If columns is not empty, it means that user specify the column names and the user
/// should guarantee that the column names number are consistent with the query.
pub(super) fn get_column_names(
Comment on lines +48 to 52
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do some refactor for the function later. I guess some check should be done in other place instead of here, this PR focus on the feature and not contain it.

bound: &BoundQuery,
session: &SessionImpl,
columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
// If columns is empty, it means that the user did not specify the column names.
// In this case, we extract the column names from the query.
// If columns is not empty, it means that user specify the column names and the user
// should guarantee that the column names number are consistent with the query.
let col_names: Option<Vec<String>> = if columns.is_empty() {
None
} else {
Some(columns.iter().map(|v| v.real_value()).collect())
};

let col_names = parse_column_names(&columns);
if let BoundSetExpr::Select(select) = &bound.body {
// `InputRef`'s alias will be implicitly assigned in `bind_project`.
// If user provide columns name (col_names.is_some()), we don't need alias.
Expand Down
137 changes: 86 additions & 51 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::rc::Rc;
use std::sync::{Arc, LazyLock};

Expand All @@ -22,9 +22,8 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, TableId, UserId};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::catalog::{ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId};
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
Expand All @@ -33,7 +32,6 @@ use risingwave_connector::sink::{
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::ReplaceTablePlan;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
Expand All @@ -50,8 +48,9 @@ use crate::binder::Binder;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, Literal};
use crate::expr::{ExprImpl, InputRef};
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::util::SourceSchemaCompatExt;
Expand Down Expand Up @@ -80,6 +79,7 @@ pub fn gen_sink_plan(
stmt: CreateSinkStatement,
partition_info: Option<PartitionComputeInfo>,
) -> Result<SinkPlanContext> {
let user_specified_columns = !stmt.columns.is_empty();
let db_name = session.database();
let (sink_schema_name, sink_table_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;
Expand Down Expand Up @@ -118,8 +118,12 @@ pub fn gen_sink_plan(
let check_items = resolve_query_privileges(&bound);
session.check_privileges(&check_items)?;

// If column names not specified, use the name in materialized view.
let col_names = get_column_names(&bound, session, stmt.columns)?;
let col_names = if sink_into_table_name.is_some() {
parse_column_names(&stmt.columns)
} else {
// If column names not specified, use the name in the bound query, which is equal with the plan root's original field name.
get_column_names(&bound, session, stmt.columns)?
};

let mut with_options = context.with_options().clone();

Expand Down Expand Up @@ -172,8 +176,8 @@ pub fn gen_sink_plan(
};

let mut plan_root = Planner::new(context).plan_query(bound)?;
if let Some(col_names) = col_names {
plan_root.set_out_names(col_names)?;
if let Some(col_names) = &col_names {
plan_root.set_out_names(col_names.clone())?;
};

let without_backfill = match with_options.remove(SINK_WITHOUT_BACKFILL) {
Expand All @@ -196,6 +200,24 @@ pub fn gen_sink_plan(
.map(|table_name| fetch_table_catalog_for_alter(session, table_name))
.transpose()?;

if let Some(target_table_catalog) = &target_table_catalog {
if let Some(col_names) = col_names {
let target_table_columns = target_table_catalog
.columns()
.iter()
.map(|c| c.name())
.collect::<BTreeSet<_>>();
for c in col_names {
if !target_table_columns.contains(c.as_str()) {
return Err(RwError::from(ErrorCode::BindError(format!(
"Column {} not found in table {}",
c, target_table_catalog.name()
))));
}
}
}
}

let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id());

let sink_plan = plan_root.gen_sink_plan(
Expand Down Expand Up @@ -251,7 +273,12 @@ pub fn gen_sink_plan(
)));
}

let exprs = derive_default_column_project_for_sink(&sink_catalog, table_catalog)?;
let exprs = derive_default_column_project_for_sink(
&sink_catalog,
sink_plan.schema(),
table_catalog,
user_specified_columns,
)?;

let logical_project = generic::Project::new(exprs, sink_plan);

Expand Down Expand Up @@ -632,66 +659,74 @@ pub(crate) fn insert_merger_to_union(node: &mut StreamNode) {
insert_merger_to_union(input);
}
}

fn derive_sink_to_table_expr(
sink_schema: &Schema,
idx: usize,
target_type: &DataType,
) -> Result<ExprImpl> {
let input_type = &sink_schema.fields()[idx].data_type;

if target_type != input_type {
bail!(
"column type mismatch: {:?} vs {:?}",
target_type,
input_type
);
} else {
Ok(ExprImpl::InputRef(Box::new(InputRef::new(
idx,
input_type.clone(),
))))
}
}

fn derive_default_column_project_for_sink(
sink: &SinkCatalog,
sink_schema: &Schema,
target_table_catalog: &Arc<TableCatalog>,
user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
assert_eq!(sink.full_schema().len(), sink_schema.len());

let mut exprs = vec![];

let sink_visible_columns = sink
let sink_visible_col_idxes = sink
.full_columns()
.iter()
.enumerate()
.filter(|(_i, c)| !c.is_hidden())
.positions(|c| !c.is_hidden())
.collect_vec();
let sink_visible_col_idxes_by_name = sink
.full_columns()
.iter()
.enumerate()
.filter(|(_, c)| !c.is_hidden())
.map(|(i, c)| (c.name(), i))
.collect::<BTreeMap<_, _>>();

for (idx, table_column) in target_table_catalog.columns().iter().enumerate() {
if table_column.is_generated() {
continue;
}

let data_type = table_column.data_type();

if idx < sink_visible_columns.len() {
let (sink_col_idx, sink_column) = sink_visible_columns[idx];

let sink_col_type = sink_column.data_type();
let default_col_expr = || -> ExprImpl { target_table_catalog.default_column_expr(idx) };
let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
derive_sink_to_table_expr(sink_schema, sink_col_idx, table_column.data_type())
};

if data_type != sink_col_type {
bail!(
"column type mismatch: {:?} vs {:?}",
data_type,
sink_col_type
);
if user_specified_columns {
if let Some(idx) = sink_visible_col_idxes_by_name.get(table_column.name()) {
exprs.push(sink_col_expr(*idx)?);
} else {
exprs.push(ExprImpl::InputRef(Box::new(InputRef::new(
sink_col_idx,
data_type.clone(),
))));
exprs.push(default_col_expr());
}
} else {
let data = match table_column
.column_desc
.generated_or_default_column
.as_ref()
{
// default column with default value
Some(GeneratedOrDefaultColumn::DefaultColumn(default_column)) => {
Datum::from_protobuf(default_column.get_snapshot_value().unwrap(), data_type)
.unwrap()
}
// default column with no default value
None => None,

// generated column is unreachable
_ => unreachable!(),
if idx < sink_visible_col_idxes.len() {
exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
} else {
exprs.push(default_col_expr());
};

exprs.push(ExprImpl::Literal(Box::new(Literal::new(
data,
data_type.clone(),
))));
};
}
}
Ok(exprs)
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl StreamSink {
.into_stream()
.expect("input should be stream plan")
.clone_with_new_plan_id();

Self {
base,
input,
Expand Down
3 changes: 1 addition & 2 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,6 @@ impl fmt::Display for CreateSink {
}
}
}

// sql_grammar!(CreateSinkStatement {
// if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
// sink_name: Ident,
Expand Down Expand Up @@ -517,7 +516,7 @@ impl ParseTo for CreateSinkStatement {
p.expected("FROM or AS after CREATE SINK sink_name", p.peek_token())?
};

let emit_mode = p.parse_emit_mode()?;
let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;

// This check cannot be put into the `WithProperties::parse_to`, since other
// statements may not need the with properties.
Expand Down
Loading