From 7bb8a5999cce874614e6feba81fcd82d95e97ce5 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Thu, 14 Sep 2023 17:50:35 +0800 Subject: [PATCH 1/3] feat!: add engine name to DatanodeTableValue (#2395) * feat: add engine name to DatanodeTableValue * fix: by cr --- src/cmd/src/cli/upgrade.rs | 6 +++++- src/common/meta/src/key.rs | 15 +++++++++++---- src/common/meta/src/key/datanode_table.rs | 18 ++++++++++++------ .../region_failover/update_metadata.rs | 3 ++- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index 2d9b8fe09dbf..cee58afb0c5f 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -386,6 +386,7 @@ impl MigrateTableMetadata { async fn create_datanode_table_keys(&self, value: &TableGlobalValue) { let table_id = value.table_id(); + let engine = value.table_info.meta.engine.as_str(); let region_distribution: RegionDistribution = value.regions_id_map.clone().into_iter().collect(); @@ -394,7 +395,10 @@ impl MigrateTableMetadata { .map(|(datanode_id, regions)| { let k = DatanodeTableKey::new(datanode_id, table_id); info!("Creating DatanodeTableKey '{k}' => {regions:?}"); - (k, DatanodeTableValue::new(table_id, regions)) + ( + k, + DatanodeTableValue::new(table_id, regions, engine.to_string()), + ) }) .collect::>(); diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 66b80f56b128..3e5f5d82fde6 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -241,6 +241,7 @@ impl TableMetadataManager { .collect::>(); table_info.meta.region_numbers = region_numbers; let table_id = table_info.ident.table_id; + let engine = table_info.meta.engine.clone(); // Creates table name. let table_name = TableNameKey::new( @@ -260,9 +261,9 @@ impl TableMetadataManager { // Creates datanode table key value pairs. let distribution = region_distribution(®ion_routes)?; - let create_datanode_table_txn = self - .datanode_table_manager() - .build_create_txn(table_id, distribution)?; + let create_datanode_table_txn = + self.datanode_table_manager() + .build_create_txn(table_id, &engine, distribution)?; // Creates table route. let table_route_value = TableRouteValue::new(region_routes); @@ -439,6 +440,7 @@ impl TableMetadataManager { pub async fn update_table_route( &self, table_id: TableId, + engine: &str, current_table_route_value: TableRouteValue, new_region_routes: Vec, ) -> Result<()> { @@ -449,6 +451,7 @@ impl TableMetadataManager { let update_datanode_table_txn = self.datanode_table_manager().build_update_txn( table_id, + engine, current_region_distribution, new_region_distribution, )?; @@ -863,6 +866,7 @@ mod tests { let table_info: RawTableInfo = new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); let table_id = table_info.ident.table_id; + let engine = table_info.meta.engine.as_str(); let current_table_route_value = TableRouteValue::new(region_routes.clone()); // creates metadata. table_metadata_manager @@ -879,6 +883,7 @@ mod tests { table_metadata_manager .update_table_route( table_id, + engine, current_table_route_value.clone(), new_region_routes.clone(), ) @@ -890,6 +895,7 @@ mod tests { table_metadata_manager .update_table_route( table_id, + engine, current_table_route_value.clone(), new_region_routes.clone(), ) @@ -902,6 +908,7 @@ mod tests { table_metadata_manager .update_table_route( table_id, + engine, current_table_route_value.clone(), new_region_routes.clone(), ) @@ -918,7 +925,7 @@ mod tests { new_region_route(4, 4), ]); assert!(table_metadata_manager - .update_table_route(table_id, wrong_table_route_value, new_region_routes) + .update_table_route(table_id, engine, wrong_table_route_value, new_region_routes) .await .is_err()); } diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 8e92edb5b352..4641f9769b33 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -85,14 +85,16 @@ impl TableMetaKey for DatanodeTableKey { pub struct DatanodeTableValue { pub table_id: TableId, pub regions: Vec, + pub engine: String, version: u64, } impl DatanodeTableValue { - pub fn new(table_id: TableId, regions: Vec) -> Self { + pub fn new(table_id: TableId, regions: Vec, engine: String) -> Self { Self { table_id, regions, + engine, version: 0, } } @@ -143,13 +145,14 @@ impl DatanodeTableManager { pub fn build_create_txn( &self, table_id: TableId, + engine: &str, distribution: RegionDistribution, ) -> Result { let txns = distribution .into_iter() .map(|(datanode_id, regions)| { let key = DatanodeTableKey::new(datanode_id, table_id); - let val = DatanodeTableValue::new(table_id, regions); + let val = DatanodeTableValue::new(table_id, regions, engine.to_string()); Ok(TxnOp::Put(key.as_raw_key(), val.try_as_raw_value()?)) }) @@ -164,6 +167,7 @@ impl DatanodeTableManager { pub(crate) fn build_update_txn( &self, table_id: TableId, + engine: &str, current_region_distribution: RegionDistribution, new_region_distribution: RegionDistribution, ) -> Result { @@ -184,14 +188,16 @@ impl DatanodeTableManager { if *current_region != regions { let key = DatanodeTableKey::new(datanode, table_id); let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?; + let val = DatanodeTableValue::new(table_id, regions, engine.to_string()) + .try_as_raw_value()?; opts.push(TxnOp::Put(raw_key, val)); } } else { // New datanodes let key = DatanodeTableKey::new(datanode, table_id); let raw_key = key.as_raw_key(); - let val = DatanodeTableValue::new(table_id, regions).try_as_raw_value()?; + let val = DatanodeTableValue::new(table_id, regions, engine.to_string()) + .try_as_raw_value()?; opts.push(TxnOp::Put(raw_key, val)); } } @@ -224,7 +230,6 @@ impl DatanodeTableManager { #[cfg(test)] mod tests { - use super::*; #[test] @@ -239,9 +244,10 @@ mod tests { let value = DatanodeTableValue { table_id: 42, regions: vec![1, 2, 3], + engine: Default::default(), version: 1, }; - let literal = br#"{"table_id":42,"regions":[1,2,3],"version":1}"#; + let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","version":1}"#; let raw_value = value.try_as_raw_value().unwrap(); assert_eq!(raw_value, literal); diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index c3d7fad01f13..0cb40afc5d11 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -58,6 +58,7 @@ impl UpdateRegionMetadata { failed_region: &RegionIdent, ) -> Result<()> { let table_id = failed_region.table_ident.table_id; + let engine = failed_region.table_ident.engine.as_str(); let table_route_value = ctx .table_metadata_manager @@ -85,7 +86,7 @@ impl UpdateRegionMetadata { ); ctx.table_metadata_manager - .update_table_route(table_id, table_route_value, new_region_routes) + .update_table_route(table_id, engine, table_route_value, new_region_routes) .await .context(error::UpdateTableRouteSnafu)?; From a84a8ad04fff72e6f05bfc5b98b8c35a764c0275 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Thu, 14 Sep 2023 19:50:00 +0800 Subject: [PATCH 2/3] fix: alter table procedure panics while renaming table (#2397) * fix: procedure panic on renaming table * test: fix test_insert_and_select invalid arguments * test: fix test_standalone_insert_and_query using wrong semantic type * test: fix test_distributed_insert_delete_and_query semantic type --- src/common/meta/src/ddl/alter_table.rs | 16 +++++++++------- tests-integration/src/grpc.rs | 9 +++++---- tests-integration/tests/grpc.rs | 8 ++++---- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index 542f731576e1..95892e6967c1 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -54,8 +54,8 @@ use crate::table_name::TableName; pub struct AlterTableProcedure { context: DdlContext, data: AlterTableData, - /// proto alter Kind. - kind: alter_request::Kind, + /// proto alter Kind for adding/dropping columns. + kind: Option, } impl AlterTableProcedure { @@ -171,7 +171,7 @@ impl AlterTableProcedure { Ok(AlterRequest { region_id: region_id.as_u64(), schema_version: table_info.ident.version, - kind: Some(self.kind.clone()), + kind: self.kind.clone(), }) } @@ -461,7 +461,7 @@ impl AlterTableData { pub fn create_proto_alter_kind( table_info: &RawTableInfo, alter_kind: &Kind, -) -> Result<(alter_request::Kind, Option)> { +) -> Result<(Option, Option)> { match alter_kind { Kind::AddColumns(x) => { let mut next_column_id = table_info.meta.next_column_id; @@ -494,7 +494,7 @@ pub fn create_proto_alter_kind( .collect::>>()?; Ok(( - alter_request::Kind::AddColumns(AddColumns { add_columns }), + Some(alter_request::Kind::AddColumns(AddColumns { add_columns })), Some(next_column_id), )) } @@ -508,10 +508,12 @@ pub fn create_proto_alter_kind( .collect::>(); Ok(( - alter_request::Kind::DropColumns(DropColumns { drop_columns }), + Some(alter_request::Kind::DropColumns(DropColumns { + drop_columns, + })), None, )) } - Kind::RenameTable(_) => unreachable!(), + Kind::RenameTable(_) => Ok((None, None)), } } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 85940f3a366c..0a15b79b412f 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -195,9 +195,10 @@ mod test { r" CREATE TABLE {table_name} ( a INT, - b STRING PRIMARY KEY, + b STRING, ts TIMESTAMP, - TIME INDEX (ts) + TIME INDEX (ts), + PRIMARY KEY (a, b) ) PARTITION BY RANGE COLUMNS(a) ( PARTITION r0 VALUES LESS THAN (10), PARTITION r1 VALUES LESS THAN (20), @@ -334,7 +335,7 @@ CREATE TABLE {table_name} ( ..Default::default() }), null_mask: vec![32, 0], - semantic_type: SemanticType::Field as i32, + semantic_type: SemanticType::Tag as i32, datatype: ColumnDataType::Int32 as i32, }, Column { @@ -412,7 +413,7 @@ CREATE TABLE {table_name} ( key_columns: vec![ Column { column_name: "a".to_string(), - semantic_type: SemanticType::Field as i32, + semantic_type: SemanticType::Tag as i32, values: Some(Values { i32_values: a, ..Default::default() diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 48d7289a5bb8..d5a0c815e14f 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -191,7 +191,7 @@ fn expect_data() -> (Column, Column, Column, Column) { .collect(), ..Default::default() }), - semantic_type: SemanticType::Field as i32, + semantic_type: SemanticType::Tag as i32, datatype: ColumnDataType::String as i32, ..Default::default() }; @@ -363,14 +363,14 @@ fn testing_create_expr() -> CreateTableExpr { ColumnDef { name: "ts".to_string(), data_type: ColumnDataType::TimestampMillisecond as i32, // timestamp - is_nullable: true, + is_nullable: false, default_constraint: vec![], semantic_type: SemanticType::Timestamp as i32, }, ]; CreateTableExpr { - catalog_name: "".to_string(), - schema_name: "".to_string(), + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), table_name: "demo".to_string(), desc: "blabla little magic fairy".to_string(), column_defs, From d1adb915bfa18a8fe8703318d8766211e780c765 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Thu, 14 Sep 2023 20:12:38 +0800 Subject: [PATCH 3/3] feat: set readonly first when deregister region (#2391) * feat: set readonly first when deregister region * revert distxxx --- src/datanode/src/region_server.rs | 8 +++++--- src/frontend/src/instance.rs | 2 +- src/frontend/src/statement/ddl.rs | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 15c295933735..8a68d81bd6f5 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -130,7 +130,7 @@ impl RegionServerHandler for RegionServer { for result in results { match result .map_err(BoxedError::new) - .context(servers_error::ExecuteGrpcRequestSnafu)? + .context(ExecuteGrpcRequestSnafu)? { Output::AffectedRows(rows) => affected_rows += rows, Output::Stream(_) | Output::RecordBatches(_) => { @@ -242,7 +242,9 @@ impl RegionServerInner { } RegionChange::Deregisters => { info!("Region {region_id} is deregistered from engine {engine_type}"); - self.region_map.remove(®ion_id); + self.region_map + .remove(®ion_id) + .map(|(id, engine)| engine.set_writable(id, false)); } } @@ -417,7 +419,7 @@ impl SchemaProvider for DummySchemaProvider { } } -/// For [TableProvider](datafusion::datasource::TableProvider) and [DummyCatalogList] +/// For [TableProvider](TableProvider) and [DummyCatalogList] #[derive(Clone)] struct DummyTableProvider { region_id: RegionId, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index cda343d2a099..c61abd18f066 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod distributed; +mod distributed; mod grpc; mod influxdb; mod opentsdb; diff --git a/src/frontend/src/statement/ddl.rs b/src/frontend/src/statement/ddl.rs index 711386128b2e..dea77dd0bfa1 100644 --- a/src/frontend/src/statement/ddl.rs +++ b/src/frontend/src/statement/ddl.rs @@ -111,7 +111,7 @@ impl StatementExecutor { let table_id = resp.table_id.context(error::UnexpectedSnafu { violated: "expected table_id", })?; - info!("Successfully created distributed table '{table_name}' with table id {table_id}"); + info!("Successfully created table '{table_name}' with table id {table_id}"); table_info.ident.table_id = table_id; let engine = table_info.meta.engine.to_string();