Skip to content

Commit

Permalink
feat: support table ddl for custom_storage
Browse files Browse the repository at this point in the history
  • Loading branch information
NiwakaDev committed Nov 11, 2023
1 parent b53537e commit 1bb6b6a
Show file tree
Hide file tree
Showing 22 changed files with 438 additions and 56 deletions.
4 changes: 3 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ sync_write = false
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"

Expand All @@ -53,6 +52,9 @@ type = "File"
# The local file cache capacity in bytes.
# cache_capacity = "256MB"

[storage.default_store]
type = "File"

# Compaction options, see `standalone.example.toml`.
[storage.compaction]
max_inflight_tasks = 4
Expand Down
4 changes: 2 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ retry_delay = "500ms"
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
# Storage type.
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"
# Cache configuration for object storage such as 'S3' etc.
# cache_path = "/path/local_cache"
# The local file cache capacity in bytes.
# cache_capacity = "256MB"
[storage.default_store]
type = "File"

# Compaction options.
[storage.compaction]
Expand Down
9 changes: 8 additions & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use common_telemetry::error;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;
use tonic::Code;

use crate::error::Error::RegionServer;
use crate::error::{
Expand All @@ -45,7 +46,13 @@ pub struct RegionRequester {
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
self.handle_inner(request).await.map_err(|err| {
if matches!(err, RegionServer { .. }) {
if !matches!(
err,
RegionServer {
code: Code::InvalidArgument,
..
}
) {
meta_error::Error::RetryLater {
source: BoxedError::new(err),
}
Expand Down
27 changes: 24 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ mod tests {
use std::time::Duration;

use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
use datanode::config::{
CompactionConfig, FileConfig, GcsConfig, ObjectStoreConfig, RegionManifestConfig, S3Config,
};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;

Expand Down Expand Up @@ -229,9 +231,19 @@ mod tests {
sync_write = false
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
[storage.default_store]
type = "File"
[[storage.custom_stores]]
type = "Gcs"
bucket = "foo"
endpoint = "bar"
[[storage.custom_stores]]
type = "S3"
bucket = "foo"
[storage.compaction]
max_inflight_tasks = 3
max_files_in_level0 = 7
Expand Down Expand Up @@ -290,9 +302,18 @@ mod tests {
assert!(tcp_nodelay);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert!(matches!(
&options.storage.store,
&options.storage.default_store,
ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(options.storage.custom_stores.len(), 2);
assert!(matches!(
options.storage.custom_stores[0],
ObjectStoreConfig::Gcs(GcsConfig { .. })
));
assert!(matches!(
options.storage.custom_stores[1],
ObjectStoreConfig::S3(S3Config { .. })
));

assert_eq!(
CompactionConfig {
Expand Down
4 changes: 3 additions & 1 deletion src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ mod tests {
[
env_prefix.to_string(),
"storage".to_uppercase(),
"default_store".to_uppercase(),
"type".to_uppercase(),
]
.join(ENV_VAR_SEP),
Expand All @@ -211,6 +212,7 @@ mod tests {
[
env_prefix.to_string(),
"storage".to_uppercase(),
"default_store".to_uppercase(),
"bucket".to_uppercase(),
]
.join(ENV_VAR_SEP),
Expand Down Expand Up @@ -258,7 +260,7 @@ mod tests {

// Check the configs from environment variables.
assert_eq!(opts.storage.manifest.checkpoint_margin, Some(99));
match opts.storage.store {
match &opts.storage.default_store {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
Expand Down
24 changes: 22 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;

use super::*;
Expand Down Expand Up @@ -465,8 +466,18 @@ mod tests {
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
[storage.default_store]
type = "File"
[[storage.custom_stores]]
type = "Gcs"
bucket = "foo"
endpoint = "bar"
[[storage.custom_stores]]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
Expand Down Expand Up @@ -516,7 +527,16 @@ mod tests {

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());

match &dn_opts.storage.store {
assert!(matches!(
&dn_opts.storage.default_store,
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(dn_opts.storage.custom_stores.len(), 2);
assert!(matches!(
dn_opts.storage.custom_stores[0],
datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
));
match &dn_opts.storage.custom_stores[1] {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
Expand Down
23 changes: 18 additions & 5 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ pub enum ObjectStoreConfig {
Gcs(GcsConfig),
}

impl ObjectStoreConfig {
pub fn extract_variant_name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Self::Oss(_) => "Oss",
Self::Azblob(_) => "Azblob",
Self::Gcs(_) => "Gcs",
}
}
}

/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
Expand All @@ -66,8 +78,8 @@ pub struct StorageConfig {
pub global_ttl: Option<Duration>,
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub default_store: ObjectStoreConfig,
pub custom_stores: Vec<ObjectStoreConfig>,
pub compaction: CompactionConfig,
pub manifest: RegionManifestConfig,
pub flush: FlushConfig,
Expand All @@ -78,10 +90,11 @@ impl Default for StorageConfig {
Self {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
custom_stores: vec![],
compaction: CompactionConfig::default(),
manifest: RegionManifestConfig::default(),
flush: FlushConfig::default(),
default_store: ObjectStoreConfig::default(),
}
}
}
Expand Down Expand Up @@ -403,13 +416,13 @@ mod tests {
#[test]
fn test_secstr() {
let toml_str = r#"
[storage]
[storage.default_store]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
match opts.storage.store {
match &opts.storage.default_store {
ObjectStoreConfig::S3(cfg) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
Expand Down
27 changes: 19 additions & 8 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ impl DatanodeBuilder {
));
}
}

info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];
Expand Down Expand Up @@ -354,13 +353,8 @@ impl DatanodeBuilder {

let mut region_server =
RegionServer::new(query_engine.clone(), runtime.clone(), event_listener);
let object_store = store::new_object_store(opts).await?;
let object_store_manager = ObjectStoreManager::new(
"default", // TODO: use a name which is set in the configuration when #919 is done.
object_store,
);
let engines =
Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?;
let object_store_manager = Self::build_object_store_manager(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?;
for engine in engines {
region_server.register_engine(engine);
}
Expand Down Expand Up @@ -425,6 +419,23 @@ impl DatanodeBuilder {
}
Ok(engines)
}

/// Builds [ObjectStoreManager]
async fn build_object_store_manager(opts: &DatanodeOptions) -> Result<ObjectStoreManagerRef> {
let object_store =
store::new_object_store(opts.storage.default_store.clone(), &opts.storage.data_home)
.await?;
let default_name = opts.storage.default_store.extract_variant_name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &opts.storage.custom_stores {
let name = store.extract_variant_name();
object_store_manager.add(
name,
store::new_object_store(store.clone(), &opts.storage.data_home).await?,
);
}
Ok(Arc::new(object_store_manager))
}
}

#[cfg(test)]
Expand Down
16 changes: 9 additions & 7 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ use object_store::util::normalize_dir;
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
let data_home = normalize_dir(&opts.storage.data_home);
let object_store = match &opts.storage.store {
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match &store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
Expand All @@ -50,9 +53,8 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectSto
}?;

// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(opts.storage.store, ObjectStoreConfig::File(..)) {
let object_store =
create_object_store_with_cache(object_store, &opts.storage.store).await?;
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, &store).await?;
object_store.layer(RetryLayer::new().with_jitter())
} else {
object_store
Expand Down
2 changes: 2 additions & 0 deletions src/sql/src/parsers/create_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl<'a> ParserContext<'a> {
}
);
}
// Sorts options so that `test_display_create_table` can always pass.
let options = options.into_iter().sorted().collect();
let create_table = CreateTable {
if_not_exists,
name: table_name,
Expand Down
3 changes: 2 additions & 1 deletion src/sql/src/statements/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ mod tests {
PARTITION r2 VALUES LESS THAN (MAXVALUE),
)
engine=mito
with(regions=1, ttl='7d');
with(regions=1, ttl='7d', storage='File');
";
let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
assert_eq!(1, result.len());
Expand All @@ -267,6 +267,7 @@ PARTITION BY RANGE COLUMNS (ts) (
ENGINE=mito
WITH(
regions = 1,
storage = 'File',
ttl = '7d'
)"#,
&new_sql
Expand Down
3 changes: 3 additions & 0 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub struct TableOptions {
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
pub const TTL_KEY: &str = "ttl";
pub const REGIONS_KEY: &str = "regions";
pub const STORAGE_KEY: &str = "storage";

impl TryFrom<&HashMap<String, String>> for TableOptions {
type Error = error::Error;
Expand Down Expand Up @@ -340,6 +341,7 @@ pub fn valid_table_option(key: &str) -> bool {
| WRITE_BUFFER_SIZE_KEY
| TTL_KEY
| REGIONS_KEY
| STORAGE_KEY
) | is_supported_in_s3(key)
}

Expand All @@ -365,6 +367,7 @@ mod tests {
assert!(valid_table_option(TTL_KEY));
assert!(valid_table_option(REGIONS_KEY));
assert!(valid_table_option(WRITE_BUFFER_SIZE_KEY));
assert!(valid_table_option(STORAGE_KEY));
assert!(!valid_table_option("foo"));
}

Expand Down
Loading

0 comments on commit 1bb6b6a

Please sign in to comment.