Skip to content

Commit

Permalink
Fixed recovery call and refactored catalog methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky committed Apr 15, 2024
1 parent 9f20ec3 commit 931244d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl GlobalBarrierManagerContext {
tracing::debug!("recovering stream job {}", id);
finished.await.ok().context("failed to finish command")??;
tracing::debug!(id, "finished stream job");
catalog_controller.finish_streaming_job(id, ).await?;
catalog_controller.finish_streaming_job(id, None).await?;
};
if let Err(e) = &res {
tracing::error!(
Expand Down
13 changes: 6 additions & 7 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ use crate::controller::utils::{
};
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::model::TableFragments;
use crate::rpc::ddl_controller::DropMode;
use crate::stream::{ReplaceTableContext, SourceManagerRef};
use crate::stream::SourceManagerRef;
use crate::telemetry::MetaTelemetryJobDesc;
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -818,7 +817,7 @@ impl CatalogController {
pub async fn finish_streaming_job(
&self,
job_id: ObjectId,
replace_table_job_info: &Option<(StreamingJob, ReplaceTableContext, TableFragments)>,
replace_table_job_info: Option<(crate::manager::StreamingJob, Vec<MergeUpdate>, u32)>,
) -> MetaResult<NotificationVersion> {
let inner = self.inner.write().await;
let txn = inner.db.begin().await?;
Expand Down Expand Up @@ -962,11 +961,11 @@ impl CatalogController {

if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info {
self.finish_replace_streaming_job(
table_id.table_id as _,
streaming_job,
merge_updates,
table_id as ObjectId,
streaming_job.clone(),
merge_updates.clone(),
None,
Some(stream_job_id),
Some(job_id as _),
None,
)
.await?;
Expand Down
20 changes: 10 additions & 10 deletions src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,23 +206,23 @@ impl DdlController {
let stream_job_id = streaming_job.id();
match streaming_job.create_type() {
CreateType::Unspecified | CreateType::Foreground => {
// let replace_table_job_info = ctx.replace_table_job_info.as_ref().map(
// |(streaming_job, ctx, table_fragments)| {
// (
// streaming_job.clone(),
// ctx.merge_updates.clone(),
// table_fragments.table_id(),
// )
// },
// );
let replace_table_job_info = ctx.replace_table_job_info.as_ref().map(
|(streaming_job, ctx, table_fragments)| {
(
streaming_job.clone(),
ctx.merge_updates.clone(),
table_fragments.table_id().table_id(),
)
},
);

self.stream_manager
.create_streaming_job(table_fragments, ctx)
.await?;

let version = mgr
.catalog_controller
.finish_streaming_job(stream_job_id as _, ctx.replace_table_job_info.as_ref())
.finish_streaming_job(stream_job_id as _, replace_table_job_info)
.await?;

Ok(version)
Expand Down

0 comments on commit 931244d

Please sign in to comment.