move check cycle to meta
shanicky committed Nov 27, 2023
1 parent e687ccc commit 3fd4a57
Showing 3 changed files with 74 additions and 74 deletions.
73 changes: 2 additions & 71 deletions src/frontend/src/handler/create_sink.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::rc::Rc;
 use std::sync::{Arc, LazyLock};

@@ -21,12 +21,12 @@ use either::Either;
 use itertools::Itertools;
 use maplit::{convert_args, hashmap};
 use pgwire::pg_response::{PgResponse, StatementType};
+use risingwave_common::bail;
 use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, UserId};
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::types::Datum;
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::value_encoding::DatumFromProtoExt;
-use risingwave_common::{bail, catalog};
 use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
 use risingwave_connector::sink::{
     CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION,
@@ -45,7 +45,6 @@ use risingwave_sqlparser::parser::Parser;
 use super::create_mv::get_column_names;
 use super::RwPgResponse;
 use crate::binder::Binder;
-use crate::catalog::catalog_service::CatalogReadGuard;
 use crate::expr::{ExprImpl, InputRef, Literal};
 use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
 use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
@@ -257,68 +256,6 @@ pub fn gen_sink_plan(
     Ok((query, sink_plan, sink_catalog, target_table_catalog))
 }
 
-fn check_cycle_for_sink(
-    session: &SessionImpl,
-    sink_catalog: SinkCatalog,
-    table_id: catalog::TableId,
-) -> Result<bool> {
-    let reader = session.env().catalog_reader().read_guard();
-
-    let mut sinks = HashMap::new();
-    let db_name = session.database();
-    for schema in reader.iter_schemas(db_name)? {
-        for sink in schema.iter_sink() {
-            sinks.insert(sink.id.sink_id, sink.as_ref());
-        }
-    }
-    fn visit_sink(
-        session: &SessionImpl,
-        reader: &CatalogReadGuard,
-        sink_index: &HashMap<u32, &SinkCatalog>,
-        sink: &SinkCatalog,
-        visited_tables: &mut HashSet<u32>,
-    ) -> Result<bool> {
-        for table_id in &sink.dependent_relations {
-            if let Ok(table) = reader.get_table_by_id(table_id) {
-                if visit_table(session, reader, sink_index, table.as_ref(), visited_tables)? {
-                    return Ok(true);
-                }
-            } else {
-                bail!("table not found: {:?}", table_id);
-            }
-        }
-        Ok(false)
-    }
-
-    fn visit_table(
-        session: &SessionImpl,
-        reader: &CatalogReadGuard,
-        sink_index: &HashMap<u32, &SinkCatalog>,
-        table: &TableCatalog,
-        visited_tables: &mut HashSet<u32>,
-    ) -> Result<bool> {
-        if visited_tables.contains(&table.id.table_id) {
-            Ok(true)
-        } else {
-            let _ = visited_tables.insert(table.id.table_id);
-            for sink_id in &table.incoming_sinks {
-                if let Some(sink) = sink_index.get(sink_id) {
-                    if visit_sink(session, reader, sink_index, sink, visited_tables)? {
-                        return Ok(true);
-                    }
-                } else {
-                    bail!("sink not found: {:?}", sink_id);
-                }
-            }
-            Ok(false)
-        }
-    }
-
-    let mut visited_tables = HashSet::new();
-    visited_tables.insert(table_id.table_id);
-
-    visit_sink(session, &reader, &sinks, &sink_catalog, &mut visited_tables)
-}
 pub async fn handle_create_sink(
     handle_args: HandlerArgs,
     stmt: CreateSinkStatement,
@@ -370,12 +307,6 @@ pub async fn handle_create_sink(
         )));
     }
 
-    if check_cycle_for_sink(session.as_ref(), sink.clone(), table_catalog.id())? {
-        return Err(RwError::from(ErrorCode::BindError(
-            "Creating such a sink will result in circular dependency.".to_string(),
-        )));
-    }
-
     // Retrieve the original table definition and parse it to AST.
     let [mut definition]: [_; 1] = Parser::parse_sql(&table_catalog.definition)
         .context("unable to parse original table definition")?
4 changes: 4 additions & 0 deletions src/meta/src/manager/catalog/database.rs
@@ -294,6 +294,10 @@ impl DatabaseManager {
         self.tables.get(&table_id)
     }
 
+    pub fn get_sink(&self, sink_id: SinkId) -> Option<&Sink> {
+        self.sinks.get(&sink_id)
+    }
+
     pub fn get_all_table_options(&self) -> HashMap<TableId, TableOption> {
         self.tables
             .iter()
71 changes: 68 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -13,20 +13,23 @@
 // limitations under the License.
 
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 use std::time::Duration;
 
 use itertools::Itertools;
+use risingwave_common::bail;
 use risingwave_common::config::DefaultParallelism;
 use risingwave_common::hash::{ParallelUnitMapping, VirtualNode};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_common::util::stream_graph_visitor::visit_stream_node;
+use risingwave_pb::catalog;
 use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
 use risingwave_pb::catalog::{
-    connection, Comment, Connection, CreateType, Database, Function, Schema, Source, Table, View,
+    connection, Comment, Connection, CreateType, Database, Function, Schema, Sink, Source, Table,
+    View,
 };
 use risingwave_pb::ddl_service::alter_owner_request::Object;
 use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
@@ -569,19 +572,77 @@ impl DdlController {
         }
     }
 
+    async fn check_cycle_for_sink(
+        &self,
+        sink: &catalog::Sink,
+        table_id: TableId,
+    ) -> MetaResult<bool> {
+        let reader = self.catalog_manager.get_catalog_core_guard().await;
+
+        let mut q: VecDeque<RelationIdEnum> = VecDeque::new();
+
+        let mut visited_tables = HashSet::new();
+
+        visited_tables.insert(table_id);
+
+        for table_id in &sink.dependent_relations {
+            q.push_front(RelationIdEnum::Table(*table_id));
+        }
+
+        while !q.is_empty() {
+            match q.pop_front() {
+                Some(RelationIdEnum::Table(table_id)) => {
+                    if visited_tables.contains(&table_id) {
+                        return Ok(true);
+                    }
+                    let _ = visited_tables.insert(table_id);
+
+                    if let Some(table) = reader.database.get_table(table_id) {
+                        for sink_id in &table.incoming_sinks {
+                            q.push_front(RelationIdEnum::Sink(*sink_id));
+                        }
+                    } else {
+                        bail!("table {} not found when checking cycle in create sink into table procedure", table_id);
+                    }
+                }
+                Some(RelationIdEnum::Sink(sink_id)) => {
+                    if let Some(sink) = reader.database.get_sink(sink_id) {
+                        for table_id in &sink.dependent_relations {
+                            q.push_front(RelationIdEnum::Table(*table_id));
+                        }
+                    } else {
+                        bail!("sink {} not found when checking cycle in create sink into table procedure", sink_id);
+                    }
+                }
+                _ => unreachable!(),
+            }
+        }
+
+        Ok(false)
+    }
+
     // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream.
     // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function.
     // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here.
     async fn inject_replace_table_job(
         &self,
         env: StreamEnvironment,
+        sink: &Sink,
         sink_table_fragments: &TableFragments,
         ReplaceTableInfo {
             mut streaming_job,
             fragment_graph,
             col_index_mapping,
         }: ReplaceTableInfo,
     ) -> MetaResult<ReplaceTableJob> {
+        let table_id = streaming_job.table().unwrap().id;
+
+        if self.check_cycle_for_sink(sink, table_id).await? {
+            return Err(MetaError::invalid_parameter(
+                "Creating such a sink will result in circular dependency.",
+            ));
+        }
+
         let fragment_graph = self
             .prepare_replace_table(&mut streaming_job, fragment_graph)
             .await?;
@@ -1019,8 +1080,12 @@ impl DdlController {
         // 5. If we have other tables that will be affected (for example, Sink into table),
         // we need to modify them to generate a replace table plan to change their operational status.
        let replace_table_job = if let Some(replace_table_info) = affected_table_replace_info {
+            let StreamingJob::Sink(sink) = stream_job else {
+                bail!("Only sink into table can have affected tables")
+            };
+
             Some(
-                self.inject_replace_table_job(env, &table_fragments, replace_table_info)
+                self.inject_replace_table_job(env, sink, &table_fragments, replace_table_info)
                     .await?,
             )
         } else {
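Aside: the cycle check that this commit moves from the frontend into the meta service is an iterative worklist traversal of the bipartite table/sink dependency graph. Starting from the tables the new sink reads, it alternates between a table's incoming sinks and a sink's dependent tables, and reports a cycle as soon as a table is reached twice. Below is a minimal self-contained sketch of the same idea, not RisingWave code: plain u32 IDs and HashMaps stand in for the real catalog, and Graph, Relation, and has_cycle are hypothetical names. Visit order does not affect the result, so a FIFO queue is used here where the commit uses push_front.

use std::collections::{HashMap, HashSet, VecDeque};

// Hypothetical stand-in for the catalog (assumption, not RisingWave's API):
// each sink reads from some tables, and each table may have sinks writing
// into it ("incoming sinks").
struct Graph {
    sink_dependent_tables: HashMap<u32, Vec<u32>>, // sink_id -> tables it reads
    table_incoming_sinks: HashMap<u32, Vec<u32>>,  // table_id -> sinks into it
}

enum Relation {
    Table(u32),
    Sink(u32),
}

// Returns true if a new sink reading `new_sink_deps` and writing into
// `target_table` would close a cycle.
fn has_cycle(graph: &Graph, new_sink_deps: &[u32], target_table: u32) -> bool {
    let mut visited_tables = HashSet::new();
    visited_tables.insert(target_table);

    let mut q: VecDeque<Relation> =
        new_sink_deps.iter().map(|t| Relation::Table(*t)).collect();

    while let Some(rel) = q.pop_front() {
        match rel {
            Relation::Table(table_id) => {
                // Revisiting a table means the new sink edge closes a loop.
                if !visited_tables.insert(table_id) {
                    return true;
                }
                for s in graph.table_incoming_sinks.get(&table_id).into_iter().flatten() {
                    q.push_back(Relation::Sink(*s));
                }
            }
            Relation::Sink(sink_id) => {
                for t in graph.sink_dependent_tables.get(&sink_id).into_iter().flatten() {
                    q.push_back(Relation::Table(*t));
                }
            }
        }
    }
    false
}

fn main() {
    // Existing sink 10 reads table 1 and writes into table 2.
    let graph = Graph {
        sink_dependent_tables: HashMap::from([(10, vec![1])]),
        table_incoming_sinks: HashMap::from([(2, vec![10])]),
    };
    // A new sink reading table 2 and writing into table 1 closes the loop
    // 1 -> sink 10 -> 2 -> new sink -> 1, so it must be rejected.
    assert!(has_cycle(&graph, &[2], 1));
    // Writing into unrelated table 3 instead is fine.
    assert!(!has_cycle(&graph, &[2], 3));
}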
