From 3b7d67b90ea891fa596cb85055c253215cec9cec Mon Sep 17 00:00:00 2001 From: August Date: Thu, 24 Aug 2023 16:42:43 +0800 Subject: [PATCH] feat: support show and cancel jobs syntax (#11854) --- proto/meta.proto | 26 ++++++--- src/frontend/src/handler/cancel_job.rs | 51 ++++++++++++++++ src/frontend/src/handler/mod.rs | 5 +- src/frontend/src/handler/show.rs | 58 +++++++++++++++++-- src/frontend/src/meta_client.rs | 6 +- .../src/scheduler/streaming_manager.rs | 8 ++- src/frontend/src/test_utils.rs | 7 ++- src/meta/src/manager/catalog/mod.rs | 3 +- src/meta/src/rpc/service/stream_service.rs | 31 ++++++---- src/meta/src/stream/stream_manager.rs | 51 +++++++++++----- src/rpc_client/src/meta_client.rs | 9 +-- src/sqlparser/src/ast/mod.rs | 12 ++++ src/sqlparser/src/keywords.rs | 2 + src/sqlparser/src/parser.rs | 19 ++++++ src/utils/pgwire/src/pg_response.rs | 2 + 15 files changed, 238 insertions(+), 52 deletions(-) create mode 100644 src/frontend/src/handler/cancel_job.rs diff --git a/proto/meta.proto b/proto/meta.proto index ac8c36336c93b..b5f3b7ce7ec5f 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -130,18 +130,30 @@ message FlushResponse { hummock.HummockSnapshot snapshot = 2; } -message CreatingJobInfo { - uint32 database_id = 1; - uint32 schema_id = 2; - string name = 3; -} - message CancelCreatingJobsRequest { - repeated CreatingJobInfo infos = 1; + message CreatingJobInfo { + uint32 database_id = 1; + uint32 schema_id = 2; + string name = 3; + } + + message CreatingJobInfos { + repeated CreatingJobInfo infos = 1; + } + + message CreatingJobIds { + repeated uint32 job_ids = 1; + } + + oneof jobs { + CreatingJobInfos infos = 1; + CreatingJobIds ids = 2; + } } message CancelCreatingJobsResponse { common.Status status = 1; + repeated uint32 canceled_jobs = 2; } message ListTableFragmentsRequest { diff --git a/src/frontend/src/handler/cancel_job.rs b/src/frontend/src/handler/cancel_job.rs new file mode 100644 index 0000000000000..3a9c2ec3163b7 --- /dev/null +++ b/src/frontend/src/handler/cancel_job.rs @@ -0,0 +1,51 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use itertools::Itertools; +use pgwire::pg_field_descriptor::PgFieldDescriptor; +use pgwire::pg_response::{PgResponse, StatementType}; +use pgwire::types::Row; +use risingwave_common::error::Result; +use risingwave_common::types::DataType; +use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs}; +use risingwave_sqlparser::ast::JobIdents; + +use crate::handler::{HandlerArgs, RwPgResponse}; + +pub(super) async fn handle_cancel( + handler_args: HandlerArgs, + jobs: JobIdents, +) -> Result { + let session = handler_args.session; + + let canceled_jobs = session + .env() + .meta_client() + .cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids: jobs.0 })) + .await?; + let rows = canceled_jobs + .into_iter() + .map(|id| Row::new(vec![Some(id.to_string().into())])) + .collect_vec(); + Ok(PgResponse::builder(StatementType::CANCEL_COMMAND) + .values( + rows.into(), + vec![PgFieldDescriptor::new( + "Id".to_string(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + )], + ) + .into()) +} diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 464d71427004d..4821a12455e04 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::*; use self::util::DataChunkToRowSetAdapter; use self::variable::handle_set_time_zone; use crate::catalog::table_catalog::TableType; +use crate::handler::cancel_job::handle_cancel; use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; use crate::session::SessionImpl; use crate::utils::WithOptions; @@ -37,6 +38,7 @@ mod alter_source_column; mod alter_system; mod alter_table_column; pub mod alter_user; +pub mod cancel_job; pub mod create_connection; mod create_database; pub mod create_function; @@ -284,7 +286,7 @@ pub async fn handle( Statement::ShowObjects { object: show_object, filter, - } => show::handle_show_object(handler_args, show_object, filter), + } => show::handle_show_object(handler_args, show_object, filter).await, Statement::ShowCreateObject { create_type, name } => { show::handle_show_create_object(handler_args, create_type, name) } @@ -518,6 +520,7 @@ pub async fn handle( snapshot, session, } => transaction::handle_set(handler_args, modes, snapshot, session).await, + Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await, _ => Err( ErrorCode::NotImplemented(format!("Unhandled statement: {}", stmt), None.into()).into(), ), diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index e869d6160acf9..9ad9e905faa27 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -81,13 +81,12 @@ fn schema_or_default(schema: &Option) -> String { .map_or_else(|| DEFAULT_SCHEMA_NAME.to_string(), |s| s.real_value()) } -pub fn handle_show_object( +pub async fn handle_show_object( handler_args: HandlerArgs, command: ShowObject, filter: Option, ) -> Result { let session = handler_args.session; - let catalog_reader = session.env().catalog_reader().read_guard(); if let Some(ShowStatementFilter::Where(..)) = filter { return Err(ErrorCode::NotImplemented( @@ -97,37 +96,47 @@ pub fn handle_show_object( .into()); } + let catalog_reader = session.env().catalog_reader(); + let names = match command { // If not include schema name, use default schema name ShowObject::Table { schema } => catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_table() .map(|t| t.name.clone()) .collect(), ShowObject::InternalTable { schema } => catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_internal_table() .map(|t| t.name.clone()) .collect(), - ShowObject::Database => catalog_reader.get_all_database_names(), - ShowObject::Schema => catalog_reader.get_all_schema_names(session.database())?, + ShowObject::Database => catalog_reader.read_guard().get_all_database_names(), + ShowObject::Schema => catalog_reader + .read_guard() + .get_all_schema_names(session.database())?, ShowObject::View { schema } => catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_view() .map(|t| t.name.clone()) .collect(), ShowObject::MaterializedView { schema } => catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_mv() .map(|t| t.name.clone()) .collect(), ShowObject::Source { schema } => catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_source() .filter(|t| t.associated_table_id.is_none()) .map(|t| t.name.clone()) .collect(), ShowObject::Sink { schema } => catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_sink() .map(|t| t.name.clone()) @@ -192,8 +201,9 @@ pub fn handle_show_object( .into()); } ShowObject::Connection { schema } => { - let schema = catalog_reader - .get_schema_by_name(session.database(), &schema_or_default(&schema))?; + let reader = catalog_reader.read_guard(); + let schema = + reader.get_schema_by_name(session.database(), &schema_or_default(&schema))?; let rows = schema .iter_connections() .map(|c| { @@ -260,6 +270,7 @@ pub fn handle_show_object( } ShowObject::Function { schema } => { let rows = catalog_reader + .read_guard() .get_schema_by_name(session.database(), &schema_or_default(&schema))? .iter_function() .map(|t| { @@ -367,6 +378,41 @@ pub fn handle_show_object( ) .into()); } + ShowObject::Jobs => { + let resp = session.env().meta_client().list_ddl_progress().await?; + let rows = resp + .into_iter() + .map(|job| { + Row::new(vec![ + Some(job.id.to_string().into()), + Some(job.statement.into()), + Some(job.progress.into()), + ]) + }) + .collect_vec(); + return Ok(PgResponse::builder(StatementType::SHOW_COMMAND) + .values( + rows.into(), + vec![ + PgFieldDescriptor::new( + "Id".to_owned(), + DataType::Int64.to_oid(), + DataType::Int64.type_len(), + ), + PgFieldDescriptor::new( + "Statement".to_owned(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + ), + PgFieldDescriptor::new( + "Progress".to_owned(), + DataType::Varchar.to_oid(), + DataType::Varchar.type_len(), + ), + ], + ) + .into()); + } }; let rows = names diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index a7f4388bf55b3..9423f2e8e63aa 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -18,11 +18,11 @@ use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockSnapshot; +use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; -use risingwave_pb::meta::CreatingJobInfo; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{HummockMetaClient, MetaClient}; @@ -39,7 +39,7 @@ pub trait FrontendMetaClient: Send + Sync { async fn flush(&self, checkpoint: bool) -> Result; - async fn cancel_creating_jobs(&self, infos: Vec) -> Result<()>; + async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result>; async fn list_table_fragments( &self, @@ -85,7 +85,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.flush(checkpoint).await } - async fn cancel_creating_jobs(&self, infos: Vec) -> Result<()> { + async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result> { self.0.cancel_creating_jobs(infos).await } diff --git a/src/frontend/src/scheduler/streaming_manager.rs b/src/frontend/src/scheduler/streaming_manager.rs index 04e030aabcb99..a4b7cba6181cf 100644 --- a/src/frontend/src/scheduler/streaming_manager.rs +++ b/src/frontend/src/scheduler/streaming_manager.rs @@ -19,7 +19,9 @@ use std::sync::Arc; use itertools::Itertools; use parking_lot::RwLock; use pgwire::pg_server::SessionId; -use risingwave_pb::meta::CreatingJobInfo; +use risingwave_pb::meta::cancel_creating_jobs_request::{ + CreatingJobInfo, CreatingJobInfos, PbJobs, +}; use uuid::Uuid; use crate::catalog::{DatabaseId, SchemaId}; @@ -126,7 +128,9 @@ impl StreamingJobTracker { let client = self.meta_client.clone(); tokio::spawn(async move { client - .cancel_creating_jobs(jobs.into_iter().map(|job| job.info).collect_vec()) + .cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos { + infos: jobs.into_iter().map(|job| job.info).collect_vec(), + })) .await }); } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 7519f6c281d8d..5f7151d689406 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -36,11 +36,12 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::ddl_service::{create_connection_request, DdlProgress}; use risingwave_pb::hummock::HummockSnapshot; +use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::list_actor_states_response::ActorState; use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution; use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; -use risingwave_pb::meta::{CreatingJobInfo, SystemParams}; +use risingwave_pb::meta::SystemParams; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::{GrantPrivilege, UserInfo}; @@ -768,8 +769,8 @@ impl FrontendMetaClient for MockFrontendMetaClient { }) } - async fn cancel_creating_jobs(&self, _infos: Vec) -> RpcResult<()> { - Ok(()) + async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult> { + Ok(vec![]) } async fn list_table_fragments( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 0aa241c3bd53f..a6552f2126a11 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -112,8 +112,9 @@ macro_rules! commit_meta { use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::epoch::Epoch; +use risingwave_pb::meta::cancel_creating_jobs_request::CreatingJobInfo; use risingwave_pb::meta::relation::RelationInfo; -use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup}; +use risingwave_pb::meta::{Relation, RelationGroup}; pub(crate) use {commit_meta, commit_meta_with_trx}; use crate::manager::catalog::utils::{ diff --git a/src/meta/src/rpc/service/stream_service.rs b/src/meta/src/rpc/service/stream_service.rs index 8c223bcc1ab03..c9a42b33113f0 100644 --- a/src/meta/src/rpc/service/stream_service.rs +++ b/src/meta/src/rpc/service/stream_service.rs @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use risingwave_common::catalog::TableId; +use risingwave_pb::meta::cancel_creating_jobs_request::Jobs; use risingwave_pb::meta::list_table_fragments_response::{ ActorInfo, FragmentInfo, TableFragmentInfo, }; @@ -85,16 +86,26 @@ where request: Request, ) -> TonicResponse { let req = request.into_inner(); - let table_ids = self - .catalog_manager - .find_creating_streaming_job_ids(req.infos) - .await; - if !table_ids.is_empty() { - self.stream_manager - .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec()) - .await; - } - Ok(Response::new(CancelCreatingJobsResponse { status: None })) + let table_ids = match req.jobs.unwrap() { + Jobs::Infos(infos) => { + self.catalog_manager + .find_creating_streaming_job_ids(infos.infos) + .await + } + Jobs::Ids(jobs) => jobs.job_ids, + }; + + let canceled_jobs = self + .stream_manager + .cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec()) + .await + .into_iter() + .map(|id| id.table_id) + .collect_vec(); + Ok(Response::new(CancelCreatingJobsResponse { + status: None, + canceled_jobs, + })) } #[cfg_attr(coverage, no_coverage)] diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index e92f905a156f9..aa0c850f30142 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use futures::future::{try_join_all, BoxFuture}; +use futures::future::{join_all, try_join_all, BoxFuture}; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_pb::catalog::Table; @@ -25,7 +25,7 @@ use risingwave_pb::stream_service::{ BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, UpdateActorsRequest, }; use tokio::sync::mpsc::Sender; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{oneshot, Mutex, RwLock}; use tracing::Instrument; use uuid::Uuid; @@ -78,7 +78,8 @@ impl CreateStreamingJobContext { pub enum CreatingState { Failed { reason: MetaError }, - Canceling, + // sender is used to notify the canceling result. + Canceling { finish_tx: oneshot::Sender<()> }, Created, } @@ -112,20 +113,22 @@ impl CreatingStreamingJobInfo { jobs.remove(&job_id); } - async fn cancel_jobs(&self, job_ids: Vec) { + async fn cancel_jobs(&self, job_ids: Vec) -> HashMap> { let mut jobs = self.streaming_jobs.lock().await; + let mut receivers = HashMap::new(); for job_id in job_ids { if let Some(job) = jobs.get_mut(&job_id) && let Some(shutdown_tx) = job.shutdown_tx.take() { - let _ = shutdown_tx - .send(CreatingState::Canceling) - .await - .inspect_err(|_| { - tracing::warn!("failed to send canceling state"); - }); + let (tx, rx) = oneshot::channel(); + if shutdown_tx.send(CreatingState::Canceling{finish_tx: tx}).await.is_ok() { + receivers.insert(job_id, rx); + } else { + tracing::warn!("failed to send canceling state"); + } } } + receivers } } @@ -255,7 +258,7 @@ where CreatingState::Failed { reason } => { return Err(reason); } - CreatingState::Canceling => { + CreatingState::Canceling { finish_tx } => { if let Ok(table_fragments) = self .fragment_manager .select_table_fragments_by_table_id(&table_id) @@ -301,7 +304,6 @@ where table_id, ))) .await?; - return Err(MetaError::cancelled("create".into())); } if !table_fragments.is_created() { tracing::debug!( @@ -310,8 +312,11 @@ where self.barrier_scheduler .run_command(Command::CancelStreamingJob(table_fragments)) .await?; - return Err(MetaError::cancelled("create".into())); } + let _ = finish_tx.send(()).inspect_err(|_| { + tracing::warn!("failed to notify cancelled: {table_id}") + }); + return Err(MetaError::cancelled("create".into())); } } CreatingState::Created => return Ok(()), @@ -540,9 +545,25 @@ where Ok(()) } - pub async fn cancel_streaming_jobs(&self, table_ids: Vec) { + /// Cancel streaming jobs and return the canceled table ids. + pub async fn cancel_streaming_jobs(&self, table_ids: Vec) -> Vec { + if table_ids.is_empty() { + return vec![]; + } + let _reschedule_job_lock = self.reschedule_lock.read().await; - self.creating_job_info.cancel_jobs(table_ids).await; + let receivers = self.creating_job_info.cancel_jobs(table_ids).await; + + let futures = receivers.into_iter().map(|(id, receiver)| async move { + if receiver.await.is_ok() { + tracing::info!("canceled streaming job {id}"); + Some(id) + } else { + tracing::warn!("failed to cancel streaming job {id}"); + None + } + }); + join_all(futures).await.into_iter().flatten().collect_vec() } } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index ddedcacd7c7f5..8c3186fe707c7 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -55,6 +55,7 @@ use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_c use risingwave_pb::hummock::subscribe_compaction_event_request::Register; use risingwave_pb::hummock::*; use risingwave_pb::meta::add_worker_node_request::Property; +use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs; use risingwave_pb::meta::cluster_service_client::ClusterServiceClient; use risingwave_pb::meta::get_reschedule_plan_request::PbPolicy; use risingwave_pb::meta::heartbeat_request::{extra_info, ExtraInfo}; @@ -693,10 +694,10 @@ impl MetaClient { Ok(resp.snapshot.unwrap()) } - pub async fn cancel_creating_jobs(&self, infos: Vec) -> Result<()> { - let request = CancelCreatingJobsRequest { infos }; - let _ = self.inner.cancel_creating_jobs(request).await?; - Ok(()) + pub async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result> { + let request = CancelCreatingJobsRequest { jobs: Some(jobs) }; + let resp = self.inner.cancel_creating_jobs(request).await?; + Ok(resp.canceled_jobs) } pub async fn list_table_fragments( diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index fdeff75d6a213..0d0ff1193dff1 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -831,8 +831,13 @@ pub enum ShowObject { Function { schema: Option }, Indexes { table: ObjectName }, Cluster, + Jobs, } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub struct JobIdents(pub Vec); + impl fmt::Display for ShowObject { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt_schema(schema: &Option) -> String { @@ -867,6 +872,7 @@ impl fmt::Display for ShowObject { ShowObject::Cluster => { write!(f, "CLUSTER") } + ShowObject::Jobs => write!(f, "JOBS"), } } } @@ -1142,6 +1148,8 @@ pub enum Statement { /// Show create object name name: ObjectName, }, + /// CANCEL JOBS COMMAND + CancelJobs(JobIdents), /// DROP Drop(DropStatement), /// DROP Function @@ -1760,6 +1768,10 @@ impl fmt::Display for Statement { } Ok(()) } + Statement::CancelJobs(jobs) => { + write!(f, "CANCEL JOBS {}", display_comma_separated(&jobs.0))?; + Ok(()) + } } } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index 390bfebbe5ff9..7b16379be280a 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -108,6 +108,7 @@ define_keywords!( CACHE, CALL, CALLED, + CANCEL, CARDINALITY, CASCADE, CASCADED, @@ -276,6 +277,7 @@ define_keywords!( IS, ISNULL, ISOLATION, + JOBS, JOIN, JSON, KEY, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index 66e3b06a8b141..13f1e0fe259ff 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -238,6 +238,7 @@ impl Parser { Ok(self.parse_show()?) } } + Keyword::CANCEL => Ok(self.parse_cancel_job()?), Keyword::DESCRIBE => Ok(Statement::Describe { name: self.parse_object_name()?, }), @@ -4020,6 +4021,12 @@ impl Parser { filter: self.parse_show_statement_filter()?, }); } + Keyword::JOBS => { + return Ok(Statement::ShowObjects { + object: ShowObject::Jobs, + filter: self.parse_show_statement_filter()?, + }); + } _ => {} } } @@ -4029,6 +4036,18 @@ impl Parser { }) } + pub fn parse_cancel_job(&mut self) -> Result { + self.expect_keyword(Keyword::JOBS)?; + let mut job_ids = vec![]; + loop { + job_ids.push(self.parse_literal_uint()? as u32); + if !self.consume_token(&Token::Comma) { + break; + } + } + Ok(Statement::CancelJobs(JobIdents(job_ids))) + } + /// Parser `from schema` after `show tables` and `show materialized views`, if not conclude /// `from` then use default schema name. pub fn parse_from_and_identifier(&mut self) -> Result, ParserError> { diff --git a/src/utils/pgwire/src/pg_response.rs b/src/utils/pgwire/src/pg_response.rs index 559cea3fbd4c9..29ea77f83b71b 100644 --- a/src/utils/pgwire/src/pg_response.rs +++ b/src/utils/pgwire/src/pg_response.rs @@ -91,6 +91,7 @@ pub enum StatementType { COMMIT, ROLLBACK, SET_TRANSACTION, + CANCEL_COMMAND, } impl std::fmt::Display for StatementType { @@ -320,6 +321,7 @@ impl StatementType { | StatementType::INSERT_RETURNING | StatementType::DELETE_RETURNING | StatementType::UPDATE_RETURNING + | StatementType::CANCEL_COMMAND ) }