From 65550a0e0c81282ffb4d5286bd23bada5519172b Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Fri, 23 Feb 2024 10:31:26 +0800 Subject: [PATCH] fix(object_store): fs and hdfs object store should set atomic_write_dir (#15155) --- risedev.yml | 11 +++++++++++ src/object_store/src/object/opendal_engine/fs.rs | 6 ++++-- src/object_store/src/object/opendal_engine/hdfs.rs | 5 ++++- src/object_store/src/object/opendal_engine/mod.rs | 3 +++ src/object_store/src/object/opendal_engine/webhdfs.rs | 3 +++ 5 files changed, 25 insertions(+), 3 deletions(-) diff --git a/risedev.yml b/risedev.yml index 69b0c23b05dd3..22356f2e1ac89 100644 --- a/risedev.yml +++ b/risedev.yml @@ -164,6 +164,17 @@ profile: - use: compactor # - use: prometheus # - use: grafana + fs: + steps: + # - use: etcd + - use: meta-node + - use: compute-node + - use: frontend + - use: opendal + engine: fs + - use: compactor + # - use: prometheus + # - use: grafana webhdfs: steps: # - use: etcd diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index 23d7dcbd503e8..ece3555d5b777 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -17,15 +17,17 @@ use opendal::services::Fs; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; +use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; + impl OpendalObjectStore { /// create opendal fs engine. pub fn new_fs_engine(root: String) -> ObjectResult { // Create fs backend builder. let mut builder = Fs::default(); - builder.root(&root); - + let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); + builder.atomic_write_dir(&atomic_write_dir); let op: Operator = Operator::new(builder)? .layer(RetryLayer::default()) .finish(); diff --git a/src/object_store/src/object/opendal_engine/hdfs.rs b/src/object_store/src/object/opendal_engine/hdfs.rs index b52be4094df80..12ee292a85416 100644 --- a/src/object_store/src/object/opendal_engine/hdfs.rs +++ b/src/object_store/src/object/opendal_engine/hdfs.rs @@ -17,7 +17,9 @@ use opendal::services::Hdfs; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; +use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; + impl OpendalObjectStore { /// create opendal hdfs engine. pub fn new_hdfs_engine(namenode: String, root: String) -> ObjectResult { @@ -26,7 +28,8 @@ impl OpendalObjectStore { // Set the name node for hdfs. builder.name_node(&namenode); builder.root(&root); - + let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); + builder.atomic_write_dir(&atomic_write_dir); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index ccaba375a1302..c1ab929d5586f 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -31,3 +31,6 @@ pub mod opendal_s3; pub mod oss; pub mod fs; + +// To make sure the the operation is consistent, we should specially set `atomic_write_dir` for fs, hdfs and webhdfs services. +const ATOMIC_WRITE_DIR: &str = "atomic_write_dir/"; diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs index ff61b39ec9e79..1f6b87b44fd5e 100644 --- a/src/object_store/src/object/opendal_engine/webhdfs.rs +++ b/src/object_store/src/object/opendal_engine/webhdfs.rs @@ -17,6 +17,7 @@ use opendal::services::Webhdfs; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; +use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; impl OpendalObjectStore { @@ -30,6 +31,8 @@ impl OpendalObjectStore { // NOTE: the root must be absolute path. builder.root(&root); + let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR); + builder.atomic_write_dir(&atomic_write_dir); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default())