diff --git a/src/object_store/src/object/opendal_engine/fs.rs b/src/object_store/src/object/opendal_engine/fs.rs index 7591501730a81..c1841363e198f 100644 --- a/src/object_store/src/object/opendal_engine/fs.rs +++ b/src/object_store/src/object/opendal_engine/fs.rs @@ -17,7 +17,9 @@ use opendal::services::Fs; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; +use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; + impl OpendalObjectStore { /// create opendal fs engine. pub fn new_fs_engine(root: String) -> ObjectResult { @@ -25,7 +27,8 @@ impl OpendalObjectStore { let mut builder = Fs::default(); builder.root(&root); - builder.atomic_write_dir(&root); + let atomic_write_dir = format!("{}{}", root, ATOMIC_WRITE_DIR); + builder.atomic_write_dir(&atomic_write_dir); let op: Operator = Operator::new(builder)? .layer(RetryLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/hdfs.rs b/src/object_store/src/object/opendal_engine/hdfs.rs index 39c089490d1dc..8a1bb49954c93 100644 --- a/src/object_store/src/object/opendal_engine/hdfs.rs +++ b/src/object_store/src/object/opendal_engine/hdfs.rs @@ -17,7 +17,9 @@ use opendal::services::Hdfs; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; +use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; + impl OpendalObjectStore { /// create opendal hdfs engine. pub fn new_hdfs_engine(namenode: String, root: String) -> ObjectResult { @@ -26,7 +28,7 @@ impl OpendalObjectStore { // Set the name node for hdfs. builder.name_node(&namenode); builder.root(&root); - builder.atomic_write_dir(&root); + builder.atomic_write_dir(ATOMIC_WRITE_DIR); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default()) diff --git a/src/object_store/src/object/opendal_engine/mod.rs b/src/object_store/src/object/opendal_engine/mod.rs index ccaba375a1302..c1ab929d5586f 100644 --- a/src/object_store/src/object/opendal_engine/mod.rs +++ b/src/object_store/src/object/opendal_engine/mod.rs @@ -31,3 +31,6 @@ pub mod opendal_s3; pub mod oss; pub mod fs; + +// To make sure the the operation is consistent, we should specially set `atomic_write_dir` for fs, hdfs and webhdfs services. +const ATOMIC_WRITE_DIR: &str = "atomic_write_dir/"; diff --git a/src/object_store/src/object/opendal_engine/webhdfs.rs b/src/object_store/src/object/opendal_engine/webhdfs.rs index af97d57917936..f591a91f8f16a 100644 --- a/src/object_store/src/object/opendal_engine/webhdfs.rs +++ b/src/object_store/src/object/opendal_engine/webhdfs.rs @@ -17,6 +17,7 @@ use opendal::services::Webhdfs; use opendal::Operator; use super::{EngineType, OpendalObjectStore}; +use crate::object::opendal_engine::ATOMIC_WRITE_DIR; use crate::object::ObjectResult; impl OpendalObjectStore { @@ -29,7 +30,9 @@ impl OpendalObjectStore { // Set the root for hdfs, all operations will happen under this root. // NOTE: the root must be absolute path. builder.root(&root); - builder.atomic_write_dir(&root); + + let atomic_write_dir = format!("{}{}", root, ATOMIC_WRITE_DIR); + builder.atomic_write_dir(&atomic_write_dir); let op: Operator = Operator::new(builder)? .layer(LoggingLayer::default()) .layer(RetryLayer::default())