From 2a3fca20c66eea0762eb7598b9706a3cf058b611 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 28 Aug 2024 17:54:31 +0800 Subject: [PATCH] rename struct --- src/meta/src/barrier/creating_job/mod.rs | 4 ++-- src/meta/src/barrier/creating_job/status.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 6df93644f27d..1fdfec10257d 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -35,7 +35,7 @@ use crate::barrier::command::CommandContext; use crate::barrier::creating_job::barrier_control::{ CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType, }; -use crate::barrier::creating_job::status::{CreatingStreamingJobStatus, InjectBarrierInfo}; +use crate::barrier::creating_job::status::{CreatingStreamingJobStatus, CreatingJobInjectBarrierInfo}; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; @@ -276,7 +276,7 @@ impl CreatingStreamingJobControl { .active_graph_info() .expect("must exist when having barriers to inject"); let table_id = self.info.table_fragments.table_id(); - for InjectBarrierInfo { + for CreatingJobInjectBarrierInfo { curr_epoch, prev_epoch, kind, diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index 67b2a2185bb1..87c05d30ab48 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -55,7 +55,7 @@ pub(super) enum CreatingStreamingJobStatus { }, } -pub(super) struct InjectBarrierInfo { +pub(super) struct CreatingJobInjectBarrierInfo { pub curr_epoch: TracedEpoch, pub prev_epoch: TracedEpoch, pub kind: BarrierKind, @@ -96,7 +96,7 @@ impl CreatingStreamingJobStatus { pub(super) fn may_inject_fake_barrier( &mut self, is_checkpoint: bool, - ) -> Option<(Vec, Option)> { + ) -> Option<(Vec, Option)> { if let CreatingStreamingJobStatus::ConsumingSnapshot { prev_epoch_fake_physical_time, pending_commands, @@ -113,7 +113,7 @@ impl CreatingStreamingJobStatus { pending_non_checkpoint_barriers.push(*backfill_epoch); let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); - let barriers_to_inject = [InjectBarrierInfo { + let barriers_to_inject = [CreatingJobInjectBarrierInfo { curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)), prev_epoch: TracedEpoch::new(prev_epoch), kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), @@ -123,7 +123,7 @@ impl CreatingStreamingJobStatus { .chain( pending_commands .drain(..) - .map(|command_ctx| InjectBarrierInfo { + .map(|command_ctx| CreatingJobInjectBarrierInfo { curr_epoch: command_ctx.curr_epoch.clone(), prev_epoch: command_ctx.prev_epoch.clone(), kind: command_ctx.kind.clone(), @@ -147,7 +147,7 @@ impl CreatingStreamingJobStatus { BarrierKind::Barrier }; Some(( - vec![InjectBarrierInfo { + vec![CreatingJobInjectBarrierInfo { curr_epoch, prev_epoch, kind,