diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 2347a04f8d02..db2e335602a3 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -158,8 +158,8 @@ pub enum Error { #[snafu(display("Invalid SQL, error: {}", err_msg))] InvalidSql { err_msg: String, location: Location }, - #[snafu(display("Incomplete GRPC result: {}", err_msg))] - IncompleteGrpcResult { err_msg: String, location: Location }, + #[snafu(display("Incomplete GRPC request: {}", err_msg))] + IncompleteGrpcRequest { err_msg: String, location: Location }, #[snafu(display("Failed to find Datanode by region: {:?}", region))] FindDatanode { @@ -713,7 +713,8 @@ impl ErrorExt for Error { | Error::ProjectSchema { .. } | Error::UnsupportedFormat { .. } | Error::EmptyData { .. } - | Error::ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments, + | Error::ColumnNoneDefaultValue { .. } + | Error::IncompleteGrpcRequest { .. } => StatusCode::InvalidArguments, Error::NotSupported { .. } => StatusCode::Unsupported, @@ -770,8 +771,7 @@ impl ErrorExt for Error { | Error::MissingInsertBody { .. } | Error::InvalidRegionRequest { .. } => StatusCode::Internal, - Error::IncompleteGrpcResult { .. } - | Error::ContextValueNotFound { .. } + Error::ContextValueNotFound { .. } | Error::InvalidSystemTableDef { .. } | Error::EncodeJson { .. } => StatusCode::Unexpected, diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index d24598fbd1c2..f73525150c67 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::ddl_request::Expr as DdlExpr; +use api::v1::ddl_request::{Expr as DdlExpr, Expr}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{DeleteRequests, InsertRequests, RowDeleteRequests, RowInsertRequests}; @@ -27,9 +27,7 @@ use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{ - self, Error, IncompleteGrpcResultSnafu, NotSupportedSnafu, PermissionSnafu, Result, -}; +use crate::error::{Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result}; use crate::instance::Instance; #[async_trait] @@ -53,7 +51,7 @@ impl GrpcQueryHandler for Instance { Request::Deletes(requests) => self.handle_deletes(requests, ctx.clone()).await?, Request::RowDeletes(requests) => self.handle_row_deletes(requests, ctx.clone()).await?, Request::Query(query_request) => { - let query = query_request.query.context(IncompleteGrpcResultSnafu { + let query = query_request.query.context(IncompleteGrpcRequestSnafu { err_msg: "Missing field 'QueryRequest.query'", })?; match query { @@ -93,10 +91,12 @@ impl GrpcQueryHandler for Instance { } } Request::Ddl(request) => { - let expr = request.expr.context(error::UnexpectedSnafu { - violated: "expected expr", + let mut expr = request.expr.context(IncompleteGrpcRequestSnafu { + err_msg: "'expr' is absent in DDL request", })?; + fill_catalog_and_schema_from_context(&mut expr, &ctx); + match expr { DdlExpr::CreateTable(mut expr) => { // TODO(weny): supports to create multiple region table. @@ -135,6 +135,38 @@ impl GrpcQueryHandler for Instance { } } +fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryContextRef) { + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + + macro_rules! check_and_fill { + ($expr:ident) => { + if $expr.catalog_name.is_empty() { + $expr.catalog_name = catalog.to_string(); + } + if $expr.schema_name.is_empty() { + $expr.schema_name = schema.to_string(); + } + }; + } + + match ddl_expr { + Expr::CreateDatabase(_) => { /* do nothing*/ } + Expr::CreateTable(expr) => { + check_and_fill!(expr); + } + Expr::Alter(expr) => { + check_and_fill!(expr); + } + Expr::DropTable(expr) => { + check_and_fill!(expr); + } + Expr::TruncateTable(expr) => { + check_and_fill!(expr); + } + } +} + impl Instance { pub async fn handle_inserts( &self, diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index 3bf1fe260b8f..32ce2a60ef55 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -161,9 +161,7 @@ impl Display for Partitions { if !self.column_list.is_empty() { write!( f, - r#"PARTITION BY RANGE COLUMNS ({}) ( - {} - )"#, + "PARTITION BY RANGE COLUMNS ({}) (\n{}\n)", format_list_comma!(self.column_list), format_list_indent!(self.entries), ) @@ -261,10 +259,10 @@ CREATE TABLE IF NOT EXISTS demo ( PRIMARY KEY (ts, host) ) PARTITION BY RANGE COLUMNS (ts) ( - PARTITION r0 VALUES LESS THAN (5), + PARTITION r0 VALUES LESS THAN (5), PARTITION r1 VALUES LESS THAN (9), PARTITION r2 VALUES LESS THAN (MAXVALUE) - ) +) ENGINE=mito WITH( regions = 1, diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index d7eec56ed6c7..90dc72a46e1c 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -41,7 +41,8 @@ use tonic::transport::Server; use tower::service_fn; use crate::test_util::{ - create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard, StorageType, + self, create_datanode_opts, create_tmp_dir_and_datanode_opts, FileDirGuard, StorageGuard, + StorageType, }; pub struct GreptimeDbCluster { @@ -101,6 +102,8 @@ impl GreptimeDbClusterBuilder { .build_frontend(meta_srv.clone(), datanode_clients) .await; + test_util::prepare_another_catalog_and_schema(frontend.as_ref()).await; + frontend.start().await.unwrap(); GreptimeDbCluster { diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 0a15b79b412f..4bb7b2be8e41 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -307,7 +307,7 @@ CREATE TABLE {table_name} ( } async fn test_insert_delete_and_query_on_existing_table(instance: &Instance, table_name: &str) { - let ts_millisecond_values = vec![ + let timestamp_millisecond_values = vec![ 1672557972000, 1672557973000, 1672557974000, @@ -341,7 +341,7 @@ CREATE TABLE {table_name} ( Column { column_name: "b".to_string(), values: Some(Values { - string_values: ts_millisecond_values + string_values: timestamp_millisecond_values .iter() .map(|x| format!("ts: {x}")) .collect(), @@ -354,7 +354,7 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values, + timestamp_millisecond_values, ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -363,7 +363,6 @@ CREATE TABLE {table_name} ( }, ], row_count: 16, - ..Default::default() }; let output = query( instance, @@ -409,7 +408,6 @@ CREATE TABLE {table_name} ( let new_grpc_delete_request = |a, b, ts, row_count| DeleteRequest { table_name: table_name.to_string(), - region_number: 0, key_columns: vec![ Column { column_name: "a".to_string(), @@ -435,7 +433,7 @@ CREATE TABLE {table_name} ( column_name: "ts".to_string(), semantic_type: SemanticType::Timestamp as i32, values: Some(Values { - ts_millisecond_values: ts, + timestamp_millisecond_values: ts, ..Default::default() }), datatype: ColumnDataType::TimestampMillisecond as i32, @@ -545,6 +543,7 @@ CREATE TABLE {table_name} ( .handle_read(RegionQueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), + ..Default::default() }) .await .unwrap(); @@ -574,7 +573,11 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557976000, 1672557977000], + timestamp_millisecond_values: vec![ + 1672557975000, + 1672557976000, + 1672557977000, + ], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -583,7 +586,6 @@ CREATE TABLE {table_name} ( }, ], row_count: 3, - ..Default::default() }; // Test auto create not existed table upon insertion. @@ -609,7 +611,11 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672557978000, 1672557979000, 1672557980000], + timestamp_millisecond_values: vec![ + 1672557978000, + 1672557979000, + 1672557980000, + ], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -618,7 +624,6 @@ CREATE TABLE {table_name} ( }, ], row_count: 3, - ..Default::default() }; // Test auto add not existed column upon insertion. @@ -653,11 +658,10 @@ CREATE TABLE {table_name} ( let delete = DeleteRequest { table_name: "auto_created_table".to_string(), - region_number: 0, key_columns: vec![Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![1672557975000, 1672557979000], + timestamp_millisecond_values: vec![1672557975000, 1672557979000], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -739,7 +743,7 @@ CREATE TABLE {table_name} ( Column { column_name: "ts".to_string(), values: Some(Values { - ts_millisecond_values: vec![ + timestamp_millisecond_values: vec![ 1672557972000, 1672557973000, 1672557974000, @@ -757,7 +761,6 @@ CREATE TABLE {table_name} ( }, ], row_count: 8, - ..Default::default() }; let request = Request::Inserts(InsertRequests { diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index 4740e03aa02f..7707df8d4b37 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -240,6 +240,7 @@ mod tests { .handle_read(QueryRequest { region_id: region_id.as_u64(), plan: plan.to_vec(), + ..Default::default() }) .await .unwrap(); diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9c4c5380f6c4..460adb65bfee 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,16 +14,15 @@ use std::sync::Arc; -use catalog::kvbackend::DummyKvCacheInvalidator; +use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager}; use common_base::Plugins; use common_config::KvStoreConfig; use common_procedure::options::ProcedureConfig; use datanode::datanode::builder::DatanodeBuilder; use datanode::datanode::DatanodeOptions; -use frontend::catalog::FrontendCatalogManager; -use frontend::instance::{Instance, StandaloneDatanodeManager}; +use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; -use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; +use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; pub struct GreptimeDbStandalone { pub instance: Arc, @@ -81,7 +80,7 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - let catalog_manager = FrontendCatalogManager::new( + let catalog_manager = KvBackendCatalogManager::new( kv_store.clone(), Arc::new(DummyKvCacheInvalidator), Arc::new(StandaloneDatanodeManager(datanode.region_server())), @@ -103,6 +102,10 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); + test_util::prepare_another_catalog_and_schema(&instance).await; + + instance.start().await.unwrap(); + GreptimeDbStandalone { instance: Arc::new(instance), datanode_opts: opts, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 059ba92cebe9..06a24b3f9276 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::env; use std::fmt::Display; use std::net::SocketAddr; @@ -21,7 +20,9 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use catalog::kvbackend::KvBackendCatalogManager; +use common_meta::key::catalog_name::CatalogNameKey; +use common_meta::key::schema_name::SchemaNameKey; use common_query::Output; use common_recordbatch::util; use common_runtime::Builder as RuntimeBuilder; @@ -31,10 +32,6 @@ use datanode::datanode::{ AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config, StorageConfig, }; -use datatypes::scalars::ScalarVectorBuilder; -use datatypes::vectors::{ - Float64VectorBuilder, MutableVector, StringVectorBuilder, TimestampMillisecondVectorBuilder, -}; use frontend::instance::Instance; use frontend::service_config::{MysqlOptions, PostgresOptions}; use object_store::services::{Azblob, Gcs, Oss, S3}; @@ -52,7 +49,6 @@ use servers::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler} use servers::server::Server; use servers::Mode; use session::context::QueryContext; -use table::requests::InsertRequest; use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; @@ -312,26 +308,23 @@ CREATE TABLE IF NOT EXISTS {table_name} ( host String NOT NULL PRIMARY KEY, cpu DOUBLE NULL, memory DOUBLE NULL, - ts TIMESTAMP NULL TIME INDEX + ts TIMESTAMP NOT NULL TIME INDEX, ) "# ); - let _ = instance.do_query(&sql, QueryContext::arc()).await; + let result = instance.do_query(&sql, QueryContext::arc()).await; + let _ = result.first().unwrap().as_ref().unwrap(); } async fn setup_standalone_instance( test_name: &str, store_type: StorageType, ) -> GreptimeDbStandalone { - let instance = GreptimeDbStandaloneBuilder::new(test_name) + GreptimeDbStandaloneBuilder::new(test_name) .with_store_type(store_type) .build() - .await; - - create_test_table(instance.instance.as_ref(), "demo").await; - - instance + .await } pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { @@ -366,6 +359,8 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( ) -> (Router, TestGuard) { let instance = setup_standalone_instance(name, store_type).await; + create_test_table(instance.instance.as_ref(), "demo").await; + let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), ..Default::default() @@ -391,39 +386,6 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( (app, instance.guard) } -fn mock_insert_request(host: &str, cpu: f64, memory: f64, ts: i64) -> InsertRequest { - let mut builder = StringVectorBuilder::with_capacity(1); - builder.push(Some(host)); - let host = builder.to_vector(); - - let mut builder = Float64VectorBuilder::with_capacity(1); - builder.push(Some(cpu)); - let cpu = builder.to_vector(); - - let mut builder = Float64VectorBuilder::with_capacity(1); - builder.push(Some(memory)); - let memory = builder.to_vector(); - - let mut builder = TimestampMillisecondVectorBuilder::with_capacity(1); - builder.push(Some(ts.into())); - let ts = builder.to_vector(); - - let columns_values = HashMap::from([ - ("host".to_string(), host), - ("cpu".to_string(), cpu), - ("memory".to_string(), memory), - ("ts".to_string(), ts), - ]); - - InsertRequest { - catalog_name: common_catalog::consts::DEFAULT_CATALOG_NAME.to_string(), - schema_name: common_catalog::consts::DEFAULT_SCHEMA_NAME.to_string(), - table_name: "demo".to_string(), - columns_values, - region_number: 0, - } -} - pub async fn setup_test_prom_app_with_frontend( store_type: StorageType, name: &str, @@ -432,22 +394,11 @@ pub async fn setup_test_prom_app_with_frontend( let instance = setup_standalone_instance(name, store_type).await; - let demo = instance - .instance - .catalog_manager() - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo") - .await - .unwrap() - .unwrap(); + create_test_table(instance.instance.as_ref(), "demo").await; - let _ = demo - .insert(mock_insert_request("host1", 1.1, 2.2, 0)) - .await - .unwrap(); - let _ = demo - .insert(mock_insert_request("host2", 2.1, 4.3, 600000)) - .await - .unwrap(); + let sql = "INSERT INTO demo VALUES ('host1', 1.1, 2.2, 0), ('host2', 2.1, 4.3, 600000)"; + let result = instance.instance.do_query(sql, QueryContext::arc()).await; + let _ = result.first().unwrap().as_ref().unwrap(); let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), @@ -627,3 +578,27 @@ pub async fn setup_pg_server_with_user_provider( (fe_pg_addr, instance.guard, fe_pg_server) } + +pub(crate) async fn prepare_another_catalog_and_schema(instance: &Instance) { + let catalog_manager = instance + .catalog_manager() + .as_any() + .downcast_ref::() + .unwrap(); + + let table_metadata_manager = catalog_manager.table_metadata_manager_ref(); + table_metadata_manager + .catalog_manager() + .create(CatalogNameKey::new("another_catalog"), true) + .await + .unwrap(); + table_metadata_manager + .schema_manager() + .create( + SchemaNameKey::new("another_catalog", "another_schema"), + None, + true, + ) + .await + .unwrap(); +} diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index c4bceefaf25f..b176e51dd786 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -84,13 +84,11 @@ async fn test_show_create_table(instance: Arc) { let frontend = instance.frontend(); let sql = if instance.is_distributed_mode() { r#"create table demo( - host STRING, - cpu DOUBLE, - memory DOUBLE, + n INT PRIMARY KEY, ts timestamp, TIME INDEX(ts) ) -PARTITION BY RANGE COLUMNS (ts) ( +PARTITION BY RANGE COLUMNS (n) ( PARTITION r0 VALUES LESS THAN (1), PARTITION r1 VALUES LESS THAN (10), PARTITION r2 VALUES LESS THAN (100), @@ -111,27 +109,26 @@ PARTITION BY RANGE COLUMNS (ts) ( let output = execute_sql(&frontend, "show create table demo").await; let expected = if instance.is_distributed_mode() { - r#"+-------+----------------------------------------------------------+ -| Table | Create Table | -+-------+----------------------------------------------------------+ -| demo | CREATE TABLE IF NOT EXISTS "demo" ( | -| | "host" STRING NULL, | -| | "cpu" DOUBLE NULL, | -| | "memory" DOUBLE NULL, | -| | "ts" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("ts") | -| | ) | -| | PARTITION BY RANGE COLUMNS ("ts") ( | -| | PARTITION r0 VALUES LESS THAN (1), | -| | PARTITION r1 VALUES LESS THAN (10), | -| | PARTITION r2 VALUES LESS THAN (100), | -| | PARTITION r3 VALUES LESS THAN (MAXVALUE) | -| | ) | -| | ENGINE=mito | -| | WITH( | -| | regions = 4 | -| | ) | -+-------+----------------------------------------------------------+"# + r#"+-------+--------------------------------------------+ +| Table | Create Table | ++-------+--------------------------------------------+ +| demo | CREATE TABLE IF NOT EXISTS "demo" ( | +| | "n" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("n") | +| | ) | +| | PARTITION BY RANGE COLUMNS ("n") ( | +| | PARTITION r0 VALUES LESS THAN (1), | +| | PARTITION r1 VALUES LESS THAN (10), | +| | PARTITION r2 VALUES LESS THAN (100), | +| | PARTITION r3 VALUES LESS THAN (MAXVALUE) | +| | ) | +| | ENGINE=mito | +| | WITH( | +| | regions = 4 | +| | ) | ++-------+--------------------------------------------+"# } else { r#"+-------+-------------------------------------+ | Table | Create Table | @@ -143,6 +140,7 @@ PARTITION BY RANGE COLUMNS (ts) ( | | "ts" TIMESTAMP(3) NOT NULL, | | | TIME INDEX ("ts") | | | ) | +| | | | | ENGINE=mito | | | WITH( | | | regions = 1 | @@ -1467,7 +1465,6 @@ async fn test_cast_type_issue_1594(instance: Arc) { #[apply(both_instances_cases)] async fn test_information_schema_dot_tables(instance: Arc) { - let is_distributed_mode = instance.is_distributed_mode(); let instance = instance.frontend(); let sql = "create table another_table(i timestamp time index)"; @@ -1480,9 +1477,7 @@ async fn test_information_schema_dot_tables(instance: Arc) { let sql = "select table_catalog, table_schema, table_name, table_type, table_id, engine from information_schema.tables where table_type != 'SYSTEM VIEW' order by table_name"; let output = execute_sql(&instance, sql).await; - let expected = match is_distributed_mode { - true => { - "\ + let expected = "\ +---------------+--------------------+------------+-----------------+----------+-------------+ | table_catalog | table_schema | table_name | table_type | table_id | engine | +---------------+--------------------+------------+-----------------+----------+-------------+ @@ -1490,46 +1485,19 @@ async fn test_information_schema_dot_tables(instance: Arc) { | greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | | greptime | public | scripts | BASE TABLE | 1024 | mito | | greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | -+---------------+--------------------+------------+-----------------+----------+-------------+" - } - false => { - "\ -+---------------+--------------------+------------+-----------------+----------+-------------+ -| table_catalog | table_schema | table_name | table_type | table_id | engine | -+---------------+--------------------+------------+-----------------+----------+-------------+ -| greptime | information_schema | columns | LOCAL TEMPORARY | 4 | | -| greptime | public | numbers | LOCAL TEMPORARY | 2 | test_engine | -| greptime | public | scripts | BASE TABLE | 1 | mito | -| greptime | information_schema | tables | LOCAL TEMPORARY | 3 | | -+---------------+--------------------+------------+-----------------+----------+-------------+" - } - }; ++---------------+--------------------+------------+-----------------+----------+-------------+"; check_output_stream(output, expected).await; let output = execute_sql_with(&instance, sql, query_ctx).await; - let expected = match is_distributed_mode { - true => { - "\ + let expected = "\ +-----------------+--------------------+---------------+-----------------+----------+--------+ | table_catalog | table_schema | table_name | table_type | table_id | engine | +-----------------+--------------------+---------------+-----------------+----------+--------+ | another_catalog | another_schema | another_table | BASE TABLE | 1025 | mito | | another_catalog | information_schema | columns | LOCAL TEMPORARY | 4 | | | another_catalog | information_schema | tables | LOCAL TEMPORARY | 3 | | -+-----------------+--------------------+---------------+-----------------+----------+--------+" - } - false => { - "\ -+-----------------+--------------------+---------------+-----------------+----------+--------+ -| table_catalog | table_schema | table_name | table_type | table_id | engine | -+-----------------+--------------------+---------------+-----------------+----------+--------+ -| another_catalog | another_schema | another_table | BASE TABLE | 1024 | mito | -| another_catalog | information_schema | columns | LOCAL TEMPORARY | 4 | | -| another_catalog | information_schema | tables | LOCAL TEMPORARY | 3 | | -+-----------------+--------------------+---------------+-----------------+----------+--------+" - } - }; ++-----------------+--------------------+---------------+-----------------+----------+--------+"; check_output_stream(output, expected).await; } diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index d5a0c815e14f..f8470be9fe66 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -82,7 +82,6 @@ pub async fn test_invalid_dbname(store_type: StorageType) { let (expected_host_col, expected_cpu_col, expected_mem_col, expected_ts_col) = expect_data(); let request = InsertRequest { table_name: "demo".to_string(), - region_number: 0, columns: vec![ expected_host_col.clone(), expected_cpu_col.clone(), @@ -218,7 +217,7 @@ fn expect_data() -> (Column, Column, Column, Column) { let expected_ts_col = Column { column_name: "ts".to_string(), values: Some(column::Values { - ts_millisecond_values: vec![100, 101, 102, 103], + timestamp_millisecond_values: vec![100, 101, 102, 103], ..Default::default() }), semantic_type: SemanticType::Timestamp as i32, @@ -283,7 +282,6 @@ async fn insert_and_assert(db: &Database) { let request = InsertRequest { table_name: "demo".to_string(), - region_number: 0, columns: vec![ expected_host_col.clone(), expected_cpu_col.clone(), @@ -381,7 +379,6 @@ fn testing_create_expr() -> CreateTableExpr { table_id: Some(TableId { id: MIN_USER_TABLE_ID, }), - region_numbers: vec![0], engine: MITO_ENGINE.to_string(), } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8079f41e4d6b..0f97bca5a030 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -117,7 +117,6 @@ pub async fn test_http_auth(store_type: StorageType) { } pub async fn test_sql_api(store_type: StorageType) { - common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await; let client = TestClient::new(app); let res = client.get("/v1/sql").send().await; @@ -305,7 +304,7 @@ pub async fn test_sql_api(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); let body = serde_json::from_str::(&res.text().await).unwrap(); - assert_eq!(body.code(), ErrorCode::Internal as u32); + assert_eq!(body.code(), ErrorCode::DatabaseNotFound as u32); // test invalid schema let res = client @@ -320,7 +319,6 @@ pub async fn test_sql_api(store_type: StorageType) { } pub async fn test_prometheus_promql_api(store_type: StorageType) { - common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await; let client = TestClient::new(app); @@ -606,62 +604,66 @@ pub async fn test_config_api(store_type: StorageType) { let res_get = client.get("/config").send().await; assert_eq!(res_get.status(), StatusCode::OK); let expected_toml_str = format!( - r#" - mode = "standalone" - enable_memory_catalog = false - rpc_addr = "127.0.0.1:3001" - rpc_runtime_size = 8 - enable_telemetry = true - - [heartbeat] - interval_millis = 5000 - retry_interval_millis = 5000 - - [http_opts] - addr = "127.0.0.1:4000" - timeout = "30s" - body_limit = "64MiB" - - [wal] - file_size = "256MiB" - purge_threshold = "4GiB" - purge_interval = "10m" - read_batch_size = 128 - sync_write = false - - [storage] - type = "{}" - - [storage.compaction] - max_inflight_tasks = 4 - max_files_in_level0 = 8 - max_purge_tasks = 32 - sst_write_buffer_size = "8MiB" - - [storage.manifest] - checkpoint_margin = 10 - gc_duration = "10m" - compress = false - - [storage.flush] - max_flush_tasks = 8 - region_write_buffer_size = "32MiB" - picker_schedule_interval = "5m" - auto_flush_interval = "1h" - - [procedure] - max_retry_times = 3 - retry_delay = "500ms" - - [logging] - enable_jaeger_tracing = false"#, + r#"mode = "standalone" +rpc_addr = "127.0.0.1:3001" +rpc_runtime_size = 8 +enable_telemetry = true + +[heartbeat] +interval_millis = 5000 +retry_interval_millis = 5000 + +[http_opts] +addr = "127.0.0.1:4000" +timeout = "30s" +body_limit = "64MiB" + +[wal] +file_size = "256MiB" +purge_threshold = "4GiB" +purge_interval = "10m" +read_batch_size = 128 +sync_write = false + +[storage] +type = "{}" + +[storage.compaction] +max_inflight_tasks = 4 +max_files_in_level0 = 8 +max_purge_tasks = 32 +sst_write_buffer_size = "8MiB" + +[storage.manifest] +checkpoint_margin = 10 +gc_duration = "10m" +compress = false + +[storage.flush] +max_flush_tasks = 8 +region_write_buffer_size = "32MiB" +picker_schedule_interval = "5m" +auto_flush_interval = "1h" + +[[region_engine]] + +[region_engine.mito] +num_workers = 1 +worker_channel_size = 128 +worker_request_batch_size = 64 +manifest_checkpoint_distance = 10 +manifest_compress_type = "Uncompressed" +max_background_jobs = 4 +auto_flush_interval = "30m" +global_write_buffer_size = "1GiB" +global_write_buffer_reject_size = "2GiB" + +[logging] +enable_jaeger_tracing = false"#, store_type ); let body_text = drop_lines_with_inconsistent_results(res_get.text().await); - assert_eq!( - normalize_str(body_text.as_str()), - normalize_str(&expected_toml_str) - ); + assert_eq!(body_text, expected_toml_str); } fn drop_lines_with_inconsistent_results(input: String) -> String { @@ -684,10 +686,6 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { .join("\n") } -fn normalize_str(s: &str) -> String { - s.replace([' ', '\n'], "") -} - #[cfg(feature = "dashboard")] pub async fn test_dashboard_path(store_type: StorageType) { common_telemetry::init_default_ut_logging(); diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index e324a5f3ebb3..7ebeb9da726d 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -16,17 +16,14 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::Peer; -use catalog::kvbackend::CachedMetaKvBackend; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; -use common_meta::ident::TableIdent; -use common_meta::key::table_name::TableNameKey; +use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::key::table_route::TableRouteKey; use common_meta::key::{RegionDistribution, TableMetaKey}; use common_meta::RegionIdent; use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; use common_telemetry::info; -use frontend::catalog::FrontendCatalogManager; use frontend::error::Result as FrontendResult; use frontend::instance::Instance; use futures::TryStreamExt; @@ -100,7 +97,7 @@ pub async fn test_region_failover(store_type: StorageType) { let frontend = cluster.frontend.clone(); - prepare_testing_table(&cluster).await; + let table_id = prepare_testing_table(&cluster).await; let results = write_datas(&frontend, logical_timer).await; logical_timer += 1000; @@ -108,11 +105,6 @@ pub async fn test_region_failover(store_type: StorageType) { assert!(matches!(result.unwrap(), Output::AffectedRows(1))); } - let table_id = get_table_id( - &frontend, - TableNameKey::new("greptime", "public", "my_table"), - ) - .await; assert!(has_route_cache(&frontend, table_id).await); let distribution = find_region_distribution(&cluster, table_id).await; @@ -165,28 +157,11 @@ pub async fn test_region_failover(store_type: StorageType) { assert!(success) } -async fn get_table_id(instance: &Arc, key: TableNameKey<'_>) -> TableId { - let catalog_manager = instance - .catalog_manager() - .as_any() - .downcast_ref::() - .unwrap(); - - catalog_manager - .table_metadata_manager_ref() - .table_name_manager() - .get(key) - .await - .unwrap() - .unwrap() - .table_id() -} - async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { let catalog_manager = instance .catalog_manager() .as_any() - .downcast_ref::() + .downcast_ref::() .unwrap(); let kv_backend = catalog_manager.table_metadata_manager_ref().kv_backend(); @@ -251,7 +226,7 @@ async fn assert_writes(instance: &Arc) { check_output_stream(result.unwrap(), expected).await; } -async fn prepare_testing_table(cluster: &GreptimeDbCluster) { +async fn prepare_testing_table(cluster: &GreptimeDbCluster) -> TableId { let sql = r" CREATE TABLE my_table ( i INT PRIMARY KEY, @@ -264,6 +239,15 @@ CREATE TABLE my_table ( )"; let result = cluster.frontend.do_query(sql, QueryContext::arc()).await; result.get(0).unwrap().as_ref().unwrap(); + + let table = cluster + .frontend + .catalog_manager() + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table") + .await + .unwrap() + .unwrap(); + table.table_info().table_id() } async fn find_region_distribution( @@ -324,14 +308,9 @@ fn choose_failed_region(distribution: RegionDistribution) -> RegionIdent { RegionIdent { cluster_id: 1000, datanode_id: failed_datanode, - table_ident: TableIdent { - table_id: 1025, - engine: MITO_ENGINE.to_string(), - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "my_table".to_string(), - }, + table_id: 1025, region_number: failed_region, + engine: "mito2".to_string(), } } @@ -369,9 +348,7 @@ async fn run_region_failover_procedure( server_addr: meta_srv.options().server_addr.clone(), kv_store: meta_srv.kv_store().clone(), meta_peer_client: meta_srv.meta_peer_client().clone(), - catalog: None, - schema: None, - table: None, + table_id: None, }, dist_lock: meta_srv.lock().clone(), table_metadata_manager: meta_srv.table_metadata_manager().clone(), diff --git a/tests-integration/tests/sql.rs b/tests-integration/tests/sql.rs index 4e10d5860534..8d4b83a1c1fd 100644 --- a/tests-integration/tests/sql.rs +++ b/tests-integration/tests/sql.rs @@ -121,6 +121,8 @@ pub async fn test_mysql_auth(store_type: StorageType) { } pub async fn test_mysql_crud(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (addr, mut guard, fe_mysql_server) = setup_mysql_server(store_type, "sql_crud").await; let pool = MySqlPoolOptions::new()