Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: support table ddl for custom storage #2733

4 changes: 4 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ sync_write = false
[storage]
# The working home directory.
data_home = "/tmp/greptimedb/"
# Storage type.
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"
Expand All @@ -53,6 +54,9 @@ type = "File"
# The local file cache capacity in bytes.
# cache_capacity = "256MB"

[storage.default_store]
type = "File"

# Mito engine options
[[region_engine]]
[region_engine.mito]
Expand Down
12 changes: 12 additions & 0 deletions src/client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,15 @@ impl From<Status> for Error {
Self::Server { code, msg }
}
}

impl Error {
pub fn should_retry(&self) -> bool {
!matches!(
self,
Self::RegionServer {
code: Code::InvalidArgument,
..
}
)
}
}
3 changes: 1 addition & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
use tokio_stream::StreamExt;

use crate::error::Error::RegionServer;
use crate::error::{
self, ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
MissingFieldSnafu, Result, ServerSnafu,
Expand All @@ -45,7 +44,7 @@ pub struct RegionRequester {
impl Datanode for RegionRequester {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
self.handle_inner(request).await.map_err(|err| {
if matches!(err, RegionServer { .. }) {
if err.should_retry() {
meta_error::Error::RetryLater {
source: BoxedError::new(err),
}
Expand Down
22 changes: 20 additions & 2 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod tests {
use std::time::Duration;

use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, ObjectStoreConfig};
use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;

Expand Down Expand Up @@ -251,8 +251,17 @@ mod tests {
sync_write = false

[storage]
type = "File"
data_home = "/tmp/greptimedb/"
type = "File"

[[storage.providers]]
type = "Gcs"
bucket = "foo"
endpoint = "bar"

[[storage.providers]]
type = "S3"
bucket = "foo"

[logging]
level = "debug"
Expand Down Expand Up @@ -305,6 +314,15 @@ mod tests {
&options.storage.store,
ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(options.storage.providers.len(), 2);
assert!(matches!(
options.storage.providers[0],
ObjectStoreConfig::Gcs(GcsConfig { .. })
));
assert!(matches!(
options.storage.providers[1],
ObjectStoreConfig::S3(S3Config { .. })
));

assert_eq!("debug", options.logging.level.unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ mod tests {
.unwrap();

// Check the configs from environment variables.
match opts.storage.store {
match &opts.storage.store {
ObjectStoreConfig::S3(s3_config) => {
assert_eq!(s3_config.bucket, "mybucket".to_string());
}
Expand Down
22 changes: 20 additions & 2 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ mod tests {
use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;

use super::*;
Expand Down Expand Up @@ -459,8 +460,16 @@ mod tests {
purge_interval = "10m"
read_batch_size = 128
sync_write = false

[storage]
data_home = "/tmp/greptimedb/"
type = "File"

[[storage.providers]]
type = "Gcs"
bucket = "foo"
endpoint = "bar"

[[storage.providers]]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
Expand Down Expand Up @@ -510,7 +519,16 @@ mod tests {

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());

match &dn_opts.storage.store {
assert!(matches!(
&dn_opts.storage.store,
datanode::config::ObjectStoreConfig::File(FileConfig { .. })
));
assert_eq!(dn_opts.storage.providers.len(), 2);
assert!(matches!(
dn_opts.storage.providers[0],
datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
));
match &dn_opts.storage.providers[1] {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
Expand Down
16 changes: 15 additions & 1 deletion src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ pub enum ObjectStoreConfig {
Gcs(GcsConfig),
}

impl ObjectStoreConfig {
pub fn name(&self) -> &'static str {
match self {
Self::File(_) => "File",
Self::S3(_) => "S3",
Self::Oss(_) => "Oss",
Self::Azblob(_) => "Azblob",
Self::Gcs(_) => "Gcs",
}
}
}

/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
Expand All @@ -63,6 +75,7 @@ pub struct StorageConfig {
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
pub providers: Vec<ObjectStoreConfig>,
}

impl Default for StorageConfig {
Expand All @@ -71,6 +84,7 @@ impl Default for StorageConfig {
global_ttl: None,
data_home: DEFAULT_DATA_HOME.to_string(),
store: ObjectStoreConfig::default(),
providers: vec![],
}
}
}
Expand Down Expand Up @@ -295,7 +309,7 @@ mod tests {
secret_access_key = "secret_access_key"
"#;
let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
match opts.storage.store {
match &opts.storage.store {
ObjectStoreConfig::S3(cfg) => {
assert_eq!(
"Secret([REDACTED alloc::string::String])".to_string(),
Expand Down
26 changes: 17 additions & 9 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ impl DatanodeBuilder {
));
}
}

info!("going to open {} regions", regions.len());
let semaphore = Arc::new(tokio::sync::Semaphore::new(OPEN_REGION_PARALLELISM));
let mut tasks = vec![];
Expand Down Expand Up @@ -417,21 +416,15 @@ impl DatanodeBuilder {
);

let table_provider_factory = Arc::new(DummyTableProviderFactory);

let mut region_server = RegionServer::with_table_provider(
query_engine,
runtime,
event_listener,
table_provider_factory,
);

let object_store = store::new_object_store(opts).await?;
let object_store_manager = ObjectStoreManager::new(
"default", // TODO: use a name which is set in the configuration when #919 is done.
object_store,
);
let engines =
Self::build_store_engines(opts, log_store, Arc::new(object_store_manager)).await?;
let object_store_manager = Self::build_object_store_manager(opts).await?;
killme2008 marked this conversation as resolved.
Show resolved Hide resolved
let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?;
for engine in engines {
region_server.register_engine(engine);
}
Expand Down Expand Up @@ -496,6 +489,21 @@ impl DatanodeBuilder {
}
Ok(engines)
}

/// Builds [ObjectStoreManager]
async fn build_object_store_manager(opts: &DatanodeOptions) -> Result<ObjectStoreManagerRef> {
let object_store =
store::new_object_store(opts.storage.store.clone(), &opts.storage.data_home).await?;
let default_name = opts.storage.store.name();
let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
for store in &opts.storage.providers {
object_store_manager.add(
store.name(),
store::new_object_store(store.clone(), &opts.storage.data_home).await?,
);
}
Ok(Arc::new(object_store_manager))
}
}

#[cfg(test)]
Expand Down
16 changes: 9 additions & 7 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ use object_store::util::normalize_dir;
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;

use crate::config::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};

pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
let data_home = normalize_dir(&opts.storage.data_home);
let object_store = match &opts.storage.store {
pub(crate) async fn new_object_store(
store: ObjectStoreConfig,
data_home: &str,
) -> Result<ObjectStore> {
let data_home = normalize_dir(data_home);
let object_store = match &store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(&data_home, file_config).await
}
Expand All @@ -50,9 +53,8 @@ pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectSto
}?;

// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(opts.storage.store, ObjectStoreConfig::File(..)) {
let object_store =
create_object_store_with_cache(object_store, &opts.storage.store).await?;
let object_store = if !matches!(store, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, &store).await?;
object_store.layer(RetryLayer::new().with_jitter())
} else {
object_store
Expand Down
2 changes: 2 additions & 0 deletions src/sql/src/parsers/create_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl<'a> ParserContext<'a> {
}
);
}
// Sorts options so that `test_display_create_table` can always pass.
let options = options.into_iter().sorted().collect();
let create_table = CreateTable {
if_not_exists,
name: table_name,
Expand Down
3 changes: 2 additions & 1 deletion src/sql/src/statements/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ mod tests {
PARTITION r2 VALUES LESS THAN (MAXVALUE),
)
engine=mito
with(regions=1, ttl='7d');
with(regions=1, ttl='7d', storage='File');
";
let result = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}).unwrap();
assert_eq!(1, result.len());
Expand All @@ -267,6 +267,7 @@ PARTITION BY RANGE COLUMNS (ts) (
ENGINE=mito
WITH(
regions = 1,
storage = 'File',
ttl = '7d'
)"#,
&new_sql
Expand Down
3 changes: 3 additions & 0 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub struct TableOptions {
pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
pub const TTL_KEY: &str = "ttl";
pub const REGIONS_KEY: &str = "regions";
pub const STORAGE_KEY: &str = "storage";

impl TryFrom<&HashMap<String, String>> for TableOptions {
type Error = error::Error;
Expand Down Expand Up @@ -340,6 +341,7 @@ pub fn valid_table_option(key: &str) -> bool {
| WRITE_BUFFER_SIZE_KEY
| TTL_KEY
| REGIONS_KEY
| STORAGE_KEY
) | is_supported_in_s3(key)
}

Expand All @@ -365,6 +367,7 @@ mod tests {
assert!(valid_table_option(TTL_KEY));
assert!(valid_table_option(REGIONS_KEY));
assert!(valid_table_option(WRITE_BUFFER_SIZE_KEY));
assert!(valid_table_option(STORAGE_KEY));
assert!(!valid_table_option("foo"));
}

Expand Down
18 changes: 15 additions & 3 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
store_config: Option<ObjectStoreConfig>,
store_providers: Option<Vec<StorageType>>,
datanodes: Option<u32>,
}

Expand All @@ -87,6 +88,7 @@ impl GreptimeDbClusterBuilder {
cluster_name: cluster_name.to_string(),
kv_backend,
store_config: None,
store_providers: None,
datanodes: None,
}
}
Expand All @@ -96,6 +98,11 @@ impl GreptimeDbClusterBuilder {
self
}

pub fn with_store_providers(mut self, store_providers: Vec<StorageType>) -> Self {
self.store_providers = Some(store_providers);
self
}

pub fn with_datanodes(mut self, datanodes: u32) -> Self {
self.datanodes = Some(datanodes);
self
Expand Down Expand Up @@ -171,14 +178,15 @@ impl GreptimeDbClusterBuilder {

dir_guards.push(FileDirGuard::new(home_tmp_dir));

create_datanode_opts(store_config.clone(), home_dir)
create_datanode_opts(store_config.clone(), vec![], home_dir)
} else {
let (opts, guard) = create_tmp_dir_and_datanode_opts(
StorageType::File,
self.store_providers.clone().unwrap_or_default(),
&format!("{}-dn-{}", self.cluster_name, datanode_id),
);

storage_guards.push(guard.storage_guard);
storage_guards.push(guard.storage_guards);
dir_guards.push(guard.home_guard);

opts
Expand All @@ -190,7 +198,11 @@ impl GreptimeDbClusterBuilder {

instances.insert(datanode_id, datanode);
}
(instances, storage_guards, dir_guards)
(
instances,
storage_guards.into_iter().flatten().collect(),
dir_guards,
)
}

async fn wait_datanodes_alive(
Expand Down
Loading
Loading