Skip to content

Commit

Permalink
Add dependent_relations to TableCatalog; update conversion logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Feb 20, 2024
1 parent 765d3ae commit 1536204
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 33 deletions.
7 changes: 7 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub struct TableCatalog {

pub name: String,

pub dependent_relations: Vec<TableId>,

/// All columns in this table.
pub columns: Vec<ColumnCatalog>,

Expand Down Expand Up @@ -564,6 +566,11 @@ impl From<PbTable> for TableCatalog {
created_at_cluster_version: tb.created_at_cluster_version.clone(),
initialized_at_cluster_version: tb.initialized_at_cluster_version.clone(),
retention_seconds: tb.retention_seconds,
dependent_relations: tb
.dependent_relations
.into_iter()
.map(TableId::from)
.collect_vec(),
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,14 @@ fn check_cycle_for_sink(
}
}

for table_id in &table.dependent_relations {
if let Ok(table) = reader.get_table_by_id(table_id) {
visit_table(session, reader, sink_index, table.as_ref(), visited_tables)?
} else {
bail!("table not found: {:?}", table_id);
}
}

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl StreamMaterialize {
id: TableId::placeholder(),
associated_source_id: None,
name,
dependent_relations: vec![],
columns,
pk: table_pk,
stream_key,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl TableCatalogBuilder {
id: TableId::placeholder(),
associated_source_id: None,
name: String::new(),
dependent_relations: vec![],
columns: self.columns.clone(),
pk: self.pk,
stream_key: vec![],
Expand Down
37 changes: 7 additions & 30 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ use risingwave_meta_model_v2::fragment::StreamNode;
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::prelude::*;
use risingwave_meta_model_v2::table::TableType;
use risingwave_meta_model_v2::{actor, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors, ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array, IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SourceId, StreamSourceInfo, StreamingParallelism, TableId, UserId, SinkId};
use risingwave_meta_model_v2::{
actor, connection, database, fragment, function, index, object, object_dependency, schema,
sink, source, streaming_job, table, user_privilege, view, ActorId, ActorUpstreamActors,
ColumnCatalogArray, ConnectionId, CreateType, DatabaseId, FragmentId, FunctionId, I32Array,
IndexId, JobStatus, ObjectId, PrivateLinkService, SchemaId, SinkId, SourceId, StreamSourceInfo,
StreamingParallelism, TableId, UserId,
};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::catalog::{
PbComment, PbConnection, PbDatabase, PbFunction, PbIndex, PbSchema, PbSink, PbSource, PbTable,
Expand Down Expand Up @@ -2281,35 +2287,6 @@ impl CatalogController {
.map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
.collect())
}

pub async fn get_dependent_by_ids(&self, table_ids: Vec<TableId>) -> MetaResult<Vec<PbTable>> {
let inner = self.inner.read().await;
ObjectDependency::find().filter()
// let table_objs = Table::find()
// .find_also_related(Object)
// .filter(table::Column::TableId.is_in(table_ids))
// .all(&inner.db)
// .await?;
// Ok(table_objs
// .into_iter()
// .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
// .collect())
}


// pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
// let inner = self.inner.read().await;
// let sink_objs = Sink::find()
// .find_also_related(Object)
// .filter(sink::Column::SinkId.is_in(sink_ids))
// .all(&inner.db)
// .await?;
// Ok(sink_objs
// .into_iter()
// .map(|(table, obj)| ObjectModel(table, obj.unwrap()).into())
// .collect())
// }

pub async fn find_creating_streaming_job_ids(
&self,
infos: Vec<PbCreatingJobInfo>,
Expand Down
67 changes: 65 additions & 2 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;

use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::buffer::Bitmap;
use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
Expand All @@ -34,6 +35,7 @@ use risingwave_meta_model_v2::{
FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism,
TableId, TableVersion, UserId,
};
use risingwave_pb::catalog;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
use risingwave_pb::catalog::{PbCreateType, PbTable};
Expand Down Expand Up @@ -97,6 +99,61 @@ impl CatalogController {
Ok(obj.oid)
}

/// Check whether creating `sink` into the target `table` would introduce a
/// cycle in the catalog dependency graph.
///
/// Starting from the sink's dependent relations, the walk follows two kinds
/// of edges: sinks already writing into a table (`incoming_sinks`) and rows
/// in `object_dependency`. A cycle exists iff the walk reaches the target
/// `table` again. Returns `Ok(true)` when a cycle would be formed.
///
/// # Errors
/// Returns a `catalog_id_not_found` error if a referenced table row is
/// missing, or propagates any database error from the transaction.
async fn check_cycle_for_table_sink(
    table: u32,
    sink: &catalog::Sink,
    txn: &DatabaseTransaction,
) -> MetaResult<bool> {
    let target = table as ObjectId;
    let mut queue: VecDeque<(ObjectId, ObjectType)> = VecDeque::new();
    let mut visited_objects: HashSet<ObjectId> = HashSet::new();

    for table_id in &sink.dependent_relations {
        queue.push_back((*table_id as ObjectId, ObjectType::Table));
    }

    while let Some((object_id, object_type)) = queue.pop_front() {
        // Reaching the target table again means the new sink closes a cycle.
        if object_id == target {
            return Ok(true);
        }
        // Skip nodes already explored via another path: a diamond in the
        // dependency DAG is not a cycle, so it must not be reported as one.
        if !visited_objects.insert(object_id) {
            continue;
        }

        if object_type == ObjectType::Table {
            let table = Table::find_by_id(object_id)
                .one(txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;

            // Sinks already writing into this table are upstream edges too.
            for sink_id in table.incoming_sinks.inner_ref() {
                queue.push_back((*sink_id, ObjectType::Sink));
            }
        }

        // Objects that `object_id` depends on (rows whose `used_by` is this
        // object), joined with `object` to recover each dependency's type.
        let dependent_relations: Vec<(ObjectId, ObjectType)> = ObjectDependency::find()
            .select_only()
            .column(object_dependency::Column::Oid)
            .column(object::Column::ObjType)
            .join(
                JoinType::InnerJoin,
                object_dependency::Relation::Object2.def(),
            )
            .filter(object_dependency::Column::UsedBy.eq(object_id))
            .into_tuple()
            .all(txn)
            .await?;

        queue.extend(dependent_relations);
    }

    Ok(false)
}

pub async fn create_job_catalog(
&self,
streaming_job: &mut StreamingJob,
Expand Down Expand Up @@ -140,7 +197,13 @@ impl CatalogController {
let table: table::ActiveModel = table.clone().into();
Table::insert(table).exec(&txn).await?;
}
StreamingJob::Sink(sink, _) => {
StreamingJob::Sink(sink, s) => {
if let Some(target_table_id) = sink.target_table {
if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? {
bail!("Creating such a sink will result in circular dependency.");
}
}

let job_id = Self::create_streaming_job_obj(
&txn,
ObjectType::Sink,
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet, VecDeque};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -38,6 +38,7 @@ use risingwave_connector::source::{
};
use risingwave_meta_model_v2::object::ObjectType;
use risingwave_meta_model_v2::ObjectId;
use risingwave_pb::catalog;
use risingwave_pb::catalog::connection::private_link_service::PbPrivateLinkProvider;
use risingwave_pb::catalog::connection::PrivateLinkService;
use risingwave_pb::catalog::source::OptionalAssociatedTableId;
Expand Down

0 comments on commit 1536204

Please sign in to comment.