diff --git a/proto/meta.proto b/proto/meta.proto index 4be4f2b18d9ca..2bd9038c9808e 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -160,17 +160,11 @@ enum PausedReason { message PauseRequest {} -message PauseResponse { - optional PausedReason prev = 1; - optional PausedReason curr = 2; -} +message PauseResponse {} message ResumeRequest {} -message ResumeResponse { - optional PausedReason prev = 1; - optional PausedReason curr = 2; -} +message ResumeResponse {} message CancelCreatingJobsRequest { message CreatingJobInfo { diff --git a/src/ctl/src/cmd_impl/meta/pause_resume.rs b/src/ctl/src/cmd_impl/meta/pause_resume.rs index d274819f1db65..532bacc4241e8 100644 --- a/src/ctl/src/cmd_impl/meta/pause_resume.rs +++ b/src/ctl/src/cmd_impl/meta/pause_resume.rs @@ -16,7 +16,7 @@ use risingwave_pb::meta::PausedReason; use crate::CtlContext; -fn desc(reason: PausedReason) -> &'static str { +pub fn desc(reason: PausedReason) -> &'static str { // Method on optional enums derived from `prost` will use `Unspecified` if unset. So we treat // `Unspecified` as not paused here. match reason { @@ -29,13 +29,9 @@ fn desc(reason: PausedReason) -> &'static str { pub async fn pause(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - let response = meta_client.pause().await?; + meta_client.pause().await?; - println!( - "Done.\nPrevious: {}\nCurrent: {}", - desc(response.prev()), - desc(response.curr()) - ); + println!("Done."); Ok(()) } @@ -43,13 +39,9 @@ pub async fn pause(context: &CtlContext) -> anyhow::Result<()> { pub async fn resume(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; - let response = meta_client.resume().await?; + meta_client.resume().await?; - println!( - "Done.\nPrevious: {}\nCurrent: {}", - desc(response.prev()), - desc(response.curr()) - ); + println!("Done."); Ok(()) } diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index cfbdda2e96509..14fed0da48dc6 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -78,26 +78,18 @@ impl StreamManagerService for StreamServiceImpl { #[cfg_attr(coverage, coverage(off))] async fn pause(&self, _: Request) -> Result, Status> { - let i = self - .barrier_scheduler + self.barrier_scheduler .run_command(Command::pause(PausedReason::Manual)) .await?; - Ok(Response::new(PauseResponse { - prev: i.prev_paused_reason.map(Into::into), - curr: i.curr_paused_reason.map(Into::into), - })) + Ok(Response::new(PauseResponse {})) } #[cfg_attr(coverage, coverage(off))] async fn resume(&self, _: Request) -> Result, Status> { - let i = self - .barrier_scheduler + self.barrier_scheduler .run_command(Command::resume(PausedReason::Manual)) .await?; - Ok(Response::new(ResumeResponse { - prev: i.prev_paused_reason.map(Into::into), - curr: i.curr_paused_reason.map(Into::into), - })) + Ok(Response::new(ResumeResponse {})) } #[cfg_attr(coverage, coverage(off))] diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 5bb76ee46133c..32d7becd1c82e 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -54,7 +54,6 @@ use self::command::CommandContext; use self::notifier::Notifier; use crate::barrier::creating_job::CreatingStreamingJobControl; use crate::barrier::info::InflightGraphInfo; -use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob}; use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager}; use crate::barrier::state::BarrierManagerState; @@ -1075,16 +1074,9 @@ impl GlobalBarrierManager { }; // Notify about the injection. - let prev_paused_reason = self.state.paused_reason(); let curr_paused_reason = command_ctx.next_paused_reason(); - let info = BarrierInfo { - prev_epoch: prev_epoch.value(), - curr_epoch: curr_epoch.value(), - prev_paused_reason, - curr_paused_reason, - }; - notifiers.iter_mut().for_each(|n| n.notify_started(info)); + notifiers.iter_mut().for_each(|n| n.notify_started()); // Update the paused state after the barrier is injected. self.state.set_paused_reason(curr_paused_reason); diff --git a/src/meta/src/barrier/notifier.rs b/src/meta/src/barrier/notifier.rs index da201927664a4..d86f43fdd0a1d 100644 --- a/src/meta/src/barrier/notifier.rs +++ b/src/meta/src/barrier/notifier.rs @@ -12,27 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::util::epoch::Epoch; -use risingwave_pb::meta::PausedReason; use tokio::sync::oneshot; use crate::{MetaError, MetaResult}; -/// The barrier info sent back to the caller when a barrier is injected. -#[derive(Debug, Clone, Copy)] -pub struct BarrierInfo { - pub prev_epoch: Epoch, - pub curr_epoch: Epoch, - - pub prev_paused_reason: Option, - pub curr_paused_reason: Option, -} - /// Used for notifying the status of a scheduled command/barrier. #[derive(Debug, Default)] pub(crate) struct Notifier { /// Get notified when scheduled barrier has started to be handled. - pub started: Option>>, + pub started: Option>>, /// Get notified when scheduled barrier is collected or failed. pub collected: Option>>, @@ -40,9 +28,9 @@ pub(crate) struct Notifier { impl Notifier { /// Notify when we have injected a barrier to compute nodes. - pub fn notify_started(&mut self, info: BarrierInfo) { + pub fn notify_started(&mut self) { if let Some(tx) = self.started.take() { - tx.send(Ok(info)).ok(); + tx.send(Ok(())).ok(); } } diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index f213999f0a08f..9b7cbb546fe60 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -27,7 +27,7 @@ use tokio::select; use tokio::sync::{oneshot, watch}; use tokio::time::Interval; -use super::notifier::{BarrierInfo, Notifier}; +use super::notifier::Notifier; use super::{Command, Scheduled}; use crate::hummock::HummockManagerRef; use crate::model::ActorId; @@ -251,7 +251,7 @@ impl BarrierScheduler { /// Returns the barrier info of each command. /// /// TODO: atomicity of multiple commands is not guaranteed. - async fn run_multiple_commands(&self, commands: Vec) -> MetaResult> { + async fn run_multiple_commands(&self, commands: Vec) -> MetaResult<()> { let mut contexts = Vec::with_capacity(commands.len()); let mut scheduleds = Vec::with_capacity(commands.len()); @@ -272,16 +272,13 @@ impl BarrierScheduler { self.push(scheduleds)?; - let mut infos = Vec::with_capacity(contexts.len()); - for (injected_rx, collect_rx) in contexts { // Wait for this command to be injected, and record the result. tracing::trace!("waiting for injected_rx"); - let info = injected_rx + injected_rx .await .ok() .context("failed to inject barrier")??; - infos.push(info); tracing::trace!("waiting for collect_rx"); // Throw the error if it occurs when collecting this barrier. @@ -291,35 +288,28 @@ impl BarrierScheduler { .context("failed to collect barrier")??; } - Ok(infos) + Ok(()) } /// Run a command with a `Pause` command before and `Resume` command after it. Used for /// configuration change. /// /// Returns the barrier info of the actual command. - pub async fn run_config_change_command_with_pause( - &self, - command: Command, - ) -> MetaResult { + pub async fn run_config_change_command_with_pause(&self, command: Command) -> MetaResult<()> { self.run_multiple_commands(vec![ Command::pause(PausedReason::ConfigChange), command, Command::resume(PausedReason::ConfigChange), ]) .await - .map(|i| i[1]) } /// Run a command and return when it's completely finished. /// /// Returns the barrier info of the actual command. - pub async fn run_command(&self, command: Command) -> MetaResult { + pub async fn run_command(&self, command: Command) -> MetaResult<()> { tracing::trace!("run_command: {:?}", command); - let ret = self - .run_multiple_commands(vec![command]) - .await - .map(|i| i[0]); + let ret = self.run_multiple_commands(vec![command]).await; tracing::trace!("run_command finished"); ret }