Skip to content

Commit

Permalink
fix(object_store): fs and hdfs object store should set atomic_write_d…
Browse files Browse the repository at this point in the history
…ir (#15155)
  • Loading branch information
wcy-fdu authored Feb 23, 2024
1 parent 41e723b commit 65550a0
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 3 deletions.
11 changes: 11 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,17 @@ profile:
- use: compactor
# - use: prometheus
# - use: grafana
fs:
steps:
# - use: etcd
- use: meta-node
- use: compute-node
- use: frontend
- use: opendal
engine: fs
- use: compactor
# - use: prometheus
# - use: grafana
webhdfs:
steps:
# - use: etcd
Expand Down
6 changes: 4 additions & 2 deletions src/object_store/src/object/opendal_engine/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ use opendal::services::Fs;
use opendal::Operator;

use super::{EngineType, OpendalObjectStore};
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal fs engine.
pub fn new_fs_engine(root: String) -> ObjectResult<Self> {
// Create fs backend builder.
let mut builder = Fs::default();

builder.root(&root);

let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
let op: Operator = Operator::new(builder)?
.layer(RetryLayer::default())
.finish();
Expand Down
5 changes: 4 additions & 1 deletion src/object_store/src/object/opendal_engine/hdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use opendal::services::Hdfs;
use opendal::Operator;

use super::{EngineType, OpendalObjectStore};
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;

impl OpendalObjectStore {
/// create opendal hdfs engine.
pub fn new_hdfs_engine(namenode: String, root: String) -> ObjectResult<Self> {
Expand All @@ -26,7 +28,8 @@ impl OpendalObjectStore {
// Set the name node for hdfs.
builder.name_node(&namenode);
builder.root(&root);

let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down
3 changes: 3 additions & 0 deletions src/object_store/src/object/opendal_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ pub mod opendal_s3;
pub mod oss;

pub mod fs;

// To make sure the the operation is consistent, we should specially set `atomic_write_dir` for fs, hdfs and webhdfs services.
const ATOMIC_WRITE_DIR: &str = "atomic_write_dir/";
3 changes: 3 additions & 0 deletions src/object_store/src/object/opendal_engine/webhdfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use opendal::services::Webhdfs;
use opendal::Operator;

use super::{EngineType, OpendalObjectStore};
use crate::object::opendal_engine::ATOMIC_WRITE_DIR;
use crate::object::ObjectResult;

impl OpendalObjectStore {
Expand All @@ -30,6 +31,8 @@ impl OpendalObjectStore {
// NOTE: the root must be absolute path.
builder.root(&root);

let atomic_write_dir = format!("{}/{}", root, ATOMIC_WRITE_DIR);
builder.atomic_write_dir(&atomic_write_dir);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
Expand Down

0 comments on commit 65550a0

Please sign in to comment.