From 72c4e438cef54c662f2a426c6af80d6958043db1 Mon Sep 17 00:00:00 2001
From: luofucong
Date: Tue, 12 Sep 2023 23:11:49 +0800
Subject: [PATCH 1/4] refactor: remove TableIdent, use TableId directly

1. remove TableIdent, use TableId directly
2. use the latest greptime-proto
3. independently invalidate table id cache and table name cache
---
 Cargo.lock                                    |   2 +-
 Cargo.toml                                    |   2 +-
 benchmarks/src/bin/nyc-taxi.rs                |   4 +-
 src/api/src/helper.rs                         |  88 +++---
 src/catalog/src/error.rs                      |   5 +-
 src/client/examples/logical.rs                |   1 -
 src/client/examples/stream_ingest.rs          |   3 +-
 src/common/grpc-expr/src/alter.rs             |  68 +----
 src/common/grpc-expr/src/delete.rs            |   1 -
 src/common/grpc-expr/src/error.rs             |  24 +-
 src/common/grpc-expr/src/insert.rs            |  89 +-----
 src/common/grpc-expr/src/lib.rs               |   2 +-
 src/common/grpc-expr/src/util.rs              |   2 -
 src/common/grpc/src/select.rs                 |   8 +-
 src/common/grpc/src/writer.rs                 |   4 +-
 src/common/meta/src/cache_invalidator.rs      |  16 +-
 src/common/meta/src/ddl/alter_table.rs        |  38 +--
 src/common/meta/src/ddl/drop_table.rs         |  32 +--
 src/common/meta/src/ident.rs                  |  78 -----
 src/common/meta/src/instruction.rs            |  65 ++---
 src/common/meta/src/lib.rs                    |   1 -
 src/common/meta/src/table_name.rs             |   6 +
 src/datanode/src/error.rs                     |   5 +-
 src/datanode/src/heartbeat/handler.rs         |  14 +-
 src/datanode/src/region_server.rs             |   6 +-
 src/datanode/src/tests.rs                     |  17 +-
 src/frontend/src/catalog.rs                   |  16 +-
 src/frontend/src/expr_factory.rs              |   2 -
 .../handler/invalidate_table_cache.rs         |  47 +--
 src/frontend/src/heartbeat/handler/tests.rs   |  22 +-
 src/frontend/src/req_convert/common.rs        |  20 +-
 .../src/req_convert/delete/column_to_row.rs   |   1 -
 .../src/req_convert/insert/column_to_row.rs   |   1 -
 src/frontend/src/script.rs                    |   1 -
 src/frontend/src/statement/ddl.rs             |  37 +--
 src/meta-srv/src/cache_invalidator.rs         |  23 +-
 src/meta-srv/src/error.rs                     |   5 +-
 src/meta-srv/src/handler/failure_handler.rs   |   9 +-
 .../src/handler/failure_handler/runner.rs     |  26 +-
 .../src/handler/region_lease_handler.rs       |  11 +-
 src/meta-srv/src/lock/keys.rs                 |   8 +-
 src/meta-srv/src/metasrv.rs                   |   5 +-
 src/meta-srv/src/metasrv/builder.rs           |   8 +-
 src/meta-srv/src/procedure/region_failover.rs |  42 +--
 .../region_failover/failover_start.rs         |  11 +-
 .../region_failover/invalidate_cache.rs       |  17 +-
 .../region_failover/update_metadata.rs        |  14 +-
 src/meta-srv/src/procedure/tests.rs           |   1 -
 src/meta-srv/src/selector/load_based.rs       |  35 +--
 src/meta-srv/src/service/admin/route.rs       |   9 +-
 src/meta-srv/src/table_routes.rs              |   6 +-
 src/meta-srv/src/test_util.rs                 |   4 +-
 src/mito2/src/engine/alter_test.rs            |   2 +-
 src/mito2/src/memtable/time_series.rs         |   2 +-
 src/mito2/src/test_util.rs                    |   8 +-
 src/mito2/src/wal.rs                          |   2 +-
 src/query/src/dist_plan/merge_scan.rs         |   1 +
 src/servers/src/influxdb.rs                   | 267 +-----------------
 src/servers/src/opentsdb/codec.rs             |   9 +-
 src/servers/src/otlp.rs                       |   5 -
 src/servers/src/prom_store.rs                 | 187 +-----------
 src/servers/src/row_writer.rs                 |   5 +-
 src/servers/tests/http/influxdb_test.rs       |   4 +-
 src/servers/tests/interceptor.rs              |  11 +-
 src/table/src/metadata.rs                     |   2 +
 65 files changed, 317 insertions(+), 1150 deletions(-)
 delete mode 100644 src/common/meta/src/ident.rs

diff --git a/Cargo.lock b/Cargo.lock
index af851888376c..3223760fdce9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4103,7 +4103,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2#1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2"
+source =
"git+https://github.com/GreptimeTeam/greptime-proto.git?rev=81495b166b2c8909f05b3fcaa09eb299bb43a995#81495b166b2c8909f05b3fcaa09eb299bb43a995" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index be1cf2d8bdd2..b377eef2f5f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "81495b166b2c8909f05b3fcaa09eb299bb43a995" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 1d24f5741db9..648541db1e24 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -105,7 +105,6 @@ async fn write_data( let (columns, row_count) = convert_record_batch(record_batch); let request = InsertRequest { table_name: TABLE_NAME.to_string(), - region_number: 0, columns, row_count, }; @@ -189,7 +188,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) { let values = array.values(); ( Values { - ts_microsecond_values: values.to_vec(), + timestamp_microsecond_values: values.to_vec(), ..Default::default() }, ColumnDataType::TimestampMicrosecond, @@ -389,7 +388,6 @@ fn create_table_expr() -> CreateTableExpr { primary_keys: vec!["VendorID".to_string()], create_if_not_exists: false, table_options: Default::default(), - region_numbers: vec![0], table_id: None, engine: "mito".to_string(), } diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 17aa41d0ec33..f9ac18ed0c14 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -214,19 +214,19 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values ..Default::default() }, ColumnDataType::TimestampSecond => Values { - ts_second_values: Vec::with_capacity(capacity), + timestamp_second_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimestampMillisecond => Values { - ts_millisecond_values: Vec::with_capacity(capacity), + timestamp_millisecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimestampMicrosecond => Values { - ts_microsecond_values: Vec::with_capacity(capacity), + timestamp_microsecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimestampNanosecond => Values { - ts_nanosecond_values: Vec::with_capacity(capacity), + timestamp_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimeSecond => Values { @@ -286,10 +286,10 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { Value::Date(val) => values.date_values.push(val.val()), Value::DateTime(val) => values.datetime_values.push(val.val()), Value::Timestamp(val) => match val.unit() { - TimeUnit::Second => values.ts_second_values.push(val.value()), - TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()), - TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()), - TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()), + TimeUnit::Second => values.timestamp_second_values.push(val.value()), + TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()), + TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()), 
+ TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()), }, Value::Time(val) => match val.unit() { TimeUnit::Second => values.time_second_values.push(val.value()), @@ -375,10 +375,16 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { ValueData::StringValue(string) => ValueRef::String(string.as_str()), ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)), ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)), - ValueData::TsSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)), - ValueData::TsMillisecondValue(t) => ValueRef::Timestamp(Timestamp::new_millisecond(*t)), - ValueData::TsMicrosecondValue(t) => ValueRef::Timestamp(Timestamp::new_microsecond(*t)), - ValueData::TsNanosecondValue(t) => ValueRef::Timestamp(Timestamp::new_nanosecond(*t)), + ValueData::TimestampSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)), + ValueData::TimestampMillisecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_millisecond(*t)) + } + ValueData::TimestampMicrosecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_microsecond(*t)) + } + ValueData::TimestampNanosecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_nanosecond(*t)) + } ValueData::TimeSecondValue(t) => ValueRef::Time(Time::new_second(*t)), ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)), ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)), @@ -418,17 +424,17 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)), ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)), ConcreteDataType::Timestamp(unit) => match unit { - TimestampType::Second(_) => { - Arc::new(TimestampSecondVector::from_vec(values.ts_second_values)) - } + TimestampType::Second(_) => Arc::new(TimestampSecondVector::from_vec( + values.timestamp_second_values, + )), TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec( - values.ts_millisecond_values, + values.timestamp_millisecond_values, )), TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec( - values.ts_microsecond_values, + values.timestamp_microsecond_values, )), TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec( - values.ts_nanosecond_values, + values.timestamp_nanosecond_values, )), }, ConcreteDataType::Time(unit) => match unit { @@ -550,22 +556,22 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .map(|v| Value::Date(v.into())) .collect(), ConcreteDataType::Timestamp(TimestampType::Second(_)) => values - .ts_second_values + .timestamp_second_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_second(v))) .collect(), ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values - .ts_millisecond_values + .timestamp_millisecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_millisecond(v))) .collect(), ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values - .ts_microsecond_values + .timestamp_microsecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_microsecond(v))) .collect(), ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values - .ts_nanosecond_values + .timestamp_nanosecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_nanosecond(v))) .collect(), @@ -682,16 +688,16 @@ pub fn to_proto_value(value: Value) -> Option { }, 
Value::Timestamp(v) => match v.unit() { TimeUnit::Second => v1::Value { - value_data: Some(ValueData::TsSecondValue(v.value())), + value_data: Some(ValueData::TimestampSecondValue(v.value())), }, TimeUnit::Millisecond => v1::Value { - value_data: Some(ValueData::TsMillisecondValue(v.value())), + value_data: Some(ValueData::TimestampMillisecondValue(v.value())), }, TimeUnit::Microsecond => v1::Value { - value_data: Some(ValueData::TsMicrosecondValue(v.value())), + value_data: Some(ValueData::TimestampMicrosecondValue(v.value())), }, TimeUnit::Nanosecond => v1::Value { - value_data: Some(ValueData::TsNanosecondValue(v.value())), + value_data: Some(ValueData::TimestampNanosecondValue(v.value())), }, }, Value::Time(v) => match v.unit() { @@ -747,10 +753,10 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::StringValue(_) => ColumnDataType::String, ValueData::DateValue(_) => ColumnDataType::Date, ValueData::DatetimeValue(_) => ColumnDataType::Datetime, - ValueData::TsSecondValue(_) => ColumnDataType::TimestampSecond, - ValueData::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, - ValueData::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, - ValueData::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + ValueData::TimestampSecondValue(_) => ColumnDataType::TimestampSecond, + ValueData::TimestampMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + ValueData::TimestampMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + ValueData::TimestampNanosecondValue(_) => ColumnDataType::TimestampNanosecond, ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond, ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, @@ -837,10 +843,10 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { Value::Date(v) => Some(ValueData::DateValue(v.val())), Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())), Value::Timestamp(v) => Some(match v.unit() { - TimeUnit::Second => ValueData::TsSecondValue(v.value()), - TimeUnit::Millisecond => ValueData::TsMillisecondValue(v.value()), - TimeUnit::Microsecond => ValueData::TsMicrosecondValue(v.value()), - TimeUnit::Nanosecond => ValueData::TsNanosecondValue(v.value()), + TimeUnit::Second => ValueData::TimestampSecondValue(v.value()), + TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v.value()), + TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v.value()), + TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v.value()), }), Value::Time(v) => Some(match v.unit() { TimeUnit::Second => ValueData::TimeSecondValue(v.value()), @@ -943,7 +949,7 @@ mod tests { assert_eq!(2, values.capacity()); let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2); - let values = values.ts_millisecond_values; + let values = values.timestamp_millisecond_values; assert_eq!(2, values.capacity()); let values = values_with_capacity(ColumnDataType::TimeMillisecond, 2); @@ -1162,28 +1168,28 @@ mod tests { push_vals(&mut column, 3, vector); assert_eq!( vec![1, 2, 3], - column.values.as_ref().unwrap().ts_nanosecond_values + column.values.as_ref().unwrap().timestamp_nanosecond_values ); let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6])); push_vals(&mut column, 3, vector); assert_eq!( vec![4, 5, 6], - column.values.as_ref().unwrap().ts_millisecond_values + column.values.as_ref().unwrap().timestamp_millisecond_values ); let vector = 
Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9])); push_vals(&mut column, 3, vector); assert_eq!( vec![7, 8, 9], - column.values.as_ref().unwrap().ts_microsecond_values + column.values.as_ref().unwrap().timestamp_microsecond_values ); let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12])); push_vals(&mut column, 3, vector); assert_eq!( vec![10, 11, 12], - column.values.as_ref().unwrap().ts_second_values + column.values.as_ref().unwrap().timestamp_second_values ); } @@ -1312,7 +1318,7 @@ mod tests { let actual = pb_values_to_values( &ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)), Values { - ts_second_values: vec![1_i64, 2_i64, 3_i64], + timestamp_second_values: vec![1_i64, 2_i64, 3_i64], ..Default::default() }, ); @@ -1327,7 +1333,7 @@ mod tests { let actual = pb_values_to_values( &ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)), Values { - ts_millisecond_values: vec![1_i64, 2_i64, 3_i64], + timestamp_millisecond_values: vec![1_i64, 2_i64, 3_i64], ..Default::default() }, ); diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index d97ccd544a4f..83b51e8c8331 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -20,6 +20,7 @@ use common_error::status_code::StatusCode; use datafusion::error::DataFusionError; use datatypes::prelude::ConcreteDataType; use snafu::{Location, Snafu}; +use table::metadata::TableId; use tokio::task::JoinError; #[derive(Debug, Snafu)] @@ -140,9 +141,9 @@ pub enum Error { #[snafu(display("Operation {} not supported", op))] NotSupported { op: String, location: Location }, - #[snafu(display("Failed to open table, table info: {}, source: {}", table_info, source))] + #[snafu(display("Failed to open table {table_id}, source: {source}, at {location}"))] OpenTable { - table_info: String, + table_id: TableId, location: Location, source: table::error::Error, }, diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 0524d562ec9f..d13c4844c072 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -66,7 +66,6 @@ async fn run() { create_if_not_exists: false, table_options: Default::default(), table_id: Some(TableId { id: 1024 }), - region_numbers: vec![0], engine: MITO_ENGINE.to_string(), }; diff --git a/src/client/examples/stream_ingest.rs b/src/client/examples/stream_ingest.rs index 827f8e85c014..7b67cf33931e 100644 --- a/src/client/examples/stream_ingest.rs +++ b/src/client/examples/stream_ingest.rs @@ -131,7 +131,7 @@ fn to_insert_request(records: Vec) -> InsertRequest { Column { column_name: "ts".to_owned(), values: Some(column::Values { - ts_millisecond_values: timestamp_millis, + timestamp_millisecond_values: timestamp_millis, ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -177,6 +177,5 @@ fn to_insert_request(records: Vec) -> InsertRequest { table_name: "weather_demo".to_owned(), columns, row_count: rows as u32, - ..Default::default() } } diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index cb5cafb91927..8e6ef3bc9af2 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -18,18 +18,15 @@ use api::v1::{ column_def, AddColumnLocation as Location, AlterExpr, CreateTableExpr, DropColumns, RenameTable, SemanticType, }; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::AddColumnLocation; use datatypes::schema::{ColumnSchema, RawSchema}; use snafu::{ensure, 
OptionExt, ResultExt}; use table::metadata::TableId; -use table::requests::{ - AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, TableOptions, -}; +use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; use crate::error::{ - ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, - Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu, + InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result, + UnknownLocationTypeSnafu, }; const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32; @@ -121,65 +118,6 @@ pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Ok(RawSchema::new(column_schemas)) } -pub fn create_expr_to_request( - table_id: TableId, - expr: CreateTableExpr, - require_time_index: bool, -) -> Result { - let schema = create_table_schema(&expr, require_time_index)?; - let primary_key_indices = expr - .primary_keys - .iter() - .map(|key| { - // We do a linear search here. - schema - .column_schemas - .iter() - .position(|column_schema| column_schema.name == *key) - .context(ColumnNotFoundSnafu { - column_name: key, - table_name: &expr.table_name, - }) - }) - .collect::>>()?; - - let mut catalog_name = expr.catalog_name; - if catalog_name.is_empty() { - catalog_name = DEFAULT_CATALOG_NAME.to_string(); - } - let mut schema_name = expr.schema_name; - if schema_name.is_empty() { - schema_name = DEFAULT_SCHEMA_NAME.to_string(); - } - let desc = if expr.desc.is_empty() { - None - } else { - Some(expr.desc) - }; - - let region_numbers = if expr.region_numbers.is_empty() { - vec![0] - } else { - expr.region_numbers - }; - - let table_options = - TableOptions::try_from(&expr.table_options).context(UnrecognizedTableOptionSnafu)?; - Ok(CreateTableRequest { - id: table_id, - catalog_name, - schema_name, - table_name: expr.table_name, - desc, - schema, - region_numbers, - primary_key_indices, - create_if_not_exists: expr.create_if_not_exists, - table_options, - engine: expr.engine, - }) -} - fn parse_location(location: Option) -> Result> { match location { Some(Location { diff --git a/src/common/grpc-expr/src/delete.rs b/src/common/grpc-expr/src/delete.rs index 18a480e8a4c3..d272b6aa0bc0 100644 --- a/src/common/grpc-expr/src/delete.rs +++ b/src/common/grpc-expr/src/delete.rs @@ -79,7 +79,6 @@ mod tests { fn test_to_table_delete_request() { let grpc_request = GrpcDeleteRequest { table_name: "foo".to_string(), - region_number: 0, key_columns: vec![ Column { column_name: "id".to_string(), diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index c364b7e9426d..d14cd8141f6c 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -21,12 +21,6 @@ use snafu::{Location, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Column `{}` not found in table `{}`", column_name, table_name))] - ColumnNotFound { - column_name: String, - table_name: String, - }, - #[snafu(display("Illegal delete request, reason: {reason}"))] IllegalDeleteRequest { reason: String, location: Location }, @@ -75,18 +69,9 @@ pub enum Error { source: api::error::Error, }, - #[snafu(display("Unrecognized table option: {}", source))] - UnrecognizedTableOption { - location: Location, - source: table::error::Error, - }, - #[snafu(display("Unexpected values length, reason: {}", reason))] UnexpectedValuesLength { reason: String, location: Location }, - #[snafu(display("The column name already exists, column: 
{}", column))] - ColumnAlreadyExists { column: String, location: Location }, - #[snafu(display("Unknown location type: {}", location_type))] UnknownLocationType { location_type: i32, @@ -99,8 +84,6 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, - Error::IllegalDeleteRequest { .. } => StatusCode::InvalidArguments, Error::ColumnDataType { .. } => StatusCode::Internal, @@ -111,10 +94,9 @@ impl ErrorExt for Error { Error::CreateVector { .. } => StatusCode::InvalidArguments, Error::MissingField { .. } => StatusCode::InvalidArguments, Error::InvalidColumnDef { source, .. } => source.status_code(), - Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, - Error::UnexpectedValuesLength { .. } - | Error::ColumnAlreadyExists { .. } - | Error::UnknownLocationType { .. } => StatusCode::InvalidArguments, + Error::UnexpectedValuesLength { .. } | Error::UnknownLocationType { .. } => { + StatusCode::InvalidArguments + } } } diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index cad3313773aa..b2a0f4452528 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -12,12 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use api::helper; -use api::helper::ColumnDataTypeWrapper; use api::v1::column::Values; -use api::v1::{AddColumns, Column, CreateTableExpr, InsertRequest as GrpcInsertRequest}; +use api::v1::{AddColumns, Column, CreateTableExpr}; use common_base::BitVec; use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::VectorRef; @@ -25,12 +22,8 @@ use datatypes::schema::SchemaRef; use snafu::{ensure, ResultExt}; use table::engine::TableReference; use table::metadata::TableId; -use table::requests::InsertRequest; -use crate::error::{ - ColumnAlreadyExistsSnafu, ColumnDataTypeSnafu, CreateVectorSnafu, Result, - UnexpectedValuesLengthSnafu, -}; +use crate::error::{CreateVectorSnafu, Result, UnexpectedValuesLengthSnafu}; use crate::util; use crate::util::ColumnExpr; @@ -59,47 +52,6 @@ pub fn build_create_expr_from_insertion( ) } -pub fn to_table_insert_request( - catalog_name: &str, - schema_name: &str, - request: GrpcInsertRequest, -) -> Result { - let table_name = &request.table_name; - let row_count = request.row_count as usize; - - let mut columns_values = HashMap::with_capacity(request.columns.len()); - for Column { - column_name, - values, - null_mask, - datatype, - .. - } in request.columns - { - let Some(values) = values else { continue }; - - let datatype: ConcreteDataType = ColumnDataTypeWrapper::try_new(datatype) - .context(ColumnDataTypeSnafu)? 
- .into(); - let vector = add_values_to_builder(datatype, values, row_count, null_mask)?; - - ensure!( - columns_values.insert(column_name.clone(), vector).is_none(), - ColumnAlreadyExistsSnafu { - column: column_name - } - ); - } - - Ok(InsertRequest { - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - columns_values, - region_number: request.region_number, - }) -} - pub(crate) fn add_values_to_builder( data_type: ConcreteDataType, values: Values, @@ -150,10 +102,9 @@ mod tests { use common_base::BitVec; use common_catalog::consts::MITO_ENGINE; use common_time::interval::IntervalUnit; - use common_time::timestamp::{TimeUnit, Timestamp}; + use common_time::timestamp::TimeUnit; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; - use datatypes::value::Value; use snafu::ResultExt; use super::*; @@ -356,38 +307,6 @@ mod tests { ); } - #[test] - fn test_to_table_insert_request() { - let (columns, row_count) = mock_insert_batch(); - let request = GrpcInsertRequest { - table_name: "demo".to_string(), - columns, - row_count, - region_number: 0, - }; - let insert_req = to_table_insert_request("greptime", "public", request).unwrap(); - - assert_eq!("greptime", insert_req.catalog_name); - assert_eq!("public", insert_req.schema_name); - assert_eq!("demo", insert_req.table_name); - - let host = insert_req.columns_values.get("host").unwrap(); - assert_eq!(Value::String("host1".into()), host.get(0)); - assert_eq!(Value::String("host2".into()), host.get(1)); - - let cpu = insert_req.columns_values.get("cpu").unwrap(); - assert_eq!(Value::Float64(0.31.into()), cpu.get(0)); - assert_eq!(Value::Null, cpu.get(1)); - - let memory = insert_req.columns_values.get("memory").unwrap(); - assert_eq!(Value::Null, memory.get(0)); - assert_eq!(Value::Float64(0.1.into()), memory.get(1)); - - let ts = insert_req.columns_values.get("ts").unwrap(); - assert_eq!(Value::Timestamp(Timestamp::new_millisecond(100)), ts.get(0)); - assert_eq!(Value::Timestamp(Timestamp::new_millisecond(101)), ts.get(1)); - } - #[test] fn test_is_null() { let null_mask = BitVec::from_slice(&[0b0000_0001, 0b0000_1000]); @@ -476,7 +395,7 @@ mod tests { }; let ts_vals = Values { - ts_millisecond_values: vec![100, 101], + timestamp_millisecond_values: vec![100, 101], ..Default::default() }; let ts_column = Column { diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs index cd3d9540b3eb..7a2fea237b52 100644 --- a/src/common/grpc-expr/src/lib.rs +++ b/src/common/grpc-expr/src/lib.rs @@ -18,5 +18,5 @@ pub mod error; pub mod insert; pub mod util; -pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema}; +pub use alter::{alter_expr_to_request, create_table_schema}; pub use insert::{build_create_expr_from_insertion, find_new_columns}; diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index d2a2b79e8ac3..641b4022472e 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -139,8 +139,6 @@ pub fn build_create_table_expr( create_if_not_exists: true, table_options: Default::default(), table_id: table_id.map(|id| api::v1::TableId { id }), - // TODO(hl): region number should be allocated by frontend - region_numbers: vec![0], engine: engine.to_string(), }; diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index c0caabdfa7e5..d16f2f37177c 100644 --- a/src/common/grpc/src/select.rs +++ 
b/src/common/grpc/src/select.rs @@ -150,25 +150,25 @@ pub fn values(arrays: &[VectorRef]) -> Result { ( ConcreteDataType::Timestamp(TimestampType::Second(_)), TimestampSecondVector, - ts_second_values, + timestamp_second_values, |x| { x.into_native() } ), ( ConcreteDataType::Timestamp(TimestampType::Millisecond(_)), TimestampMillisecondVector, - ts_millisecond_values, + timestamp_millisecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Timestamp(TimestampType::Microsecond(_)), TimestampMicrosecondVector, - ts_microsecond_values, + timestamp_microsecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)), TimestampNanosecondVector, - ts_nanosecond_values, + timestamp_nanosecond_values, |x| { x.into_native() } ), ( diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 0ea270d388bc..7efa3dfd5cad 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -62,7 +62,7 @@ impl LinesWriter { // It is safe to use unwrap here, because values has been initialized in mut_column() let values = column.values.as_mut().unwrap(); values - .ts_millisecond_values + .timestamp_millisecond_values .push(to_ms_ts(value.1, value.0)); self.null_masks[idx].push(false); Ok(()) @@ -360,7 +360,7 @@ mod tests { assert_eq!(SemanticType::Timestamp as i32, column.semantic_type); assert_eq!( vec![101011000, 102011001, 103011002], - column.values.as_ref().unwrap().ts_millisecond_values + column.values.as_ref().unwrap().timestamp_millisecond_values ); verify_null_mask(&column.null_mask, vec![false, false, false]); diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index f1f23bbc56a0..4326ec1da622 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -14,11 +14,13 @@ use std::sync::Arc; +use table::metadata::TableId; + use crate::error::Result; -use crate::ident::TableIdent; +use crate::table_name::TableName; /// Places context of invalidating cache. e.g., span id, trace id etc. 
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct Context {
     pub subject: Option<String>,
 }
@@ -26,7 +28,9 @@ pub struct Context {
 #[async_trait::async_trait]
 pub trait CacheInvalidator: Send + Sync {
     // Invalidates table cache
-    async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> Result<()>;
+    async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> Result<()>;
+
+    async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> Result<()>;
 }
 
 pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
@@ -35,7 +39,11 @@ pub struct DummyCacheInvalidator;
 
 #[async_trait::async_trait]
 impl CacheInvalidator for DummyCacheInvalidator {
-    async fn invalidate_table(&self, _ctx: &Context, _table_ident: TableIdent) -> Result<()> {
+    async fn invalidate_table_id(&self, _ctx: &Context, _table_id: TableId) -> Result<()> {
+        Ok(())
+    }
+
+    async fn invalidate_table_name(&self, _ctx: &Context, _table_name: TableName) -> Result<()> {
         Ok(())
     }
 }
diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs
index 95892e6967c1..7cefa0f5668a 100644
--- a/src/common/meta/src/ddl/alter_table.rs
+++ b/src/common/meta/src/ddl/alter_table.rs
@@ -42,7 +42,6 @@ use crate::ddl::DdlContext;
 use crate::error::{
     self, ConvertAlterTableRequestSnafu, InvalidProtoMsgSnafu, Result, TableRouteNotFoundSnafu,
 };
-use crate::ident::TableIdent;
 use crate::key::table_info::TableInfoValue;
 use crate::key::table_name::TableNameKey;
 use crate::key::table_route::TableRouteValue;
@@ -308,34 +307,25 @@ impl AlterTableProcedure {
     /// Broadcasts the invalidating table cache instructions.
     async fn on_broadcast(&mut self) -> Result<Status> {
-        let table_ref = self.data.table_ref();
-
-        let table_ident = TableIdent {
-            catalog: table_ref.catalog.to_string(),
-            schema: table_ref.schema.to_string(),
-            table: table_ref.table.to_string(),
-            table_id: self.data.table_id(),
-            engine: self.data.table_info().meta.engine.to_string(),
-        };
+        let alter_kind = self.alter_kind()?;
+        let cache_invalidator = &self.context.cache_invalidator;
 
-        self.context
-            .cache_invalidator
-            .invalidate_table(
-                &Context {
-                    subject: Some("Invalidate table cache by alter table procedure".to_string()),
-                },
-                table_ident,
-            )
-            .await?;
+        let status = if matches!(alter_kind, Kind::RenameTable { .. }) {
+            cache_invalidator
+                .invalidate_table_name(&Context::default(), self.data.table_ref().into())
+                .await?;
 
-        let alter_kind = self.alter_kind()?;
-        if matches!(alter_kind, Kind::RenameTable { ..
}) { - Ok(Status::Done) + Status::Done } else { + cache_invalidator + .invalidate_table_id(&Context::default(), self.data.table_id()) + .await?; + self.data.state = AlterTableState::SubmitAlterRegionRequests; - Ok(Status::executing(true)) - } + Status::executing(true) + }; + Ok(status) } fn lock_key_inner(&self) -> Vec { diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index f6dfb722626f..1de0f7b30636 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -36,14 +36,12 @@ use crate::cache_invalidator::Context; use crate::ddl::utils::handle_operate_region_error; use crate::ddl::DdlContext; use crate::error::{self, Result}; -use crate::ident::TableIdent; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; use crate::metrics; use crate::rpc::ddl::DropTableTask; use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; -use crate::table_name::TableName; pub struct DropTableProcedure { pub context: DdlContext, @@ -118,25 +116,17 @@ impl DropTableProcedure { /// Broadcasts invalidate table cache instruction. async fn on_broadcast(&mut self) -> Result { - let table_name = self.data.table_name(); - let engine = &self.data.table_info().meta.engine; - - let table_ident = TableIdent { - catalog: table_name.catalog_name, - schema: table_name.schema_name, - table: table_name.table_name, - table_id: self.data.task.table_id, - engine: engine.to_string(), + let ctx = Context { + subject: Some("Invalidate table cache by dropping table".to_string()), }; - self.context - .cache_invalidator - .invalidate_table( - &Context { - subject: Some("Invalidate Table Cache by dropping table procedure".to_string()), - }, - table_ident, - ) + let cache_invalidator = &self.context.cache_invalidator; + cache_invalidator + .invalidate_table_id(&ctx, self.data.table_id()) + .await?; + + cache_invalidator + .invalidate_table_name(&ctx, self.data.table_ref().into()) .await?; self.data.state = DropTableState::DatanodeDropRegions; @@ -261,10 +251,6 @@ impl DropTableData { self.task.table_ref() } - fn table_name(&self) -> TableName { - self.task.table_name() - } - fn region_routes(&self) -> &Vec { &self.table_route_value.region_routes } diff --git a/src/common/meta/src/ident.rs b/src/common/meta/src/ident.rs deleted file mode 100644 index f5015d12ab70..000000000000 --- a/src/common/meta/src/ident.rs +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-
-use std::fmt::{Display, Formatter};
-
-use api::v1::meta::{TableIdent as RawTableIdent, TableName};
-use serde::{Deserialize, Serialize};
-use snafu::OptionExt;
-use table::engine::TableReference;
-
-use crate::error::{Error, InvalidProtoMsgSnafu};
-
-#[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)]
-pub struct TableIdent {
-    pub catalog: String,
-    pub schema: String,
-    pub table: String,
-    pub table_id: u32,
-    pub engine: String,
-}
-
-impl TableIdent {
-    pub fn table_ref(&self) -> TableReference {
-        TableReference::full(&self.catalog, &self.schema, &self.table)
-    }
-}
-
-impl Display for TableIdent {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "Table(id={}, name='{}.{}.{}', engine='{}')",
-            self.table_id, self.catalog, self.schema, self.table, self.engine,
-        )
-    }
-}
-
-impl TryFrom<RawTableIdent> for TableIdent {
-    type Error = Error;
-
-    fn try_from(value: RawTableIdent) -> Result<Self, Self::Error> {
-        let table_name = value.table_name.context(InvalidProtoMsgSnafu {
-            err_msg: "'table_name' is missing in TableIdent",
-        })?;
-        Ok(Self {
-            catalog: table_name.catalog_name,
-            schema: table_name.schema_name,
-            table: table_name.table_name,
-            table_id: value.table_id,
-            engine: value.engine,
-        })
-    }
-}
-
-impl From<TableIdent> for RawTableIdent {
-    fn from(table_ident: TableIdent) -> Self {
-        Self {
-            table_id: table_ident.table_id,
-            engine: table_ident.engine,
-            table_name: Some(TableName {
-                catalog_name: table_ident.catalog,
-                schema_name: table_ident.schema,
-                table_name: table_ident.table,
-            }),
-        }
-    }
-}
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 659114e517f7..d5783400f61d 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -15,22 +15,24 @@ use std::fmt::{Display, Formatter};
 
 use serde::{Deserialize, Serialize};
-use store_api::storage::RegionId;
+use store_api::storage::{RegionId, RegionNumber};
+use strum::Display;
+use table::metadata::TableId;
 
-use crate::ident::TableIdent;
+use crate::table_name::TableName;
 use crate::{ClusterId, DatanodeId};
 
 #[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct RegionIdent {
     pub cluster_id: ClusterId,
     pub datanode_id: DatanodeId,
-    pub table_ident: TableIdent,
-    pub region_number: u32,
+    pub table_id: TableId,
+    pub region_number: RegionNumber,
 }
 
 impl RegionIdent {
     pub fn get_region_id(&self) -> RegionId {
-        RegionId::new(self.table_ident.table_id, self.region_number)
+        RegionId::new(self.table_id, self.region_number)
     }
 }
 
@@ -38,25 +40,12 @@ impl Display for RegionIdent {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "RegionIdent(datanode_id='{}.{}', table_id='{}', table_name='{}.{}.{}', table_engine='{}', region_no='{}')",
-            self.cluster_id,
-            self.datanode_id,
-            self.table_ident.table_id,
-            self.table_ident.catalog,
-            self.table_ident.schema,
-            self.table_ident.table,
-            self.table_ident.engine,
-            self.region_number
+            "RegionIdent(datanode_id='{}.{}', table_id={}, region_number={})",
+            self.cluster_id, self.datanode_id, self.table_id, self.region_number
         )
     }
 }
 
-impl From<RegionIdent> for TableIdent {
-    fn from(region_ident: RegionIdent) -> Self {
-        region_ident.table_ident
-    }
-}
-
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 pub struct SimpleReply {
     pub result: bool,
@@ -69,22 +58,12 @@ impl Display for SimpleReply {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-#[serde(tag = "type", rename_all = "snake_case")]
+#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Instruction { OpenRegion(RegionIdent), CloseRegion(RegionIdent), - InvalidateTableCache(TableIdent), -} - -impl Display for Instruction { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::OpenRegion(region) => write!(f, "Instruction::OpenRegion({})", region), - Self::CloseRegion(region) => write!(f, "Instruction::CloseRegion({})", region), - Self::InvalidateTableCache(table) => write!(f, "Instruction::Invalidate({})", table), - } - } + InvalidateTableIdCache(TableId), + InvalidateTableNameCache(TableName), } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] @@ -116,40 +95,28 @@ mod tests { let open_region = Instruction::OpenRegion(RegionIdent { cluster_id: 1, datanode_id: 2, - table_ident: TableIdent { - catalog: "foo".to_string(), - schema: "bar".to_string(), - table: "hi".to_string(), - table_id: 1024, - engine: "mito".to_string(), - }, + table_id: 1024, region_number: 1, }); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"type":"open_region","cluster_id":1,"datanode_id":2,"table_ident":{"catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito"},"region_number":1}"#, + r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1}}"#, serialized ); let close_region = Instruction::CloseRegion(RegionIdent { cluster_id: 1, datanode_id: 2, - table_ident: TableIdent { - catalog: "foo".to_string(), - schema: "bar".to_string(), - table: "hi".to_string(), - table_id: 1024, - engine: "mito".to_string(), - }, + table_id: 1024, region_number: 1, }); let serialized = serde_json::to_string(&close_region).unwrap(); assert_eq!( - r#"{"type":"close_region","cluster_id":1,"datanode_id":2,"table_ident":{"catalog":"foo","schema":"bar","table":"hi","table_id":1024,"engine":"mito"},"region_number":1}"#, + r#"{"CloseRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1}}"#, serialized ); } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 8ed426adeef5..bfd134133c7b 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -21,7 +21,6 @@ pub mod ddl; pub mod ddl_manager; pub mod error; pub mod heartbeat; -pub mod ident; pub mod instruction; pub mod key; pub mod kv_backend; diff --git a/src/common/meta/src/table_name.rs b/src/common/meta/src/table_name.rs index fde0efd5ab60..77b299d76554 100644 --- a/src/common/meta/src/table_name.rs +++ b/src/common/meta/src/table_name.rs @@ -76,3 +76,9 @@ impl From for TableName { } } } + +impl From> for TableName { + fn from(table_ref: TableReference) -> Self { + Self::new(table_ref.catalog, table_ref.schema, table_ref.table) + } +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d4b6865d923c..bd0e6555201b 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -22,6 +22,7 @@ use servers::define_into_tonic_status; use snafu::{Location, Snafu}; use store_api::storage::{RegionId, RegionNumber}; use table::error::Error as TableError; +use table::metadata::TableId; /// Business error of datanode. 
#[derive(Debug, Snafu)] @@ -54,9 +55,9 @@ pub enum Error { source: TableError, }, - #[snafu(display("Failed to get table: {}, source: {}", table_name, source))] + #[snafu(display("Failed to get table {table_id}, source: {source}, at {location}"))] GetTable { - table_name: String, + table_id: TableId, location: Location, source: TableError, }, diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 320b0c0ccc65..f86a665d766f 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use async_trait::async_trait; +use common_catalog::consts::default_engine; use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult}; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, @@ -46,7 +47,7 @@ impl RegionHeartbeatResponseHandler { Instruction::OpenRegion(region_ident) => { let region_id = Self::region_ident_to_region_id(®ion_ident); let open_region_req = RegionRequest::Open(RegionOpenRequest { - engine: region_ident.table_ident.engine, + engine: default_engine().to_string(), region_dir: "".to_string(), options: HashMap::new(), }); @@ -57,15 +58,14 @@ impl RegionHeartbeatResponseHandler { let close_region_req = RegionRequest::Close(RegionCloseRequest {}); Ok((region_id, close_region_req)) } - Instruction::InvalidateTableCache(_) => InvalidHeartbeatResponseSnafu.fail(), + Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { + InvalidHeartbeatResponseSnafu.fail() + } } } fn region_ident_to_region_id(region_ident: &RegionIdent) -> RegionId { - RegionId::new( - region_ident.table_ident.table_id, - region_ident.region_number, - ) + RegionId::new(region_ident.table_id, region_ident.region_number) } fn reply_template_from_instruction(instruction: &Instruction) -> InstructionReply { @@ -78,7 +78,7 @@ impl RegionHeartbeatResponseHandler { result: false, error: None, }), - Instruction::InvalidateTableCache(_) => { + Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { InstructionReply::InvalidateTableCache(SimpleReply { result: false, error: None, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 8a68d81bd6f5..b073a3aecdd2 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -254,7 +254,11 @@ impl RegionServerInner { pub async fn handle_read(&self, request: QueryRequest) -> Result { // TODO(ruihang): add metrics and set trace id - let QueryRequest { region_id, plan } = request; + let QueryRequest { + header: _, + region_id, + plan, + } = request; let region_id = RegionId::from_u64(region_id); // build dummy catalog list diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 3f22900842ed..36c4091134e6 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -23,7 +23,6 @@ use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, RegionIdent}; use common_query::prelude::ScalarUdf; use common_query::Output; @@ -81,13 +80,7 @@ async fn handle_instruction( fn close_region_instruction() -> Instruction { Instruction::CloseRegion(RegionIdent { - table_ident: TableIdent { - catalog: "greptime".to_string(), - schema: 
"public".to_string(), - table: "demo".to_string(), - table_id: 1024, - engine: "mito".to_string(), - }, + table_id: 1024, region_number: 0, cluster_id: 1, datanode_id: 2, @@ -96,13 +89,7 @@ fn close_region_instruction() -> Instruction { fn open_region_instruction() -> Instruction { Instruction::OpenRegion(RegionIdent { - table_ident: TableIdent { - catalog: "greptime".to_string(), - schema: "public".to_string(), - table: "demo".to_string(), - table_id: 1024, - engine: "mito".to_string(), - }, + table_id: 1024, region_number: 0, cluster_id: 1, datanode_id: 2, diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index d72b137a183c..4bdf6f8dabc8 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -28,7 +28,6 @@ use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::error::Result as MetaResult; -use common_meta::ident::TableIdent; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoKey; @@ -36,10 +35,12 @@ use common_meta::key::table_name::TableNameKey; use common_meta::key::table_route::TableRouteKey; use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_meta::table_name::TableName; use common_telemetry::debug; use futures_util::TryStreamExt; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; +use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; @@ -65,13 +66,8 @@ pub struct FrontendCatalogManager { #[async_trait::async_trait] impl CacheInvalidator for FrontendCatalogManager { - async fn invalidate_table(&self, _ctx: &Context, table_ident: TableIdent) -> MetaResult<()> { - let table_id = table_ident.table_id; - let key = TableNameKey::new( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - ); + async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> { + let key: TableNameKey = (&table_name).into(); self.backend_cache_invalidator .invalidate_key(&key.as_raw_key()) .await; @@ -80,6 +76,10 @@ impl CacheInvalidator for FrontendCatalogManager { String::from_utf8_lossy(&key.as_raw_key()) ); + Ok(()) + } + + async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> MetaResult<()> { let key = TableInfoKey::new(table_id); self.backend_cache_invalidator .invalidate_key(&key.as_raw_key()) diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index e74d91ff9fa8..5f52fc2cead0 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -119,7 +119,6 @@ pub(crate) async fn create_external_expr( create_if_not_exists: create.if_not_exists, table_options: options, table_id: None, - region_numbers: vec![], engine: create.engine.to_string(), }; Ok(expr) @@ -151,7 +150,6 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul create_if_not_exists: create.if_not_exists, table_options, table_id: None, - region_numbers: vec![], engine: create.engine.to_string(), }; Ok(expr) diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 8974a16afbef..81feab068d29 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ 
b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -18,13 +18,15 @@ use common_meta::error::Result as MetaResult; use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; -use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::table_name::TableNameKey; use common_meta::key::table_route::TableRouteKey; use common_meta::key::TableMetaKey; +use common_meta::table_name::TableName; use common_telemetry::error; +use futures::future::Either; +use table::metadata::TableId; #[derive(Clone)] pub struct InvalidateTableCacheHandler { @@ -36,22 +38,31 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( ctx.incoming_message.as_ref(), - Some((_, Instruction::InvalidateTableCache { .. })) + Some((_, Instruction::InvalidateTableIdCache { .. })) + | Some((_, Instruction::InvalidateTableNameCache { .. })) ) } async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { - // TODO(weny): considers introducing a macro - let Some((meta, Instruction::InvalidateTableCache(table_ident))) = - ctx.incoming_message.take() - else { - unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"); - }; - let mailbox = ctx.mailbox.clone(); let self_ref = self.clone(); + + let (meta, invalidator) = match ctx.incoming_message.take() { + Some((meta, Instruction::InvalidateTableIdCache(table_id))) => ( + meta, + Either::Left(async move { self_ref.invalidate_table_id_cache(table_id).await }), + ), + Some((meta, Instruction::InvalidateTableNameCache(table_name))) => ( + meta, + Either::Right( + async move { self_ref.invalidate_table_name_cache(table_name).await }, + ), + ), + _ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"), + }; + let _handle = common_runtime::spawn_bg(async move { - self_ref.invalidate_table_cache(table_ident).await; + invalidator.await; if let Err(e) = mailbox .send(( @@ -78,25 +89,19 @@ impl InvalidateTableCacheHandler { } } - async fn invalidate_table_cache(&self, table_ident: TableIdent) { - let table_id = table_ident.table_id; + async fn invalidate_table_id_cache(&self, table_id: TableId) { self.backend_cache_invalidator .invalidate_key(&TableInfoKey::new(table_id).as_raw_key()) .await; self.backend_cache_invalidator - .invalidate_key( - &TableNameKey::new( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - ) - .as_raw_key(), - ) + .invalidate_key(&TableRouteKey { table_id }.as_raw_key()) .await; + } + async fn invalidate_table_name_cache(&self, table_name: TableName) { self.backend_cache_invalidator - .invalidate_key(&TableRouteKey { table_id }.as_raw_key()) + .invalidate_key(&TableNameKey::from(&table_name).as_raw_key()) .await; } } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 02fcd6d39d81..9c7ed815ca2c 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -22,7 +22,6 @@ use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::ident::TableIdent; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use 
common_meta::key::table_info::TableInfoKey; use common_meta::key::TableMetaKey; @@ -75,13 +74,7 @@ async fn test_invalidate_table_cache_handler() { handle_instruction( executor.clone(), mailbox.clone(), - Instruction::InvalidateTableCache(TableIdent { - catalog: "test".to_string(), - schema: "greptime".to_string(), - table: "foo_table".to_string(), - table_id, - engine: "mito".to_string(), - }), + Instruction::InvalidateTableIdCache(table_id), ) .await; @@ -97,18 +90,7 @@ async fn test_invalidate_table_cache_handler() { .contains_key(&table_info_key.as_raw_key())); // removes a invalid key - handle_instruction( - executor, - mailbox, - Instruction::InvalidateTableCache(TableIdent { - catalog: "test".to_string(), - schema: "greptime".to_string(), - table: "not_found".to_string(), - table_id: 0, - engine: "mito".to_string(), - }), - ) - .await; + handle_instruction(executor, mailbox, Instruction::InvalidateTableIdCache(0)).await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( diff --git a/src/frontend/src/req_convert/common.rs b/src/frontend/src/req_convert/common.rs index 00a07fdae473..698bf342103e 100644 --- a/src/frontend/src/req_convert/common.rs +++ b/src/frontend/src/req_convert/common.rs @@ -114,18 +114,26 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { (String, StringValue, string_values), (Date, DateValue, date_values), (Datetime, DatetimeValue, datetime_values), - (TimestampSecond, TsSecondValue, ts_second_values), + ( + TimestampSecond, + TimestampSecondValue, + timestamp_second_values + ), ( TimestampMillisecond, - TsMillisecondValue, - ts_millisecond_values + TimestampMillisecondValue, + timestamp_millisecond_values ), ( TimestampMicrosecond, - TsMicrosecondValue, - ts_microsecond_values + TimestampMicrosecondValue, + timestamp_microsecond_values + ), + ( + TimestampNanosecond, + TimestampNanosecondValue, + timestamp_nanosecond_values ), - (TimestampNanosecond, TsNanosecondValue, ts_nanosecond_values), (TimeSecond, TimeSecondValue, time_second_values), ( TimeMillisecond, diff --git a/src/frontend/src/req_convert/delete/column_to_row.rs b/src/frontend/src/req_convert/delete/column_to_row.rs index 610d9be48e2f..59564d78a317 100644 --- a/src/frontend/src/req_convert/delete/column_to_row.rs +++ b/src/frontend/src/req_convert/delete/column_to_row.rs @@ -35,6 +35,5 @@ fn request_column_to_row(request: DeleteRequest) -> Result { Ok(RowDeleteRequest { table_name: request.table_name, rows: Some(rows), - region_number: 0, // FIXME(zhongzc): deprecated field }) } diff --git a/src/frontend/src/req_convert/insert/column_to_row.rs b/src/frontend/src/req_convert/insert/column_to_row.rs index adc129219666..285dafd223e1 100644 --- a/src/frontend/src/req_convert/insert/column_to_row.rs +++ b/src/frontend/src/req_convert/insert/column_to_row.rs @@ -35,6 +35,5 @@ fn request_column_to_row(request: InsertRequest) -> Result { Ok(RowInsertRequest { table_name: request.table_name, rows: Some(rows), - region_number: 0, // FIXME(zhongzc): deprecated field }) } diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index 103b64dbd1c3..146f178fb0c2 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -190,7 +190,6 @@ mod python { create_if_not_exists: request.create_if_not_exists, table_options: (&request.table_options).into(), table_id: None, // Should and will be assigned by Meta. 
- region_numbers: vec![0], engine: request.engine, }) } diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index dea77dd0bfa1..08bc31073b24 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -23,7 +23,6 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::format_full_table_name; use common_meta::cache_invalidator::Context; use common_meta::ddl::ExecutorContext; -use common_meta::ident::TableIdent; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; @@ -114,7 +113,6 @@ impl StatementExecutor { info!("Successfully created table '{table_name}' with table id {table_id}"); table_info.ident.table_id = table_id; - let engine = table_info.meta.engine.to_string(); let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?); create_table.table_id = Some(api::v1::TableId { id: table_id }); @@ -123,16 +121,7 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table( - &Context::default(), - TableIdent { - catalog: table_name.catalog_name.to_string(), - schema: table_name.schema_name.to_string(), - table: table_name.table_name.to_string(), - table_id, - engine, - }, - ) + .invalidate_table_id(&Context::default(), table_id) .await .context(error::InvalidateTableCacheSnafu)?; @@ -153,21 +142,11 @@ impl StatementExecutor { table_name: table_name.to_string(), })?; let table_id = table.table_info().table_id(); - let engine = table.table_info().meta.engine.to_string(); self.drop_table_procedure(&table_name, table_id).await?; // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table( - &Context::default(), - TableIdent { - catalog: table_name.catalog_name.to_string(), - schema: table_name.schema_name.to_string(), - table: table_name.table_name.to_string(), - table_id, - engine, - }, - ) + .invalidate_table_id(&Context::default(), table_id) .await .context(error::InvalidateTableCacheSnafu)?; @@ -256,7 +235,6 @@ impl StatementExecutor { })?; let table_id = table.table_info().ident.table_id; - let engine = table.table_info().meta.engine.to_string(); self.verify_alter(table_id, table.table_info(), expr.clone())?; info!( @@ -276,16 +254,7 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table( - &Context::default(), - TableIdent { - catalog: catalog_name.to_string(), - schema: schema_name.to_string(), - table: table_name.to_string(), - table_id, - engine, - }, - ) + .invalidate_table_id(&Context::default(), table_id) .await .context(error::InvalidateTableCacheSnafu)?; diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index fa130ef2d1c7..8f98bc1744f1 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -13,12 +13,14 @@ // limitations under the License. 
use api::v1::meta::MailboxMessage; +use async_trait::async_trait; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::ident::TableIdent; use common_meta::instruction::Instruction; +use common_meta::table_name::TableName; use snafu::ResultExt; +use table::metadata::TableId; use crate::metasrv::MetasrvInfo; use crate::service::mailbox::{BroadcastChannel, MailboxRef}; @@ -37,10 +39,8 @@ impl MetasrvCacheInvalidator { } } -#[async_trait::async_trait] -impl CacheInvalidator for MetasrvCacheInvalidator { - async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> MetaResult<()> { - let instruction = Instruction::InvalidateTableCache(table_ident); +impl MetasrvCacheInvalidator { + async fn broadcast(&self, ctx: &Context, instruction: Instruction) -> MetaResult<()> { let subject = &ctx .subject .clone() @@ -62,3 +62,16 @@ impl CacheInvalidator for MetasrvCacheInvalidator { .context(meta_error::ExternalSnafu) } } + +#[async_trait] +impl CacheInvalidator for MetasrvCacheInvalidator { + async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { + let instruction = Instruction::InvalidateTableIdCache(table_id); + self.broadcast(ctx, instruction).await + } + + async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { + let instruction = Instruction::InvalidateTableNameCache(table_name); + self.broadcast(ctx, instruction).await + } +} diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 9d96b333ad47..b48bc5f95244 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -18,6 +18,7 @@ use common_meta::peer::Peer; use common_runtime::JoinError; use servers::define_into_tonic_status; use snafu::{Location, Snafu}; +use table::metadata::TableId; use tokio::sync::mpsc::error::SendError; use tonic::codegen::http; @@ -246,9 +247,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Table route not found: {}", table_name))] + #[snafu(display("Failed to find table route for {table_id}, at {location}"))] TableRouteNotFound { - table_name: String, + table_id: TableId, location: Location, }, diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index 8676113fbcf1..dc1c7ca46a18 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -18,8 +18,6 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; -use common_catalog::consts::MITO_ENGINE; -use common_meta::ident::TableIdent; use common_meta::RegionIdent; use store_api::storage::RegionId; @@ -86,12 +84,7 @@ impl HeartbeatHandler for RegionFailureHandler { RegionIdent { cluster_id: stat.cluster_id, datanode_id: stat.id, - table_ident: TableIdent { - table_id: region_id.table_id(), - // TODO(#1583): Use the actual table engine. 
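MetasrvCacheInvalidator above now funnels both invalidation flavors through one private broadcast helper, so each trait method is a one-liner that only chooses the Instruction variant. The CacheInvalidator trait itself is defined in common_meta::cache_invalidator and is not shown in this hunk; a plausible shape, inferred from the impl, with stand-in types so the sketch is self-contained:

use async_trait::async_trait;

// Stand-ins; the real Context, TableId and TableName are the ones imported above.
type TableId = u32;
pub struct TableName {
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
}
pub struct Context {
    pub subject: Option<String>,
}
pub type MetaResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait]
pub trait CacheInvalidator: Send + Sync {
    async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()>;
    async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()>;
}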
-                    engine: MITO_ENGINE.to_string(),
-                    ..Default::default()
-                },
+                table_id: region_id.table_id(),
                 region_number: region_id.region_number(),
             }
         })
diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs
index 1f3fb632572e..ed9ef603126a 100644
--- a/src/meta-srv/src/handler/failure_handler/runner.rs
+++ b/src/meta-srv/src/handler/failure_handler/runner.rs
@@ -247,8 +247,6 @@ impl FailureDetectorContainer {
 #[cfg(test)]
 mod tests {
-    use common_catalog::consts::MITO_ENGINE;
-    use common_meta::ident::TableIdent;
     use rand::Rng;
 
     use super::*;
@@ -258,13 +256,7 @@
     fn test_default_failure_detector_container() {
         let container = FailureDetectorContainer(DashMap::new());
         let ident = RegionIdent {
-            table_ident: TableIdent {
-                catalog: "a".to_string(),
-                schema: "b".to_string(),
-                table: "c".to_string(),
-                table_id: 1,
-                engine: MITO_ENGINE.to_string(),
-            },
+            table_id: 1,
             cluster_id: 3,
             datanode_id: 2,
             region_number: 1,
         };
         let _ = container.get_failure_detector(ident.clone());
         assert!(container.0.contains_key(&ident));
@@ -287,13 +279,7 @@
         let container = FailureDetectorContainer(DashMap::new());
         let ident = RegionIdent {
-            table_ident: TableIdent {
-                catalog: "a".to_string(),
-                schema: "b".to_string(),
-                table: "c".to_string(),
-                table_id: 1,
-                engine: MITO_ENGINE.to_string(),
-            },
+            table_id: 1,
             cluster_id: 3,
             datanode_id: 2,
             region_number: 1,
         };
         let _ = container.get_failure_detector(ident.clone());
@@ -328,13 +314,7 @@
                 region_idents: region_ids
                     .iter()
                     .map(|&region_number| RegionIdent {
-                        table_ident: TableIdent {
-                            catalog: "a".to_string(),
-                            schema: "b".to_string(),
-                            table: "c".to_string(),
-                            table_id: 0,
-                            engine: MITO_ENGINE.to_string(),
-                        },
+                        table_id: 0,
                         cluster_id: 1,
                         datanode_id,
                         region_number,
diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs
index 62d88fd2d35a..40f85b902f53 100644
--- a/src/meta-srv/src/handler/region_lease_handler.rs
+++ b/src/meta-srv/src/handler/region_lease_handler.rs
@@ -70,7 +70,6 @@ impl HeartbeatHandler for RegionLeaseHandler {
 mod test {
     use std::sync::Arc;
 
-    use common_meta::ident::TableIdent;
     use common_meta::key::TableMetadataManager;
     use common_meta::RegionIdent;
     use store_api::storage::{RegionId, RegionNumber};
@@ -131,10 +130,7 @@
             .register_inactive_region(&RegionIdent {
                 cluster_id: 1,
                 datanode_id: 1,
-                table_ident: TableIdent {
-                    table_id: 1,
-                    ..Default::default()
-                },
+                table_id: 1,
                 region_number: 1,
             })
             .await
@@ -143,10 +139,7 @@
             .register_inactive_region(&RegionIdent {
                 cluster_id: 1,
                 datanode_id: 1,
-                table_ident: TableIdent {
-                    table_id: 1,
-                    ..Default::default()
-                },
+                table_id: 1,
                 region_number: 3,
             })
             .await
diff --git a/src/meta-srv/src/lock/keys.rs b/src/meta-srv/src/lock/keys.rs
index 513b308ac081..db3f5d81282a 100644
--- a/src/meta-srv/src/lock/keys.rs
+++ b/src/meta-srv/src/lock/keys.rs
@@ -21,12 +21,8 @@ use crate::lock::Key;
 
 pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key {
     format!(
-        "table_metadata_lock_({}-{}.{}.{}-{})",
-        region.cluster_id,
-        region.table_ident.catalog,
-        region.table_ident.schema,
-        region.table_ident.table,
-        region.table_ident.table_id,
+        "table_metadata_lock_({}-{})",
+        region.cluster_id, region.table_id,
     )
     .into_bytes()
 }
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 727f6a0c4187..9534b8d912f0 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -31,6 +31,7 @@
 use common_telemetry::{debug, error, info, warn};
 use serde::{Deserialize, Serialize};
 use servers::http::HttpOptions;
 use snafu::ResultExt;
+use table::metadata::TableId;
 use tokio::sync::broadcast::error::RecvError;
 
 use crate::cluster::MetaPeerClientRef;
@@ -170,9 +171,7 @@ pub struct SelectorContext {
     pub datanode_lease_secs: u64,
     pub kv_store: KvStoreRef,
     pub meta_peer_client: MetaPeerClientRef,
-    pub catalog: Option<String>,
-    pub schema: Option<String>,
-    pub table: Option<String>,
+    pub table_id: Option<TableId>,
 }
 
 pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 39768817c5bd..e6325604299d 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -171,9 +171,7 @@ impl MetaSrvBuilder {
             server_addr: options.server_addr.clone(),
             kv_store: kv_store.clone(),
             meta_peer_client: meta_peer_client.clone(),
-            catalog: None,
-            schema: None,
-            table: None,
+            table_id: None,
         };
         let ddl_manager = build_ddl_manager(
             &options,
@@ -195,9 +193,7 @@ impl MetaSrvBuilder {
             datanode_lease_secs: options.datanode_lease_secs,
             kv_store: kv_store.clone(),
             meta_peer_client: meta_peer_client.clone(),
-            catalog: None,
-            schema: None,
-            table: None,
+            table_id: None,
         };
         let region_failover_manager = Arc::new(RegionFailoverManager::new(
             options.region_lease_secs,
diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs
index d328c9f112d0..1b009bb44296 100644
--- a/src/meta-srv/src/procedure/region_failover.rs
+++ b/src/meta-srv/src/procedure/region_failover.rs
@@ -25,7 +25,6 @@
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
 use async_trait::async_trait;
-use common_meta::ident::TableIdent;
 use common_meta::key::datanode_table::DatanodeTableKey;
 use common_meta::key::TableMetadataManagerRef;
 use common_meta::{ClusterId, RegionIdent};
@@ -41,6 +40,7 @@ use failover_start::RegionFailoverStart;
 use serde::{Deserialize, Serialize};
 use snafu::ResultExt;
 use store_api::storage::RegionNumber;
+use table::metadata::TableId;
 
 use crate::error::{Error, RegisterProcedureLoaderSnafu, Result, TableMetadataManagerSnafu};
 use crate::lock::DistLockRef;
@@ -54,7 +54,7 @@ const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
 #[derive(PartialEq, Eq, Hash, Clone)]
 pub(crate) struct RegionFailoverKey {
     pub(crate) cluster_id: ClusterId,
-    pub(crate) table_ident: TableIdent,
+    pub(crate) table_id: TableId,
     pub(crate) region_number: RegionNumber,
 }
 
@@ -62,7 +62,7 @@ impl From<RegionIdent> for RegionFailoverKey {
     fn from(region_ident: RegionIdent) -> Self {
         Self {
             cluster_id: region_ident.cluster_id,
-            table_ident: region_ident.table_ident,
+            table_id: region_ident.table_id,
             region_number: region_ident.region_number,
         }
     }
@@ -206,18 +206,17 @@ impl RegionFailoverManager {
     }
 
     async fn table_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
-        let table_ident = &failed_region.table_ident;
         Ok(self
             .table_metadata_manager
             .table_route_manager()
-            .get_region_distribution(table_ident.table_id)
+            .get_region_distribution(failed_region.table_id)
             .await
             .context(TableMetadataManagerSnafu)?
            .is_some())
     }
 
     async fn failed_region_exists(&self, failed_region: &RegionIdent) -> Result<bool> {
-        let table_id = failed_region.table_ident.table_id;
+        let table_id = failed_region.table_id;
         let datanode_id = failed_region.datanode_id;
 
         let value = self
@@ -372,13 +371,11 @@ impl Procedure for RegionFailoverProcedure {
 
     fn lock_key(&self) -> LockKey {
         let region_ident = &self.node.failed_region;
-        let table_key = common_catalog::format_full_table_name(
-            &region_ident.table_ident.catalog,
-            &region_ident.table_ident.schema,
-            &region_ident.table_ident.table,
+        let region_key = format!(
+            "{}/region-{}",
+            region_ident.table_id, region_ident.region_number
         );
-        let region_key = format!("{}/region-{}", table_key, region_ident.region_number);
-        LockKey::new(vec![table_key, region_key])
+        LockKey::single(region_key)
     }
 }
 
@@ -389,8 +386,6 @@ mod tests {
 
     use api::v1::meta::mailbox_message::Payload;
     use api::v1::meta::{HeartbeatResponse, MailboxMessage, Peer, RequestHeader};
-    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
-    use common_meta::ident::TableIdent;
     use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
     use common_meta::key::TableMetadataManager;
     use common_meta::sequence::Sequence;
@@ -457,13 +452,7 @@
                 cluster_id: 0,
                 region_number,
                 datanode_id: failed_datanode,
-                table_ident: TableIdent {
-                    table_id: 1,
-                    engine: MITO_ENGINE.to_string(),
-                    catalog: DEFAULT_CATALOG_NAME.to_string(),
-                    schema: DEFAULT_SCHEMA_NAME.to_string(),
-                    table: "my_table".to_string(),
-                },
+                table_id: 1,
             }
         }
     }
@@ -493,6 +482,7 @@
             // Safety: all required fields set at initialization
             .unwrap();
 
+        let table_id = 1;
         let table = "my_table";
         let table_metadata_manager = Arc::new(TableMetadataManager::new(
             KvBackendAdapter::wrap(kv_store.clone()),
@@ -543,9 +533,7 @@
             server_addr: "127.0.0.1:3002".to_string(),
             kv_store: kv_store.clone(),
             meta_peer_client,
-            catalog: Some(DEFAULT_CATALOG_NAME.to_string()),
-            schema: Some(DEFAULT_SCHEMA_NAME.to_string()),
-            table: Some(table.to_string()),
+            table_id: Some(table_id),
         };
 
         TestingEnv {
@@ -665,7 +653,7 @@
         assert_eq!(
             procedure.dump().unwrap(),
-            r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_ident":{"catalog":"greptime","schema":"public","table":"my_table","table_id":1,"engine":"mito"},"region_number":1},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
+            r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1},"state":{"region_failover_state":"RegionFailoverEnd"}}"#
         );
 
         // Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode.
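The lock_key change above is easy to skim past: the table-wide key built via format_full_table_name is gone, and the procedure now serializes only on a region-granular key derived from the table id. A tiny standalone check of the key format (same format string as the patch):

fn region_lock_key(table_id: u32, region_number: u32) -> String {
    // Mirrors the `format!("{}/region-{}", ...)` used in lock_key() above.
    format!("{}/region-{}", table_id, region_number)
}

fn main() {
    assert_eq!(region_lock_key(1, 1), "1/region-1");
}

One consequence of dropping the table-level key: failover procedures for different regions of the same table no longer contend on a shared lock.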
@@ -705,12 +693,12 @@ mod tests { let s = procedure.dump().unwrap(); assert_eq!( s, - r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_ident":{"catalog":"greptime","schema":"public","table":"my_table","table_id":1,"engine":"mito"},"region_number":1},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"# + r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"# ); let n: Node = serde_json::from_str(&s).unwrap(); assert_eq!( format!("{n:?}"), - r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_ident: TableIdent { catalog: "greptime", schema: "public", table: "my_table", table_id: 1, engine: "mito" }, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"# + r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"# ); } diff --git a/src/meta-srv/src/procedure/region_failover/failover_start.rs b/src/meta-srv/src/procedure/region_failover/failover_start.rs index 52d55bbe9272..e31de684ca3b 100644 --- a/src/meta-srv/src/procedure/region_failover/failover_start.rs +++ b/src/meta-srv/src/procedure/region_failover/failover_start.rs @@ -15,7 +15,6 @@ use async_trait::async_trait; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_meta::ident::TableIdent; use common_meta::peer::Peer; use common_meta::RegionIdent; use common_telemetry::info; @@ -48,15 +47,7 @@ impl RegionFailoverStart { } let mut selector_ctx = ctx.selector_ctx.clone(); - let TableIdent { - catalog, - schema, - table, - .. - } = &failed_region.table_ident; - selector_ctx.catalog = Some(catalog.to_string()); - selector_ctx.schema = Some(schema.to_string()); - selector_ctx.table = Some(table.to_string()); + selector_ctx.table_id = Some(failed_region.table_id); let cluster_id = failed_region.cluster_id; let candidates = ctx diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs index 52f47219a43a..6c26f22286a6 100644 --- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs +++ b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs @@ -14,12 +14,12 @@ use api::v1::meta::MailboxMessage; use async_trait::async_trait; -use common_meta::ident::TableIdent; use common_meta::instruction::Instruction; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::ResultExt; +use table::metadata::TableId; use super::failover_end::RegionFailoverEnd; use super::{RegionFailoverContext, State}; @@ -33,9 +33,9 @@ impl InvalidateCache { async fn broadcast_invalidate_table_cache_messages( &self, ctx: &RegionFailoverContext, - table_ident: &TableIdent, + table_id: TableId, ) -> Result<()> { - let instruction = Instruction::InvalidateTableCache(table_ident.clone()); + let instruction = Instruction::InvalidateTableIdCache(table_id); let msg = &MailboxMessage::json_message( "Invalidate Table Cache", @@ -62,12 +62,12 @@ impl State for InvalidateCache { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result> { - let table_ident = TableIdent::from(failed_region.clone()); + let table_id = failed_region.table_id; info!( "Broadcast invalidate table({}) cache message to frontend", - table_ident + table_id ); - self.broadcast_invalidate_table_cache_messages(ctx, 
&table_ident) + self.broadcast_invalidate_table_cache_messages(ctx, table_id) .await?; Ok(Box::new(RegionFailoverEnd)) @@ -108,7 +108,7 @@ mod tests { let _ = heartbeat_receivers.insert(frontend_id, rx); } - let table_ident: TableIdent = failed_region.clone().into(); + let table_id = failed_region.table_id; // lexicographical order // frontend-4,5,6,7 @@ -132,8 +132,7 @@ mod tests { assert_eq!( received.payload, Some(Payload::Json( - serde_json::to_string(&Instruction::InvalidateTableCache(table_ident.clone())) - .unwrap(), + serde_json::to_string(&Instruction::InvalidateTableIdCache(table_id)).unwrap(), )) ); } diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 0cb40afc5d11..a58c687b2e4d 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -23,7 +23,7 @@ use snafu::{OptionExt, ResultExt}; use super::invalidate_cache::InvalidateCache; use super::{RegionFailoverContext, State}; -use crate::error::{self, Result, RetryLaterSnafu}; +use crate::error::{self, Result, RetryLaterSnafu, TableRouteNotFoundSnafu}; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; @@ -57,8 +57,8 @@ impl UpdateRegionMetadata { ctx: &RegionFailoverContext, failed_region: &RegionIdent, ) -> Result<()> { - let table_id = failed_region.table_ident.table_id; - let engine = failed_region.table_ident.engine.as_str(); + let table_id = failed_region.table_id; + let engine = &failed_region.engine; let table_route_value = ctx .table_metadata_manager @@ -66,9 +66,7 @@ impl UpdateRegionMetadata { .get(table_id) .await .context(error::TableMetadataManagerSnafu)? - .with_context(|| error::TableRouteNotFoundSnafu { - table_name: failed_region.table_ident.table_ref().to_string(), - })?; + .context(TableRouteNotFoundSnafu { table_id })?; let mut new_region_routes = table_route_value.region_routes.clone(); @@ -185,7 +183,7 @@ mod tests { .await .unwrap(); - let table_id = failed_region.table_ident.table_id; + let table_id = failed_region.table_id; env.context .table_metadata_manager @@ -316,7 +314,7 @@ mod tests { let failed_region_1 = env.failed_region(1).await; let failed_region_2 = env.failed_region(2).await; - let table_id = failed_region_1.table_ident.table_id; + let table_id = failed_region_1.table_id; let _ = futures::future::join_all(vec![ tokio::spawn(async move { diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 43e5d26b655d..8ff2ac86b641 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -80,7 +80,6 @@ fn create_table_task() -> CreateTableTask { create_if_not_exists: false, table_options: HashMap::new(), table_id: None, - region_numbers: vec![1, 2, 3], engine: MITO2_ENGINE.to_string(), }; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 9e34ca0563ae..5c612d538df6 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -13,11 +13,11 @@ // limitations under the License. 
 use api::v1::meta::Peer;
-use common_meta::key::table_name::TableNameKey;
 use common_meta::key::TableMetadataManager;
 use common_meta::rpc::router::find_leaders;
 use common_telemetry::warn;
 use snafu::ResultExt;
+use table::metadata::TableId;
 
 use crate::error::{self, Result};
 use crate::keys::{LeaseKey, LeaseValue, StatKey};
@@ -32,32 +32,21 @@ pub struct LoadBasedSelector;
 
 async fn get_leader_peer_ids(
     table_metadata_manager: &TableMetadataManager,
-    catalog: &str,
-    schema: &str,
-    table: &str,
+    table_id: TableId,
 ) -> Result<Vec<u64>> {
-    let table_name = table_metadata_manager
-        .table_name_manager()
-        .get(TableNameKey::new(catalog, schema, table))
+    table_metadata_manager
+        .table_route_manager()
+        .get(table_id)
         .await
-        .context(error::TableMetadataManagerSnafu)?;
-
-    Ok(if let Some(table_name) = table_name {
-        table_metadata_manager
-            .table_route_manager()
-            .get(table_name.table_id())
-            .await
-            .context(error::TableMetadataManagerSnafu)?
-            .map(|route| {
+        .context(error::TableMetadataManagerSnafu)
+        .map(|route| {
+            route.map_or_else(Vec::new, |route| {
                 find_leaders(&route.region_routes)
                     .into_iter()
                     .map(|peer| peer.id)
                     .collect()
             })
-            .unwrap_or_default()
-    } else {
-        Vec::new()
-    })
+        })
 }
 
 #[async_trait::async_trait]
@@ -76,13 +65,11 @@ impl Selector for LoadBasedSelector {
         let stat_keys: Vec<StatKey> = lease_kvs.keys().map(|k| k.into()).collect();
         let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;
 
-        let leader_peer_ids = if let (Some(catalog), Some(schema), Some(table)) =
-            (&ctx.catalog, &ctx.schema, &ctx.table)
-        {
+        let leader_peer_ids = if let Some(table_id) = ctx.table_id {
             let table_metadata_manager =
                 TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone()));
 
-            get_leader_peer_ids(&table_metadata_manager, catalog, schema, table).await?
+            get_leader_peer_ids(&table_metadata_manager, table_id).await?
         } else {
             Vec::new()
         };
diff --git a/src/meta-srv/src/service/admin/route.rs b/src/meta-srv/src/service/admin/route.rs
index eef3851677f2..80da536d583a 100644
--- a/src/meta-srv/src/service/admin/route.rs
+++ b/src/meta-srv/src/service/admin/route.rs
@@ -22,7 +22,7 @@ use tonic::codegen::http;
 
 use super::HttpHandler;
 use crate::error;
-use crate::error::Result;
+use crate::error::{Result, TableNotFoundSnafu, TableRouteNotFoundSnafu};
 
 pub struct RouteHandler {
     pub table_metadata_manager: TableMetadataManagerRef,
@@ -53,15 +53,16 @@ impl HttpHandler for RouteHandler {
             .get(key)
             .await
             .context(error::TableMetadataManagerSnafu)?
-            .map(|x| x.table_id());
+            .map(|x| x.table_id())
+            .context(TableNotFoundSnafu { name: table_name })?;
 
         let table_route_value = self
             .table_metadata_manager
             .table_route_manager()
-            .get(table_id.context(error::TableNotFoundSnafu { name: table_name })?)
+            .get(table_id)
             .await
             .context(error::TableMetadataManagerSnafu)?
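Several hunks in this area switch TableRouteNotFound to carry a TableId and raise it with .context(TableRouteNotFoundSnafu { table_id }). For readers unfamiliar with snafu's generated context selectors, a self-contained sketch of the pattern; the error struct below is a simplified stand-in (the real variant also records a Location):

use snafu::{OptionExt, Snafu};

type TableId = u32;

// Simplified stand-in for the meta-srv error variant; snafu strips the `Error`
// suffix from the type name and generates a `TableRouteNotFoundSnafu` selector.
#[derive(Debug, Snafu)]
#[snafu(display("Failed to find table route for {table_id}"))]
struct TableRouteNotFoundError {
    table_id: TableId,
}

fn require_route(
    route: Option<&'static str>,
    table_id: TableId,
) -> Result<&'static str, TableRouteNotFoundError> {
    // `.context(...)` turns a None into the error, capturing table_id.
    route.context(TableRouteNotFoundSnafu { table_id })
}

fn main() {
    assert!(require_route(None, 42).is_err());
    assert_eq!(require_route(Some("route-value"), 42).unwrap(), "route-value");
}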
- .context(error::TableRouteNotFoundSnafu { table_name })?; + .context(TableRouteNotFoundSnafu { table_id })?; http::Response::builder() .status(http::StatusCode::OK) .body(serde_json::to_string(&table_route_value).unwrap()) diff --git a/src/meta-srv/src/table_routes.rs b/src/meta-srv/src/table_routes.rs index 403ebadaf4d4..cbdfd12263a9 100644 --- a/src/meta-srv/src/table_routes.rs +++ b/src/meta-srv/src/table_routes.rs @@ -19,7 +19,7 @@ use common_meta::rpc::router::{Table, TableRoute}; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; -use crate::error::{self, Result, TableMetadataManagerSnafu}; +use crate::error::{self, Result, TableMetadataManagerSnafu, TableRouteNotFoundSnafu}; use crate::metasrv::Context; pub(crate) async fn fetch_table( @@ -32,9 +32,7 @@ pub(crate) async fn fetch_table( .context(TableMetadataManagerSnafu)?; if let Some(table_info) = table_info { - let table_route = table_route.with_context(|| error::TableRouteNotFoundSnafu { - table_name: table_info.table_ref().to_string(), - })?; + let table_route = table_route.context(TableRouteNotFoundSnafu { table_id })?; let table = Table { id: table_id as u64, diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index e45ed3cbd740..b5317e04b76d 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -75,9 +75,7 @@ pub(crate) fn create_region_failover_manager() -> Arc { server_addr: "127.0.0.1:3002".to_string(), kv_store: kv_store.clone(), meta_peer_client, - catalog: None, - schema: None, - table: None, + table_id: None, }; Arc::new(RegionFailoverManager::new( diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index bbd9fece2a16..a1fddab9c7b7 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -147,7 +147,7 @@ fn build_rows_for_tags( value_data: Some(ValueData::F64Value((value_start + idx) as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)), }, api::v1::Value { value_data: Some(ValueData::StringValue(tag1.to_string())), diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index f59d0b2389f2..b682e8d2ac6a 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -795,7 +795,7 @@ mod tests { value_data: Some(ValueData::I64Value(k1)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64)), + value_data: Some(ValueData::TimestampMillisecondValue(i as i64)), }, api::v1::Value { value_data: Some(ValueData::I64Value(i as i64)), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 6fbf197ab9b4..2f29c8aa6da0 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -307,7 +307,7 @@ pub(crate) fn i64_value(data: i64) -> v1::Value { #[cfg(test)] pub(crate) fn ts_ms_value(data: i64) -> v1::Value { v1::Value { - value_data: Some(ValueData::TsMillisecondValue(data)), + value_data: Some(ValueData::TimestampMillisecondValue(data)), } } @@ -467,7 +467,7 @@ pub fn build_rows(start: usize, end: usize) -> Vec { value_data: Some(ValueData::F64Value(i as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)), + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), }, ], }) @@ -519,7 +519,7 @@ pub fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usiz value_data: 
Some(ValueData::F64Value((value_start + idx) as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)), }, ], }) @@ -535,7 +535,7 @@ pub fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec for InsertRequests { - type Error = Error; - - fn try_from(value: InfluxdbRequest) -> Result { - let mut writers: HashMap = HashMap::new(); - let lines = parse_lines(&value.lines) - .collect::>>() - .context(InfluxdbLineProtocolSnafu)?; - let line_len = lines.len(); - - for line in lines { - let table_name = line.series.measurement; - let writer = writers - .entry(table_name.to_string()) - .or_insert_with(|| LinesWriter::with_lines(line_len)); - - let tags = line.series.tag_set; - if let Some(tags) = tags { - for (k, v) in tags { - writer - .write_tag(k.as_str(), v.as_str()) - .context(InfluxdbLinesWriteSnafu)?; - } - } - - let fields = line.field_set; - for (k, v) in fields { - let column_name = k.as_str(); - match v { - FieldValue::I64(value) => { - writer - .write_i64(column_name, value) - .context(InfluxdbLinesWriteSnafu)?; - } - FieldValue::U64(value) => { - writer - .write_u64(column_name, value) - .context(InfluxdbLinesWriteSnafu)?; - } - FieldValue::F64(value) => { - writer - .write_f64(column_name, value) - .context(InfluxdbLinesWriteSnafu)?; - } - FieldValue::String(value) => { - writer - .write_string(column_name, value.as_str()) - .context(InfluxdbLinesWriteSnafu)?; - } - FieldValue::Boolean(value) => { - writer - .write_bool(column_name, value) - .context(InfluxdbLinesWriteSnafu)?; - } - } - } - - if let Some(timestamp) = line.timestamp { - let precision = unwrap_or_default_precision(value.precision); - writer - .write_ts(INFLUXDB_TIMESTAMP_COLUMN_NAME, (timestamp, precision)) - .context(InfluxdbLinesWriteSnafu)?; - } else { - let precision = unwrap_or_default_precision(value.precision); - let timestamp = Timestamp::current_millis(); - let unit: TimeUnit = precision.try_into().context(InfluxdbLinesWriteSnafu)?; - let timestamp = timestamp - .convert_to(unit) - .with_context(|| TimePrecisionSnafu { - name: precision.to_string(), - })?; - writer - .write_ts( - INFLUXDB_TIMESTAMP_COLUMN_NAME, - (timestamp.into(), precision), - ) - .context(InfluxdbLinesWriteSnafu)?; - } - writer.commit(); - } - - let inserts = writers - .into_iter() - .map(|(table_name, writer)| { - let (columns, row_count) = writer.finish(); - GrpcInsertRequest { - table_name, - region_number: 0, - columns, - row_count, - } - }) - .collect(); - Ok(InsertRequests { inserts }) - } -} - impl TryFrom for RowInsertRequests { type Error = Error; @@ -206,162 +101,12 @@ fn unwrap_or_default_precision(precision: Option) -> Precision { #[cfg(test)] mod tests { - use api::v1::column::Values; use api::v1::value::ValueData; - use api::v1::{Column, ColumnDataType, Rows, SemanticType}; - use common_base::BitVec; + use api::v1::{ColumnDataType, Rows, SemanticType}; use super::*; use crate::influxdb::InfluxdbRequest; - #[test] - fn test_convert_influxdb_lines() { - let lines = r" -monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100 -monitor1,host=host2 memory=1027 1663840496400340001 -monitor2,host=host3 cpu=66.5 1663840496100023102 -monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; - - let influxdb_req = InfluxdbRequest { - precision: None, - lines: lines.to_string(), - }; - - let requests: InsertRequests = influxdb_req.try_into().unwrap(); - assert_eq!(2, requests.inserts.len()); - - 
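The test being deleted above covered the column-based conversion; only the row-based path survives this PR. The hunks here also track a greptime-proto rename: Values fields ts_*_values become timestamp_*_values, and ValueData variants Ts*Value become Timestamp*Value. A sketch of the new spellings with stand-in types (the real ones come from the api crate):

#[derive(Default)]
struct Values {
    timestamp_millisecond_values: Vec<i64>, // was: ts_millisecond_values
    f64_values: Vec<f64>,                   // unchanged
}

enum ValueData {
    TimestampMillisecondValue(i64), // was: TsMillisecondValue
    F64Value(f64),
}

fn main() {
    let column = Values {
        timestamp_millisecond_values: vec![1663840496100, 1663840496400],
        ..Default::default()
    };
    // Row-based requests carry the same data as individual cells.
    let cell = ValueData::TimestampMillisecondValue(column.timestamp_millisecond_values[0]);
    match cell {
        ValueData::TimestampMillisecondValue(ts) => assert_eq!(ts, 1663840496100),
        ValueData::F64Value(_) => unreachable!(),
    }
}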
for request in requests.inserts { - match &request.table_name[..] { - "monitor1" => assert_monitor_1(&request.columns), - "monitor2" => assert_monitor_2(&request.columns), - _ => panic!(), - } - } - } - - fn assert_monitor_1(columns: &[Column]) { - assert_eq!(4, columns.len()); - verify_column( - &columns[0], - "host", - ColumnDataType::String, - SemanticType::Tag, - Vec::new(), - Values { - string_values: vec!["host1".to_string(), "host2".to_string()], - ..Default::default() - }, - ); - - verify_column( - &columns[1], - "cpu", - ColumnDataType::Float64, - SemanticType::Field, - vec![false, true], - Values { - f64_values: vec![66.6], - ..Default::default() - }, - ); - - verify_column( - &columns[2], - "memory", - ColumnDataType::Float64, - SemanticType::Field, - Vec::new(), - Values { - f64_values: vec![1024.0, 1027.0], - ..Default::default() - }, - ); - - verify_column( - &columns[3], - "ts", - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - Vec::new(), - Values { - ts_millisecond_values: vec![1663840496100, 1663840496400], - ..Default::default() - }, - ); - } - - fn assert_monitor_2(columns: &[Column]) { - assert_eq!(4, columns.len()); - verify_column( - &columns[0], - "host", - ColumnDataType::String, - SemanticType::Tag, - Vec::new(), - Values { - string_values: vec!["host3".to_string(), "host4".to_string()], - ..Default::default() - }, - ); - - verify_column( - &columns[1], - "cpu", - ColumnDataType::Float64, - SemanticType::Field, - Vec::new(), - Values { - f64_values: vec![66.5, 66.3], - ..Default::default() - }, - ); - - verify_column( - &columns[2], - "ts", - ColumnDataType::TimestampMillisecond, - SemanticType::Timestamp, - Vec::new(), - Values { - ts_millisecond_values: vec![1663840496100, 1663840496400], - ..Default::default() - }, - ); - - verify_column( - &columns[3], - "memory", - ColumnDataType::Float64, - SemanticType::Field, - vec![true, false], - Values { - f64_values: vec![1029.0], - ..Default::default() - }, - ); - } - - fn verify_column( - column: &Column, - name: &str, - datatype: ColumnDataType, - semantic_type: SemanticType, - null_mask: Vec, - vals: Values, - ) { - assert_eq!(name, column.column_name); - assert_eq!(datatype as i32, column.datatype); - assert_eq!(semantic_type as i32, column.semantic_type); - verify_null_mask(&column.null_mask, null_mask); - assert_eq!(Some(vals), column.values); - } - - fn verify_null_mask(data: &[u8], expected: Vec) { - let bitvec = BitVec::from_slice(data); - for (idx, b) in expected.iter().enumerate() { - assert_eq!(b, bitvec.get(idx).unwrap()) - } - } #[test] fn test_convert_influxdb_lines_to_rows() { let lines = r" @@ -553,7 +298,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; fn extract_ts_millis_value(value: &ValueData) -> i64 { match value { - ValueData::TsMillisecondValue(v) => *v, + ValueData::TimestampMillisecondValue(v) => *v, _ => panic!(), } } diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index bee99c182810..163e060adece 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -129,7 +129,7 @@ impl DataPoint { let ts_column = Column { column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), values: Some(column::Values { - ts_millisecond_values: vec![self.ts_millis], + timestamp_millisecond_values: vec![self.ts_millis], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -165,7 +165,6 @@ impl DataPoint { GrpcInsertRequest { table_name: self.metric.clone(), - region_number: 0, columns, 
             row_count: 1,
         }
     }
@@ -268,7 +267,11 @@ mod test {
         assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME);
         assert_eq!(
-            columns[0].values.as_ref().unwrap().ts_millisecond_values,
+            columns[0]
+                .values
+                .as_ref()
+                .unwrap()
+                .timestamp_millisecond_values,
             vec![1000]
         );
 
diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs
index 05c7a56abccd..2510263ed709 100644
--- a/src/servers/src/otlp.rs
+++ b/src/servers/src/otlp.rs
@@ -176,7 +176,6 @@ fn encode_gauge(
     let (columns, row_count) = lines.finish();
     Ok(InsertRequest {
         table_name: normalize_otlp_name(name),
-        region_number: 0,
         columns,
         row_count,
     })
@@ -208,7 +207,6 @@ fn encode_sum(
     let (columns, row_count) = lines.finish();
     Ok(InsertRequest {
         table_name: normalize_otlp_name(name),
-        region_number: 0,
         columns,
         row_count,
     })
@@ -251,7 +249,6 @@ fn encode_histogram(name: &str, hist: &Histogram) -> Result<InsertRequest> {
     let (columns, row_count) = lines.finish();
     Ok(InsertRequest {
         table_name: normalize_otlp_name(name),
-        region_number: 0,
         columns,
         row_count,
     })
@@ -310,7 +307,6 @@ fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Resu
     let (columns, row_count) = lines.finish();
     Ok(InsertRequest {
         table_name: normalize_otlp_name(name),
-        region_number: 0,
         columns,
         row_count,
     })
@@ -351,7 +347,6 @@ fn encode_summary(
     let (columns, row_count) = lines.finish();
     Ok(InsertRequest {
         table_name: normalize_otlp_name(name),
-        region_number: 0,
         columns,
         row_count,
     })
diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs
index 813a39045e37..494e6505494c 100644
--- a/src/servers/src/prom_store.rs
+++ b/src/servers/src/prom_store.rs
@@ -15,13 +15,12 @@
 //! prometheus protocol supportings
 //! handles prometheus remote_write, remote_read logic
 use std::cmp::Ordering;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::hash::{Hash, Hasher};
 
 use api::prom_store::remote::label_matcher::Type as MatcherType;
 use api::prom_store::remote::{Label, Query, Sample, TimeSeries, WriteRequest};
-use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests, RowInsertRequests};
-use common_grpc::writer::{LinesWriter, Precision};
+use api::v1::RowInsertRequests;
 use common_recordbatch::{RecordBatch, RecordBatches};
 use common_time::timestamp::TimeUnit;
 use datafusion::prelude::{col, lit, regexp_match, Expr};
@@ -356,71 +355,6 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe
     Ok(multi_table_data.into_row_insert_requests())
 }
 
-pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> {
-    let mut writers: HashMap<String, LinesWriter> = HashMap::new();
-    for timeseries in &request.timeseries {
-        let table_name = timeseries
-            .labels
-            .iter()
-            .find(|label| {
-                // The metric name is a special label
-                label.name == METRIC_NAME_LABEL
-            })
-            .context(error::InvalidPromRemoteRequestSnafu {
-                msg: "missing '__name__' label in timeseries",
-            })?
-            .value
-            .clone();
-
-        let writer = writers
-            .entry(table_name)
-            .or_insert_with(|| LinesWriter::with_lines(16));
-        // For each sample
-        for sample in &timeseries.samples {
-            // Insert labels first.
-            for label in &timeseries.labels {
-                // The metric name is a special label
-                if label.name == METRIC_NAME_LABEL {
-                    continue;
-                }
-
-                writer
-                    .write_tag(&label.name, &label.value)
-                    .context(error::PromSeriesWriteSnafu)?;
-            }
-            // Insert sample timestamp.
-            writer
-                .write_ts(
-                    TIMESTAMP_COLUMN_NAME,
-                    (sample.timestamp, Precision::Millisecond),
-                )
-                .context(error::PromSeriesWriteSnafu)?;
-            // Insert sample value.
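With the column-based to_grpc_insert_requests gone (its body is being deleted here), remote-write samples flow only through to_grpc_row_insert_requests. A rough sketch of the request shape that path produces, with stand-in types for api::v1 (the schema is simplified to strings); note there is no region_number left for the writer to fill, since routing is decided downstream:

struct Row {
    values: Vec<String>, // stand-in; real rows hold api::v1::Value cells
}
struct Rows {
    schema: Vec<String>, // stand-in; real schema entries are ColumnSchema
    rows: Vec<Row>,
}

struct RowInsertRequest {
    table_name: String,
    rows: Option<Rows>,
    // region_number is gone from the message; nothing to set.
}

fn main() {
    let request = RowInsertRequest {
        table_name: "metric1".to_string(),
        rows: Some(Rows { schema: Vec::new(), rows: Vec::new() }),
    };
    assert_eq!(request.table_name, "metric1");
}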
- writer - .write_f64(FIELD_COLUMN_NAME, sample.value) - .context(error::PromSeriesWriteSnafu)?; - - writer.commit(); - } - } - - let mut sample_counts = 0; - let inserts = writers - .into_iter() - .map(|(table_name, writer)| { - let (columns, row_count) = writer.finish(); - sample_counts += row_count as usize; - GrpcInsertRequest { - table_name, - region_number: 0, - columns, - row_count, - } - }) - .collect(); - Ok((InsertRequests { inserts }, sample_counts)) -} - #[inline] pub fn snappy_decompress(buf: &[u8]) -> Result> { let mut decoder = Decoder::new(); @@ -655,7 +589,9 @@ mod tests { value_data: Some(api::v1::value::ValueData::F64Value(value)), }, api::v1::Value { - value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)), + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue( + timestamp, + )), }, ], } @@ -674,7 +610,9 @@ mod tests { value_data: Some(api::v1::value::ValueData::F64Value(value)), }, api::v1::Value { - value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)), + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue( + timestamp, + )), }, ], } @@ -756,115 +694,6 @@ mod tests { ); } - #[test] - fn test_write_request_to_insert_exprs() { - let write_request = WriteRequest { - timeseries: mock_timeseries(), - ..Default::default() - }; - - let mut exprs = to_grpc_insert_requests(write_request).unwrap().0.inserts; - exprs.sort_unstable_by(|l, r| l.table_name.cmp(&r.table_name)); - assert_eq!(3, exprs.len()); - assert_eq!("metric1", exprs[0].table_name); - assert_eq!("metric2", exprs[1].table_name); - assert_eq!("metric3", exprs[2].table_name); - - let expr = exprs.get_mut(0).unwrap(); - expr.columns - .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name)); - - let columns = &expr.columns; - let row_count = expr.row_count; - - assert_eq!(2, row_count); - assert_eq!(columns.len(), 3); - - assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[0].values.as_ref().unwrap().ts_millisecond_values, - vec![1000, 2000] - ); - - assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME); - assert_eq!( - columns[1].values.as_ref().unwrap().f64_values, - vec![1.0, 2.0] - ); - - assert_eq!(columns[2].column_name, "job"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["spark", "spark"] - ); - - let expr = exprs.get_mut(1).unwrap(); - expr.columns - .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name)); - - let columns = &expr.columns; - let row_count = expr.row_count; - - assert_eq!(2, row_count); - assert_eq!(columns.len(), 4); - - assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[0].values.as_ref().unwrap().ts_millisecond_values, - vec![1000, 2000] - ); - - assert_eq!(columns[1].column_name, FIELD_COLUMN_NAME); - assert_eq!( - columns[1].values.as_ref().unwrap().f64_values, - vec![3.0, 4.0] - ); - - assert_eq!(columns[2].column_name, "idc"); - assert_eq!( - columns[2].values.as_ref().unwrap().string_values, - vec!["z001", "z001"] - ); - assert_eq!(columns[3].column_name, "instance"); - assert_eq!( - columns[3].values.as_ref().unwrap().string_values, - vec!["test_host1", "test_host1"] - ); - - let expr = exprs.get_mut(2).unwrap(); - expr.columns - .sort_unstable_by(|l, r| l.column_name.cmp(&r.column_name)); - - let columns = &expr.columns; - let row_count = expr.row_count; - - assert_eq!(3, row_count); - assert_eq!(columns.len(), 4); - - assert_eq!(columns[0].column_name, "app"); - assert_eq!( - 
columns[0].values.as_ref().unwrap().string_values, - vec!["biz", "biz", "biz"] - ); - assert_eq!(columns[1].column_name, TIMESTAMP_COLUMN_NAME); - assert_eq!( - columns[1].values.as_ref().unwrap().ts_millisecond_values, - vec![1000, 2000, 3000] - ); - - assert_eq!(columns[2].column_name, FIELD_COLUMN_NAME); - assert_eq!( - columns[2].values.as_ref().unwrap().f64_values, - vec![5.0, 6.0, 7.0] - ); - - assert_eq!(columns[3].column_name, "idc"); - assert_eq!( - columns[3].values.as_ref().unwrap().string_values, - vec!["z002", "z002", "z002"] - ); - } - #[test] fn test_recordbatches_to_timeseries() { let schema = Arc::new(Schema::new(vec![ diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index ab6b80d23355..5504b34aa1f8 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -108,7 +108,6 @@ impl<'a> MultiTableData<'a> { RowInsertRequest { table_name: table_name.to_string(), rows: Some(Rows { schema, rows }), - ..Default::default() } }) .collect::>(); @@ -227,14 +226,14 @@ pub fn write_ts_precision<'a>( datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as i32, }); - one_row.push(ValueData::TsMillisecondValue(ts).into()) + one_row.push(ValueData::TimestampMillisecondValue(ts).into()) } else { check_schema( ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, &schema[*index], )?; - one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts)); + one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts)); } Ok(()) diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index 3e2db6311826..d74acca3fcc3 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use api::v1::greptime_request::Request; -use api::v1::InsertRequests; +use api::v1::RowInsertRequests; use async_trait::async_trait; use auth::tests::{DatabaseAuthInfo, MockUserProvider}; use axum::{http, Router}; @@ -54,7 +54,7 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl InfluxdbLineProtocolHandler for DummyInstance { async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()> { - let requests: InsertRequests = request.try_into()?; + let requests: RowInsertRequests = request.try_into()?; for expr in requests.inserts { let _ = self .tx diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index 92fcace38f10..1482a92078e4 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -54,10 +54,8 @@ impl GrpcQueryInterceptor for NoopInterceptor { match req { Request::Inserts(insert) => { ensure!( - insert.inserts.iter().all(|x| x.region_number == 0), - NotSupportedSnafu { - feat: "region not 0" - } + insert.inserts.iter().all(|x| x.row_count > 0), + NotSupportedSnafu { feat: "" } ) } _ => { @@ -74,10 +72,7 @@ fn test_grpc_interceptor() { let ctx = QueryContext::arc(); let req = Request::Inserts(InsertRequests { - inserts: vec![InsertRequest { - region_number: 1, - ..Default::default() - }], + inserts: vec![InsertRequest::default()], }); let fail = GrpcQueryInterceptor::pre_execute(&di, &req, ctx.clone()); diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index f384f8633170..ad1c7eb3571f 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -448,6 +448,8 @@ pub struct TableInfo { /// Id and version of the table. 
#[builder(default, setter(into))] pub ident: TableIdent, + + // TODO(LFC): Remove the catalog, schema and table names from TableInfo. /// Name of the table. #[builder(setter(into))] pub name: String, From 1ac15d067d58409e1f18b806e0a39a9c8cca3107 Mon Sep 17 00:00:00 2001 From: luofucong Date: Fri, 15 Sep 2023 12:46:35 +0800 Subject: [PATCH 2/4] rebase --- src/common/meta/src/instruction.rs | 11 +++++++---- src/datanode/src/tests.rs | 2 ++ src/meta-srv/src/handler/failure_handler.rs | 3 +++ src/meta-srv/src/handler/failure_handler/runner.rs | 3 +++ src/meta-srv/src/handler/region_lease_handler.rs | 2 ++ src/meta-srv/src/procedure/region_failover.rs | 7 ++++--- src/mito2/src/engine/projection_test.rs | 2 +- 7 files changed, 22 insertions(+), 8 deletions(-) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index d5783400f61d..d335907c5a4f 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -28,6 +28,7 @@ pub struct RegionIdent { pub datanode_id: DatanodeId, pub table_id: TableId, pub region_number: RegionNumber, + pub engine: String, } impl RegionIdent { @@ -40,8 +41,8 @@ impl Display for RegionIdent { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "RegionIdent(datanode_id='{}.{}', table_id={}, region_number={})", - self.cluster_id, self.datanode_id, self.table_id, self.region_number + "RegionIdent(datanode_id='{}.{}', table_id={}, region_number={}, engine = {})", + self.cluster_id, self.datanode_id, self.table_id, self.region_number, self.engine ) } } @@ -97,12 +98,13 @@ mod tests { datanode_id: 2, table_id: 1024, region_number: 1, + engine: "mito2".to_string(), }); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1}}"#, + r#"{"OpenRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#, serialized ); @@ -111,12 +113,13 @@ mod tests { datanode_id: 2, table_id: 1024, region_number: 1, + engine: "mito2".to_string(), }); let serialized = serde_json::to_string(&close_region).unwrap(); assert_eq!( - r#"{"CloseRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1}}"#, + r#"{"CloseRegion":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}}"#, serialized ); } diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 36c4091134e6..629603b82383 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -84,6 +84,7 @@ fn close_region_instruction() -> Instruction { region_number: 0, cluster_id: 1, datanode_id: 2, + engine: "mito2".to_string(), }) } @@ -93,6 +94,7 @@ fn open_region_instruction() -> Instruction { region_number: 0, cluster_id: 1, datanode_id: 2, + engine: "mito2".to_string(), }) } diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index dc1c7ca46a18..85bb0c49aac3 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Role}; use async_trait::async_trait; +use common_catalog::consts::default_engine; use common_meta::RegionIdent; use store_api::storage::RegionId; @@ -86,6 +87,8 @@ impl HeartbeatHandler for RegionFailureHandler { datanode_id: stat.id, table_id: region_id.table_id(), region_number: region_id.region_number(), + // TODO(LFC): Use the actual table engine (maybe retrieve from 
heartbeat). + engine: default_engine().to_string(), } }) .collect(), diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs index ed9ef603126a..6f9ca2ba9f40 100644 --- a/src/meta-srv/src/handler/failure_handler/runner.rs +++ b/src/meta-srv/src/handler/failure_handler/runner.rs @@ -260,6 +260,7 @@ mod tests { cluster_id: 3, datanode_id: 2, region_number: 1, + engine: "mito2".to_string(), }; let _ = container.get_failure_detector(ident.clone()); assert!(container.0.contains_key(&ident)); @@ -283,6 +284,7 @@ mod tests { cluster_id: 3, datanode_id: 2, region_number: 1, + engine: "mito2".to_string(), }; let _ = container.get_failure_detector(ident.clone()); @@ -318,6 +320,7 @@ mod tests { cluster_id: 1, datanode_id, region_number, + engine: "mito2".to_string(), }) .collect(), heartbeat_time: start + i * 1000 + rng.gen_range(0..100), diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 40f85b902f53..9102f5c0e120 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -132,6 +132,7 @@ mod test { datanode_id: 1, table_id: 1, region_number: 1, + engine: "mito2".to_string(), }) .await .unwrap(); @@ -141,6 +142,7 @@ mod test { datanode_id: 1, table_id: 1, region_number: 3, + engine: "mito2".to_string(), }) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index 1b009bb44296..27a4fa1d2450 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -453,6 +453,7 @@ mod tests { region_number, datanode_id: failed_datanode, table_id: 1, + engine: "mito2".to_string(), } } } @@ -653,7 +654,7 @@ mod tests { assert_eq!( procedure.dump().unwrap(), - r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1},"state":{"region_failover_state":"RegionFailoverEnd"}}"# + r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverEnd"}}"# ); // Verifies that the failed region (region 1) is moved from failed datanode (datanode 1) to the candidate datanode. 
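Patch 2 threads an engine field through RegionIdent (every test fixture above gains engine: "mito2"), and its instruction tests pin the serialized wire format. The same round trip as a standalone sketch; the field layout is copied from the patch, while the derives and the serde_json dependency are assumptions:

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct RegionIdent {
    cluster_id: u64,
    datanode_id: u64,
    table_id: u32,
    region_number: u32,
    engine: String,
}

fn main() {
    let ident = RegionIdent {
        cluster_id: 1,
        datanode_id: 2,
        table_id: 1024,
        region_number: 1,
        engine: "mito2".to_string(),
    };
    let json = serde_json::to_string(&ident).unwrap();
    // Matches the inner object asserted in the patch's open/close region tests.
    assert_eq!(
        json,
        r#"{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"}"#
    );
    let back: RegionIdent = serde_json::from_str(&json).unwrap();
    assert_eq!(back, ident);
}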
@@ -693,12 +694,12 @@
         let s = procedure.dump().unwrap();
         assert_eq!(
             s,
-            r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
+            r#"{"failed_region":{"cluster_id":0,"datanode_id":1,"table_id":1,"region_number":1,"engine":"mito2"},"state":{"region_failover_state":"RegionFailoverStart","failover_candidate":null}}"#
         );
         let n: Node = serde_json::from_str(&s).unwrap();
         assert_eq!(
             format!("{n:?}"),
-            r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1 }, state: RegionFailoverStart { failover_candidate: None } }"#
+            r#"Node { failed_region: RegionIdent { cluster_id: 0, datanode_id: 1, table_id: 1, region_number: 1, engine: "mito2" }, state: RegionFailoverStart { failover_candidate: None } }"#
         );
     }
 
diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs
index 9104f56d0890..796293675be5 100644
--- a/src/mito2/src/engine/projection_test.rs
+++ b/src/mito2/src/engine/projection_test.rs
@@ -43,7 +43,7 @@ fn build_rows_multi_tags_fields(
             });
         }
         values.push(api::v1::Value {
-            value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)),
+            value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)),
         });
         api::v1::Row { values }

From 6dbc2d185d4a750f098cb528ce084d51eb034136 Mon Sep 17 00:00:00 2001
From: luofucong
Date: Fri, 15 Sep 2023 12:54:14 +0800
Subject: [PATCH 3/4] fix: resolve PR comments

---
 src/datanode/src/heartbeat/handler.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index f86a665d766f..d88e1dcb9eee 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -15,7 +15,6 @@
 use std::collections::HashMap;
 
 use async_trait::async_trait;
-use common_catalog::consts::default_engine;
 use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
 use common_meta::heartbeat::handler::{
     HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
@@ -47,7 +46,7 @@ impl RegionHeartbeatResponseHandler {
             Instruction::OpenRegion(region_ident) => {
                 let region_id = Self::region_ident_to_region_id(&region_ident);
                 let open_region_req = RegionRequest::Open(RegionOpenRequest {
-                    engine: default_engine().to_string(),
+                    engine: region_ident.engine.to_string(),
                     region_dir: "".to_string(),
                     options: HashMap::new(),
                 });

From 03a17af5d909784789c6229d915304dcd127b757 Mon Sep 17 00:00:00 2001
From: luofucong
Date: Fri, 15 Sep 2023 12:57:47 +0800
Subject: [PATCH 4/4] fix: resolve PR comments

---
 src/datanode/src/heartbeat/handler.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index d88e1dcb9eee..7eba2a87000f 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -46,7 +46,7 @@ impl RegionHeartbeatResponseHandler {
             Instruction::OpenRegion(region_ident) => {
                 let region_id = Self::region_ident_to_region_id(&region_ident);
                 let open_region_req = RegionRequest::Open(RegionOpenRequest {
-                    engine: region_ident.engine.to_string(),
+                    engine: region_ident.engine,
                     region_dir: "".to_string(),
                     options: HashMap::new(),
                 });
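Patches 3 and 4 close the loop: the datanode stops assuming default_engine() and uses the engine delivered in the instruction, first by reference with .to_string() and then by value, which saves an allocation since region_ident is consumed anyway. A condensed sketch of the resulting open-region path, with stand-in types in place of the real RegionRequest machinery:

use std::collections::HashMap;

struct RegionIdent {
    table_id: u32,
    region_number: u32,
    engine: String,
}

struct RegionOpenRequest {
    engine: String,
    region_dir: String,
    options: HashMap<String, String>,
}

fn to_open_request(region_ident: RegionIdent) -> RegionOpenRequest {
    RegionOpenRequest {
        // Moved out of the instruction, not cloned.
        engine: region_ident.engine,
        region_dir: "".to_string(),
        options: HashMap::new(),
    }
}

fn main() {
    let req = to_open_request(RegionIdent {
        table_id: 1024,
        region_number: 1,
        engine: "mito2".to_string(),
    });
    assert_eq!(req.engine, "mito2");
}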