Skip to content

Commit

Permalink
chore: some improve
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Feb 19, 2024
1 parent f41f646 commit be317e1
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ etcd-client.workspace = true
futures-util.workspace = true
futures.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true
Expand Down
20 changes: 14 additions & 6 deletions src/common/meta/src/ddl/create_logical_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::info;
use common_telemetry::tracing_context::TracingContext;
use futures_util::future::join_all;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
Expand All @@ -35,7 +36,6 @@ use crate::ddl::DdlContext;
use crate::error::{Result, TableAlreadyExistsSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::kv_backend::txn;
use crate::lock_key::{TableLock, TableNameLock};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
Expand Down Expand Up @@ -160,11 +160,19 @@ impl CreateLogicalTablesProcedure {
let num_tables = tables_data.len();

if num_tables > 0 {
// The batch size is txn::MAX_TXN_SIZE / 3 because the size of the `tables_data`
// is 3 times the size of the `tables_data`.
const BATCH_SIZE: usize = txn::MAX_TXN_SIZE / 3;
for chunk in tables_data.chunks(BATCH_SIZE) {
manager.create_logic_tables_metadata(chunk.to_vec()).await?;
let chunk_size = manager.max_logical_tables_per_batch();
if num_tables > chunk_size {
let chunks = tables_data
.into_iter()
.chunks(chunk_size)
.into_iter()
.map(|chunk| chunk.collect::<Vec<_>>())
.collect::<Vec<_>>();
for chunk in chunks {
manager.create_logical_tables_metadata(chunk).await?;
}
} else {
manager.create_logical_tables_metadata(tables_data).await?;
}
}

Expand Down
14 changes: 10 additions & 4 deletions src/common/meta/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,14 @@ impl TableMetadataManager {
Ok(())
}

pub fn max_logical_tables_per_batch(&self) -> usize {
// The batch size is max_txn_size / 3 because the size of the `tables_data`
// is 3 times the size of the `tables_data`.
self.kv_backend.max_txn_size() / 3
}

/// Creates metadata for multiple logical tables and return an error if different metadata exists.
pub async fn create_logic_tables_metadata(
pub async fn create_logical_tables_metadata(
&self,
tables_data: Vec<(RawTableInfo, TableRouteValue)>,
) -> Result<()> {
Expand Down Expand Up @@ -1002,13 +1008,13 @@ mod tests {
let tables_data = vec![(table_info.clone(), table_route_value.clone())];
// creates metadata.
table_metadata_manager
.create_logic_tables_metadata(tables_data.clone())
.create_logical_tables_metadata(tables_data.clone())
.await
.unwrap();

// if metadata was already created, it should be ok.
assert!(table_metadata_manager
.create_logic_tables_metadata(tables_data)
.create_logical_tables_metadata(tables_data)
.await
.is_ok());

Expand All @@ -1018,7 +1024,7 @@ mod tests {
let modified_tables_data = vec![(table_info.clone(), modified_table_route_value)];
// if remote metadata was exists, it should return an error.
assert!(table_metadata_manager
.create_logic_tables_metadata(modified_tables_data)
.create_logical_tables_metadata(modified_tables_data)
.await
.is_err());

Expand Down
13 changes: 9 additions & 4 deletions src/common/meta/src/kv_backend/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use etcd_client::{
};
use snafu::{ensure, OptionExt, ResultExt};

use super::{txn, KvBackendRef};
use super::KvBackendRef;
use crate::error::{self, Error, Result};
use crate::kv_backend::txn::{Txn as KvTxn, TxnResponse as KvTxnResponse};
use crate::kv_backend::{KvBackend, TxnService};
Expand All @@ -37,7 +37,7 @@ use crate::rpc::KeyValue;
// The etcd default configuration's `--max-txn-ops` is 128.
//
// For more detail, see: https://etcd.io/docs/v3.5/op-guide/configuration/
const MAX_TXN_SIZE: usize = txn::MAX_TXN_SIZE;
const MAX_TXN_SIZE: usize = 128;

fn convert_key_value(kv: etcd_client::KeyValue) -> KeyValue {
let (key, value) = kv.into_key_value();
Expand Down Expand Up @@ -66,7 +66,8 @@ impl EtcdStore {
}

async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
if txn_ops.len() < MAX_TXN_SIZE {
let max_txn_size = self.max_txn_size();
if txn_ops.len() < max_txn_size {
// fast path
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
Expand All @@ -82,7 +83,7 @@ impl EtcdStore {
}

let txns = txn_ops
.chunks(MAX_TXN_SIZE)
.chunks(max_txn_size)
.map(|part| async move {
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
Expand Down Expand Up @@ -319,6 +320,10 @@ impl TxnService for EtcdStore {
.context(error::EtcdFailedSnafu)?;
txn_res.try_into()
}

fn max_txn_size(&self) -> usize {
MAX_TXN_SIZE
}
}

struct Get {
Expand Down
8 changes: 5 additions & 3 deletions src/common/meta/src/kv_backend/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@ use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};

mod etcd;

// Maximum number of operations permitted in a transaction.
pub const MAX_TXN_SIZE: usize = 128;

#[async_trait::async_trait]
pub trait TxnService: Sync + Send {
type Error: ErrorExt;

async fn txn(&self, _txn: Txn) -> Result<TxnResponse, Self::Error> {
unimplemented!("txn is not implemented")
}

/// Maximum number of operations permitted in a transaction.
fn max_txn_size(&self) -> usize {
usize::MAX
}
}

#[derive(Debug, Clone, PartialEq)]
Expand Down

0 comments on commit be317e1

Please sign in to comment.