feat!: support table ddl for custom storage (#2733)
* feat: support table ddl for custom_storage

* refactor: rename extract_variant_name to name

* chore: add blank

* chore: keep compatible

* feat: rename custom_stores to providers

* chore: rename

* chore: config

* refactor: add should_retry in client Error

* fix: test fail

* chore: remove unused options

* chore: remove unused import

* chore: remove the blanks.

* chore: revert

---------

Co-authored-by: dennis zhuang <[email protected]>
NiwakaDev and killme2008 authored Dec 6, 2023
1 parent 2cca267 commit cfe3a2c
Showing 20 changed files with 391 additions and 50 deletions.
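Taken together, the changes let a CREATE TABLE statement pin a table to a specific object store via a new `storage` table option, chosen from the stores configured under `[[storage.providers]]`. A minimal sketch of the user-facing effect, modeled on the parser test in src/sql/src/statements/create.rs further down — the import paths and the "Gcs" provider name are assumptions for illustration, not part of this commit:

```rust
use sql::dialect::GreptimeDbDialect;
use sql::parser::ParserContext;

fn main() {
    // Pin a table to a provider named "Gcs"; the name must match one of
    // the stores declared in the datanode's `[[storage.providers]]` list.
    let sql = "CREATE TABLE metrics (ts TIMESTAMP TIME INDEX, val DOUBLE) \
               ENGINE=mito WITH(ttl='7d', storage='Gcs');";
    let stmts = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
    assert_eq!(1, stmts.len());
}
```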
4 changes: 4 additions & 0 deletions config/datanode.example.toml
@@ -43,6 +43,7 @@ sync_write = false
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
# Storage type.
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"
@@ -53,6 +54,9 @@ type = "File"
# The local file cache capacity in bytes.
# cache_capacity = "256MB"

[storage.default_store]
type = "File"

# Mito engine options
[[region_engine]]
[region_engine.mito]
12 changes: 12 additions & 0 deletions src/client/src/error.rs
@@ -131,3 +131,15 @@ impl From<Status> for Error {
Self::Server { code, msg }
}
}

impl Error {
pub fn should_retry(&self) -> bool {
!matches!(
self,
Self::RegionServer {
code: Code::InvalidArgument,
..
}
)
}
}
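The intent of `should_retry` is that a `RegionServer` error carrying `InvalidArgument` is deterministic — the same request will be rejected on every attempt — while any other failure may be transient and worth retrying. A self-contained sketch of that contract (the enums below are stand-ins, not the crate's real `Error` and `Code` types):

```rust
#[derive(Debug)]
enum Code {
    InvalidArgument,
    Unavailable,
}

#[derive(Debug)]
enum Error {
    RegionServer { code: Code },
    Server { msg: String },
}

impl Error {
    // Mirrors the new method: retry everything except a region server's
    // rejection of the request itself.
    fn should_retry(&self) -> bool {
        !matches!(
            self,
            Self::RegionServer {
                code: Code::InvalidArgument
            }
        )
    }
}

fn main() {
    let fatal = Error::RegionServer { code: Code::InvalidArgument };
    let transient = Error::Server { msg: "connection reset".into() };
    assert!(!fatal.should_retry());
    assert!(Error::RegionServer { code: Code::Unavailable }.should_retry());
    assert!(transient.should_retry());
    println!("{fatal:?} -> no retry; {transient:?} -> retry");
}
```

Note the semantic widening visible in the next file: previously only `RegionServer` errors were converted to `RetryLater`; now everything except an invalid argument is.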
3 changes: 1 addition & 2 deletions src/client/src/region.rs
@@ -29,7 +29,6 @@ use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;

use crate::error::Error::RegionServer;
use crate::error::{
self, ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
MissingFieldSnafu, Result, ServerSnafu,
@@ -45,7 +44,7 @@ pub struct RegionRequester {
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
self.handle_inner(request).await.map_err(|err| {
if matches!(err, RegionServer { .. }) {
if err.should_retry() {
meta_error::Error::RetryLater {
source: BoxedError::new(err),
}
22 changes: 20 additions & 2 deletions src/cmd/src/datanode.rs
@@ -214,7 +214,7 @@ mod tests {
use std::time::Duration;

use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, ObjectStoreConfig};
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;

@@ -251,8 +251,17 @@
sync_write = false
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
type = "File"
[[storage.providers]]
type = "Gcs"
bucket = "foo"
endpoint = "bar"
[[storage.providers]]
type = "S3"
bucket = "foo"
[logging]
level = "debug"
@@ -305,6 +314,15 @@
&options.storage.store,
ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(options.storage.providers.len(), 2);
assert!(matches!(
options.storage.providers[0],
ObjectStoreConfig::Gcs(GcsConfig { .. })
));
assert!(matches!(
options.storage.providers[1],
ObjectStoreConfig::S3(S3Config { .. })
));

assert_eq!("debug", options.logging.level.unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
2 changes: 1 addition & 1 deletion src/cmd/src/options.rs
@@ -238,7 +238,7 @@ mod tests {
.unwrap();

// Check the configs from environment variables.
match opts.storage.store {
match &opts.storage.store {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
22 changes: 20 additions & 2 deletions src/cmd/src/standalone.rs
@@ -426,6 +426,7 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;

use super::*;
@@ -473,8 +474,16 @@
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
type = "File"
[[storage.providers]]
type = "Gcs"
bucket = "foo"
endpoint = "bar"
[[storage.providers]]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
@@ -524,7 +533,16 @@

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());

match &dn_opts.storage.store {
assert!(matches!(
&dn_opts.storage.store,
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(dn_opts.storage.providers.len(), 2);
assert!(matches!(
dn_opts.storage.providers[0],
datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
));
match &dn_opts.storage.providers[1] {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
16 changes: 15 additions & 1 deletion src/datanode/src/config.rs
@@ -48,6 +48,18 @@ pub enum ObjectStoreConfig {
Gcs(GcsConfig),
}

impl ObjectStoreConfig {
pub fn name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Self::Oss(_) => "Oss",
Self::Azblob(_) => "Azblob",
Self::Gcs(_) => "Gcs",
}
}
}

/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
@@ -63,6 +75,7 @@ pub struct StorageConfig {
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub providers: Vec<ObjectStoreConfig>,
}

impl Default for StorageConfig {
@@ -71,6 +84,7 @@ impl Default for StorageConfig {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
providers: vec![],
}
}
}
@@ -295,7 +309,7 @@ mod tests {
secret_access_key = "secret_access_key"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
match opts.storage.store {
match &opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
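Two details of `StorageConfig` shape the TOML layout: `store` is `#[serde(flatten)]`ed, so the default store's `type` key sits directly under `[storage]`, while `providers` is an ordinary list, so each extra backend becomes a `[[storage.providers]]` entry. A hedged, self-contained sketch of that deserialization — stand-in structs with only a few fields, assuming the `serde` and `toml` crates; the real types live in `datanode::config`:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum ObjectStoreConfig {
    File {},
    S3 { bucket: String },
    Gcs { bucket: String, endpoint: String },
}

#[derive(Debug, Deserialize)]
struct StorageConfig {
    data_home: String,
    #[serde(flatten)]
    store: ObjectStoreConfig, // `type = "..."` directly under [storage]
    #[serde(default)]
    providers: Vec<ObjectStoreConfig>, // one per [[storage.providers]]
}

fn main() {
    let toml_str = r#"
        data_home = "/tmp/greptimedb/"
        type = "File"

        [[providers]]
        type = "Gcs"
        bucket = "foo"
        endpoint = "bar"
    "#;
    let storage: StorageConfig = toml::from_str(toml_str).unwrap();
    assert!(matches!(storage.store, ObjectStoreConfig::File {}));
    assert_eq!(storage.providers.len(), 1);
    println!("data_home: {}", storage.data_home);
}
```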
26 changes: 17 additions & 9 deletions src/datanode/src/datanode.rs
@@ -354,7 +354,6 @@ impl DatanodeBuilder {
));
}
}

info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];
@@ -417,21 +416,15 @@
);

let table_provider_factory = Arc::new(DummyTableProviderFactory);

let mut region_server = RegionServer::with_table_provider(
query_engine,
runtime,
event_listener,
table_provider_factory,
);

let object_store = store::new_object_store(opts).await?;
let object_store_manager = ObjectStoreManager::new(
"default", // TODO: use a name which is set in the configuration when #919 is done.
object_store,
);
let engines =
Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?;
let object_store_manager = Self::build_object_store_manager(opts).await?;
let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?;
for engine in engines {
region_server.register_engine(engine);
}
@@ -496,6 +489,21 @@
}
Ok(engines)
}

/// Builds [ObjectStoreManager]
async fn build_object_store_manager(opts: &DatanodeOptions) -> Result<ObjectStoreManagerRef> {
let object_store =
store::new_object_store(opts.storage.store.clone(), &opts.storage.data_home).await?;
let default_name = opts.storage.store.name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &opts.storage.providers {
object_store_manager.add(
store.name(),
store::new_object_store(store.clone(), &opts.storage.data_home).await?,
);
}
Ok(Arc::new(object_store_manager))
}
}

#[cfg(test)]
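`build_object_store_manager` registers the default store under its `ObjectStoreConfig::name()` and then adds one entry per configured provider, which is what lets a table's `storage` option resolve a backend by name. A self-contained sketch of the registry shape (types and method names here are stand-ins for illustration; the real `ObjectStoreManager` lives in the object store crate):

```rust
use std::collections::HashMap;

// Stand-in for a constructed backend; the real value is an ObjectStore.
#[derive(Debug, Clone)]
struct Store(&'static str);

struct ObjectStoreManager {
    default_name: String,
    stores: HashMap<String, Store>,
}

impl ObjectStoreManager {
    fn new(default_name: &str, store: Store) -> Self {
        let mut stores = HashMap::new();
        stores.insert(default_name.to_string(), store);
        Self { default_name: default_name.to_string(), stores }
    }

    fn add(&mut self, name: &str, store: Store) {
        self.stores.insert(name.to_string(), store);
    }

    // Roughly how `WITH (storage = '...')` would resolve: by provider
    // name, falling back to the default store when none is given.
    fn find(&self, name: Option<&str>) -> Option<&Store> {
        self.stores.get(name.unwrap_or(&self.default_name))
    }
}

fn main() {
    let mut manager = ObjectStoreManager::new("File", Store("fs backend"));
    manager.add("Gcs", Store("gcs backend"));
    assert!(manager.find(Some("Gcs")).is_some());
    assert!(manager.find(None).is_some());
    assert!(manager.find(Some("S3")).is_none());
}
```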
16 changes: 9 additions & 7 deletions src/datanode/src/store.rs
@@ -32,12 +32,15 @@ use object_store::util::normalize_dir;
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
let data_home = normalize_dir(&opts.storage.data_home);
let object_store = match &opts.storage.store {
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match &store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
@@ -50,9 +53,8 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
}?;

// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(opts.storage.store, ObjectStoreConfig::File(..)) {
let object_store =
create_object_store_with_cache(object_store, &opts.storage.store).await?;
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, &store).await?;
object_store.layer(RetryLayer::new().with_jitter())
} else {
object_store
2 changes: 2 additions & 0 deletions src/sql/src/parsers/create_parser.rs
@@ -170,6 +170,8 @@ impl<'a> ParserContext<'a> {
}
);
}
// Sorts options so that `test_display_create_table` can always pass.
let options = options.into_iter().sorted().collect();
let create_table = CreateTable {
if_not_exists,
name: table_name,
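The `sorted()` adapter here comes from the `itertools` crate; sorting the parsed `WITH` options makes the round-tripped statement text deterministic, which is what keeps `test_display_create_table` stable. A small sketch of the idiom under that assumption:

```rust
use itertools::Itertools;

fn main() {
    // Options in whatever order the user wrote them...
    let options = vec![("ttl", "7d"), ("storage", "File"), ("regions", "1")];
    // ...come back in a stable, sorted order for display.
    let sorted: Vec<_> = options.into_iter().sorted().collect();
    assert_eq!(
        sorted,
        vec![("regions", "1"), ("storage", "File"), ("ttl", "7d")]
    );
}
```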
3 changes: 2 additions & 1 deletion src/sql/src/statements/create.rs
@@ -241,7 +241,7 @@ mod tests {
PARTITION r2 VALUES LESS THAN (MAXVALUE),
)
engine=mito
with(regions=1, ttl='7d');
with(regions=1, ttl='7d', storage='File');
";
let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
assert_eq!(1, result.len());
@@ -267,6 +267,7 @@
PARTITION BY RANGE COLUMNS (ts) (
ENGINE=mito
WITH(
regions = 1,
storage = 'File',
ttl = '7d'
)"#,
&new_sql
3 changes: 3 additions & 0 deletions src/table/src/requests.rs
@@ -84,6 +84,7 @@ pub struct TableOptions {
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
pub const TTL_KEY: &str = "ttl";
pub const REGIONS_KEY: &str = "regions";
pub const STORAGE_KEY: &str = "storage";

impl TryFrom<&HashMap<String, String>> for TableOptions {
type Error = error::Error;
@@ -340,6 +341,7 @@ pub fn valid_table_option(key: &str) -> bool {
| WRITE_BUFFER_SIZE_KEY
| TTL_KEY
| REGIONS_KEY
| STORAGE_KEY
) | is_supported_in_s3(key)
}

@@ -365,6 +367,7 @@ mod tests {
assert!(valid_table_option(TTL_KEY));
assert!(valid_table_option(REGIONS_KEY));
assert!(valid_table_option(WRITE_BUFFER_SIZE_KEY));
assert!(valid_table_option(STORAGE_KEY));
assert!(!valid_table_option("foo"));
}

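`valid_table_option` is a plain whitelist, so the new `storage` option is one more pattern arm. A condensed, runnable analogue using the constants from this file (the real function also accepts S3 connection keys via `is_supported_in_s3`, elided here):

```rust
const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
const TTL_KEY: &str = "ttl";
const REGIONS_KEY: &str = "regions";
const STORAGE_KEY: &str = "storage";

// &str constants can be matched directly as patterns, which is exactly
// the shape of the real whitelist.
fn valid_table_option(key: &str) -> bool {
    matches!(
        key,
        WRITE_BUFFER_SIZE_KEY | TTL_KEY | REGIONS_KEY | STORAGE_KEY
    )
}

fn main() {
    assert!(valid_table_option(STORAGE_KEY));
    assert!(valid_table_option("ttl"));
    assert!(!valid_table_option("foo"));
}
```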
18 changes: 15 additions & 3 deletions tests-integration/src/cluster.rs
@@ -68,6 +68,7 @@ pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
store_config: Option<ObjectStoreConfig>,
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
}

@@ -92,6 +93,7 @@ impl GreptimeDbClusterBuilder {
cluster_name: cluster_name.to_string(),
kv_backend,
store_config: None,
store_providers: None,
datanodes: None,
}
}
@@ -101,6 +103,11 @@
self
}

pub fn with_store_providers(mut self, store_providers: Vec<StorageType>) -> Self {
self.store_providers = Some(store_providers);
self
}

pub fn with_datanodes(mut self, datanodes: u32) -> Self {
self.datanodes = Some(datanodes);
self
@@ -176,14 +183,15 @@

dir_guards.push(FileDirGuard::new(home_tmp_dir));

create_datanode_opts(store_config.clone(), home_dir)
create_datanode_opts(store_config.clone(), vec![], home_dir)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
StorageType::File,
self.store_providers.clone().unwrap_or_default(),
&format!("{}-dn-{}", self.cluster_name, datanode_id),
);

storage_guards.push(guard.storage_guard);
storage_guards.push(guard.storage_guards);
dir_guards.push(guard.home_guard);

opts
@@ -195,7 +203,11 @@

instances.insert(datanode_id, datanode);
}
(instances, storage_guards, dir_guards)
(
instances,
storage_guards.into_iter().flatten().collect(),
dir_guards,
)
}

async fn wait_datanodes_alive(