Skip to content

Commit

Permalink
refactor(meta): remove BarrierInfo and simplify pause and resume log …
Browse files Browse the repository at this point in the history
…output
  • Loading branch information
wenym1 committed Sep 22, 2024
1 parent 98a2d41 commit 5b92fa6
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 74 deletions.
10 changes: 2 additions & 8 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,11 @@ enum PausedReason {

message PauseRequest {}

message PauseResponse {
optional PausedReason prev = 1;
optional PausedReason curr = 2;
}
message PauseResponse {}

Check failure on line 163 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "1" with name "prev" on message "PauseResponse" was deleted without reserving the name "prev".

Check failure on line 163 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "curr" on message "PauseResponse" was deleted without reserving the name "curr".

Check failure on line 163 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "1" with name "prev" on message "PauseResponse" was deleted without reserving the number "1".

Check failure on line 163 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "curr" on message "PauseResponse" was deleted without reserving the number "2".

message ResumeRequest {}

message ResumeResponse {
optional PausedReason prev = 1;
optional PausedReason curr = 2;
}
message ResumeResponse {}

Check failure on line 167 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "1" with name "prev" on message "ResumeResponse" was deleted without reserving the name "prev".

Check failure on line 167 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "curr" on message "ResumeResponse" was deleted without reserving the name "curr".

Check failure on line 167 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "1" with name "prev" on message "ResumeResponse" was deleted without reserving the number "1".

Check failure on line 167 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "2" with name "curr" on message "ResumeResponse" was deleted without reserving the number "2".

message CancelCreatingJobsRequest {
message CreatingJobInfo {
Expand Down
18 changes: 5 additions & 13 deletions src/ctl/src/cmd_impl/meta/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_pb::meta::PausedReason;

use crate::CtlContext;

fn desc(reason: PausedReason) -> &'static str {
pub fn desc(reason: PausedReason) -> &'static str {
// Method on optional enums derived from `prost` will use `Unspecified` if unset. So we treat
// `Unspecified` as not paused here.
match reason {
Expand All @@ -29,27 +29,19 @@ fn desc(reason: PausedReason) -> &'static str {
pub async fn pause(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

let response = meta_client.pause().await?;
meta_client.pause().await?;

println!(
"Done.\nPrevious: {}\nCurrent: {}",
desc(response.prev()),
desc(response.curr())
);
println!("Done.");

Ok(())
}

pub async fn resume(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

let response = meta_client.resume().await?;
meta_client.resume().await?;

println!(
"Done.\nPrevious: {}\nCurrent: {}",
desc(response.prev()),
desc(response.curr())
);
println!("Done.");

Ok(())
}
16 changes: 4 additions & 12 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,18 @@ impl StreamManagerService for StreamServiceImpl {

#[cfg_attr(coverage, coverage(off))]
async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
let i = self
.barrier_scheduler
self.barrier_scheduler
.run_command(Command::pause(PausedReason::Manual))
.await?;
Ok(Response::new(PauseResponse {
prev: i.prev_paused_reason.map(Into::into),
curr: i.curr_paused_reason.map(Into::into),
}))
Ok(Response::new(PauseResponse {}))
}

#[cfg_attr(coverage, coverage(off))]
async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
let i = self
.barrier_scheduler
self.barrier_scheduler
.run_command(Command::resume(PausedReason::Manual))
.await?;
Ok(Response::new(ResumeResponse {
prev: i.prev_paused_reason.map(Into::into),
curr: i.curr_paused_reason.map(Into::into),
}))
Ok(Response::new(ResumeResponse {}))
}

#[cfg_attr(coverage, coverage(off))]
Expand Down
10 changes: 1 addition & 9 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ use self::command::CommandContext;
use self::notifier::Notifier;
use crate::barrier::creating_job::CreatingStreamingJobControl;
use crate::barrier::info::InflightGraphInfo;
use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::{CreateMviewProgressTracker, TrackingCommand, TrackingJob};
use crate::barrier::rpc::{merge_node_rpc_errors, ControlStreamManager};
use crate::barrier::state::BarrierManagerState;
Expand Down Expand Up @@ -1075,16 +1074,9 @@ impl GlobalBarrierManager {
};

// Notify about the injection.
let prev_paused_reason = self.state.paused_reason();
let curr_paused_reason = command_ctx.next_paused_reason();

let info = BarrierInfo {
prev_epoch: prev_epoch.value(),
curr_epoch: curr_epoch.value(),
prev_paused_reason,
curr_paused_reason,
};
notifiers.iter_mut().for_each(|n| n.notify_started(info));
notifiers.iter_mut().for_each(|n| n.notify_started());

// Update the paused state after the barrier is injected.
self.state.set_paused_reason(curr_paused_reason);
Expand Down
18 changes: 3 additions & 15 deletions src/meta/src/barrier/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::PausedReason;
use tokio::sync::oneshot;

use crate::{MetaError, MetaResult};

/// The barrier info sent back to the caller when a barrier is injected.
#[derive(Debug, Clone, Copy)]
pub struct BarrierInfo {
pub prev_epoch: Epoch,
pub curr_epoch: Epoch,

pub prev_paused_reason: Option<PausedReason>,
pub curr_paused_reason: Option<PausedReason>,
}

/// Used for notifying the status of a scheduled command/barrier.
#[derive(Debug, Default)]
pub(crate) struct Notifier {
/// Get notified when scheduled barrier has started to be handled.
pub started: Option<oneshot::Sender<MetaResult<BarrierInfo>>>,
pub started: Option<oneshot::Sender<MetaResult<()>>>,

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<MetaResult<()>>>,
}

impl Notifier {
/// Notify when we have injected a barrier to compute nodes.
pub fn notify_started(&mut self, info: BarrierInfo) {
pub fn notify_started(&mut self) {
if let Some(tx) = self.started.take() {
tx.send(Ok(info)).ok();
tx.send(Ok(())).ok();
}
}

Expand Down
24 changes: 7 additions & 17 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tokio::select;
use tokio::sync::{oneshot, watch};
use tokio::time::Interval;

use super::notifier::{BarrierInfo, Notifier};
use super::notifier::Notifier;
use super::{Command, Scheduled};
use crate::hummock::HummockManagerRef;
use crate::model::ActorId;
Expand Down Expand Up @@ -251,7 +251,7 @@ impl BarrierScheduler {
/// Returns the barrier info of each command.
///
/// TODO: atomicity of multiple commands is not guaranteed.
async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<Vec<BarrierInfo>> {
async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<()> {
let mut contexts = Vec::with_capacity(commands.len());
let mut scheduleds = Vec::with_capacity(commands.len());

Expand All @@ -272,16 +272,13 @@ impl BarrierScheduler {

self.push(scheduleds)?;

let mut infos = Vec::with_capacity(contexts.len());

for (injected_rx, collect_rx) in contexts {
// Wait for this command to be injected, and record the result.
tracing::trace!("waiting for injected_rx");
let info = injected_rx
injected_rx
.await
.ok()
.context("failed to inject barrier")??;
infos.push(info);

tracing::trace!("waiting for collect_rx");
// Throw the error if it occurs when collecting this barrier.
Expand All @@ -291,35 +288,28 @@ impl BarrierScheduler {
.context("failed to collect barrier")??;
}

Ok(infos)
Ok(())
}

/// Run a command with a `Pause` command before and `Resume` command after it. Used for
/// configuration change.
///
/// Returns the barrier info of the actual command.
pub async fn run_config_change_command_with_pause(
&self,
command: Command,
) -> MetaResult<BarrierInfo> {
pub async fn run_config_change_command_with_pause(&self, command: Command) -> MetaResult<()> {
self.run_multiple_commands(vec![
Command::pause(PausedReason::ConfigChange),
command,
Command::resume(PausedReason::ConfigChange),
])
.await
.map(|i| i[1])
}

/// Run a command and return when it's completely finished.
///
/// Returns the barrier info of the actual command.
pub async fn run_command(&self, command: Command) -> MetaResult<BarrierInfo> {
pub async fn run_command(&self, command: Command) -> MetaResult<()> {
tracing::trace!("run_command: {:?}", command);
let ret = self
.run_multiple_commands(vec![command])
.await
.map(|i| i[0]);
let ret = self.run_multiple_commands(vec![command]).await;
tracing::trace!("run_command finished");
ret
}
Expand Down

0 comments on commit 5b92fa6

Please sign in to comment.