diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 0ed19fc755e4..a5e39303c810 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -16,6 +16,7 @@ use std::time::Duration; use clap::Parser; use common_telemetry::logging; +use datanode::datanode::builder::DatanodeBuilder; use datanode::datanode::{Datanode, DatanodeOptions}; use meta_client::MetaClientOptions; use servers::Mode; @@ -162,7 +163,8 @@ impl StartCommand { logging::info!("Datanode start command: {:#?}", self); logging::info!("Datanode options: {:#?}", opts); - let datanode = Datanode::new(opts, Default::default()) + let datanode = DatanodeBuilder::new(opts, Default::default()) + .build() .await .context(StartDatanodeSnafu)?; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 8c26ced2df21..117ccccfb35c 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -23,6 +23,7 @@ use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; +use datanode::datanode::builder::DatanodeBuilder; use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig}; use datanode::region_server::RegionServer; use frontend::catalog::FrontendCatalogManager; @@ -306,7 +307,8 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let datanode = Datanode::new(dn_opts.clone(), plugins.clone()) + let datanode = DatanodeBuilder::new(dn_opts.clone(), plugins.clone()) + .build() .await .context(StartDatanodeSnafu)?; let region_server = datanode.region_server(); diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 566e49f05bea..f7c4b3435f8b 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -4,6 +4,9 @@ version.workspace = true edition.workspace = true license.workspace = true +[features] +testing = [] + [dependencies] api = { workspace = true } arrow-flight.workspace = true diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index e2ebbe6c9310..66b80f56b128 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -202,6 +202,11 @@ impl TableMetadataManager { &self.table_route_manager } + #[cfg(feature = "testing")] + pub fn kv_backend(&self) -> &KvBackendRef { + &self.kv_backend + } + pub async fn get_full_table_info( &self, table_id: TableId, diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 07c42d6c04de..135d751c0104 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -14,6 +14,8 @@ //! Datanode configurations +pub mod builder; + use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -33,7 +35,7 @@ use meta_client::MetaClientOptions; use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use object_store::util::normalize_dir; -use query::{QueryEngineFactory, QueryEngineRef}; +use query::QueryEngineFactory; use secrecy::SecretString; use serde::{Deserialize, Serialize}; use servers::heartbeat_options::HeartbeatOptions; @@ -53,7 +55,6 @@ use tokio::fs; use crate::error::{ CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, }; -use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::heartbeat::HeartbeatTask; use crate::region_server::RegionServer; use crate::server::Services; @@ -401,12 +402,14 @@ pub struct Datanode { services: Option, heartbeat_task: Option, region_server: RegionServer, - query_engine: QueryEngineRef, greptimedb_telemetry_task: Arc, } impl Datanode { - pub async fn new(opts: DatanodeOptions, plugins: Arc) -> Result { + async fn new_region_server( + opts: &DatanodeOptions, + plugins: Arc, + ) -> Result { let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. MemoryCatalogManager::with_default_setup(), @@ -425,46 +428,30 @@ impl Datanode { ); let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone()); - let log_store = Self::build_log_store(&opts).await?; - let object_store = store::new_object_store(&opts).await?; - let engines = Self::build_store_engines(&opts, log_store, object_store).await?; + let log_store = Self::build_log_store(opts).await?; + let object_store = store::new_object_store(opts).await?; + let engines = Self::build_store_engines(opts, log_store, object_store).await?; for engine in engines { region_server.register_engine(engine); } - // build optional things with different modes - let services = match opts.mode { - Mode::Distributed => Some(Services::try_new(region_server.clone(), &opts).await?), - Mode::Standalone => None, - }; - let heartbeat_task = match opts.mode { - Mode::Distributed => Some(HeartbeatTask::try_new(&opts, region_server.clone()).await?), - Mode::Standalone => None, - }; - let greptimedb_telemetry_task = get_greptimedb_telemetry_task( - Some(opts.storage.data_home.clone()), - &opts.mode, - opts.enable_telemetry, - ) - .await; - - Ok(Self { - opts, - services, - heartbeat_task, - region_server, - query_engine, - greptimedb_telemetry_task, - }) + Ok(region_server) } pub async fn start(&mut self) -> Result<()> { info!("Starting datanode instance..."); + + self.start_heartbeat().await?; + + let _ = self.greptimedb_telemetry_task.start(); + self.start_services().await + } + + pub async fn start_heartbeat(&self) -> Result<()> { if let Some(task) = &self.heartbeat_task { task.start().await?; } - let _ = self.greptimedb_telemetry_task.start(); - self.start_services().await + Ok(()) } /// Start services of datanode. This method call will block until services are shutdown. @@ -503,10 +490,6 @@ impl Datanode { self.region_server.clone() } - pub fn query_engine(&self) -> QueryEngineRef { - self.query_engine.clone() - } - // internal utils /// Build [RaftEngineLogStore] diff --git a/src/datanode/src/datanode/builder.rs b/src/datanode/src/datanode/builder.rs new file mode 100644 index 000000000000..6f2379d6a2a9 --- /dev/null +++ b/src/datanode/src/datanode/builder.rs @@ -0,0 +1,98 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_base::Plugins; +use meta_client::client::MetaClient; +use servers::Mode; +use snafu::OptionExt; + +use crate::datanode::{Datanode, DatanodeOptions}; +use crate::error::{MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result}; +use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; +use crate::heartbeat::{new_metasrv_client, HeartbeatTask}; +use crate::server::Services; + +pub struct DatanodeBuilder { + opts: DatanodeOptions, + plugins: Arc, + meta_client: Option, +} + +impl DatanodeBuilder { + pub fn new(opts: DatanodeOptions, plugins: Arc) -> Self { + Self { + opts, + plugins, + meta_client: None, + } + } + + pub fn with_meta_client(self, meta_client: MetaClient) -> Self { + Self { + meta_client: Some(meta_client), + ..self + } + } + + pub async fn build(mut self) -> Result { + let region_server = Datanode::new_region_server(&self.opts, self.plugins.clone()).await?; + + let mode = &self.opts.mode; + + let heartbeat_task = match mode { + Mode::Distributed => { + let meta_client = if let Some(meta_client) = self.meta_client.take() { + meta_client + } else { + let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; + + let meta_config = self + .opts + .meta_client_options + .as_ref() + .context(MissingMetasrvOptsSnafu)?; + + new_metasrv_client(node_id, meta_config).await? + }; + + let heartbeat_task = + HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?; + Some(heartbeat_task) + } + Mode::Standalone => None, + }; + + let services = match mode { + Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?), + Mode::Standalone => None, + }; + + let greptimedb_telemetry_task = get_greptimedb_telemetry_task( + Some(self.opts.storage.data_home.clone()), + mode, + self.opts.enable_telemetry, + ) + .await; + + Ok(Datanode { + opts: self.opts, + services, + heartbeat_task, + region_server, + greptimedb_telemetry_task, + }) + } +} diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 92843e7630aa..deac537772b9 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -27,16 +27,14 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; use meta_client::client::{HeartbeatSender, MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use tokio::sync::mpsc; use tokio::time::Instant; use self::handler::RegionHeartbeatResponseHandler; use crate::alive_keeper::RegionAliveKeeper; use crate::datanode::DatanodeOptions; -use crate::error::{ - self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result, -}; +use crate::error::{self, MetaClientInitSnafu, Result}; use crate::region_server::RegionServer; pub(crate) mod handler; @@ -62,15 +60,11 @@ impl Drop for HeartbeatTask { impl HeartbeatTask { /// Create a new heartbeat task instance. - pub async fn try_new(opts: &DatanodeOptions, region_server: RegionServer) -> Result { - let meta_client = new_metasrv_client( - opts.node_id.context(MissingNodeIdSnafu)?, - opts.meta_client_options - .as_ref() - .context(MissingMetasrvOptsSnafu)?, - ) - .await?; - + pub async fn try_new( + opts: &DatanodeOptions, + region_server: RegionServer, + meta_client: MetaClient, + ) -> Result { let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), opts.heartbeat.interval_millis, diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index ae71a6b31a1f..7ff68a9de06b 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -194,7 +194,11 @@ impl StatementExecutor { } } - async fn plan(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result { + pub async fn plan( + &self, + stmt: QueryStatement, + query_ctx: QueryContextRef, + ) -> Result { self.query_engine .planner() .plan(stmt, query_ctx) diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 44fce8b4a275..3307ba55dbfd 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -21,11 +21,11 @@ pub mod region_server; use std::net::SocketAddr; use std::sync::Arc; -#[cfg(feature = "testing")] -use api::v1::greptime_database_server::GreptimeDatabase; use api::v1::greptime_database_server::GreptimeDatabaseServer; use api::v1::health_check_server::{HealthCheck, HealthCheckServer}; use api::v1::prometheus_gateway_server::{PrometheusGateway, PrometheusGatewayServer}; +#[cfg(feature = "testing")] +use api::v1::region::region_server::Region; use api::v1::region::region_server::RegionServer; use api::v1::{HealthCheckRequest, HealthCheckResponse}; #[cfg(feature = "testing")] @@ -123,12 +123,12 @@ impl GrpcServer { #[cfg(feature = "testing")] pub fn create_flight_service(&self) -> FlightServiceServer { - FlightServiceServer::new(FlightCraftWrapper(self.database_handler.clone().unwrap())) + FlightServiceServer::new(FlightCraftWrapper(self.flight_handler.clone().unwrap())) } #[cfg(feature = "testing")] - pub fn create_database_service(&self) -> GreptimeDatabaseServer { - GreptimeDatabaseServer::new(DatabaseService::new(self.database_handler.clone().unwrap())) + pub fn create_region_service(&self) -> RegionServer { + RegionServer::new(self.region_server_handler.clone().unwrap()) } pub fn create_healthcheck_service(&self) -> HealthCheckServer { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index ea69c1af25f7..f3a40d23ff70 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -18,9 +18,11 @@ chrono.workspace = true client = { workspace = true, features = ["testing"] } common-base = { workspace = true } common-catalog = { workspace = true } +common-config = { workspace = true } common-error = { workspace = true } common-grpc = { workspace = true } -common-meta = { workspace = true } +common-meta = { workspace = true, features = ["testing"] } +common-procedure = { workspace = true } common-query = { workspace = true } common-recordbatch = { workspace = true } common-runtime = { workspace = true } @@ -53,6 +55,7 @@ sqlx = { version = "0.6", features = [ "postgres", "chrono", ] } +substrait = { workspace = true } table = { workspace = true } tempfile.workspace = true tokio.workspace = true @@ -61,7 +64,6 @@ tower = "0.4" uuid.workspace = true [dev-dependencies] -common-procedure = { workspace = true } datafusion-expr.workspace = true datafusion.workspace = true itertools.workspace = true diff --git a/tests-integration/src/catalog.rs b/tests-integration/src/catalog.rs deleted file mode 100644 index adf2b8a11a11..000000000000 --- a/tests-integration/src/catalog.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[cfg(test)] -mod tests { - use catalog::RegisterSystemTableRequest; - use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; - use script::table::{build_scripts_schema, SCRIPTS_TABLE_NAME}; - use table::requests::{CreateTableRequest, TableOptions}; - - #[tokio::test(flavor = "multi_thread")] - async fn test_register_system_table() { - let instance = - crate::tests::create_distributed_instance("test_register_system_table").await; - - let catalog_name = DEFAULT_CATALOG_NAME; - let schema_name = DEFAULT_SCHEMA_NAME; - let table_name = SCRIPTS_TABLE_NAME; - let request = CreateTableRequest { - id: 1, - catalog_name: catalog_name.to_string(), - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - desc: Some("Scripts table".to_string()), - schema: build_scripts_schema(), - region_numbers: vec![0], - primary_key_indices: vec![0, 1], - create_if_not_exists: true, - table_options: TableOptions::default(), - engine: MITO_ENGINE.to_string(), - }; - - instance - .frontend() - .catalog_manager() - .register_system_table(RegisterSystemTableRequest { - create_table_request: request, - open_hook: None, - }) - .await - .unwrap(); - - assert!( - instance - .frontend() - .catalog_manager() - .table(catalog_name, schema_name, table_name) - .await - .unwrap() - .is_some(), - "the registered system table cannot be found in catalog" - ); - - let mut actually_created_table_in_datanode = 0; - for datanode in instance.datanodes().values() { - if datanode - .catalog_manager() - .table(catalog_name, schema_name, table_name) - .await - .unwrap() - .is_some() - { - actually_created_table_in_datanode += 1; - } - } - assert_eq!( - actually_created_table_in_datanode, 1, - "system table should be actually created at one and only one datanode" - ) - } -} diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 8659054e8aa0..d7eec56ed6c7 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -21,26 +21,21 @@ use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::DatanodeId; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig}; -use datanode::heartbeat::HeartbeatTask; -use datanode::instance::Instance as DatanodeInstance; +use datanode::datanode::builder::DatanodeBuilder; +use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, ProcedureConfig}; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; -use meta_srv::metadata_service::{DefaultMetadataService, MetadataService}; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions}; use meta_srv::mocks::MockInfo; -use meta_srv::service::store::kv::{KvBackendAdapter, KvStoreRef}; +use meta_srv::service::store::kv::KvStoreRef; use meta_srv::service::store::memory::MemStore; -use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::GrpcServer; -use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::Mode; use tonic::transport::Server; use tower::service_fn; @@ -53,8 +48,7 @@ pub struct GreptimeDbCluster { pub storage_guards: Vec, pub _dir_guards: Vec, - pub datanode_instances: HashMap>, - pub datanode_heartbeat_tasks: HashMap>, + pub datanode_instances: HashMap, pub kv_store: KvStoreRef, pub meta_srv: MetaSrv, pub frontend: Arc, @@ -95,7 +89,7 @@ impl GreptimeDbClusterBuilder { let meta_srv = self.build_metasrv(datanode_clients.clone()).await; - let (datanode_instances, heartbeat_tasks, storage_guards, dir_guards) = + let (datanode_instances, storage_guards, dir_guards) = self.build_datanodes(meta_srv.clone(), datanodes).await; build_datanode_clients(datanode_clients.clone(), &datanode_instances, datanodes).await; @@ -113,7 +107,6 @@ impl GreptimeDbClusterBuilder { storage_guards, _dir_guards: dir_guards, datanode_instances, - datanode_heartbeat_tasks: heartbeat_tasks, kv_store: self.kv_store.clone(), meta_srv: meta_srv.meta_srv, frontend, @@ -131,18 +124,7 @@ impl GreptimeDbClusterBuilder { ..Default::default() }; - let mock = - meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await; - let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap( - mock.meta_srv.kv_store().clone(), - ))); - let metadata_service = DefaultMetadataService::new(table_metadata_manager); - metadata_service - .create_schema("another_catalog", "another_schema", true) - .await - .unwrap(); - - mock + meta_srv::mocks::mock(opt, self.kv_store.clone(), None, Some(datanode_clients)).await } async fn build_datanodes( @@ -150,13 +132,11 @@ impl GreptimeDbClusterBuilder { meta_srv: MockInfo, datanodes: u32, ) -> ( - HashMap>, - HashMap>, + HashMap, Vec, Vec, ) { let mut instances = HashMap::with_capacity(datanodes as usize); - let mut heartbeat_tasks = HashMap::with_capacity(datanodes as usize); let mut storage_guards = Vec::with_capacity(datanodes as usize); let mut dir_guards = Vec::with_capacity(datanodes as usize); @@ -167,12 +147,9 @@ impl GreptimeDbClusterBuilder { let home_tmp_dir = create_temp_dir(&format!("gt_home_{}", &self.cluster_name)); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); - let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{}", &self.cluster_name)); - let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string(); - dir_guards.push(FileDirGuard::new(home_tmp_dir, false)); - dir_guards.push(FileDirGuard::new(wal_tmp_dir, true)); + dir_guards.push(FileDirGuard::new(home_tmp_dir)); - create_datanode_opts(store_config.clone(), home_dir, wal_dir) + create_datanode_opts(store_config.clone(), home_dir) } else { let (opts, guard) = create_tmp_dir_and_datanode_opts( StorageType::File, @@ -181,19 +158,17 @@ impl GreptimeDbClusterBuilder { storage_guards.push(guard.storage_guard); dir_guards.push(guard.home_guard); - dir_guards.push(guard.wal_guard); opts }; opts.node_id = Some(datanode_id); opts.mode = Mode::Distributed; - let dn_instance = self.create_datanode(&opts, meta_srv.clone()).await; + let datanode = self.create_datanode(opts, meta_srv.clone()).await; - let _ = instances.insert(datanode_id, dn_instance.0.clone()); - let _ = heartbeat_tasks.insert(datanode_id, dn_instance.1); + instances.insert(datanode_id, datanode); } - (instances, heartbeat_tasks, storage_guards, dir_guards) + (instances, storage_guards, dir_guards) } async fn wait_datanodes_alive( @@ -215,19 +190,24 @@ impl GreptimeDbClusterBuilder { panic!("Some Datanodes are not alive in 10 seconds!") } - async fn create_datanode( - &self, - opts: &DatanodeOptions, - meta_srv: MockInfo, - ) -> (Arc, Option) { - let (instance, heartbeat) = DatanodeInstance::with_mock_meta_server(opts, meta_srv) + async fn create_datanode(&self, opts: DatanodeOptions, meta_srv: MockInfo) -> Datanode { + let mut meta_client = MetaClientBuilder::new(1000, opts.node_id.unwrap(), Role::Datanode) + .enable_router() + .enable_store() + .enable_heartbeat() + .channel_manager(meta_srv.channel_manager) + .build(); + meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); + + let datanode = DatanodeBuilder::new(opts, Arc::new(Plugins::default())) + .with_meta_client(meta_client) + .build() .await .unwrap(); - instance.start().await.unwrap(); - if let Some(heartbeat) = heartbeat.as_ref() { - heartbeat.start().await.unwrap(); - } - (instance, heartbeat) + + datanode.start_heartbeat().await.unwrap(); + + datanode } async fn build_frontend( @@ -262,12 +242,12 @@ impl GreptimeDbClusterBuilder { async fn build_datanode_clients( clients: Arc, - instances: &HashMap>, + instances: &HashMap, datanodes: u32, ) { for i in 0..datanodes { let datanode_id = i as u64 + 1; - let instance = instances.get(&datanode_id).cloned().unwrap(); + let instance = instances.get(&datanode_id).unwrap(); let (addr, client) = create_datanode_client(instance).await; clients .insert_client(Peer::new(datanode_id, addr), client) @@ -275,7 +255,7 @@ async fn build_datanode_clients( } } -async fn create_datanode_client(datanode_instance: Arc) -> (String, Client) { +async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { let (client, server) = tokio::io::duplex(1024); let runtime = Arc::new( @@ -286,25 +266,20 @@ async fn create_datanode_client(datanode_instance: Arc) -> (St .unwrap(), ); - // create a mock datanode grpc service, see example here: - // https://github.com/hyperium/tonic/blob/master/examples/src/mock/mock.rs - let query_handler = Arc::new(GreptimeRequestHandler::new( - ServerGrpcQueryHandlerAdaptor::arc(datanode_instance.clone()), - None, - runtime.clone(), - )); + let flight_handler = Some(Arc::new(datanode.region_server()) as _); + let region_server_handler = Some(Arc::new(datanode.region_server()) as _); let grpc_server = GrpcServer::new( - Some(ServerGrpcQueryHandlerAdaptor::arc(datanode_instance)), None, - Some(query_handler), None, + flight_handler, + region_server_handler, None, runtime, ); let _handle = tokio::spawn(async move { Server::builder() .add_service(grpc_server.create_flight_service()) - .add_service(grpc_server.create_database_service()) + .add_service(grpc_server.create_region_service()) .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await }); diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 672e40261a92..85940f3a366c 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -20,6 +20,7 @@ mod test { use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::query_request::Query; + use api::v1::region::QueryRequest as RegionQueryRequest; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DeleteRequests, @@ -31,9 +32,13 @@ mod test { use common_recordbatch::RecordBatches; use frontend::instance::Instance; use query::parser::QueryLanguageParser; + use query::plan::LogicalPlan; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; + use store_api::storage::RegionId; + use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; + use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; use crate::tests::MockDistributedInstance; @@ -49,8 +54,9 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_standalone_handle_ddl_request() { - let standalone = - tests::create_standalone_instance("test_standalone_handle_ddl_request").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_handle_ddl_request") + .build() + .await; let instance = &standalone.instance; test_handle_ddl_request(instance.as_ref()).await; @@ -81,15 +87,17 @@ mod test { column_defs: vec![ ColumnDef { name: "a".to_string(), - datatype: ColumnDataType::String as _, + data_type: ColumnDataType::String as _, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as _, + data_type: ColumnDataType::TimestampMillisecond as _, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ], time_index: "ts".to_string(), @@ -109,15 +117,14 @@ mod test { add_columns: vec![AddColumn { column_def: Some(ColumnDef { name: "b".to_string(), - datatype: ColumnDataType::Int32 as _, + data_type: ColumnDataType::Int32 as _, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }), - is_key: false, location: None, }], })), - ..Default::default() })), }); let output = query(instance, request).await; @@ -161,18 +168,17 @@ mod test { } async fn verify_table_is_dropped(instance: &MockDistributedInstance) { - for (_, dn) in instance.datanodes().iter() { - assert!(dn - .catalog_manager() - .table( - "greptime", - "database_created_through_grpc", - "table_created_through_grpc" - ) - .await - .unwrap() - .is_none()); - } + assert!(instance + .frontend() + .catalog_manager() + .table( + "greptime", + "database_created_through_grpc", + "table_created_through_grpc" + ) + .await + .unwrap() + .is_none()); } #[tokio::test(flavor = "multi_thread")] @@ -277,10 +283,9 @@ CREATE TABLE {table_name} ( #[tokio::test(flavor = "multi_thread")] async fn test_standalone_insert_and_query() { - common_telemetry::init_default_ut_logging(); - - let standalone = - tests::create_standalone_instance("test_standalone_insert_and_query").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_insert_and_query") + .build() + .await; let instance = &standalone.instance; let table_name = "my_table"; @@ -518,22 +523,31 @@ CREATE TABLE {table_name} ( .collect::>(); assert_eq!(region_to_dn_map.len(), expected_distribution.len()); - for (region, dn) in region_to_dn_map.iter() { - let stmt = QueryLanguageParser::parse_sql(&format!( - "SELECT ts, a, b FROM {table_name} ORDER BY ts" - )) + let stmt = QueryLanguageParser::parse_sql(&format!( + "SELECT ts, a, b FROM {table_name} ORDER BY ts" + )) + .unwrap(); + let LogicalPlan::DfPlan(plan) = instance + .frontend() + .statement_executor() + .plan(stmt, QueryContext::arc()) + .await .unwrap(); - let dn = instance.datanodes().get(dn).unwrap(); - let engine = dn.query_engine(); - let plan = engine - .planner() - .plan(stmt, QueryContext::arc()) + let plan = DFLogicalSubstraitConvertor.encode(&plan).unwrap(); + + for (region, dn) in region_to_dn_map.iter() { + let region_server = instance.datanodes().get(dn).unwrap().region_server(); + + let region_id = RegionId::new(table_id, *region); + + let stream = region_server + .handle_read(RegionQueryRequest { + region_id: region_id.as_u64(), + plan: plan.to_vec(), + }) .await .unwrap(); - let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); - let Output::Stream(stream) = output else { - unreachable!() - }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let actual = recordbatches.pretty_print().unwrap(); @@ -680,9 +694,9 @@ CREATE TABLE {table_name} ( #[tokio::test(flavor = "multi_thread")] async fn test_promql_query() { - common_telemetry::init_default_ut_logging(); - - let standalone = tests::create_standalone_instance("test_standalone_promql_query").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_promql_query") + .build() + .await; let instance = &standalone.instance; let table_name = "my_table"; diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index aa3f319e663b..8848c0a67904 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -24,12 +24,14 @@ mod test { use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContext; + use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; #[tokio::test(flavor = "multi_thread")] async fn test_standalone_put_influxdb_lines() { - let standalone = - tests::create_standalone_instance("test_standalone_put_influxdb_lines").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_put_influxdb_lines") + .build() + .await; let instance = &standalone.instance; test_put_influxdb_lines(instance).await; @@ -44,8 +46,11 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_standalone_put_influxdb_lines_without_time_column() { - let standalone = - tests::create_standalone_instance("test_standalone_put_influxdb_lines").await; + let standalone = GreptimeDbStandaloneBuilder::new( + "test_standalone_put_influxdb_lines_without_time_column", + ) + .build() + .await; test_put_influxdb_lines_without_time_column(&standalone.instance).await; } diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index d97f4e78dfdd..4740e03aa02f 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -19,6 +19,7 @@ mod tests { use std::sync::atomic::AtomicU32; use std::sync::Arc; + use api::v1::region::QueryRequest; use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::key::table_name::TableNameKey; @@ -29,17 +30,23 @@ mod tests { use frontend::error::{self, Error, Result}; use frontend::instance::Instance; use query::parser::QueryLanguageParser; + use query::plan::LogicalPlan; use servers::interceptor::{SqlQueryInterceptor, SqlQueryInterceptorRef}; use servers::query_handler::sql::SqlQueryHandler; use session::context::{QueryContext, QueryContextRef}; use sql::statements::statement::Statement; + use store_api::storage::RegionId; + use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; + use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; use crate::tests::MockDistributedInstance; #[tokio::test(flavor = "multi_thread")] async fn test_standalone_exec_sql() { - let standalone = tests::create_standalone_instance("test_standalone_exec_sql").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec_sql") + .build() + .await; let instance = standalone.instance.as_ref(); let sql = r#" @@ -216,18 +223,27 @@ mod tests { assert_eq!(region_to_dn_map.len(), expected_distribution.len()); let stmt = QueryLanguageParser::parse_sql("SELECT ts, host FROM demo ORDER BY ts").unwrap(); + let LogicalPlan::DfPlan(plan) = instance + .frontend() + .statement_executor() + .plan(stmt, QueryContext::arc()) + .await + .unwrap(); + let plan = DFLogicalSubstraitConvertor.encode(&plan).unwrap(); + for (region, dn) in region_to_dn_map.iter() { - let dn = instance.datanodes().get(dn).unwrap(); - let engine = dn.query_engine(); - let plan = engine - .planner() - .plan(stmt.clone(), QueryContext::arc()) + let region_server = instance.datanodes().get(dn).unwrap().region_server(); + + let region_id = RegionId::new(table_id, *region); + + let stream = region_server + .handle_read(QueryRequest { + region_id: region_id.as_u64(), + plan: plan.to_vec(), + }) .await .unwrap(); - let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); - let Output::Stream(stream) = output else { - unreachable!() - }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let actual = recordbatches.pretty_print().unwrap(); @@ -246,14 +262,13 @@ mod tests { } async fn verify_table_is_dropped(instance: &MockDistributedInstance) { - for (_, dn) in instance.datanodes().iter() { - assert!(dn - .catalog_manager() - .table("greptime", "public", "demo") - .await - .unwrap() - .is_none()) - } + assert!(instance + .frontend() + .catalog_manager() + .table("greptime", "public", "demo") + .await + .unwrap() + .is_none()) } #[tokio::test(flavor = "multi_thread")] @@ -314,13 +329,15 @@ mod tests { } } - let standalone = tests::create_standalone_instance("test_hook").await; - let mut instance = standalone.instance; - let plugins = Plugins::new(); let counter_hook = Arc::new(AssertionHook::default()); plugins.insert::>(counter_hook.clone()); - Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); + + let standalone = GreptimeDbStandaloneBuilder::new("test_sql_interceptor_plugin") + .with_plugin(plugins) + .build() + .await; + let instance = standalone.instance; let sql = r#"CREATE TABLE demo( host STRING, @@ -374,13 +391,15 @@ mod tests { let query_ctx = QueryContext::arc(); - let standalone = tests::create_standalone_instance("test_db_hook").await; - let mut instance = standalone.instance; - let plugins = Plugins::new(); let hook = Arc::new(DisableDBOpHook); plugins.insert::>(hook.clone()); - Arc::make_mut(&mut instance).set_plugins(Arc::new(plugins)); + + let standalone = GreptimeDbStandaloneBuilder::new("test_disable_db_operation_plugin") + .with_plugin(plugins) + .build() + .await; + let instance = standalone.instance; let sql = r#"CREATE TABLE demo( host STRING, diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index 46981cf43931..b0b28ba4651c 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod catalog; pub mod cluster; mod grpc; mod influxdb; @@ -22,7 +21,7 @@ mod otlp; mod prom_store; pub mod test_util; -// TODO(LFC): Refactor: move instance structs out of mod "tests", like the `GreptimeDbCluster`. +mod standalone; #[cfg(test)] mod tests; diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs index 59c473d7e3ff..a66786e4fb0e 100644 --- a/tests-integration/src/opentsdb.rs +++ b/tests-integration/src/opentsdb.rs @@ -25,11 +25,14 @@ mod tests { use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContext; + use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; #[tokio::test(flavor = "multi_thread")] async fn test_standalone_exec() { - let standalone = tests::create_standalone_instance("test_standalone_exec").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_exec") + .build() + .await; let instance = &standalone.instance; test_exec(instance).await; diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs index 6cbce7d76721..437d8256b49f 100644 --- a/tests-integration/src/otlp.rs +++ b/tests-integration/src/otlp.rs @@ -30,11 +30,14 @@ mod test { use servers::query_handler::OpenTelemetryProtocolHandler; use session::context::QueryContext; + use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; #[tokio::test(flavor = "multi_thread")] pub async fn test_otlp_on_standalone() { - let standalone = tests::create_standalone_instance("test_standalone_otlp").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_otlp") + .build() + .await; let instance = &standalone.instance; test_otlp(instance).await; diff --git a/tests-integration/src/prom_store.rs b/tests-integration/src/prom_store.rs index 048501c6f38e..c3186c9af199 100644 --- a/tests-integration/src/prom_store.rs +++ b/tests-integration/src/prom_store.rs @@ -28,12 +28,14 @@ mod tests { use servers::query_handler::PromStoreProtocolHandler; use session::context::QueryContext; + use crate::standalone::GreptimeDbStandaloneBuilder; use crate::tests; #[tokio::test(flavor = "multi_thread")] async fn test_standalone_prom_store_remote_rw() { - let standalone = - tests::create_standalone_instance("test_standalone_prom_store_remote_rw").await; + let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_prom_store_remote_rw") + .build() + .await; let instance = &standalone.instance; test_prom_store_remote_rw(instance).await; diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs new file mode 100644 index 000000000000..50269ada5253 --- /dev/null +++ b/tests-integration/src/standalone.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use catalog::remote::DummyKvCacheInvalidator; +use common_base::Plugins; +use common_config::KvStoreConfig; +use common_procedure::options::ProcedureConfig; +use datanode::datanode::builder::DatanodeBuilder; +use datanode::datanode::DatanodeOptions; +use frontend::catalog::FrontendCatalogManager; +use frontend::instance::{Instance, StandaloneDatanodeManager}; + +use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; + +pub struct GreptimeDbStandalone { + pub instance: Arc, + pub datanode_opts: DatanodeOptions, + pub guard: TestGuard, +} + +pub struct GreptimeDbStandaloneBuilder { + instance_name: String, + store_type: Option, + plugin: Option, +} + +impl GreptimeDbStandaloneBuilder { + pub fn new(instance_name: &str) -> Self { + Self { + instance_name: instance_name.to_string(), + store_type: None, + plugin: None, + } + } + + pub fn with_store_type(self, store_type: StorageType) -> Self { + Self { + store_type: Some(store_type), + ..self + } + } + + #[cfg(test)] + pub fn with_plugin(self, plugin: Plugins) -> Self { + Self { + plugin: Some(plugin), + ..self + } + } + + pub async fn build(self) -> GreptimeDbStandalone { + let store_type = self.store_type.unwrap_or(StorageType::File); + + let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, &self.instance_name); + + let (kv_store, procedure_manager) = Instance::try_build_standalone_components( + format!("{}/kv", &opts.storage.data_home), + KvStoreConfig::default(), + ProcedureConfig::default(), + ) + .await + .unwrap(); + + let plugins = Arc::new(self.plugin.unwrap_or_default()); + + let datanode = DatanodeBuilder::new(opts.clone(), plugins.clone()) + .build() + .await + .unwrap(); + + let catalog_manager = FrontendCatalogManager::new( + kv_store.clone(), + Arc::new(DummyKvCacheInvalidator), + Arc::new(StandaloneDatanodeManager(datanode.region_server())), + ); + + catalog_manager + .table_metadata_manager_ref() + .init() + .await + .unwrap(); + + let instance = Instance::try_new_standalone( + kv_store, + procedure_manager, + catalog_manager, + plugins, + datanode.region_server(), + ) + .await + .unwrap(); + + GreptimeDbStandalone { + instance: Arc::new(instance), + datanode_opts: opts, + guard, + } + } +} diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 9727980ff7db..059ba92cebe9 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -21,29 +21,21 @@ use std::time::Duration; use auth::UserProviderRef; use axum::Router; -use catalog::{CatalogManagerRef, RegisterTableRequest}; -use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, -}; +use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use common_recordbatch::util; use common_runtime::Builder as RuntimeBuilder; use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ - AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, - ProcedureConfig, S3Config, StorageConfig, WalConfig, + AzblobConfig, DatanodeOptions, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config, + StorageConfig, }; -use datanode::error::{CreateTableSnafu, Result}; -use datanode::instance::Instance; -use datanode::sql::SqlHandler; -use datatypes::data_type::ConcreteDataType; use datatypes::scalars::ScalarVectorBuilder; -use datatypes::schema::{ColumnSchema, RawSchema}; use datatypes::vectors::{ Float64VectorBuilder, MutableVector, StringVectorBuilder, TimestampMillisecondVectorBuilder, }; -use frontend::instance::Instance as FeInstance; +use frontend::instance::Instance; use frontend::service_config::{MysqlOptions, PostgresOptions}; use object_store::services::{Azblob, Gcs, Oss, S3}; use object_store::test_util::TempFolder; @@ -56,12 +48,13 @@ use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::postgres::PostgresServer; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; -use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor; +use servers::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler}; use servers::server::Server; use servers::Mode; -use snafu::ResultExt; -use table::engine::{EngineContext, TableEngineRef}; -use table::requests::{CreateTableRequest, InsertRequest, TableOptions}; +use session::context::QueryContext; +use table::requests::InsertRequest; + +use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub enum StorageType { @@ -254,18 +247,16 @@ pub enum TempDirGuard { pub struct TestGuard { pub home_guard: FileDirGuard, - pub wal_guard: FileDirGuard, pub storage_guard: StorageGuard, } pub struct FileDirGuard { pub temp_dir: TempDir, - pub is_wal: bool, } impl FileDirGuard { - pub fn new(temp_dir: TempDir, is_wal: bool) -> Self { - Self { temp_dir, is_wal } + pub fn new(temp_dir: TempDir) -> Self { + Self { temp_dir } } } @@ -288,124 +279,77 @@ pub fn create_tmp_dir_and_datanode_opts( name: &str, ) -> (DatanodeOptions, TestGuard) { let home_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); - let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); let home_dir = home_tmp_dir.path().to_str().unwrap().to_string(); - let wal_dir = wal_tmp_dir.path().to_str().unwrap().to_string(); let (store, data_tmp_dir) = get_test_store_config(&store_type); - let opts = create_datanode_opts(store, home_dir, wal_dir); + let opts = create_datanode_opts(store, home_dir); ( opts, TestGuard { - home_guard: FileDirGuard::new(home_tmp_dir, false), - wal_guard: FileDirGuard::new(wal_tmp_dir, true), + home_guard: FileDirGuard::new(home_tmp_dir), storage_guard: StorageGuard(data_tmp_dir), }, ) } -pub fn create_datanode_opts( - store: ObjectStoreConfig, - home_dir: String, - wal_dir: String, -) -> DatanodeOptions { +pub(crate) fn create_datanode_opts(store: ObjectStoreConfig, home_dir: String) -> DatanodeOptions { DatanodeOptions { - wal: WalConfig { - dir: Some(wal_dir), - ..Default::default() - }, storage: StorageConfig { data_home: home_dir, store, ..Default::default() }, mode: Mode::Standalone, - procedure: ProcedureConfig::default(), ..Default::default() } } -pub async fn create_test_table( - catalog_manager: &CatalogManagerRef, - sql_handler: &SqlHandler, - ts_type: ConcreteDataType, - table_name: &str, -) -> Result<()> { - let column_schemas = vec![ - ColumnSchema::new("host", ConcreteDataType::string_datatype(), false), - ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("memory", ConcreteDataType::float64_datatype(), true), - ColumnSchema::new("ts", ts_type, true).with_time_index(true), - ]; - let table_engine: TableEngineRef = sql_handler - .table_engine_manager() - .engine(MITO_ENGINE) - .unwrap(); - let table = table_engine - .create_table( - &EngineContext::default(), - CreateTableRequest { - id: MIN_USER_TABLE_ID, - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: table_name.to_string(), - desc: Some(" a test table".to_string()), - schema: RawSchema::new(column_schemas), - create_if_not_exists: true, - primary_key_indices: vec![0], // "host" is in primary keys - table_options: TableOptions::default(), - region_numbers: vec![0], - engine: MITO_ENGINE.to_string(), - }, - ) - .await - .context(CreateTableSnafu { table_name })?; - - let req = RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name: table_name.to_string(), - table_id: table.table_info().ident.table_id, - table, - }; - let _ = catalog_manager.register_table(req).await.unwrap(); - Ok(()) +pub(crate) async fn create_test_table(instance: &Instance, table_name: &str) { + let sql = format!( + r#" +CREATE TABLE IF NOT EXISTS {table_name} ( + host String NOT NULL PRIMARY KEY, + cpu DOUBLE NULL, + memory DOUBLE NULL, + ts TIMESTAMP NULL TIME INDEX +) +"# + ); + + let _ = instance.do_query(&sql, QueryContext::arc()).await; +} + +async fn setup_standalone_instance( + test_name: &str, + store_type: StorageType, +) -> GreptimeDbStandalone { + let instance = GreptimeDbStandaloneBuilder::new(test_name) + .with_store_type(store_type) + .build() + .await; + + create_test_table(instance.instance.as_ref(), "demo").await; + + instance } pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router, TestGuard) { - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); - instance.start().await.unwrap(); - - create_test_table( - instance.catalog_manager(), - instance.sql_handler(), - ConcreteDataType::timestamp_millisecond_datatype(), - "demo", - ) - .await - .unwrap(); - let frontend_instance = FeInstance::try_new_standalone(instance.clone()) - .await - .unwrap(); - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - } + let instance = setup_standalone_instance(name, store_type).await; let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), ..Default::default() }; let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new( - frontend_instance, - ))) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())) + .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc( + instance.instance.clone(), + )) .with_metrics_handler(MetricsHandler) - .with_greptime_config_options(opts.to_toml_string()) + .with_greptime_config_options(instance.datanode_opts.to_toml_string()) .build(); - (http_server.build(http_server.make_app()), guard) + (http_server.build(http_server.make_app()), instance.guard) } pub async fn setup_test_http_app_with_frontend( @@ -420,37 +364,22 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( name: &str, user_provider: Option, ) -> (Router, TestGuard) { - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); - let frontend = FeInstance::try_new_standalone(instance.clone()) - .await - .unwrap(); - instance.start().await.unwrap(); - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - } - create_test_table( - frontend.catalog_manager(), - instance.sql_handler(), - ConcreteDataType::timestamp_millisecond_datatype(), - "demo", - ) - .await - .unwrap(); + let instance = setup_standalone_instance(name, store_type).await; let http_opts = HttpOptions { addr: format!("127.0.0.1:{}", ports::get_port()), ..Default::default() }; - let frontend_ref = Arc::new(frontend); let mut http_server = HttpServerBuilder::new(http_opts); http_server - .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) - .with_script_handler(frontend_ref) - .with_greptime_config_options(opts.to_toml_string()); + .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone())) + .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc( + instance.instance.clone(), + )) + .with_script_handler(instance.instance.clone()) + .with_greptime_config_options(instance.datanode_opts.to_toml_string()); if let Some(user_provider) = user_provider { http_server.with_user_provider(user_provider); @@ -459,7 +388,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( let http_server = http_server.build(); let app = http_server.build(http_server.make_app()); - (app, guard) + (app, instance.guard) } fn mock_insert_request(host: &str, cpu: f64, memory: f64, ts: i64) -> InsertRequest { @@ -500,25 +429,11 @@ pub async fn setup_test_prom_app_with_frontend( name: &str, ) -> (Router, TestGuard) { std::env::set_var("TZ", "UTC"); - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); - let frontend = FeInstance::try_new_standalone(instance.clone()) - .await - .unwrap(); - instance.start().await.unwrap(); - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - } - create_test_table( - frontend.catalog_manager(), - instance.sql_handler(), - ConcreteDataType::timestamp_millisecond_datatype(), - "demo", - ) - .await - .unwrap(); - let demo = frontend + let instance = setup_standalone_instance(name, store_type).await; + + let demo = instance + .instance .catalog_manager() .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "demo") .await @@ -538,17 +453,17 @@ pub async fn setup_test_prom_app_with_frontend( addr: format!("127.0.0.1:{}", ports::get_port()), ..Default::default() }; - let frontend_ref = Arc::new(frontend); + let frontend_ref = instance.instance.clone(); let http_server = HttpServerBuilder::new(http_opts) .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_script_handler(frontend_ref.clone()) .with_prom_handler(frontend_ref.clone()) .with_prometheus_handler(frontend_ref) - .with_greptime_config_options(opts.to_toml_string()) + .with_greptime_config_options(instance.datanode_opts.to_toml_string()) .build(); let app = http_server.build(http_server.make_app()); - (app, guard) + (app, instance.guard) } pub async fn setup_grpc_server( @@ -563,10 +478,7 @@ pub async fn setup_grpc_server_with_user_provider( name: &str, user_provider: Option, ) -> (String, TestGuard, Arc) { - common_telemetry::init_default_ut_logging(); - - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); + let instance = setup_standalone_instance(name, store_type).await; let runtime = Arc::new( RuntimeBuilder::default() @@ -576,14 +488,7 @@ pub async fn setup_grpc_server_with_user_provider( .unwrap(), ); - let fe_instance = FeInstance::try_new_standalone(instance.clone()) - .await - .unwrap(); - instance.start().await.unwrap(); - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - } - let fe_instance_ref = Arc::new(fe_instance); + let fe_instance_ref = instance.instance.clone(); let flight_handler = Arc::new(GreptimeRequestHandler::new( ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()), user_provider.clone(), @@ -608,7 +513,7 @@ pub async fn setup_grpc_server_with_user_provider( // wait for GRPC server to start tokio::time::sleep(Duration::from_secs(1)).await; - (fe_grpc_addr, guard, fe_grpc_server) + (fe_grpc_addr, instance.guard, fe_grpc_server) } pub async fn check_output_stream(output: Output, expected: &str) { @@ -633,10 +538,7 @@ pub async fn setup_mysql_server_with_user_provider( name: &str, user_provider: Option, ) -> (String, TestGuard, Arc>) { - common_telemetry::init_default_ut_logging(); - - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); + let instance = setup_standalone_instance(name, store_type).await; let runtime = Arc::new( RuntimeBuilder::default() @@ -648,14 +550,7 @@ pub async fn setup_mysql_server_with_user_provider( let fe_mysql_addr = format!("127.0.0.1:{}", ports::get_port()); - let fe_instance = FeInstance::try_new_standalone(instance.clone()) - .await - .unwrap(); - instance.start().await.unwrap(); - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - } - let fe_instance_ref = Arc::new(fe_instance); + let fe_instance_ref = instance.instance.clone(); let opts = MysqlOptions { addr: fe_mysql_addr.clone(), ..Default::default() @@ -682,7 +577,7 @@ pub async fn setup_mysql_server_with_user_provider( tokio::time::sleep(Duration::from_secs(1)).await; - (fe_mysql_addr, guard, fe_mysql_server) + (fe_mysql_addr, instance.guard, fe_mysql_server) } pub async fn setup_pg_server( @@ -697,10 +592,7 @@ pub async fn setup_pg_server_with_user_provider( name: &str, user_provider: Option, ) -> (String, TestGuard, Arc>) { - common_telemetry::init_default_ut_logging(); - - let (opts, guard) = create_tmp_dir_and_datanode_opts(store_type, name); - let (instance, heartbeat) = Instance::with_mock_meta_client(&opts).await.unwrap(); + let instance = setup_standalone_instance(name, store_type).await; let runtime = Arc::new( RuntimeBuilder::default() @@ -712,14 +604,7 @@ pub async fn setup_pg_server_with_user_provider( let fe_pg_addr = format!("127.0.0.1:{}", ports::get_port()); - let fe_instance = FeInstance::try_new_standalone(instance.clone()) - .await - .unwrap(); - instance.start().await.unwrap(); - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - } - let fe_instance_ref = Arc::new(fe_instance); + let fe_instance_ref = instance.instance.clone(); let opts = PostgresOptions { addr: fe_pg_addr.clone(), ..Default::default() @@ -740,5 +625,5 @@ pub async fn setup_pg_server_with_user_provider( tokio::time::sleep(Duration::from_secs(1)).await; - (fe_pg_addr, guard, fe_pg_server) + (fe_pg_addr, instance.guard, fe_pg_server) } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 1a6bc1e4135b..2781d04b2f86 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -19,13 +19,11 @@ mod test_util; use std::collections::HashMap; use std::sync::Arc; -use catalog::RegisterSchemaRequest; use common_meta::key::TableMetadataManagerRef; -use datanode::instance::Instance as DatanodeInstance; +use datanode::datanode::Datanode; use frontend::instance::Instance; use crate::cluster::{GreptimeDbCluster, GreptimeDbClusterBuilder}; -use crate::test_util::{create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; pub struct MockDistributedInstance(GreptimeDbCluster); @@ -34,7 +32,7 @@ impl MockDistributedInstance { self.0.frontend.clone() } - pub fn datanodes(&self) -> &HashMap> { + pub fn datanodes(&self) -> &HashMap { &self.0.datanode_instances } @@ -43,48 +41,6 @@ impl MockDistributedInstance { } } -pub struct MockStandaloneInstance { - pub instance: Arc, - _guard: TestGuard, -} - -pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { - let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name); - let (dn_instance, heartbeat) = DatanodeInstance::with_opts(&opts, Default::default()) - .await - .unwrap(); - - let frontend_instance = Instance::try_new_standalone(dn_instance.clone()) - .await - .unwrap(); - - dn_instance.start().await.unwrap(); - - assert!(dn_instance - .catalog_manager() - .clone() - .register_catalog("another_catalog".to_string()) - .await - .is_ok()); - let req = RegisterSchemaRequest { - catalog: "another_catalog".to_string(), - schema: "another_schema".to_string(), - }; - assert!(dn_instance - .catalog_manager() - .register_schema(req) - .await - .is_ok()); - - if let Some(heartbeat) = heartbeat { - heartbeat.start().await.unwrap(); - }; - MockStandaloneInstance { - instance: Arc::new(frontend_instance), - _guard: guard, - } -} - pub async fn create_distributed_instance(test_name: &str) -> MockDistributedInstance { let cluster = GreptimeDbClusterBuilder::new(test_name).build().await; MockDistributedInstance(cluster) diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index 2d02f4a47a98..40736f25385e 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -20,10 +20,8 @@ use common_test_util::find_workspace_path; use frontend::instance::Instance; use rstest_reuse::{self, template}; -use crate::tests::{ - create_distributed_instance, create_standalone_instance, MockDistributedInstance, - MockStandaloneInstance, -}; +use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; +use crate::tests::{create_distributed_instance, MockDistributedInstance}; pub(crate) trait MockInstance { fn frontend(&self) -> Arc; @@ -31,7 +29,7 @@ pub(crate) trait MockInstance { fn is_distributed_mode(&self) -> bool; } -impl MockInstance for MockStandaloneInstance { +impl MockInstance for GreptimeDbStandalone { fn frontend(&self) -> Arc { self.instance.clone() } @@ -53,7 +51,7 @@ impl MockInstance for MockDistributedInstance { pub(crate) async fn standalone() -> Arc { let test_name = uuid::Uuid::new_v4().to_string(); - let instance = create_standalone_instance(&test_name).await; + let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await; Arc::new(instance) } diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index bc3f8abf128c..48d7289a5bb8 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -250,14 +250,14 @@ pub async fn test_insert_and_select(store_type: StorageType) { //alter let add_column = ColumnDef { name: "test_column".to_string(), - datatype: ColumnDataType::Int64.into(), + data_type: ColumnDataType::Int64.into(), is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }; let kind = Kind::AddColumns(AddColumns { add_columns: vec![AddColumn { column_def: Some(add_column), - is_key: false, location: None, }], }); @@ -266,7 +266,6 @@ pub async fn test_insert_and_select(store_type: StorageType) { schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "demo".to_string(), kind: Some(kind), - ..Default::default() }; let result = db.alter(expr).await.unwrap(); assert!(matches!(result, Output::AffectedRows(0))); @@ -342,27 +341,31 @@ fn testing_create_expr() -> CreateTableExpr { let column_defs = vec![ ColumnDef { name: "host".to_string(), - datatype: ColumnDataType::String as i32, + data_type: ColumnDataType::String as i32, is_nullable: false, default_constraint: vec![], + semantic_type: SemanticType::Tag as i32, }, ColumnDef { name: "cpu".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "memory".to_string(), - datatype: ColumnDataType::Float64 as i32, + data_type: ColumnDataType::Float64 as i32, is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Field as i32, }, ColumnDef { name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, // timestamp + data_type: ColumnDataType::TimestampMillisecond as i32, // timestamp is_nullable: true, default_constraint: vec![], + semantic_type: SemanticType::Timestamp as i32, }, ]; CreateTableExpr { diff --git a/tests-integration/tests/region_failover.rs b/tests-integration/tests/region_failover.rs index 11f6eb1beefd..dfb45d8b9b84 100644 --- a/tests-integration/tests/region_failover.rs +++ b/tests-integration/tests/region_failover.rs @@ -19,10 +19,9 @@ use api::v1::meta::Peer; use catalog::remote::CachedMetaKvBackend; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; use common_meta::ident::TableIdent; -use common_meta::key::table_name::{TableNameKey, TableNameValue}; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::table_route::TableRouteKey; use common_meta::key::{RegionDistribution, TableMetaKey}; -use common_meta::rpc::router::TableRoute; -use common_meta::rpc::KeyValue; use common_meta::RegionIdent; use common_procedure::{watcher, ProcedureWithId}; use common_query::Output; @@ -109,12 +108,12 @@ pub async fn test_region_failover(store_type: StorageType) { assert!(matches!(result.unwrap(), Output::AffectedRows(1))); } - let cache_key = TableNameKey::new("greptime", "public", "my_table").as_raw_key(); - - let cache = get_table_cache(&frontend, &cache_key).unwrap(); - let table_name_value = TableNameValue::try_from_raw_value(&cache.unwrap().value).unwrap(); - let table_id = table_name_value.table_id(); - assert!(get_route_cache(&frontend, table_id).is_some()); + let table_id = get_table_id( + &frontend, + TableNameKey::new("greptime", "public", "my_table"), + ) + .await; + assert!(has_route_cache(&frontend, table_id).await); let distribution = find_region_distribution(&cluster, table_id).await; info!("Find region distribution: {distribution:?}"); @@ -145,8 +144,7 @@ pub async fn test_region_failover(store_type: StorageType) { // Waits for invalidating table cache time::sleep(Duration::from_millis(100)).await; - let route_cache = get_route_cache(&frontend, table_id); - assert!(route_cache.is_none()); + assert!(!has_route_cache(&frontend, table_id).await); // Inserts data to each datanode after failover let frontend = cluster.frontend.clone(); @@ -167,33 +165,41 @@ pub async fn test_region_failover(store_type: StorageType) { assert!(success) } -fn get_table_cache(instance: &Arc, key: &[u8]) -> Option> { +async fn get_table_id(instance: &Arc, key: TableNameKey<'_>) -> TableId { let catalog_manager = instance .catalog_manager() .as_any() .downcast_ref::() .unwrap(); - let kvbackend = catalog_manager.backend(); - - let kvbackend = kvbackend - .as_any() - .downcast_ref::() - .unwrap(); - let cache = kvbackend.cache(); - - Some(cache.get(key)) + catalog_manager + .table_metadata_manager_ref() + .table_name_manager() + .get(key) + .await + .unwrap() + .unwrap() + .table_id() } -fn get_route_cache(instance: &Arc, table_id: TableId) -> Option> { +async fn has_route_cache(instance: &Arc, table_id: TableId) -> bool { let catalog_manager = instance .catalog_manager() .as_any() .downcast_ref::() .unwrap(); - let pm = catalog_manager.partition_manager(); - let cache = pm.table_routes().cache(); - cache.get(&table_id) + + let kv_backend = catalog_manager.table_metadata_manager_ref().kv_backend(); + + let cache = kv_backend + .as_any() + .downcast_ref::() + .unwrap() + .cache(); + + cache + .get(TableRouteKey::new(table_id).as_raw_key().as_slice()) + .is_some() } async fn write_datas(instance: &Arc, ts: u64) -> Vec> { @@ -354,6 +360,7 @@ async fn run_region_failover_procedure( let procedure = RegionFailoverProcedure::new( failed_region.clone(), RegionFailoverContext { + region_lease_secs: 10, in_memory: meta_srv.in_memory().clone(), mailbox: meta_srv.mailbox().clone(), selector,