Skip to content

Commit

Permalink
feat(frontend, meta): support RECOVER command to trigger recovery (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Apr 16, 2024
1 parent cb0ea41 commit 211b9b9
Showing 15 changed files with 158 additions and 8 deletions.
5 changes: 5 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
@@ -266,6 +266,10 @@ message ApplyThrottleResponse {
common.Status status = 1;
}

// Request for `StreamManagerService.Recover`. It carries no fields.
message RecoverRequest {}

// Response for `StreamManagerService.Recover`. It carries no fields.
message RecoverResponse {}

service StreamManagerService {
rpc Flush(FlushRequest) returns (FlushResponse);
rpc Pause(PauseRequest) returns (PauseResponse);
@@ -277,6 +281,7 @@ service StreamManagerService {
rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse);
rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse);
rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse);
rpc Recover(RecoverRequest) returns (RecoverResponse);
}

// Below for cluster service.
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ pub mod handle_privilege;
mod kill_process;
pub mod privilege;
pub mod query;
mod recover;
pub mod show;
mod transaction;
pub mod util;
@@ -509,6 +510,7 @@ pub async fn handle(
}
Statement::Flush => flush::handle_flush(handler_args).await,
Statement::Wait => wait::handle_wait(handler_args).await,
Statement::Recover => recover::handle_recover(handler_args).await,
Statement::SetVariable {
local: _,
variable,
38 changes: 38 additions & 0 deletions src/frontend/src/handler/recover.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};

use super::RwPgResponse;
use crate::error::{ErrorCode, Result};
use crate::handler::HandlerArgs;
use crate::session::SessionImpl;

/// Handles the `RECOVER` statement.
///
/// Only super users may run it; any other session gets a
/// `PermissionDenied` error. On success an empty `RECOVER` response is
/// returned after the recovery request has been sent via [`do_recover`].
pub(super) async fn handle_recover(handler_args: HandlerArgs) -> Result<RwPgResponse> {
    let session = &handler_args.session;

    if session.is_super_user() {
        do_recover(session).await?;
        return Ok(PgResponse::empty_result(StatementType::RECOVER));
    }

    // Anyone who is not a super user is rejected.
    let denied = ErrorCode::PermissionDenied(
        "only superusers can trigger adhoc recovery".to_string(),
    );
    Err(denied.into())
}

/// Sends a recovery request through the meta client of the session's
/// environment and propagates any error returned by that call.
pub(crate) async fn do_recover(session: &SessionImpl) -> Result<()> {
    session.env().meta_client().recover().await?;
    Ok(())
}
6 changes: 6 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
@@ -52,6 +52,8 @@ pub trait FrontendMetaClient: Send + Sync {

async fn wait(&self) -> Result<()>;

/// Issues the `recover` request to the meta service.
async fn recover(&self) -> Result<()>;

async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

async fn list_table_fragments(
@@ -137,6 +139,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.wait().await
}

// Delegates directly to the wrapped client's `recover` call.
async fn recover(&self) -> Result<()> {
self.0.recover().await
}

async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
self.0.cancel_creating_jobs(infos).await
}
4 changes: 4 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1055,6 +1055,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
async fn list_compact_task_progress(&self) -> RpcResult<Vec<CompactTaskProgress>> {
unimplemented!()
}

// Not supported by the mock client: calling this panics via `unimplemented!()`.
async fn recover(&self) -> RpcResult<()> {
unimplemented!()
}
}

#[cfg(test)]
14 changes: 13 additions & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::manager::{LocalNotification, MetadataManager};
use risingwave_meta::model;
use risingwave_meta::model::ActorId;
use risingwave_meta::stream::ThrottleConfig;
@@ -411,4 +411,16 @@ impl StreamManagerService for StreamServiceImpl {
dependencies,
}))
}

// Handles the `Recover` RPC by publishing `LocalNotification::AdhocRecovery`
// to the local subscribers, then replying with an empty response.
// NOTE(review): the reply is sent once the notification call returns; it
// presumably does not wait for the recovery itself to finish -- confirm.
#[cfg_attr(coverage, coverage(off))]
async fn recover(
    &self,
    _request: Request<RecoverRequest>,
) -> Result<Response<RecoverResponse>, Status> {
    let notification_manager = self.env.notification_manager();
    notification_manager
        .notify_local_subscribers(LocalNotification::AdhocRecovery)
        .await;

    let response = RecoverResponse {};
    Ok(Response::new(response))
}
}
51 changes: 44 additions & 7 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ use crate::barrier::notifier::BarrierInfo;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::state::BarrierManagerState;
use crate::error::MetaErrorInner;
use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
@@ -118,6 +119,8 @@ enum RecoveryReason {
Bootstrap,
/// After failure.
Failover(MetaError),
/// Manually triggered
Adhoc,
}

/// Status of barrier manager.
@@ -651,14 +654,20 @@ impl GlobalBarrierManager {
}
}

// Checkpoint frequency changes.
notification = local_notification_rx.recv() => {
let notification = notification.unwrap();
// Handle barrier interval and checkpoint frequency changes
if let LocalNotification::SystemParamsChange(p) = &notification {
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
match notification {
// Handle barrier interval and checkpoint frequency changes.
LocalNotification::SystemParamsChange(p) => {
self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64));
self.scheduled_barriers
.set_checkpoint_frequency(p.checkpoint_frequency() as usize)
},
// Handle adhoc recovery triggered by user.
LocalNotification::AdhocRecovery => {
self.adhoc_recovery().await;
}
_ => {}
}
}
resp_result = self.control_stream_manager.next_response() => {
@@ -788,7 +797,7 @@ impl GlobalBarrierManager {
err.clone(),
)));
let latest_snapshot = self.context.hummock_manager.latest_snapshot();
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch
let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch
let span = tracing::info_span!(
"failure_recovery",
error = %err.as_report(),
@@ -803,6 +812,31 @@ impl GlobalBarrierManager {
panic!("failed to execute barrier: {}", err.as_report());
}
}

/// Runs a recovery that was requested manually instead of being caused by
/// a barrier failure.
///
/// The progress tracker and the checkpoint control are first cleared with a
/// `MetaErrorInner::AdhocRecovery` error, so work that is waiting on them
/// fails with that error. If recovery is enabled, the status is switched
/// to `Recovering(RecoveryReason::Adhoc)`, recovery runs inside an
/// `adhoc_recovery` tracing span, and the status is set back to `Running`.
///
/// # Panics
///
/// Panics if `enable_recovery` is `false`.
async fn adhoc_recovery(&mut self) {
    let err = MetaErrorInner::AdhocRecovery.into();
    self.context.tracker.lock().await.abort_all(&err);
    self.checkpoint_control.clear_on_err(&err).await;

    if self.enable_recovery {
        self.context
            .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc));
        let latest_snapshot = self.context.hummock_manager.latest_snapshot();
        // We can only recover from the committed epoch.
        let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into());
        let span = tracing::info_span!(
            "adhoc_recovery",
            error = %err.as_report(),
            prev_epoch = prev_epoch.value().0
        );

        // No need to clean dirty tables here; the foreground stream jobs
        // should clean up their own tables.
        self.recovery(None).instrument(span).await;
        self.context.set_status(BarrierManagerStatus::Running);
    } else {
        // No barrier failed on this path, so the message names the real
        // cause: recovery was requested while it is disabled.
        panic!(
            "adhoc recovery was requested but recovery is disabled: {}",
            err.as_report()
        );
    }
}
}

impl GlobalBarrierManagerContext {
@@ -1031,6 +1065,9 @@ impl GlobalBarrierManagerContext {
BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => {
Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))?
}
BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => {
bail!("The cluster is recovering-adhoc")
}
BarrierManagerStatus::Running => Ok(()),
}
}
4 changes: 4 additions & 0 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
@@ -119,6 +119,10 @@ pub enum MetaErrorInner {
#[backtrace]
anyhow::Error,
),

// Indicates that recovery was triggered manually.
#[error("adhoc recovery triggered")]
AdhocRecovery,
}

impl MetaError {
1 change: 1 addition & 0 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
@@ -47,6 +47,7 @@ pub enum LocalNotification {
SystemParamsChange(SystemParamsReader),
FragmentMappingsUpsert(Vec<FragmentId>),
FragmentMappingsDelete(Vec<FragmentId>),
AdhocRecovery,
}

#[derive(Debug)]
7 changes: 7 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
@@ -798,6 +798,12 @@ impl MetaClient {
Ok(())
}

/// Sends an empty `RecoverRequest` through the inner client and discards
/// the response; errors from the call are propagated.
pub async fn recover(&self) -> Result<()> {
    self.inner.recover(RecoverRequest {}).await?;
    Ok(())
}

pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
let resp = self.inner.cancel_creating_jobs(request).await?;
@@ -1903,6 +1909,7 @@ macro_rules! for_all_meta_rpc {
,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse }
,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse }
,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse }
,{ stream_client, recover, RecoverRequest, RecoverResponse }
,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse }
,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse }
,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse }
6 changes: 6 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
@@ -1519,6 +1519,8 @@ pub enum Statement {
/// WAIT for ALL running stream jobs to finish.
/// It will block the current session until the condition is met.
Wait,
/// Trigger recovery of stream jobs.
Recover,
}

impl fmt::Display for Statement {
@@ -2108,6 +2110,10 @@ impl fmt::Display for Statement {
write!(f, "KILL {}", process_id)?;
Ok(())
}
Statement::Recover => {
write!(f, "RECOVER")?;
Ok(())
}
}
}
}
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
@@ -402,6 +402,7 @@ define_keywords!(
READ,
READS,
REAL,
RECOVER,
RECURSIVE,
REF,
REFERENCES,
1 change: 1 addition & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
@@ -283,6 +283,7 @@ impl Parser {
Keyword::CLOSE => Ok(self.parse_close_cursor()?),
Keyword::FLUSH => Ok(Statement::Flush),
Keyword::WAIT => Ok(Statement::Wait),
Keyword::RECOVER => Ok(Statement::Recover),
_ => self.expected(
"an SQL statement",
Token::Word(w).with_location(token.location),
25 changes: 25 additions & 0 deletions src/tests/simulation/tests/integration_tests/backfill_tests.rs
Original file line number Diff line number Diff line change
@@ -305,3 +305,28 @@ async fn test_enable_arrangement_backfill() -> Result<()> {
assert!(!result.contains("ArrangementBackfill"));
Ok(())
}

/// A foreground `CREATE MATERIALIZED VIEW` that is still running when
/// `RECOVER` is issued must fail with the adhoc-recovery error.
#[tokio::test]
async fn test_recovery_cancels_foreground_ddl() -> Result<()> {
    let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?;
    let mut session = cluster.start_session();

    // Rate limit of 1 with 100000 rows, so the materialized view is still
    // being created when `RECOVER` runs.
    session.run("SET STREAMING_RATE_LIMIT=1").await?;
    session.run("CREATE TABLE t(v1 int);").await?;
    session
        .run("INSERT INTO t select * from generate_series(1, 100000);")
        .await?;

    // Run the foreground DDL in a separate task so this task can issue `RECOVER`.
    let create_mv = tokio::spawn(async move {
        session
            .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;")
            .await
    });

    sleep(Duration::from_secs(2)).await;
    cluster.run("RECOVER").await?;

    let ddl_outcome = create_mv.await?;
    if let Err(e) = ddl_outcome {
        assert!(e.to_string().contains("adhoc recovery triggered"));
    } else {
        panic!("create m1 should fail");
    }
    Ok(())
}
1 change: 1 addition & 0 deletions src/utils/pgwire/src/pg_response.rs
Original file line number Diff line number Diff line change
@@ -105,6 +105,7 @@ pub enum StatementType {
CLOSE_CURSOR,
WAIT,
KILL,
RECOVER,
}

impl std::fmt::Display for StatementType {

0 comments on commit 211b9b9

Please sign in to comment.