From d8bf8f31ace89e249265a4aace0e622af7095bb5 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 13 Sep 2023 15:27:24 +0800 Subject: [PATCH] remove region id, rename types, remove table ident Signed-off-by: Ruihang Xia --- Cargo.lock | 2 +- Cargo.toml | 2 +- benchmarks/src/bin/nyc-taxi.rs | 4 +- src/api/src/helper.rs | 88 ++++++++++--------- src/catalog/src/system.rs | 1 - src/client/examples/logical.rs | 1 - src/client/examples/stream_ingest.rs | 3 +- src/common/grpc-expr/src/alter.rs | 68 +------------- src/common/grpc-expr/src/delete.rs | 1 - src/common/grpc-expr/src/error.rs | 1 + src/common/grpc-expr/src/insert.rs | 4 +- src/common/grpc-expr/src/lib.rs | 2 +- src/common/grpc-expr/src/util.rs | 2 - src/common/grpc/src/select.rs | 8 +- src/common/grpc/src/writer.rs | 4 +- src/common/meta/src/ident.rs | 35 -------- src/datanode/src/error.rs | 6 +- src/datanode/src/instance/grpc.rs | 6 +- src/datanode/src/region_server.rs | 17 +++- src/frontend/src/expr_factory.rs | 2 - src/frontend/src/req_convert/common.rs | 20 +++-- .../src/req_convert/delete/column_to_row.rs | 1 - .../src/req_convert/insert/column_to_row.rs | 1 - .../src/req_convert/insert/table_to_region.rs | 1 - src/frontend/src/script.rs | 1 - src/frontend/src/statement/copy_table_from.rs | 1 - src/frontend/src/statement/dml.rs | 1 - src/frontend/src/table.rs | 1 - src/meta-srv/src/procedure/tests.rs | 1 - src/mito/src/table.rs | 4 +- src/mito/src/table/test_util.rs | 1 - src/mito2/src/engine/alter_test.rs | 2 +- src/mito2/src/memtable/time_series.rs | 2 +- src/mito2/src/test_util.rs | 8 +- src/mito2/src/wal.rs | 2 +- src/query/src/datafusion.rs | 1 - src/query/src/dist_plan/merge_scan.rs | 7 +- src/script/src/table.rs | 1 - src/servers/src/influxdb.rs | 7 +- src/servers/src/line_writer.rs | 1 - src/servers/src/opentsdb/codec.rs | 9 +- src/servers/src/otlp.rs | 5 -- src/servers/src/prom_store.rs | 27 ++++-- src/servers/src/row_writer.rs | 5 +- src/servers/tests/interceptor.rs | 3 +- src/session/src/context.rs | 6 ++ src/table/src/requests.rs | 2 - tests-integration/src/grpc.rs | 16 ++-- tests-integration/tests/grpc.rs | 2 +- 49 files changed, 161 insertions(+), 235 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c54444c6f60d..27234d636a53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4099,7 +4099,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2#1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=81495b166b2c8909f05b3fcaa09eb299bb43a995#81495b166b2c8909f05b3fcaa09eb299bb43a995" dependencies = [ "prost", "serde", diff --git a/Cargo.toml b/Cargo.toml index be1cf2d8bdd2..b377eef2f5f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git derive_builder = "0.12" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1969b5d610209ad1c2ad8e4cd3e9c3c24e40f1c2" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "81495b166b2c8909f05b3fcaa09eb299bb43a995" } humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 1d24f5741db9..648541db1e24 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -105,7 +105,6 @@ async fn write_data( let (columns, row_count) = convert_record_batch(record_batch); let request = InsertRequest { table_name: TABLE_NAME.to_string(), - region_number: 0, columns, row_count, }; @@ -189,7 +188,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) { let values = array.values(); ( Values { - ts_microsecond_values: values.to_vec(), + timestamp_microsecond_values: values.to_vec(), ..Default::default() }, ColumnDataType::TimestampMicrosecond, @@ -389,7 +388,6 @@ fn create_table_expr() -> CreateTableExpr { primary_keys: vec!["VendorID".to_string()], create_if_not_exists: false, table_options: Default::default(), - region_numbers: vec![0], table_id: None, engine: "mito".to_string(), } diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 17aa41d0ec33..f9ac18ed0c14 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -214,19 +214,19 @@ pub fn values_with_capacity(datatype: ColumnDataType, capacity: usize) -> Values ..Default::default() }, ColumnDataType::TimestampSecond => Values { - ts_second_values: Vec::with_capacity(capacity), + timestamp_second_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimestampMillisecond => Values { - ts_millisecond_values: Vec::with_capacity(capacity), + timestamp_millisecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimestampMicrosecond => Values { - ts_microsecond_values: Vec::with_capacity(capacity), + timestamp_microsecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimestampNanosecond => Values { - ts_nanosecond_values: Vec::with_capacity(capacity), + timestamp_nanosecond_values: Vec::with_capacity(capacity), ..Default::default() }, ColumnDataType::TimeSecond => Values { @@ -286,10 +286,10 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { Value::Date(val) => values.date_values.push(val.val()), Value::DateTime(val) => values.datetime_values.push(val.val()), Value::Timestamp(val) => match val.unit() { - TimeUnit::Second => values.ts_second_values.push(val.value()), - TimeUnit::Millisecond => values.ts_millisecond_values.push(val.value()), - TimeUnit::Microsecond => values.ts_microsecond_values.push(val.value()), - TimeUnit::Nanosecond => values.ts_nanosecond_values.push(val.value()), + TimeUnit::Second => values.timestamp_second_values.push(val.value()), + TimeUnit::Millisecond => values.timestamp_millisecond_values.push(val.value()), + TimeUnit::Microsecond => values.timestamp_microsecond_values.push(val.value()), + TimeUnit::Nanosecond => values.timestamp_nanosecond_values.push(val.value()), }, Value::Time(val) => match val.unit() { TimeUnit::Second => values.time_second_values.push(val.value()), @@ -375,10 +375,16 @@ pub fn pb_value_to_value_ref(value: &v1::Value) -> ValueRef { ValueData::StringValue(string) => ValueRef::String(string.as_str()), ValueData::DateValue(d) => ValueRef::Date(Date::from(*d)), ValueData::DatetimeValue(d) => ValueRef::DateTime(DateTime::new(*d)), - ValueData::TsSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)), - ValueData::TsMillisecondValue(t) => ValueRef::Timestamp(Timestamp::new_millisecond(*t)), - ValueData::TsMicrosecondValue(t) => ValueRef::Timestamp(Timestamp::new_microsecond(*t)), - ValueData::TsNanosecondValue(t) => ValueRef::Timestamp(Timestamp::new_nanosecond(*t)), + ValueData::TimestampSecondValue(t) => ValueRef::Timestamp(Timestamp::new_second(*t)), + ValueData::TimestampMillisecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_millisecond(*t)) + } + ValueData::TimestampMicrosecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_microsecond(*t)) + } + ValueData::TimestampNanosecondValue(t) => { + ValueRef::Timestamp(Timestamp::new_nanosecond(*t)) + } ValueData::TimeSecondValue(t) => ValueRef::Time(Time::new_second(*t)), ValueData::TimeMillisecondValue(t) => ValueRef::Time(Time::new_millisecond(*t)), ValueData::TimeMicrosecondValue(t) => ValueRef::Time(Time::new_microsecond(*t)), @@ -418,17 +424,17 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) -> ConcreteDataType::Date(_) => Arc::new(DateVector::from_vec(values.date_values)), ConcreteDataType::DateTime(_) => Arc::new(DateTimeVector::from_vec(values.datetime_values)), ConcreteDataType::Timestamp(unit) => match unit { - TimestampType::Second(_) => { - Arc::new(TimestampSecondVector::from_vec(values.ts_second_values)) - } + TimestampType::Second(_) => Arc::new(TimestampSecondVector::from_vec( + values.timestamp_second_values, + )), TimestampType::Millisecond(_) => Arc::new(TimestampMillisecondVector::from_vec( - values.ts_millisecond_values, + values.timestamp_millisecond_values, )), TimestampType::Microsecond(_) => Arc::new(TimestampMicrosecondVector::from_vec( - values.ts_microsecond_values, + values.timestamp_microsecond_values, )), TimestampType::Nanosecond(_) => Arc::new(TimestampNanosecondVector::from_vec( - values.ts_nanosecond_values, + values.timestamp_nanosecond_values, )), }, ConcreteDataType::Time(unit) => match unit { @@ -550,22 +556,22 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec< .map(|v| Value::Date(v.into())) .collect(), ConcreteDataType::Timestamp(TimestampType::Second(_)) => values - .ts_second_values + .timestamp_second_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_second(v))) .collect(), ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => values - .ts_millisecond_values + .timestamp_millisecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_millisecond(v))) .collect(), ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => values - .ts_microsecond_values + .timestamp_microsecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_microsecond(v))) .collect(), ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => values - .ts_nanosecond_values + .timestamp_nanosecond_values .into_iter() .map(|v| Value::Timestamp(Timestamp::new_nanosecond(v))) .collect(), @@ -682,16 +688,16 @@ pub fn to_proto_value(value: Value) -> Option { }, Value::Timestamp(v) => match v.unit() { TimeUnit::Second => v1::Value { - value_data: Some(ValueData::TsSecondValue(v.value())), + value_data: Some(ValueData::TimestampSecondValue(v.value())), }, TimeUnit::Millisecond => v1::Value { - value_data: Some(ValueData::TsMillisecondValue(v.value())), + value_data: Some(ValueData::TimestampMillisecondValue(v.value())), }, TimeUnit::Microsecond => v1::Value { - value_data: Some(ValueData::TsMicrosecondValue(v.value())), + value_data: Some(ValueData::TimestampMicrosecondValue(v.value())), }, TimeUnit::Nanosecond => v1::Value { - value_data: Some(ValueData::TsNanosecondValue(v.value())), + value_data: Some(ValueData::TimestampNanosecondValue(v.value())), }, }, Value::Time(v) => match v.unit() { @@ -747,10 +753,10 @@ pub fn proto_value_type(value: &v1::Value) -> Option { ValueData::StringValue(_) => ColumnDataType::String, ValueData::DateValue(_) => ColumnDataType::Date, ValueData::DatetimeValue(_) => ColumnDataType::Datetime, - ValueData::TsSecondValue(_) => ColumnDataType::TimestampSecond, - ValueData::TsMillisecondValue(_) => ColumnDataType::TimestampMillisecond, - ValueData::TsMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, - ValueData::TsNanosecondValue(_) => ColumnDataType::TimestampNanosecond, + ValueData::TimestampSecondValue(_) => ColumnDataType::TimestampSecond, + ValueData::TimestampMillisecondValue(_) => ColumnDataType::TimestampMillisecond, + ValueData::TimestampMicrosecondValue(_) => ColumnDataType::TimestampMicrosecond, + ValueData::TimestampNanosecondValue(_) => ColumnDataType::TimestampNanosecond, ValueData::TimeSecondValue(_) => ColumnDataType::TimeSecond, ValueData::TimeMillisecondValue(_) => ColumnDataType::TimeMillisecond, ValueData::TimeMicrosecondValue(_) => ColumnDataType::TimeMicrosecond, @@ -837,10 +843,10 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { Value::Date(v) => Some(ValueData::DateValue(v.val())), Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())), Value::Timestamp(v) => Some(match v.unit() { - TimeUnit::Second => ValueData::TsSecondValue(v.value()), - TimeUnit::Millisecond => ValueData::TsMillisecondValue(v.value()), - TimeUnit::Microsecond => ValueData::TsMicrosecondValue(v.value()), - TimeUnit::Nanosecond => ValueData::TsNanosecondValue(v.value()), + TimeUnit::Second => ValueData::TimestampSecondValue(v.value()), + TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v.value()), + TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v.value()), + TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v.value()), }), Value::Time(v) => Some(match v.unit() { TimeUnit::Second => ValueData::TimeSecondValue(v.value()), @@ -943,7 +949,7 @@ mod tests { assert_eq!(2, values.capacity()); let values = values_with_capacity(ColumnDataType::TimestampMillisecond, 2); - let values = values.ts_millisecond_values; + let values = values.timestamp_millisecond_values; assert_eq!(2, values.capacity()); let values = values_with_capacity(ColumnDataType::TimeMillisecond, 2); @@ -1162,28 +1168,28 @@ mod tests { push_vals(&mut column, 3, vector); assert_eq!( vec![1, 2, 3], - column.values.as_ref().unwrap().ts_nanosecond_values + column.values.as_ref().unwrap().timestamp_nanosecond_values ); let vector = Arc::new(TimestampMillisecondVector::from_vec(vec![4, 5, 6])); push_vals(&mut column, 3, vector); assert_eq!( vec![4, 5, 6], - column.values.as_ref().unwrap().ts_millisecond_values + column.values.as_ref().unwrap().timestamp_millisecond_values ); let vector = Arc::new(TimestampMicrosecondVector::from_vec(vec![7, 8, 9])); push_vals(&mut column, 3, vector); assert_eq!( vec![7, 8, 9], - column.values.as_ref().unwrap().ts_microsecond_values + column.values.as_ref().unwrap().timestamp_microsecond_values ); let vector = Arc::new(TimestampSecondVector::from_vec(vec![10, 11, 12])); push_vals(&mut column, 3, vector); assert_eq!( vec![10, 11, 12], - column.values.as_ref().unwrap().ts_second_values + column.values.as_ref().unwrap().timestamp_second_values ); } @@ -1312,7 +1318,7 @@ mod tests { let actual = pb_values_to_values( &ConcreteDataType::Timestamp(TimestampType::Second(TimestampSecondType)), Values { - ts_second_values: vec![1_i64, 2_i64, 3_i64], + timestamp_second_values: vec![1_i64, 2_i64, 3_i64], ..Default::default() }, ); @@ -1327,7 +1333,7 @@ mod tests { let actual = pb_values_to_values( &ConcreteDataType::Timestamp(TimestampType::Millisecond(TimestampMillisecondType)), Values { - ts_millisecond_values: vec![1_i64, 2_i64, 3_i64], + timestamp_millisecond_values: vec![1_i64, 2_i64, 3_i64], ..Default::default() }, ); diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 624b1c697672..b3021790d346 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -262,7 +262,6 @@ pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SYSTEM_CATALOG_TABLE_NAME.to_string(), columns_values, - region_number: 0, // system catalog table has only one region } } diff --git a/src/client/examples/logical.rs b/src/client/examples/logical.rs index 0524d562ec9f..d13c4844c072 100644 --- a/src/client/examples/logical.rs +++ b/src/client/examples/logical.rs @@ -66,7 +66,6 @@ async fn run() { create_if_not_exists: false, table_options: Default::default(), table_id: Some(TableId { id: 1024 }), - region_numbers: vec![0], engine: MITO_ENGINE.to_string(), }; diff --git a/src/client/examples/stream_ingest.rs b/src/client/examples/stream_ingest.rs index 827f8e85c014..7b67cf33931e 100644 --- a/src/client/examples/stream_ingest.rs +++ b/src/client/examples/stream_ingest.rs @@ -131,7 +131,7 @@ fn to_insert_request(records: Vec) -> InsertRequest { Column { column_name: "ts".to_owned(), values: Some(column::Values { - ts_millisecond_values: timestamp_millis, + timestamp_millisecond_values: timestamp_millis, ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -177,6 +177,5 @@ fn to_insert_request(records: Vec) -> InsertRequest { table_name: "weather_demo".to_owned(), columns, row_count: rows as u32, - ..Default::default() } } diff --git a/src/common/grpc-expr/src/alter.rs b/src/common/grpc-expr/src/alter.rs index cb5cafb91927..8e6ef3bc9af2 100644 --- a/src/common/grpc-expr/src/alter.rs +++ b/src/common/grpc-expr/src/alter.rs @@ -18,18 +18,15 @@ use api::v1::{ column_def, AddColumnLocation as Location, AlterExpr, CreateTableExpr, DropColumns, RenameTable, SemanticType, }; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::AddColumnLocation; use datatypes::schema::{ColumnSchema, RawSchema}; use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; -use table::requests::{ - AddColumnRequest, AlterKind, AlterTableRequest, CreateTableRequest, TableOptions, -}; +use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest}; use crate::error::{ - ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, - Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu, + InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu, Result, + UnknownLocationTypeSnafu, }; const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32; @@ -121,65 +118,6 @@ pub fn create_table_schema(expr: &CreateTableExpr, require_time_index: bool) -> Ok(RawSchema::new(column_schemas)) } -pub fn create_expr_to_request( - table_id: TableId, - expr: CreateTableExpr, - require_time_index: bool, -) -> Result { - let schema = create_table_schema(&expr, require_time_index)?; - let primary_key_indices = expr - .primary_keys - .iter() - .map(|key| { - // We do a linear search here. - schema - .column_schemas - .iter() - .position(|column_schema| column_schema.name == *key) - .context(ColumnNotFoundSnafu { - column_name: key, - table_name: &expr.table_name, - }) - }) - .collect::>>()?; - - let mut catalog_name = expr.catalog_name; - if catalog_name.is_empty() { - catalog_name = DEFAULT_CATALOG_NAME.to_string(); - } - let mut schema_name = expr.schema_name; - if schema_name.is_empty() { - schema_name = DEFAULT_SCHEMA_NAME.to_string(); - } - let desc = if expr.desc.is_empty() { - None - } else { - Some(expr.desc) - }; - - let region_numbers = if expr.region_numbers.is_empty() { - vec![0] - } else { - expr.region_numbers - }; - - let table_options = - TableOptions::try_from(&expr.table_options).context(UnrecognizedTableOptionSnafu)?; - Ok(CreateTableRequest { - id: table_id, - catalog_name, - schema_name, - table_name: expr.table_name, - desc, - schema, - region_numbers, - primary_key_indices, - create_if_not_exists: expr.create_if_not_exists, - table_options, - engine: expr.engine, - }) -} - fn parse_location(location: Option) -> Result> { match location { Some(Location { diff --git a/src/common/grpc-expr/src/delete.rs b/src/common/grpc-expr/src/delete.rs index 18a480e8a4c3..d272b6aa0bc0 100644 --- a/src/common/grpc-expr/src/delete.rs +++ b/src/common/grpc-expr/src/delete.rs @@ -79,7 +79,6 @@ mod tests { fn test_to_table_delete_request() { let grpc_request = GrpcDeleteRequest { table_name: "foo".to_string(), - region_number: 0, key_columns: vec![ Column { column_name: "id".to_string(), diff --git a/src/common/grpc-expr/src/error.rs b/src/common/grpc-expr/src/error.rs index c364b7e9426d..25bf50b90616 100644 --- a/src/common/grpc-expr/src/error.rs +++ b/src/common/grpc-expr/src/error.rs @@ -55,6 +55,7 @@ pub enum Error { #[snafu(display("Invalid column proto: {}", err_msg))] InvalidColumnProto { err_msg: String, location: Location }, + #[snafu(display("Failed to create vector, source: {}", source))] CreateVector { location: Location, diff --git a/src/common/grpc-expr/src/insert.rs b/src/common/grpc-expr/src/insert.rs index cad3313773aa..548766d1117d 100644 --- a/src/common/grpc-expr/src/insert.rs +++ b/src/common/grpc-expr/src/insert.rs @@ -96,7 +96,6 @@ pub fn to_table_insert_request( schema_name: schema_name.to_string(), table_name: table_name.to_string(), columns_values, - region_number: request.region_number, }) } @@ -363,7 +362,6 @@ mod tests { table_name: "demo".to_string(), columns, row_count, - region_number: 0, }; let insert_req = to_table_insert_request("greptime", "public", request).unwrap(); @@ -476,7 +474,7 @@ mod tests { }; let ts_vals = Values { - ts_millisecond_values: vec![100, 101], + timestamp_millisecond_values: vec![100, 101], ..Default::default() }; let ts_column = Column { diff --git a/src/common/grpc-expr/src/lib.rs b/src/common/grpc-expr/src/lib.rs index cd3d9540b3eb..7a2fea237b52 100644 --- a/src/common/grpc-expr/src/lib.rs +++ b/src/common/grpc-expr/src/lib.rs @@ -18,5 +18,5 @@ pub mod error; pub mod insert; pub mod util; -pub use alter::{alter_expr_to_request, create_expr_to_request, create_table_schema}; +pub use alter::{alter_expr_to_request, create_table_schema}; pub use insert::{build_create_expr_from_insertion, find_new_columns}; diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index d2a2b79e8ac3..641b4022472e 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -139,8 +139,6 @@ pub fn build_create_table_expr( create_if_not_exists: true, table_options: Default::default(), table_id: table_id.map(|id| api::v1::TableId { id }), - // TODO(hl): region number should be allocated by frontend - region_numbers: vec![0], engine: engine.to_string(), }; diff --git a/src/common/grpc/src/select.rs b/src/common/grpc/src/select.rs index c0caabdfa7e5..d16f2f37177c 100644 --- a/src/common/grpc/src/select.rs +++ b/src/common/grpc/src/select.rs @@ -150,25 +150,25 @@ pub fn values(arrays: &[VectorRef]) -> Result { ( ConcreteDataType::Timestamp(TimestampType::Second(_)), TimestampSecondVector, - ts_second_values, + timestamp_second_values, |x| { x.into_native() } ), ( ConcreteDataType::Timestamp(TimestampType::Millisecond(_)), TimestampMillisecondVector, - ts_millisecond_values, + timestamp_millisecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Timestamp(TimestampType::Microsecond(_)), TimestampMicrosecondVector, - ts_microsecond_values, + timestamp_microsecond_values, |x| { x.into_native() } ), ( ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)), TimestampNanosecondVector, - ts_nanosecond_values, + timestamp_nanosecond_values, |x| { x.into_native() } ), ( diff --git a/src/common/grpc/src/writer.rs b/src/common/grpc/src/writer.rs index 0ea270d388bc..7efa3dfd5cad 100644 --- a/src/common/grpc/src/writer.rs +++ b/src/common/grpc/src/writer.rs @@ -62,7 +62,7 @@ impl LinesWriter { // It is safe to use unwrap here, because values has been initialized in mut_column() let values = column.values.as_mut().unwrap(); values - .ts_millisecond_values + .timestamp_millisecond_values .push(to_ms_ts(value.1, value.0)); self.null_masks[idx].push(false); Ok(()) @@ -360,7 +360,7 @@ mod tests { assert_eq!(SemanticType::Timestamp as i32, column.semantic_type); assert_eq!( vec![101011000, 102011001, 103011002], - column.values.as_ref().unwrap().ts_millisecond_values + column.values.as_ref().unwrap().timestamp_millisecond_values ); verify_null_mask(&column.null_mask, vec![false, false, false]); diff --git a/src/common/meta/src/ident.rs b/src/common/meta/src/ident.rs index f5015d12ab70..04ede7e5c18e 100644 --- a/src/common/meta/src/ident.rs +++ b/src/common/meta/src/ident.rs @@ -14,13 +14,9 @@ use std::fmt::{Display, Formatter}; -use api::v1::meta::{TableIdent as RawTableIdent, TableName}; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; use table::engine::TableReference; -use crate::error::{Error, InvalidProtoMsgSnafu}; - #[derive(Eq, Hash, PartialEq, Clone, Debug, Default, Serialize, Deserialize)] pub struct TableIdent { pub catalog: String, @@ -45,34 +41,3 @@ impl Display for TableIdent { ) } } - -impl TryFrom for TableIdent { - type Error = Error; - - fn try_from(value: RawTableIdent) -> Result { - let table_name = value.table_name.context(InvalidProtoMsgSnafu { - err_msg: "'table_name' is missing in TableIdent", - })?; - Ok(Self { - catalog: table_name.catalog_name, - schema: table_name.schema_name, - table: table_name.table_name, - table_id: value.table_id, - engine: value.engine, - }) - } -} - -impl From for RawTableIdent { - fn from(table_ident: TableIdent) -> Self { - Self { - table_id: table_ident.table_id, - engine: table_ident.engine, - table_name: Some(TableName { - catalog_name: table_ident.catalog, - schema_name: table_ident.schema, - table_name: table_ident.table, - }), - } - } -} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index d4b6865d923c..817195e60172 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -164,6 +164,9 @@ pub enum Error { source: table::error::Error, }, + #[snafu(display("Missing request header"))] + MissingRequestHeader { location: Location }, + #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String, @@ -628,7 +631,8 @@ impl ErrorExt for Error { | MissingWalDirConfig { .. } | PrepareImmutableTable { .. } | InvalidInsertRowLen { .. } - | ColumnDataType { .. } => StatusCode::InvalidArguments, + | ColumnDataType { .. } + | MissingRequestHeader { .. } => StatusCode::InvalidArguments, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => { StatusCode::Unexpected diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 2213ff8388bb..da30cd74fd80 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -864,7 +864,7 @@ mod test { Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672384140000, 1672384141000, 1672384142000], + timestamp_millisecond_values: vec![1672384140000, 1672384141000, 1672384142000], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -937,7 +937,7 @@ mod test { Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672201026000], + timestamp_millisecond_values: vec![1672201026000], ..Default::default() }), datatype: ColumnDataType::TimestampMillisecond as i32, @@ -962,7 +962,7 @@ mod test { Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672201026000], + timestamp_millisecond_values: vec![1672201026000], ..Default::default() }), datatype: ColumnDataType::TimestampMillisecond as i32, diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 7580a8307ffc..4da960c96b42 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -56,8 +56,9 @@ use tonic::{Request, Response, Result as TonicResult}; use crate::error::{ BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, - GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, - RegionNotFoundSnafu, Result, StopRegionEngineSnafu, UnsupportedOutputSnafu, + GetRegionMetadataSnafu, HandleRegionRequestSnafu, MissingRequestHeaderSnafu, + RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result, StopRegionEngineSnafu, + UnsupportedOutputSnafu, }; #[derive(Clone)] @@ -66,6 +67,7 @@ pub struct RegionServer { } impl RegionServer { + /// Create and start a region server. pub fn new(query_engine: QueryEngineRef, runtime: Arc) -> Self { Self { inner: Arc::new(RegionServerInner::new(query_engine, runtime)), @@ -88,10 +90,12 @@ impl RegionServer { self.inner.handle_read(request).await } + /// Retrieve all opened region ids. pub fn opened_region_ids(&self) -> Vec { self.inner.region_map.iter().map(|e| *e.key()).collect() } + /// Get a copy of inner [Runtime] pub fn runtime(&self) -> Arc { self.inner.runtime.clone() } @@ -248,8 +252,13 @@ impl RegionServerInner { pub async fn handle_read(&self, request: QueryRequest) -> Result { // TODO(ruihang): add metrics and set trace id - let QueryRequest { region_id, plan } = request; + let QueryRequest { + header, + region_id, + plan, + } = request; let region_id = RegionId::from_u64(region_id); + let ctx = QueryContext::with_trace_id(header.context(MissingRequestHeaderSnafu)?.trace_id); // build dummy catalog list let engine = self @@ -266,7 +275,7 @@ impl RegionServerInner { .context(DecodeLogicalPlanSnafu)?; let result = self .query_engine - .execute(logical_plan.into(), QueryContext::arc()) + .execute(logical_plan.into(), ctx) .await .context(ExecuteLogicalPlanSnafu)?; diff --git a/src/frontend/src/expr_factory.rs b/src/frontend/src/expr_factory.rs index e74d91ff9fa8..5f52fc2cead0 100644 --- a/src/frontend/src/expr_factory.rs +++ b/src/frontend/src/expr_factory.rs @@ -119,7 +119,6 @@ pub(crate) async fn create_external_expr( create_if_not_exists: create.if_not_exists, table_options: options, table_id: None, - region_numbers: vec![], engine: create.engine.to_string(), }; Ok(expr) @@ -151,7 +150,6 @@ pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Resul create_if_not_exists: create.if_not_exists, table_options, table_id: None, - region_numbers: vec![], engine: create.engine.to_string(), }; Ok(expr) diff --git a/src/frontend/src/req_convert/common.rs b/src/frontend/src/req_convert/common.rs index 00a07fdae473..698bf342103e 100644 --- a/src/frontend/src/req_convert/common.rs +++ b/src/frontend/src/req_convert/common.rs @@ -114,18 +114,26 @@ fn push_column_to_rows(column: Column, rows: &mut [Row]) -> Result<()> { (String, StringValue, string_values), (Date, DateValue, date_values), (Datetime, DatetimeValue, datetime_values), - (TimestampSecond, TsSecondValue, ts_second_values), + ( + TimestampSecond, + TimestampSecondValue, + timestamp_second_values + ), ( TimestampMillisecond, - TsMillisecondValue, - ts_millisecond_values + TimestampMillisecondValue, + timestamp_millisecond_values ), ( TimestampMicrosecond, - TsMicrosecondValue, - ts_microsecond_values + TimestampMicrosecondValue, + timestamp_microsecond_values + ), + ( + TimestampNanosecond, + TimestampNanosecondValue, + timestamp_nanosecond_values ), - (TimestampNanosecond, TsNanosecondValue, ts_nanosecond_values), (TimeSecond, TimeSecondValue, time_second_values), ( TimeMillisecond, diff --git a/src/frontend/src/req_convert/delete/column_to_row.rs b/src/frontend/src/req_convert/delete/column_to_row.rs index 610d9be48e2f..59564d78a317 100644 --- a/src/frontend/src/req_convert/delete/column_to_row.rs +++ b/src/frontend/src/req_convert/delete/column_to_row.rs @@ -35,6 +35,5 @@ fn request_column_to_row(request: DeleteRequest) -> Result { Ok(RowDeleteRequest { table_name: request.table_name, rows: Some(rows), - region_number: 0, // FIXME(zhongzc): deprecated field }) } diff --git a/src/frontend/src/req_convert/insert/column_to_row.rs b/src/frontend/src/req_convert/insert/column_to_row.rs index adc129219666..285dafd223e1 100644 --- a/src/frontend/src/req_convert/insert/column_to_row.rs +++ b/src/frontend/src/req_convert/insert/column_to_row.rs @@ -35,6 +35,5 @@ fn request_column_to_row(request: InsertRequest) -> Result { Ok(RowInsertRequest { table_name: request.table_name, rows: Some(rows), - region_number: 0, // FIXME(zhongzc): deprecated field }) } diff --git a/src/frontend/src/req_convert/insert/table_to_region.rs b/src/frontend/src/req_convert/insert/table_to_region.rs index 3160a58d0396..427b3d3354e1 100644 --- a/src/frontend/src/req_convert/insert/table_to_region.rs +++ b/src/frontend/src/req_convert/insert/table_to_region.rs @@ -145,7 +145,6 @@ mod tests { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "table_1".to_string(), columns_values: HashMap::from([("a".to_string(), vector)]), - region_number: 0, } } diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index 103b64dbd1c3..146f178fb0c2 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -190,7 +190,6 @@ mod python { create_if_not_exists: request.create_if_not_exists, table_options: (&request.table_options).into(), table_id: None, // Should and will be assigned by Meta. - region_numbers: vec![0], engine: request.engine, }) } diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index cb36d41823d3..64acf653b5ab 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -333,7 +333,6 @@ impl StatementExecutor { schema_name: req.schema_name.to_string(), table_name: req.table_name.to_string(), columns_values, - region_number: 0, }, query_ctx.clone(), )); diff --git a/src/frontend/src/statement/dml.rs b/src/frontend/src/statement/dml.rs index f127dda4d48b..8d5279b26e06 100644 --- a/src/frontend/src/statement/dml.rs +++ b/src/frontend/src/statement/dml.rs @@ -182,7 +182,6 @@ fn build_insert_request( schema_name: table_info.schema_name.clone(), table_name: table_info.name.clone(), columns_values, - region_number: 0, }) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index 19e33ace2345..f90039c6f7ab 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -509,7 +509,6 @@ pub(crate) mod test { schema_name: "public".to_string(), table_name: "numbers".to_string(), columns_values: Default::default(), - region_number: 0, }; meter_insert_request!(req); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index eb89954af8ac..1f079942fde6 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -80,7 +80,6 @@ fn create_table_task() -> CreateTableTask { create_if_not_exists: false, table_options: HashMap::new(), table_id: None, - region_numbers: vec![1, 2, 3], engine: MITO2_ENGINE.to_string(), }; diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 4d6d1352d534..b863048ab907 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -92,14 +92,14 @@ impl Table for MitoTable { } let regions = self.regions.load(); let region = regions - .get(&request.region_number) + .get(&0) .with_context(|| RegionNotFoundSnafu { table: common_catalog::format_full_table_name( &request.catalog_name, &request.schema_name, &request.table_name, ), - region: request.region_number, + region: 0u32, }) .map_err(BoxedError::new) .context(table_error::TableOperationSnafu)?; diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index 6eb4ac743072..51536bbaff93 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -52,7 +52,6 @@ pub fn new_insert_request( schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name, columns_values, - region_number: 0, } } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 26faadd293e3..007bdacecd5b 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -147,7 +147,7 @@ fn build_rows_for_tags( value_data: Some(ValueData::F64Value((value_start + idx) as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)), }, api::v1::Value { value_data: Some(ValueData::StringValue(tag1.to_string())), diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index f59d0b2389f2..b682e8d2ac6a 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -795,7 +795,7 @@ mod tests { value_data: Some(ValueData::I64Value(k1)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64)), + value_data: Some(ValueData::TimestampMillisecondValue(i as i64)), }, api::v1::Value { value_data: Some(ValueData::I64Value(i as i64)), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 6e55d7f22860..d7566a54add8 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -305,7 +305,7 @@ pub(crate) fn i64_value(data: i64) -> v1::Value { #[cfg(test)] pub(crate) fn ts_ms_value(data: i64) -> v1::Value { v1::Value { - value_data: Some(ValueData::TsMillisecondValue(data)), + value_data: Some(ValueData::TimestampMillisecondValue(data)), } } @@ -465,7 +465,7 @@ pub fn build_rows(start: usize, end: usize) -> Vec { value_data: Some(ValueData::F64Value(i as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(i as i64 * 1000)), + value_data: Some(ValueData::TimestampMillisecondValue(i as i64 * 1000)), }, ], }) @@ -517,7 +517,7 @@ pub fn build_rows_for_key(key: &str, start: usize, end: usize, value_start: usiz value_data: Some(ValueData::F64Value((value_start + idx) as f64)), }, api::v1::Value { - value_data: Some(ValueData::TsMillisecondValue(ts as i64 * 1000)), + value_data: Some(ValueData::TimestampMillisecondValue(ts as i64 * 1000)), }, ], }) @@ -533,7 +533,7 @@ pub fn build_delete_rows_for_key(key: &str, start: usize, end: usize) -> Vec for InsertRequests { let (columns, row_count) = writer.finish(); GrpcInsertRequest { table_name, - region_number: 0, columns, row_count, } @@ -284,7 +283,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; SemanticType::Timestamp, Vec::new(), Values { - ts_millisecond_values: vec![1663840496100, 1663840496400], + timestamp_millisecond_values: vec![1663840496100, 1663840496400], ..Default::default() }, ); @@ -323,7 +322,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; SemanticType::Timestamp, Vec::new(), Values { - ts_millisecond_values: vec![1663840496100, 1663840496400], + timestamp_millisecond_values: vec![1663840496100, 1663840496400], ..Default::default() }, ); @@ -553,7 +552,7 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; fn extract_ts_millis_value(value: &ValueData) -> i64 { match value { - ValueData::TsMillisecondValue(v) => *v, + ValueData::TimestampMillisecondValue(v) => *v, _ => panic!(), } } diff --git a/src/servers/src/line_writer.rs b/src/servers/src/line_writer.rs index 7da1de6bb8df..38ebe218c7a9 100644 --- a/src/servers/src/line_writer.rs +++ b/src/servers/src/line_writer.rs @@ -141,7 +141,6 @@ impl LineWriter { schema_name: self.db, table_name: self.table_name, columns_values, - region_number: 0, // TODO(hl): Check if assign 0 region is ok? } } } diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index bee99c182810..163e060adece 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -129,7 +129,7 @@ impl DataPoint { let ts_column = Column { column_name: OPENTSDB_TIMESTAMP_COLUMN_NAME.to_string(), values: Some(column::Values { - ts_millisecond_values: vec![self.ts_millis], + timestamp_millisecond_values: vec![self.ts_millis], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -165,7 +165,6 @@ impl DataPoint { GrpcInsertRequest { table_name: self.metric.clone(), - region_number: 0, columns, row_count: 1, } @@ -268,7 +267,11 @@ mod test { assert_eq!(columns[0].column_name, OPENTSDB_TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millisecond_values, + columns[0] + .values + .as_ref() + .unwrap() + .timestamp_millisecond_values, vec![1000] ); diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index 05c7a56abccd..2510263ed709 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -176,7 +176,6 @@ fn encode_gauge( let (columns, row_count) = lines.finish(); Ok(InsertRequest { table_name: normalize_otlp_name(name), - region_number: 0, columns, row_count, }) @@ -208,7 +207,6 @@ fn encode_sum( let (columns, row_count) = lines.finish(); Ok(InsertRequest { table_name: normalize_otlp_name(name), - region_number: 0, columns, row_count, }) @@ -251,7 +249,6 @@ fn encode_histogram(name: &str, hist: &Histogram) -> Result { let (columns, row_count) = lines.finish(); Ok(InsertRequest { table_name: normalize_otlp_name(name), - region_number: 0, columns, row_count, }) @@ -310,7 +307,6 @@ fn encode_exponential_histogram(name: &str, hist: &ExponentialHistogram) -> Resu let (columns, row_count) = lines.finish(); Ok(InsertRequest { table_name: normalize_otlp_name(name), - region_number: 0, columns, row_count, }) @@ -351,7 +347,6 @@ fn encode_summary( let (columns, row_count) = lines.finish(); Ok(InsertRequest { table_name: normalize_otlp_name(name), - region_number: 0, columns, row_count, }) diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index 813a39045e37..2f49d5e55309 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -412,7 +412,6 @@ pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, sample_counts += row_count as usize; GrpcInsertRequest { table_name, - region_number: 0, columns, row_count, } @@ -655,7 +654,9 @@ mod tests { value_data: Some(api::v1::value::ValueData::F64Value(value)), }, api::v1::Value { - value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)), + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue( + timestamp, + )), }, ], } @@ -674,7 +675,9 @@ mod tests { value_data: Some(api::v1::value::ValueData::F64Value(value)), }, api::v1::Value { - value_data: Some(api::v1::value::ValueData::TsMillisecondValue(timestamp)), + value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue( + timestamp, + )), }, ], } @@ -782,7 +785,11 @@ mod tests { assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millisecond_values, + columns[0] + .values + .as_ref() + .unwrap() + .timestamp_millisecond_values, vec![1000, 2000] ); @@ -810,7 +817,11 @@ mod tests { assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[0].values.as_ref().unwrap().ts_millisecond_values, + columns[0] + .values + .as_ref() + .unwrap() + .timestamp_millisecond_values, vec![1000, 2000] ); @@ -848,7 +859,11 @@ mod tests { ); assert_eq!(columns[1].column_name, TIMESTAMP_COLUMN_NAME); assert_eq!( - columns[1].values.as_ref().unwrap().ts_millisecond_values, + columns[1] + .values + .as_ref() + .unwrap() + .timestamp_millisecond_values, vec![1000, 2000, 3000] ); diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index ab6b80d23355..5504b34aa1f8 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -108,7 +108,6 @@ impl<'a> MultiTableData<'a> { RowInsertRequest { table_name: table_name.to_string(), rows: Some(Rows { schema, rows }), - ..Default::default() } }) .collect::>(); @@ -227,14 +226,14 @@ pub fn write_ts_precision<'a>( datatype: ColumnDataType::TimestampMillisecond as i32, semantic_type: SemanticType::Timestamp as i32, }); - one_row.push(ValueData::TsMillisecondValue(ts).into()) + one_row.push(ValueData::TimestampMillisecondValue(ts).into()) } else { check_schema( ColumnDataType::TimestampMillisecond, SemanticType::Timestamp, &schema[*index], )?; - one_row[*index].value_data = Some(ValueData::TsMillisecondValue(ts)); + one_row[*index].value_data = Some(ValueData::TimestampMillisecondValue(ts)); } Ok(()) diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index 92fcace38f10..dcb3ddaf8c5a 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -54,7 +54,7 @@ impl GrpcQueryInterceptor for NoopInterceptor { match req { Request::Inserts(insert) => { ensure!( - insert.inserts.iter().all(|x| x.region_number == 0), + insert.inserts.iter().all(|x| !x.table_name.is_empty()), NotSupportedSnafu { feat: "region not 0" } @@ -75,7 +75,6 @@ fn test_grpc_interceptor() { let req = Request::Inserts(InsertRequests { inserts: vec![InsertRequest { - region_number: 1, ..Default::default() }], }); diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 219c58e33a04..7c815afa1f3d 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -62,6 +62,12 @@ impl QueryContext { .build() } + pub fn with_trace_id(trace_id: u64) -> QueryContextRef { + QueryContextBuilder::default() + .try_trace_id(Some(trace_id)) + .build() + } + pub fn with_db_name(db_name: Option<&String>) -> QueryContextRef { let (catalog, schema) = db_name .map(|db| { diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 5407cbac3959..6ec2c69dd5b3 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -246,7 +246,6 @@ pub struct InsertRequest { pub schema_name: String, pub table_name: String, pub columns_values: HashMap, - pub region_number: RegionNumber, } /// Delete (by primary key) request @@ -327,7 +326,6 @@ macro_rules! meter_insert_request { $req.catalog_name.to_string(), $req.schema_name.to_string(), $req.table_name.to_string(), - $req.region_number, $req ); }; diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 672e40261a92..4ea89466ecca 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -301,7 +301,7 @@ CREATE TABLE {table_name} ( } async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) { - let ts_millisecond_values = vec![ + let timestamp_millisecond_values = vec![ 1672557972000, 1672557973000, 1672557974000, @@ -335,7 +335,7 @@ CREATE TABLE {table_name} ( Column { column_name: "b".to_string(), values: Some(Values { - string_values: ts_millisecond_values + string_values: timestamp_millisecond_values .iter() .map(|x| format!("ts: {x}")) .collect(), @@ -348,7 +348,7 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values, + timestamp_millisecond_values, ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -429,7 +429,7 @@ CREATE TABLE {table_name} ( column_name: "ts".to_string(), semantic_type: SemanticType::Timestamp as i32, values: Some(Values { - ts_millisecond_values: ts, + timestamp_millisecond_values: ts, ..Default::default() }), datatype: ColumnDataType::TimestampMillisecond as i32, @@ -559,7 +559,7 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], + timestamp_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -594,7 +594,7 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], + timestamp_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -642,7 +642,7 @@ CREATE TABLE {table_name} ( key_columns: vec![Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557979000], + timestamp_millisecond_values: vec![1672557975000, 1672557979000], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -724,7 +724,7 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![ + timestamp_millisecond_values: vec![ 1672557972000, 1672557973000, 1672557974000, diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index bc3f8abf128c..3bcfbc08903a 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -218,7 +218,7 @@ fn expect_data() -> (Column, Column, Column, Column) { let expected_ts_col = Column { column_name: "ts".to_string(), values: Some(column::Values { - ts_millisecond_values: vec![100, 101, 102, 103], + timestamp_millisecond_values: vec![100, 101, 102, 103], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32,