Skip to content

Commit

Permalink
cherry-pick feat: specify column when sink into table (#16587) (#16705)
Browse files Browse the repository at this point in the history
Co-authored-by: Eric Fu <[email protected]>
  • Loading branch information
st1page and fuyufjh authored May 12, 2024
1 parent e659756 commit 8149890
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 63 deletions.
54 changes: 54 additions & 0 deletions e2e_test/sink/sink_into_table/specify_column.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Tests `CREATE SINK ... INTO table(col, ...)` with an explicit column list.
# Target columns are matched by name (case-insensitively: `t(a,B,c)` below
# succeeds against columns a/b/c); target columns not listed in the sink are
# filled with their declared defaults (NULL when no default is declared).

statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table s (a int, b int, c int) append only;

# b and c carry explicit defaults; a has none (defaults to NULL).
statement ok
create table t (a int, b int default 900, c int default 9000);

# Error: `aaa` is not a column of t.
statement error
create sink ss into t(aaa) as select a from s with(type = 'append-only');

# Error: 1 target column but 2 query columns.
statement error
create sink ss into t(a) as select a, b from s with(type = 'append-only');

# Error: 2 target columns but 1 query column.
statement error
create sink ss into t(a, b) as select b from s with(type = 'append-only');

# Error: duplicate target column `a`.
statement error
create sink ss into t(a, b, c, a) as select a, b from s with(type = 'append-only');

# All columns listed, case-insensitive match, order differs from t's layout.
statement ok
create sink s1 into t(a,B,c) as select c, b, a from s with(type = 'append-only');

# Only a and b listed; c should take its default (9000).
statement ok
create sink s2 into t(a,B) as select 2*c, 2*b from s with(type = 'append-only');

# Only c listed; a should be NULL, b should take its default (900).
statement ok
create sink s3 into t(c) as select 3*a from s with(type = 'append-only');

statement ok
insert into s values(10, 100, 1000);

# One row per sink: s1 -> (1000,100,10), s2 -> (2000,200,9000 default),
# s3 -> (NULL, 900 default, 30).
query III rowsort
select * from t order by a;
----
1000 100 10
2000 200 9000
NULL 900 30

statement ok
drop sink s1;

statement ok
drop sink s2;

statement ok
drop sink s3;

statement ok
drop table s;

statement ok
drop table t;
14 changes: 14 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,20 @@ impl TableCatalog {
.map(|(i, _)| i)
}

/// Returns the expression used to fill column `col_idx` when no value is
/// supplied: the column's declared DEFAULT expression if it has one,
/// otherwise a NULL literal of the column's type (this also covers
/// generated columns, which carry no default expression here).
pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl {
    let column = &self.columns[col_idx];
    match column.column_desc.generated_or_default_column.as_ref() {
        Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. })) => {
            ExprImpl::from_expr_proto(expr.as_ref().unwrap())
                .expect("expr in default columns corrupted")
        }
        _ => ExprImpl::literal_null(column.data_type().clone()),
    }
}

pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ {
self.columns.iter().enumerate().filter_map(|(i, c)| {
if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc {
Expand Down
23 changes: 13 additions & 10 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,24 @@ use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;

/// Converts a user-supplied column-identifier list into their real (unquoted)
/// string values; returns `None` when the user specified no columns at all.
pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
    (!columns.is_empty()).then(|| columns.iter().map(|ident| ident.real_value()).collect())
}

/// If columns is empty, it means that the user did not specify the column names.
/// In this case, we extract the column names from the query.
/// If columns is not empty, it means that the user specified the column names, and the user
/// should guarantee that the number of column names is consistent with the query.
pub(super) fn get_column_names(
bound: &BoundQuery,
session: &SessionImpl,
columns: Vec<Ident>,
) -> Result<Option<Vec<String>>> {
// If columns is empty, it means that the user did not specify the column names.
// In this case, we extract the column names from the query.
// If columns is not empty, it means that user specify the column names and the user
// should guarantee that the column names number are consistent with the query.
let col_names: Option<Vec<String>> = if columns.is_empty() {
None
} else {
Some(columns.iter().map(|v| v.real_value()).collect())
};

let col_names = parse_column_names(&columns);
if let BoundSetExpr::Select(select) = &bound.body {
// `InputRef`'s alias will be implicitly assigned in `bind_project`.
// If user provide columns name (col_names.is_some()), we don't need alias.
Expand Down
142 changes: 91 additions & 51 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::rc::Rc;
use std::sync::{Arc, LazyLock};

Expand All @@ -22,9 +22,8 @@ use either::Either;
use itertools::Itertools;
use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, TableId, UserId};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::value_encoding::DatumFromProtoExt;
use risingwave_common::catalog::{ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId};
use risingwave_common::types::DataType;
use risingwave_common::{bail, catalog};
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
Expand All @@ -33,7 +32,6 @@ use risingwave_connector::sink::{
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::ReplaceTablePlan;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
Expand All @@ -50,8 +48,9 @@ use crate::binder::Binder;
use crate::catalog::catalog_service::CatalogReadGuard;
use crate::catalog::source_catalog::SourceCatalog;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef, Literal};
use crate::expr::{ExprImpl, InputRef};
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::util::SourceSchemaCompatExt;
Expand Down Expand Up @@ -80,6 +79,7 @@ pub fn gen_sink_plan(
stmt: CreateSinkStatement,
partition_info: Option<PartitionComputeInfo>,
) -> Result<SinkPlanContext> {
let user_specified_columns = !stmt.columns.is_empty();
let db_name = session.database();
let (sink_schema_name, sink_table_name) =
Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?;
Expand Down Expand Up @@ -118,8 +118,12 @@ pub fn gen_sink_plan(
let check_items = resolve_query_privileges(&bound);
session.check_privileges(&check_items)?;

// If column names not specified, use the name in materialized view.
let col_names = get_column_names(&bound, session, stmt.columns)?;
let col_names = if sink_into_table_name.is_some() {
parse_column_names(&stmt.columns)
} else {
// If column names are not specified, use the names from the bound query, which are equal to the plan root's original field names.
get_column_names(&bound, session, stmt.columns)?
};

let mut with_options = context.with_options().clone();

Expand Down Expand Up @@ -172,8 +176,8 @@ pub fn gen_sink_plan(
};

let mut plan_root = Planner::new(context).plan_query(bound)?;
if let Some(col_names) = col_names {
plan_root.set_out_names(col_names)?;
if let Some(col_names) = &col_names {
plan_root.set_out_names(col_names.clone())?;
};

let without_backfill = match with_options.remove(SINK_WITHOUT_BACKFILL) {
Expand All @@ -196,6 +200,25 @@ pub fn gen_sink_plan(
.map(|table_name| fetch_table_catalog_for_alter(session, table_name))
.transpose()?;

if let Some(target_table_catalog) = &target_table_catalog {
if let Some(col_names) = col_names {
let target_table_columns = target_table_catalog
.columns()
.iter()
.map(|c| c.name())
.collect::<BTreeSet<_>>();
for c in col_names {
if !target_table_columns.contains(c.as_str()) {
return Err(RwError::from(ErrorCode::BindError(format!(
"Column {} not found in table {}",
c,
target_table_catalog.name()
))));
}
}
}
}

let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id());

let sink_plan = plan_root.gen_sink_plan(
Expand Down Expand Up @@ -251,7 +274,12 @@ pub fn gen_sink_plan(
)));
}

let exprs = derive_default_column_project_for_sink(&sink_catalog, table_catalog)?;
let exprs = derive_default_column_project_for_sink(
&sink_catalog,
sink_plan.schema(),
table_catalog,
user_specified_columns,
)?;

let logical_project = generic::Project::new(exprs, sink_plan);

Expand Down Expand Up @@ -632,66 +660,78 @@ pub(crate) fn insert_merger_to_union(node: &mut StreamNode) {
insert_merger_to_union(input);
}
}

/// Builds the expression that feeds one target-table column from the sink's
/// output: an `InputRef` to field `idx` of `sink_schema`.
///
/// Errors when the sink field's type differs from `target_type` — no implicit
/// casting is attempted.
fn derive_sink_to_table_expr(
    sink_schema: &Schema,
    idx: usize,
    target_type: &DataType,
) -> Result<ExprImpl> {
    let input_type = &sink_schema.fields()[idx].data_type;

    if input_type == target_type {
        Ok(ExprImpl::InputRef(Box::new(InputRef::new(
            idx,
            input_type.clone(),
        ))))
    } else {
        bail!(
            "column type mismatch: {:?} vs {:?}",
            target_type,
            input_type
        );
    }
}

fn derive_default_column_project_for_sink(
sink: &SinkCatalog,
sink_schema: &Schema,
target_table_catalog: &Arc<TableCatalog>,
user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
assert_eq!(sink.full_schema().len(), sink_schema.len());

let mut exprs = vec![];

let sink_visible_columns = sink
let sink_visible_col_idxes = sink
.full_columns()
.iter()
.enumerate()
.filter(|(_i, c)| !c.is_hidden())
.positions(|c| !c.is_hidden())
.collect_vec();
let sink_visible_col_idxes_by_name = sink
.full_columns()
.iter()
.enumerate()
.filter(|(_, c)| !c.is_hidden())
.map(|(i, c)| (c.name(), i))
.collect::<BTreeMap<_, _>>();

for (idx, table_column) in target_table_catalog.columns().iter().enumerate() {
if table_column.is_generated() {
continue;
}

let data_type = table_column.data_type();

if idx < sink_visible_columns.len() {
let (sink_col_idx, sink_column) = sink_visible_columns[idx];

let sink_col_type = sink_column.data_type();
let default_col_expr = || -> ExprImpl { target_table_catalog.default_column_expr(idx) };
let sink_col_expr = |sink_col_idx: usize| -> Result<ExprImpl> {
derive_sink_to_table_expr(sink_schema, sink_col_idx, table_column.data_type())
};

if data_type != sink_col_type {
bail!(
"column type mismatch: {:?} vs {:?}",
data_type,
sink_col_type
);
// If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly.
// The missing columns will be filled with default value (`null` if not explicitly defined).
// Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table.
#[allow(clippy::collapsible_else_if)]
if user_specified_columns {
if let Some(idx) = sink_visible_col_idxes_by_name.get(table_column.name()) {
exprs.push(sink_col_expr(*idx)?);
} else {
exprs.push(ExprImpl::InputRef(Box::new(InputRef::new(
sink_col_idx,
data_type.clone(),
))));
exprs.push(default_col_expr());
}
} else {
let data = match table_column
.column_desc
.generated_or_default_column
.as_ref()
{
// default column with default value
Some(GeneratedOrDefaultColumn::DefaultColumn(default_column)) => {
Datum::from_protobuf(default_column.get_snapshot_value().unwrap(), data_type)
.unwrap()
}
// default column with no default value
None => None,

// generated column is unreachable
_ => unreachable!(),
if idx < sink_visible_col_idxes.len() {
exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?);
} else {
exprs.push(default_col_expr());
};

exprs.push(ExprImpl::Literal(Box::new(Literal::new(
data,
data_type.clone(),
))));
};
}
}
Ok(exprs)
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl StreamSink {
.into_stream()
.expect("input should be stream plan")
.clone_with_new_plan_id();

Self {
base,
input,
Expand Down
3 changes: 1 addition & 2 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,6 @@ impl fmt::Display for CreateSink {
}
}
}

// sql_grammar!(CreateSinkStatement {
// if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
// sink_name: Ident,
Expand Down Expand Up @@ -521,7 +520,7 @@ impl ParseTo for CreateSinkStatement {
p.expected("FROM or AS after CREATE SINK sink_name", p.peek_token())?
};

let emit_mode = p.parse_emit_mode()?;
let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;

// This check cannot be put into the `WithProperties::parse_to`, since other
// statements may not need the with properties.
Expand Down

0 comments on commit 8149890

Please sign in to comment.