Skip to content

Commit

Permalink
move clone to a level up
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 6, 2024
1 parent 448b213 commit a42fea6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 35 deletions.
49 changes: 14 additions & 35 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use thiserror_ext::AsReport;

use crate::controller::catalog::ReleaseContext;
use crate::manager::{
MetadataManagerV2, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION,
MetadataManagerV2, NotificationVersion, StreamingJob, StreamingJobDiscriminants,
IGNORED_NOTIFICATION_VERSION,
};
use crate::model::{MetadataModel, StreamContext};
use crate::rpc::ddl_controller::{
Expand Down Expand Up @@ -86,7 +87,7 @@ impl DdlController {
.create_streaming_job_inner_v2(
mgr,
ctx,
&mut streaming_job,
streaming_job.clone(),
fragment_graph,
affected_table_replace_info,
)
Expand Down Expand Up @@ -126,12 +127,12 @@ impl DdlController {
&self,
mgr: &MetadataManagerV2,
ctx: StreamContext,
streaming_job: &mut StreamingJob,
mut streaming_job: StreamingJob,
fragment_graph: StreamFragmentGraphProto,
affected_table_replace_info: Option<ReplaceTableInfo>,
) -> MetaResult<NotificationVersion> {
let mut fragment_graph =
StreamFragmentGraph::new(&self.env, fragment_graph, streaming_job).await?;
StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?;
streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());
streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id());

Expand Down Expand Up @@ -162,53 +163,31 @@ impl DdlController {
None => None,
};

let streaming_job_id = streaming_job.id();
let streaming_job_create_type = streaming_job.create_type();
let streaming_job_type: StreamingJobDiscriminants = (&streaming_job).into();

// create fragment and actor catalogs.
tracing::debug!(id = streaming_job.id(), "building streaming job");
let (ctx, table_fragments) = self
.build_stream_job(
ctx,
streaming_job.clone(),
streaming_job,
fragment_graph,
affected_table_replace_info,
)
.await?;

match streaming_job {
StreamingJob::Table(None, table, TableJobType::SharedCdcSource) => {
Self::validate_cdc_table(table, &table_fragments).await?;
}
StreamingJob::Table(Some(source), ..) => {
// Register the source on the connector node.
self.source_manager.register_source(source).await?;
}
StreamingJob::Sink(sink, target_table) => {
if let Some((StreamingJob::Table(source, table, _), ..)) =
&ctx.replace_table_job_info
{
*target_table = Some((table.clone(), source.clone()));
}

// Validate the sink on the connector node.
validate_sink(sink).await?;
}
StreamingJob::Source(source) => {
// Register the source on the connector node.
self.source_manager.register_source(source).await?;
}
_ => {}
}

mgr.catalog_controller
.prepare_streaming_job(table_fragments.to_protobuf(), streaming_job, false)
.prepare_streaming_job(table_fragments.to_protobuf(), ctx.streaming_job(), false)
.await?;

// create streaming jobs.
let stream_job_id = streaming_job.id();
match (streaming_job.create_type(), &streaming_job) {
match (streaming_job_create_type, streaming_job_type) {
(CreateType::Unspecified, _)
| (CreateType::Foreground, _)
// FIXME(kwannoel): Unify background stream's creation path with MV below.
| (CreateType::Background, StreamingJob::Sink(_, _)) => {
| (CreateType::Background, StreamingJobDiscriminants::Sink) => {
let version = self.stream_manager
.create_streaming_job(table_fragments, ctx)
.await?;
Expand All @@ -221,7 +200,7 @@ impl DdlController {
.stream_manager
.create_streaming_job(table_fragments, ctx)
.await.inspect_err(|err| {
tracing::error!(id = stream_job_id, error = ?err.as_report(), "failed to create background streaming job");
tracing::error!(id = streaming_job_id, error = ?err.as_report(), "failed to create background streaming job");
});
};
tokio::spawn(fut);
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl CreateStreamingJobContext {
pub fn internal_tables(&self) -> Vec<Table> {
self.internal_tables.values().cloned().collect()
}

pub fn streaming_job(&self) -> &StreamingJob {
&self.streaming_job
}
}

pub enum CreatingState {
Expand Down

0 comments on commit a42fea6

Please sign in to comment.