From c149c123c3f49880ddee0977448b316512e3b167 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 15 Sep 2023 08:30:20 -0500 Subject: [PATCH] feat: reopen corresponding regions on starting datanode (#2399) * separate config and datanode impl Signed-off-by: Ruihang Xia * partial implement of fetching region id list Signed-off-by: Ruihang Xia * reopen all regions on starting region server Signed-off-by: Ruihang Xia * update sqlness & assign default datanode id Signed-off-by: Ruihang Xia * set writable on lease Signed-off-by: Ruihang Xia * apply cr suggs. Signed-off-by: Ruihang Xia * Update src/datanode/src/datanode.rs Co-authored-by: Yingwen --------- Signed-off-by: Ruihang Xia Co-authored-by: dennis zhuang Co-authored-by: Yingwen --- src/cmd/src/datanode.rs | 10 +- src/cmd/src/options.rs | 5 +- src/cmd/src/standalone.rs | 16 +- src/datanode/src/alive_keeper.rs | 5 + src/datanode/src/config.rs | 405 ++++++++++++ src/datanode/src/datanode.rs | 608 ++++++------------ src/datanode/src/datanode/builder.rs | 98 --- src/datanode/src/error.rs | 24 +- src/datanode/src/heartbeat.rs | 2 +- src/datanode/src/lib.rs | 1 + src/datanode/src/region_server.rs | 11 + src/datanode/src/server.rs | 2 +- src/datanode/src/store.rs | 2 +- src/datanode/src/store/azblob.rs | 2 +- src/datanode/src/store/fs.rs | 2 +- src/datanode/src/store/gcs.rs | 2 +- src/datanode/src/store/oss.rs | 2 +- src/datanode/src/store/s3.rs | 2 +- .../tql-explain-analyze/analyze.result | 3 +- .../tql-explain-analyze/analyze.sql | 1 + .../tql-explain-analyze/explain.result | 31 +- .../tql-explain-analyze/explain.sql | 1 + 22 files changed, 678 insertions(+), 557 deletions(-) create mode 100644 src/datanode/src/config.rs delete mode 100644 src/datanode/src/datanode/builder.rs diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index a5e39303c810..928433ee345e 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -16,8 +16,8 @@ use std::time::Duration; use clap::Parser; use common_telemetry::logging; -use datanode::datanode::builder::DatanodeBuilder; -use datanode::datanode::{Datanode, DatanodeOptions}; +use datanode::config::DatanodeOptions; +use datanode::datanode::{Datanode, DatanodeBuilder}; use meta_client::MetaClientOptions; use servers::Mode; use snafu::ResultExt; @@ -163,7 +163,7 @@ impl StartCommand { logging::info!("Datanode start command: {:#?}", self); logging::info!("Datanode options: {:#?}", opts); - let datanode = DatanodeBuilder::new(opts, Default::default()) + let datanode = DatanodeBuilder::new(opts, None, Default::default()) .build() .await .context(StartDatanodeSnafu)?; @@ -179,9 +179,7 @@ mod tests { use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; - use datanode::datanode::{ - CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig, - }; + use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig}; use servers::Mode; use super::*; diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 0f87c135adb2..7154289ccf3a 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -15,7 +15,7 @@ use common_config::KvStoreConfig; use common_telemetry::logging::LoggingOptions; use config::{Config, Environment, File, FileFormat}; -use datanode::datanode::{DatanodeOptions, ProcedureConfig}; +use datanode::config::{DatanodeOptions, ProcedureConfig}; use frontend::frontend::FrontendOptions; use meta_srv::metasrv::MetaSrvOptions; use serde::{Deserialize, Serialize}; @@ -26,6 +26,7 @@ use crate::error::{LoadLayeredConfigSnafu, Result}; pub const ENV_VAR_SEP: &str = "__"; pub const ENV_LIST_SEP: &str = ","; +/// Options mixed up from datanode, frontend and metasrv. pub struct MixOptions { pub data_home: String, pub procedure_cfg: ProcedureConfig, @@ -119,7 +120,7 @@ mod tests { use std::time::Duration; use common_test_util::temp_dir::create_named_temp_file; - use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; + use datanode::config::{DatanodeOptions, ObjectStoreConfig}; use super::*; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index da0870362290..be3567570e09 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -23,8 +23,8 @@ use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; -use datanode::datanode::builder::DatanodeBuilder; -use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig}; +use datanode::config::{DatanodeOptions, ProcedureConfig, StorageConfig}; +use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::region_server::RegionServer; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; @@ -138,6 +138,7 @@ impl StandaloneOptions { fn datanode_options(self) -> DatanodeOptions { DatanodeOptions { + node_id: Some(0), enable_telemetry: self.enable_telemetry, wal: self.wal, storage: self.storage, @@ -306,10 +307,11 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let datanode = DatanodeBuilder::new(dn_opts.clone(), plugins.clone()) - .build() - .await - .context(StartDatanodeSnafu)?; + let datanode = + DatanodeBuilder::new(dn_opts.clone(), Some(kv_store.clone()), plugins.clone()) + .build() + .await + .context(StartDatanodeSnafu)?; let region_server = datanode.region_server(); let catalog_manager = KvBackendCatalogManager::new( @@ -468,7 +470,7 @@ mod tests { assert!(fe_opts.influxdb_options.enable); match &dn_opts.storage.store { - datanode::datanode::ObjectStoreConfig::S3(s3_config) => { + datanode::config::ObjectStoreConfig::S3(s3_config) => { assert_eq!( "Secret([REDACTED alloc::string::String])".to_string(), format!("{:?}", s3_config.access_key_id) diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index 37d5ee098523..0d2c096da1dd 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -42,10 +42,14 @@ const MAX_CLOSE_RETRY_TIMES: usize = 10; /// /// [RegionAliveKeeper] starts a [CountdownTask] for each region. When deadline is reached, /// the region will be closed. +/// /// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its /// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this /// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to /// countdown. +/// +/// On each lease extension, [RegionAliveKeeper] will reset the deadline to the corresponding time, and +/// set region's status to "writable". pub struct RegionAliveKeeper { region_server: RegionServer, tasks: Arc>>>, @@ -313,6 +317,7 @@ impl CountdownTask { "Reset deadline of region {region_id} to approximately {} seconds later", (deadline - Instant::now()).as_secs_f32(), ); + let _ = self.region_server.set_writable(self.region_id, true); countdown.set(tokio::time::sleep_until(deadline)); } // Else the countdown could be either: diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs new file mode 100644 index 000000000000..2f1ac2796aa6 --- /dev/null +++ b/src/datanode/src/config.rs @@ -0,0 +1,405 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Datanode configurations + +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use common_config::WalConfig; +pub use common_procedure::options::ProcedureConfig; +use common_telemetry::logging::LoggingOptions; +use meta_client::MetaClientOptions; +use mito2::config::MitoConfig; +use secrecy::SecretString; +use serde::{Deserialize, Serialize}; +use servers::heartbeat_options::HeartbeatOptions; +use servers::http::HttpOptions; +use servers::Mode; +use storage::config::{ + EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS, + DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE, +}; +use storage::scheduler::SchedulerConfig; + +pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); + +/// Default data home in file storage +const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb"; + +/// Object storage config +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum ObjectStoreConfig { + File(FileConfig), + S3(S3Config), + Oss(OssConfig), + Azblob(AzblobConfig), + Gcs(GcsConfig), +} + +/// Storage engine config +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct StorageConfig { + /// Retention period for all tables. + /// + /// Default value is `None`, which means no TTL. + /// + /// The precedence order is: ttl in table options > global ttl. + #[serde(with = "humantime_serde")] + pub global_ttl: Option, + /// The working directory of database + pub data_home: String, + #[serde(flatten)] + pub store: ObjectStoreConfig, + pub compaction: CompactionConfig, + pub manifest: RegionManifestConfig, + pub flush: FlushConfig, +} + +impl Default for StorageConfig { + fn default() -> Self { + Self { + global_ttl: None, + data_home: DEFAULT_DATA_HOME.to_string(), + store: ObjectStoreConfig::default(), + compaction: CompactionConfig::default(), + manifest: RegionManifestConfig::default(), + flush: FlushConfig::default(), + } + } +} + +#[derive(Debug, Clone, Serialize, Default, Deserialize)] +#[serde(default)] +pub struct FileConfig {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct S3Config { + pub bucket: String, + pub root: String, + #[serde(skip_serializing)] + pub access_key_id: SecretString, + #[serde(skip_serializing)] + pub secret_access_key: SecretString, + pub endpoint: Option, + pub region: Option, + pub cache_path: Option, + pub cache_capacity: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct OssConfig { + pub bucket: String, + pub root: String, + #[serde(skip_serializing)] + pub access_key_id: SecretString, + #[serde(skip_serializing)] + pub access_key_secret: SecretString, + pub endpoint: String, + pub cache_path: Option, + pub cache_capacity: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AzblobConfig { + pub container: String, + pub root: String, + #[serde(skip_serializing)] + pub account_name: SecretString, + #[serde(skip_serializing)] + pub account_key: SecretString, + pub endpoint: String, + pub sas_token: Option, + pub cache_path: Option, + pub cache_capacity: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct GcsConfig { + pub root: String, + pub bucket: String, + pub scope: String, + #[serde(skip_serializing)] + pub credential_path: SecretString, + pub endpoint: String, + pub cache_path: Option, + pub cache_capacity: Option, +} + +impl Default for S3Config { + fn default() -> Self { + Self { + bucket: String::default(), + root: String::default(), + access_key_id: SecretString::from(String::default()), + secret_access_key: SecretString::from(String::default()), + endpoint: Option::default(), + region: Option::default(), + cache_path: Option::default(), + cache_capacity: Option::default(), + } + } +} + +impl Default for OssConfig { + fn default() -> Self { + Self { + bucket: String::default(), + root: String::default(), + access_key_id: SecretString::from(String::default()), + access_key_secret: SecretString::from(String::default()), + endpoint: String::default(), + cache_path: Option::default(), + cache_capacity: Option::default(), + } + } +} + +impl Default for AzblobConfig { + fn default() -> Self { + Self { + container: String::default(), + root: String::default(), + account_name: SecretString::from(String::default()), + account_key: SecretString::from(String::default()), + endpoint: String::default(), + cache_path: Option::default(), + cache_capacity: Option::default(), + sas_token: Option::default(), + } + } +} + +impl Default for GcsConfig { + fn default() -> Self { + Self { + root: String::default(), + bucket: String::default(), + scope: String::default(), + credential_path: SecretString::from(String::default()), + endpoint: String::default(), + cache_path: Option::default(), + cache_capacity: Option::default(), + } + } +} + +impl Default for ObjectStoreConfig { + fn default() -> Self { + ObjectStoreConfig::File(FileConfig {}) + } +} + +/// Options for region manifest +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[serde(default)] +pub struct RegionManifestConfig { + /// Region manifest checkpoint actions margin. + /// Manifest service create a checkpoint every [checkpoint_margin] actions. + pub checkpoint_margin: Option, + /// Region manifest logs and checkpoints gc task execution duration. + #[serde(with = "humantime_serde")] + pub gc_duration: Option, + /// Whether to compress manifest and checkpoint file by gzip + pub compress: bool, +} + +impl Default for RegionManifestConfig { + fn default() -> Self { + Self { + checkpoint_margin: Some(10u16), + gc_duration: Some(Duration::from_secs(600)), + compress: false, + } + } +} + +/// Options for table compaction +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[serde(default)] +pub struct CompactionConfig { + /// Max task number that can concurrently run. + pub max_inflight_tasks: usize, + /// Max files in level 0 to trigger compaction. + pub max_files_in_level0: usize, + /// Max task number for SST purge task after compaction. + pub max_purge_tasks: usize, + /// Buffer threshold while writing SST files + pub sst_write_buffer_size: ReadableSize, +} + +impl Default for CompactionConfig { + fn default() -> Self { + Self { + max_inflight_tasks: 4, + max_files_in_level0: 8, + max_purge_tasks: 32, + sst_write_buffer_size: ReadableSize::mb(8), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[serde(default)] +pub struct FlushConfig { + /// Max inflight flush tasks. + pub max_flush_tasks: usize, + /// Default write buffer size for a region. + pub region_write_buffer_size: ReadableSize, + /// Interval to schedule auto flush picker to find region to flush. + #[serde(with = "humantime_serde")] + pub picker_schedule_interval: Duration, + /// Interval to auto flush a region if it has not flushed yet. + #[serde(with = "humantime_serde")] + pub auto_flush_interval: Duration, + /// Global write buffer size for all regions. + pub global_write_buffer_size: Option, +} + +impl Default for FlushConfig { + fn default() -> Self { + Self { + max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS, + region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE, + picker_schedule_interval: Duration::from_millis( + DEFAULT_PICKER_SCHEDULE_INTERVAL.into(), + ), + auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()), + global_write_buffer_size: None, + } + } +} + +impl From<&DatanodeOptions> for SchedulerConfig { + fn from(value: &DatanodeOptions) -> Self { + Self { + max_inflight_tasks: value.storage.compaction.max_inflight_tasks, + } + } +} + +impl From<&DatanodeOptions> for StorageEngineConfig { + fn from(value: &DatanodeOptions) -> Self { + Self { + compress_manifest: value.storage.manifest.compress, + manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin, + manifest_gc_duration: value.storage.manifest.gc_duration, + max_files_in_l0: value.storage.compaction.max_files_in_level0, + max_purge_tasks: value.storage.compaction.max_purge_tasks, + sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size, + max_flush_tasks: value.storage.flush.max_flush_tasks, + region_write_buffer_size: value.storage.flush.region_write_buffer_size, + picker_schedule_interval: value.storage.flush.picker_schedule_interval, + auto_flush_interval: value.storage.flush.auto_flush_interval, + global_write_buffer_size: value.storage.flush.global_write_buffer_size, + global_ttl: value.storage.global_ttl, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(default)] +pub struct DatanodeOptions { + pub mode: Mode, + pub node_id: Option, + pub rpc_addr: String, + pub rpc_hostname: Option, + pub rpc_runtime_size: usize, + pub heartbeat: HeartbeatOptions, + pub http_opts: HttpOptions, + pub meta_client_options: Option, + pub wal: WalConfig, + pub storage: StorageConfig, + /// Options for different store engines. + pub region_engine: Vec, + pub logging: LoggingOptions, + pub enable_telemetry: bool, +} + +impl Default for DatanodeOptions { + fn default() -> Self { + Self { + mode: Mode::Standalone, + node_id: None, + rpc_addr: "127.0.0.1:3001".to_string(), + rpc_hostname: None, + rpc_runtime_size: 8, + http_opts: HttpOptions::default(), + meta_client_options: None, + wal: WalConfig::default(), + storage: StorageConfig::default(), + region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], + logging: LoggingOptions::default(), + heartbeat: HeartbeatOptions::default(), + enable_telemetry: true, + } + } +} + +impl DatanodeOptions { + pub fn env_list_keys() -> Option<&'static [&'static str]> { + Some(&["meta_client_options.metasrv_addrs"]) + } + + pub fn to_toml_string(&self) -> String { + toml::to_string(&self).unwrap() + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum RegionEngineConfig { + #[serde(rename = "mito")] + Mito(MitoConfig), +} + +#[cfg(test)] +mod tests { + use secrecy::ExposeSecret; + + use super::*; + + #[test] + fn test_toml() { + let opts = DatanodeOptions::default(); + let toml_string = toml::to_string(&opts).unwrap(); + let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap(); + } + + #[test] + fn test_secstr() { + let toml_str = r#" + [storage] + type = "S3" + access_key_id = "access_key_id" + secret_access_key = "secret_access_key" + "#; + let opts: DatanodeOptions = toml::from_str(toml_str).unwrap(); + match opts.storage.store { + ObjectStoreConfig::S3(cfg) => { + assert_eq!( + "Secret([REDACTED alloc::string::String])".to_string(), + format!("{:?}", cfg.access_key_id) + ); + assert_eq!("access_key_id", cfg.access_key_id.expose_secret()); + } + _ => unreachable!(), + } + } +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index b0c73c1c4c24..4c79c0ebde5f 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -12,403 +12,269 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Datanode configurations - -pub mod builder; +//! Datanode implementation. +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; -use std::time::Duration; +use catalog::kvbackend::MetaKvBackend; use catalog::memory::MemoryCatalogManager; use common_base::readable_size::ReadableSize; use common_base::Plugins; -use common_config::WalConfig; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; +use common_meta::key::datanode_table::DatanodeTableManager; +use common_meta::kv_backend::KvBackendRef; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; -use common_telemetry::info; -use common_telemetry::logging::LoggingOptions; +use common_telemetry::{error, info}; +use futures_util::StreamExt; use log_store::raft_engine::log_store::RaftEngineLogStore; -use meta_client::MetaClientOptions; -use mito2::config::MitoConfig; +use meta_client::client::MetaClient; use mito2::engine::MitoEngine; use object_store::util::normalize_dir; use query::QueryEngineFactory; -use secrecy::SecretString; -use serde::{Deserialize, Serialize}; -use servers::heartbeat_options::HeartbeatOptions; -use servers::http::HttpOptions; use servers::Mode; -use snafu::ResultExt; -use storage::config::{ - EngineConfig as StorageEngineConfig, DEFAULT_AUTO_FLUSH_INTERVAL, DEFAULT_MAX_FLUSH_TASKS, - DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE, -}; -use storage::scheduler::SchedulerConfig; +use snafu::{OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::path_utils::WAL_DIR; use store_api::region_engine::RegionEngineRef; +use store_api::region_request::{RegionOpenRequest, RegionRequest}; +use store_api::storage::RegionId; use tokio::fs; +use crate::config::{DatanodeOptions, RegionEngineConfig}; use crate::error::{ - CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, + CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingMetaClientSnafu, + MissingMetasrvOptsSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, + ShutdownInstanceSnafu, }; -use crate::heartbeat::HeartbeatTask; +use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; +use crate::heartbeat::{new_metasrv_client, HeartbeatTask}; use crate::region_server::RegionServer; use crate::server::Services; use crate::store; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); -/// Default data home in file storage -const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb"; - -/// Object storage config -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum ObjectStoreConfig { - File(FileConfig), - S3(S3Config), - Oss(OssConfig), - Azblob(AzblobConfig), - Gcs(GcsConfig), -} - -/// Storage engine config -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct StorageConfig { - /// Retention period for all tables. - /// - /// Default value is `None`, which means no TTL. - /// - /// The precedence order is: ttl in table options > global ttl. - #[serde(with = "humantime_serde")] - pub global_ttl: Option, - /// The working directory of database - pub data_home: String, - #[serde(flatten)] - pub store: ObjectStoreConfig, - pub compaction: CompactionConfig, - pub manifest: RegionManifestConfig, - pub flush: FlushConfig, -} - -impl Default for StorageConfig { - fn default() -> Self { - Self { - global_ttl: None, - data_home: DEFAULT_DATA_HOME.to_string(), - store: ObjectStoreConfig::default(), - compaction: CompactionConfig::default(), - manifest: RegionManifestConfig::default(), - flush: FlushConfig::default(), - } - } -} - -#[derive(Debug, Clone, Serialize, Default, Deserialize)] -#[serde(default)] -pub struct FileConfig {} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct S3Config { - pub bucket: String, - pub root: String, - #[serde(skip_serializing)] - pub access_key_id: SecretString, - #[serde(skip_serializing)] - pub secret_access_key: SecretString, - pub endpoint: Option, - pub region: Option, - pub cache_path: Option, - pub cache_capacity: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct OssConfig { - pub bucket: String, - pub root: String, - #[serde(skip_serializing)] - pub access_key_id: SecretString, - #[serde(skip_serializing)] - pub access_key_secret: SecretString, - pub endpoint: String, - pub cache_path: Option, - pub cache_capacity: Option, +/// Datanode service. +pub struct Datanode { + opts: DatanodeOptions, + services: Option, + heartbeat_task: Option, + region_server: RegionServer, + greptimedb_telemetry_task: Arc, } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct AzblobConfig { - pub container: String, - pub root: String, - #[serde(skip_serializing)] - pub account_name: SecretString, - #[serde(skip_serializing)] - pub account_key: SecretString, - pub endpoint: String, - pub sas_token: Option, - pub cache_path: Option, - pub cache_capacity: Option, -} +impl Datanode { + pub async fn start(&mut self) -> Result<()> { + info!("Starting datanode instance..."); -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct GcsConfig { - pub root: String, - pub bucket: String, - pub scope: String, - #[serde(skip_serializing)] - pub credential_path: SecretString, - pub endpoint: String, - pub cache_path: Option, - pub cache_capacity: Option, -} + self.start_heartbeat().await?; -impl Default for S3Config { - fn default() -> Self { - Self { - bucket: String::default(), - root: String::default(), - access_key_id: SecretString::from(String::default()), - secret_access_key: SecretString::from(String::default()), - endpoint: Option::default(), - region: Option::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), - } + let _ = self.greptimedb_telemetry_task.start(); + self.start_services().await } -} -impl Default for OssConfig { - fn default() -> Self { - Self { - bucket: String::default(), - root: String::default(), - access_key_id: SecretString::from(String::default()), - access_key_secret: SecretString::from(String::default()), - endpoint: String::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), + pub async fn start_heartbeat(&self) -> Result<()> { + if let Some(task) = &self.heartbeat_task { + task.start().await?; } + Ok(()) } -} -impl Default for AzblobConfig { - fn default() -> Self { - Self { - container: String::default(), - root: String::default(), - account_name: SecretString::from(String::default()), - account_key: SecretString::from(String::default()), - endpoint: String::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), - sas_token: Option::default(), + /// Start services of datanode. This method call will block until services are shutdown. + pub async fn start_services(&mut self) -> Result<()> { + if let Some(service) = self.services.as_mut() { + service.start(&self.opts).await + } else { + Ok(()) } } -} -impl Default for GcsConfig { - fn default() -> Self { - Self { - root: String::default(), - bucket: String::default(), - scope: String::default(), - credential_path: SecretString::from(String::default()), - endpoint: String::default(), - cache_path: Option::default(), - cache_capacity: Option::default(), + async fn shutdown_services(&self) -> Result<()> { + if let Some(service) = self.services.as_ref() { + service.shutdown().await + } else { + Ok(()) } } -} - -impl Default for ObjectStoreConfig { - fn default() -> Self { - ObjectStoreConfig::File(FileConfig {}) - } -} - -/// Options for region manifest -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -#[serde(default)] -pub struct RegionManifestConfig { - /// Region manifest checkpoint actions margin. - /// Manifest service create a checkpoint every [checkpoint_margin] actions. - pub checkpoint_margin: Option, - /// Region manifest logs and checkpoints gc task execution duration. - #[serde(with = "humantime_serde")] - pub gc_duration: Option, - /// Whether to compress manifest and checkpoint file by gzip - pub compress: bool, -} -impl Default for RegionManifestConfig { - fn default() -> Self { - Self { - checkpoint_margin: Some(10u16), - gc_duration: Some(Duration::from_secs(600)), - compress: false, + pub async fn shutdown(&self) -> Result<()> { + // We must shutdown services first + self.shutdown_services().await?; + let _ = self.greptimedb_telemetry_task.stop().await; + if let Some(heartbeat_task) = &self.heartbeat_task { + heartbeat_task + .close() + .await + .map_err(BoxedError::new) + .context(ShutdownInstanceSnafu)?; } + self.region_server.stop().await?; + Ok(()) } -} -/// Options for table compaction -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -#[serde(default)] -pub struct CompactionConfig { - /// Max task number that can concurrently run. - pub max_inflight_tasks: usize, - /// Max files in level 0 to trigger compaction. - pub max_files_in_level0: usize, - /// Max task number for SST purge task after compaction. - pub max_purge_tasks: usize, - /// Buffer threshold while writing SST files - pub sst_write_buffer_size: ReadableSize, -} - -impl Default for CompactionConfig { - fn default() -> Self { - Self { - max_inflight_tasks: 4, - max_files_in_level0: 8, - max_purge_tasks: 32, - sst_write_buffer_size: ReadableSize::mb(8), - } + pub fn region_server(&self) -> RegionServer { + self.region_server.clone() } } -#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -#[serde(default)] -pub struct FlushConfig { - /// Max inflight flush tasks. - pub max_flush_tasks: usize, - /// Default write buffer size for a region. - pub region_write_buffer_size: ReadableSize, - /// Interval to schedule auto flush picker to find region to flush. - #[serde(with = "humantime_serde")] - pub picker_schedule_interval: Duration, - /// Interval to auto flush a region if it has not flushed yet. - #[serde(with = "humantime_serde")] - pub auto_flush_interval: Duration, - /// Global write buffer size for all regions. - pub global_write_buffer_size: Option, +pub struct DatanodeBuilder { + opts: DatanodeOptions, + plugins: Arc, + meta_client: Option, + kv_backend: Option, } -impl Default for FlushConfig { - fn default() -> Self { +impl DatanodeBuilder { + /// `kv_backend` is optional. If absent, the builder will try to build one + /// by using the given `opts` + pub fn new( + opts: DatanodeOptions, + kv_backend: Option, + plugins: Arc, + ) -> Self { Self { - max_flush_tasks: DEFAULT_MAX_FLUSH_TASKS, - region_write_buffer_size: DEFAULT_REGION_WRITE_BUFFER_SIZE, - picker_schedule_interval: Duration::from_millis( - DEFAULT_PICKER_SCHEDULE_INTERVAL.into(), - ), - auto_flush_interval: Duration::from_millis(DEFAULT_AUTO_FLUSH_INTERVAL.into()), - global_write_buffer_size: None, + opts, + plugins, + meta_client: None, + kv_backend, } } -} -impl From<&DatanodeOptions> for SchedulerConfig { - fn from(value: &DatanodeOptions) -> Self { + pub fn with_meta_client(self, meta_client: MetaClient) -> Self { Self { - max_inflight_tasks: value.storage.compaction.max_inflight_tasks, + meta_client: Some(meta_client), + ..self } } -} -impl From<&DatanodeOptions> for StorageEngineConfig { - fn from(value: &DatanodeOptions) -> Self { - Self { - compress_manifest: value.storage.manifest.compress, - manifest_checkpoint_margin: value.storage.manifest.checkpoint_margin, - manifest_gc_duration: value.storage.manifest.gc_duration, - max_files_in_l0: value.storage.compaction.max_files_in_level0, - max_purge_tasks: value.storage.compaction.max_purge_tasks, - sst_write_buffer_size: value.storage.compaction.sst_write_buffer_size, - max_flush_tasks: value.storage.flush.max_flush_tasks, - region_write_buffer_size: value.storage.flush.region_write_buffer_size, - picker_schedule_interval: value.storage.flush.picker_schedule_interval, - auto_flush_interval: value.storage.flush.auto_flush_interval, - global_write_buffer_size: value.storage.flush.global_write_buffer_size, - global_ttl: value.storage.global_ttl, - } + pub async fn build(mut self) -> Result { + let mode = &self.opts.mode; + + // build meta client + let meta_client = match mode { + Mode::Distributed => { + let meta_client = if let Some(meta_client) = self.meta_client.take() { + meta_client + } else { + let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; + + let meta_config = self + .opts + .meta_client_options + .as_ref() + .context(MissingMetasrvOptsSnafu)?; + + new_metasrv_client(node_id, meta_config).await? + }; + Some(meta_client) + } + Mode::Standalone => None, + }; + + // build kv-backend + let kv_backend = match mode { + Mode::Distributed => Arc::new(MetaKvBackend { + client: Arc::new(meta_client.clone().context(MissingMetaClientSnafu)?), + }), + Mode::Standalone => self.kv_backend.clone().context(MissingKvBackendSnafu)?, + }; + + // build and initialize region server + let log_store = Self::build_log_store(&self.opts).await?; + let region_server = + Self::new_region_server(&self.opts, self.plugins.clone(), log_store).await?; + self.initialize_region_server(®ion_server, kv_backend, matches!(mode, Mode::Standalone)) + .await?; + + let heartbeat_task = match mode { + Mode::Distributed => { + let meta_client = meta_client.context(MissingMetaClientSnafu)?; + + let heartbeat_task = + HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?; + Some(heartbeat_task) + } + Mode::Standalone => None, + }; + + let services = match mode { + Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?), + Mode::Standalone => None, + }; + + let greptimedb_telemetry_task = get_greptimedb_telemetry_task( + Some(self.opts.storage.data_home.clone()), + mode, + self.opts.enable_telemetry, + ) + .await; + + Ok(Datanode { + opts: self.opts, + services, + heartbeat_task, + region_server, + greptimedb_telemetry_task, + }) } -} -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct DatanodeOptions { - pub mode: Mode, - pub node_id: Option, - pub rpc_addr: String, - pub rpc_hostname: Option, - pub rpc_runtime_size: usize, - pub heartbeat: HeartbeatOptions, - pub http_opts: HttpOptions, - pub meta_client_options: Option, - pub wal: WalConfig, - pub storage: StorageConfig, - /// Options for different store engines. - pub region_engine: Vec, - pub logging: LoggingOptions, - pub enable_telemetry: bool, -} + /// Open all regions belong to this datanode. + async fn initialize_region_server( + &self, + region_server: &RegionServer, + kv_backend: KvBackendRef, + open_with_writable: bool, + ) -> Result<()> { + let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone()); + let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; + let mut regions = vec![]; + let mut table_values = datanode_table_manager.tables(node_id); + while let Some(table_value) = table_values.next().await { + let table_value = table_value.context(GetMetadataSnafu)?; + for region_number in table_value.regions { + regions.push(( + RegionId::new(table_value.table_id, region_number), + table_value.engine.clone(), + table_value.region_storage_path.clone(), + )); + } + } -impl Default for DatanodeOptions { - fn default() -> Self { - Self { - mode: Mode::Standalone, - node_id: None, - rpc_addr: "127.0.0.1:3001".to_string(), - rpc_hostname: None, - rpc_runtime_size: 8, - http_opts: HttpOptions::default(), - meta_client_options: None, - wal: WalConfig::default(), - storage: StorageConfig::default(), - region_engine: vec![RegionEngineConfig::Mito(MitoConfig::default())], - logging: LoggingOptions::default(), - heartbeat: HeartbeatOptions::default(), - enable_telemetry: true, + info!("going to open {} regions", regions.len()); + + for (region_id, engine, region_dir) in regions { + region_server + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: engine.clone(), + region_dir, + options: HashMap::new(), + }), + ) + .await?; + if open_with_writable { + if let Err(e) = region_server.set_writable(region_id, true) { + error!( + e; "failed to set writable for region {region_id}" + ); + } + } } - } -} -impl DatanodeOptions { - pub fn env_list_keys() -> Option<&'static [&'static str]> { - Some(&["meta_client_options.metasrv_addrs"]) - } + info!("region server is initialized"); - pub fn to_toml_string(&self) -> String { - toml::to_string(&self).unwrap() + Ok(()) } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum RegionEngineConfig { - #[serde(rename = "mito")] - Mito(MitoConfig), -} -/// Datanode service. -pub struct Datanode { - opts: DatanodeOptions, - services: Option, - heartbeat_task: Option, - region_server: RegionServer, - greptimedb_telemetry_task: Arc, -} - -impl Datanode { async fn new_region_server( opts: &DatanodeOptions, plugins: Arc, + log_store: Arc, ) -> Result { let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. @@ -428,7 +294,6 @@ impl Datanode { ); let mut region_server = RegionServer::new(query_engine.clone(), runtime.clone()); - let log_store = Self::build_log_store(opts).await?; let object_store = store::new_object_store(opts).await?; let engines = Self::build_store_engines(opts, log_store, object_store).await?; for engine in engines { @@ -438,60 +303,6 @@ impl Datanode { Ok(region_server) } - pub async fn start(&mut self) -> Result<()> { - info!("Starting datanode instance..."); - - self.start_heartbeat().await?; - - let _ = self.greptimedb_telemetry_task.start(); - self.start_services().await - } - - pub async fn start_heartbeat(&self) -> Result<()> { - if let Some(task) = &self.heartbeat_task { - task.start().await?; - } - Ok(()) - } - - /// Start services of datanode. This method call will block until services are shutdown. - pub async fn start_services(&mut self) -> Result<()> { - if let Some(service) = self.services.as_mut() { - service.start(&self.opts).await - } else { - Ok(()) - } - } - - async fn shutdown_services(&self) -> Result<()> { - if let Some(service) = self.services.as_ref() { - service.shutdown().await - } else { - Ok(()) - } - } - - pub async fn shutdown(&self) -> Result<()> { - // We must shutdown services first - self.shutdown_services().await?; - let _ = self.greptimedb_telemetry_task.stop().await; - if let Some(heartbeat_task) = &self.heartbeat_task { - heartbeat_task - .close() - .await - .map_err(BoxedError::new) - .context(ShutdownInstanceSnafu)?; - } - self.region_server.stop().await?; - Ok(()) - } - - pub fn region_server(&self) -> RegionServer { - self.region_server.clone() - } - - // internal utils - /// Build [RaftEngineLogStore] async fn build_log_store(opts: &DatanodeOptions) -> Result> { let data_home = normalize_dir(&opts.storage.data_home); @@ -535,38 +346,3 @@ impl Datanode { Ok(engines) } } - -#[cfg(test)] -mod tests { - use secrecy::ExposeSecret; - - use super::*; - - #[test] - fn test_toml() { - let opts = DatanodeOptions::default(); - let toml_string = toml::to_string(&opts).unwrap(); - let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap(); - } - - #[test] - fn test_secstr() { - let toml_str = r#" - [storage] - type = "S3" - access_key_id = "access_key_id" - secret_access_key = "secret_access_key" - "#; - let opts: DatanodeOptions = toml::from_str(toml_str).unwrap(); - match opts.storage.store { - ObjectStoreConfig::S3(cfg) => { - assert_eq!( - "Secret([REDACTED alloc::string::String])".to_string(), - format!("{:?}", cfg.access_key_id) - ); - assert_eq!("access_key_id", cfg.access_key_id.expose_secret()); - } - _ => unreachable!(), - } - } -} diff --git a/src/datanode/src/datanode/builder.rs b/src/datanode/src/datanode/builder.rs deleted file mode 100644 index 6f2379d6a2a9..000000000000 --- a/src/datanode/src/datanode/builder.rs +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use common_base::Plugins; -use meta_client::client::MetaClient; -use servers::Mode; -use snafu::OptionExt; - -use crate::datanode::{Datanode, DatanodeOptions}; -use crate::error::{MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result}; -use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; -use crate::heartbeat::{new_metasrv_client, HeartbeatTask}; -use crate::server::Services; - -pub struct DatanodeBuilder { - opts: DatanodeOptions, - plugins: Arc, - meta_client: Option, -} - -impl DatanodeBuilder { - pub fn new(opts: DatanodeOptions, plugins: Arc) -> Self { - Self { - opts, - plugins, - meta_client: None, - } - } - - pub fn with_meta_client(self, meta_client: MetaClient) -> Self { - Self { - meta_client: Some(meta_client), - ..self - } - } - - pub async fn build(mut self) -> Result { - let region_server = Datanode::new_region_server(&self.opts, self.plugins.clone()).await?; - - let mode = &self.opts.mode; - - let heartbeat_task = match mode { - Mode::Distributed => { - let meta_client = if let Some(meta_client) = self.meta_client.take() { - meta_client - } else { - let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?; - - let meta_config = self - .opts - .meta_client_options - .as_ref() - .context(MissingMetasrvOptsSnafu)?; - - new_metasrv_client(node_id, meta_config).await? - }; - - let heartbeat_task = - HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?; - Some(heartbeat_task) - } - Mode::Standalone => None, - }; - - let services = match mode { - Mode::Distributed => Some(Services::try_new(region_server.clone(), &self.opts).await?), - Mode::Standalone => None, - }; - - let greptimedb_telemetry_task = get_greptimedb_telemetry_task( - Some(self.opts.storage.data_home.clone()), - mode, - self.opts.enable_telemetry, - ) - .await; - - Ok(Datanode { - opts: self.opts, - services, - heartbeat_task, - region_server, - greptimedb_telemetry_task, - }) - } -} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index bd0e6555201b..d3db68da80c3 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -94,6 +94,12 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to get info from meta server, source: {}", source))] + GetMetadata { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to execute sql, source: {}", source))] ExecuteSql { location: Location, @@ -266,6 +272,12 @@ pub enum Error { source: common_runtime::error::Error, }, + #[snafu(display("Expect KvBackend but not found"))] + MissingKvBackend { location: Location }, + + #[snafu(display("Expect MetaClient but not found, location: {}", location))] + MissingMetaClient { location: Location }, + #[snafu(display("Invalid SQL, error: {}", msg))] InvalidSql { msg: String }, @@ -366,10 +378,10 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display("Missing node id option in distributed mode"))] + #[snafu(display("Missing node id in Datanode config, location: {}", location))] MissingNodeId { location: Location }, - #[snafu(display("Missing node id option in distributed mode"))] + #[snafu(display("Missing node id option in distributed mode, location: {}", location))] MissingMetasrvOpts { location: Location }, #[snafu(display("Missing required field: {}", name))] @@ -587,7 +599,9 @@ impl ErrorExt for Error { | ExecuteLogicalPlan { source, .. } => source.status_code(), BuildRegionRequests { source, .. } => source.status_code(), - HandleHeartbeatResponse { source, .. } => source.status_code(), + HandleHeartbeatResponse { source, .. } | GetMetadata { source, .. } => { + source.status_code() + } DecodeLogicalPlan { source, .. } => source.status_code(), RegisterSchema { source, .. } => source.status_code(), @@ -629,7 +643,9 @@ impl ErrorExt for Error { | MissingWalDirConfig { .. } | PrepareImmutableTable { .. } | InvalidInsertRowLen { .. } - | ColumnDataType { .. } => StatusCode::InvalidArguments, + | ColumnDataType { .. } + | MissingKvBackend { .. } + | MissingMetaClient { .. } => StatusCode::InvalidArguments, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } | Unexpected { .. } => { StatusCode::Unexpected diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index deac537772b9..1b08166d7b0b 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -33,7 +33,7 @@ use tokio::time::Instant; use self::handler::RegionHeartbeatResponseHandler; use crate::alive_keeper::RegionAliveKeeper; -use crate::datanode::DatanodeOptions; +use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; use crate::region_server::RegionServer; diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 50e4ff278c52..72f705c78c1b 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -18,6 +18,7 @@ use query::query_engine::SqlStatementExecutor; pub mod alive_keeper; +pub mod config; pub mod datanode; pub mod error; mod greptimedb_telemetry; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 5d37f37a5377..b168d85c875b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -96,6 +96,17 @@ impl RegionServer { .collect() } + pub fn set_writable(&self, region_id: RegionId, writable: bool) -> Result<()> { + let engine = self + .inner + .region_map + .get(®ion_id) + .with_context(|| RegionNotFoundSnafu { region_id })?; + engine + .set_writable(region_id, writable) + .with_context(|_| HandleRegionRequestSnafu { region_id }) + } + pub fn runtime(&self) -> Arc { self.inner.runtime.clone() } diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 41ca6365cc52..5ff2dd680b70 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -22,7 +22,7 @@ use servers::metrics_handler::MetricsHandler; use servers::server::Server; use snafu::ResultExt; -use crate::datanode::DatanodeOptions; +use crate::config::DatanodeOptions; use crate::error::{ ParseAddrSnafu, Result, ShutdownServerSnafu, StartServerSnafu, WaitForGrpcServingSnafu, }; diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index a9d13050a104..937729671fb3 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -32,7 +32,7 @@ use object_store::util::normalize_dir; use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; +use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result { diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs index 97319e13e87e..53ea66a83d28 100644 --- a/src/datanode/src/store/azblob.rs +++ b/src/datanode/src/store/azblob.rs @@ -18,7 +18,7 @@ use object_store::{util, ObjectStore}; use secrecy::ExposeSecret; use snafu::prelude::*; -use crate::datanode::AzblobConfig; +use crate::config::AzblobConfig; use crate::error::{self, Result}; use crate::store::build_http_client; diff --git a/src/datanode/src/store/fs.rs b/src/datanode/src/store/fs.rs index a065a3d27c51..51f1c79a833d 100644 --- a/src/datanode/src/store/fs.rs +++ b/src/datanode/src/store/fs.rs @@ -19,7 +19,7 @@ use object_store::services::Fs as FsBuilder; use object_store::ObjectStore; use snafu::prelude::*; -use crate::datanode::FileConfig; +use crate::config::FileConfig; use crate::error::{self, Result}; use crate::store; diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs index 2cb6a8e82cf6..57e4a68c6d85 100644 --- a/src/datanode/src/store/gcs.rs +++ b/src/datanode/src/store/gcs.rs @@ -18,7 +18,7 @@ use object_store::{util, ObjectStore}; use secrecy::ExposeSecret; use snafu::prelude::*; -use crate::datanode::GcsConfig; +use crate::config::GcsConfig; use crate::error::{self, Result}; use crate::store::build_http_client; diff --git a/src/datanode/src/store/oss.rs b/src/datanode/src/store/oss.rs index 588b7602ba03..3278b83d7396 100644 --- a/src/datanode/src/store/oss.rs +++ b/src/datanode/src/store/oss.rs @@ -18,7 +18,7 @@ use object_store::{util, ObjectStore}; use secrecy::ExposeSecret; use snafu::prelude::*; -use crate::datanode::OssConfig; +use crate::config::OssConfig; use crate::error::{self, Result}; use crate::store::build_http_client; diff --git a/src/datanode/src/store/s3.rs b/src/datanode/src/store/s3.rs index d240500b6cb5..5c67cd6def7e 100644 --- a/src/datanode/src/store/s3.rs +++ b/src/datanode/src/store/s3.rs @@ -18,7 +18,7 @@ use object_store::{util, ObjectStore}; use secrecy::ExposeSecret; use snafu::prelude::*; -use crate::datanode::S3Config; +use crate::config::S3Config; use crate::error::{self, Result}; use crate::store::build_http_client; diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.result b/tests/cases/standalone/tql-explain-analyze/analyze.result index 46e4b3b4e7a3..32bb5223d6c9 100644 --- a/tests/cases/standalone/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/tql-explain-analyze/analyze.result @@ -12,6 +12,7 @@ Affected Rows: 3 -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED TQL ANALYZE (0, 10, '5s') test; +-+-+ @@ -22,7 +23,7 @@ TQL ANALYZE (0, 10, '5s') test; |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED |_|_PromSeriesDivideExec: tags=["k"], REDACTED |_|_SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST], REDACTED -|_|_StreamScanAdapter { stream: "", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] }, REDACTED +|_|_MergeScanExec: REDACTED |_|_| +-+-+ diff --git a/tests/cases/standalone/tql-explain-analyze/analyze.sql b/tests/cases/standalone/tql-explain-analyze/analyze.sql index 677e9069bcd9..e9e01bfb091c 100644 --- a/tests/cases/standalone/tql-explain-analyze/analyze.sql +++ b/tests/cases/standalone/tql-explain-analyze/analyze.sql @@ -8,6 +8,7 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ +-- SQLNESS REPLACE (peers.*) REDACTED TQL ANALYZE (0, 10, '5s') test; DROP TABLE test; diff --git a/tests/cases/standalone/tql-explain-analyze/explain.result b/tests/cases/standalone/tql-explain-analyze/explain.result index ae1b4bb9368d..edc07a6a7f8e 100644 --- a/tests/cases/standalone/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/tql-explain-analyze/explain.result @@ -9,23 +9,24 @@ Affected Rows: 3 -- explain at 0s, 5s and 10s. No point at 0s. -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED TQL EXPLAIN (0, 10, '5s') test; -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | -| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivide: tags=["k"] | -| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST | -| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] | -| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | -| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | -| | PromSeriesDivideExec: tags=["k"] | -| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] | -| | StreamScanAdapter { stream: "", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] } | -| | | -+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-----------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------------------------------------------------+ +| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | +| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivide: tags=["k"] | +| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST | +| | MergeScan [is_placeholder=false] | +| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] | +| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] | +| | PromSeriesDivideExec: tags=["k"] | +| | SortExec: expr=[k@2 DESC NULLS LAST,j@1 DESC NULLS LAST] | +| | MergeScanExec: REDACTED +| | | ++---------------+-----------------------------------------------------------------------------------------------+ DROP TABLE test; diff --git a/tests/cases/standalone/tql-explain-analyze/explain.sql b/tests/cases/standalone/tql-explain-analyze/explain.sql index 448c185c01b7..66cbb7186d8e 100644 --- a/tests/cases/standalone/tql-explain-analyze/explain.sql +++ b/tests/cases/standalone/tql-explain-analyze/explain.sql @@ -5,6 +5,7 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a"); -- explain at 0s, 5s and 10s. No point at 0s. -- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED +-- SQLNESS REPLACE (peers.*) REDACTED TQL EXPLAIN (0, 10, '5s') test; DROP TABLE test;