Skip to content

Commit

Permalink
fix to send actors
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Nov 7, 2023
1 parent 2440e52 commit 0ed6559
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 12 deletions.
6 changes: 6 additions & 0 deletions src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ pub async fn handle_alter_table_column(
table.clone()
};

if !original_catalog.incoming_sinks.is_empty() {
return Err(RwError::from(ErrorCode::BindError(
"Alter a table with incoming sinks has not been implemented.".to_string(),
)));
}

// TODO(yuhao): alter table with generated columns.
if original_catalog.has_generated_column() {
return Err(RwError::from(ErrorCode::BindError(
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use maplit::{convert_args, hashmap};
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail;
use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc};
Expand Down Expand Up @@ -238,6 +238,7 @@ pub async fn handle_create_sink(

let mut affected_table_change = None;
let mut table_dist = None;

if let Some(table_name) = target_table {
let db_name = session.database();
let (schema_name, real_table_name) =
Expand Down Expand Up @@ -265,6 +266,12 @@ pub async fn handle_create_sink(
table.clone()
};

if !original_catalog.incoming_sinks.is_empty() {
return Err(RwError::from(ErrorCode::BindError(
"Create sink into table with incoming sinks has not been implemented.".to_string(),
)));
}

// Retrieve the original table definition and parse it to AST.
let [mut definition]: [_; 1] = Parser::parse_sql(&original_catalog.definition)
.context("unable to parse original table definition")?
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ impl Command {
let to_add = new_table_fragments.actor_ids().into_iter().collect();
let to_remove = old_table_fragments.actor_ids().into_iter().collect();

println!("to add {:?}", to_add);

CommandChanges::Combined(vec![
CommandChanges::CreateTable(table_fragments.table_id()),
CommandChanges::Actor { to_add, to_remove },
Expand Down
28 changes: 18 additions & 10 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
use risingwave_common::hash::VirtualNode;
Expand Down Expand Up @@ -588,10 +589,10 @@ impl DdlController {
let sink_fragment = sink_table_fragments
.fragments()
.into_iter()
.filter(|x| (x.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
.filter(|fragment| (fragment.fragment_type_mask & FragmentTypeFlag::Sink as u32) != 0)
.exactly_one()
.cloned()
.unwrap();
.map_err(|e| anyhow!("sink fragment not found: {}", e))?;

let sink_actor_ids = sink_fragment
.actors
Expand All @@ -616,6 +617,8 @@ impl DdlController {
visit_stream_node(node, |body| {
if let NodeBody::Merge(m) = body && m.upstream_actor_id.is_empty() {
target_fragment_id = Some(*fragment_id);
fragment.fragment_type_mask &= !(FragmentTypeFlag::Source as u32);
return
};
})
};
Expand All @@ -624,9 +627,12 @@ impl DdlController {

let sink_actor_ids = sink_actor_ids.into_iter().sorted().collect_vec();

let target_fragment_id =
target_fragment_id.expect("fragment of placeholder merger not found");

let merge_actor_ids = replace_table_table_fragments
.fragments
.get(&target_fragment_id.unwrap())
.get(&target_fragment_id)
.unwrap()
.actors
.iter()
Expand All @@ -649,11 +655,12 @@ impl DdlController {
if let Some(node) = &mut actor.nodes {
visit_stream_node(node, |body| {
if let NodeBody::Merge(m) = body && m.upstream_actor_id.is_empty() {
let i = merge_to_sink.get(&actor.actor_id).cloned().unwrap();
m.upstream_actor_id = vec![*i];
let upstream_actor_id = *merge_to_sink.get(&actor.actor_id).cloned().unwrap();
m.upstream_actor_id = vec![upstream_actor_id];
m.upstream_fragment_id = sink_fragment.fragment_id;
m.upstream_dispatcher_type = PbDispatcherType::NoShuffle as _;
actor.upstream_actor_id = vec![*i];

actor.upstream_actor_id = vec![upstream_actor_id];
}
})
}
Expand All @@ -666,13 +673,14 @@ impl DdlController {
.columns
.iter()
.enumerate()
.filter(|(_a, b)| {
b.get_column_desc()
.filter(|(_, column)| {
column
.get_column_desc()
.unwrap()
.generated_or_default_column
.is_none()
})
.map(|(a, _b)| a as u32)
.map(|(a, _)| a as u32)
.collect_vec();

for actor_id in sink_actor_ids.iter() {
Expand All @@ -685,7 +693,7 @@ impl DdlController {
dist_key_indices: vec![],
output_indices: output_indices.clone(),
hash_mapping: None,
dispatcher_id: target_fragment_id.unwrap() as u64,
dispatcher_id: target_fragment_id as u64,
downstream_actor_id: vec![**downstream_actor_id],
downstream_table_name: None,
}],
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl LocalBarrierManager {
}
}
None => {
// bail!("sender for actor {} does not exist", actor_id)
bail!("sender for actor {} does not exist", actor_id)
}
}
}
Expand Down

0 comments on commit 0ed6559

Please sign in to comment.