From 1c94d4c506c3dcf6b221333071faf1c0e40446a8 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sat, 30 Dec 2023 13:36:14 +0800 Subject: [PATCH 1/5] ci: fix duplicatd doc issue (#3056) --- .github/workflows/doc-label.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/doc-label.yml b/.github/workflows/doc-label.yml index d9060c30bf34..4e547c9b8062 100644 --- a/.github/workflows/doc-label.yml +++ b/.github/workflows/doc-label.yml @@ -20,7 +20,7 @@ jobs: sync-labels: 1 - name: create an issue in doc repo uses: dacbd/create-issue-action@main - if: ${{ contains(github.event.pull_request.body, '- [ ] This PR does not require documentation updates.') }} + if: ${{ github.event.action == 'opened' && contains(github.event.pull_request.body, '- [ ] This PR does not require documentation updates.') }} with: owner: GreptimeTeam repo: docs From 69a53130c223a821bf8ca1b9a028667c5a1c9bf9 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Sat, 30 Dec 2023 15:32:32 +0800 Subject: [PATCH 2/5] feat(inverted_index): Add applier builder to convert Expr to Predicates (Part 1) (#3034) * feat(inverted_index.integration): Add applier builder to convert Expr to Predicates (Part 1) Signed-off-by: Zhenchi * chore: add docs Signed-off-by: Zhenchi * fix: typos Signed-off-by: Zhenchi * fix: address comments Signed-off-by: Zhenchi * Update src/mito2/src/sst/index/applier/builder.rs Co-authored-by: Yingwen * fix: remove unwrap Signed-off-by: Zhenchi * chore: error source Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi Co-authored-by: Yingwen --- Cargo.lock | 13 +- Cargo.toml | 3 +- src/common/config/src/wal/kafka.rs | 2 +- src/index/src/inverted_index/error.rs | 2 +- .../search/fst_apply/intersection_apply.rs | 112 ++++---- src/mito2/Cargo.toml | 1 + src/mito2/src/error.rs | 20 ++ src/mito2/src/row_converter.rs | 6 +- src/mito2/src/sst.rs | 1 + src/mito2/src/sst/index.rs | 18 ++ src/mito2/src/sst/index/applier.rs | 47 ++++ src/mito2/src/sst/index/applier/builder.rs | 261 ++++++++++++++++++ .../src/sst/index/applier/builder/between.rs | 171 ++++++++++++ src/mito2/src/sst/index/codec.rs | 65 +++++ 14 files changed, 670 insertions(+), 52 deletions(-) create mode 100644 src/mito2/src/sst/index.rs create mode 100644 src/mito2/src/sst/index/applier.rs create mode 100644 src/mito2/src/sst/index/applier/builder.rs create mode 100644 src/mito2/src/sst/index/applier/builder/between.rs create mode 100644 src/mito2/src/sst/index/codec.rs diff --git a/Cargo.lock b/Cargo.lock index d179ea6c8c54..b042227a293b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4029,7 +4029,7 @@ dependencies = [ "prost 0.12.3", "rand", "regex", - "regex-automata 0.1.10", + "regex-automata 0.2.0", "snafu", "tokio", "tokio-util", @@ -4977,6 +4977,7 @@ dependencies = [ "datatypes", "futures", "humantime-serde", + "index", "lazy_static", "log-store", "memcomparable", @@ -7134,8 +7135,18 @@ name = "regex-automata" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9368763f5a9b804326f3af749e16f9abf378d227bcdee7634b13d8f17793782" dependencies = [ "fst", + "memchr", "regex-syntax 0.6.29", ] diff --git a/Cargo.toml b/Cargo.toml index 0e38d914eccb..a3413aa9d48d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -111,7 +111,7 @@ prost = "0.12" raft-engine = { git = "https://github.com/tikv/raft-engine.git", rev = "22dfb426cd994602b57725ef080287d3e53db479" } rand = "0.8" regex = "1.8" -regex-automata = { version = "0.1", features = ["transducer"] } +regex-automata = { version = "0.2", features = ["transducer"] } reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls-native-roots", @@ -169,6 +169,7 @@ datanode = { path = "src/datanode" } datatypes = { path = "src/datatypes" } file-engine = { path = "src/file-engine" } frontend = { path = "src/frontend" } +index = { path = "src/index" } log-store = { path = "src/log-store" } meta-client = { path = "src/meta-client" } meta-srv = { path = "src/meta-srv" } diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index e93aa6cb2271..858991264bb6 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -42,7 +42,7 @@ pub struct KafkaConfig { #[serde(skip)] #[serde(default)] pub compression: RsKafkaCompression, - /// The maximum log size a kakfa batch producer could buffer. + /// The maximum log size a kafka batch producer could buffer. pub max_batch_size: ReadableSize, /// The linger duration of a kafka batch producer. #[serde(with = "humantime_serde")] diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index b795e33003b7..6e5f39006eb9 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -113,7 +113,7 @@ pub enum Error { #[snafu(display("Failed to parse regex DFA"))] ParseDFA { #[snafu(source)] - error: regex_automata::Error, + error: Box, location: Location, }, diff --git a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs index a0ae0d7b9afb..a608acd0bab5 100644 --- a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs @@ -14,7 +14,7 @@ use fst::map::OpBuilder; use fst::{IntoStreamer, Streamer}; -use regex_automata::DenseDFA; +use regex_automata::dfa::dense::DFA; use snafu::{ensure, ResultExt}; use crate::inverted_index::error::{ @@ -24,15 +24,13 @@ use crate::inverted_index::search::fst_apply::FstApplier; use crate::inverted_index::search::predicate::{Predicate, Range}; use crate::inverted_index::FstMap; -type Dfa = DenseDFA, usize>; - /// `IntersectionFstApplier` applies intersection operations on an FstMap using specified ranges and regex patterns. pub struct IntersectionFstApplier { /// A list of `Range` which define inclusive or exclusive ranges for keys to be queried in the FstMap. ranges: Vec, /// A list of `Dfa` compiled from regular expression patterns. - dfas: Vec, + dfas: Vec>>, } impl FstApplier for IntersectionFstApplier { @@ -88,8 +86,8 @@ impl IntersectionFstApplier { match predicate { Predicate::Range(range) => ranges.push(range.range), Predicate::RegexMatch(regex) => { - let dfa = DenseDFA::new(®ex.pattern); - let dfa = dfa.context(ParseDFASnafu)?; + let dfa = DFA::new(®ex.pattern); + let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?; dfas.push(dfa); } // Rejection of `InList` predicates is enforced here. @@ -210,47 +208,67 @@ mod tests { #[test] fn test_intersection_fst_applier_with_valid_pattern() { - let test_fst = FstMap::from_iter([("aa", 1), ("bb", 2), ("cc", 3)]).unwrap(); - - let applier = create_applier_from_pattern("a.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern("b.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![2]); - - let applier = create_applier_from_pattern("c.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![3]); - - let applier = create_applier_from_pattern("a.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern("b.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![2]); - - let applier = create_applier_from_pattern("c.*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![3]); - - let applier = create_applier_from_pattern("d.?").unwrap(); - let results = applier.apply(&test_fst); - assert!(results.is_empty()); - - let applier = create_applier_from_pattern("a.?|b.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1, 2]); - - let applier = create_applier_from_pattern("d.?|a.?").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1]); - - let applier = create_applier_from_pattern(".*").unwrap(); - let results = applier.apply(&test_fst); - assert_eq!(results, vec![1, 2, 3]); + let test_fst = FstMap::from_iter([("123", 1), ("abc", 2)]).unwrap(); + + let cases = vec![ + ("1", vec![1]), + ("2", vec![1]), + ("3", vec![1]), + ("^1", vec![1]), + ("^2", vec![]), + ("^3", vec![]), + ("^1.*", vec![1]), + ("^.*2", vec![1]), + ("^.*3", vec![1]), + ("1$", vec![]), + ("2$", vec![]), + ("3$", vec![1]), + ("1.*$", vec![1]), + ("2.*$", vec![1]), + ("3.*$", vec![1]), + ("^1..$", vec![1]), + ("^.2.$", vec![1]), + ("^..3$", vec![1]), + ("^[0-9]", vec![1]), + ("^[0-9]+$", vec![1]), + ("^[0-9][0-9]$", vec![]), + ("^[0-9][0-9][0-9]$", vec![1]), + ("^123$", vec![1]), + ("a", vec![2]), + ("b", vec![2]), + ("c", vec![2]), + ("^a", vec![2]), + ("^b", vec![]), + ("^c", vec![]), + ("^a.*", vec![2]), + ("^.*b", vec![2]), + ("^.*c", vec![2]), + ("a$", vec![]), + ("b$", vec![]), + ("c$", vec![2]), + ("a.*$", vec![2]), + ("b.*$", vec![2]), + ("c.*$", vec![2]), + ("^.[a-z]", vec![2]), + ("^abc$", vec![2]), + ("^ab$", vec![]), + ("abc$", vec![2]), + ("^a.c$", vec![2]), + ("^..c$", vec![2]), + ("ab", vec![2]), + (".*", vec![1, 2]), + ("", vec![1, 2]), + ("^$", vec![]), + ("1|a", vec![1, 2]), + ("^123$|^abc$", vec![1, 2]), + ("^123$|d", vec![1]), + ]; + + for (pattern, expected) in cases { + let applier = create_applier_from_pattern(pattern).unwrap(); + let results = applier.apply(&test_fst); + assert_eq!(results, expected); + } } #[test] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 8c3ef50ec2c7..a28e4f0426ea 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -39,6 +39,7 @@ datafusion.workspace = true datatypes.workspace = true futures.workspace = true humantime-serde.workspace = true +index.workspace = true lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 39457281d76b..68a35123ea39 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -423,6 +423,23 @@ pub enum Error { #[snafu(source)] error: parquet::errors::ParquetError, }, + + #[snafu(display("Column not found, column: {column}"))] + ColumnNotFound { column: String, location: Location }, + + #[snafu(display("Failed to build index applier"))] + BuildIndexApplier { + #[snafu(source)] + source: index::inverted_index::error::Error, + location: Location, + }, + + #[snafu(display("Failed to convert value"))] + ConvertValue { + #[snafu(source)] + source: datatypes::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -468,6 +485,7 @@ impl ErrorExt for Error { | InvalidRequest { .. } | FillDefault { .. } | ConvertColumnDataType { .. } + | ColumnNotFound { .. } | InvalidMetadata { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } @@ -504,6 +522,8 @@ impl ErrorExt for Error { JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, + BuildIndexApplier { source, .. } => source.status_code(), + ConvertValue { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index 4cc6fd3274ac..33ef05433521 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -84,7 +84,11 @@ impl SortField { } impl SortField { - fn serialize(&self, serializer: &mut Serializer<&mut Vec>, value: &ValueRef) -> Result<()> { + pub(crate) fn serialize( + &self, + serializer: &mut Serializer<&mut Vec>, + value: &ValueRef, + ) -> Result<()> { macro_rules! cast_value_and_serialize { ( $self: ident; diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 32c7b4951a55..55939c2d246a 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -16,5 +16,6 @@ pub mod file; pub mod file_purger; +mod index; pub mod parquet; pub(crate) mod version; diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs new file mode 100644 index 000000000000..baffda27aa6e --- /dev/null +++ b/src/mito2/src/sst/index.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![allow(dead_code)] + +pub mod applier; +mod codec; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs new file mode 100644 index 000000000000..95ca25ba003d --- /dev/null +++ b/src/mito2/src/sst/index/applier.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod builder; + +use index::inverted_index::search::index_apply::IndexApplier; +use object_store::ObjectStore; + +/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files +/// and returning the relevant row group ids for further scan. +pub struct SstIndexApplier { + /// The root directory of the region. + region_dir: String, + + /// Object store responsible for accessing SST files. + object_store: ObjectStore, + + /// Predefined index applier used to apply predicates to index files + /// and return the relevant row group ids for further scan. + index_applier: Box, +} + +impl SstIndexApplier { + /// Creates a new [`SstIndexApplier`]. + pub fn new( + region_dir: String, + object_store: ObjectStore, + index_applier: Box, + ) -> Self { + Self { + region_dir, + object_store, + index_applier, + } + } +} diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs new file mode 100644 index 000000000000..52af22effb18 --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -0,0 +1,261 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod between; + +// TODO(zhongzc): This PR is too large. The following modules are coming soon. + +// mod comparison; +// mod eq_list; +// mod in_list; +// mod regex_match; + +use std::collections::HashMap; + +use api::v1::SemanticType; +use common_query::logical_plan::Expr; +use common_telemetry::warn; +use datafusion_common::ScalarValue; +use datafusion_expr::Expr as DfExpr; +use datatypes::data_type::ConcreteDataType; +use datatypes::value::Value; +use index::inverted_index::search::index_apply::PredicatesIndexApplier; +use index::inverted_index::search::predicate::Predicate; +use object_store::ObjectStore; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadata; + +use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; +use crate::row_converter::SortField; +use crate::sst::index::applier::SstIndexApplier; +use crate::sst::index::codec::IndexValueCodec; + +type ColumnName = String; + +/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. +pub struct SstIndexApplierBuilder<'a> { + /// Directory of the region, required argument for constructing [`SstIndexApplier`]. + region_dir: String, + + /// Object store, required argument for constructing [`SstIndexApplier`]. + object_store: ObjectStore, + + /// Metadata of the region, used to get metadata like column type. + metadata: &'a RegionMetadata, + + /// Stores predicates during traversal on the Expr tree. + output: HashMap>, +} + +impl<'a> SstIndexApplierBuilder<'a> { + /// Creates a new [`SstIndexApplierBuilder`]. + pub fn new( + region_dir: String, + object_store: ObjectStore, + metadata: &'a RegionMetadata, + ) -> Self { + Self { + region_dir, + object_store, + metadata, + output: HashMap::default(), + } + } + + /// Consumes the builder to construct an [`SstIndexApplier`], optionally returned based on + /// the expressions provided. If no predicates match, returns `None`. + pub fn build(mut self, exprs: &[Expr]) -> Result> { + for expr in exprs { + self.traverse_and_collect(expr.df_expr()); + } + + if self.output.is_empty() { + return Ok(None); + } + + let predicates = self.output.into_iter().collect(); + let applier = PredicatesIndexApplier::try_from(predicates); + Ok(Some(SstIndexApplier::new( + self.region_dir, + self.object_store, + Box::new(applier.context(BuildIndexApplierSnafu)?), + ))) + } + + /// Recursively traverses expressions to collect predicates. + /// Results are stored in `self.output`. + fn traverse_and_collect(&mut self, expr: &DfExpr) { + let res = match expr { + DfExpr::Between(between) => self.collect_between(between), + + // TODO(zhongzc): This PR is too large. The following arms are coming soon. + + // DfExpr::InList(in_list) => self.collect_inlist(in_list), + // DfExpr::BinaryExpr(BinaryExpr { left, op, right }) => match op { + // Operator::And => { + // self.traverse_and_collect(left); + // self.traverse_and_collect(right); + // Ok(()) + // } + // Operator::Or => self.collect_or_eq_list(left, right), + // Operator::Eq => self.collect_eq(left, right), + // Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => { + // self.collect_comparison_expr(left, op, right) + // } + // Operator::RegexMatch => self.collect_regex_match(left, right), + // _ => Ok(()), + // }, + + // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ... + _ => Ok(()), + }; + + if let Err(err) = res { + warn!(err; "Failed to collect predicates, ignore it. expr: {expr}"); + } + } + + /// Helper function to add a predicate to the output. + fn add_predicate(&mut self, column_name: &str, predicate: Predicate) { + match self.output.get_mut(column_name) { + Some(predicates) => predicates.push(predicate), + None => { + self.output.insert(column_name.to_string(), vec![predicate]); + } + } + } + + /// Helper function to get the column type of a tag column. + /// Returns `None` if the column is not a tag column. + fn tag_column_type(&self, column_name: &str) -> Result> { + let column = self + .metadata + .column_by_name(column_name) + .context(ColumnNotFoundSnafu { + column: column_name, + })?; + + Ok((column.semantic_type == SemanticType::Tag) + .then(|| column.column_schema.data_type.clone())) + } + + /// Helper function to get a non-null literal. + fn nonnull_lit(expr: &DfExpr) -> Option<&ScalarValue> { + match expr { + DfExpr::Literal(lit) if !lit.is_null() => Some(lit), + _ => None, + } + } + + /// Helper function to get the column name of a column expression. + fn column_name(expr: &DfExpr) -> Option<&str> { + match expr { + DfExpr::Column(column) => Some(&column.name), + _ => None, + } + } + + /// Helper function to encode a literal into bytes. + fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result> { + let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?; + let mut bytes = vec![]; + let field = SortField::new(data_type); + IndexValueCodec::encode_value(value.as_value_ref(), &field, &mut bytes)?; + Ok(bytes) + } +} + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use datafusion_common::Column; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use object_store::services::Memory; + use object_store::ObjectStore; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + pub(crate) fn test_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("b", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "c", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1]); + builder.build().unwrap() + } + + pub(crate) fn test_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + pub(crate) fn tag_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "a".to_string(), + }) + } + + pub(crate) fn field_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "b".to_string(), + }) + } + + pub(crate) fn nonexistent_column() -> DfExpr { + DfExpr::Column(Column { + relation: None, + name: "nonexistent".to_string(), + }) + } + + pub(crate) fn string_lit(s: impl Into) -> DfExpr { + DfExpr::Literal(ScalarValue::Utf8(Some(s.into()))) + } + + pub(crate) fn int64_lit(i: impl Into) -> DfExpr { + DfExpr::Literal(ScalarValue::Int64(Some(i.into()))) + } + + pub(crate) fn encoded_string(s: impl Into) -> Vec { + let mut bytes = vec![]; + IndexValueCodec::encode_value( + Value::from(s.into()).as_value_ref(), + &SortField::new(ConcreteDataType::string_datatype()), + &mut bytes, + ) + .unwrap(); + bytes + } +} diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs new file mode 100644 index 000000000000..50ae7073b2db --- /dev/null +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -0,0 +1,171 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datafusion_expr::Between; +use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; + +use crate::error::Result; +use crate::sst::index::applier::builder::SstIndexApplierBuilder; + +impl<'a> SstIndexApplierBuilder<'a> { + /// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`. + pub(crate) fn collect_between(&mut self, between: &Between) -> Result<()> { + if between.negated { + return Ok(()); + } + + let Some(column_name) = Self::column_name(&between.expr) else { + return Ok(()); + }; + let Some(data_type) = self.tag_column_type(column_name)? else { + return Ok(()); + }; + let Some(low) = Self::nonnull_lit(&between.low) else { + return Ok(()); + }; + let Some(high) = Self::nonnull_lit(&between.high) else { + return Ok(()); + }; + + let predicate = Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: Self::encode_lit(low, data_type.clone())?, + }), + upper: Some(Bound { + inclusive: true, + value: Self::encode_lit(high, data_type)?, + }), + }, + }); + + self.add_predicate(column_name, predicate); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::Error; + use crate::sst::index::applier::builder::tests::{ + encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column, + test_object_store, test_region_metadata, + }; + + #[test] + fn test_collect_between_basic() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(tag_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + + let predicates = builder.output.get("a").unwrap(); + assert_eq!(predicates.len(), 1); + assert_eq!( + predicates[0], + Predicate::Range(RangePredicate { + range: Range { + lower: Some(Bound { + inclusive: true, + value: encoded_string("abc"), + }), + upper: Some(Bound { + inclusive: true, + value: encoded_string("def"), + }), + } + }) + ); + } + + #[test] + fn test_collect_between_negated() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: true, + expr: Box::new(tag_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_field_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(field_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + builder.collect_between(&between).unwrap(); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_type_mismatch() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(tag_column()), + low: Box::new(int64_lit(123)), + high: Box::new(int64_lit(456)), + }; + + let res = builder.collect_between(&between); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + assert!(builder.output.is_empty()); + } + + #[test] + fn test_collect_between_nonexistent_column() { + let metadata = test_region_metadata(); + let mut builder = + SstIndexApplierBuilder::new("test".to_string(), test_object_store(), &metadata); + + let between = Between { + negated: false, + expr: Box::new(nonexistent_column()), + low: Box::new(string_lit("abc")), + high: Box::new(string_lit("def")), + }; + + let res = builder.collect_between(&between); + assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); + assert!(builder.output.is_empty()); + } +} diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs new file mode 100644 index 000000000000..ada5ac07cbfc --- /dev/null +++ b/src/mito2/src/sst/index/codec.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datatypes::value::ValueRef; +use memcomparable::Serializer; + +use crate::error::Result; +use crate::row_converter::SortField; + +/// Encodes index values according to their data types for sorting and storage use. +pub struct IndexValueCodec; + +impl IndexValueCodec { + /// Serializes a `ValueRef` using the data type defined in `SortField` and writes + /// the result into a buffer. + /// + /// # Arguments + /// * `value` - The value to be encoded. + /// * `field` - Contains data type to guide serialization. + /// * `buffer` - Destination buffer for the serialized value. + pub fn encode_value(value: ValueRef, field: &SortField, buffer: &mut Vec) -> Result<()> { + buffer.reserve(field.estimated_size()); + let mut serializer = Serializer::new(buffer); + field.serialize(&mut serializer, &value) + } +} + +#[cfg(test)] +mod tests { + use datatypes::data_type::ConcreteDataType; + + use super::*; + use crate::error::Error; + + #[test] + fn test_encode_value_basic() { + let value = ValueRef::from("hello"); + let field = SortField::new(ConcreteDataType::string_datatype()); + + let mut buffer = Vec::new(); + IndexValueCodec::encode_value(value, &field, &mut buffer).unwrap(); + assert!(!buffer.is_empty()); + } + + #[test] + fn test_encode_value_type_mismatch() { + let value = ValueRef::from("hello"); + let field = SortField::new(ConcreteDataType::int64_datatype()); + + let mut buffer = Vec::new(); + let res = IndexValueCodec::encode_value(value, &field, &mut buffer); + assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); + } +} From 4460af800f344676f18749ed0a5205ba2e84844b Mon Sep 17 00:00:00 2001 From: AntiTopQuark Date: Sat, 30 Dec 2023 21:02:26 +0800 Subject: [PATCH 3/5] feat(TableRouteValue): add panic notes and type checks (#3031) * refactor(TableRouteValue): add panic notes and type checks * chore: add deprecate develop branch warning Signed-off-by: Ruihang Xia * add error defines and checks * Update README.md * update code format and fix tests * update name of error * delete unused note * fix unsafe .expect() for region_route() * update error name * update unwrap * update code format --------- Signed-off-by: Ruihang Xia Co-authored-by: Ruihang Xia --- src/common/meta/src/ddl/alter_table.rs | 2 +- src/common/meta/src/ddl/create_table.rs | 2 +- src/common/meta/src/ddl/drop_table.rs | 6 +-- src/common/meta/src/ddl_manager.rs | 2 +- src/common/meta/src/error.rs | 6 ++- src/common/meta/src/key.rs | 42 +++++++++------ src/common/meta/src/key/table_route.rs | 54 +++++++++++++------ src/meta-srv/src/error.rs | 10 +++- .../region_failover/deactivate_region.rs | 1 + .../region_failover/update_metadata.rs | 14 +++-- .../src/procedure/region_migration.rs | 2 +- .../src/procedure/region_migration/manager.rs | 3 ++ .../region_migration/migration_start.rs | 5 +- .../procedure/region_migration/test_util.rs | 2 +- .../downgrade_leader_region.rs | 6 +-- .../rollback_downgraded_region.rs | 10 +++- .../upgrade_candidate_region.rs | 16 ++++-- src/meta-srv/src/region/lease_keeper.rs | 2 +- src/meta-srv/src/selector/load_based.rs | 20 ++++--- src/partition/src/error.rs | 8 +++ src/partition/src/manager.rs | 16 ++++-- tests-integration/src/grpc.rs | 14 +++-- tests-integration/src/instance.rs | 14 +++-- 23 files changed, 180 insertions(+), 77 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 092d4dd24263..c3b1f7c31121 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -191,7 +191,7 @@ impl AlterTableProcedure { .await? .context(TableRouteNotFoundSnafu { table_id })? .into_inner(); - let region_routes = table_route.region_routes(); + let region_routes = table_route.region_routes()?; let leaders = find_leaders(region_routes); let mut alter_region_tasks = Vec::with_capacity(leaders.len()); diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index c73844fc8337..c6e09006b470 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -217,7 +217,7 @@ impl CreateTableProcedure { .context(TableRouteNotFoundSnafu { table_id: physical_table_id, })?; - let region_routes = physical_table_route.region_routes(); + let region_routes = physical_table_route.region_routes()?; let request_builder = self.new_region_request_builder(Some(physical_table_id))?; diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index 94c6cdf0a06a..7fac47e62cb1 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -116,7 +116,7 @@ impl DropTableProcedure { /// Register dropping regions if doesn't exist. fn register_dropping_regions(&mut self) -> Result<()> { - let region_routes = self.data.region_routes(); + let region_routes = self.data.region_routes()?; let dropping_regions = operating_leader_regions(region_routes); @@ -190,7 +190,7 @@ impl DropTableProcedure { pub async fn on_datanode_drop_regions(&self) -> Result { let table_id = self.data.table_id(); - let region_routes = &self.data.region_routes(); + let region_routes = &self.data.region_routes()?; let leaders = find_leaders(region_routes); let mut drop_region_tasks = Vec::with_capacity(leaders.len()); @@ -306,7 +306,7 @@ impl DropTableData { self.task.table_ref() } - fn region_routes(&self) -> &Vec { + fn region_routes(&self) -> Result<&Vec> { self.table_route_value.region_routes() } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index af669797f4d4..7876d2a8a793 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -278,7 +278,7 @@ async fn handle_truncate_table_task( let table_route_value = table_route_value.context(error::TableRouteNotFoundSnafu { table_id })?; - let table_route = table_route_value.into_inner().region_routes().clone(); + let table_route = table_route_value.into_inner().region_routes()?.clone(); let id = ddl_manager .submit_truncate_table_task( diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 323d922b9cda..2a0db2abbb08 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -351,6 +351,9 @@ pub enum Error { #[snafu(display("The topic pool is empty"))] EmptyTopicPool { location: Location }, + + #[snafu(display("Unexpected table route type: {}", err_msg))] + UnexpectedLogicalRouteTable { location: Location, err_msg: String }, } pub type Result = std::result::Result; @@ -392,7 +395,8 @@ impl ErrorExt for Error { | BuildKafkaPartitionClient { .. } | ProduceRecord { .. } | CreateKafkaWalTopic { .. } - | EmptyTopicPool { .. } => StatusCode::Unexpected, + | EmptyTopicPool { .. } + | UnexpectedLogicalRouteTable { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index bb2b87a973f5..57de421be202 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -483,7 +483,7 @@ impl TableMetadataManager { .build_delete_txn(table_id, table_info_value)?; // Deletes datanode table key value pairs. - let distribution = region_distribution(table_route_value.region_routes())?; + let distribution = region_distribution(table_route_value.region_routes()?)?; let delete_datanode_txn = self .datanode_table_manager() .build_delete_txn(table_id, distribution)?; @@ -608,7 +608,7 @@ impl TableMetadataManager { ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = - region_distribution(current_table_route_value.region_routes())?; + region_distribution(current_table_route_value.region_routes()?)?; let new_region_distribution = region_distribution(&new_region_routes)?; let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( @@ -621,7 +621,7 @@ impl TableMetadataManager { )?; // Updates the table_route. - let new_table_route_value = current_table_route_value.update(new_region_routes); + let new_table_route_value = current_table_route_value.update(new_region_routes)?; let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() @@ -656,7 +656,7 @@ impl TableMetadataManager { where F: Fn(&RegionRoute) -> Option>, { - let mut new_region_routes = current_table_route_value.region_routes().clone(); + let mut new_region_routes = current_table_route_value.region_routes()?.clone(); let mut updated = 0; for route in &mut new_region_routes { @@ -673,7 +673,7 @@ impl TableMetadataManager { } // Updates the table_route. - let new_table_route_value = current_table_route_value.update(new_region_routes); + let new_table_route_value = current_table_route_value.update(new_region_routes)?; let (update_table_route_txn, on_update_table_route_failure) = self .table_route_manager() @@ -897,7 +897,11 @@ mod tests { table_info ); assert_eq!( - remote_table_route.unwrap().into_inner().region_routes(), + remote_table_route + .unwrap() + .into_inner() + .region_routes() + .unwrap(), region_routes ); } @@ -978,7 +982,7 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(removed_table_route.region_routes(), region_routes); + assert_eq!(removed_table_route.region_routes().unwrap(), region_routes); } #[tokio::test] @@ -1173,11 +1177,11 @@ mod tests { .unwrap(); assert_eq!( - updated_route_value.region_routes()[0].leader_status, + updated_route_value.region_routes().unwrap()[0].leader_status, Some(RegionStatus::Downgraded) ); assert_eq!( - updated_route_value.region_routes()[1].leader_status, + updated_route_value.region_routes().unwrap()[1].leader_status, Some(RegionStatus::Downgraded) ); } @@ -1271,7 +1275,8 @@ mod tests { let current_table_route_value = DeserializedValueWithBytes::from_inner( current_table_route_value .inner - .update(new_region_routes.clone()), + .update(new_region_routes.clone()) + .unwrap(), ); let new_region_routes = vec![new_region_route(2, 4), new_region_route(5, 5)]; // it should be ok. @@ -1295,13 +1300,16 @@ mod tests { // if the current_table_route_value is wrong, it should return an error. // The ABA problem. - let wrong_table_route_value = - DeserializedValueWithBytes::from_inner(current_table_route_value.update(vec![ - new_region_route(1, 1), - new_region_route(2, 2), - new_region_route(3, 3), - new_region_route(4, 4), - ])); + let wrong_table_route_value = DeserializedValueWithBytes::from_inner( + current_table_route_value + .update(vec![ + new_region_route(1, 1), + new_region_route(2, 2), + new_region_route(3, 3), + new_region_route(4, 4), + ]) + .unwrap(), + ); assert!(table_metadata_manager .update_table_route( table_id, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index d767d098a79f..4d2ac35001f3 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -16,12 +16,12 @@ use std::collections::HashMap; use std::fmt::Display; use serde::{Deserialize, Serialize}; -use snafu::ResultExt; +use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use super::{DeserializedValueWithBytes, TableMetaValue}; -use crate::error::{Result, SerdeJsonSnafu}; +use crate::error::{Result, SerdeJsonSnafu, UnexpectedLogicalRouteTableSnafu}; use crate::key::{to_removed_key, RegionDistribution, TableMetaKey, TABLE_ROUTE_PREFIX}; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse}; use crate::kv_backend::KvBackendRef; @@ -62,29 +62,48 @@ impl TableRouteValue { } /// Returns a new version [TableRouteValue] with `region_routes`. - pub fn update(&self, region_routes: Vec) -> Self { + pub fn update(&self, region_routes: Vec) -> Result { + ensure!( + self.is_physical(), + UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + } + ); let version = self.physical_table_route().version; - Self::Physical(PhysicalTableRouteValue { + Ok(Self::Physical(PhysicalTableRouteValue { region_routes, version: version + 1, - }) + })) } /// Returns the version. /// /// For test purpose. #[cfg(any(test, feature = "testing"))] - pub fn version(&self) -> u64 { - self.physical_table_route().version + pub fn version(&self) -> Result { + ensure!( + self.is_physical(), + UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + } + ); + Ok(self.physical_table_route().version) } /// Returns the corresponding [RegionRoute]. - pub fn region_route(&self, region_id: RegionId) -> Option { - self.physical_table_route() + pub fn region_route(&self, region_id: RegionId) -> Result> { + ensure!( + self.is_physical(), + UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + } + ); + Ok(self + .physical_table_route() .region_routes .iter() .find(|route| route.region.id == region_id) - .cloned() + .cloned()) } /// Returns true if it's [TableRouteValue::Physical]. @@ -93,11 +112,14 @@ impl TableRouteValue { } /// Gets the [RegionRoute]s of this [TableRouteValue::Physical]. - /// - /// # Panics - /// The route type is not the [TableRouteValue::Physical]. - pub fn region_routes(&self) -> &Vec { - &self.physical_table_route().region_routes + pub fn region_routes(&self) -> Result<&Vec> { + ensure!( + self.is_physical(), + UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + } + ); + Ok(&self.physical_table_route().region_routes) } fn physical_table_route(&self) -> &PhysicalTableRouteValue { @@ -354,7 +376,7 @@ impl TableRouteManager { ) -> Result> { self.get(table_id) .await? - .map(|table_route| region_distribution(table_route.region_routes())) + .map(|table_route| region_distribution(table_route.region_routes()?)) .transpose() } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 530fba83aa2e..5272c3abe77a 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -602,6 +602,13 @@ pub enum Error { #[snafu(display("Weight array is not set"))] NotSetWeightArray { location: Location }, + + #[snafu(display("Unexpected table route type: {}", err_msg))] + UnexpectedLogicalRouteTable { + location: Location, + err_msg: String, + source: common_meta::error::Error, + }, } impl Error { @@ -717,7 +724,8 @@ impl ErrorExt for Error { | Error::TableMetadataManager { source, .. } | Error::KvBackend { source, .. } | Error::UpdateTableRoute { source, .. } - | Error::GetFullTableInfo { source, .. } => source.status_code(), + | Error::GetFullTableInfo { source, .. } + | Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(), Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { source.status_code() diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index c2d06590aec2..650c794126a6 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -208,6 +208,7 @@ mod tests { let should_downgraded = table_route_value .region_routes() + .unwrap() .iter() .find(|route| route.region.id.region_number() == failed_region.region_number) .unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 23ade1a2a1fe..c2218c6afede 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -85,7 +85,12 @@ impl UpdateRegionMetadata { .context(error::TableMetadataManagerSnafu)? .context(TableRouteNotFoundSnafu { table_id })?; - let mut new_region_routes = table_route_value.region_routes().clone(); + let mut new_region_routes = table_route_value + .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })? + .clone(); for region_route in new_region_routes.iter_mut() { if region_route.region.id.region_number() == failed_region.region_number { @@ -234,6 +239,7 @@ mod tests { .unwrap() .into_inner() .region_routes() + .unwrap() .clone() } @@ -396,8 +402,8 @@ mod tests { .unwrap() .into_inner(); - let peers = &extract_all_peers(table_route_value.region_routes()); - let actual = table_route_value.region_routes(); + let peers = &extract_all_peers(table_route_value.region_routes().unwrap()); + let actual = table_route_value.region_routes().unwrap(); let expected = &vec![ new_region_route(1, peers, 2), new_region_route(2, peers, 3), @@ -416,7 +422,7 @@ mod tests { .unwrap() .into_inner(); - let map = region_distribution(table_route_value.region_routes()).unwrap(); + let map = region_distribution(table_route_value.region_routes().unwrap()).unwrap(); assert_eq!(map.len(), 2); assert_eq!(map.get(&2), Some(&vec![1, 3])); assert_eq!(map.get(&3), Some(&vec![2, 4])); diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index a1e92277d60b..b187a026723a 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -753,7 +753,7 @@ mod tests { .unwrap() .version(); // Should be unchanged. - assert_eq!(table_routes_version, 0); + assert_eq!(table_routes_version.unwrap(), 0); } #[tokio::test] diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 03794ed85d11..dd034ba3e7d2 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -244,6 +244,9 @@ impl RegionMigrationManager { // Safety: checked before. let region_route = table_route .region_route(region_id) + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })? .context(error::RegionRouteNotFoundSnafu { region_id })?; if self.has_migrated(®ion_route, &task)? { diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index fa84a1a6dd5e..68b291cb87c1 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -18,7 +18,7 @@ use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; use common_procedure::Status; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use super::migration_end::RegionMigrationEnd; @@ -85,6 +85,9 @@ impl RegionMigrationStart { let region_route = table_route .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })? .iter() .find(|route| route.region.id == region_id) .cloned() diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 4431791ff70f..4e9bb3939525 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -419,7 +419,7 @@ impl ProcedureMigrationTestSuite { .unwrap() .unwrap() .into_inner(); - let region_routes = table_route.region_routes(); + let region_routes = table_route.region_routes().unwrap(); let expected_leader_id = self.context.persistent_ctx.to_peer.id; let removed_follower_id = self.context.persistent_ctx.from_peer.id; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index cc67aa7ca8e9..818aadd9cda6 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -208,8 +208,8 @@ mod tests { .unwrap(); // It should remain unchanged. - assert_eq!(latest_table_route.version(), 0); - assert!(!latest_table_route.region_routes()[0].is_leader_downgraded()); + assert_eq!(latest_table_route.version().unwrap(), 0); + assert!(!latest_table_route.region_routes().unwrap()[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } @@ -249,7 +249,7 @@ mod tests { .unwrap() .unwrap(); - assert!(latest_table_route.region_routes()[0].is_leader_downgraded()); + assert!(latest_table_route.region_routes().unwrap()[0].is_leader_downgraded()); assert!(ctx.volatile_ctx.table_route.is_none()); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 7281737752a4..844188f2f1f9 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -170,7 +170,10 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(&expected_region_routes, table_route.region_routes()); + assert_eq!( + &expected_region_routes, + table_route.region_routes().unwrap() + ); } #[tokio::test] @@ -231,6 +234,9 @@ mod tests { .unwrap() .unwrap() .into_inner(); - assert_eq!(&expected_region_routes, table_route.region_routes()); + assert_eq!( + &expected_region_routes, + table_route.region_routes().unwrap() + ); } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 597d9afe9a7b..745b8487a8f3 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -33,7 +33,12 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let mut region_routes = table_route_value.region_routes().clone(); + let mut region_routes = table_route_value + .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })? + .clone(); let region_route = region_routes .iter_mut() .find(|route| route.region.id == region_id) @@ -81,7 +86,12 @@ impl UpdateMetadata { let region_id = ctx.region_id(); let table_route_value = ctx.get_table_route_value().await?.clone(); - let region_routes = table_route_value.region_routes().clone(); + let region_routes = table_route_value + .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })? + .clone(); let region_route = region_routes .into_iter() .find(|route| route.region.id == region_id) @@ -465,7 +475,7 @@ mod tests { .unwrap() .unwrap() .into_inner(); - let region_routes = table_route.region_routes(); + let region_routes = table_route.region_routes().unwrap(); assert!(ctx.volatile_ctx.table_route.is_none()); assert!(ctx.volatile_ctx.opening_region_guard.is_none()); diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index cbd2451896b1..9b066065b427 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -127,7 +127,7 @@ impl RegionLeaseKeeper { } if let Some(table_route) = table_metadata.get(®ion_id.table_id()) { - if let Some(region_route) = table_route.region_route(region_id) { + if let Ok(Some(region_route)) = table_route.region_route(region_id) { return renew_region_lease_via_region_route(®ion_route, datanode_id, region_id); } } diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index e8b3dcdf9e97..9573757a3ffc 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -142,13 +142,19 @@ async fn get_leader_peer_ids( .await .context(error::TableMetadataManagerSnafu) .map(|route| { - route.map_or_else(Vec::new, |route| { - find_leaders(route.region_routes()) - .into_iter() - .map(|peer| peer.id) - .collect() - }) - }) + route.map_or_else( + || Ok(Vec::new()), + |route| { + let region_routes = route + .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { err_msg: "" })?; + Ok(find_leaders(region_routes) + .into_iter() + .map(|peer| peer.id) + .collect()) + }, + ) + })? } #[cfg(test)] diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index 7765a77c9796..6bfe76fa5b8f 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -119,6 +119,13 @@ pub enum Error { region_id: RegionId, location: Location, }, + + #[snafu(display("Unexpected table route type: {}", err_msg))] + UnexpectedLogicalRouteTable { + location: Location, + err_msg: String, + source: common_meta::error::Error, + }, } impl ErrorExt for Error { @@ -138,6 +145,7 @@ impl ErrorExt for Error { Error::FindDatanode { .. } => StatusCode::InvalidArguments, Error::TableRouteManager { source, .. } => source.status_code(), Error::MissingDefaultValue { .. } => StatusCode::Internal, + Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(), } } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index ad15c62cc1dd..2963ac8e2b45 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -75,8 +75,13 @@ impl PartitionRuleManager { .context(error::TableRouteManagerSnafu)? .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - - Ok(RegionRoutes(route.region_routes().clone())) + let region_routes = + route + .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })?; + Ok(RegionRoutes(region_routes.clone())) } pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { @@ -87,7 +92,12 @@ impl PartitionRuleManager { .context(error::TableRouteManagerSnafu)? .context(error::FindTableRoutesSnafu { table_id })? .into_inner(); - let region_routes = route.region_routes(); + let region_routes = + route + .region_routes() + .context(error::UnexpectedLogicalRouteTableSnafu { + err_msg: "{self:?} is a non-physical TableRouteValue.", + })?; ensure!( !region_routes.is_empty(), diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index e9731cc336fa..24cd470c3905 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -521,11 +521,15 @@ CREATE TABLE {table_name} ( .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(table_route_value.region_routes()) - .unwrap() - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); + let region_to_dn_map = region_distribution( + table_route_value + .region_routes() + .expect("physical table route"), + ) + .unwrap() + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); assert!(region_to_dn_map.len() <= instance.datanodes().len()); let stmt = QueryLanguageParser::parse_sql(&format!( diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 05253dc0a236..5b7ed080d9d9 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -216,11 +216,15 @@ mod tests { .unwrap() .into_inner(); - let region_to_dn_map = region_distribution(table_route_value.region_routes()) - .unwrap() - .iter() - .map(|(k, v)| (v[0], *k)) - .collect::>(); + let region_to_dn_map = region_distribution( + table_route_value + .region_routes() + .expect("region routes should be physical"), + ) + .unwrap() + .iter() + .map(|(k, v)| (v[0], *k)) + .collect::>(); assert!(region_to_dn_map.len() <= instance.datanodes().len()); let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap(); From 9db168875c4f67be5df6103d0c79f5fd159b3f0e Mon Sep 17 00:00:00 2001 From: niebayes Date: Sat, 30 Dec 2023 23:28:10 +0800 Subject: [PATCH 4/5] fix(remote_wal): some known issues (#3052) * fix: some known issues * fix: CR * fix: CR * chore: replace Mutex with RwLock --- src/log-store/src/kafka/client_manager.rs | 28 ++++++++++------ src/log-store/src/kafka/log_store.rs | 39 +++++++++++------------ 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index e272840201bb..cd2f705c4db9 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -12,17 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic}; -use dashmap::mapref::entry::Entry as DashMapEntry; -use dashmap::DashMap; use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; use rskafka::client::producer::aggregator::RecordAggregator; use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; use rskafka::BackoffConfig; use snafu::ResultExt; +use tokio::sync::RwLock; use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; @@ -67,7 +67,7 @@ pub(crate) struct ClientManager { client_factory: RsKafkaClient, /// A pool maintaining a collection of clients. /// Key: a topic. Value: the associated client of the topic. - client_pool: DashMap, + client_pool: RwLock>, } impl ClientManager { @@ -91,18 +91,28 @@ impl ClientManager { Ok(Self { config: config.clone(), client_factory: client, - client_pool: DashMap::new(), + client_pool: RwLock::new(HashMap::new()), }) } /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { - match self.client_pool.entry(topic.to_string()) { - DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), - DashMapEntry::Vacant(entry) => { - let topic_client = self.try_create_client(topic).await?; - Ok(entry.insert(topic_client).clone()) + let client_pool = self.client_pool.read().await; + if let Some(client) = client_pool.get(topic) { + return Ok(client.clone()); + } + // Manullay releases the read lock. + drop(client_pool); + + // Acquires the write lock. + let mut client_pool = self.client_pool.write().await; + match client_pool.get(topic) { + Some(client) => Ok(client.clone()), + None => { + let client = self.try_create_client(topic).await?; + client_pool.insert(topic.clone(), client.clone()); + Ok(client) } } } diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 36c86987041b..20bcd4e7cf50 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -100,25 +100,24 @@ impl LogStore for KafkaLogStore { .push(entry); } - // Builds a record from entries belong to a region and produces them to kafka server. - let region_ids = producers.keys().cloned().collect::>(); - - let tasks = producers - .into_values() - .map(|producer| producer.produce(&self.client_manager)) - .collect::>(); - // Each produce operation returns a kafka offset of the produced record. - // The offsets are then converted to entry ids. - let entry_ids = futures::future::try_join_all(tasks) - .await? - .into_iter() - .map(TryInto::try_into) - .collect::>>()?; - debug!("The entries are appended at offsets {:?}", entry_ids); - - Ok(AppendBatchResponse { - last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(), - }) + // Produces entries for each region and gets the offset those entries written to. + // The returned offset is then converted into an entry id. + let last_entry_ids = futures::future::try_join_all(producers.into_iter().map( + |(region_id, producer)| async move { + let entry_id = producer + .produce(&self.client_manager) + .await + .map(TryInto::try_into)??; + Ok((region_id, entry_id)) + }, + )) + .await? + .into_iter() + .collect::>(); + + debug!("Append batch result: {:?}", last_entry_ids); + + Ok(AppendBatchResponse { last_entry_ids }) } /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids @@ -186,7 +185,7 @@ impl LogStore for KafkaLogStore { record_offset, ns_clone, high_watermark ); - // Ignores the noop record. + // Ignores noop records. if record.record.value.is_none() { continue; } From 6070e880776e0216796a0155420ed7dafb5d18a0 Mon Sep 17 00:00:00 2001 From: dimbtp Date: Sun, 31 Dec 2023 10:08:16 +0800 Subject: [PATCH 5/5] feat: add information_schema.files (#3054) * feat: add information_schema.files * fix: update information_schema.result * fix: change `EXTRA` field type to string --- src/catalog/src/information_schema.rs | 2 + .../information_schema/memory_table/tables.rs | 44 +++++++++ .../src/information_schema/table_names.rs | 1 + src/common/catalog/src/consts.rs | 4 +- .../common/show/show_databases_tables.result | 1 + .../common/system/information_schema.result | 95 +++++++++++++------ 6 files changed, 118 insertions(+), 29 deletions(-) diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 232c2279d938..92427552425f 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -58,6 +58,7 @@ lazy_static! { COLLATION_CHARACTER_SET_APPLICABILITY, CHECK_CONSTRAINTS, EVENTS, + FILES, ]; } @@ -171,6 +172,7 @@ impl InformationSchemaProvider { } CHECK_CONSTRAINTS => setup_memory_table!(CHECK_CONSTRAINTS), EVENTS => setup_memory_table!(EVENTS), + FILES => setup_memory_table!(FILES), SCHEMATA => Some(Arc::new(InformationSchemaSchemata::new( self.catalog_name.clone(), self.catalog_manager.clone(), diff --git a/src/catalog/src/information_schema/memory_table/tables.rs b/src/catalog/src/information_schema/memory_table/tables.rs index abb719ca1b4b..30be1fbaa748 100644 --- a/src/catalog/src/information_schema/memory_table/tables.rs +++ b/src/catalog/src/information_schema/memory_table/tables.rs @@ -183,6 +183,50 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { vec![], ), + FILES => ( + vec![ + bigint_column("FILE_ID"), + string_column("FILE_NAME"), + string_column("FILE_TYPE"), + string_column("TABLESPACE_NAME"), + string_column("TABLE_CATALOG"), + string_column("TABLE_SCHEMA"), + string_column("TABLE_NAME"), + string_column("LOGFILE_GROUP_NAME"), + bigint_column("LOGFILE_GROUP_NUMBER"), + string_column("ENGINE"), + string_column("FULLTEXT_KEYS"), + bigint_column("DELETED_ROWS"), + bigint_column("UPDATE_COUNT"), + bigint_column("FREE_EXTENTS"), + bigint_column("TOTAL_EXTENTS"), + bigint_column("EXTENT_SIZE"), + bigint_column("INITIAL_SIZE"), + bigint_column("MAXIMUM_SIZE"), + bigint_column("AUTOEXTEND_SIZE"), + datetime_column("CREATION_TIME"), + datetime_column("LAST_UPDATE_TIME"), + datetime_column("LAST_ACCESS_TIME"), + datetime_column("RECOVER_TIME"), + bigint_column("TRANSACTION_COUNTER"), + string_column("VERSION"), + string_column("ROW_FORMAT"), + bigint_column("TABLE_ROWS"), + bigint_column("AVG_ROW_LENGTH"), + bigint_column("DATA_LENGTH"), + bigint_column("MAX_DATA_LENGTH"), + bigint_column("INDEX_LENGTH"), + bigint_column("DATA_FREE"), + datetime_column("CREATE_TIME"), + datetime_column("UPDATE_TIME"), + datetime_column("CHECK_TIME"), + string_column("CHECKSUM"), + string_column("STATUS"), + string_column("EXTRA"), + ], + vec![], + ), + _ => unreachable!("Unknown table in information_schema: {}", table_name), }; diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/information_schema/table_names.rs index 73ef00b81bd3..bfdc56d89217 100644 --- a/src/catalog/src/information_schema/table_names.rs +++ b/src/catalog/src/information_schema/table_names.rs @@ -25,4 +25,5 @@ pub const COLLATIONS: &str = "collations"; pub const COLLATION_CHARACTER_SET_APPLICABILITY: &str = "collation_character_set_applicability"; pub const CHECK_CONSTRAINTS: &str = "check_constraints"; pub const EVENTS: &str = "events"; +pub const FILES: &str = "files"; pub const SCHEMATA: &str = "schemata"; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 75c176112db2..9e8b9e4a0768 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -54,8 +54,10 @@ pub const INFORMATION_SCHEMA_COLLATION_CHARACTER_SET_APPLICABILITY_TABLE_ID: u32 pub const INFORMATION_SCHEMA_CHECK_CONSTRAINTS_TABLE_ID: u32 = 12; /// id for information_schema.EVENTS pub const INFORMATION_SCHEMA_EVENTS_TABLE_ID: u32 = 13; +/// id for information_schema.FILES +pub const INFORMATION_SCHEMA_FILES_TABLE_ID: u32 = 14; /// id for information_schema.SCHEMATA -pub const INFORMATION_SCHEMA_SCHEMATA_TABLE_ID: u32 = 14; +pub const INFORMATION_SCHEMA_SCHEMATA_TABLE_ID: u32 = 15; /// ----- End of information_schema tables ----- pub const MITO_ENGINE: &str = "mito"; diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result index a407564c3b4c..8a1e606ad110 100644 --- a/tests/cases/standalone/common/show/show_databases_tables.result +++ b/tests/cases/standalone/common/show/show_databases_tables.result @@ -30,6 +30,7 @@ show tables; | columns | | engines | | events | +| files | | schemata | | tables | +---------------------------------------+ diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result index c7fc6543ee95..a97660ee5839 100644 --- a/tests/cases/standalone/common/system/information_schema.result +++ b/tests/cases/standalone/common/system/information_schema.result @@ -22,7 +22,8 @@ order by table_schema, table_name; | greptime | information_schema | columns | LOCAL TEMPORARY | 4 | | | greptime | information_schema | engines | LOCAL TEMPORARY | 5 | | | greptime | information_schema | events | LOCAL TEMPORARY | 13 | | -| greptime | information_schema | schemata | LOCAL TEMPORARY | 14 | | +| greptime | information_schema | files | LOCAL TEMPORARY | 14 | | +| greptime | information_schema | schemata | LOCAL TEMPORARY | 15 | | | greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | | greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | +---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ @@ -32,56 +33,56 @@ select * from information_schema.columns order by table_schema, table_name; +---------------+--------------------+---------------------------------------+----------------------------+-----------+---------------+----------------+-------------+-------------+----------------+ | table_catalog | table_schema | table_name | column_name | data_type | semantic_type | column_default | is_nullable | column_type | column_comment | +---------------+--------------------+---------------------------------------+----------------------------+-----------+---------------+----------------+-------------+-------------+----------------+ -| greptime | information_schema | build_info | pkg_version | String | FIELD | | No | String | | | greptime | information_schema | build_info | git_branch | String | FIELD | | No | String | | | greptime | information_schema | build_info | git_commit | String | FIELD | | No | String | | | greptime | information_schema | build_info | git_commit_short | String | FIELD | | No | String | | | greptime | information_schema | build_info | git_dirty | String | FIELD | | No | String | | -| greptime | information_schema | character_sets | description | String | FIELD | | No | String | | -| greptime | information_schema | character_sets | maxlen | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | build_info | pkg_version | String | FIELD | | No | String | | | greptime | information_schema | character_sets | default_collate_name | String | FIELD | | No | String | | | greptime | information_schema | character_sets | character_set_name | String | FIELD | | No | String | | -| greptime | information_schema | check_constraints | constraint_schema | String | FIELD | | No | String | | +| greptime | information_schema | character_sets | maxlen | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | character_sets | description | String | FIELD | | No | String | | | greptime | information_schema | check_constraints | check_clause | String | FIELD | | No | String | | | greptime | information_schema | check_constraints | constraint_name | String | FIELD | | No | String | | +| greptime | information_schema | check_constraints | constraint_schema | String | FIELD | | No | String | | | greptime | information_schema | check_constraints | constraint_catalog | String | FIELD | | No | String | | -| greptime | information_schema | collation_character_set_applicability | collation_name | String | FIELD | | No | String | | | greptime | information_schema | collation_character_set_applicability | character_set_name | String | FIELD | | No | String | | +| greptime | information_schema | collation_character_set_applicability | collation_name | String | FIELD | | No | String | | | greptime | information_schema | collations | collation_name | String | FIELD | | No | String | | | greptime | information_schema | collations | sortlen | Int64 | FIELD | | No | Int64 | | -| greptime | information_schema | collations | is_compiled | String | FIELD | | No | String | | -| greptime | information_schema | collations | is_default | String | FIELD | | No | String | | -| greptime | information_schema | collations | id | Int64 | FIELD | | No | Int64 | | | greptime | information_schema | collations | character_set_name | String | FIELD | | No | String | | -| greptime | information_schema | column_privileges | is_grantable | String | FIELD | | No | String | | -| greptime | information_schema | column_privileges | grantee | String | FIELD | | No | String | | -| greptime | information_schema | column_privileges | table_catalog | String | FIELD | | No | String | | +| greptime | information_schema | collations | id | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | collations | is_default | String | FIELD | | No | String | | +| greptime | information_schema | collations | is_compiled | String | FIELD | | No | String | | | greptime | information_schema | column_privileges | table_schema | String | FIELD | | No | String | | +| greptime | information_schema | column_privileges | table_catalog | String | FIELD | | No | String | | +| greptime | information_schema | column_privileges | grantee | String | FIELD | | No | String | | | greptime | information_schema | column_privileges | table_name | String | FIELD | | No | String | | | greptime | information_schema | column_privileges | column_name | String | FIELD | | No | String | | | greptime | information_schema | column_privileges | privilege_type | String | FIELD | | No | String | | -| greptime | information_schema | column_statistics | table_name | String | FIELD | | No | String | | +| greptime | information_schema | column_privileges | is_grantable | String | FIELD | | No | String | | +| greptime | information_schema | column_statistics | histogram | String | FIELD | | No | String | | | greptime | information_schema | column_statistics | schema_name | String | FIELD | | No | String | | +| greptime | information_schema | column_statistics | table_name | String | FIELD | | No | String | | | greptime | information_schema | column_statistics | column_name | String | FIELD | | No | String | | -| greptime | information_schema | column_statistics | histogram | String | FIELD | | No | String | | -| greptime | information_schema | columns | table_catalog | String | FIELD | | No | String | | -| greptime | information_schema | columns | column_comment | String | FIELD | | Yes | String | | | greptime | information_schema | columns | column_type | String | FIELD | | No | String | | +| greptime | information_schema | columns | column_comment | String | FIELD | | Yes | String | | +| greptime | information_schema | columns | table_schema | String | FIELD | | No | String | | | greptime | information_schema | columns | is_nullable | String | FIELD | | No | String | | | greptime | information_schema | columns | column_default | String | FIELD | | Yes | String | | | greptime | information_schema | columns | semantic_type | String | FIELD | | No | String | | +| greptime | information_schema | columns | table_catalog | String | FIELD | | No | String | | | greptime | information_schema | columns | data_type | String | FIELD | | No | String | | | greptime | information_schema | columns | column_name | String | FIELD | | No | String | | | greptime | information_schema | columns | table_name | String | FIELD | | No | String | | -| greptime | information_schema | columns | table_schema | String | FIELD | | No | String | | -| greptime | information_schema | engines | xa | String | FIELD | | No | String | | -| greptime | information_schema | engines | support | String | FIELD | | No | String | | | greptime | information_schema | engines | savepoints | String | FIELD | | No | String | | -| greptime | information_schema | engines | engine | String | FIELD | | No | String | | +| greptime | information_schema | engines | xa | String | FIELD | | No | String | | | greptime | information_schema | engines | transactions | String | FIELD | | No | String | | | greptime | information_schema | engines | comment | String | FIELD | | No | String | | -| greptime | information_schema | events | event_comment | String | FIELD | | No | String | | -| greptime | information_schema | events | sql_mode | String | FIELD | | No | String | | +| greptime | information_schema | engines | support | String | FIELD | | No | String | | +| greptime | information_schema | engines | engine | String | FIELD | | No | String | | +| greptime | information_schema | events | event_definition | String | FIELD | | No | String | | +| greptime | information_schema | events | last_executed | DateTime | FIELD | | No | DateTime | | | greptime | information_schema | events | database_collation | String | FIELD | | No | String | | | greptime | information_schema | events | collation_connection | String | FIELD | | No | String | | | greptime | information_schema | events | character_set_client | String | FIELD | | No | String | | @@ -92,29 +93,67 @@ select * from information_schema.columns order by table_schema, table_name; | greptime | information_schema | events | definer | String | FIELD | | No | String | | | greptime | information_schema | events | time_zone | String | FIELD | | No | String | | | greptime | information_schema | events | event_body | String | FIELD | | No | String | | -| greptime | information_schema | events | event_definition | String | FIELD | | No | String | | +| greptime | information_schema | events | event_comment | String | FIELD | | No | String | | | greptime | information_schema | events | event_type | String | FIELD | | No | String | | | greptime | information_schema | events | execute_at | DateTime | FIELD | | No | DateTime | | | greptime | information_schema | events | interval_value | Int64 | FIELD | | No | Int64 | | | greptime | information_schema | events | interval_field | String | FIELD | | No | String | | -| greptime | information_schema | events | last_executed | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | events | sql_mode | String | FIELD | | No | String | | | greptime | information_schema | events | starts | DateTime | FIELD | | No | DateTime | | | greptime | information_schema | events | ends | DateTime | FIELD | | No | DateTime | | | greptime | information_schema | events | status | String | FIELD | | No | String | | | greptime | information_schema | events | on_completion | String | FIELD | | No | String | | | greptime | information_schema | events | created | DateTime | FIELD | | No | DateTime | | | greptime | information_schema | events | last_altered | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | logfile_group_name | String | FIELD | | No | String | | +| greptime | information_schema | files | data_free | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | extra | String | FIELD | | No | String | | +| greptime | information_schema | files | status | String | FIELD | | No | String | | +| greptime | information_schema | files | checksum | String | FIELD | | No | String | | +| greptime | information_schema | files | check_time | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | file_id | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | file_name | String | FIELD | | No | String | | +| greptime | information_schema | files | file_type | String | FIELD | | No | String | | +| greptime | information_schema | files | tablespace_name | String | FIELD | | No | String | | +| greptime | information_schema | files | table_catalog | String | FIELD | | No | String | | +| greptime | information_schema | files | table_schema | String | FIELD | | No | String | | +| greptime | information_schema | files | table_name | String | FIELD | | No | String | | +| greptime | information_schema | files | update_time | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | logfile_group_number | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | engine | String | FIELD | | No | String | | +| greptime | information_schema | files | fulltext_keys | String | FIELD | | No | String | | +| greptime | information_schema | files | deleted_rows | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | update_count | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | free_extents | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | total_extents | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | extent_size | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | initial_size | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | maximum_size | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | autoextend_size | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | creation_time | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | last_update_time | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | last_access_time | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | recover_time | DateTime | FIELD | | No | DateTime | | +| greptime | information_schema | files | transaction_counter | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | version | String | FIELD | | No | String | | +| greptime | information_schema | files | row_format | String | FIELD | | No | String | | +| greptime | information_schema | files | table_rows | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | avg_row_length | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | data_length | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | max_data_length | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | index_length | Int64 | FIELD | | No | Int64 | | +| greptime | information_schema | files | create_time | DateTime | FIELD | | No | DateTime | | | greptime | information_schema | schemata | catalog_name | String | FIELD | | No | String | | | greptime | information_schema | schemata | schema_name | String | FIELD | | No | String | | | greptime | information_schema | schemata | default_character_set_name | String | FIELD | | No | String | | | greptime | information_schema | schemata | default_collation_name | String | FIELD | | No | String | | | greptime | information_schema | schemata | sql_path | String | FIELD | | Yes | String | | +| greptime | information_schema | tables | engine | String | FIELD | | Yes | String | | | greptime | information_schema | tables | table_catalog | String | FIELD | | No | String | | -| greptime | information_schema | tables | table_schema | String | FIELD | | No | String | | -| greptime | information_schema | tables | table_name | String | FIELD | | No | String | | -| greptime | information_schema | tables | table_type | String | FIELD | | No | String | | | greptime | information_schema | tables | table_id | UInt32 | FIELD | | Yes | UInt32 | | -| greptime | information_schema | tables | engine | String | FIELD | | Yes | String | | +| greptime | information_schema | tables | table_type | String | FIELD | | No | String | | +| greptime | information_schema | tables | table_name | String | FIELD | | No | String | | +| greptime | information_schema | tables | table_schema | String | FIELD | | No | String | | | greptime | public | numbers | number | UInt32 | TAG | | No | UInt32 | | +---------------+--------------------+---------------------------------------+----------------------------+-----------+---------------+----------------+-------------+-------------+----------------+