From 211b9b93ecbd18c32b3e2f9b4bc1974453a9d6ce Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Tue, 16 Apr 2024 10:14:45 +0800 Subject: [PATCH] feat(frontend, meta): support `RECOVER` command to trigger recovery (#16259) --- proto/meta.proto | 5 ++ src/frontend/src/handler/mod.rs | 2 + src/frontend/src/handler/recover.rs | 38 ++++++++++++++ src/frontend/src/meta_client.rs | 6 +++ src/frontend/src/test_utils.rs | 4 ++ src/meta/service/src/stream_service.rs | 14 ++++- src/meta/src/barrier/mod.rs | 51 ++++++++++++++++--- src/meta/src/error.rs | 4 ++ src/meta/src/manager/notification.rs | 1 + src/rpc_client/src/meta_client.rs | 7 +++ src/sqlparser/src/ast/mod.rs | 6 +++ src/sqlparser/src/keywords.rs | 1 + src/sqlparser/src/parser.rs | 1 + .../tests/integration_tests/backfill_tests.rs | 25 +++++++++ src/utils/pgwire/src/pg_response.rs | 1 + 15 files changed, 158 insertions(+), 8 deletions(-) create mode 100644 src/frontend/src/handler/recover.rs diff --git a/proto/meta.proto b/proto/meta.proto index e48dc485d495b..dadc5b364c623 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -266,6 +266,10 @@ message ApplyThrottleResponse { common.Status status = 1; } +message RecoverRequest {} + +message RecoverResponse {} + service StreamManagerService { rpc Flush(FlushRequest) returns (FlushResponse); rpc Pause(PauseRequest) returns (PauseResponse); @@ -277,6 +281,7 @@ service StreamManagerService { rpc ListActorStates(ListActorStatesRequest) returns (ListActorStatesResponse); rpc ListObjectDependencies(ListObjectDependenciesRequest) returns (ListObjectDependenciesResponse); rpc ApplyThrottle(ApplyThrottleRequest) returns (ApplyThrottleResponse); + rpc Recover(RecoverRequest) returns (RecoverResponse); } // Below for cluster service. diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 4d5f8527fb198..8b60eeeeef2bc 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -87,6 +87,7 @@ pub mod handle_privilege; mod kill_process; pub mod privilege; pub mod query; +mod recover; pub mod show; mod transaction; pub mod util; @@ -509,6 +510,7 @@ pub async fn handle( } Statement::Flush => flush::handle_flush(handler_args).await, Statement::Wait => wait::handle_wait(handler_args).await, + Statement::Recover => recover::handle_recover(handler_args).await, Statement::SetVariable { local: _, variable, diff --git a/src/frontend/src/handler/recover.rs b/src/frontend/src/handler/recover.rs new file mode 100644 index 0000000000000..0a2c7d1837d39 --- /dev/null +++ b/src/frontend/src/handler/recover.rs @@ -0,0 +1,38 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pgwire::pg_response::{PgResponse, StatementType}; + +use super::RwPgResponse; +use crate::error::{ErrorCode, Result}; +use crate::handler::HandlerArgs; +use crate::session::SessionImpl; + +pub(super) async fn handle_recover(handler_args: HandlerArgs) -> Result { + // Only permit recovery for super users. + if !handler_args.session.is_super_user() { + return Err(ErrorCode::PermissionDenied( + "only superusers can trigger adhoc recovery".to_string(), + ) + .into()); + } + do_recover(&handler_args.session).await?; + Ok(PgResponse::empty_result(StatementType::RECOVER)) +} + +pub(crate) async fn do_recover(session: &SessionImpl) -> Result<()> { + let client = session.env().meta_client(); + client.recover().await?; + Ok(()) +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 678a1684f731a..3cc7e22cf8b24 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -52,6 +52,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn wait(&self) -> Result<()>; + async fn recover(&self) -> Result<()>; + async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result>; async fn list_table_fragments( @@ -137,6 +139,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.wait().await } + async fn recover(&self) -> Result<()> { + self.0.recover().await + } + async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result> { self.0.cancel_creating_jobs(infos).await } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index fb8cc650b297e..bda6683797251 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -1055,6 +1055,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn list_compact_task_progress(&self) -> RpcResult> { unimplemented!() } + + async fn recover(&self) -> RpcResult<()> { + unimplemented!() + } } #[cfg(test)] diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index 6c2d40e2c581d..cf9a8b1a3e48c 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_meta::manager::MetadataManager; +use risingwave_meta::manager::{LocalNotification, MetadataManager}; use risingwave_meta::model; use risingwave_meta::model::ActorId; use risingwave_meta::stream::ThrottleConfig; @@ -411,4 +411,16 @@ impl StreamManagerService for StreamServiceImpl { dependencies, })) } + + #[cfg_attr(coverage, coverage(off))] + async fn recover( + &self, + _request: Request, + ) -> Result, Status> { + self.env + .notification_manager() + .notify_local_subscribers(LocalNotification::AdhocRecovery) + .await; + Ok(Response::new(RecoverResponse {})) + } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 63df19631adc2..c99940fbc32fb 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -57,6 +57,7 @@ use crate::barrier::notifier::BarrierInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::state::BarrierManagerState; +use crate::error::MetaErrorInner; use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo}; use crate::manager::sink_coordination::SinkCoordinatorManager; use crate::manager::{ @@ -118,6 +119,8 @@ enum RecoveryReason { Bootstrap, /// After failure. Failover(MetaError), + /// Manually triggered + Adhoc, } /// Status of barrier manager. @@ -651,14 +654,20 @@ impl GlobalBarrierManager { } } - // Checkpoint frequency changes. notification = local_notification_rx.recv() => { let notification = notification.unwrap(); - // Handle barrier interval and checkpoint frequency changes - if let LocalNotification::SystemParamsChange(p) = ¬ification { - self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64)); - self.scheduled_barriers - .set_checkpoint_frequency(p.checkpoint_frequency() as usize) + match notification { + // Handle barrier interval and checkpoint frequency changes. + LocalNotification::SystemParamsChange(p) => { + self.scheduled_barriers.set_min_interval(Duration::from_millis(p.barrier_interval_ms() as u64)); + self.scheduled_barriers + .set_checkpoint_frequency(p.checkpoint_frequency() as usize) + }, + // Handle adhoc recovery triggered by user. + LocalNotification::AdhocRecovery => { + self.adhoc_recovery().await; + } + _ => {} } } resp_result = self.control_stream_manager.next_response() => { @@ -788,7 +797,7 @@ impl GlobalBarrierManager { err.clone(), ))); let latest_snapshot = self.context.hummock_manager.latest_snapshot(); - let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recovery from the committed epoch + let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch let span = tracing::info_span!( "failure_recovery", error = %err.as_report(), @@ -803,6 +812,31 @@ impl GlobalBarrierManager { panic!("failed to execute barrier: {}", err.as_report()); } } + + async fn adhoc_recovery(&mut self) { + let err = MetaErrorInner::AdhocRecovery.into(); + self.context.tracker.lock().await.abort_all(&err); + self.checkpoint_control.clear_on_err(&err).await; + + if self.enable_recovery { + self.context + .set_status(BarrierManagerStatus::Recovering(RecoveryReason::Adhoc)); + let latest_snapshot = self.context.hummock_manager.latest_snapshot(); + let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // we can only recover from the committed epoch + let span = tracing::info_span!( + "adhoc_recovery", + error = %err.as_report(), + prev_epoch = prev_epoch.value().0 + ); + + // No need to clean dirty tables for barrier recovery, + // The foreground stream job should cleanup their own tables. + self.recovery(None).instrument(span).await; + self.context.set_status(BarrierManagerStatus::Running); + } else { + panic!("failed to execute barrier: {}", err.as_report()); + } + } } impl GlobalBarrierManagerContext { @@ -1031,6 +1065,9 @@ impl GlobalBarrierManagerContext { BarrierManagerStatus::Recovering(RecoveryReason::Failover(e)) => { Err(anyhow::anyhow!(e.clone()).context("The cluster is recovering"))? } + BarrierManagerStatus::Recovering(RecoveryReason::Adhoc) => { + bail!("The cluster is recovering-adhoc") + } BarrierManagerStatus::Running => Ok(()), } } diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 702f02a4ca0a7..72eb9de2f67d6 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -119,6 +119,10 @@ pub enum MetaErrorInner { #[backtrace] anyhow::Error, ), + + // Indicates that recovery was triggered manually. + #[error("adhoc recovery triggered")] + AdhocRecovery, } impl MetaError { diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index dc54c1e1b12e5..0ce47608cdfd2 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -47,6 +47,7 @@ pub enum LocalNotification { SystemParamsChange(SystemParamsReader), FragmentMappingsUpsert(Vec), FragmentMappingsDelete(Vec), + AdhocRecovery, } #[derive(Debug)] diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index eae7cd5019cfa..d438c13ba7a00 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -798,6 +798,12 @@ impl MetaClient { Ok(()) } + pub async fn recover(&self) -> Result<()> { + let request = RecoverRequest {}; + self.inner.recover(request).await?; + Ok(()) + } + pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result> { let request = CancelCreatingJobsRequest { jobs: Some(jobs) }; let resp = self.inner.cancel_creating_jobs(request).await?; @@ -1903,6 +1909,7 @@ macro_rules! for_all_meta_rpc { ,{ stream_client, list_fragment_distribution, ListFragmentDistributionRequest, ListFragmentDistributionResponse } ,{ stream_client, list_actor_states, ListActorStatesRequest, ListActorStatesResponse } ,{ stream_client, list_object_dependencies, ListObjectDependenciesRequest, ListObjectDependenciesResponse } + ,{ stream_client, recover, RecoverRequest, RecoverResponse } ,{ ddl_client, create_table, CreateTableRequest, CreateTableResponse } ,{ ddl_client, alter_name, AlterNameRequest, AlterNameResponse } ,{ ddl_client, alter_owner, AlterOwnerRequest, AlterOwnerResponse } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index eeeeb85a932b8..82b59d1bf462a 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1519,6 +1519,8 @@ pub enum Statement { /// WAIT for ALL running stream jobs to finish. /// It will block the current session the condition is met. Wait, + /// Trigger stream job recover + Recover, } impl fmt::Display for Statement { @@ -2108,6 +2110,10 @@ impl fmt::Display for Statement { write!(f, "KILL {}", process_id)?; Ok(()) } + Statement::Recover => { + write!(f, "RECOVER")?; + Ok(()) + } } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index c5cd8adbe73a8..208a642eb484b 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -402,6 +402,7 @@ define_keywords!( READ, READS, REAL, + RECOVER, RECURSIVE, REF, REFERENCES, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index fbef8bac45a52..aaf400b43879e 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -283,6 +283,7 @@ impl Parser { Keyword::CLOSE => Ok(self.parse_close_cursor()?), Keyword::FLUSH => Ok(Statement::Flush), Keyword::WAIT => Ok(Statement::Wait), + Keyword::RECOVER => Ok(Statement::Recover), _ => self.expected( "an SQL statement", Token::Word(w).with_location(token.location), diff --git a/src/tests/simulation/tests/integration_tests/backfill_tests.rs b/src/tests/simulation/tests/integration_tests/backfill_tests.rs index a97a52a095dad..bcc271bcb4dce 100644 --- a/src/tests/simulation/tests/integration_tests/backfill_tests.rs +++ b/src/tests/simulation/tests/integration_tests/backfill_tests.rs @@ -305,3 +305,28 @@ async fn test_enable_arrangement_backfill() -> Result<()> { assert!(!result.contains("ArrangementBackfill")); Ok(()) } + +#[tokio::test] +async fn test_recovery_cancels_foreground_ddl() -> Result<()> { + let mut cluster = Cluster::start(Configuration::enable_arrangement_backfill()).await?; + let mut session = cluster.start_session(); + session.run("SET STREAMING_RATE_LIMIT=1").await?; + session.run("CREATE TABLE t(v1 int);").await?; + session + .run("INSERT INTO t select * from generate_series(1, 100000);") + .await?; + let handle = tokio::spawn(async move { + session + .run("CREATE MATERIALIZED VIEW m1 AS SELECT * FROM t;") + .await + }); + sleep(Duration::from_secs(2)).await; + cluster.run("RECOVER").await?; + match handle.await? { + Ok(_) => panic!("create m1 should fail"), + Err(e) => { + assert!(e.to_string().contains("adhoc recovery triggered")); + } + } + Ok(()) +} diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 0742ac62f7287..1431909f5366f 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -105,6 +105,7 @@ pub enum StatementType { CLOSE_CURSOR, WAIT, KILL, + RECOVER, } impl std::fmt::Display for StatementType {