Skip to content

Commit

Permalink
record dependencies in Request message
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Nov 15, 2024
1 parent 695953b commit 0acf15b
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 20 deletions.
2 changes: 1 addition & 1 deletion proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ message Table {
repeated plan_common.ColumnCatalog columns = 5;
repeated common.ColumnOrder pk = 6;
// For cdc table created from a cdc source, here records the source id.
repeated uint32 dependent_relations = 8;
repeated uint32 dependent_relations = 8; // TODO(rc): deprecate this by passing dependencies via `Request` message
oneof optional_associated_source_id {
uint32 associated_source_id = 9;
}
Expand Down
6 changes: 6 additions & 0 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ impl DdlService for DdlServiceImpl {
fragment_graph,
CreateType::Foreground,
None,
HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSource`
))
.await?;
Ok(Response::new(CreateSourceResponse {
Expand Down Expand Up @@ -295,6 +296,7 @@ impl DdlService for DdlServiceImpl {
fragment_graph,
CreateType::Foreground,
affected_table_change.map(Self::extract_replace_table_info),
HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbSink`
);

let version = self.ddl_controller.run_command(command).await?;
Expand Down Expand Up @@ -380,6 +382,7 @@ impl DdlService for DdlServiceImpl {
let mview = req.get_materialized_view()?.clone();
let create_type = mview.get_create_type().unwrap_or(CreateType::Foreground);
let fragment_graph = req.get_fragment_graph()?.clone();
let dependencies = req.get_dependencies().iter().map(|id| *id as i32).collect();

let stream_job = StreamingJob::MaterializedView(mview);
let version = self
Expand All @@ -389,6 +392,7 @@ impl DdlService for DdlServiceImpl {
fragment_graph,
create_type,
None,
dependencies,
))
.await?;

Expand Down Expand Up @@ -442,6 +446,7 @@ impl DdlService for DdlServiceImpl {
fragment_graph,
CreateType::Foreground,
None,
HashSet::new(),
))
.await?;

Expand Down Expand Up @@ -528,6 +533,7 @@ impl DdlService for DdlServiceImpl {
fragment_graph,
CreateType::Foreground,
None,
HashSet::new(), // TODO(rc): pass dependencies through this field instead of `PbTable`
))
.await?;

Expand Down
34 changes: 22 additions & 12 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl CatalogController {
ctx: &StreamContext,
parallelism: &Option<Parallelism>,
max_parallelism: usize,
mut dependencies: HashSet<ObjectId>,
) -> MetaResult<()> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
Expand All @@ -133,9 +134,16 @@ impl CatalogController {
)
.await?;

// check if any dependent relation is in altering status.
let dependent_relations = streaming_job.dependent_relations();
if !dependent_relations.is_empty() {
// TODO(rc): pass all dependencies uniformly, deprecate `dependent_relations` and `dependent_secret_ids`.
dependencies.extend(
streaming_job
.dependent_relations()
.into_iter()
.map(|id| id as ObjectId),
);

// check if any dependency is in altering status.
if !dependencies.is_empty() {
let altering_cnt = ObjectDependency::find()
.join(
JoinType::InnerJoin,
Expand All @@ -144,7 +152,7 @@ impl CatalogController {
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(
object_dependency::Column::Oid
.is_in(dependent_relations.iter().map(|id| *id as ObjectId))
.is_in(dependencies.clone())
.and(object::Column::ObjType.eq(ObjectType::Table))
.and(streaming_job::Column::JobStatus.ne(JobStatus::Created))
.and(
Expand Down Expand Up @@ -308,17 +316,19 @@ impl CatalogController {
}
}

// get dependent secrets.
let dependent_secret_ids = streaming_job.dependent_secret_ids()?;
// collect dependent secrets.
dependencies.extend(
streaming_job
.dependent_secret_ids()?
.into_iter()
.map(|secret_id| secret_id as ObjectId),
);

let dependent_objs = dependent_relations
.iter()
.chain(dependent_secret_ids.iter());
// record object dependency.
if !dependent_secret_ids.is_empty() || !dependent_relations.is_empty() {
ObjectDependency::insert_many(dependent_objs.map(|id| {
if !dependencies.is_empty() {
ObjectDependency::insert_many(dependencies.into_iter().map(|oid| {
object_dependency::ActiveModel {
oid: Set(*id as _),
oid: Set(oid),
used_by: Set(streaming_job.id() as _),
..Default::default()
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl StreamingJob {
}
}

// TODO: record all objects instead.
// TODO: to be removed, pass all objects uniformly through `dependencies` field instead.
pub fn dependent_relations(&self) -> Vec<u32> {
match self {
StreamingJob::MaterializedView(table) => table.dependent_relations.clone(),
Expand Down
18 changes: 12 additions & 6 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -36,9 +36,10 @@ use risingwave_connector::source::{
};
use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::ObjectDependency;
use risingwave_meta_model::{
ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId, SinkId, SourceId,
SubscriptionId, TableId, UserId, ViewId,
object_dependency, ConnectionId, DatabaseId, FunctionId, IndexId, ObjectId, SchemaId, SecretId,
SinkId, SourceId, SubscriptionId, TableId, UserId, ViewId,
};
use risingwave_pb::catalog::{
Comment, Connection, CreateType, Database, Function, PbSink, Schema, Secret, Sink, Source,
Expand All @@ -58,6 +59,7 @@ use risingwave_pb::stream_plan::{
Dispatcher, DispatcherType, FragmentTypeFlag, MergeNode, PbStreamFragmentGraph,
StreamFragmentGraph as StreamFragmentGraphProto,
};
use sea_orm::{EntityTrait, Set};
use thiserror_ext::AsReport;
use tokio::sync::Semaphore;
use tokio::time::sleep;
Expand Down Expand Up @@ -137,6 +139,7 @@ pub enum DdlCommand {
StreamFragmentGraphProto,
CreateType,
Option<ReplaceTableInfo>,
HashSet<ObjectId>,
),
DropStreamingJob(StreamingJobId, DropMode, Option<ReplaceTableInfo>),
AlterName(alter_name_request::Object, String),
Expand Down Expand Up @@ -177,7 +180,7 @@ impl DdlCommand {
| DdlCommand::CommentOn(_)
| DdlCommand::CreateSecret(_)
| DdlCommand::AlterSwapRename(_) => true,
DdlCommand::CreateStreamingJob(_, _, _, _)
DdlCommand::CreateStreamingJob(_, _, _, _, _)
| DdlCommand::CreateSourceWithoutStreamingJob(_)
| DdlCommand::ReplaceTable(_)
| DdlCommand::AlterSourceColumn(_)
Expand Down Expand Up @@ -310,11 +313,13 @@ impl DdlController {
fragment_graph,
_create_type,
affected_table_replace_info,
dependencies,
) => {
ctrl.create_streaming_job(
stream_job,
fragment_graph,
affected_table_replace_info,
dependencies,
)
.await
}
Expand Down Expand Up @@ -920,6 +925,7 @@ impl DdlController {
mut streaming_job: StreamingJob,
fragment_graph: StreamFragmentGraphProto,
affected_table_replace_info: Option<ReplaceTableInfo>,
dependencies: HashSet<ObjectId>,
) -> MetaResult<NotificationVersion> {
let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap());
self.metadata_manager
Expand All @@ -929,6 +935,7 @@ impl DdlController {
&ctx,
&fragment_graph.parallelism,
fragment_graph.max_parallelism as _,
dependencies,
)
.await?;
let job_id = streaming_job.id();
Expand All @@ -947,7 +954,6 @@ impl DdlController {
.unwrap();
let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

let id = streaming_job.id();
let name = streaming_job.name();
let definition = streaming_job.definition();
let source_id = match &streaming_job {
Expand All @@ -969,7 +975,7 @@ impl DdlController {
Err(err) => {
tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
id,
id: job_id,
name,
definition,
error: err.as_report().to_string(),
Expand Down

0 comments on commit 0acf15b

Please sign in to comment.