Skip to content

Commit

Permalink
feat: distribute truncate table
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilExileSu committed Sep 16, 2023
1 parent 627c5b7 commit 1d607f0
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 84 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e81a60e817a348ee5b7dfbd991f86d35cd068ce5" }
# greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" }
greptime-proto = {git = "https://github.com/DevilExileSu/greptime-proto.git", rev = "637f06ca7fe097ab22277f473d33840d92f4190c"}
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::rpc::router::RegionRoute;
pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod truncate_table;
pub mod utils;

#[derive(Debug, Default)]
Expand Down
229 changes: 229 additions & 0 deletions src/common/meta/src/ddl/truncate_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::region::{
region_request, RegionRequest, RegionRequestHeader, TruncateRequest as PbTruncateRegionRequest,
};
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
};
use common_telemetry::debug;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};

use crate::ddl::utils::handle_operate_region_error;
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableNotFoundSnafu};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::rpc::ddl::TruncateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::table_name::TableName;

/// Procedure that truncates a table by sending a truncate request for every
/// leader region to the datanode that leads it.
///
/// Only `data` is persisted (see `dump` / `from_json`); `context` is supplied
/// again by the caller when the procedure is restored.
pub struct TruncateTableProcedure {
// DDL context; this procedure reads `table_metadata_manager` and `datanode_manager` from it.
context: DdlContext,
// Serializable state of the procedure, including the current step.
data: TruncateTableData,
}

#[async_trait]
impl Procedure for TruncateTableProcedure {
fn type_name(&self) -> &str {
Self::TYPE_NAME
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e| {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
};
match self.data.state {
TruncateTableState::Prepare => self.on_prepare().await,
TruncateTableState::DatanodeTruncateTable => self.on_datanode_truncate_table().await,
}
.map_err(error_handler)
}

fn dump(&self) -> ProcedureResult<String> {
serde_json::to_string(&self.data).context(ToJsonSnafu)
}

fn lock_key(&self) -> LockKey {
let table_ref = &self.data.table_ref();
let key = common_catalog::format_full_table_name(
table_ref.catalog,
table_ref.schema,
table_ref.table,
);

LockKey::single(key)
}
}

impl TruncateTableProcedure {
pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::TruncateTableProcedure";

pub(crate) fn new(
cluster_id: u64,
task: TruncateTableTask,
table_info_value: TableInfoValue,
region_routes: Vec<RegionRoute>,
context: DdlContext,
) -> Self {
Self {
context,
data: TruncateTableData::new(cluster_id, task, table_info_value, region_routes),
}
}

pub(crate) fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data = serde_json::from_str(json).context(FromJsonSnafu)?;
Ok(Self { context, data })
}

// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let table_ref = &self.data.table_ref();

let manager = &self.context.table_metadata_manager;

let exist = manager
.table_name_manager()
.exists(TableNameKey::new(
table_ref.catalog,
table_ref.schema,
table_ref.table,
))
.await?;

ensure!(
exist,
TableNotFoundSnafu {
table_name: table_ref.to_string()
}
);

self.data.state = TruncateTableState::DatanodeTruncateTable;

Ok(Status::executing(true))
}

async fn on_datanode_truncate_table(&mut self) -> Result<Status> {
let table_id = self.data.table_id();

let region_routes = &self.data.region_routes;
let leaders = find_leaders(region_routes);
let mut truncate_region_tasks = Vec::with_capacity(leaders.len());

for datanode in leaders {
let clients = self.context.datanode_manager.clone();
let regions = find_leader_regions(region_routes, &datanode);
let region_ids = regions
.iter()
.map(|region_number| RegionId::new(table_id, *region_number))
.collect::<Vec<_>>();

truncate_region_tasks.push(async move {
for region_id in region_ids {
debug!("Truncating region {region_id} on Datanode {datanode:?}");

let request = RegionRequest {
header: Some(RegionRequestHeader {
trace_id: 0,
span_id: 0,
}),
body: Some(region_request::Body::Truncate(PbTruncateRegionRequest {
region_id: region_id.as_u64(),
})),
};

if let Err(err) = clients.datanode(&datanode).await.handle(request).await {
if err.status_code() != StatusCode::RegionNotFound {
return Err(handle_operate_region_error(datanode)(err));
}
}
}
Ok(())
});
}

join_all(truncate_region_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;

Ok(Status::Done)
}
}

/// Serializable data of `TruncateTableProcedure`.
///
/// It is written to JSON by `dump` and read back by `from_json`.
#[derive(Debug, Serialize, Deserialize)]
pub struct TruncateTableData {
// Step the procedure executes next.
state: TruncateTableState,
// Stored as passed to `new`; not read by the code in this file.
cluster_id: u64,
// The truncate task; source of `table_ref()` and `table_name()`.
task: TruncateTableTask,
// Table info value; source of `table_info()` and `table_id()`.
table_info_value: TableInfoValue,
// Region routes used to find the leader datanodes and their regions.
region_routes: Vec<RegionRoute>,
}

impl TruncateTableData {
    /// Creates the data for a new truncate-table procedure.
    ///
    /// The procedure starts in [`TruncateTableState::Prepare`] so that the
    /// table-existence check in `on_prepare` runs before any request is sent to
    /// the datanodes. Starting directly in `DatanodeTruncateTable` would leave
    /// the `Prepare` step unreachable for every newly submitted procedure.
    pub fn new(
        cluster_id: u64,
        task: TruncateTableTask,
        table_info_value: TableInfoValue,
        region_routes: Vec<RegionRoute>,
    ) -> Self {
        Self {
            state: TruncateTableState::Prepare,
            cluster_id,
            task,
            table_info_value,
            region_routes,
        }
    }

    /// Returns the table reference of the task.
    pub fn table_ref(&self) -> TableReference {
        self.task.table_ref()
    }

    /// Returns the table name of the task.
    pub fn table_name(&self) -> TableName {
        self.task.table_name()
    }

    /// Returns the raw table info held by the table info value.
    fn table_info(&self) -> &RawTableInfo {
        &self.table_info_value.table_info
    }

    /// Returns the table id recorded in the table info.
    fn table_id(&self) -> TableId {
        self.table_info().ident.table_id
    }
}

/// Steps of the truncate-table procedure; `execute` dispatches on the current value.
///
/// The state is serialized together with the rest of `TruncateTableData`.
#[derive(Debug, Serialize, Deserialize)]
enum TruncateTableState {
/// Prepares to truncate the table
Prepare,
/// Datanode truncates the table
DatanodeTruncateTable,
}
75 changes: 49 additions & 26 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
use std::sync::Arc;

use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use common_telemetry::{error, info};
use common_telemetry::info;
use snafu::{OptionExt, ResultExt};

use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext,
TableMetadataAllocatorRef,
};
use crate::error::{
self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu,
UnsupportedSnafu, WaitProcedureSnafu,
WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
Expand Down Expand Up @@ -122,6 +123,20 @@ impl DdlManager {
)
.context(RegisterProcedureLoaderSnafu {
type_name: AlterTableProcedure::TYPE_NAME,
})?;

let context = self.create_context();

self.procedure_manager
.register_loader(
TruncateTableProcedure::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
TruncateTableProcedure::from_json(json, context).map(|p| Box::new(p) as _)
}),
)
.context(RegisterProcedureLoaderSnafu {
type_name: TruncateTableProcedure::TYPE_NAME,
})
}

Expand Down Expand Up @@ -183,15 +198,21 @@ impl DdlManager {
&self,
cluster_id: u64,
truncate_table_task: TruncateTableTask,
table_info_value: TableInfoValue,
region_routes: Vec<RegionRoute>,
) -> Result<ProcedureId> {
error!("Truncate table procedure is not supported, cluster_id = {}, truncate_table_task = {:?}, region_routes = {:?}",
cluster_id, truncate_table_task, region_routes);
let context = self.create_context();
let procedure = TruncateTableProcedure::new(
cluster_id,
truncate_table_task,
table_info_value,
region_routes,
context,
);

UnsupportedSnafu {
operation: "TRUNCATE TABLE",
}
.fail()
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
Expand All @@ -216,32 +237,34 @@ async fn handle_truncate_table_task(
cluster_id: u64,
truncate_table_task: TruncateTableTask,
) -> Result<SubmitDdlTaskResponse> {
let truncate_table = &truncate_table_task.truncate_table;
let table_id = truncate_table
.table_id
.as_ref()
.context(error::UnexpectedSnafu {
err_msg: "expected table id ",
})?
.id;

let table_id = truncate_table_task.table_id;
let table_metadata_manager = &ddl_manager.table_metadata_manager();
let table_ref = truncate_table_task.table_ref();

let table_route_value = ddl_manager
.table_metadata_manager()
.table_route_manager()
.get(table_id)
.await?
.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;
let (table_info_value, table_route_value) =
table_metadata_manager.get_full_table_info(table_id).await?;

let table_info_value = table_info_value.with_context(|| error::TableInfoNotFoundSnafu {
table_name: table_ref.to_string(),
})?;

let table_route_value = table_route_value.with_context(|| error::TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?;

let table_route = table_route_value.region_routes;

let id = ddl_manager
.submit_truncate_table_task(cluster_id, truncate_table_task, table_route)
.submit_truncate_table_task(
cluster_id,
truncate_table_task,
table_info_value,
table_route,
)
.await?;

info!("Table: {table_id} is truncated via procedure_id {id:?}");

Ok(SubmitDdlTaskResponse {
key: id.to_string().into(),
..Default::default()
Expand Down
Loading

0 comments on commit 1d607f0

Please sign in to comment.