Skip to content

Commit

Permalink
feat: add WAIT command (#13027)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Oct 25, 2023
1 parent 8fdcfb8 commit 8221d3a
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 0 deletions.
5 changes: 5 additions & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ message GetTablesResponse {
map<uint32, catalog.Table> tables = 1;
}

message WaitRequest {}

message WaitResponse {}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
Expand Down Expand Up @@ -343,4 +347,5 @@ service DdlService {
rpc ListConnections(ListConnectionsRequest) returns (ListConnectionsResponse);
rpc DropConnection(DropConnectionRequest) returns (DropConnectionResponse);
rpc GetTables(GetTablesRequest) returns (GetTablesResponse);
rpc Wait(WaitRequest) returns (WaitResponse);
}
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod show;
mod transaction;
pub mod util;
pub mod variable;
mod wait;

/// The [`PgResponseBuilder`] used by RisingWave.
pub type RwPgResponseBuilder = PgResponseBuilder<PgResponseStream>;
Expand Down Expand Up @@ -419,6 +420,7 @@ pub async fn handle(
}
}
Statement::Flush => flush::handle_flush(handler_args).await,
Statement::Wait => wait::handle_wait(handler_args).await,
Statement::SetVariable {
local: _,
variable,
Expand Down
31 changes: 31 additions & 0 deletions src/frontend/src/handler/wait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::Result;

use super::RwPgResponse;
use crate::handler::HandlerArgs;
use crate::session::SessionImpl;

pub(super) async fn handle_wait(handler_args: HandlerArgs) -> Result<RwPgResponse> {
do_wait(&handler_args.session).await?;
Ok(PgResponse::empty_result(StatementType::WAIT))
}

pub(crate) async fn do_wait(session: &SessionImpl) -> Result<()> {
let client = session.env().meta_client();
client.wait().await?;
Ok(())
}
6 changes: 6 additions & 0 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub trait FrontendMetaClient: Send + Sync {

async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;

async fn wait(&self) -> Result<()>;

async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

async fn list_table_fragments(
Expand Down Expand Up @@ -111,6 +113,10 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.flush(checkpoint).await
}

async fn wait(&self) -> Result<()> {
self.0.wait().await
}

async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
self.0.cancel_creating_jobs(infos).await
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,10 @@ impl FrontendMetaClient for MockFrontendMetaClient {
})
}

async fn wait(&self) -> RpcResult<()> {
Ok(())
}

async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
Ok(vec![])
}
Expand Down
5 changes: 5 additions & 0 deletions src/meta/service/src/ddl_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,11 @@ impl DdlService for DdlServiceImpl {
}
Ok(Response::new(GetTablesResponse { tables }))
}

async fn wait(&self, _request: Request<WaitRequest>) -> Result<Response<WaitResponse>, Status> {
self.ddl_controller.wait().await;
Ok(Response::new(WaitResponse {}))
}
}

impl DdlServiceImpl {
Expand Down
16 changes: 16 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use risingwave_common::config::DefaultParallelism;
Expand All @@ -29,6 +30,7 @@ use risingwave_pb::ddl_service::alter_relation_name_request::Relation;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::stream_plan::StreamFragmentGraph as StreamFragmentGraphProto;
use tokio::sync::Semaphore;
use tokio::time::sleep;
use tracing::log::warn;
use tracing::Instrument;

Expand Down Expand Up @@ -1094,4 +1096,18 @@ impl DdlController {
}
}
}

pub async fn wait(&self) {
for _ in 0..30 * 60 {
if self
.catalog_manager
.list_creating_background_mvs()
.await
.is_empty()
{
break;
}
sleep(Duration::from_secs(1)).await;
}
}
}
7 changes: 7 additions & 0 deletions src/rpc_client/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,12 @@ impl MetaClient {
Ok(resp.snapshot.unwrap())
}

pub async fn wait(&self) -> Result<()> {
let request = WaitRequest {};
self.inner.wait(request).await?;
Ok(())
}

pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>> {
let request = CancelCreatingJobsRequest { jobs: Some(jobs) };
let resp = self.inner.cancel_creating_jobs(request).await?;
Expand Down Expand Up @@ -1719,6 +1725,7 @@ macro_rules! for_all_meta_rpc {
,{ ddl_client, list_connections, ListConnectionsRequest, ListConnectionsResponse }
,{ ddl_client, drop_connection, DropConnectionRequest, DropConnectionResponse }
,{ ddl_client, get_tables, GetTablesRequest, GetTablesResponse }
,{ ddl_client, wait, WaitRequest, WaitResponse }
,{ hummock_client, unpin_version_before, UnpinVersionBeforeRequest, UnpinVersionBeforeResponse }
,{ hummock_client, get_current_version, GetCurrentVersionRequest, GetCurrentVersionResponse }
,{ hummock_client, replay_version_delta, ReplayVersionDeltaRequest, ReplayVersionDeltaResponse }
Expand Down
6 changes: 6 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,9 @@ pub enum Statement {
///
/// Note: RisingWave specific statement.
Flush,
/// WAIT for ALL running stream jobs to finish.
/// It will block the current session the condition is met.
Wait,
}

impl fmt::Display for Statement {
Expand Down Expand Up @@ -1787,6 +1790,9 @@ impl fmt::Display for Statement {
Statement::Flush => {
write!(f, "FLUSH")
}
Statement::Wait => {
write!(f, "WAIT")
}
Statement::Begin { modes } => {
write!(f, "BEGIN")?;
if !modes.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/keywords.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ define_keywords!(
VIEWS,
VIRTUAL,
VOLATILE,
WAIT,
WATERMARK,
WHEN,
WHENEVER,
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl Parser {
Keyword::PREPARE => Ok(self.parse_prepare()?),
Keyword::COMMENT => Ok(self.parse_comment()?),
Keyword::FLUSH => Ok(Statement::Flush),
Keyword::WAIT => Ok(Statement::Wait),
_ => self.expected(
"an SQL statement",
Token::Word(w).with_location(token.location),
Expand Down
2 changes: 2 additions & 0 deletions src/utils/pgwire/src/pg_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub enum StatementType {
ROLLBACK,
SET_TRANSACTION,
CANCEL_COMMAND,
WAIT,
}

impl std::fmt::Display for StatementType {
Expand Down Expand Up @@ -278,6 +279,7 @@ impl StatementType {
},
Statement::Explain { .. } => Ok(StatementType::EXPLAIN),
Statement::Flush => Ok(StatementType::FLUSH),
Statement::Wait => Ok(StatementType::WAIT),
_ => Err("unsupported statement type".to_string()),
}
}
Expand Down

0 comments on commit 8221d3a

Please sign in to comment.