Skip to content

Commit

Permalink
Enhanced sink handling and column changes in catalog and DDL operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Jun 13, 2024
1 parent 0baacff commit 8488f35
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 91 deletions.
2 changes: 2 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ message Sink {

// Whether the sink relies on any secret. The key is the property name and the value is the secret id.
map<string, uint32> secret_ref = 25;

repeated plan_common.ColumnCatalog original_target_columns = 26;
}

message Subscription {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/sink/catalog/desc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl SinkDesc {
initialized_at_cluster_version: None,
create_type: self.create_type,
secret_ref,
original_target_columns: vec![],
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ pub struct SinkCatalog {

/// The secret reference for the sink, mapping from property name to secret id.
pub secret_ref: BTreeMap<String, u32>,

pub original_target_columns: Vec<ColumnCatalog>,
}

impl SinkCatalog {
Expand Down Expand Up @@ -382,6 +384,11 @@ impl SinkCatalog {
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
create_type: self.create_type.to_proto() as i32,
secret_ref: self.secret_ref.clone(),
original_target_columns: self
.original_target_columns
.iter()
.map(|c| c.to_protobuf())
.collect_vec(),
}
}

Expand Down Expand Up @@ -476,6 +483,11 @@ impl From<PbSink> for SinkCatalog {
created_at_cluster_version: pb.created_at_cluster_version,
create_type: CreateType::from_proto(create_type),
secret_ref: pb.secret_ref,
original_target_columns: pb
.original_target_columns
.into_iter()
.map(ColumnCatalog::from)
.collect_vec(),
}
}
}
Expand Down
96 changes: 48 additions & 48 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@
// limitations under the License.

use std::collections::HashSet;
use std::rc::Rc;
use std::sync::Arc;

use anyhow::Context;
use create_sink::derive_default_column_project_for_sink;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement,
};
Expand All @@ -33,18 +32,14 @@ use risingwave_sqlparser::parser::Parser;
use super::create_source::get_json_schema_location;
use super::create_table::{bind_sql_columns, generate_stream_graph_for_table, ColumnIdGenerator};
use super::util::SourceSchemaCompatExt;
use super::{create_sink, HandlerArgs, RwPgResponse};
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::ExprImpl;
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::optimizer::plan_node::generic::SourceNodeKind;
use crate::optimizer::plan_node::{
generic, LogicalSource, StreamProject, ToStream, ToStreamContext,
};
use crate::session::SessionImpl;
use crate::{Binder, OptimizerContext, TableCatalog, WithOptions};
use crate::{Binder, TableCatalog, WithOptions};

pub async fn replace_table_with_definition(
session: &Arc<SessionImpl>,
Expand Down Expand Up @@ -102,19 +97,10 @@ pub async fn replace_table_with_definition(
);

let incoming_sink_ids: HashSet<_> = original_catalog.incoming_sinks.iter().copied().collect();
let incoming_sinks = fetch_incoming_sinks(session, &incoming_sink_ids)?;
let target_columns = bind_sql_columns(&columns)?;
let default_columns: Vec<ExprImpl> = TableCatalog::default_column_exprs(&target_columns);

for sink in incoming_sinks {
let context = Rc::new(OptimizerContext::from_handler_args(handler_args.clone()));
hijack_merger_for_target_table(
&mut graph,
&target_columns,
&default_columns,
&sink,
context,
)?;

for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
hijack_merger_for_target_table(&mut graph, &target_columns, &sink)?;
}

table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
Expand All @@ -130,31 +116,40 @@ pub async fn replace_table_with_definition(
pub(crate) fn hijack_merger_for_target_table(
graph: &mut StreamFragmentGraph,
target_columns: &[ColumnCatalog],
default_columns: &[ExprImpl],
sink: &SinkCatalog,
context: Rc<OptimizerContext>,
) -> Result<()> {
let exprs = derive_default_column_project_for_sink(
sink,
&sink.full_schema(),
target_columns,
default_columns,
false, // todo
)?;

let pb_project = StreamProject::new(generic::Project::new(
exprs,
LogicalSource::new(
None,
sink.full_columns().to_vec(),
None,
SourceNodeKind::CreateTable,
context,
None,
)
.and_then(|s| s.to_stream(&mut ToStreamContext::new(false)))?,
))
.to_stream_prost_body_inner();
let mut sink_columns = sink.original_target_columns.clone();
if sink_columns.is_empty() {
sink_columns = target_columns.to_vec();
}

let mut i = 0;
let mut j = 0;
let mut exprs = Vec::new();

while j < target_columns.len() {
if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
exprs.push(ExprImpl::InputRef(Box::new(InputRef {
data_type: sink_columns[i].data_type().clone(),
index: i,
})));

i += 1;
j += 1;
} else {
exprs.push(ExprImpl::Literal(Box::new(Literal::new(
None,
target_columns[j].data_type().clone(),
))));

j += 1;
}
}

let pb_project = PbNodeBody::Project(ProjectNode {
select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
..Default::default()
});

for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
Expand All @@ -181,9 +176,6 @@ pub async fn handle_alter_table_column(
) -> Result<RwPgResponse> {
let session = handler_args.session;
let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
// if !original_catalog.incoming_sinks.is_empty() {
// bail_not_implemented!("alter table with incoming sinks");
// }

// TODO(yuhao): alter table with generated columns.
if original_catalog.has_generated_column() {
Expand Down Expand Up @@ -219,6 +211,14 @@ pub async fn handle_alter_table_column(
}
}

if !original_catalog.incoming_sinks.is_empty()
&& matches!(operation, AlterTableOperation::DropColumn { .. })
{
return Err(ErrorCode::InvalidInputSyntax(
"dropping columns in target table of sinks is not supported".to_string(),
))?;
}

match operation {
AlterTableOperation::AddColumn {
column_def: new_column,
Expand Down
24 changes: 9 additions & 15 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ pub fn gen_sink_plan(
&sink_catalog,
sink_plan.schema(),
table_catalog.columns(),
TableCatalog::default_column_exprs(table_catalog.columns()).as_ref(),
user_specified_columns,
)?;

Expand Down Expand Up @@ -416,7 +415,7 @@ pub async fn handle_create_sink(

let partition_info = get_partition_compute_info(&handle_args.with_options).await?;

let (sink, graph, target_table_catalog) = {
let (mut sink, graph, target_table_catalog) = {
let context = Rc::new(OptimizerContext::from_handler_args(handle_args.clone()));

let SinkPlanContext {
Expand Down Expand Up @@ -454,30 +453,24 @@ pub async fn handle_create_sink(
let (mut graph, mut table, source) =
reparse_table_for_sink(&session, &table_catalog).await?;

sink.original_target_columns = table
.columns
.iter()
.map(|col| ColumnCatalog::from(col.clone()))
.collect_vec();

table
.incoming_sinks
.clone_from(&table_catalog.incoming_sinks);

// let target_columns = bind_sql_columns(&columns)?;

let default_columns: Vec<ExprImpl> =
TableCatalog::default_column_exprs(table_catalog.columns());
// let default_columns: Vec<ExprImpl> = TableCatalog::default_column_exprs(&target_columns);

let incoming_sink_ids: HashSet<_> = table_catalog.incoming_sinks.iter().copied().collect();

let mut incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

incoming_sinks.push(Arc::new(sink.clone()));

let context = Rc::new(OptimizerContext::from_handler_args(handle_args.clone()));
for sink in incoming_sinks {
crate::handler::alter_table_column::hijack_merger_for_target_table(
&mut graph,
table_catalog.columns(),
&default_columns,
&sink,
context.clone(),
)?;
}

Expand Down Expand Up @@ -746,11 +739,12 @@ pub(crate) fn derive_default_column_project_for_sink(
sink: &SinkCatalog,
sink_schema: &Schema,
columns: &[ColumnCatalog],
default_column_exprs: &[ExprImpl],
user_specified_columns: bool,
) -> Result<Vec<ExprImpl>> {
assert_eq!(sink.full_schema().len(), sink_schema.len());

let default_column_exprs = TableCatalog::default_column_exprs(columns);

let mut exprs = vec![];

let sink_visible_col_idxes = sink
Expand Down
20 changes: 3 additions & 17 deletions src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::HashSet;
use std::rc::Rc;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_pb::ddl_service::ReplaceTablePlan;
Expand All @@ -23,10 +22,9 @@ use super::RwPgResponse;
use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::error::Result;
use crate::expr::ExprImpl;
use crate::handler::alter_table_column::hijack_merger_for_target_table;
use crate::handler::create_sink::{fetch_incoming_sinks, reparse_table_for_sink};
use crate::handler::HandlerArgs;
use crate::{OptimizerContext, TableCatalog};

pub async fn handle_drop_sink(
handler_args: HandlerArgs,
Expand Down Expand Up @@ -81,25 +79,13 @@ pub async fn handle_drop_sink(
.incoming_sinks
.clone_from(&table_catalog.incoming_sinks);

let default_columns: Vec<ExprImpl> =
TableCatalog::default_column_exprs(table_catalog.columns());

let mut incoming_sink_ids: HashSet<_> =
table_catalog.incoming_sinks.iter().copied().collect();

assert!(incoming_sink_ids.remove(&sink_id.sink_id));

let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

for sink in incoming_sinks {
let context = Rc::new(OptimizerContext::from_handler_args(handler_args.clone()));
crate::handler::alter_table_column::hijack_merger_for_target_table(
&mut graph,
table_catalog.columns(),
&default_columns,
&sink,
context,
)?;
for sink in fetch_incoming_sinks(&session, &incoming_sink_ids)? {
hijack_merger_for_target_table(&mut graph, table_catalog.columns(), &sink)?;
}

affected_table_change = Some(ReplaceTablePlan {
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub struct Model {
pub target_table: Option<TableId>,
// `secret_ref` stores a json string, mapping from property name to secret id.
pub secret_ref: Option<SecretRef>,
pub original_target_columns: ColumnCatalogArray,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -130,6 +131,7 @@ impl From<PbSink> for ActiveModel {
sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
target_table: Set(pb_sink.target_table.map(|x| x as _)),
secret_ref: Set(Some(SecretRef::from(pb_sink.secret_ref))),
original_target_columns: Set(pb_sink.original_target_columns.into()),
}
}
}
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ impl From<ObjectModel<sink::Model>> for PbSink {
created_at_cluster_version: value.1.created_at_cluster_version,
create_type: PbCreateType::Foreground as _,
secret_ref: secret_ref_map,
original_target_columns: value.0.original_target_columns.to_protobuf(),
}
}
}
Expand Down
Loading

0 comments on commit 8488f35

Please sign in to comment.