Skip to content

Commit

Permalink
feat: add options to enable log recycle and periodical fsync (#3114)
Browse files Browse the repository at this point in the history
* feat: add options to enable log recycle and periodical fsync

* fix: resolve review comments

* fix: conflicts
  • Loading branch information
v0y4g3r authored Jan 9, 2024
1 parent 6e860bc commit 62db28b
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ purge_interval = "10m"
read_batch_size = 128
# Whether to sync log file after every write.
sync_write = false
# Whether to reuse logically truncated log files.
enable_log_recycle = true
# Whether to pre-create log files on start up
prefill_log_files = false
# Duration for fsyncing log files.
sync_period = "1000ms"

# Metadata storage options.
[metadata_store]
Expand Down
10 changes: 10 additions & 0 deletions src/common/config/src/wal/raft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub struct RaftEngineConfig {
pub read_batch_size: usize,
// whether to sync log file after every write
pub sync_write: bool,
// whether to reuse logically truncated log files.
pub enable_log_recycle: bool,
// whether to pre-create log files on start up
pub prefill_log_files: bool,
// duration for fsyncing log files.
#[serde(with = "humantime_serde")]
pub sync_period: Option<Duration>,
}

impl Default for RaftEngineConfig {
Expand All @@ -45,6 +52,9 @@ impl Default for RaftEngineConfig {
purge_interval: Duration::from_secs(600),
read_batch_size: 128,
sync_write: false,
enable_log_recycle: true,
prefill_log_files: false,
sync_period: None,
}
}
}
2 changes: 2 additions & 0 deletions src/log-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ common-macro.workspace = true
common-meta.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
dashmap.workspace = true
futures-util.workspace = true
futures.workspace = true
protobuf = { version = "2", features = ["bytes"] }
Expand Down
17 changes: 16 additions & 1 deletion src/log-store/src/raft_engine/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use async_stream::stream;
Expand Down Expand Up @@ -41,6 +42,7 @@ pub struct RaftEngineLogStore {
config: RaftEngineConfig,
engine: Arc<Engine>,
gc_task: RepeatedTask<Error>,
last_sync_time: AtomicI64,
}

pub struct PurgeExpiredFilesFunction {
Expand Down Expand Up @@ -80,6 +82,8 @@ impl RaftEngineLogStore {
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(config.file_size.0),
enable_log_recycle: config.enable_log_recycle,
prefill_for_recycle: config.prefill_log_files,
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
Expand All @@ -94,6 +98,7 @@ impl RaftEngineLogStore {
config,
engine,
gc_task,
last_sync_time: AtomicI64::new(0),
};
log_store.start()?;
Ok(log_store)
Expand Down Expand Up @@ -210,9 +215,19 @@ impl LogStore for RaftEngineLogStore {
.context(AddEntryLogBatchSnafu)?;
}

let mut sync = self.config.sync_write;

if let Some(sync_period) = &self.config.sync_period {
let now = common_time::util::current_time_millis();
if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 {
self.last_sync_time.store(now, Ordering::Relaxed);
sync = true;
}
}

let _ = self
.engine
.write(&mut batch, self.config.sync_write)
.write(&mut batch, sync)
.context(RaftEngineSnafu)?;

Ok(AppendBatchResponse { last_entry_ids })
Expand Down
2 changes: 2 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,8 @@ purge_threshold = "4GiB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
enable_log_recycle = true
prefill_log_files = false
[datanode.storage]
type = "{}"
Expand Down

0 comments on commit 62db28b

Please sign in to comment.