From eef9224f91536ae466eb2200c3ef254e453fbcf1 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Mon, 24 Jun 2024 23:56:42 +0800 Subject: [PATCH] commit finish catalog in barrier manager --- src/meta/src/barrier/command.rs | 6 ++++-- src/meta/src/barrier/progress.rs | 17 ++++++++++++----- src/meta/src/rpc/ddl_controller.rs | 7 +++---- src/meta/src/rpc/ddl_controller_v2.rs | 2 +- src/meta/src/stream/stream_manager.rs | 9 ++++++++- 5 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 785abbbc89077..2f8bfee33d9d8 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -24,7 +24,7 @@ use risingwave_common::types::Timestamptz; use risingwave_common::util::epoch::Epoch; use risingwave_connector::source::SplitImpl; use risingwave_hummock_sdk::HummockEpoch; -use risingwave_pb::catalog::CreateType; +use risingwave_pb::catalog::{CreateType, Table}; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::PausedReason; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; @@ -44,7 +44,7 @@ use tracing::warn; use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo}; use super::trace::TracedEpoch; use crate::barrier::GlobalBarrierManagerContext; -use crate::manager::{DdlType, MetadataManager, WorkerId}; +use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId}; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig}; use crate::MetaResult; @@ -183,6 +183,8 @@ pub enum Command { /// for a while** until the `finish` channel is signaled, then the state of `TableFragments` /// will be set to `Created`. CreateStreamingJob { + streaming_job: StreamingJob, + internal_tables: Vec, table_fragments: TableFragments, /// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root". upstream_root_actors: HashMap>, diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 746a263b06317..a62874a2ba2bc 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -179,23 +179,30 @@ impl TrackingJob { } pub(crate) async fn pre_finish(&self) -> MetaResult<()> { - let table_fragments = match &self { + let metadata = match &self { TrackingJob::New(command) => match &command.context.command { Command::CreateStreamingJob { - table_fragments, .. - } => Some(table_fragments), + table_fragments, + streaming_job, + internal_tables, + .. + } => Some((table_fragments, streaming_job, internal_tables)), _ => None, }, - TrackingJob::Recovered(recovered) => Some(&recovered.fragments), + _ => todo!(), + // TrackingJob::Recovered(recovered) => Some((&recovered.fragments, todo!(), todo!())), }; // Update the state of the table fragments from `Creating` to `Created`, so that the // fragments can be scaled. - if let Some(table_fragments) = table_fragments { + if let Some((table_fragments, stream_job, internal_tables)) = metadata { match self.metadata_manager() { MetadataManager::V1(mgr) => { mgr.fragment_manager .mark_table_fragments_created(table_fragments.table_id()) .await?; + mgr.catalog_manager + .finish_stream_job(stream_job.clone(), internal_tables.clone()) + .await?; } MetadataManager::V2(_) => {} } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index cc5d7dd8971ad..f3dac92212791 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1317,10 +1317,8 @@ impl DdlController { }; tracing::debug!(id = job_id, "finishing stream job"); - let version = mgr - .catalog_manager - .finish_stream_job(stream_job, internal_tables) - .await?; + // TODO(kwannoel): Poll the rx here. + let version = 0; tracing::debug!(id = job_id, "finished stream job"); Ok(version) @@ -1654,6 +1652,7 @@ impl DdlController { mv_table_id: stream_job.mv_table(), create_type: stream_job.create_type(), ddl_type: stream_job.into(), + streaming_job: stream_job.clone(), replace_table_job_info, option: CreateStreamingJobOption {}, }; diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 0dabc9b19022d..eb3915bac0f24 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -204,7 +204,7 @@ impl DdlController { // create streaming jobs. let stream_job_id = streaming_job.id(); - match (streaming_job.create_type(), streaming_job) { + match (streaming_job.create_type(), &streaming_job) { (CreateType::Unspecified, _) | (CreateType::Foreground, _) // FIXME(kwannoel): Unify background stream's creation path with MV below. diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index d6a257575fbd2..f8dea1545f4f8 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -76,6 +76,8 @@ pub struct CreateStreamingJobContext { pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>, pub option: CreateStreamingJobOption, + + pub streaming_job: StreamingJob, } impl CreateStreamingJobContext { @@ -237,7 +239,7 @@ impl GlobalStreamManager { let stream_manager = self.clone(); let fut = async move { let res = stream_manager - .create_streaming_job_impl( table_fragments, ctx) + .create_streaming_job_impl(table_fragments, ctx) .await; match res { Ok(_) => { @@ -392,6 +394,7 @@ impl GlobalStreamManager { &self, table_fragments: TableFragments, CreateStreamingJobContext { + streaming_job, dispatchers, upstream_root_actors, building_locations, @@ -400,6 +403,7 @@ impl GlobalStreamManager { create_type, ddl_type, replace_table_job_info, + internal_tables, .. }: CreateStreamingJobContext, ) -> MetaResult<()> { @@ -469,6 +473,8 @@ impl GlobalStreamManager { dispatchers, init_split_assignment, definition: definition.to_string(), + streaming_job, + internal_tables: internal_tables.into_values().collect_vec(), ddl_type, replace_table: replace_table_command, create_type, @@ -1115,6 +1121,7 @@ mod tests { id: table_id.table_id(), ..Default::default() }; + let streaming_job = StreamingJob::MaterializedView(table.clone()); let table_fragments = TableFragments::new( table_id, fragments,