Skip to content

Commit

Permalink
fix(storage): fix object store config (#13618)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Nov 27, 2023
1 parent 45d2df5 commit 4f924d6
Show file tree
Hide file tree
Showing 25 changed files with 254 additions and 303 deletions.
8 changes: 0 additions & 8 deletions src/cmd_all/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ mod test {
do_not_config_object_storage_lifecycle: None,
backup_storage_url: None,
backup_storage_directory: None,
object_store_streaming_read_timeout_ms: None,
object_store_streaming_upload_timeout_ms: None,
object_store_upload_timeout_ms: None,
object_store_read_timeout_ms: None,
heap_profiling_dir: None,
},
),
Expand All @@ -278,10 +274,6 @@ mod test {
meta_file_cache_dir: None,
async_stack_trace: None,
heap_profiling_dir: None,
object_store_streaming_read_timeout_ms: None,
object_store_streaming_upload_timeout_ms: None,
object_store_upload_timeout_ms: None,
object_store_read_timeout_ms: None,
},
),
frontend_opts: Some(
Expand Down
144 changes: 80 additions & 64 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,30 +568,6 @@ pub struct StorageConfig {
#[serde(default = "default::storage::max_version_pinning_duration_sec")]
pub max_version_pinning_duration_sec: u64,

#[serde(default = "default::storage::object_store_streaming_read_timeout_ms")]
pub object_store_streaming_read_timeout_ms: u64,
#[serde(default = "default::storage::object_store_streaming_upload_timeout_ms")]
pub object_store_streaming_upload_timeout_ms: u64,
#[serde(default = "default::storage::object_store_upload_timeout_ms")]
pub object_store_upload_timeout_ms: u64,
#[serde(default = "default::storage::object_store_read_timeout_ms")]
pub object_store_read_timeout_ms: u64,

#[serde(default = "default::s3_objstore_config::object_store_keepalive_ms")]
pub object_store_keepalive_ms: Option<u64>,
#[serde(default = "default::s3_objstore_config::object_store_recv_buffer_size")]
pub object_store_recv_buffer_size: Option<usize>,
#[serde(default = "default::s3_objstore_config::object_store_send_buffer_size")]
pub object_store_send_buffer_size: Option<usize>,
#[serde(default = "default::s3_objstore_config::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
#[serde(default = "default::s3_objstore_config::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
#[serde(default = "default::s3_objstore_config::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::s3_objstore_config::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,

#[serde(default = "default::storage::compactor_max_sst_key_count")]
pub compactor_max_sst_key_count: u64,
#[serde(default = "default::storage::compact_iter_recreate_timeout_ms")]
Expand All @@ -608,6 +584,9 @@ pub struct StorageConfig {
/// The spill threshold for mem table.
#[serde(default = "default::storage::mem_table_spill_threshold")]
pub mem_table_spill_threshold: usize,

#[serde(default)]
pub object_store: ObjectStoreConfig,
}

#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
Expand Down Expand Up @@ -869,6 +848,41 @@ pub struct SystemConfig {
pub pause_on_next_bootstrap: Option<bool>,
}

/// The subsection `[storage.object_store]`.
///
/// Each timeout field carries a `#[serde(default = "...")]` attribute, so a key
/// that is missing from the config file is filled in by the named function in
/// `default::object_store_config`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct ObjectStoreConfig {
/// Falls back to `default::object_store_config::object_store_streaming_read_timeout_ms()`.
/// NOTE(review): presumably a timeout in milliseconds (per the `_ms` suffix) — confirm at the use site.
#[serde(default = "default::object_store_config::object_store_streaming_read_timeout_ms")]
pub object_store_streaming_read_timeout_ms: u64,
/// Falls back to `default::object_store_config::object_store_streaming_upload_timeout_ms()`.
#[serde(default = "default::object_store_config::object_store_streaming_upload_timeout_ms")]
pub object_store_streaming_upload_timeout_ms: u64,
/// Falls back to `default::object_store_config::object_store_upload_timeout_ms()`.
#[serde(default = "default::object_store_config::object_store_upload_timeout_ms")]
pub object_store_upload_timeout_ms: u64,
/// Falls back to `default::object_store_config::object_store_read_timeout_ms()`.
#[serde(default = "default::object_store_config::object_store_read_timeout_ms")]
pub object_store_read_timeout_ms: u64,

/// Nested `s3` table of this subsection; when the table is omitted the field
/// is populated with `S3ObjectStoreConfig::default()` (plain `#[serde(default)]`).
#[serde(default)]
pub s3: S3ObjectStoreConfig,
}

/// The subsection `[storage.object_store.s3]`.
///
/// Every field has a `#[serde(default = "...")]` attribute pointing at a
/// function in `default::object_store_config::s3`, which supplies the value
/// when the key is absent from the config file.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreConfig {
/// Default: `Some(600 * 1000)`.
/// NOTE(review): presumably a keepalive interval in milliseconds — confirm at the use site.
#[serde(default = "default::object_store_config::s3::object_store_keepalive_ms")]
pub object_store_keepalive_ms: Option<u64>,
/// Default: `Some(1 << 21)` (2_097_152).
#[serde(default = "default::object_store_config::s3::object_store_recv_buffer_size")]
pub object_store_recv_buffer_size: Option<usize>,
/// Default: `None`.
#[serde(default = "default::object_store_config::s3::object_store_send_buffer_size")]
pub object_store_send_buffer_size: Option<usize>,
/// Default: `Some(true)`.
#[serde(default = "default::object_store_config::s3::object_store_nodelay")]
pub object_store_nodelay: Option<bool>,
/// Default: `20`.
#[serde(default = "default::object_store_config::s3::object_store_req_retry_interval_ms")]
pub object_store_req_retry_interval_ms: u64,
/// Default: `10 * 1000`.
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_delay_ms")]
pub object_store_req_retry_max_delay_ms: u64,
/// Default: `8`.
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
}

impl SystemConfig {
#![allow(deprecated)]
pub fn into_init_system_params(self) -> SystemParams {
Expand Down Expand Up @@ -1097,22 +1111,6 @@ pub mod default {
3 * 3600
}

/// Default value for `object_store_streaming_read_timeout_ms`.
///
/// Returns `600_000`, i.e. ten minutes expressed in milliseconds.
pub fn object_store_streaming_read_timeout_ms() -> u64 {
    const MILLIS_PER_MINUTE: u64 = 60 * 1000;
    10 * MILLIS_PER_MINUTE
}

/// Default value for `object_store_streaming_upload_timeout_ms`.
///
/// Returns `600_000`, i.e. ten minutes expressed in milliseconds.
pub fn object_store_streaming_upload_timeout_ms() -> u64 {
    const MILLIS_PER_MINUTE: u64 = 60 * 1000;
    10 * MILLIS_PER_MINUTE
}

/// Default value for `object_store_upload_timeout_ms`.
///
/// Returns `3_600_000`, i.e. one hour expressed in milliseconds.
pub fn object_store_upload_timeout_ms() -> u64 {
    const MILLIS_PER_HOUR: u64 = 60 * 60 * 1000;
    MILLIS_PER_HOUR
}

/// Default value for `object_store_read_timeout_ms`.
///
/// Returns `3_600_000`, i.e. one hour expressed in milliseconds.
pub fn object_store_read_timeout_ms() -> u64 {
    const MILLIS_PER_HOUR: u64 = 60 * 60 * 1000;
    MILLIS_PER_HOUR
}

/// Default value for `compactor_max_sst_key_count`.
///
/// Returns 2 * 1024 * 1024 = `2_097_152` (roughly two million).
pub fn compactor_max_sst_key_count() -> u64 {
    1 << 21 // 2_097_152
}
Expand Down Expand Up @@ -1387,40 +1385,58 @@ pub mod default {
}
}

pub mod s3_objstore_config {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8;

const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min

pub fn object_store_keepalive_ms() -> Option<u64> {
Some(DEFAULT_KEEPALIVE_MS) // 10min
pub mod object_store_config {
pub fn object_store_streaming_read_timeout_ms() -> u64 {
10 * 60 * 1000
}

pub fn object_store_recv_buffer_size() -> Option<usize> {
Some(1 << 21) // 2m
pub fn object_store_streaming_upload_timeout_ms() -> u64 {
10 * 60 * 1000
}

pub fn object_store_send_buffer_size() -> Option<usize> {
None
pub fn object_store_upload_timeout_ms() -> u64 {
60 * 60 * 1000
}

pub fn object_store_nodelay() -> Option<bool> {
Some(true)
pub fn object_store_read_timeout_ms() -> u64 {
60 * 60 * 1000
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}
pub mod s3 {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
const DEFAULT_RETRY_MAX_DELAY_MS: u64 = 10 * 1000;
const DEFAULT_RETRY_MAX_ATTEMPTS: usize = 8;

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}
const DEFAULT_KEEPALIVE_MS: u64 = 600 * 1000; // 10min

pub fn object_store_keepalive_ms() -> Option<u64> {
Some(DEFAULT_KEEPALIVE_MS) // 10min
}

pub fn object_store_recv_buffer_size() -> Option<usize> {
Some(1 << 21) // 2m
}

pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
pub fn object_store_send_buffer_size() -> Option<usize> {
None
}

pub fn object_store_nodelay() -> Option<bool> {
Some(true)
}

pub fn object_store_req_retry_interval_ms() -> u64 {
DEFAULT_RETRY_INTERVAL_MS
}

pub fn object_store_req_retry_max_delay_ms() -> u64 {
DEFAULT_RETRY_MAX_DELAY_MS // 10s
}

pub fn object_store_req_retry_max_attempts() -> usize {
DEFAULT_RETRY_MAX_ATTEMPTS
}
}
}
}
Expand Down
13 changes: 0 additions & 13 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,6 @@ pub struct ComputeNodeOpts {
#[clap(long, env = "RW_HEAP_PROFILING_DIR")]
#[override_opts(path = server.heap_profiling.dir)]
pub heap_profiling_dir: Option<String>,

#[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_streaming_read_timeout_ms)]
pub object_store_streaming_read_timeout_ms: Option<u64>,
#[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_streaming_upload_timeout_ms)]
pub object_store_streaming_upload_timeout_ms: Option<u64>,
#[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_upload_timeout_ms)]
pub object_store_upload_timeout_ms: Option<u64>,
#[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_read_timeout_ms)]
pub object_store_read_timeout_ms: Option<u64>,
}

#[derive(Copy, Clone, Debug, Default, ValueEnum, Serialize, Deserialize)]
Expand Down
3 changes: 1 addition & 2 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ pub async fn compute_node_serve(
info!("> version: {} ({})", RW_VERSION, GIT_SHA);

// Initialize all the configs
let stream_config: Arc<risingwave_common::config::StreamingConfig> =
Arc::new(config.streaming.clone());
let stream_config = Arc::new(config.streaming.clone());
let batch_config = Arc::new(config.batch.clone());

// Register to the cluster. We're not ready to serve until activate is called.
Expand Down
24 changes: 14 additions & 10 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,6 @@ max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
max_preload_wait_time_mill = 0
max_version_pinning_duration_sec = 10800
object_store_streaming_read_timeout_ms = 600000
object_store_streaming_upload_timeout_ms = 600000
object_store_upload_timeout_ms = 3600000
object_store_read_timeout_ms = 3600000
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
compactor_max_sst_key_count = 2097152
compact_iter_recreate_timeout_ms = 600000
compactor_max_sst_size = 536870912
Expand Down Expand Up @@ -162,6 +152,20 @@ threshold = 0.5
recent_filter_layers = 6
recent_filter_rotate_interval_ms = 10000

[storage.object_store]
object_store_streaming_read_timeout_ms = 600000
object_store_streaming_upload_timeout_ms = 600000
object_store_upload_timeout_ms = 3600000
object_store_read_timeout_ms = 3600000

[storage.object_store.s3]
object_store_keepalive_ms = 600000
object_store_recv_buffer_size = 2097152
object_store_nodelay = true
object_store_req_retry_interval_ms = 20
object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8

[system]
barrier_interval_ms = 1000
checkpoint_frequency = 1
Expand Down
25 changes: 13 additions & 12 deletions src/ctl/src/common/hummock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{FileCache, HummockStorage, SstableStore};
Expand Down Expand Up @@ -66,16 +67,15 @@ impl HummockServiceOpts {
}
Err(_) => {
const MESSAGE: &str = "env variable `RW_HUMMOCK_URL` not found.
For `./risedev d` use cases, please do the following.
* start the cluster with shared storage:
- consider adding `use: minio` in the risedev config,
- or directly use `./risedev d for-ctl` to start the cluster.
* use `./risedev ctl` to use risectl.
For `./risedev apply-compose-deploy` users,
* `RW_HUMMOCK_URL` will be printed out when deploying. Please copy the bash exports to your console.
";
For `./risedev d` use cases, please do the following.
* start the cluster with shared storage:
- consider adding `use: minio` in the risedev config,
- or directly use `./risedev d for-ctl` to start the cluster.
* use `./risedev ctl` to use risectl.
For `./risedev apply-compose-deploy` users,
* `RW_HUMMOCK_URL` will be printed out when deploying. Please copy the bash exports to your console.
";
bail!(MESSAGE);
}
};
Expand Down Expand Up @@ -152,10 +152,11 @@ For `./risedev apply-compose-deploy` users,
}

pub async fn create_sstable_store(&self) -> Result<Arc<SstableStore>> {
let object_store = parse_remote_object_store(
let object_store = build_remote_object_store(
self.hummock_url.strip_prefix("hummock+").unwrap(),
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
)
.await;

Expand Down
6 changes: 4 additions & 2 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ use std::sync::Arc;
use bytes::Bytes;
use futures::TryStreamExt;
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::select_all;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{map_table_key_range, prefixed_range, TableKeyRange};
use risingwave_object_store::object::build_remote_object_store;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::java_binding::key_range::Bound;
use risingwave_pb::java_binding::{KeyRange, ReadPlan};
use risingwave_storage::error::{StorageError, StorageResult};
Expand Down Expand Up @@ -54,10 +55,11 @@ impl HummockJavaBindingIterator {
pub async fn new(read_plan: ReadPlan) -> StorageResult<Self> {
// Note(bugen): should we forward the implementation to the `StorageTable`?
let object_store = Arc::new(
parse_remote_object_store(
build_remote_object_store(
&read_plan.object_store_url,
Arc::new(ObjectStoreMetrics::unused()),
"Hummock",
ObjectStoreConfig::default(),
)
.await,
);
Expand Down
13 changes: 0 additions & 13 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,6 @@ pub struct MetaNodeOpts {
#[override_opts(path = system.backup_storage_directory)]
backup_storage_directory: Option<String>,

#[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_streaming_read_timeout_ms)]
pub object_store_streaming_read_timeout_ms: Option<u64>,
#[clap(long, env = "RW_OBJECT_STORE_STREAMING_UPLOAD_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_streaming_upload_timeout_ms)]
pub object_store_streaming_upload_timeout_ms: Option<u64>,
#[clap(long, env = "RW_OBJECT_STORE_UPLOAD_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_upload_timeout_ms)]
pub object_store_upload_timeout_ms: Option<u64>,
#[clap(long, env = "RW_OBJECT_STORE_READ_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_read_timeout_ms)]
pub object_store_read_timeout_ms: Option<u64>,

/// Enable heap profile dump when memory usage is high.
#[clap(long, env = "RW_HEAP_PROFILING_DIR")]
#[override_opts(path = server.heap_profiling.dir)]
Expand Down
Loading

0 comments on commit 4f924d6

Please sign in to comment.