Skip to content

Commit

Permalink
Modified error message and removed functions in alter_table_column.rs…
Browse files Browse the repository at this point in the history
… and catalog/mod.rs

tmp

fix conflict

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Nov 24, 2023
1 parent b905cd5 commit 239c0d7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 151 deletions.
120 changes: 7 additions & 113 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ fn check_cycle_for_sink(

visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables)
}

pub async fn handle_create_sink(
handle_args: HandlerArgs,
stmt: CreateSinkStatement,
Expand Down Expand Up @@ -352,10 +351,13 @@ pub async fn handle_create_sink(

let mut graph = build_graph(plan);

graph.parallelism = session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism });
graph.parallelism =
session
.config()
.streaming_parallelism()
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});

(sink, graph, target_table_catalog)
};
Expand Down Expand Up @@ -449,114 +451,6 @@ pub async fn handle_create_sink(
});
}

let (sink, graph) = {
let context = Rc::new(OptimizerContext::from_handler_args(handle_args));

let (query, mut plan, sink) = gen_sink_plan(&session, context.clone(), stmt)?;

if let Some(replace_table_plan) = &affected_table_change {
let affected_table_catalog = replace_table_plan.table.as_ref().unwrap();

for column in sink.full_columns() {
if column.is_generated() {
return Err(RwError::from(ErrorCode::BindError(
"The sink to table feature for Sinks with generated columns has not been implemented yet."
.to_string(),
)));
}
}

let user_defined_primary_key_table = !(affected_table_catalog.append_only
|| affected_table_catalog.row_id_index.is_some());

if !(user_defined_primary_key_table
|| sink.sink_type == SinkType::AppendOnly
|| sink.sink_type == SinkType::ForceAppendOnly)
{
return Err(RwError::from(ErrorCode::BindError(
"Only append-only sinks can sink to a table without primary keys.".to_string(),
)));
}

let mut exprs = vec![];

let table_columns = affected_table_catalog
.get_columns()
.iter()
.map(|col| ColumnCatalog::from(col.clone()))
.collect_vec();

let sink_visible_columns = sink
.full_columns()
.iter()
.enumerate()
.filter(|(_i, c)| !c.is_hidden())
.collect_vec();

for (idx, table_column) in table_columns.iter().enumerate() {
if table_column.is_generated() {
continue;
}

let data_type = table_column.data_type();

if idx < sink_visible_columns.len() {
let (sink_col_idx, sink_column) = sink_visible_columns[idx];

let sink_col_type = sink_column.data_type();

if data_type != sink_col_type {
bail!(
"column type mismatch: {:?} vs {:?}",
data_type,
sink_col_type
);
} else {
exprs.push(ExprImpl::InputRef(Box::new(InputRef::new(
sink_col_idx,
data_type.clone(),
))));
}
} else {
exprs.push(ExprImpl::Literal(Box::new(Literal::new(
None,
data_type.clone(),
))));
};
}

let logical_project = generic::Project::new(exprs, plan);

plan = StreamProject::new(logical_project).into();

let exprs = LogicalSource::derive_output_exprs_from_generated_columns(&table_columns)?;
if let Some(exprs) = exprs {
let logical_project = generic::Project::new(exprs, plan);
plan = StreamProject::new(logical_project).into();
}
};

let has_order_by = !query.order_by.is_empty();
if has_order_by {
context.warn_to_user(
r#"The ORDER BY clause in the CREATE SINK statement has no effect at all."#
.to_string(),
);
}

let mut graph = build_graph(plan);

graph.parallelism =
session
.config()
.streaming_parallelism()
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
});

(sink, graph)
};

let _job_guard =
session
.env()
Expand Down
6 changes: 4 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,8 +1139,10 @@ pub async fn generate_stream_graph_for_table(
let graph = StreamFragmentGraph {
parallelism: session
.config()
.get_streaming_parallelism()
.map(|parallelism| Parallelism { parallelism }),
.streaming_parallelism()
.map(|parallelism| Parallelism {
parallelism: parallelism.get(),
}),
..build_graph(plan)
};

Expand Down
48 changes: 13 additions & 35 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1098,16 +1098,6 @@ impl CatalogManager {
.collect()
};

let check_sink_into_table_in_dependency = |relation_id: RelationId| -> Vec<Sink> {
let mut table_sinks = vec![];
for relation_info in relations_depend_on(relation_id) {
if let RelationInfo::Sink(s) = relation_info && s.target_table.is_some() {
table_sinks.push(s);
}
}
table_sinks
};

// Initial push into deque.
match relation {
RelationIdEnum::Table(table_id) => {
Expand Down Expand Up @@ -1168,14 +1158,6 @@ impl CatalogManager {
continue;
}

let associated_sink_into_tables = check_sink_into_table_in_dependency(table_id);
if !associated_sink_into_tables.is_empty() {
bail!(
"Found {} sink(s) into table in dependency, please drop them manually",
associated_sink_into_tables.len()
);
}

let table_fragments = fragment_manager
.select_table_fragments_by_table_id(&table_id.into())
.await?;
Expand Down Expand Up @@ -1332,15 +1314,6 @@ impl CatalogManager {
continue;
}

let associated_sink_into_tables =
check_sink_into_table_in_dependency(source.id as RelationId);
if !associated_sink_into_tables.is_empty() {
bail!(
"Found {} sink(s) into table in dependency, please drop them manually",
associated_sink_into_tables.len()
);
}

// cdc source streaming job
if let Some(info) = source.info
&& info.cdc_source_job
Expand Down Expand Up @@ -1380,14 +1353,6 @@ impl CatalogManager {
continue;
}

let associated_sink_into_tables = check_sink_into_table_in_dependency(view.id);
if !associated_sink_into_tables.is_empty() {
bail!(
"Found {} sink(s) into table in dependency, please drop them manually",
associated_sink_into_tables.len()
);
}

if let Some(ref_count) = database_core.relation_ref_count.get(&view.id).cloned()
{
if ref_count > 0 {
Expand Down Expand Up @@ -1468,6 +1433,19 @@ impl CatalogManager {
.map(|sink_id| sinks.remove(*sink_id).unwrap())
.collect_vec();

if !matches!(relation, RelationIdEnum::Sink(_)) {
let table_sink = sinks_removed
.iter()
.filter(|sink| sink.target_table.is_some())
.collect_vec();
if !table_sink.is_empty() {
bail!(
"Found {} sink(s) into table in dependency, please drop them manually",
table_sink.len()
);
}
}

let internal_tables = all_internal_table_ids
.iter()
.map(|internal_table_id| {
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,6 @@ impl DdlController {
hash_mapping: Some(actor_mapping.to_protobuf()),
dispatcher_id: union_fragment.fragment_id as _,
downstream_actor_id: downstream_actor_ids.clone(),
downstream_table_name: None,
}],
);
}
Expand Down

0 comments on commit 239c0d7

Please sign in to comment.