Skip to content

Commit

Permalink
feat: support show and cancel jobs syntax (#11854)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored and Li0k committed Sep 15, 2023
1 parent 21b7fb5 commit 15017b9
Show file tree
Hide file tree
Showing 15 changed files with 238 additions and 52 deletions.
26 changes: 19 additions & 7 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,30 @@ message FlushResponse {
hummock.HummockSnapshot snapshot = 2;
}

message CreatingJobInfo {
uint32 database_id = 1;
uint32 schema_id = 2;
string name = 3;
}

message CancelCreatingJobsRequest {
repeated CreatingJobInfo infos = 1;
message CreatingJobInfo {
uint32 database_id = 1;
uint32 schema_id = 2;
string name = 3;
}

message CreatingJobInfos {
repeated CreatingJobInfo infos = 1;
}

message CreatingJobIds {
repeated uint32 job_ids = 1;
}

oneof jobs {
CreatingJobInfos infos = 1;
CreatingJobIds ids = 2;
}
}

message CancelCreatingJobsResponse {
common.Status status = 1;
repeated uint32 canceled_jobs = 2;
}

message ListTableFragmentsRequest {
Expand Down
51 changes: 51 additions & 0 deletions src/frontend/src/handler/cancel_job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Row;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
use risingwave_sqlparser::ast::JobIdents;

use crate::handler::{HandlerArgs, RwPgResponse};

pub(super) async fn handle_cancel(
handler_args: HandlerArgs,
jobs: JobIdents,
) -> Result<RwPgResponse> {
let session = handler_args.session;

let canceled_jobs = session
.env()
.meta_client()
.cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids: jobs.0 }))
.await?;
let rows = canceled_jobs
.into_iter()
.map(|id| Row::new(vec![Some(id.to_string().into())]))
.collect_vec();
Ok(PgResponse::builder(StatementType::CANCEL_COMMAND)
.values(
rows.into(),
vec![PgFieldDescriptor::new(
"Id".to_string(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
)],
)
.into())
}
5 changes: 4 additions & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::*;
use self::util::DataChunkToRowSetAdapter;
use self::variable::handle_set_time_zone;
use crate::catalog::table_catalog::TableType;
use crate::handler::cancel_job::handle_cancel;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
use crate::session::SessionImpl;
use crate::utils::WithOptions;
Expand All @@ -37,6 +38,7 @@ mod alter_source_column;
mod alter_system;
mod alter_table_column;
pub mod alter_user;
pub mod cancel_job;
pub mod create_connection;
mod create_database;
pub mod create_function;
Expand Down Expand Up @@ -284,7 +286,7 @@ pub async fn handle(
Statement::ShowObjects {
object: show_object,
filter,
} => show::handle_show_object(handler_args, show_object, filter),
} => show::handle_show_object(handler_args, show_object, filter).await,
Statement::ShowCreateObject { create_type, name } => {
show::handle_show_create_object(handler_args, create_type, name)
}
Expand Down Expand Up @@ -518,6 +520,7 @@ pub async fn handle(
snapshot,
session,
} => transaction::handle_set(handler_args, modes, snapshot, session).await,
Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
_ => Err(
ErrorCode::NotImplemented(format!("Unhandled statement: {}", stmt), None.into()).into(),
),
Expand Down
58 changes: 52 additions & 6 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ fn schema_or_default(schema: &Option<Ident>) -> String {
.map_or_else(|| DEFAULT_SCHEMA_NAME.to_string(), |s| s.real_value())
}

pub fn handle_show_object(
pub async fn handle_show_object(
handler_args: HandlerArgs,
command: ShowObject,
filter: Option<ShowStatementFilter>,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let catalog_reader = session.env().catalog_reader().read_guard();

if let Some(ShowStatementFilter::Where(..)) = filter {
return Err(ErrorCode::NotImplemented(
Expand All @@ -97,37 +96,47 @@ pub fn handle_show_object(
.into());
}

let catalog_reader = session.env().catalog_reader();

let names = match command {
// If not include schema name, use default schema name
ShowObject::Table { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_table()
.map(|t| t.name.clone())
.collect(),
ShowObject::InternalTable { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_internal_table()
.map(|t| t.name.clone())
.collect(),
ShowObject::Database => catalog_reader.get_all_database_names(),
ShowObject::Schema => catalog_reader.get_all_schema_names(session.database())?,
ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
ShowObject::Schema => catalog_reader
.read_guard()
.get_all_schema_names(session.database())?,
ShowObject::View { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_view()
.map(|t| t.name.clone())
.collect(),
ShowObject::MaterializedView { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_mv()
.map(|t| t.name.clone())
.collect(),
ShowObject::Source { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_source()
.filter(|t| t.associated_table_id.is_none())
.map(|t| t.name.clone())
.collect(),
ShowObject::Sink { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_sink()
.map(|t| t.name.clone())
Expand Down Expand Up @@ -192,8 +201,9 @@ pub fn handle_show_object(
.into());
}
ShowObject::Connection { schema } => {
let schema = catalog_reader
.get_schema_by_name(session.database(), &schema_or_default(&schema))?;
let reader = catalog_reader.read_guard();
let schema =
reader.get_schema_by_name(session.database(), &schema_or_default(&schema))?;
let rows = schema
.iter_connections()
.map(|c| {
Expand Down Expand Up @@ -260,6 +270,7 @@ pub fn handle_show_object(
}
ShowObject::Function { schema } => {
let rows = catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_function()
.map(|t| {
Expand Down Expand Up @@ -367,6 +378,41 @@ pub fn handle_show_object(
)
.into());
}
ShowObject::Jobs => {
let resp = session.env().meta_client().list_ddl_progress().await?;
let rows = resp
.into_iter()
.map(|job| {
Row::new(vec![
Some(job.id.to_string().into()),
Some(job.statement.into()),
Some(job.progress.into()),
])
})
.collect_vec();
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.values(
rows.into(),
vec![
PgFieldDescriptor::new(
"Id".to_owned(),
DataType::Int64.to_oid(),
DataType::Int64.type_len(),
),
PgFieldDescriptor::new(
"Statement".to_owned(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
),
PgFieldDescriptor::new(
"Progress".to_owned(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
),
],
)
.into());
}
};

let rows = names
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::CreatingJobInfo;
use risingwave_rpc_client::error::Result;
use risingwave_rpc_client::{HummockMetaClient, MetaClient};

Expand All @@ -39,7 +39,7 @@ pub trait FrontendMetaClient: Send + Sync {

async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;

async fn cancel_creating_jobs(&self, infos: Vec<CreatingJobInfo>) -> Result<()>;
async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

async fn list_table_fragments(
&self,
Expand Down Expand Up @@ -85,7 +85,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.flush(checkpoint).await
}

async fn cancel_creating_jobs(&self, infos: Vec<CreatingJobInfo>) -> Result<()> {
async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
self.0.cancel_creating_jobs(infos).await
}

Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/scheduler/streaming_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::sync::Arc;
use itertools::Itertools;
use parking_lot::RwLock;
use pgwire::pg_server::SessionId;
use risingwave_pb::meta::CreatingJobInfo;
use risingwave_pb::meta::cancel_creating_jobs_request::{
CreatingJobInfo, CreatingJobInfos, PbJobs,
};
use uuid::Uuid;

use crate::catalog::{DatabaseId, SchemaId};
Expand Down Expand Up @@ -126,7 +128,9 @@ impl StreamingJobTracker {
let client = self.meta_client.clone();
tokio::spawn(async move {
client
.cancel_creating_jobs(jobs.into_iter().map(|job| job.info).collect_vec())
.cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos {
infos: jobs.into_iter().map(|job| job.info).collect_vec(),
}))
.await
});
}
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{CreatingJobInfo, SystemParams};
use risingwave_pb::meta::SystemParams;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
Expand Down Expand Up @@ -768,8 +769,8 @@ impl FrontendMetaClient for MockFrontendMetaClient {
})
}

async fn cancel_creating_jobs(&self, _infos: Vec<CreatingJobInfo>) -> RpcResult<()> {
Ok(())
async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
Ok(vec![])
}

async fn list_table_fragments(
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ macro_rules! commit_meta {

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::cancel_creating_jobs_request::CreatingJobInfo;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup};
use risingwave_pb::meta::{Relation, RelationGroup};
pub(crate) use {commit_meta, commit_meta_with_trx};

use crate::manager::catalog::utils::{
Expand Down
31 changes: 21 additions & 10 deletions src/meta/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
};
Expand Down Expand Up @@ -85,16 +86,26 @@ where
request: Request<CancelCreatingJobsRequest>,
) -> TonicResponse<CancelCreatingJobsResponse> {
let req = request.into_inner();
let table_ids = self
.catalog_manager
.find_creating_streaming_job_ids(req.infos)
.await;
if !table_ids.is_empty() {
self.stream_manager
.cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
.await;
}
Ok(Response::new(CancelCreatingJobsResponse { status: None }))
let table_ids = match req.jobs.unwrap() {
Jobs::Infos(infos) => {
self.catalog_manager
.find_creating_streaming_job_ids(infos.infos)
.await
}
Jobs::Ids(jobs) => jobs.job_ids,
};

let canceled_jobs = self
.stream_manager
.cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
.await
.into_iter()
.map(|id| id.table_id)
.collect_vec();
Ok(Response::new(CancelCreatingJobsResponse {
status: None,
canceled_jobs,
}))
}

#[cfg_attr(coverage, no_coverage)]
Expand Down
Loading

0 comments on commit 15017b9

Please sign in to comment.