Refactor streaming job ctrl: Clean imports, remove cycle check, adjust types
shanicky committed Feb 23, 2024
1 parent 4c687ec commit ad3bf1d
Showing 2 changed files with 15 additions and 58 deletions.
src/meta/src/controller/streaming_job.rs (70 changes: 13 additions & 57 deletions)
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::num::NonZeroUsize;

use itertools::Itertools;
@@ -35,7 +35,6 @@ use risingwave_meta_model_v2::{
FragmentId, I32Array, IndexId, JobStatus, ObjectId, SchemaId, SourceId, StreamingParallelism,
TableId, TableVersion, UserId,
};
-use risingwave_pb::catalog;
use risingwave_pb::catalog::source::PbOptionalAssociatedTableId;
use risingwave_pb::catalog::table::{PbOptionalAssociatedSourceId, PbTableVersion};
use risingwave_pb::catalog::{PbCreateType, PbTable};
@@ -66,8 +65,8 @@ use crate::barrier::Reschedule;
use crate::controller::catalog::CatalogController;
use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
-check_relation_name_duplicate, ensure_object_id, ensure_user_id, get_fragment_actor_ids,
-get_fragment_mappings,
+check_relation_name_duplicate, check_sink_into_table_cycle, ensure_object_id, ensure_user_id,
+get_fragment_actor_ids, get_fragment_mappings,
};
use crate::controller::ObjectModel;
use crate::manager::{NotificationVersion, SinkId, StreamingJob};
@@ -99,58 +98,6 @@ impl CatalogController {
Ok(obj.oid)
}

-async fn check_cycle_for_table_sink(
-table: u32,
-sink: &catalog::Sink,
-txn: &DatabaseTransaction,
-) -> MetaResult<bool> {
-let mut queue: VecDeque<(ObjectId, ObjectType)> = VecDeque::new();
-
-let mut visited_objects = HashSet::new();
-
-visited_objects.insert(table as ObjectId);
-
-for table_id in &sink.dependent_relations {
-queue.push_front((*table_id as ObjectId, ObjectType::Table));
-}
-
-while let Some((object_id, object_type)) = queue.pop_front() {
-if visited_objects.contains(&object_id) {
-return Ok(true);
-}
-
-visited_objects.insert(object_id);
-
-if object_type == ObjectType::Table {
-let table = Table::find_by_id(object_id)
-.one(txn)
-.await?
-.ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
-
-for sink_id in table.incoming_sinks.inner_ref() {
-queue.push_front((*sink_id, ObjectType::Sink));
-}
-}
-
-let dependent_relations: Vec<(ObjectId, ObjectType)> = ObjectDependency::find()
-.select_only()
-.column(object_dependency::Column::Oid)
-.column(object::Column::ObjType)
-.join(
-JoinType::InnerJoin,
-object_dependency::Relation::Object2.def(),
-)
-.filter(object_dependency::Column::UsedBy.eq(object_id))
-.into_tuple()
-.all(txn)
-.await?;
-
-queue.extend(dependent_relations.into_iter());
-}
-
-Ok(false)
-}

pub async fn create_job_catalog(
&self,
streaming_job: &mut StreamingJob,
@@ -196,7 +143,16 @@ impl CatalogController {
}
StreamingJob::Sink(sink, _) => {
if let Some(target_table_id) = sink.target_table {
-if Self::check_cycle_for_table_sink(target_table_id, sink, &txn).await? {
+if check_sink_into_table_cycle(
+target_table_id as ObjectId,
+sink.dependent_relations
+.iter()
+.map(|id| *id as ObjectId)
+.collect(),
+&txn,
+)
+.await?
+{
bail!("Creating such a sink will result in circular dependency.");
}
}
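The deleted helper walked the dependency graph breadth-first: it seeded the target table into a visited set, queued the sink's dependent relations, and for each dequeued object followed the table's `incoming_sinks` and its `ObjectDependency` edges, reporting a cycle as soon as it reached an object it had already seen. The call site now delegates to `check_sink_into_table_cycle` in `controller/utils.rs`, whose body is not part of this diff. The sketch below only illustrates the traversal idea with an in-memory adjacency map standing in for the catalog queries; `would_create_cycle` and its graph representation are invented for the example.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type ObjectId = i32;

/// Returns true if `target_table` is reachable from any of the sink's
/// dependent relations, i.e. creating the sink would close a cycle.
fn would_create_cycle(
    target_table: ObjectId,
    dependent_relations: &[ObjectId],
    // upstream edges: object -> objects it reads from / is fed by
    upstream: &HashMap<ObjectId, Vec<ObjectId>>,
) -> bool {
    let mut visited: HashSet<ObjectId> = HashSet::new();
    let mut queue: VecDeque<ObjectId> = dependent_relations.iter().copied().collect();

    while let Some(oid) = queue.pop_front() {
        if oid == target_table {
            return true; // walked back to the table the sink writes into
        }
        if !visited.insert(oid) {
            continue; // already expanded this object
        }
        if let Some(next) = upstream.get(&oid) {
            queue.extend(next.iter().copied());
        }
    }
    false
}

fn main() {
    // Table 1 is fed by relation 3; a new sink that reads from 3 and
    // writes back into table 1 would form a cycle.
    let upstream = HashMap::from([(3, vec![1])]);
    assert!(would_create_cycle(1, &[3], &upstream));
    assert!(!would_create_cycle(1, &[4], &upstream));
    println!("cycle checks passed");
}
```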
src/meta/src/controller/utils.rs (3 changes: 2 additions & 1 deletion)
@@ -300,7 +300,8 @@
))
.await?
.unwrap();
-let cnt: i32 = res.try_get_by(0)?;
+
+let cnt: i64 = res.try_get_by(0)?;

Ok(cnt != 0)
}
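The utils.rs change widens the decoded aggregate from `i32` to `i64`: Postgres returns `count(*)` as an 8-byte integer, so decoding the column into an `i32` can fail with a type mismatch, while `i64` matches what the driver actually receives. A minimal sketch of the pattern, assuming a sea-orm connection and a Postgres backend; the `has_rows` helper and its query are invented for the example.

```rust
use sea_orm::{ConnectionTrait, DatabaseConnection, DbBackend, DbErr, Statement};

// Hypothetical helper: check whether a table currently has any rows.
async fn has_rows(conn: &DatabaseConnection, table: &str) -> Result<bool, DbErr> {
    let stmt = Statement::from_string(
        DbBackend::Postgres,
        format!("SELECT count(*) FROM {table}"),
    );
    let res = conn
        .query_one(stmt)
        .await?
        .expect("count(*) always yields exactly one row");
    // count(*) comes back as BIGINT, so read it as i64 rather than i32.
    let cnt: i64 = res.try_get_by(0)?;
    Ok(cnt != 0)
}
```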
