Skip to content

Commit

Permalink
refactor(object store): only the object store used by meta needs to set `atomic_write_dir` (#15592)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Mar 14, 2024
1 parent c002073 commit ef2b4fe
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 11 deletions.
12 changes: 12 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -940,11 +940,19 @@ pub struct ObjectStoreConfig {
pub object_store_upload_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_read_timeout_ms")]
pub object_store_read_timeout_ms: u64,
#[serde(default = "default::object_store_config::object_store_set_atomic_write_dir")]
pub object_store_set_atomic_write_dir: bool,

#[serde(default)]
pub s3: S3ObjectStoreConfig,
}

impl ObjectStoreConfig {
/// Turns on the `object_store_set_atomic_write_dir` flag of this config.
///
/// The flag defaults to `false`. The fs and hdfs OpenDAL engines only configure an
/// atomic write directory on their builders when it is `true`, so callers that need
/// that behavior call this before building the object store.
pub fn set_atomic_write_dir(&mut self) {
self.object_store_set_atomic_write_dir = true;
}
}

/// The subsections `[storage.object_store.s3]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreConfig {
Expand Down Expand Up @@ -1593,6 +1601,10 @@ pub mod default {
8 * 60 * 1000
}

/// Serde default for `ObjectStoreConfig::object_store_set_atomic_write_dir`.
///
/// Returns `false`, so the flag stays off unless it is set in the config file or
/// enabled programmatically.
pub fn object_store_set_atomic_write_dir() -> bool {
false
}

pub mod s3 {
/// Retry config for compute node http timeout error.
const DEFAULT_RETRY_INTERVAL_MS: u64 = 20;
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ object_store_streaming_read_timeout_ms = 480000
object_store_streaming_upload_timeout_ms = 480000
object_store_upload_timeout_ms = 480000
object_store_read_timeout_ms = 480000
object_store_set_atomic_write_dir = false

[storage.object_store.s3]
object_store_keepalive_ms = 600000
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,16 @@ impl HummockManager {
let state_store_url = sys_params.state_store();
let state_store_dir: &str = sys_params.data_directory();
let deterministic_mode = env.opts.compaction_deterministic_test;
let mut object_store_config = ObjectStoreConfig::default();
// For fs and hdfs object store, operations are not always atomic.
// We should manually enable atomicity guarantee by setting the atomic_write_dir config when building services.
object_store_config.set_atomic_write_dir();
let object_store = Arc::new(
build_remote_object_store(
state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
metrics.object_store_metric.clone(),
"Version Checkpoint",
ObjectStoreConfig::default(),
object_store_config,
)
.await,
);
Expand Down
12 changes: 8 additions & 4 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,13 @@ pub async fn build_remote_object_store(
let hdfs = hdfs.strip_prefix("hdfs://").unwrap();
let (namenode, root) = hdfs.split_once('@').unwrap_or((hdfs, ""));
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_hdfs_engine(namenode.to_string(), root.to_string())
.unwrap()
.monitored(metrics, config),
OpendalObjectStore::new_hdfs_engine(
namenode.to_string(),
root.to_string(),
config.clone(),
)
.unwrap()
.monitored(metrics, config),
)
}
gcs if gcs.starts_with("gcs://") => {
Expand Down Expand Up @@ -871,7 +875,7 @@ pub async fn build_remote_object_store(
fs if fs.starts_with("fs://") => {
let fs = fs.strip_prefix("fs://").unwrap();
ObjectStoreImpl::Opendal(
OpendalObjectStore::new_fs_engine(fs.to_string())
OpendalObjectStore::new_fs_engine(fs.to_string(), config.clone())
.unwrap()
.monitored(metrics, config),
)
Expand Down
10 changes: 7 additions & 3 deletions src/object_store/src/object/opendal_engine/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,23 @@
use opendal::layers::RetryLayer;
use opendal::services::Fs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal fs engine.
pub fn new_fs_engine(root: String) -> ObjectResult<Self> {
pub fn new_fs_engine(root: String, config: ObjectStoreConfig) -> ObjectResult<Self> {
// Create fs backend builder.
let mut builder = Fs::default();
builder.root(&root);
let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
if config.object_store_set_atomic_write_dir {
let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
}

let op: Operator = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();
Expand Down
13 changes: 10 additions & 3 deletions src/object_store/src/object/opendal_engine/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,28 @@
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Hdfs;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;

use super::{EngineType, OpendalObjectStore};
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal hdfs engine.
pub fn new_hdfs_engine(namenode: String, root: String) -> ObjectResult<Self> {
pub fn new_hdfs_engine(
namenode: String,
root: String,
config: ObjectStoreConfig,
) -> ObjectResult<Self> {
// Create hdfs backend builder.
let mut builder = Hdfs::default();
// Set the name node for hdfs.
builder.name_node(&namenode);
builder.root(&root);
let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
if config.object_store_set_atomic_write_dir {
let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
}
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down

0 comments on commit ef2b4fe

Please sign in to comment.