Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support show and cancel jobs syntax #11854

Merged
merged 4 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,18 +130,30 @@ message FlushResponse {
hummock.HummockSnapshot snapshot = 2;
}

message CreatingJobInfo {
uint32 database_id = 1;
uint32 schema_id = 2;
string name = 3;
}

message CancelCreatingJobsRequest {
repeated CreatingJobInfo infos = 1;
message CreatingJobInfo {
uint32 database_id = 1;
uint32 schema_id = 2;
string name = 3;
}

message CreatingJobInfos {
repeated CreatingJobInfo infos = 1;
}

message CreatingJobIds {
repeated uint32 job_ids = 1;
}

oneof jobs {
CreatingJobInfos infos = 1;
CreatingJobIds ids = 2;
}
}

message CancelCreatingJobsResponse {
common.Status status = 1;
repeated uint32 canceled_jobs = 2;
}

message ListTableFragmentsRequest {
Expand Down
51 changes: 51 additions & 0 deletions src/frontend/src/handler/cancel_job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;
yezizp2012 marked this conversation as resolved.
Show resolved Hide resolved
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Row;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_pb::meta::cancel_creating_jobs_request::{CreatingJobIds, PbJobs};
use risingwave_sqlparser::ast::JobIdents;

use crate::handler::{HandlerArgs, RwPgResponse};

pub(super) async fn handle_cancel(
handler_args: HandlerArgs,
jobs: JobIdents,
) -> Result<RwPgResponse> {
let session = handler_args.session;

let canceled_jobs = session
.env()
.meta_client()
.cancel_creating_jobs(PbJobs::Ids(CreatingJobIds { job_ids: jobs.0 }))
.await?;
let rows = canceled_jobs
.into_iter()
.map(|id| Row::new(vec![Some(id.to_string().into())]))
.collect_vec();
Ok(PgResponse::builder(StatementType::CANCEL_COMMAND)
.values(
rows.into(),
vec![PgFieldDescriptor::new(
"Id".to_string(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
)],
)
.into())
}
5 changes: 4 additions & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_sqlparser::ast::*;
use self::util::DataChunkToRowSetAdapter;
use self::variable::handle_set_time_zone;
use crate::catalog::table_catalog::TableType;
use crate::handler::cancel_job::handle_cancel;
use crate::scheduler::{DistributedQueryStream, LocalQueryStream};
use crate::session::SessionImpl;
use crate::utils::WithOptions;
Expand All @@ -36,6 +37,7 @@ mod alter_relation_rename;
mod alter_system;
mod alter_table_column;
pub mod alter_user;
pub mod cancel_job;
pub mod create_connection;
mod create_database;
pub mod create_function;
Expand Down Expand Up @@ -283,7 +285,7 @@ pub async fn handle(
Statement::ShowObjects {
object: show_object,
filter,
} => show::handle_show_object(handler_args, show_object, filter),
} => show::handle_show_object(handler_args, show_object, filter).await,
Statement::ShowCreateObject { create_type, name } => {
show::handle_show_create_object(handler_args, create_type, name)
}
Expand Down Expand Up @@ -513,6 +515,7 @@ pub async fn handle(
snapshot,
session,
} => transaction::handle_set(handler_args, modes, snapshot, session).await,
Statement::CancelJobs(jobs) => handle_cancel(handler_args, jobs).await,
_ => Err(
ErrorCode::NotImplemented(format!("Unhandled statement: {}", stmt), None.into()).into(),
),
Expand Down
58 changes: 52 additions & 6 deletions src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,12 @@ fn schema_or_default(schema: &Option<Ident>) -> String {
.map_or_else(|| DEFAULT_SCHEMA_NAME.to_string(), |s| s.real_value())
}

pub fn handle_show_object(
pub async fn handle_show_object(
handler_args: HandlerArgs,
command: ShowObject,
filter: Option<ShowStatementFilter>,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let catalog_reader = session.env().catalog_reader().read_guard();

if let Some(ShowStatementFilter::Where(..)) = filter {
return Err(ErrorCode::NotImplemented(
Expand All @@ -97,37 +96,47 @@ pub fn handle_show_object(
.into());
}

let catalog_reader = session.env().catalog_reader();

let names = match command {
// If not include schema name, use default schema name
ShowObject::Table { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_table()
.map(|t| t.name.clone())
.collect(),
ShowObject::InternalTable { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_internal_table()
.map(|t| t.name.clone())
.collect(),
ShowObject::Database => catalog_reader.get_all_database_names(),
ShowObject::Schema => catalog_reader.get_all_schema_names(session.database())?,
ShowObject::Database => catalog_reader.read_guard().get_all_database_names(),
ShowObject::Schema => catalog_reader
.read_guard()
.get_all_schema_names(session.database())?,
ShowObject::View { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_view()
.map(|t| t.name.clone())
.collect(),
ShowObject::MaterializedView { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_mv()
.map(|t| t.name.clone())
.collect(),
ShowObject::Source { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_source()
.filter(|t| t.associated_table_id.is_none())
.map(|t| t.name.clone())
.collect(),
ShowObject::Sink { schema } => catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_sink()
.map(|t| t.name.clone())
Expand Down Expand Up @@ -192,8 +201,9 @@ pub fn handle_show_object(
.into());
}
ShowObject::Connection { schema } => {
let schema = catalog_reader
.get_schema_by_name(session.database(), &schema_or_default(&schema))?;
let reader = catalog_reader.read_guard();
let schema =
reader.get_schema_by_name(session.database(), &schema_or_default(&schema))?;
let rows = schema
.iter_connections()
.map(|c| {
Expand Down Expand Up @@ -260,6 +270,7 @@ pub fn handle_show_object(
}
ShowObject::Function { schema } => {
let rows = catalog_reader
.read_guard()
.get_schema_by_name(session.database(), &schema_or_default(&schema))?
.iter_function()
.map(|t| {
Expand Down Expand Up @@ -367,6 +378,41 @@ pub fn handle_show_object(
)
.into());
}
ShowObject::Jobs => {
let resp = session.env().meta_client().list_ddl_progress().await?;
let rows = resp
.into_iter()
.map(|job| {
Row::new(vec![
Some(job.id.to_string().into()),
Some(job.statement.into()),
Some(job.progress.into()),
])
})
.collect_vec();
return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)
.values(
rows.into(),
vec![
PgFieldDescriptor::new(
"Id".to_owned(),
DataType::Int64.to_oid(),
DataType::Int64.type_len(),
),
PgFieldDescriptor::new(
"Statement".to_owned(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
),
PgFieldDescriptor::new(
"Progress".to_owned(),
DataType::Varchar.to_oid(),
DataType::Varchar.type_len(),
),
],
)
.into());
}
};

let rows = names
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_pb::backup_service::MetaSnapshotMetadata;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::CreatingJobInfo;
use risingwave_rpc_client::error::Result;
use risingwave_rpc_client::{HummockMetaClient, MetaClient};

Expand All @@ -39,7 +39,7 @@ pub trait FrontendMetaClient: Send + Sync {

async fn flush(&self, checkpoint: bool) -> Result<HummockSnapshot>;

async fn cancel_creating_jobs(&self, infos: Vec<CreatingJobInfo>) -> Result<()>;
async fn cancel_creating_jobs(&self, jobs: PbJobs) -> Result<Vec<u32>>;

async fn list_table_fragments(
&self,
Expand Down Expand Up @@ -85,7 +85,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.flush(checkpoint).await
}

async fn cancel_creating_jobs(&self, infos: Vec<CreatingJobInfo>) -> Result<()> {
async fn cancel_creating_jobs(&self, infos: PbJobs) -> Result<Vec<u32>> {
self.0.cancel_creating_jobs(infos).await
}

Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/scheduler/streaming_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use std::sync::Arc;
use itertools::Itertools;
use parking_lot::RwLock;
use pgwire::pg_server::SessionId;
use risingwave_pb::meta::CreatingJobInfo;
use risingwave_pb::meta::cancel_creating_jobs_request::{
CreatingJobInfo, CreatingJobInfos, PbJobs,
};
use uuid::Uuid;

use crate::catalog::{DatabaseId, SchemaId};
Expand Down Expand Up @@ -126,7 +128,9 @@ impl StreamingJobTracker {
let client = self.meta_client.clone();
tokio::spawn(async move {
client
.cancel_creating_jobs(jobs.into_iter().map(|job| job.info).collect_vec())
.cancel_creating_jobs(PbJobs::Infos(CreatingJobInfos {
infos: jobs.into_iter().map(|job| job.info).collect_vec(),
}))
.await
});
}
Expand Down
7 changes: 4 additions & 3 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ use risingwave_pb::catalog::{
};
use risingwave_pb::ddl_service::{create_connection_request, DdlProgress};
use risingwave_pb::hummock::HummockSnapshot;
use risingwave_pb::meta::cancel_creating_jobs_request::PbJobs;
use risingwave_pb::meta::list_actor_states_response::ActorState;
use risingwave_pb::meta::list_fragment_distribution_response::FragmentDistribution;
use risingwave_pb::meta::list_table_fragment_states_response::TableFragmentState;
use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo;
use risingwave_pb::meta::{CreatingJobInfo, SystemParams};
use risingwave_pb::meta::SystemParams;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_pb::user::update_user_request::UpdateField;
use risingwave_pb::user::{GrantPrivilege, UserInfo};
Expand Down Expand Up @@ -763,8 +764,8 @@ impl FrontendMetaClient for MockFrontendMetaClient {
})
}

async fn cancel_creating_jobs(&self, _infos: Vec<CreatingJobInfo>) -> RpcResult<()> {
Ok(())
async fn cancel_creating_jobs(&self, _infos: PbJobs) -> RpcResult<Vec<u32>> {
Ok(vec![])
}

async fn list_table_fragments(
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ macro_rules! commit_meta {

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::meta::cancel_creating_jobs_request::CreatingJobInfo;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::{CreatingJobInfo, Relation, RelationGroup};
use risingwave_pb::meta::{Relation, RelationGroup};
pub(crate) use {commit_meta, commit_meta_with_trx};

use crate::manager::catalog::utils::{
Expand Down
31 changes: 21 additions & 10 deletions src/meta/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_pb::meta::cancel_creating_jobs_request::Jobs;
use risingwave_pb::meta::list_table_fragments_response::{
ActorInfo, FragmentInfo, TableFragmentInfo,
};
Expand Down Expand Up @@ -85,16 +86,26 @@ where
request: Request<CancelCreatingJobsRequest>,
) -> TonicResponse<CancelCreatingJobsResponse> {
let req = request.into_inner();
let table_ids = self
.catalog_manager
.find_creating_streaming_job_ids(req.infos)
.await;
if !table_ids.is_empty() {
self.stream_manager
.cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
.await;
}
Ok(Response::new(CancelCreatingJobsResponse { status: None }))
let table_ids = match req.jobs.unwrap() {
Jobs::Infos(infos) => {
self.catalog_manager
.find_creating_streaming_job_ids(infos.infos)
.await
}
Jobs::Ids(jobs) => jobs.job_ids,
};

let canceled_jobs = self
.stream_manager
.cancel_streaming_jobs(table_ids.into_iter().map(TableId::from).collect_vec())
.await
.into_iter()
.map(|id| id.table_id)
.collect_vec();
Ok(Response::new(CancelCreatingJobsResponse {
status: None,
canceled_jobs,
}))
}

#[cfg_attr(coverage, no_coverage)]
Expand Down
Loading