feat: support custom storage for every table(initial)
NiwakaDev committed Oct 15, 2023
1 parent 201acd1 commit 8e6016c
Showing 43 changed files with 965 additions and 196 deletions.
2 changes: 1 addition & 1 deletion .env.example
@@ -18,4 +18,4 @@ GT_AZBLOB_ENDPOINT=AZBLOB endpoint
GT_GCS_BUCKET = GCS bucket
GT_GCS_SCOPE = GCS scope
GT_GCS_CREDENTIAL_PATH = GCS credential path
GT_GCS_ENDPOINT = GCS end point
GT_GCS_ENDPOINT = GCS end point
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions config/datanode.example.toml
@@ -43,9 +43,10 @@ sync_write = false
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
# global_store = "File"
# Storage type.
[[storage.store]]
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"

# Compaction options, see `standalone.example.toml`.
[storage.compaction]
2 changes: 1 addition & 1 deletion config/metasrv.example.toml
@@ -34,4 +34,4 @@ retry_delay = "500ms"
# [datanode.client_options]
# timeout_millis = 10000
# connect_timeout_millis = 10000
# tcp_nodelay = true
# tcp_nodelay = true
5 changes: 4 additions & 1 deletion config/standalone.example.toml
@@ -111,8 +111,11 @@ retry_delay = "500ms"
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
#global_store = "File"
# Storage type.
type = "File"
#[[storage.store]]
#type = "File"

# TTL for all tables. Disabled by default.
# global_ttl = "7d"

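Taken together, the example configs above replace the single flattened storage backend with an array of `[[storage.store]]` tables plus a `global_store` name. Below is a minimal sketch (not part of the commit) of how such a section deserializes, following the same `toml::from_str` pattern used by the commit's tests further down this page; the bucket name and credentials are placeholder values.

use datanode::config::{DatanodeOptions, FileConfig, ObjectStoreConfig, S3Config};

#[test]
fn multi_store_config_roundtrip() {
    // Placeholder values; only the shape of the [storage] section matters here.
    let toml_str = r#"
        [storage]
        data_home = "/tmp/greptimedb/"
        global_store = "File"

        [[storage.store]]
        type = "File"

        [[storage.store]]
        type = "S3"
        bucket = "mybucket"
        access_key_id = "access_key_id"
        secret_access_key = "secret_access_key"
    "#;
    let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();

    // `store` is now a Vec<ObjectStoreConfig>, so both entries are retained.
    assert_eq!(opts.storage.store.len(), 2);
    assert!(matches!(
        &opts.storage.store[0],
        ObjectStoreConfig::File(FileConfig { .. })
    ));
    assert!(matches!(
        &opts.storage.store[1],
        ObjectStoreConfig::S3(S3Config { .. })
    ));
    assert_eq!(opts.storage.global_store, "File");
}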
20 changes: 16 additions & 4 deletions src/cmd/src/datanode.rs
@@ -179,7 +179,9 @@ mod tests {

use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
use datanode::config::{
CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig, S3Config,
};
use servers::Mode;

use super::*;
@@ -212,8 +214,14 @@ mod tests {
sync_write = false
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
[[storage.store]]
type = "File"
[[storage.store]]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
[storage.compaction]
max_inflight_tasks = 3
@@ -264,11 +272,15 @@
assert_eq!(3000, timeout_millis);
assert!(tcp_nodelay);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert_eq!(options.storage.store.len(), 2);
assert!(matches!(
&options.storage.store,
&options.storage.store[0],
ObjectStoreConfig::File(FileConfig { .. })
));

assert!(matches!(
&options.storage.store[1],
ObjectStoreConfig::S3(S3Config { .. })
));
assert_eq!(
CompactionConfig {
max_inflight_tasks: 3,
15 changes: 11 additions & 4 deletions src/cmd/src/options.rs
@@ -127,7 +127,7 @@ mod tests {
use std::time::Duration;

use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use datanode::config::DatanodeOptions;

use super::*;

@@ -182,26 +182,31 @@
.join(ENV_VAR_SEP),
Some("99"),
),
// Can we pass storage.store.~ as env?
/***
(
// storage.type = S3
// storage.store.type = S3
[
env_prefix.to_string(),
"storage".to_uppercase(),
"store".to_uppercase(),
"type".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("S3"),
),
(
// storage.bucket = mybucket
// storage.store.bucket = mybucket
[
env_prefix.to_string(),
"storage".to_uppercase(),
"store".to_uppercase(),
"bucket".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("mybucket"),
),
***/
(
// storage.manifest.gc_duration = 42s
[
@@ -243,13 +248,15 @@
.unwrap();

// Check the configs from environment variables.
/***
assert_eq!(opts.storage.manifest.checkpoint_margin, Some(99));
match opts.storage.store {
match opts.storage.store[0].clone() {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
_ => panic!("unexpected store type"),
}
***/
assert_eq!(
opts.storage.manifest.gc_duration,
Some(Duration::from_secs(42))
14 changes: 11 additions & 3 deletions src/cmd/src/standalone.rs
@@ -381,6 +381,7 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::FileConfig;
use servers::Mode;

use super::*;
@@ -429,10 +430,14 @@ mod tests {
sync_write = false
[storage]
[[storage.store]]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
[[storage.store]]
type = "File"
[storage.compaction]
max_inflight_tasks = 3
max_files_in_level0 = 7
@@ -475,8 +480,8 @@ mod tests {
assert_eq!(2, fe_opts.mysql.runtime_size);
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

match &dn_opts.storage.store {
assert_eq!(dn_opts.storage.store.len(), 2);
match &dn_opts.storage.store[0] {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
@@ -487,7 +492,10 @@ mod tests {
unreachable!()
}
}

assert!(matches!(
dn_opts.storage.store[1],
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!("debug", logging_opts.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), logging_opts.dir);
}
1 change: 1 addition & 0 deletions src/common/datasource/Cargo.toml
@@ -34,6 +34,7 @@ strum.workspace = true
tokio-util.workspace = true
tokio.workspace = true
url = "2.3"
dotenv = "0.15"

[dev-dependencies]
common-test-util = { workspace = true }
79 changes: 79 additions & 0 deletions src/common/datasource/src/object_store.rs
@@ -15,6 +15,7 @@
pub mod fs;
pub mod s3;
use std::collections::HashMap;
use std::env;

use lazy_static::lazy_static;
use object_store::ObjectStore;
@@ -84,6 +85,84 @@ pub fn handle_windows_path(url: &str) -> Option<String> {
.map(|captures| captures[0].to_string())
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StorageType {
S3,
S3WithCache,
File,
Oss,
Azblob,
Gcs,
}

impl std::fmt::Display for StorageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StorageType::S3 => write!(f, "S3"),
StorageType::S3WithCache => write!(f, "S3"),
StorageType::File => write!(f, "File"),
StorageType::Oss => write!(f, "Oss"),
StorageType::Azblob => write!(f, "Azblob"),
StorageType::Gcs => write!(f, "Gcs"),
}
}
}

impl StorageType {
pub fn build_storage_types_based_on_env() -> Vec<StorageType> {
let mut storage_types = Vec::with_capacity(4);
storage_types.push(StorageType::File);
if env::var("GT_S3_BUCKET").is_ok() {
storage_types.push(StorageType::S3);
}
if env::var("GT_OSS_BUCKET").is_ok() {
storage_types.push(StorageType::Oss);
}
if env::var("GT_AZBLOB_CONTAINER").is_ok() {
storage_types.push(StorageType::Azblob);
}
if env::var("GT_GCS_BUCKET").is_ok() {
storage_types.push(StorageType::Gcs);
}
storage_types
}
pub fn test_on(&self) -> bool {
let _ = dotenv::dotenv();

match self {
StorageType::File => true, // always test file
StorageType::S3 | StorageType::S3WithCache => {
if let Ok(b) = env::var("GT_S3_BUCKET") {
!b.is_empty()
} else {
false
}
}
StorageType::Oss => {
if let Ok(b) = env::var("GT_OSS_BUCKET") {
!b.is_empty()
} else {
false
}
}
StorageType::Azblob => {
if let Ok(b) = env::var("GT_AZBLOB_CONTAINER") {
!b.is_empty()
} else {
false
}
}
StorageType::Gcs => {
if let Ok(b) = env::var("GT_GCS_BUCKET") {
!b.is_empty()
} else {
false
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::handle_windows_path;
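The new `StorageType` helpers are aimed at test code: `build_storage_types_based_on_env` always includes `File` and adds S3, Oss, Azblob, or Gcs only when the corresponding `GT_*` environment variable is set, while `test_on` additionally requires the bucket or container value to be non-empty. A rough usage sketch follows; the `run_storage_tests` wrapper and loop body are illustrative only, and the crate path is assumed from the file location.

use common_datasource::object_store::StorageType;

fn run_storage_tests() {
    // File is always present; remote backends appear only when their
    // GT_S3_BUCKET / GT_OSS_BUCKET / GT_AZBLOB_CONTAINER / GT_GCS_BUCKET
    // environment variables are set.
    for storage_type in StorageType::build_storage_types_based_on_env() {
        // Skip backends whose bucket/container variable is present but empty.
        if !storage_type.test_on() {
            continue;
        }
        println!("running object store tests against {}", storage_type);
        // ... build the corresponding ObjectStore and exercise it here ...
    }
}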
11 changes: 6 additions & 5 deletions src/datanode/src/config.rs
@@ -66,22 +66,23 @@ pub struct StorageConfig {
pub global_ttl: Option<Duration>,
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub store: Vec<ObjectStoreConfig>,
pub compaction: CompactionConfig,
pub manifest: RegionManifestConfig,
pub flush: FlushConfig,
pub global_store: String,
}

impl Default for StorageConfig {
fn default() -> Self {
Self {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
store: vec![ObjectStoreConfig::default()],
compaction: CompactionConfig::default(),
manifest: RegionManifestConfig::default(),
flush: FlushConfig::default(),
global_store: "File".to_string(),
}
}
}
@@ -402,13 +403,13 @@ mod tests {
#[test]
fn test_secstr() {
let toml_str = r#"
[storage]
[[storage.store]]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
match opts.storage.store {
match opts.storage.store[0].clone() {
ObjectStoreConfig::S3(cfg) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
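StorageConfig now keeps every configured backend in `store: Vec<ObjectStoreConfig>` and names the default one in `global_store` (defaulting to "File"). How the datanode resolves that name is outside this hunk; the sketch below is a purely hypothetical illustration of the relationship between the two fields, and both `variant_name` and the lookup are assumptions rather than code from this commit.

use datanode::config::{ObjectStoreConfig, StorageConfig};

// Hypothetical helper: map a config variant to the name used by `global_store`.
fn variant_name(store: &ObjectStoreConfig) -> &'static str {
    match store {
        ObjectStoreConfig::File(_) => "File",
        ObjectStoreConfig::S3(_) => "S3",
        // Remaining variants (Oss, Azblob, Gcs, ...) elided for brevity.
        _ => "Other",
    }
}

// Hypothetical lookup: pick the store entry named by `global_store`, if any.
fn resolve_global_store(config: &StorageConfig) -> Option<&ObjectStoreConfig> {
    config
        .store
        .iter()
        .find(|store| variant_name(store) == config.global_store)
}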