diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index c05c121d..2a5ce83d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,7 +19,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2023-01-01 + toolchain: nightly-2023-07-01 override: true components: rustfmt, clippy, rust-src - uses: Swatinem/rust-cache@v1 @@ -87,7 +87,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: nightly-2023-01-01 + toolchain: nightly-2023-07-01 override: true components: llvm-tools-preview - uses: Swatinem/rust-cache@v1 @@ -97,8 +97,7 @@ jobs: run: if [[ ! -e ~/.cargo/bin/grcov ]]; then cargo install --locked grcov; fi - name: Run tests run: | - make test - env WITH_STABLE_TOOLCHAIN=auto make test + make test_matrix env: RUSTFLAGS: '-Zinstrument-coverage' LLVM_PROFILE_FILE: '%p-%m.profraw' diff --git a/CHANGELOG.md b/CHANGELOG.md index c7e3c9a6..e2aaa484 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ ## [Unreleased] +## [0.4.1] - 2023-09-14 + +### Behavior Changes + +* When log recycling is enabled, Raft Engine will now retain 50% more log files to reduce the chance of running out of recycled files. +* Reduce the scope of keys reserved for internal use. + +## [0.4.0] - 2023-09-01 + ### Behavior Changes * `LogBatch::put` returns a `Result<()>` instead of `()`. It errs when the key is reserved for internal use. @@ -18,6 +27,7 @@ * Support preparing prefilled logs to enable log recycling at start-up. The amount of logs to prepare is controlled by `Config::prefill_limit`. * Add a new configuration `spill-dir` to allow automatic placement of logs into an auxiliary directory when `dir` is full. * Add a new method `Engine::fork` to duplicate an `Engine` to a new place, with a few disk file copies. +* Support configuring LZ4 acceleration factor with `compression-level`.
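To make the new knob concrete, here is a minimal sketch of setting it programmatically. The `/tmp` path and the value `4` are arbitrary placeholders, not part of this patch; the field names follow the `Config` changes in `src/config.rs` below.

```rust
use raft_engine::{Config, Engine};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cfg = Config {
        // Hypothetical data directory for the sketch.
        dir: "/tmp/raft-engine-demo".to_owned(),
        // LZ4 acceleration factor applied to batches that exceed
        // `batch-compression-threshold`. `None` keeps the default of 1;
        // larger values trade compression ratio for speed.
        compression_level: Some(4),
        ..Default::default()
    };
    let engine = Engine::open(cfg)?;
    drop(engine);
    Ok(())
}
```

In TOML-based deployments the same knob is the `compression-level` entry, sitting next to `batch-compression-threshold`.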
## [0.3.0] - 2022-09-14 diff --git a/Cargo.toml b/Cargo.toml index 8fc6b771..feb15a98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "raft-engine" -version = "0.3.0" +version = "0.4.1" authors = ["The TiKV Project Developers"] edition = "2018" rust-version = "1.66.0" @@ -49,7 +49,7 @@ log = { version = "0.4", features = ["max_level_trace", "release_max_level_debug lz4-sys = "1.9" memmap2 = { version = "0.7", optional = true } nix = "0.26" -num-derive = "0.3" +num-derive = "0.4" num-traits = "0.2" parking_lot = "0.12" prometheus = { version = "0.13" } @@ -64,33 +64,43 @@ strum = { version = "0.25.0", features = ["derive"] } thiserror = "1.0" [dev-dependencies] -criterion = "0.5" +criterion = "0.4" ctor = "0.2" env_logger = "0.10" kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] } raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] } rand = "0.8" rand_distr = "0.4" -tempfile = "3.1" -toml = "0.7" +tempfile = "3.6" +toml = "0.8" md-5 = "0.10.5" [features] +default = ["internals", "scripting"] internals = [] -nightly = ["prometheus/nightly"] -failpoints = ["fail/failpoints"] -scripting = ["rhai"] -swap = ["nightly", "memmap2"] +nightly = [ + "prometheus/nightly", +] +failpoints = [ + "fail/failpoints", +] +scripting = [ + "rhai", +] +swap = [ + "nightly", + "memmap2", +] +std_fs = [] -# Shortcuts -all_except_failpoints = ["internals", "scripting", "nightly", "swap"] -all_stable = ["internals", "scripting", "failpoints"] -all_stable_except_failpoints = ["internals", "scripting"] +nightly_group = ["nightly", "swap"] [patch.crates-io] raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } +# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged. +grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" } [workspace] members = ["stress", "ctl"] diff --git a/Makefile b/Makefile index dc928e89..da01ab1b 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ EXTRA_CARGO_ARGS ?= ## How to test stable toolchain. ## - auto: use current default toolchain, disable nightly features. -## - force: always use stable toolchain, disable nightly features. +## - force: explicitly use stable toolchain, disable nightly features. WITH_STABLE_TOOLCHAIN ?= WITH_NIGHTLY_FEATURES = @@ -41,22 +41,35 @@ clean: format: cargo ${TOOLCHAIN_ARGS} fmt --all +CLIPPY_WHITELIST += -A clippy::bool_assert_comparison ## Run clippy. clippy: ifdef WITH_NIGHTLY_FEATURES - cargo ${TOOLCHAIN_ARGS} clippy --all --all-features --all-targets -- -D clippy::all + cargo ${TOOLCHAIN_ARGS} clippy --all --features nightly_group,failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST} else - cargo ${TOOLCHAIN_ARGS} clippy --all --features all_stable --all-targets -- -D clippy::all + cargo ${TOOLCHAIN_ARGS} clippy --all --features failpoints --all-targets -- -D clippy::all ${CLIPPY_WHITELIST} endif ## Run tests. 
test: ifdef WITH_NIGHTLY_FEATURES - cargo ${TOOLCHAIN_ARGS} test --all --features all_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture - cargo ${TOOLCHAIN_ARGS} test --test failpoints --all-features ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture + cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group ${EXTRA_CARGO_ARGS} -- --nocapture + cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture else - cargo ${TOOLCHAIN_ARGS} test --all --features all_stable_except_failpoints ${EXTRA_CARGO_ARGS} -- --nocapture - cargo ${TOOLCHAIN_ARGS} test --test failpoints --features all_stable ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture + cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture + cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture +endif + +## Run tests with various features for maximum code coverage. +ifndef WITH_NIGHTLY_FEATURES +test_matrix: + $(error Must run test matrix with nightly features. Please reset WITH_STABLE_TOOLCHAIN.) +else +test_matrix: test + cargo ${TOOLCHAIN_ARGS} test --all ${EXTRA_CARGO_ARGS} -- --nocapture + cargo ${TOOLCHAIN_ARGS} test --test failpoints --features failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture + cargo ${TOOLCHAIN_ARGS} test --all --features nightly_group,std_fs ${EXTRA_CARGO_ARGS} -- --nocapture + cargo ${TOOLCHAIN_ARGS} test --test failpoints --features nightly_group,std_fs,failpoints ${EXTRA_CARGO_ARGS} -- --test-threads 1 --nocapture endif ## Build raft-engine-ctl. diff --git a/README.md b/README.md index f70a1b1b..bbba26ee 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ Put this in your Cargo.toml: ```rust [dependencies] -raft-engine = "0.3.0" +raft-engine = "0.4" ``` Available Cargo features: diff --git a/ctl/Cargo.toml b/ctl/Cargo.toml index c10b092e..5bf23c8b 100644 --- a/ctl/Cargo.toml +++ b/ctl/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "raft-engine-ctl" -version = "0.3.0" +version = "0.4.1" authors = ["The TiKV Project Developers"] edition = "2018" rust-version = "1.61.0" @@ -11,4 +11,4 @@ license = "Apache-2.0" [dependencies] clap = { version = "3.1", features = ["derive", "cargo"] } env_logger = "0.10" -raft-engine = { path = "..", version = "0.3.0", features = ["scripting", "internals"] } +raft-engine = { path = "..", version = "0.4.1", features = ["scripting", "internals"] } diff --git a/src/codec.rs b/src/codec.rs index af75b75b..b8b34b18 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -670,6 +670,7 @@ mod tests { decode_var_u64(&mut buf.as_slice()), ErrorKind::UnexpectedEof ); + check_error!(decode_var_u64(&mut [].as_slice()), ErrorKind::UnexpectedEof); buf.push(0); assert_eq!(0, decode_var_u64(&mut buf.as_slice()).unwrap()); diff --git a/src/config.rs b/src/config.rs index 3b5036be..c4cacf99 100644 --- a/src/config.rs +++ b/src/config.rs @@ -60,6 +60,12 @@ pub struct Config { /// /// Default: "8KB" pub batch_compression_threshold: ReadableSize, + /// Acceleration factor for LZ4 compression. It can be fine tuned, with each + /// successive value providing roughly +~3% to speed. The value will be + /// capped within [1, 65537] by LZ4. + /// + /// Default: 1. + pub compression_level: Option, /// Deprecated. /// Incrementally sync log files after specified bytes have been written. /// Setting it to zero disables incremental sync. 
@@ -112,7 +118,7 @@ pub struct Config { pub prefill_for_recycle: bool, /// Maximum capacity for preparing log files for recycling when start. - /// If not `None`, its size is equal to `purge-threshold`. + /// If `None`, its size is equal to `purge-threshold`*1.5. /// Only available for `prefill-for-recycle` is true. /// /// Default: None @@ -130,6 +136,7 @@ impl Default for Config { recovery_read_block_size: ReadableSize::kb(16), recovery_threads: 4, batch_compression_threshold: ReadableSize::kb(8), + compression_level: None, bytes_per_sync: None, format_version: Version::V2, target_file_size: ReadableSize::mb(128), @@ -215,10 +222,10 @@ impl Config { } if self.enable_log_recycle && self.purge_threshold.0 >= self.target_file_size.0 { // (1) At most u32::MAX so that the file number can be capped into an u32 - // without colliding. (2) Add some more file as an additional buffer to - // avoid jitters. + // without colliding. (2) Increase the threshold by 50% to add some more file + // as an additional buffer to avoid jitters. std::cmp::min( - (self.purge_threshold.0 / self.target_file_size.0) as usize + 2, + (self.purge_threshold.0 / self.target_file_size.0) as usize * 3 / 2, u32::MAX as usize, ) } else { @@ -237,7 +244,7 @@ impl Config { if self.prefill_for_recycle && prefill_limit >= self.target_file_size.0 { // Keep same with the maximum setting of `recycle_capacity`. std::cmp::min( - (prefill_limit / self.target_file_size.0) as usize + 2, + (prefill_limit / self.target_file_size.0) as usize * 3 / 2, u32::MAX as usize, ) } else { @@ -280,6 +287,8 @@ mod tests { assert_eq!(load.target_file_size, ReadableSize::mb(1)); assert_eq!(load.purge_threshold, ReadableSize::mb(3)); assert_eq!(load.format_version, Version::V1); + assert_eq!(load.enable_log_recycle, false); + assert_eq!(load.prefill_for_recycle, false); load.sanitize().unwrap(); } @@ -293,7 +302,7 @@ mod tests { assert!(hard_load.sanitize().is_err()); let soft_error = r#" - recovery-read-block-size = "1KB" + recovery-read-block-size = 1 recovery-threads = 0 target-file-size = "5000MB" format-version = 2 @@ -301,6 +310,8 @@ mod tests { prefill-for-recycle = true "#; let soft_load: Config = toml::from_str(soft_error).unwrap(); + assert!(soft_load.recovery_read_block_size.0 < MIN_RECOVERY_READ_BLOCK_SIZE as u64); + assert!(soft_load.recovery_threads < MIN_RECOVERY_THREADS); let mut soft_sanitized = soft_load; soft_sanitized.sanitize().unwrap(); assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64); @@ -309,8 +320,6 @@ mod tests { soft_sanitized.purge_rewrite_threshold.unwrap(), soft_sanitized.target_file_size ); - assert_eq!(soft_sanitized.format_version, Version::V2); - assert!(soft_sanitized.enable_log_recycle); let recycle_error = r#" enable-log-recycle = true diff --git a/src/engine.rs b/src/engine.rs index dab1b5b4..b369d5c0 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -143,7 +143,10 @@ where return Ok(0); } let start = Instant::now(); - let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; + let len = log_batch.finish_populate( + self.cfg.batch_compression_threshold.0 as usize, + self.cfg.compression_level, + )?; debug_assert!(len > 0); let mut attempt_count = 0_u64; @@ -1315,6 +1318,7 @@ pub(crate) mod tests { engine.append(rid, index, index + 1, Some(&data)); } } + engine.append(11, 1, 11, Some(&data)); // The engine needs purge, and all old entries should be rewritten. 
assert!(engine @@ -1333,8 +1337,9 @@ pub(crate) mod tests { }); } - // Recover with rewrite queue and append queue. + engine.clean(11); let cleaned_region_ids = engine.memtables.cleaned_region_ids(); + assert_eq!(cleaned_region_ids.len(), 1); let engine = engine.reopen(); assert_eq!(engine.memtables.cleaned_region_ids(), cleaned_region_ids); @@ -2147,6 +2152,7 @@ pub(crate) mod tests { prefill_for_recycle: true, ..Default::default() }; + let recycle_capacity = cfg.recycle_capacity() as u64; let fs = Arc::new(DeleteMonitoredFileSystem::new()); let engine = Engine::open_with_file_system(cfg, fs.clone()).unwrap(); @@ -2182,11 +2188,11 @@ pub(crate) mod tests { assert_eq!(reserved_start_2, reserved_start_3); // Reuse all of reserved files. - for rid in 1..=50 { + for rid in 1..=recycle_capacity { engine.append(rid, 1, 11, Some(&entry_data)); } assert!(fs.reserved_metadata.lock().unwrap().is_empty()); - for rid in 1..=50 { + for rid in 1..=recycle_capacity { engine.clean(rid); } engine.purge_manager.must_rewrite_append_queue(None, None); @@ -2418,11 +2424,12 @@ pub(crate) mod tests { // Directly write to pipe log. let mut log_batch = LogBatch::default(); let flush = |lb: &mut LogBatch| { - lb.finish_populate(0).unwrap(); + lb.finish_populate(0, None).unwrap(); engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap(); lb.drain(); }; { + // begin. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); log_batch.put(rid, key.clone(), value.clone()).unwrap(); @@ -2430,6 +2437,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // begin - unrelated - end. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); rid += 1; @@ -2449,6 +2457,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // begin - middle - middle - end. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); rid += 1; @@ -2473,6 +2482,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // begin - begin - end. let mut builder = AtomicGroupBuilder::with_id(3); builder.begin(&mut log_batch); rid += 1; @@ -2492,6 +2502,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // end - middle - end. // We must change id to avoid getting merged with last group. // It is actually not possible in real life to only have "begin" missing. let mut builder = AtomicGroupBuilder::with_id(4); @@ -2513,6 +2524,7 @@ pub(crate) mod tests { engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } { + // end - begin - end let mut builder = AtomicGroupBuilder::with_id(5); builder.begin(&mut LogBatch::default()); builder.end(&mut log_batch); @@ -2532,6 +2544,26 @@ pub(crate) mod tests { flush(&mut log_batch); engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); } + { + // begin - end - begin - end. 
+ let mut builder = AtomicGroupBuilder::with_id(6); + builder.begin(&mut log_batch); + rid += 1; + log_batch.put(rid, key.clone(), value.clone()).unwrap(); + data.insert(rid); + flush(&mut log_batch); + builder.end(&mut log_batch); + flush(&mut log_batch); + let mut builder = AtomicGroupBuilder::with_id(7); + builder.begin(&mut log_batch); + flush(&mut log_batch); + builder.end(&mut log_batch); + rid += 1; + log_batch.put(rid, key.clone(), value.clone()).unwrap(); + data.insert(rid); + flush(&mut log_batch); + engine.pipe_log.rotate(LogQueue::Rewrite).unwrap(); + } engine.pipe_log.sync(LogQueue::Rewrite).unwrap(); let engine = engine.reopen(); @@ -2568,7 +2600,7 @@ pub(crate) mod tests { log_batch.put_unchecked(3, crate::make_internal_key(&[1]), value.clone()); log_batch.put_unchecked(4, crate::make_internal_key(&[1]), value); - log_batch.finish_populate(0).unwrap(); + log_batch.finish_populate(0, None).unwrap(); let block_handle = engine .pipe_log .append(LogQueue::Rewrite, &mut log_batch) @@ -2915,6 +2947,7 @@ pub(crate) mod tests { purge_threshold: ReadableSize(40), ..cfg.clone() }; + let recycle_capacity = cfg_2.recycle_capacity() as u64; let engine = Engine::open_with_file_system(cfg_2, file_system.clone()).unwrap(); assert!(number_of_files(spill_dir.path()) > 0); for rid in 1..=10 { @@ -2929,7 +2962,7 @@ pub(crate) mod tests { ); assert!(file_count > engine.file_count(None)); // Append data, recycled files are reused. - for rid in 1..=30 { + for rid in 1..=recycle_capacity - 10 { engine.append(rid, 20, 30, Some(&entry_data)); } // No new file is created. diff --git a/src/env/default.rs b/src/env/default.rs index 7de4c0de..2c06338c 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -1,213 +1,16 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +#[cfg(feature = "failpoints")] +use std::io::{Error, ErrorKind}; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; -use std::os::unix::io::RawFd; use std::path::Path; use std::sync::Arc; use fail::fail_point; -use log::error; -use nix::errno::Errno; -use nix::fcntl::{self, OFlag}; -use nix::sys::stat::Mode; -use nix::sys::uio::{pread, pwrite}; -use nix::unistd::{close, ftruncate, lseek, Whence}; -use nix::NixPath; +use crate::env::log_fd::LogFd; use crate::env::{FileSystem, Handle, Permission, WriteExt}; -fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { - let kind = std::io::Error::from(e).kind(); - std::io::Error::new(kind, custom) -} - -impl From for OFlag { - fn from(value: Permission) -> OFlag { - match value { - Permission::ReadOnly => OFlag::O_RDONLY, - Permission::ReadWrite => OFlag::O_RDWR, - } - } -} - -/// A RAII-style low-level file. Errors occurred during automatic resource -/// release are logged and ignored. -/// -/// A [`LogFd`] is essentially a thin wrapper around [`RawFd`]. It's only -/// supported on *Unix*, and primarily optimized for *Linux*. -/// -/// All [`LogFd`] instances are opened with read and write permission. -#[derive(Debug)] -pub struct LogFd(RawFd); - -impl LogFd { - /// Opens a file with the given `path`. 
- pub fn open(path: &P, perm: Permission) -> IoResult { - fail_point!("log_fd::open::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - // Permission 644 - let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; - fail_point!("log_fd::open::fadvise_dontneed", |_| { - let fd = - LogFd(fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?); - #[cfg(target_os = "linux")] - unsafe { - extern crate libc; - libc::posix_fadvise64(fd.0, 0, fd.file_size()? as i64, libc::POSIX_FADV_DONTNEED); - } - Ok(fd) - }); - Ok(LogFd( - fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?, - )) - } - - /// Opens a file with the given `path`. The specified file will be created - /// first if not exists. - pub fn create(path: &P) -> IoResult { - fail_point!("log_fd::create::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - let flags = OFlag::O_RDWR | OFlag::O_CREAT; - // Permission 644 - let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; - let fd = fcntl::open(path, flags, mode).map_err(|e| from_nix_error(e, "open"))?; - Ok(LogFd(fd)) - } - - /// Closes the file. - pub fn close(&self) -> IoResult<()> { - fail_point!("log_fd::close::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - close(self.0).map_err(|e| from_nix_error(e, "close")) - } - - /// Reads some bytes starting at `offset` from this file into the specified - /// buffer. Returns how many bytes were read. - pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult { - let mut readed = 0; - while readed < buf.len() { - fail_point!("log_fd::read::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - let bytes = match pread(self.0, &mut buf[readed..], offset as i64) { - Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, - Err(e) => return Err(from_nix_error(e, "pread")), - }; - // EOF - if bytes == 0 { - break; - } - readed += bytes; - offset += bytes; - } - Ok(readed) - } - - /// Writes some bytes to this file starting at `offset`. Returns how many - /// bytes were written. - pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult { - fail_point!("log_fd::write::zero", |_| { Ok(0) }); - fail_point!("log_fd::write::no_space_err", |_| { - Err(from_nix_error(nix::Error::ENOSPC, "nospace")) - }); - let mut written = 0; - while written < content.len() { - let bytes = match pwrite(self.0, &content[written..], offset as i64) { - Ok(bytes) => bytes, - Err(e) if e == Errno::EINTR => continue, - Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")), - Err(e) => return Err(from_nix_error(e, "pwrite")), - }; - if bytes == 0 { - break; - } - written += bytes; - offset += bytes; - } - fail_point!("log_fd::write::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - Ok(written) - } - - /// Truncates all data after `offset`. - pub fn truncate(&self, offset: usize) -> IoResult<()> { - fail_point!("log_fd::truncate::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate")) - } - - /// Attempts to allocate space for `size` bytes starting at `offset`. 
- #[allow(unused_variables)] - pub fn allocate(&self, offset: usize, size: usize) -> IoResult<()> { - fail_point!("log_fd::allocate::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - #[cfg(target_os = "linux")] - { - if let Err(e) = fcntl::fallocate( - self.0, - fcntl::FallocateFlags::empty(), - offset as i64, - size as i64, - ) { - if e != nix::Error::EOPNOTSUPP { - return Err(from_nix_error(e, "fallocate")); - } - } - } - Ok(()) - } -} - -impl Handle for LogFd { - #[inline] - fn truncate(&self, offset: usize) -> IoResult<()> { - fail_point!("log_fd::truncate::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate")) - } - - #[inline] - fn file_size(&self) -> IoResult { - fail_point!("log_fd::file_size::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - lseek(self.0, 0, Whence::SeekEnd) - .map(|n| n as usize) - .map_err(|e| from_nix_error(e, "lseek")) - } - - #[inline] - fn sync(&self) -> IoResult<()> { - fail_point!("log_fd::sync::err", |_| { - Err(from_nix_error(nix::Error::EINVAL, "fp")) - }); - #[cfg(target_os = "linux")] - { - nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync")) - } - #[cfg(not(target_os = "linux"))] - { - nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync")) - } - } -} - -impl Drop for LogFd { - fn drop(&mut self) { - if let Err(e) = self.close() { - error!("error while closing file: {e}"); - } - } -} - /// A low-level file adapted for standard interfaces including [`Seek`], /// [`Write`] and [`Read`]. pub struct LogFile { @@ -227,7 +30,14 @@ impl LogFile { impl Write for LogFile { fn write(&mut self, buf: &[u8]) -> IoResult { + fail_point!("log_file::write::zero", |_| { Ok(0) }); + let len = self.inner.write(self.offset, buf)?; + + fail_point!("log_file::write::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + self.offset += len; Ok(len) } @@ -239,6 +49,10 @@ impl Write for LogFile { impl Read for LogFile { fn read(&mut self, buf: &mut [u8]) -> IoResult { + fail_point!("log_file::read::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + let len = self.inner.read(self.offset, buf)?; self.offset += len; Ok(len) @@ -261,12 +75,20 @@ impl Seek for LogFile { impl WriteExt for LogFile { fn truncate(&mut self, offset: usize) -> IoResult<()> { + fail_point!("log_file::truncate::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + self.inner.truncate(offset)?; self.offset = offset; Ok(()) } fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> { + fail_point!("log_file::allocate::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + self.inner.allocate(offset, size) } } @@ -280,10 +102,18 @@ impl FileSystem for DefaultFileSystem { type Writer = LogFile; fn create>(&self, path: P) -> IoResult { + fail_point!("default_fs::create::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + LogFd::create(path.as_ref()) } fn open>(&self, path: P, perm: Permission) -> IoResult { + fail_point!("default_fs::open::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + LogFd::open(path.as_ref(), perm) } diff --git a/src/env/hedged/mod.rs b/src/env/hedged/mod.rs index c2821b27..e6b693cd 100644 --- a/src/env/hedged/mod.rs +++ b/src/env/hedged/mod.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use std::thread; use std::thread::JoinHandle; -use crate::env::default::LogFd; +use crate::env::log_fd::LogFd; use crate::env::DefaultFileSystem; 
use crate::env::{FileSystem, Handle, Permission, WriteExt}; use futures::executor::block_on; diff --git a/src/env/hedged/recover.rs b/src/env/hedged/recover.rs index 955aba5f..78751de2 100644 --- a/src/env/hedged/recover.rs +++ b/src/env/hedged/recover.rs @@ -14,7 +14,7 @@ use std::io::Result as IoResult; use std::path::PathBuf; use std::sync::Arc; -use crate::env::default::LogFd; +use crate::env::log_fd::LogFd; use crate::env::DefaultFileSystem; use crate::env::{FileSystem, Permission}; diff --git a/src/env/hedged/task.rs b/src/env/hedged/task.rs index 70b169da..206f2e44 100644 --- a/src/env/hedged/task.rs +++ b/src/env/hedged/task.rs @@ -5,7 +5,7 @@ use std::io::Result as IoResult; use std::path::PathBuf; use std::sync::Arc; -use crate::env::default::LogFd; +use crate::env::log_fd::LogFd; use crate::env::DefaultFileSystem; use crate::env::{FileSystem, Handle, Permission}; use futures::channel::oneshot::{self, Canceled}; diff --git a/src/env/log_fd.rs b/src/env/log_fd.rs new file mode 100644 index 00000000..23cc2b3f --- /dev/null +++ b/src/env/log_fd.rs @@ -0,0 +1,11 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +#[cfg(not(any(windows, feature = "std_fs")))] +mod unix; +#[cfg(not(any(windows, feature = "std_fs")))] +pub use unix::LogFd; + +#[cfg(any(windows, feature = "std_fs"))] +mod plain; +#[cfg(any(windows, feature = "std_fs"))] +pub use plain::LogFd; diff --git a/src/env/log_fd/plain.rs b/src/env/log_fd/plain.rs new file mode 100644 index 00000000..03328e91 --- /dev/null +++ b/src/env/log_fd/plain.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +//! A naive file handle implementation based on standard `File`. All I/O +//! operations need to synchronize under a `RwLock`. 
+ +use crate::env::{Handle, Permission}; + +use fail::fail_point; +use parking_lot::RwLock; + +use std::fs::{File, OpenOptions}; +use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write}; +use std::path::Path; +use std::sync::Arc; + +pub struct LogFd(Arc>); + +impl LogFd { + pub fn open>(path: P, _: Permission) -> Result { + OpenOptions::new() + .read(true) + .write(true) + .open(path) + .map(|x| Self(Arc::new(RwLock::new(x)))) + } + + pub fn create>(path: P) -> Result { + OpenOptions::new() + .create(true) + .read(true) + .write(true) + .open(path) + .map(|x| Self(Arc::new(RwLock::new(x)))) + } + + pub fn read(&self, offset: usize, buf: &mut [u8]) -> Result { + let mut file = self.0.write(); + let _ = file.seek(SeekFrom::Start(offset as u64))?; + file.read(buf) + } + + pub fn write(&self, offset: usize, content: &[u8]) -> Result { + fail_point!("log_fd::write::no_space_err", |_| { + Err(Error::new(ErrorKind::Other, "nospace")) + }); + + let mut file = self.0.write(); + let _ = file.seek(SeekFrom::Start(offset as u64))?; + file.write(content) + } + + pub fn truncate(&self, offset: usize) -> Result<()> { + let file = self.0.write(); + file.set_len(offset as u64) + } + + pub fn allocate(&self, _offset: usize, _size: usize) -> Result<()> { + Ok(()) + } +} + +impl Handle for LogFd { + fn truncate(&self, offset: usize) -> Result<()> { + self.truncate(offset) + } + + fn file_size(&self) -> Result { + fail_point!("log_fd::file_size::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + + let file = self.0.read(); + file.metadata().map(|x| x.len() as usize) + } + + fn sync(&self) -> Result<()> { + fail_point!("log_fd::sync::err", |_| { + Err(Error::new(ErrorKind::InvalidInput, "fp")) + }); + + let file = self.0.write(); + file.sync_all() + } +} diff --git a/src/env/log_fd/unix.rs b/src/env/log_fd/unix.rs new file mode 100644 index 00000000..608cca70 --- /dev/null +++ b/src/env/log_fd/unix.rs @@ -0,0 +1,185 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +use crate::env::{Handle, Permission}; + +use fail::fail_point; +use log::error; + +use std::io::Result as IoResult; +use std::os::unix::io::RawFd; + +use nix::errno::Errno; +use nix::fcntl::{self, OFlag}; +use nix::sys::stat::Mode; +use nix::sys::uio::{pread, pwrite}; +use nix::unistd::{close, ftruncate, lseek, Whence}; +use nix::NixPath; + +fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { + let kind = std::io::Error::from(e).kind(); + std::io::Error::new(kind, custom) +} + +impl From for OFlag { + fn from(value: Permission) -> OFlag { + match value { + Permission::ReadOnly => OFlag::O_RDONLY, + Permission::ReadWrite => OFlag::O_RDWR, + } + } +} + +/// A RAII-style low-level file. Errors occurred during automatic resource +/// release are logged and ignored. +/// +/// A [`LogFd`] is essentially a thin wrapper around [`RawFd`]. It's only +/// supported on *Unix*, and primarily optimized for *Linux*. +/// +/// All [`LogFd`] instances are opened with read and write permission. +pub struct LogFd(RawFd); + +impl LogFd { + /// Opens a file with the given `path`. + pub fn open(path: &P, perm: Permission) -> IoResult { + // Permission 644 + let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; + fail_point!("log_fd::open::fadvise_dontneed", |_| { + let fd = + LogFd(fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?); + #[cfg(target_os = "linux")] + unsafe { + extern crate libc; + libc::posix_fadvise64(fd.0, 0, fd.file_size()? 
as i64, libc::POSIX_FADV_DONTNEED); + } + Ok(fd) + }); + Ok(LogFd( + fcntl::open(path, perm.into(), mode).map_err(|e| from_nix_error(e, "open"))?, + )) + } + + /// Opens a file with the given `path`. The specified file will be created + /// first if not exists. + pub fn create(path: &P) -> IoResult { + let flags = OFlag::O_RDWR | OFlag::O_CREAT; + // Permission 644 + let mode = Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IRGRP | Mode::S_IROTH; + let fd = fcntl::open(path, flags, mode).map_err(|e| from_nix_error(e, "open"))?; + Ok(LogFd(fd)) + } + + /// Closes the file. + pub fn close(&self) -> IoResult<()> { + fail_point!("log_fd::close::err", |_| { + Err(from_nix_error(nix::Error::EINVAL, "fp")) + }); + close(self.0).map_err(|e| from_nix_error(e, "close")) + } + + /// Reads some bytes starting at `offset` from this file into the specified + /// buffer. Returns how many bytes were read. + pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult { + let mut readed = 0; + while readed < buf.len() { + let bytes = match pread(self.0, &mut buf[readed..], offset as i64) { + Ok(bytes) => bytes, + Err(e) if e == Errno::EINTR => continue, + Err(e) => return Err(from_nix_error(e, "pread")), + }; + // EOF + if bytes == 0 { + break; + } + readed += bytes; + offset += bytes; + } + Ok(readed) + } + + /// Writes some bytes to this file starting at `offset`. Returns how many + /// bytes were written. + pub fn write(&self, mut offset: usize, content: &[u8]) -> IoResult { + fail_point!("log_fd::write::no_space_err", |_| { + Err(from_nix_error(nix::Error::ENOSPC, "nospace")) + }); + let mut written = 0; + while written < content.len() { + let bytes = match pwrite(self.0, &content[written..], offset as i64) { + Ok(bytes) => bytes, + Err(e) if e == Errno::EINTR => continue, + Err(e) if e == Errno::ENOSPC => return Err(from_nix_error(e, "nospace")), + Err(e) => return Err(from_nix_error(e, "pwrite")), + }; + if bytes == 0 { + break; + } + written += bytes; + offset += bytes; + } + Ok(written) + } + + /// Truncates all data after `offset`. + pub fn truncate(&self, offset: usize) -> IoResult<()> { + ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate")) + } + + /// Attempts to allocate space for `size` bytes starting at `offset`. 
+ #[allow(unused_variables)] + pub fn allocate(&self, offset: usize, size: usize) -> IoResult<()> { + #[cfg(target_os = "linux")] + { + if let Err(e) = fcntl::fallocate( + self.0, + fcntl::FallocateFlags::empty(), + offset as i64, + size as i64, + ) { + if e != nix::Error::EOPNOTSUPP { + return Err(from_nix_error(e, "fallocate")); + } + } + } + Ok(()) + } +} + +impl Handle for LogFd { + #[inline] + fn truncate(&self, offset: usize) -> IoResult<()> { + self.truncate(offset) + } + + #[inline] + fn file_size(&self) -> IoResult { + fail_point!("log_fd::file_size::err", |_| { + Err(from_nix_error(nix::Error::EINVAL, "fp")) + }); + lseek(self.0, 0, Whence::SeekEnd) + .map(|n| n as usize) + .map_err(|e| from_nix_error(e, "lseek")) + } + + #[inline] + fn sync(&self) -> IoResult<()> { + fail_point!("log_fd::sync::err", |_| { + Err(from_nix_error(nix::Error::EINVAL, "fp")) + }); + #[cfg(target_os = "linux")] + { + nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync")) + } + #[cfg(not(target_os = "linux"))] + { + nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync")) + } + } +} + +impl Drop for LogFd { + fn drop(&mut self) { + if let Err(e) = self.close() { + error!("error while closing file: {e}"); + } + } +} diff --git a/src/env/mod.rs b/src/env/mod.rs index 43650ff2..6f9a88a8 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -6,6 +6,7 @@ use std::sync::Arc; mod default; mod hedged; +mod log_fd; mod obfuscated; pub use default::DefaultFileSystem; diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index 0ff8c02a..3bad1446 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -220,7 +220,7 @@ pub mod debug { for batch in bs.iter_mut() { let offset = writer.offset() as u64; let len = batch - .finish_populate(1 /* compression_threshold */) + .finish_populate(1 /* compression_threshold */, None) .unwrap(); batch.prepare_write(&log_file_format).unwrap(); writer diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 5a5916ea..09bc1f42 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -33,7 +33,6 @@ pub const DEFAULT_PATH_ID: PathId = 0; /// compatibility. pub const DEFAULT_FIRST_FILE_SEQ: FileSeq = 1; -#[derive(Debug)] pub struct File { pub seq: FileSeq, pub handle: Arc, @@ -175,6 +174,10 @@ impl SinglePipe { /// filesystem. fn sync_dir(&self, path_id: PathId) -> Result<()> { debug_assert!(!self.paths.is_empty()); + + // Skip syncing directory in Windows. Refer to badger's discussion for more + // detail: https://github.com/dgraph-io/badger/issues/699 + #[cfg(not(windows))] std::fs::File::open(PathBuf::from(&self.paths[path_id])).and_then(|d| d.sync_all())?; Ok(()) } diff --git a/src/filter.rs b/src/filter.rs index 8a4fb4b7..f992d788 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -315,7 +315,7 @@ impl RhaiFilterMachine { } // Batch 64KB. 
if log_batch.approximate_size() >= 64 * 1024 { - log_batch.finish_populate(0 /* compression_threshold */)?; + log_batch.finish_populate(0 /* compression_threshold */, None)?; log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), @@ -325,7 +325,7 @@ impl RhaiFilterMachine { } } if !log_batch.is_empty() { - log_batch.finish_populate(0 /* compression_threshold */)?; + log_batch.finish_populate(0 /* compression_threshold */, None)?; log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), diff --git a/src/fork.rs b/src/fork.rs index 1bfc1251..cab65a92 100644 --- a/src/fork.rs +++ b/src/fork.rs @@ -1,10 +1,14 @@ // Copyright (c) 2023-present, PingCAP, Inc. Licensed under Apache-2.0. use std::fs::{copy, create_dir_all}; -use std::os::unix::fs::symlink; use std::path::Path; use std::sync::Arc; +#[cfg(not(windows))] +use std::os::unix::fs::symlink; +#[cfg(windows)] +use std::os::windows::fs::symlink_file as symlink; + use crate::config::{Config, RecoveryMode}; use crate::env::FileSystem; use crate::file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder}; diff --git a/src/lib.rs b/src/lib.rs index 9a786238..f282f6cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,8 @@ #![cfg_attr(feature = "nightly", feature(test))] #![cfg_attr(feature = "swap", feature(allocator_api))] #![cfg_attr(feature = "swap", feature(slice_ptr_get))] +// Though the new nightly rust stablized this feature, keep it anyway +// because some other project (like TiKV) is still using the old. #![cfg_attr(feature = "swap", feature(nonnull_slice_from_raw_parts))] #![cfg_attr(feature = "swap", feature(slice_ptr_len))] #![cfg_attr(feature = "swap", feature(alloc_layout_extra))] @@ -165,6 +167,7 @@ impl GlobalStats { pub(crate) const INTERNAL_KEY_PREFIX: &[u8] = b"__"; #[inline] +#[cfg(test)] pub(crate) fn make_internal_key(k: &[u8]) -> Vec { assert!(!k.is_empty()); let mut v = INTERNAL_KEY_PREFIX.to_vec(); @@ -172,12 +175,23 @@ pub(crate) fn make_internal_key(k: &[u8]) -> Vec { v } +#[cfg(not(test))] +pub(crate) fn make_internal_key(k: &[u8]) -> Vec { + use log_batch::ATOMIC_GROUP_KEY; + + assert!(k == ATOMIC_GROUP_KEY); + let mut v = INTERNAL_KEY_PREFIX.to_vec(); + v.extend_from_slice(k); + v +} + /// We ensure internal keys are not visible to the user by: /// (1) Writing internal keys will be rejected by `LogBatch::put`. /// (2) Internal keys are filtered out during apply and replay of both queues. /// This also makes sure future internal keys under the prefix won't become /// visible after downgrading. #[inline] +#[cfg(test)] pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool { if let Some(ext) = ext { s.len() == INTERNAL_KEY_PREFIX.len() + ext.len() @@ -189,6 +203,20 @@ pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool { } } +#[inline] +#[cfg(not(test))] +pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool { + use log_batch::ATOMIC_GROUP_KEY; + + if let Some(ext) = ext { + s.len() == INTERNAL_KEY_PREFIX.len() + ext.len() + && s[..INTERNAL_KEY_PREFIX.len()] == *INTERNAL_KEY_PREFIX + && s[INTERNAL_KEY_PREFIX.len()..] == *ext + } else { + is_internal_key(s, Some(ATOMIC_GROUP_KEY)) + } +} + #[cfg(test)] mod tests { use crate::log_batch::MessageExt; diff --git a/src/log_batch.rs b/src/log_batch.rs index b12cf727..c6ce147c 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -383,11 +383,6 @@ impl LogItemBatch { self.items.drain(..) 
} - pub fn push(&mut self, item: LogItem) { - self.item_size += item.approximate_size(); - self.items.push(item); - } - pub fn merge(&mut self, rhs: &mut LogItemBatch) { for item in &mut rhs.items { if let LogItemContent::EntryIndexes(entry_indexes) = &mut item.content { @@ -768,7 +763,11 @@ impl LogBatch { /// /// Internally, encodes and optionally compresses log entries. Sets the /// compression type to each entry index. - pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result { + pub(crate) fn finish_populate( + &mut self, + compression_threshold: usize, + compression_level: Option, + ) -> Result { let _t = StopWatch::new(perf_context!(log_populating_duration)); debug_assert!(self.buf_state == BufState::Open); if self.is_empty() { @@ -782,7 +781,11 @@ impl LogBatch { && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold { let buf_len = self.buf.len(); - lz4::append_compress_block(&mut self.buf, LOG_BATCH_HEADER_LEN)?; + lz4::append_compress_block( + &mut self.buf, + LOG_BATCH_HEADER_LEN, + compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL), + )?; (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4) } else { (0, CompressionType::None) @@ -991,7 +994,7 @@ fn verify_checksum_with_signature(buf: &[u8], signature: Option) -> Result< lazy_static! { static ref ATOMIC_GROUP_ID: Arc = Arc::new(AtomicU64::new(0)); } -const ATOMIC_GROUP_KEY: &[u8] = &[0x01]; +pub(crate) const ATOMIC_GROUP_KEY: &[u8] = &[0x01]; // const ATOMIC_GROUP_VALUE_LEN: usize = 1; @@ -1325,7 +1328,7 @@ mod tests { offset: 0, }; let old_approximate_size = batch.approximate_size(); - let len = batch.finish_populate(usize::from(compress)).unwrap(); + let len = batch.finish_populate(usize::from(compress), None).unwrap(); assert!(old_approximate_size >= len); assert_eq!(batch.approximate_size(), len); let mut batch_handle = mocked_file_block_handle; @@ -1490,7 +1493,7 @@ mod tests { batch1.merge(&mut batch2).unwrap(); assert!(batch2.is_empty()); - let len = batch1.finish_populate(0).unwrap(); + let len = batch1.finish_populate(0, None).unwrap(); batch1.prepare_write(&file_context).unwrap(); let encoded = batch1.encoded_bytes(); assert_eq!(len, encoded.len()); @@ -1546,7 +1549,7 @@ mod tests { offset: 0, }; let buf_len = batch.buf.len(); - let len = batch.finish_populate(1).unwrap(); + let len = batch.finish_populate(1, None).unwrap(); assert!(len == 0); assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0)); let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2); @@ -1567,14 +1570,14 @@ mod tests { )); assert!(matches!( batch - .put_message(0, crate::make_internal_key(&[1]), &Entry::new()) + .put_message(0, crate::make_internal_key(ATOMIC_GROUP_KEY), &Entry::new()) .unwrap_err(), Error::InvalidArgument(_) )); } #[test] - fn test_corruption() { + fn test_header_corruption() { let region_id = 7; let data = vec![b'x'; 16]; let mut batch = LogBatch::default(); @@ -1585,7 +1588,7 @@ mod tests { .put(region_id, b"key".to_vec(), b"value".to_vec()) .unwrap(); // enable compression so that len_and_type > len. 
- batch.finish_populate(1).unwrap(); + batch.finish_populate(1, None).unwrap(); let file_context = LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default()); batch.prepare_write(&file_context).unwrap(); let encoded = batch.encoded_bytes(); @@ -1626,7 +1629,7 @@ mod tests { .add_entries::(thread_rng().gen(), entries) .unwrap(); } - log_batch.finish_populate(0).unwrap(); + log_batch.finish_populate(0, None).unwrap(); let _ = log_batch.drain(); } let data: Vec = (0..128).map(|_| thread_rng().gen()).collect(); @@ -1668,7 +1671,7 @@ mod tests { }, ]; let old_approximate_size = batch.approximate_size(); - let len = batch.finish_populate(1).unwrap(); + let len = batch.finish_populate(1, None).unwrap(); assert!(old_approximate_size >= len); assert_eq!(batch.approximate_size(), len); let checksum = batch.item_batch.checksum; @@ -1680,7 +1683,7 @@ mod tests { batch_handle.len = len; let file_context = LogFileContext::new(batch_handle.id, Version::V2); batch.prepare_write(&file_context).unwrap(); - // batch.finish_write(batch_handle); + assert_eq!(batch.approximate_size(), len); let encoded = batch.encoded_bytes(); assert_eq!(encoded.len(), len); let mut bytes_slice = encoded; diff --git a/src/memtable.rs b/src/memtable.rs index d46ba68b..7a0ea41b 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -989,7 +989,7 @@ impl MemTableAccessor { /// [`MemTable`]s. /// /// This method is only used for recovery. - #[allow(dead_code)] + #[cfg(test)] pub fn cleaned_region_ids(&self) -> HashSet { let mut ids = HashSet::default(); let removed_memtables = self.removed_memtables.lock(); @@ -1202,7 +1202,6 @@ fn has_internal_key(item: &LogItem) -> bool { matches!(&item.content, LogItemContent::Kv(KeyValue { key, .. }) if crate::is_internal_key(key, None)) } -#[derive(Debug)] struct PendingAtomicGroup { status: AtomicGroupStatus, items: Vec, @@ -2256,6 +2255,7 @@ mod tests { 7, FileId::new(LogQueue::Rewrite, 1), )); + memtable.replay_rewrite(Vec::new()); } } memtable diff --git a/src/metrics.rs b/src/metrics.rs index b2304b1a..6ca10940 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -105,7 +105,7 @@ where } } -#[macro_export(crate)] +#[macro_export] macro_rules! perf_context { ($field: ident) => { $crate::metrics::PerfContextField::new(|perf_context| &mut perf_context.$field) diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 57e94d1e..33ca4071 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -118,7 +118,6 @@ impl Display for Version { } } -#[derive(Debug, Clone)] pub struct LogFileContext { pub id: FileId, pub version: Version, diff --git a/src/purge.rs b/src/purge.rs index cb76f776..b1183438 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -103,12 +103,10 @@ where }); // Ordering - // 1. Must rewrite tombstones AFTER acquiring - // `append_queue_barrier`, or deletion marks might be lost - // after restart. - // 2. Must rewrite tombstones BEFORE rewrite entries, or - // entries from recreated region might be lost after - // restart. + // 1. Must rewrite tombstones AFTER acquiring `append_queue_barrier`, or + // deletion marks might be lost after restart. + // 2. Must rewrite tombstones BEFORE rewrite entries, or entries from recreated + // region might be lost after restart. 
self.rewrite_append_queue_tombstones()?; should_compact.extend(self.rewrite_or_compact_append_queue( rewrite_watermark, @@ -435,7 +433,10 @@ where self.pipe_log.sync(LogQueue::Rewrite)?; return Ok(None); } - log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?; + log_batch.finish_populate( + self.cfg.batch_compression_threshold.0 as usize, + self.cfg.compression_level, + )?; let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?; if sync { self.pipe_log.sync(LogQueue::Rewrite)? diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index 9661dc4b..06e02154 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -20,7 +20,7 @@ const DEFAULT_PAGE_SIZE: usize = 64 * 1024 * 1024; // 64MB struct SwappyAllocatorCore where - A: Allocator, + A: Allocator + Send + Sync, { budget: usize, path: PathBuf, @@ -39,9 +39,9 @@ where /// The allocations of its internal metadata are not managed (i.e. allocated via /// `std::alloc::Global`). Do NOT use it as the global allocator. #[derive(Clone)] -pub struct SwappyAllocator(Arc>); +pub struct SwappyAllocator(Arc>); -impl SwappyAllocator { +impl SwappyAllocator { pub fn new_over(path: &Path, budget: usize, alloc: A) -> SwappyAllocator { if path.exists() { if let Err(e) = std::fs::remove_dir_all(path) { @@ -106,7 +106,7 @@ impl SwappyAllocator { } } -unsafe impl Allocator for SwappyAllocator { +unsafe impl Allocator for SwappyAllocator { #[inline] fn allocate(&self, layout: Layout) -> Result, AllocError> { // Always use mem_allocator to allocate empty pointer. @@ -342,6 +342,13 @@ impl Page { #[inline] fn release(self, root: &Path) { debug_assert_eq!(self.ref_counter, 0); + + // Somehow in Windows, we have to drop the mmap file handle first, otherwise + // the following file removal will return "Access Denied (OS Error 5)". + // Not using `#[cfg(windows)]` here is because it might do no harm in other + // operating systems - the mmap file handle is dropped anyhow. + drop(self.mmap); + let path = root.join(Self::page_file_name(self.seq)); if let Err(e) = std::fs::remove_file(path) { warn!("Failed to delete swap file: {e}"); @@ -616,14 +623,14 @@ mod tests { assert_eq!(allocator.memory_usage(), 16); assert_eq!(global.stats(), (2, 1, 0, 0)); // Deallocate all pages, calls when memory use is low. - disk_vec.clear(); + disk_vec.truncate(1); disk_vec.shrink_to_fit(); - assert_eq!(allocator.memory_usage(), 16); + assert_eq!(allocator.memory_usage(), 16 + 1); assert_eq!(global.stats(), (3, 1, 0, 0)); assert_eq!(file_count(dir.path()), 0); // Grow calls now. 
mem_vec.resize(32, 0); - assert_eq!(allocator.memory_usage(), 32); + assert_eq!(allocator.memory_usage(), 32 + 1); assert_eq!(global.stats(), (3, 1, 1, 0)); } @@ -1134,7 +1141,7 @@ mod tests { { // issue-58952 let c = 2; - let bv = vec![2]; + let bv = [2]; let b = bv.iter().filter(|a| **a == c); let _a = collect( @@ -1159,8 +1166,8 @@ mod tests { } { // issue-54477 - let mut vecdeque_13 = collect(vec![].into_iter(), allocator.clone()); - let mut vecdeque_29 = collect(vec![0].into_iter(), allocator.clone()); + let mut vecdeque_13 = collect(vec![], allocator.clone()); + let mut vecdeque_29 = collect(vec![0], allocator.clone()); vecdeque_29.insert(0, 30); vecdeque_29.insert(1, 31); vecdeque_29.insert(2, 32); @@ -1172,7 +1179,7 @@ mod tests { assert_eq!( vecdeque_13, - collect(vec![30, 31, 32, 33, 34, 35, 0].into_iter(), allocator,) + collect(vec![30, 31, 32, 33, 34, 35, 0], allocator,) ); } diff --git a/src/util.rs b/src/util.rs index b86d7813..2e35a83e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -223,8 +223,10 @@ pub mod lz4 { use crate::{Error, Result}; use std::{i32, ptr}; + pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1; + /// Compress content in `buf[skip..]`, and append output to `buf`. - pub fn append_compress_block(buf: &mut Vec, skip: usize) -> Result<()> { + pub fn append_compress_block(buf: &mut Vec, skip: usize, level: usize) -> Result<()> { let buf_len = buf.len(); let content_len = buf_len - skip; if content_len > 0 { @@ -244,11 +246,12 @@ pub mod lz4 { let le_len = content_len.to_le_bytes(); ptr::copy_nonoverlapping(le_len.as_ptr(), buf_ptr.add(buf_len), 4); - let compressed = lz4_sys::LZ4_compress_default( + let compressed = lz4_sys::LZ4_compress_fast( buf_ptr.add(skip) as _, buf_ptr.add(buf_len + 4) as _, content_len as i32, bound, + level as i32, ); if compressed == 0 { return Err(Error::Other(box_err!("Compression failed"))); @@ -283,7 +286,7 @@ pub mod lz4 { } } else if !src.is_empty() { Err(Error::Corruption(format!( - "Content to compress to short {}", + "Content to compress too short {}", src.len() ))) } else { @@ -298,7 +301,8 @@ pub mod lz4 { let vecs: Vec> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()]; for mut vec in vecs.into_iter() { let uncompressed_len = vec.len(); - super::append_compress_block(&mut vec, 0).unwrap(); + super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL) + .unwrap(); let res = super::decompress_block(&vec[uncompressed_len..]).unwrap(); assert_eq!(res, vec[..uncompressed_len].to_owned()); } diff --git a/stress/Cargo.toml b/stress/Cargo.toml index e89c92a5..79d131e2 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stress" -version = "0.3.0" +version = "0.4.1" authors = ["The TiKV Authors"] edition = "2018" diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index ecb925c2..b463d9e3 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -794,7 +794,7 @@ fn test_partial_rewrite_rewrite() { } { - let _f = FailGuard::new("log_fd::write::err", "10*off->return->off"); + let _f = FailGuard::new("log_file::write::err", "10*off->return->off"); assert!( catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()).is_err() ); @@ -832,7 +832,7 @@ fn test_partial_rewrite_rewrite_online() { assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1); { - let _f = FailGuard::new("log_fd::write::err", "10*off->return->off"); + let _f = FailGuard::new("log_file::write::err", 
"10*off->return->off"); assert!( catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()).is_err() ); @@ -899,13 +899,13 @@ fn test_split_rewrite_batch_imp(regions: u64, region_size: u64, split_size: u64, let count = AtomicU64::new(0); fail::cfg_callback("atomic_group::begin", move || { if count.fetch_add(1, Ordering::Relaxed) + 1 == i { - fail::cfg("log_fd::write::err", "return").unwrap(); + fail::cfg("log_file::write::err", "return").unwrap(); } }) .unwrap(); let r = catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()); fail::remove("atomic_group::begin"); - fail::remove("log_fd::write::err"); + fail::remove("log_file::write::err"); if r.is_ok() { break; } @@ -919,13 +919,13 @@ fn test_split_rewrite_batch_imp(regions: u64, region_size: u64, split_size: u64, let count = AtomicU64::new(0); fail::cfg_callback("atomic_group::add", move || { if count.fetch_add(1, Ordering::Relaxed) + 1 == i { - fail::cfg("log_fd::write::err", "return").unwrap(); + fail::cfg("log_file::write::err", "return").unwrap(); } }) .unwrap(); let r = catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue()); fail::remove("atomic_group::add"); - fail::remove("log_fd::write::err"); + fail::remove("log_file::write::err"); if r.is_ok() { break; } diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs index e3d3d574..d24ab049 100644 --- a/tests/failpoints/test_io_error.rs +++ b/tests/failpoints/test_io_error.rs @@ -23,11 +23,11 @@ fn test_file_open_error() { let fs = Arc::new(ObfuscatedFileSystem::default()); { - let _f = FailGuard::new("log_fd::create::err", "return"); + let _f = FailGuard::new("default_fs::create::err", "return"); assert!(Engine::open_with_file_system(cfg.clone(), fs.clone()).is_err()); } { - let _f = FailGuard::new("log_fd::open::err", "return"); + let _f = FailGuard::new("default_fs::open::err", "return"); let _ = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); assert!(Engine::open_with_file_system(cfg, fs).is_err()); } @@ -66,7 +66,7 @@ fn test_file_read_error() { engine.write(&mut kv_batch, true).unwrap(); let mut entries = Vec::new(); - let _f = FailGuard::new("log_fd::read::err", "return"); + let _f = FailGuard::new("log_file::read::err", "return"); engine .fetch_entries_to::(1, 0, 1, None, &mut entries) .unwrap(); @@ -95,7 +95,7 @@ fn test_file_write_error() { .write(&mut generate_batch(1, 1, 2, Some(&entry)), false) .unwrap(); { - let _f = FailGuard::new("log_fd::write::err", "return"); + let _f = FailGuard::new("log_file::write::err", "return"); engine .write(&mut generate_batch(1, 2, 3, Some(&entry)), false) .unwrap_err(); @@ -164,7 +164,7 @@ fn test_file_rotate_error() { } { // Fail to create new log file. - let _f = FailGuard::new("log_fd::create::err", "return"); + let _f = FailGuard::new("default_fs::create::err", "return"); assert!(catch_unwind_silent(|| { let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); }) @@ -173,7 +173,7 @@ fn test_file_rotate_error() { } { // Fail to write header of new log file. - let _f = FailGuard::new("log_fd::write::err", "1*off->return"); + let _f = FailGuard::new("log_file::write::err", "1*off->return"); assert!(catch_unwind_silent(|| { let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); }) @@ -220,7 +220,7 @@ fn test_concurrent_write_error() { let mut ctx = ConcurrentWriteContext::new(engine.clone()); // The second of three writes will fail. 
- fail::cfg("log_fd::write::err", "1*off->1*return->off").unwrap(); + fail::cfg("log_file::write::err", "1*off->1*return->off").unwrap(); let entry_clone = entry.clone(); ctx.write_ext(move |e| { e.write(&mut generate_batch(1, 1, 11, Some(&entry_clone)), false) @@ -258,8 +258,8 @@ fn test_concurrent_write_error() { ); { - let _f1 = FailGuard::new("log_fd::write::err", "return"); - let _f2 = FailGuard::new("log_fd::truncate::err", "return"); + let _f1 = FailGuard::new("log_file::write::err", "return"); + let _f2 = FailGuard::new("log_file::truncate::err", "return"); let entry_clone = entry.clone(); ctx.write_ext(move |e| { catch_unwind_silent(|| { @@ -305,7 +305,7 @@ fn test_non_atomic_write_error() { { // Write partially succeeds. We can reopen. let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); - let _f1 = FailGuard::new("log_fd::write::err", "return"); + let _f1 = FailGuard::new("log_file::write::err", "return"); engine .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true) .unwrap_err(); @@ -317,7 +317,7 @@ fn test_non_atomic_write_error() { { // Write partially succeeds. We can overwrite. let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); - let _f1 = FailGuard::new("log_fd::write::err", "1*off->1*return->off"); + let _f1 = FailGuard::new("log_file::write::err", "1*off->1*return->off"); engine .write(&mut generate_batch(rid, 0, 1, Some(&entry)), true) .unwrap_err(); @@ -333,7 +333,7 @@ fn test_non_atomic_write_error() { { // Write partially succeeds and can't be reverted. We panic. let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); - let _f1 = FailGuard::new("log_fd::write::err", "return"); + let _f1 = FailGuard::new("log_file::write::err", "return"); let _f2 = FailGuard::new("log_file::seek::err", "return"); assert!(catch_unwind_silent(|| { engine @@ -378,7 +378,7 @@ fn test_error_during_repair() { " .to_owned(); { - let _f = FailGuard::new("log_fd::write::err", "return"); + let _f = FailGuard::new("log_file::write::err", "return"); assert!( Engine::unsafe_repair_with_file_system(dir.path(), None, script, fs.clone()).is_err() ); @@ -429,7 +429,7 @@ fn test_file_allocate_error() { let fs = Arc::new(ObfuscatedFileSystem::default()); let entry = vec![b'x'; 1024]; { - let _f = FailGuard::new("log_fd::allocate::err", "return"); + let _f = FailGuard::new("log_file::allocate::err", "return"); let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); engine .write(&mut generate_batch(1, 1, 5, Some(&entry)), true) @@ -458,7 +458,7 @@ fn test_start_with_recycled_file_allocate_error() { // Mock that the engine starts with the circumstance where // the pref-reserved file with seqno[5] failed to be generated. { - let _f = FailGuard::new("log_fd::write::zero", "4*off->1*return->off"); + let _f = FailGuard::new("log_file::write::zero", "4*off->1*return->off"); Engine::open(cfg.clone()).unwrap(); } // Extra recycled files have been supplemented.