-
I wrote a benchmark to test writing data to dis, 100KB data cost 20ms. But RaftEngine syncing data cost <5ms. So, Why RaftEngine so fast? my code// my code
use std::borrow::BorrowMut;
use std::fs::File;
use std::io::{Write, Error, SeekFrom, Seek};
use std::os::macos::fs::MetadataExt;
use std::ptr::addr_of_mut;
use std::{fs, time};
use std::path::Path;
fn write_to_file(data: &[u8], file: &mut File) -> Result<(), Error> {
file.write_all(data)?;
file.sync_all()?;
Ok(())
}
fn main() {
let file_paths = ["/Users/song/code/personal/rustacean/write_perf/src/10",
"/Users/song/code/personal/rustacean/write_perf/src/50",
"/Users/song/code/personal/rustacean/write_perf/src/100",
"/Users/song/code/personal/rustacean/write_perf/src/200"];
for file_path in file_paths {
let path = Path::new(file_path);
if path.exists() {
fs::remove_file(file_path).unwrap();
}
let mut file = File::create(file_path).unwrap();
let name = path.file_name().unwrap().to_str().unwrap();
let kb:i32 = name.parse().unwrap();
let mut data = vec![];
for i in 0..(1024*kb) {
data.push(48)
}
println!("\n------");
println!("{kb}KB:");
for i in 0..5 {
file.seek(SeekFrom::Start(0)).unwrap();
let st = time::SystemTime::now();
write_to_file(&data, file.borrow_mut()).unwrap();
let et = time::SystemTime::now();
let delta = et.duration_since(st).unwrap();
println!(" |- {i} : {} us", delta.as_micros())
}
}
} output:
raft engine// write 100KB data to disk, based on RaftEngine example.
use std::time;
use std::time::SystemTime;
use kvproto::raft_serverpb::RaftLocalState;
use raft::eraftpb::Entry;
use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize};
use rand::thread_rng;
use rand_distr::{Distribution, Normal};
#[derive(Clone)]
pub struct MessageExtTyped;
impl MessageExt for MessageExtTyped {
type Entry = Entry;
fn index(e: &Self::Entry) -> u64 {
e.index
}
}
// How to run the example:
// $ RUST_LOG=debug cargo run --release --example append-compact-purge
fn main() {
env_logger::init();
let config = Config {
dir: "append-compact-purge-data".to_owned(),
purge_threshold: ReadableSize::gb(2),
batch_compression_threshold: ReadableSize::kb(0),
..Default::default()
};
let engine = Engine::open(config).expect("Open raft engine");
let compact_offset = 32; // In src/purge.rs, it's the limit for rewrite.
let mut rand_regions = Normal::new(128.0, 96.0)
.unwrap()
.sample_iter(thread_rng())
.map(|x| x as u64);
let mut rand_compacts = Normal::new(compact_offset as f64, 16.0)
.unwrap()
.sample_iter(thread_rng())
.map(|x| x as u64);
let mut batch = LogBatch::with_capacity(256);
let mut entry = Entry::new();
entry.set_data(vec![b'x'; 1024 * 100].into());
let init_state = RaftLocalState {
last_index: 0,
..Default::default()
};
let region = rand_regions.next().unwrap();
let mut state = engine
.get_message::<RaftLocalState>(region, b"last_index")
.unwrap()
.unwrap_or_else(|| init_state.clone());
state.last_index += 1; // manually update the state
let mut e = entry.clone();
e.index = state.last_index;
batch.add_entries::<MessageExtTyped>(region, &[e]).unwrap();
batch
.put_message(region, b"last_index".to_vec(), &state)
.unwrap();
let st = time::SystemTime::now();
let batch_size = batch.approximate_size();
engine.write(&mut batch, true).unwrap();
let et = time::SystemTime::now();
let delta = et.duration_since(st).unwrap();
println!("engine write {} bytes data, cost {} us", batch_size, delta.as_micros());
} output:
|
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
A few reasons I can think of right now:
raft-engine/src/file_pipe_log/log_file.rs Lines 96 to 107 in 2c76690
|
Beta Was this translation helpful? Give feedback.
-
macos: File::sync_data() call libc::fcntl(fd, libc::F_FULLFSYNC). Latency is big.macos unsupport libc::fdatasync(fd). use nix::unistd::{fsync};
use std::borrow::{Borrow, BorrowMut};
use std::fs::File;
use std::io::{Write, Error, SeekFrom, Seek};
use std::ptr::addr_of_mut;
use std::{fs, time};
use std::os::unix::io::{AsRawFd, RawFd};
use std::path::Path;
use std::thread::sleep;
use nix::fcntl;
const CIRCLE:i32 = 3;
const SYNC_METHOD: [SyncMethod; 3] = [SyncMethod::LibcSync, SyncMethod::RustSyncData, SyncMethod::RustSync];
#[derive(Debug)]
enum SyncMethod {
LibcSyncData, // macos unsupported
LibcSync,
RustSyncData,
RustSync,
FcntlFullSync
}
fn sync(fd: RawFd) -> std::io::Result<()> {
unsafe {
fsync(fd)?;
};
Ok(())
}
fn write_to_file(data: &[u8], file: &mut File, sm: &SyncMethod) -> Result<(), Error> {
file.write(data)?;
match sm {
SyncMethod::LibcSync => {
sync(file.as_raw_fd())?;
},
SyncMethod::RustSyncData => {
file.sync_data()?;
},
SyncMethod::RustSync => {
file.sync_all()?;
},
_ => {}
}
Ok(())
}
fn main() {
let file_paths = ["./test/10",
"./test/50",
"./test/100",
"./test/200",
"./test/500",
"./test/1000"];
for sm in SYNC_METHOD {
println!("\n---{:?}---", sm.borrow());
for file_path in file_paths {
let path = Path::new(file_path);
let name = path.file_name().unwrap().to_str().unwrap();
let kb:i32 = name.parse().unwrap();
let mut data = vec![];
for _ in 0..(1024*kb) {
data.push(b'x');
}
println!(" |- {kb}KB:");
for i in 0..CIRCLE {
if path.exists() {
fs::remove_file(file_path).unwrap();
while path.exists() {
sleep(time::Duration::from_millis(100))
}
}
let mut file = File::create(file_path).unwrap();
// file.set_len((1024 * kb) as u64).unwrap(); // 预先设置空间
let st = time::SystemTime::now();
write_to_file(&data, file.borrow_mut(), &sm).unwrap();
let et = time::SystemTime::now();
let delta = et.duration_since(st).unwrap();
println!(" |- {i} : {} us", delta.as_micros());
}
}
}
} macos output:
centos output:
|
Beta Was this translation helpful? Give feedback.
A few reasons I can think of right now:
raft-engine/src/file_pipe_log/log_file.rs
Lines 96 to 107 in 2c76690
std::File::sync_all
uses fsync.