diff --git a/Cargo.lock b/Cargo.lock index 1d5c67a6f559..4772587dd49b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,6 +1236,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" name = "catalog" version = "0.7.1" dependencies = [ + "api", "arrow", "arrow-schema", "async-stream", @@ -10119,6 +10120,7 @@ dependencies = [ "paste", "prost 0.12.3", "query", + "rand", "rstest", "rstest_reuse", "script", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 8d91421ebaac..1c0a9a9b170b 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -11,6 +11,7 @@ testing = [] workspace = true [dependencies] +api.workspace = true arrow.workspace = true arrow-schema.workspace = true async-stream.workspace = true diff --git a/src/catalog/src/information_schema/memory_table/tables.rs b/src/catalog/src/information_schema/memory_table/tables.rs index 9655725aece2..e1696ab8e106 100644 --- a/src/catalog/src/information_schema/memory_table/tables.rs +++ b/src/catalog/src/information_schema/memory_table/tables.rs @@ -14,13 +14,15 @@ use std::sync::Arc; -use common_catalog::consts::MITO_ENGINE; +use common_catalog::consts::{METRIC_ENGINE, MITO_ENGINE}; use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::{Int64Vector, StringVector}; use crate::information_schema::table_names::*; +const NO_VALUE: &str = "NO"; + /// Find the schema and columns by the table_name, only valid for memory tables. /// Safety: the user MUST ensure the table schema exists, panic otherwise. pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { @@ -59,14 +61,15 @@ pub fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec) { "SAVEPOINTS", ]), vec![ - Arc::new(StringVector::from(vec![MITO_ENGINE])), - Arc::new(StringVector::from(vec!["DEFAULT"])), + Arc::new(StringVector::from(vec![MITO_ENGINE, METRIC_ENGINE])), + Arc::new(StringVector::from(vec!["DEFAULT", "YES"])), Arc::new(StringVector::from(vec![ "Storage engine for time-series data", + "Storage engine for observability scenarios, which is adept at handling a large number of small tables, making it particularly suitable for cloud-native monitoring", ])), - Arc::new(StringVector::from(vec!["NO"])), - Arc::new(StringVector::from(vec!["NO"])), - Arc::new(StringVector::from(vec!["NO"])), + Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])), + Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])), + Arc::new(StringVector::from(vec![NO_VALUE, NO_VALUE])), ], ), diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 6864748881cd..5a98a0eb0032 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -19,10 +19,10 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use api::v1::CreateTableExpr; use futures::future::BoxFuture; use futures_util::stream::BoxStream; use table::metadata::TableId; -use table::requests::CreateTableRequest; use table::TableRef; use crate::error::Result; @@ -75,9 +75,9 @@ pub type OpenSystemTableHook = /// Register system table request: /// - When system table is already created and registered, the hook will be called /// with table ref after opening the system table -/// - When system table is not exists, create and register the table by create_table_request and calls open_hook with the created table. +/// - When system table is not exists, create and register the table by `create_table_expr` and calls `open_hook` with the created table. 
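The `src/common/catalog` hunk above changes `parse_catalog_and_schema_from_db_string` and its optional variant to return owned, lowercased `String`s instead of borrowed `&str` slices, which is why callers in the `servers` hunks below now pass `&catalog`/`&schema` onward. A minimal standalone sketch of the new behavior; the `DEFAULT_CATALOG_NAME` value of `"greptime"` is taken from the HTTP authorize test in this diff, everything else mirrors the hunk:

const DEFAULT_CATALOG_NAME: &str = "greptime"; // value seen in the http authorize test below

fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<String>, String) {
    // Split on the first '-' only; both parts are normalized to lowercase.
    let parts = db.splitn(2, '-').collect::<Vec<_>>();
    if parts.len() == 2 {
        (Some(parts[0].to_lowercase()), parts[1].to_lowercase())
    } else {
        (None, db.to_lowercase())
    }
}

fn parse_catalog_and_schema_from_db_string(db: &str) -> (String, String) {
    // Fall back to the default catalog when the db string carries no catalog part.
    match parse_optional_catalog_and_schema_from_db_string(db) {
        (Some(catalog), schema) => (catalog, schema),
        (None, schema) => (DEFAULT_CATALOG_NAME.to_string(), schema),
    }
}

fn main() {
    assert_eq!(
        parse_catalog_and_schema_from_db_string("CATALOG-SCHEMA"),
        ("catalog".to_string(), "schema".to_string())
    );
    assert_eq!(
        parse_catalog_and_schema_from_db_string("fullschema"),
        ("greptime".to_string(), "fullschema".to_string())
    );
}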
pub struct RegisterSystemTableRequest { - pub create_table_request: CreateTableRequest, + pub create_table_expr: CreateTableExpr, pub open_hook: Option, } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 16c11b8e4cdd..5d508adbf659 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -480,6 +480,7 @@ impl StartCommand { table_metadata_manager, table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), + true, ) .context(InitDdlManagerSnafu)?, ); diff --git a/src/common/catalog/src/lib.rs b/src/common/catalog/src/lib.rs index 1a2596371709..e1cf4c201d48 100644 --- a/src/common/catalog/src/lib.rs +++ b/src/common/catalog/src/lib.rs @@ -55,10 +55,10 @@ pub fn build_db_string(catalog: &str, schema: &str) -> String { /// schema name /// - if `[-]` is provided, we split database name with `-` and use /// `` and ``. -pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) { +pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (String, String) { match parse_optional_catalog_and_schema_from_db_string(db) { (Some(catalog), schema) => (catalog, schema), - (None, schema) => (DEFAULT_CATALOG_NAME, schema), + (None, schema) => (DEFAULT_CATALOG_NAME.to_string(), schema), } } @@ -66,12 +66,12 @@ pub fn parse_catalog_and_schema_from_db_string(db: &str) -> (&str, &str) { /// /// Similar to [`parse_catalog_and_schema_from_db_string`] but returns an optional /// catalog if it's not provided in the database name. -pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option<&str>, &str) { +pub fn parse_optional_catalog_and_schema_from_db_string(db: &str) -> (Option, String) { let parts = db.splitn(2, '-').collect::>(); if parts.len() == 2 { - (Some(parts[0]), parts[1]) + (Some(parts[0].to_lowercase()), parts[1].to_lowercase()) } else { - (None, db) + (None, db.to_lowercase()) } } @@ -88,32 +88,37 @@ mod tests { #[test] fn test_parse_catalog_and_schema() { assert_eq!( - (DEFAULT_CATALOG_NAME, "fullschema"), + (DEFAULT_CATALOG_NAME.to_string(), "fullschema".to_string()), parse_catalog_and_schema_from_db_string("fullschema") ); assert_eq!( - ("catalog", "schema"), + ("catalog".to_string(), "schema".to_string()), parse_catalog_and_schema_from_db_string("catalog-schema") ); assert_eq!( - ("catalog", "schema1-schema2"), + ("catalog".to_string(), "schema1-schema2".to_string()), parse_catalog_and_schema_from_db_string("catalog-schema1-schema2") ); assert_eq!( - (None, "fullschema"), + (None, "fullschema".to_string()), parse_optional_catalog_and_schema_from_db_string("fullschema") ); assert_eq!( - (Some("catalog"), "schema"), + (Some("catalog".to_string()), "schema".to_string()), parse_optional_catalog_and_schema_from_db_string("catalog-schema") ); assert_eq!( - (Some("catalog"), "schema1-schema2"), + (Some("catalog".to_string()), "schema".to_string()), + parse_optional_catalog_and_schema_from_db_string("CATALOG-SCHEMA") + ); + + assert_eq!( + (Some("catalog".to_string()), "schema1-schema2".to_string()), parse_optional_catalog_and_schema_from_db_string("catalog-schema1-schema2") ); } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index db435d930d30..e63477f47562 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -79,6 +79,7 @@ impl DdlManager { table_metadata_manager: TableMetadataManagerRef, table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, + register_loaders: bool, ) -> Result { let manager = Self { 
procedure_manager, @@ -88,7 +89,9 @@ impl DdlManager { table_metadata_allocator, memory_region_keeper, }; - manager.register_loaders()?; + if register_loaders { + manager.register_loaders()?; + } Ok(manager) } @@ -767,6 +770,7 @@ mod tests { Arc::new(WalOptionsAllocator::default()), )), Arc::new(MemoryRegionKeeper::default()), + true, ); let expected_loaders = vec![ diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 5fcd7d7af7f0..e55ac27dfad0 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -467,7 +467,7 @@ async fn open_all_regions( )); } } - info!("going to open {} regions", regions.len()); + info!("going to open {} region(s)", regions.len()); let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM)); let mut tasks = vec![]; diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 3a3547c01c92..7c4d35c674fc 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -108,9 +108,6 @@ pub enum Error { #[snafu(display("Invalid DeleteRequest, reason: {}", reason))] InvalidDeleteRequest { reason: String, location: Location }, - #[snafu(display("Invalid system table definition: {err_msg}"))] - InvalidSystemTableDef { err_msg: String, location: Location }, - #[snafu(display("Table not found: {}", table_name))] TableNotFound { table_name: String }, @@ -322,9 +319,7 @@ impl ErrorExt for Error { | Error::VectorToGrpcColumn { .. } | Error::InvalidRegionRequest { .. } => StatusCode::Internal, - Error::ContextValueNotFound { .. } | Error::InvalidSystemTableDef { .. } => { - StatusCode::Unexpected - } + Error::ContextValueNotFound { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, diff --git a/src/frontend/src/script.rs b/src/frontend/src/script.rs index 03bd2db0a7a3..88d5edb68424 100644 --- a/src/frontend/src/script.rs +++ b/src/frontend/src/script.rs @@ -67,21 +67,19 @@ mod dummy { mod python { use api::v1::ddl_request::Expr; use api::v1::greptime_request::Request; - use api::v1::{CreateTableExpr, DdlRequest}; + use api::v1::DdlRequest; use arc_swap::ArcSwap; use catalog::RegisterSystemTableRequest; use common_error::ext::BoxedError; use common_meta::table_name::TableName; use common_telemetry::{error, info}; - use operator::expr_factory; use script::manager::ScriptManager; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; use snafu::{OptionExt, ResultExt}; - use table::requests::CreateTableRequest; use super::*; - use crate::error::{CatalogSnafu, InvalidSystemTableDefSnafu, TableNotFoundSnafu}; + use crate::error::{CatalogSnafu, TableNotFoundSnafu}; use crate::instance::Instance; /// A placeholder for the real gRPC handler. @@ -148,17 +146,13 @@ mod python { } let RegisterSystemTableRequest { - create_table_request: request, + create_table_expr: expr, open_hook, } = self.script_manager.create_table_request(catalog); if let Some(table) = self .catalog_manager - .table( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ) + .table(&expr.catalog_name, &expr.schema_name, &expr.table_name) .await .context(CatalogSnafu)? 
{ @@ -171,13 +165,8 @@ mod python { return Ok(()); } - let table_name = TableName::new( - &request.catalog_name, - &request.schema_name, - &request.table_name, - ); - - let expr = Self::create_table_expr(request)?; + let table_name = + TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name); let _ = self .grpc_handler @@ -217,46 +206,6 @@ mod python { Ok(()) } - fn create_table_expr(request: CreateTableRequest) -> Result { - let column_schemas = request.schema.column_schemas; - - let time_index = column_schemas - .iter() - .find_map(|x| { - if x.is_time_index() { - Some(x.name.clone()) - } else { - None - } - }) - .context(InvalidSystemTableDefSnafu { - err_msg: "Time index is not defined.", - })?; - - let primary_keys = request - .primary_key_indices - .iter() - // Indexing has to be safe because the create script table request is pre-defined. - .map(|i| column_schemas[*i].name.clone()) - .collect::>(); - - let column_defs = expr_factory::column_schemas_to_defs(column_schemas, &primary_keys)?; - - Ok(CreateTableExpr { - catalog_name: request.catalog_name, - schema_name: request.schema_name, - table_name: request.table_name, - desc: request.desc.unwrap_or_default(), - column_defs, - time_index, - primary_keys, - create_if_not_exists: request.create_if_not_exists, - table_options: (&request.table_options).into(), - table_id: None, // Should and will be assigned by Meta. - engine: request.engine, - }) - } - pub async fn insert_script( &self, query_ctx: QueryContextRef, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 378d5f42dcc9..e7a561f87bdd 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -415,6 +415,7 @@ fn build_ddl_manager( table_metadata_manager.clone(), table_metadata_allocator.clone(), memory_region_keeper.clone(), + true, ) .context(error::InitDdlManagerSnafu)?, )) diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index c7301ab8d3cc..111adc24771e 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -16,9 +16,10 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use api::v1::CreateTableExpr; use arc_swap::ArcSwap; use catalog::{OpenSystemTableHook, RegisterSystemTableRequest}; -use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME, SCRIPTS_TABLE_ID}; +use common_catalog::consts::{default_engine, DEFAULT_SCHEMA_NAME}; use common_error::ext::ErrorExt; use common_query::Output; use common_telemetry::logging; @@ -26,7 +27,6 @@ use futures::future::FutureExt; use query::QueryEngineRef; use servers::query_handler::grpc::GrpcQueryHandlerRef; use snafu::{OptionExt, ResultExt}; -use table::requests::{CreateTableRequest, TableOptions}; use table::TableRef; use crate::engine::{CompileContext, EvalContext, Script, ScriptEngine}; @@ -34,10 +34,7 @@ use crate::error::{ CompilePythonSnafu, ExecutePythonSnafu, Result, ScriptNotFoundSnafu, ScriptsTableNotFoundSnafu, }; use crate::python::{PyEngine, PyScript}; -use crate::table::{ - build_scripts_schema, get_primary_key_indices, ScriptsTable, ScriptsTableRef, - SCRIPTS_TABLE_NAME, -}; +use crate::table::{build_scripts_schema, ScriptsTable, ScriptsTableRef, SCRIPTS_TABLE_NAME}; pub struct ScriptManager { compiled: RwLock>>, @@ -69,19 +66,21 @@ impl ScriptManager { } pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest { - let request = CreateTableRequest { - id: SCRIPTS_TABLE_ID, + let (time_index, primary_keys, column_defs) = build_scripts_schema(); + + let 
create_table_expr = CreateTableExpr { catalog_name: catalog.to_string(), // TODO(dennis): put the scripts table into `system` schema? // We always put the scripts table into `public` schema right now. schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: SCRIPTS_TABLE_NAME.to_string(), - desc: Some("GreptimeDB scripts table for Python".to_string()), - schema: build_scripts_schema(), - region_numbers: vec![0], - primary_key_indices: get_primary_key_indices(), + desc: "GreptimeDB scripts table for Python".to_string(), + column_defs, + time_index, + primary_keys, create_if_not_exists: true, - table_options: TableOptions::default(), + table_options: Default::default(), + table_id: None, // Should and will be assigned by Meta. engine: default_engine().to_string(), }; @@ -94,7 +93,7 @@ impl ScriptManager { }); RegisterSystemTableRequest { - create_table_request: request, + create_table_expr, open_hook: Some(hook), } } diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 6620cd86fdb7..676118feb854 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -15,12 +15,11 @@ //! Scripts table use std::sync::Arc; -use api::helper::ColumnDataTypeWrapper; use api::v1::greptime_request::Request; use api::v1::value::ValueData; use api::v1::{ - ColumnDataType, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, - SemanticType, + ColumnDataType, ColumnDef, ColumnSchema as PbColumnSchema, Row, RowInsertRequest, + RowInsertRequests, Rows, SemanticType, }; use catalog::error::CompileScriptInternalSnafu; use common_error::ext::{BoxedError, ErrorExt}; @@ -33,7 +32,6 @@ use datafusion::logical_expr::{and, col, lit}; use datafusion_common::TableReference; use datafusion_expr::LogicalPlanBuilder; use datatypes::prelude::ScalarVector; -use datatypes::schema::{ColumnSchema, RawSchema}; use datatypes::vectors::{StringVector, Vector}; use query::plan::LogicalPlan; use query::QueryEngineRef; @@ -344,38 +342,35 @@ fn query_ctx(table_info: &TableInfo) -> QueryContextRef { .build() } -/// Returns the scripts schema's primary key indices -pub fn get_primary_key_indices() -> Vec { - let mut indices = vec![]; - for (index, c) in build_insert_column_schemas().into_iter().enumerate() { - if c.semantic_type == (SemanticType::Tag as i32) { - indices.push(index); - } - } +/// Builds scripts schema, returns (time index, primary keys, column defs) +pub fn build_scripts_schema() -> (String, Vec, Vec) { + let cols = build_insert_column_schemas(); - indices -} + let time_index = cols + .iter() + .find_map(|c| { + (c.semantic_type == (SemanticType::Timestamp as i32)).then(|| c.column_name.clone()) + }) + .unwrap(); // Safety: the column always exists -/// Build scripts table -pub fn build_scripts_schema() -> RawSchema { - let cols = build_insert_column_schemas() + let primary_keys = cols + .iter() + .filter(|c| (c.semantic_type == (SemanticType::Tag as i32))) + .map(|c| c.column_name.clone()) + .collect(); + + let column_defs = cols .into_iter() - .map(|c| { - let cs = ColumnSchema::new( - c.column_name, - // Safety: the type always exists - ColumnDataTypeWrapper::try_new(c.datatype, c.datatype_extension) - .unwrap() - .into(), - false, - ); - if c.semantic_type == SemanticType::Timestamp as i32 { - cs.with_time_index(true) - } else { - cs - } + .map(|c| ColumnDef { + name: c.column_name, + data_type: c.datatype, + is_nullable: false, + default_constraint: vec![], + semantic_type: c.semantic_type, + comment: "".to_string(), + datatype_extension: None, }) .collect(); - 
RawSchema::new(cols) + (time_index, primary_keys, column_defs) } diff --git a/src/servers/src/grpc/authorize.rs b/src/servers/src/grpc/authorize.rs index 84e203d3730e..ae003640ea4b 100644 --- a/src/servers/src/grpc/authorize.rs +++ b/src/servers/src/grpc/authorize.rs @@ -104,7 +104,7 @@ async fn do_auth( ) -> Result<(), tonic::Status> { let (catalog, schema) = extract_catalog_and_schema(req); - let query_ctx = QueryContext::with(catalog, schema); + let query_ctx = QueryContext::with(&catalog, &schema); let Some(user_provider) = user_provider else { query_ctx.set_current_user(Some(auth::userinfo_by_name(None))); @@ -119,7 +119,7 @@ async fn do_auth( let pwd = auth::Password::PlainText(password); let user_info = user_provider - .auth(id, pwd, catalog, schema) + .auth(id, pwd, &catalog, &schema) .await .map_err(|e| tonic::Status::unauthenticated(e.to_string()))?; diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 19a4e1d373e0..a79217e6ee09 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -166,23 +166,28 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte } else { ( if !header.catalog.is_empty() { - &header.catalog + header.catalog.to_lowercase() } else { - DEFAULT_CATALOG_NAME + DEFAULT_CATALOG_NAME.to_string() }, if !header.schema.is_empty() { - &header.schema + header.schema.to_lowercase() } else { - DEFAULT_SCHEMA_NAME + DEFAULT_SCHEMA_NAME.to_string() }, ) } }) - .unwrap_or((DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)); + .unwrap_or_else(|| { + ( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + ) + }); let timezone = parse_timezone(header.map(|h| h.timezone.as_str())); QueryContextBuilder::default() - .current_catalog(catalog.to_string()) - .current_schema(schema.to_string()) + .current_catalog(catalog) + .current_schema(schema) .timezone(Arc::new(timezone)) .build() } diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index de99828fb33e..12c270c43cda 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -64,8 +64,8 @@ pub async fn inner_auth( // TODO(ruihang): move this out of auth module let timezone = Arc::new(extract_timezone(&req)); let query_ctx_builder = QueryContextBuilder::default() - .current_catalog(catalog.to_string()) - .current_schema(schema.to_string()) + .current_catalog(catalog.clone()) + .current_schema(schema.clone()) .timezone(timezone); let query_ctx = query_ctx_builder.build(); @@ -97,8 +97,8 @@ pub async fn inner_auth( .auth( auth::Identity::UserId(&username, None), auth::Password::PlainText(password), - catalog, - schema, + &catalog, + &schema, ) .await { @@ -132,7 +132,7 @@ fn err_response(err: impl ErrorExt) -> Response { (StatusCode::UNAUTHORIZED, ErrorResponse::from_error(err)).into_response() } -pub fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { +pub fn extract_catalog_and_schema(request: &Request) -> (String, String) { // parse database from header let dbname = request .headers() @@ -414,7 +414,7 @@ mod tests { .unwrap(); let db = extract_catalog_and_schema(&req); - assert_eq!(db, ("greptime", "tomcat")); + assert_eq!(db, ("greptime".to_string(), "tomcat".to_string())); } #[test] diff --git a/src/servers/src/http/error_result.rs b/src/servers/src/http/error_result.rs index d20de4f4602f..dac93c266ce2 100644 --- a/src/servers/src/http/error_result.rs +++ b/src/servers/src/http/error_result.rs @@ -84,6 +84,7 @@ impl 
IntoResponse for ErrorResponse { let status = StatusCode::from_u32(code).unwrap_or(StatusCode::Unknown); let status_code = match status { StatusCode::Success | StatusCode::Cancelled => HttpStatusCode::OK, + StatusCode::Unsupported | StatusCode::InvalidArguments | StatusCode::InvalidSyntax @@ -94,7 +95,9 @@ impl IntoResponse for ErrorResponse { | StatusCode::RegionNotFound | StatusCode::DatabaseNotFound | StatusCode::TableNotFound - | StatusCode::TableColumnNotFound => HttpStatusCode::BAD_REQUEST, + | StatusCode::TableColumnNotFound + | StatusCode::PlanQuery => HttpStatusCode::BAD_REQUEST, + StatusCode::PermissionDenied | StatusCode::AuthHeaderNotFound | StatusCode::InvalidAuthHeader @@ -102,16 +105,19 @@ impl IntoResponse for ErrorResponse { | StatusCode::UnsupportedPasswordType | StatusCode::UserPasswordMismatch | StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED, + StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN, + + StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS, + + StatusCode::RegionNotReady + | StatusCode::RegionBusy + | StatusCode::StorageUnavailable => HttpStatusCode::SERVICE_UNAVAILABLE, + StatusCode::Internal | StatusCode::Unexpected | StatusCode::Unknown - | StatusCode::RegionNotReady - | StatusCode::RegionBusy - | StatusCode::RateLimited - | StatusCode::StorageUnavailable | StatusCode::RuntimeResourcesExhausted - | StatusCode::PlanQuery | StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR, }; (status_code, resp).into_response() diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index af5567993fac..21e5b4c2ccd0 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -255,7 +255,7 @@ pub async fn labels_query( queries = form_params.matches.0; } if queries.is_empty() { - match get_all_column_names(catalog, schema, &handler.catalog_manager()).await { + match get_all_column_names(&catalog, &schema, &handler.catalog_manager()).await { Ok(labels) => { return PrometheusJsonResponse::success(PrometheusResponse::Labels(labels)) } @@ -530,7 +530,11 @@ pub async fn label_values_query( let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); if label_name == METRIC_NAME_LABEL { - let mut table_names = match handler.catalog_manager().table_names(catalog, schema).await { + let mut table_names = match handler + .catalog_manager() + .table_names(&catalog, &schema) + .await + { Ok(table_names) => table_names, Err(e) => { return PrometheusJsonResponse::error(e.status_code().to_string(), e.output_msg()); diff --git a/src/servers/src/mysql/handler.rs b/src/servers/src/mysql/handler.rs index 9fe088cb6604..9e43aea7b42b 100644 --- a/src/servers/src/mysql/handler.rs +++ b/src/servers/src/mysql/handler.rs @@ -371,13 +371,17 @@ impl AsyncMysqlShim for MysqlInstanceShi async fn on_init<'a>(&'a mut self, database: &'a str, w: InitWriter<'a, W>) -> Result<()> { let (catalog_from_db, schema) = parse_optional_catalog_and_schema_from_db_string(database); - let catalog = if let Some(catalog) = catalog_from_db { - catalog.to_owned() + let catalog = if let Some(catalog) = &catalog_from_db { + catalog.to_string() } else { self.session.get_catalog() }; - if !self.query_handler.is_valid_schema(&catalog, schema).await? { + if !self + .query_handler + .is_valid_schema(&catalog, &schema) + .await? 
+ { return w .error( ErrorKind::ER_WRONG_DB_NAME, @@ -391,7 +395,7 @@ impl AsyncMysqlShim for MysqlInstanceShi if let Some(schema_validator) = &self.user_provider { if let Err(e) = schema_validator - .authorize(&catalog, schema, user_info) + .authorize(&catalog, &schema, user_info) .await { METRIC_AUTH_FAILURE @@ -410,7 +414,7 @@ impl AsyncMysqlShim for MysqlInstanceShi if catalog_from_db.is_some() { self.session.set_catalog(catalog) } - self.session.set_schema(schema.into()); + self.session.set_schema(schema); w.ok().await.map_err(|e| e.into()) } diff --git a/src/servers/src/postgres/auth_handler.rs b/src/servers/src/postgres/auth_handler.rs index 3708f6f57a53..da316d04cf42 100644 --- a/src/servers/src/postgres/auth_handler.rs +++ b/src/servers/src/postgres/auth_handler.rs @@ -237,14 +237,11 @@ where if let Some(db) = db_ref { let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); if query_handler - .is_valid_schema(catalog, schema) + .is_valid_schema(&catalog, &schema) .await .map_err(|e| PgWireError::ApiError(Box::new(e)))? { - Ok(DbResolution::Resolved( - catalog.to_owned(), - schema.to_owned(), - )) + Ok(DbResolution::Resolved(catalog, schema)) } else { Ok(DbResolution::NotFound(format!("Database not found: {db}"))) } diff --git a/src/session/src/context.rs b/src/session/src/context.rs index d401b0331637..ab1e468dc6e3 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -114,7 +114,7 @@ impl QueryContext { let (catalog, schema) = db_name .map(|db| { let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); - (catalog.to_string(), schema.to_string()) + (catalog, schema) }) .unwrap_or_else(|| { ( diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 21541218ff7c..0c1e322952ac 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -23,11 +23,10 @@ use common_datasource::object_store::s3::is_supported_in_s3; use common_query::AddColumnLocation; use common_time::range::TimestampRange; use datatypes::prelude::VectorRef; -use datatypes::schema::{ColumnSchema, RawSchema}; +use datatypes::schema::ColumnSchema; use serde::{Deserialize, Serialize}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY}; use store_api::mito_engine_options::is_mito_engine_option_key; -use store_api::storage::RegionNumber; use crate::error; use crate::error::ParseTableOptionSnafu; @@ -66,38 +65,6 @@ pub fn validate_table_option(key: &str) -> bool { .contains(&key) } -#[derive(Debug, Clone)] -pub struct CreateDatabaseRequest { - pub db_name: String, - pub create_if_not_exists: bool, -} - -/// Create table request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CreateTableRequest { - pub id: TableId, - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub desc: Option, - pub schema: RawSchema, - pub region_numbers: Vec, - pub primary_key_indices: Vec, - pub create_if_not_exists: bool, - pub table_options: TableOptions, - pub engine: String, -} - -impl CreateTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } -} - #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct TableOptions { @@ -177,16 +144,6 @@ impl From<&TableOptions> for HashMap { } } -/// Open table request -#[derive(Debug, Clone)] -pub struct OpenTableRequest { - pub catalog_name: String, - pub schema_name: String, - pub 
table_name: String, - pub table_id: TableId, - pub region_numbers: Vec, -} - /// Alter table request #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AlterTableRequest { @@ -199,20 +156,6 @@ pub struct AlterTableRequest { pub table_version: Option, } -impl AlterTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } - - pub fn is_rename_table(&self) -> bool { - matches!(self.alter_kind, AlterKind::RenameTable { .. }) - } -} - /// Add column request #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AddColumnRequest { @@ -228,48 +171,6 @@ pub enum AlterKind { RenameTable { new_table_name: String }, } -/// Drop table request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DropTableRequest { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub table_id: TableId, -} - -impl DropTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } -} - -/// Close table request -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CloseTableRequest { - pub catalog_name: String, - pub schema_name: String, - pub table_name: String, - pub table_id: TableId, - /// Do nothing if region_numbers is empty - pub region_numbers: Vec, - /// flush regions - pub flush: bool, -} - -impl CloseTableRequest { - pub fn table_ref(&self) -> TableReference { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.table_name, - } - } -} - #[derive(Debug)] pub struct InsertRequest { pub catalog_name: String, diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index 8fde98e44068..bf5d68c2bd7c 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -20,40 +20,13 @@ use datatypes::schema::SchemaRef; use store_api::data_source::DataSource; use store_api::storage::ScanRequest; -use crate::metadata::{ - FilterPushDownType, TableInfo, TableInfoBuilder, TableMetaBuilder, TableType, -}; -use crate::requests::CreateTableRequest; +use crate::metadata::{FilterPushDownType, TableInfo}; use crate::thin_table::{ThinTable, ThinTableAdapter}; use crate::TableRef; pub struct EmptyTable; impl EmptyTable { - pub fn table(req: CreateTableRequest) -> TableRef { - let schema = Arc::new(req.schema.try_into().unwrap()); - let table_meta = TableMetaBuilder::default() - .schema(schema) - .primary_key_indices(req.primary_key_indices) - .next_column_id(0) - .options(req.table_options) - .region_numbers(req.region_numbers) - .engine(req.engine) - .build(); - let table_info = TableInfoBuilder::default() - .table_id(req.id) - .catalog_name(req.catalog_name) - .schema_name(req.schema_name) - .name(req.table_name) - .meta(table_meta.unwrap()) - .table_type(TableType::Temporary) - .desc(req.desc) - .build() - .unwrap(); - - Self::from_table_info(&table_info) - } - pub fn from_table_info(info: &TableInfo) -> TableRef { let thin_table = ThinTable::new(Arc::new(info.clone()), FilterPushDownType::Unsupported); let data_source = Arc::new(EmptyDataSource { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 1c95982f6181..1e549c6f9337 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -77,6 +77,7 @@ opentelemetry-proto.workspace = true partition.workspace = true paste.workspace = true 
prost.workspace = true +rand.workspace = true script.workspace = true session = { workspace = true, features = ["testing"] } store-api.workspace = true diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index d427b2a8f27b..600eab950fbd 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -77,8 +77,8 @@ pub struct GreptimeDbClusterBuilder { store_config: Option, store_providers: Option>, datanodes: Option, - wal_config: DatanodeWalConfig, - meta_wal_config: MetaSrvWalConfig, + datanode_wal_config: DatanodeWalConfig, + metasrv_wal_config: MetaSrvWalConfig, shared_home_dir: Option>, meta_selector: Option, } @@ -108,8 +108,8 @@ impl GreptimeDbClusterBuilder { store_config: None, store_providers: None, datanodes: None, - wal_config: DatanodeWalConfig::default(), - meta_wal_config: MetaSrvWalConfig::default(), + datanode_wal_config: DatanodeWalConfig::default(), + metasrv_wal_config: MetaSrvWalConfig::default(), shared_home_dir: None, meta_selector: None, } @@ -134,14 +134,14 @@ impl GreptimeDbClusterBuilder { } #[must_use] - pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self { - self.wal_config = wal_config; + pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self { + self.datanode_wal_config = datanode_wal_config; self } #[must_use] - pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self { - self.meta_wal_config = wal_meta; + pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self { + self.metasrv_wal_config = metasrv_wal_config; self } @@ -174,7 +174,7 @@ impl GreptimeDbClusterBuilder { max_retry_times: 5, retry_delay: Duration::from_secs(1), }, - wal: self.meta_wal_config.clone(), + wal: self.metasrv_wal_config.clone(), ..Default::default() }; @@ -249,7 +249,7 @@ impl GreptimeDbClusterBuilder { store_config.clone(), vec![], home_dir, - self.wal_config.clone(), + self.datanode_wal_config.clone(), ) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( @@ -257,7 +257,7 @@ impl GreptimeDbClusterBuilder { StorageType::File, self.store_providers.clone().unwrap_or_default(), &format!("{}-dn-{}", self.cluster_name, datanode_id), - self.wal_config.clone(), + self.datanode_wal_config.clone(), ); storage_guards.push(guard.storage_guards); diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index edb88136d9c8..d3e700151345 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. 
#![feature(assert_matches)] + pub mod cluster; mod grpc; mod influxdb; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index b76e6ed38ccb..5360f758c8b9 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -50,8 +50,8 @@ pub struct GreptimeDbStandalone { pub struct GreptimeDbStandaloneBuilder { instance_name: String, - wal_config: DatanodeWalConfig, - meta_wal_config: MetaSrvWalConfig, + datanode_wal_config: DatanodeWalConfig, + metasrv_wal_config: MetaSrvWalConfig, store_providers: Option>, default_store: Option, plugin: Option, @@ -64,8 +64,8 @@ impl GreptimeDbStandaloneBuilder { store_providers: None, plugin: None, default_store: None, - wal_config: DatanodeWalConfig::default(), - meta_wal_config: MetaSrvWalConfig::default(), + datanode_wal_config: DatanodeWalConfig::default(), + metasrv_wal_config: MetaSrvWalConfig::default(), } } @@ -96,23 +96,24 @@ impl GreptimeDbStandaloneBuilder { } #[must_use] - pub fn with_wal_config(mut self, wal_config: DatanodeWalConfig) -> Self { - self.wal_config = wal_config; + pub fn with_datanode_wal_config(mut self, datanode_wal_config: DatanodeWalConfig) -> Self { + self.datanode_wal_config = datanode_wal_config; self } #[must_use] - pub fn with_meta_wal_config(mut self, wal_meta: MetaSrvWalConfig) -> Self { - self.meta_wal_config = wal_meta; + pub fn with_metasrv_wal_config(mut self, metasrv_wal_config: MetaSrvWalConfig) -> Self { + self.metasrv_wal_config = metasrv_wal_config; self } pub async fn build_with( &self, kv_backend: KvBackendRef, - procedure_manager: ProcedureManagerRef, guard: TestGuard, mix_options: MixOptions, + procedure_manager: ProcedureManagerRef, + register_procedure_loaders: bool, ) -> GreptimeDbStandalone { let plugins = self.plugin.clone().unwrap_or_default(); @@ -153,6 +154,7 @@ impl GreptimeDbStandaloneBuilder { table_metadata_manager, table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), + register_procedure_loaders, ) .unwrap(), ); @@ -194,7 +196,7 @@ impl GreptimeDbStandaloneBuilder { default_store_type, store_types, &self.instance_name, - self.wal_config.clone(), + self.datanode_wal_config.clone(), ); let kv_backend_config = KvBackendConfig::default(); @@ -207,7 +209,7 @@ impl GreptimeDbStandaloneBuilder { .await .unwrap(); - let wal_meta = self.meta_wal_config.clone(); + let wal_meta = self.metasrv_wal_config.clone(); let mix_options = MixOptions { data_home: opts.storage.data_home.to_string(), procedure: procedure_config, @@ -218,7 +220,7 @@ impl GreptimeDbStandaloneBuilder { wal_meta, }; - self.build_with(kv_backend, procedure_manager, guard, mix_options) + self.build_with(kv_backend, guard, mix_options, procedure_manager, true) .await } } diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs index 2135c498239a..031fc16bc310 100644 --- a/tests-integration/src/tests/instance_kafka_wal_test.rs +++ b/tests-integration/src/tests/instance_kafka_wal_test.rs @@ -12,31 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. 
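The `cluster.rs` and `standalone.rs` hunks above rename the WAL builder methods from `with_wal_config`/`with_meta_wal_config` to `with_datanode_wal_config`/`with_metasrv_wal_config`. A usage sketch of the renamed methods, mirroring the `standalone_with_kafka_wal` hunk in `test_util.rs` further down; the `use` paths for the config types are not shown in this diff and are omitted here, and the broker endpoints are assumed:

// Assumed broker endpoint for illustration; test_util.rs reads them from the environment.
let endpoints = vec!["127.0.0.1:9092".to_string()];
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbStandaloneBuilder::new(&test_name)
    // Datanode-side WAL now configured via the explicitly named method.
    .with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
        broker_endpoints: endpoints.clone(),
        ..Default::default()
    }))
    // Metasrv-side WAL likewise gets its own clearly named method.
    .with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
        broker_endpoints: endpoints,
        topic_name_prefix: test_name.to_string(),
        num_topics: 3,
        ..Default::default()
    }));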
+use std::assert_matches::assert_matches; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -use client::OutputData; -use common_query::Output; -use common_recordbatch::util; +use client::DEFAULT_CATALOG_NAME; +use common_query::{Output, OutputData}; use datatypes::vectors::{TimestampMillisecondVector, VectorRef}; -use frontend::error::Result; use frontend::instance::Instance; +use itertools::Itertools; +use rand::rngs::ThreadRng; +use rand::Rng; use rstest::rstest; use rstest_reuse::apply; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; +use tokio::sync::Mutex; use crate::tests::test_util::*; #[apply(both_instances_cases_with_kafka_wal)] -async fn test_create_database_and_insert_query(instance: Option>) { - let Some(instance) = instance else { return }; - +async fn test_create_database_and_insert_query( + rebuildable_instance: Option>, +) { + let Some(instance) = rebuildable_instance else { + return; + }; let instance = instance.frontend(); - let output = execute_sql(&instance, "create database test").await; - assert!(matches!(output.data, OutputData::AffectedRows(1))); + let output = execute_sql_with( + &instance, + "create database test", + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await; + assert_matches!(output.data, OutputData::AffectedRows(1)); - let output = execute_sql( + let output = execute_sql_with( &instance, r#"create table greptime.test.demo( host STRING, @@ -44,25 +56,32 @@ async fn test_create_database_and_insert_query(instance: Option { - let batches = util::collect(s).await.unwrap(); + let batches = common_recordbatch::util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( Arc::new(TimestampMillisecondVector::from_vec(vec![ @@ -75,24 +94,216 @@ async fn test_create_database_and_insert_query(instance: Option, sql: &str) -> Output { - execute_sql_with(instance, sql, QueryContext::arc()).await +/// Maintains metadata of a table. +struct Table { + name: String, + logical_timer: AtomicU64, + inserted: Mutex>, } -async fn try_execute_sql_with( - instance: &Arc, - sql: &str, - query_ctx: QueryContextRef, -) -> Result { - instance.do_query(sql, query_ctx).await.remove(0) +/// Inserts some data to a collection of tables and checks if these data exist after restart. +#[apply(both_instances_cases_with_kafka_wal)] +async fn test_replay(rebuildable_instance: Option>) { + let Some(mut rebuildable_instance) = rebuildable_instance else { + return; + }; + let instance = rebuildable_instance.frontend(); + + let output = execute_sql_with( + &instance, + "create database test", + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await; + assert_matches!(output.data, OutputData::AffectedRows(1)); + + let tables = create_tables("test_replay", &instance, 10).await; + insert_data(&tables, &instance, 15).await; + ensure_data_exists(&tables, &instance).await; + + // Rebuilds to emulate restart which triggers a replay. + rebuildable_instance.rebuild().await; + ensure_data_exists(&tables, &rebuildable_instance.frontend()).await; } +/// Inserts some data to a collection of tables and sends alter table requests to force flushing each table. +/// Then checks if these data exist after restart. 
+#[apply(both_instances_cases_with_kafka_wal)] +async fn test_flush_then_replay(rebuildable_instance: Option>) { + let Some(mut rebuildable_instance) = rebuildable_instance else { + return; + }; + let instance = rebuildable_instance.frontend(); + + let output = execute_sql_with( + &instance, + "create database test", + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await; + assert_matches!(output.data, OutputData::AffectedRows(1)); + + let tables = create_tables("test_flush_then_replay", &instance, 10).await; + insert_data(&tables, &instance, 15).await; + ensure_data_exists(&tables, &instance).await; + + // Alters tables to force flushing. + futures::future::join_all(tables.iter().map(|table| { + let instance = instance.clone(); + async move { + assert_matches!( + do_alter(&instance, &table.name).await.data, + OutputData::AffectedRows(0) + ); + } + })) + .await; + + // Inserts more data and check all data exists after flushing. + insert_data(&tables, &instance, 15).await; + ensure_data_exists(&tables, &instance).await; + + // Rebuilds to emulate restart which triggers a replay. + rebuildable_instance.rebuild().await; + ensure_data_exists(&tables, &rebuildable_instance.frontend()).await; +} + +/// Creates a given number of tables. +async fn create_tables(test_name: &str, instance: &Arc, num_tables: usize) -> Vec { + futures::future::join_all((0..num_tables).map(|i| { + let instance = instance.clone(); + async move { + let table_name = format!("{}_{}", test_name, i); + assert_matches!( + do_create(&instance, &table_name).await.data, + OutputData::AffectedRows(0) + ); + Table { + name: table_name, + logical_timer: AtomicU64::new(1685508715000), + inserted: Mutex::new(Vec::new()), + } + } + })) + .await +} + +/// Inserts data to the tables in parallel. +/// The reason why the insertion is parallel is that we want to ensure the kafka wal works as expected under parallel write workloads. +async fn insert_data(tables: &[Table], instance: &Arc, num_writers: usize) { + // Each writer randomly chooses a table and inserts a sequence of rows into the table. + futures::future::join_all((0..num_writers).map(|_| async { + let mut rng = rand::thread_rng(); + let table = &tables[rng.gen_range(0..tables.len())]; + for _ in 0..10 { + let ts = table.logical_timer.fetch_add(1000, Ordering::Relaxed); + let row = make_row(ts, &mut rng); + assert_matches!( + do_insert(instance, &table.name, row).await.data, + OutputData::AffectedRows(1) + ); + { + // Inserting into the `inserted` vector and inserting into the database are not atomic + // which requires us to do a sorting upon checking data integrity. + let mut inserted = table.inserted.lock().await; + inserted.push(ts); + } + } + })) + .await; +} + +/// Sends queries to ensure the data exists for each table. +async fn ensure_data_exists(tables: &[Table], instance: &Arc) { + futures::future::join_all(tables.iter().map(|table| async { + let output = do_query(instance, &table.name).await; + let OutputData::Stream(stream) = output.data else { + unreachable!() + }; + let record_batches = common_recordbatch::util::collect(stream).await.unwrap(); + let queried = record_batches + .into_iter() + .flat_map(|rb| { + rb.rows() + .map(|row| row[0].as_timestamp().unwrap().value() as u64) + .collect::>() + }) + .collect::>(); + let inserted = table + .inserted + .lock() + .await + .iter() + .sorted() + .cloned() + .collect::>(); + assert_eq!(queried, inserted); + })) + .await; +} + +/// Sends a create table SQL. 
+async fn do_create(instance: &Arc, table_name: &str) -> Output { + execute_sql_with( + instance, + &format!( + r#"create table greptime.test.{} ( + host STRING, + cpu DOUBLE, + memory DOUBLE, + ts timestamp, + TIME INDEX(ts) + )"#, + table_name + ), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends an alter table SQL. +async fn do_alter(instance: &Arc, table_name: &str) -> Output { + execute_sql_with( + instance, + &format!("alter table {} add column new_col STRING", table_name), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends a insert SQL. +async fn do_insert(instance: &Arc, table_name: &str, row: String) -> Output { + execute_sql_with( + instance, + &format!("insert into test.{table_name}(host, cpu, memory, ts) values {row}"), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends a query SQL. +async fn do_query(instance: &Arc, table_name: &str) -> Output { + execute_sql_with( + instance, + &format!("select ts from test.{table_name} order by ts"), + QueryContext::with(DEFAULT_CATALOG_NAME, "test"), + ) + .await +} + +/// Sends a SQL with the given context which specifies the catalog name and schema name, aka. database name. +/// The query context is required since the tables are created in the `test` schema rather than the default `public` schema. async fn execute_sql_with( instance: &Arc, sql: &str, query_ctx: QueryContextRef, ) -> Output { - try_execute_sql_with(instance, sql, query_ctx) - .await - .unwrap() + instance.do_query(sql, query_ctx).await.remove(0).unwrap() +} + +fn make_row(ts: u64, rng: &mut ThreadRng) -> String { + let host = format!("host{}", rng.gen_range(0..5)); + let cpu: f64 = rng.gen_range(0.0..99.9); + let memory: f64 = rng.gen_range(0.0..999.9); + format!("('{host}', {cpu}, {memory}, {ts})") } diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 9f3bad224b91..43f3981fee0b 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -114,7 +114,7 @@ impl MockInstanceBuilder { } = instance; MockInstanceImpl::Standalone( builder - .build_with(kv_backend, procedure_manager, guard, mix_options) + .build_with(kv_backend, guard, mix_options, procedure_manager, false) .await, ) } @@ -223,11 +223,11 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option>(); let test_name = uuid::Uuid::new_v4().to_string(); let builder = GreptimeDbStandaloneBuilder::new(&test_name) - .with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig { + .with_datanode_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig { broker_endpoints: endpoints.clone(), ..Default::default() })) - .with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig { + .with_metasrv_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig { broker_endpoints: endpoints, topic_name_prefix: test_name.to_string(), num_topics: 3, @@ -253,11 +253,11 @@ pub(crate) async fn distributed_with_kafka_wal() -> Option Option>, + rebuildable_instance: Option>, ) { } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index abf207fd3577..689eeeab8c41 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -252,7 +252,7 @@ pub async fn test_sql_api(store_type: StorageType) { .get("/v1/sql?sql=select cpu, ts from demo limit 1;select cpu, ts from demo2 where ts > 0;") .send() .await; - assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); + assert_eq!(res.status(), 
StatusCode::BAD_REQUEST); let body = serde_json::from_str::(&res.text().await).unwrap(); // TODO(shuiyisong): fix this when return source err msg to client side diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index 89175d9093a3..79f7a3d38a68 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -111,12 +111,12 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec