Skip to content

Commit

Permalink
commit finish catalog in barrier manager
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jun 24, 2024
1 parent cdbd982 commit eef9224
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 13 deletions.
6 changes: 4 additions & 2 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::CreateType;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
Expand All @@ -44,7 +44,7 @@ use tracing::warn;
use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
use crate::manager::{DdlType, MetadataManager, WorkerId};
use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId};
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism};
use crate::stream::{build_actor_connector_splits, SplitAssignment, ThrottleConfig};
use crate::MetaResult;
Expand Down Expand Up @@ -183,6 +183,8 @@ pub enum Command {
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
/// will be set to `Created`.
CreateStreamingJob {
streaming_job: StreamingJob,
internal_tables: Vec<Table>,
table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
Expand Down
17 changes: 12 additions & 5 deletions src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,30 @@ impl TrackingJob {
}

pub(crate) async fn pre_finish(&self) -> MetaResult<()> {
let table_fragments = match &self {
let metadata = match &self {
TrackingJob::New(command) => match &command.context.command {
Command::CreateStreamingJob {
table_fragments, ..
} => Some(table_fragments),
table_fragments,
streaming_job,
internal_tables,
..
} => Some((table_fragments, streaming_job, internal_tables)),
_ => None,
},
TrackingJob::Recovered(recovered) => Some(&recovered.fragments),
_ => todo!(),
// TrackingJob::Recovered(recovered) => Some((&recovered.fragments, todo!(), todo!())),
};
// Update the state of the table fragments from `Creating` to `Created`, so that the
// fragments can be scaled.
if let Some(table_fragments) = table_fragments {
if let Some((table_fragments, stream_job, internal_tables)) = metadata {
match self.metadata_manager() {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.mark_table_fragments_created(table_fragments.table_id())
.await?;
mgr.catalog_manager
.finish_stream_job(stream_job.clone(), internal_tables.clone())
.await?;
}
MetadataManager::V2(_) => {}
}
Expand Down
7 changes: 3 additions & 4 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1317,10 +1317,8 @@ impl DdlController {
};

tracing::debug!(id = job_id, "finishing stream job");
let version = mgr
.catalog_manager
.finish_stream_job(stream_job, internal_tables)
.await?;
// TODO(kwannoel): Poll the rx here.
let version = 0;
tracing::debug!(id = job_id, "finished stream job");

Ok(version)
Expand Down Expand Up @@ -1654,6 +1652,7 @@ impl DdlController {
mv_table_id: stream_job.mv_table(),
create_type: stream_job.create_type(),
ddl_type: stream_job.into(),
streaming_job: stream_job.clone(),
replace_table_job_info,
option: CreateStreamingJobOption {},
};
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl DdlController {

// create streaming jobs.
let stream_job_id = streaming_job.id();
match (streaming_job.create_type(), streaming_job) {
match (streaming_job.create_type(), &streaming_job) {
(CreateType::Unspecified, _)
| (CreateType::Foreground, _)
// FIXME(kwannoel): Unify background stream's creation path with MV below.
Expand Down
9 changes: 8 additions & 1 deletion src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct CreateStreamingJobContext {
pub replace_table_job_info: Option<(StreamingJob, ReplaceTableContext, TableFragments)>,

pub option: CreateStreamingJobOption,

pub streaming_job: StreamingJob,
}

impl CreateStreamingJobContext {
Expand Down Expand Up @@ -237,7 +239,7 @@ impl GlobalStreamManager {
let stream_manager = self.clone();
let fut = async move {
let res = stream_manager
.create_streaming_job_impl( table_fragments, ctx)
.create_streaming_job_impl(table_fragments, ctx)
.await;
match res {
Ok(_) => {
Expand Down Expand Up @@ -392,6 +394,7 @@ impl GlobalStreamManager {
&self,
table_fragments: TableFragments,
CreateStreamingJobContext {
streaming_job,
dispatchers,
upstream_root_actors,
building_locations,
Expand All @@ -400,6 +403,7 @@ impl GlobalStreamManager {
create_type,
ddl_type,
replace_table_job_info,
internal_tables,
..
}: CreateStreamingJobContext,
) -> MetaResult<()> {
Expand Down Expand Up @@ -469,6 +473,8 @@ impl GlobalStreamManager {
dispatchers,
init_split_assignment,
definition: definition.to_string(),
streaming_job,
internal_tables: internal_tables.into_values().collect_vec(),
ddl_type,
replace_table: replace_table_command,
create_type,
Expand Down Expand Up @@ -1115,6 +1121,7 @@ mod tests {
id: table_id.table_id(),
..Default::default()
};
let streaming_job = StreamingJob::MaterializedView(table.clone());
let table_fragments = TableFragments::new(
table_id,
fragments,
Expand Down

0 comments on commit eef9224

Please sign in to comment.