Skip to content

Commit

Permalink
feat: batch get physical table routes (#3319)
Browse files Browse the repository at this point in the history
* feat: batch get physical table routes

* chore: by comment
  • Loading branch information
fengjiachun authored Feb 19, 2024
1 parent 8b73067 commit aa569f7
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 3 deletions.
6 changes: 5 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ pub enum Error {

#[snafu(display("The tasks of create tables cannot be empty"))]
EmptyCreateTableTasks { location: Location },

#[snafu(display("Metadata corruption: {}", err_msg))]
MetadataCorruption { err_msg: String, location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -417,7 +420,8 @@ impl ErrorExt for Error {
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. }
| UnexpectedLogicalRouteTable { .. }
| ProcedureOutput { .. } => StatusCode::Unexpected,
| ProcedureOutput { .. }
| MetadataCorruption { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand Down
64 changes: 62 additions & 2 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;

use serde::{Deserialize, Serialize};
Expand All @@ -22,7 +22,8 @@ use table::metadata::TableId;

use super::{DeserializedValueWithBytes, TableMetaValue};
use crate::error::{
Result, SerdeJsonSnafu, TableRouteNotFoundSnafu, UnexpectedLogicalRouteTableSnafu,
MetadataCorruptionSnafu, Result, SerdeJsonSnafu, TableRouteNotFoundSnafu,
UnexpectedLogicalRouteTableSnafu,
};
use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
Expand Down Expand Up @@ -388,6 +389,65 @@ impl TableRouteManager {
}
}

pub async fn batch_get_physical_table_routes(
&self,
logical_or_physical_table_ids: &[TableId],
) -> Result<HashMap<TableId, PhysicalTableRouteValue>> {
let table_routes = self.batch_get(logical_or_physical_table_ids).await?;

let mut physical_table_routes = HashMap::with_capacity(table_routes.len());
let mut logical_table_ids = HashMap::with_capacity(table_routes.len());

for (table_id, table_route) in table_routes {
match table_route {
TableRouteValue::Physical(x) => {
physical_table_routes.insert(table_id, x);
}
TableRouteValue::Logical(x) => {
logical_table_ids.insert(table_id, x.physical_table_id());
}
}
}

if logical_table_ids.is_empty() {
return Ok(physical_table_routes);
}

let physical_table_ids = logical_table_ids
.values()
.cloned()
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let table_routes = self.batch_get(&physical_table_ids).await?;

for (logical_table_id, physical_table_id) in logical_table_ids {
let table_route =
table_routes
.get(&physical_table_id)
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
match table_route {
TableRouteValue::Physical(x) => {
physical_table_routes.insert(logical_table_id, x.clone());
}
TableRouteValue::Logical(x) => {
// Never get here, because we use a physical table id cannot obtain a logical table.
MetadataCorruptionSnafu {
err_msg: format!(
"logical table {} {:?} cannot be resolved to a physical table.",
logical_table_id, x
),
}
.fail()?;
}
}
}

Ok(physical_table_routes)
}

/// It may return a subset of the `table_ids`.
pub async fn batch_get(
&self,
Expand Down

0 comments on commit aa569f7

Please sign in to comment.