Skip to content

Commit

Permalink
Optimized column handling and cleaned up code in `alter_table_column.…
Browse files Browse the repository at this point in the history
…rs`.

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Jun 12, 2024
1 parent aae270c commit 4679d4d
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 110 deletions.
1 change: 1 addition & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ impl DataChunk {
}

pub fn column_at(&self, idx: usize) -> &ArrayRef {
println!("chunk {:?} idx {}", self, idx);
&self.columns[idx]
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl StreamChunk {
/// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
/// If the input mapping is identity mapping, no reorder will be performed.
pub fn project(&self, indices: &[usize]) -> Self {
println!("data {:#?}", self.data);
println!("data {:?} indices {:?}", self.data, indices);
Self {
ops: self.ops.clone(),
data: self.data.project(indices),
Expand Down
2 changes: 2 additions & 0 deletions src/expr/core/src/expr/expr_input_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ impl Expression for InputRefExpression {
}

async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
println!("self {:?}", self);
println!("input {:?} idx {}", input, self.idx);
Ok(input.column_at(self.idx).clone())
}

Expand Down
24 changes: 17 additions & 7 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::Arc;

Expand All @@ -21,11 +21,9 @@ use create_sink::derive_default_column_project_for_sink;
use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::catalog::{ColumnCatalog, Field};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::DefaultColumnDesc;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
AlterTableOperation, ColumnOption, ConnectorSchema, Encode, ObjectName, Statement,
Expand Down Expand Up @@ -141,11 +139,16 @@ pub(crate) fn hijack_merger_for_target_table(
let exprs = derive_default_column_project_for_sink(
sink,
&sink.full_schema(),
&target_columns,
&default_columns,
target_columns,
default_columns,
false, // todo
)?;

println!(
"sink {} exprs {:?} target {:?} default {:?}",
sink.name, exprs, target_columns, default_columns
);

let pb_project = StreamProject::new(generic::Project::new(
exprs,
LogicalSource::new(
Expand All @@ -162,7 +165,14 @@ pub(crate) fn hijack_merger_for_target_table(

for fragment in graph.fragments.values_mut() {
if let Some(node) = &mut fragment.node {
insert_merger_to_union_with_project(node, &pb_project);
insert_merger_to_union_with_project(
node,
&pb_project,
&format!(
"{}.{}.{}",
sink.database_id.database_id, sink.schema_id.schema_id, sink.name
),
);
}
}

Expand Down
33 changes: 5 additions & 28 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use risingwave_connector::sink::{
};
use risingwave_pb::catalog::{PbSource, Table};
use risingwave_pb::ddl_service::ReplaceTablePlan;
use risingwave_pb::plan_common::PbField;
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
Expand All @@ -56,9 +57,7 @@ use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{ExprImpl, InputRef};
use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
use crate::handler::create_mv::parse_column_names;
use crate::handler::create_table::{
bind_sql_columns, generate_stream_graph_for_table, ColumnIdGenerator,
};
use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
use crate::handler::privilege::resolve_query_privileges;
use crate::handler::util::SourceSchemaCompatExt;
use crate::handler::HandlerArgs;
Expand Down Expand Up @@ -694,44 +693,22 @@ pub(crate) async fn reparse_table_for_sink(
Ok((graph, table, source))
}

pub(crate) fn insert_merger_to_union(node: &mut StreamNode) {
if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
node.input.push(StreamNode {
identity: "Merge (sink into table)".to_string(),
fields: node.fields.clone(),
node_body: Some(NodeBody::Merge(MergeNode {
upstream_dispatcher_type: DispatcherType::Hash as _,
..Default::default()
})),
..Default::default()
});

return;
}

for input in &mut node.input {
insert_merger_to_union(input);
}
}

pub(crate) fn insert_merger_to_union_with_project(
node: &mut StreamNode,
project_node: &PbNodeBody,
uniq_name: &str,
) {
if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
node.input.push(StreamNode {
input: vec![StreamNode {
identity: "Merge (sink into table)".to_string(),
fields: node.fields.clone(),
node_body: Some(NodeBody::Merge(MergeNode {
upstream_dispatcher_type: DispatcherType::Hash as _,
..Default::default()
})),
..Default::default()
}],
stream_key: vec![],
append_only: false,
identity: "".to_string(),
identity: uniq_name.to_string(),
fields: vec![],
node_body: Some(project_node.clone()),
..Default::default()
Expand All @@ -741,7 +718,7 @@ pub(crate) fn insert_merger_to_union_with_project(
}

for input in &mut node.input {
insert_merger_to_union_with_project(input, project_node);
insert_merger_to_union_with_project(input, project_node, uniq_name);
}
}

Expand Down
7 changes: 2 additions & 5 deletions src/frontend/src/handler/drop_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::HashSet;
use std::rc::Rc;
use std::sync::Arc;

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_pb::ddl_service::ReplaceTablePlan;
Expand All @@ -25,9 +24,7 @@ use crate::binder::Binder;
use crate::catalog::root_catalog::SchemaPath;
use crate::error::Result;
use crate::expr::ExprImpl;
use crate::handler::create_sink::{
fetch_incoming_sinks, insert_merger_to_union, reparse_table_for_sink,
};
use crate::handler::create_sink::{fetch_incoming_sinks, reparse_table_for_sink};
use crate::handler::HandlerArgs;
use crate::{OptimizerContext, TableCatalog};

Expand Down Expand Up @@ -92,7 +89,7 @@ pub async fn handle_drop_sink(

assert!(incoming_sink_ids.remove(&sink_id.sink_id));

let mut incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;
let incoming_sinks = fetch_incoming_sinks(&session, &incoming_sink_ids)?;

for sink in incoming_sinks {
let context = Rc::new(OptimizerContext::from_handler_args(handler_args.clone()));
Expand Down
13 changes: 13 additions & 0 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2678,6 +2678,19 @@ impl CatalogController {
.collect())
}

pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
let inner = self.inner.read().await;
let sink_objs = Sink::find()
.find_also_related(Object)
.filter(sink::Column::SinkId.is_in(sink_ids))
.all(&inner.db)
.await?;
Ok(sink_objs
.into_iter()
.map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
.collect())
}

pub async fn get_subscription_by_id(
&self,
subscription_id: SubscriptionId,
Expand Down
17 changes: 17 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3815,6 +3815,23 @@ impl CatalogManager {
Ok(subscription.clone())
}

pub async fn get_sinks(&self, sink_ids: &[SinkId]) -> Vec<Sink> {
let mut sinks = vec![];
let guard = self.core.lock().await;
for sink_id in sink_ids {
// if let Some(table) = guard.database.in_progress_creating_tables.get(table_id) {
// sinks.push(table.clone());
// } else if let Some(table) = guard.database.tables.get(table_id) {
// sinks.push(table.clone());
// }

if let Some(sink) = guard.database.sinks.get(sink_id) {
sinks.push(sink.clone());
}
}
sinks
}

pub async fn get_created_table_ids(&self) -> Vec<u32> {
let guard = self.core.lock().await;
guard
Expand Down
13 changes: 12 additions & 1 deletion src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Duration;
use futures::future::{select, Either};
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_meta_model_v2::SourceId;
use risingwave_pb::catalog::{PbSource, PbTable};
use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
use risingwave_pb::common::worker_node::{PbResource, State};
use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
Expand Down Expand Up @@ -534,6 +534,17 @@ impl MetadataManager {
}
}

pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbSink>> {
match &self {
MetadataManager::V1(mgr) => Ok(mgr.catalog_manager.get_sinks(ids).await),
MetadataManager::V2(mgr) => {
mgr.catalog_controller
.get_sink_by_ids(ids.iter().map(|id| *id as _).collect())
.await
}
}
}

pub async fn get_downstream_chain_fragments(
&self,
job_id: u32,
Expand Down
Loading

0 comments on commit 4679d4d

Please sign in to comment.