diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7e3c9a6..c27405d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@
 * Support preparing prefilled logs to enable log recycling when start-up. The amount of logs to prepare is controlled by `Config::prefill_limit`.
 * Add a new configuration `spill-dir` to allow automatic placement of logs into an auxiliary directory when `dir` is full.
 * Add a new method `Engine::fork` to duplicate an `Engine` to a new place, with a few disk file copies.
+* Support configuring lz4 acceleration factor with `compression-level`.
 
 ## [0.3.0] - 2022-09-14
 
diff --git a/Cargo.toml b/Cargo.toml
index 973335ca..177ae280 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -61,7 +61,7 @@ strum = { version = "0.25.0", features = ["derive"] }
 thiserror = "1.0"
 
 [dev-dependencies]
-criterion = "0.5"
+criterion = "0.4"
 ctor = "0.2"
 env_logger = "0.10"
 kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
diff --git a/src/config.rs b/src/config.rs
index fd467571..30bf4082 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -58,6 +58,12 @@ pub struct Config {
     ///
     /// Default: "8KB"
     pub batch_compression_threshold: ReadableSize,
+    /// Acceleration factor for LZ4 compression. It can be fine-tuned, with each
+    /// successive value providing roughly +~3% to speed. The value will be
+    /// capped within [1, 65537] by LZ4.
+    ///
+    /// Default: 1.
+    pub compression_level: Option<usize>,
     /// Deprecated.
     /// Incrementally sync log files after specified bytes have been written.
     /// Setting it to zero disables incremental sync.
@@ -127,6 +133,7 @@ impl Default for Config {
             recovery_read_block_size: ReadableSize::kb(16),
             recovery_threads: 4,
             batch_compression_threshold: ReadableSize::kb(8),
+            compression_level: None,
             bytes_per_sync: None,
             format_version: Version::V2,
             target_file_size: ReadableSize::mb(128),
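
For illustration, a minimal sketch of how a user might enable the new option, assuming `Config`, `Engine`, and `Result` are re-exported at the crate root; the helper name and the value `Some(4)` are examples only, not part of this patch:

    use raft_engine::{Config, Engine, Result};

    fn open_with_fast_lz4(dir: &str) -> Result<Engine> {
        let mut cfg = Config::default();
        cfg.dir = dir.to_owned();
        // LZ4 acceleration factor: higher values compress faster but yield a
        // worse ratio. `None` keeps the default factor of 1, which matches
        // the previous LZ4_compress_default behavior.
        cfg.compression_level = Some(4);
        Engine::open(cfg)
    }

In file-based configurations the same field is addressed by the kebab-case key `compression-level` named in the changelog entry above.
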
diff --git a/src/engine.rs b/src/engine.rs
index 42e55821..17321649 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -142,7 +142,10 @@ where
             return Ok(0);
         }
         let start = Instant::now();
-        let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
+        let len = log_batch.finish_populate(
+            self.cfg.batch_compression_threshold.0 as usize,
+            self.cfg.compression_level,
+        )?;
         debug_assert!(len > 0);
 
         let mut attempt_count = 0_u64;
@@ -2430,7 +2433,7 @@ pub(crate) mod tests {
         // Directly write to pipe log.
         let mut log_batch = LogBatch::default();
         let flush = |lb: &mut LogBatch| {
-            lb.finish_populate(0).unwrap();
+            lb.finish_populate(0, None).unwrap();
             engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap();
             lb.drain();
         };
@@ -2580,7 +2583,7 @@ pub(crate) mod tests {
         log_batch.put_unchecked(3, crate::make_internal_key(&[1]), value.clone());
         log_batch.put_unchecked(4, crate::make_internal_key(&[1]), value);
 
-        log_batch.finish_populate(0).unwrap();
+        log_batch.finish_populate(0, None).unwrap();
         let block_handle = engine
             .pipe_log
             .append(LogQueue::Rewrite, &mut log_batch)
diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs
index fd7c5036..64042e01 100644
--- a/src/file_pipe_log/mod.rs
+++ b/src/file_pipe_log/mod.rs
@@ -219,7 +219,7 @@ pub mod debug {
             for batch in bs.iter_mut() {
                 let offset = writer.offset() as u64;
                 let len = batch
-                    .finish_populate(1 /* compression_threshold */)
+                    .finish_populate(1 /* compression_threshold */, None)
                     .unwrap();
                 batch.prepare_write(&log_file_format).unwrap();
                 writer
diff --git a/src/filter.rs b/src/filter.rs
index 8a4fb4b7..f992d788 100644
--- a/src/filter.rs
+++ b/src/filter.rs
@@ -315,7 +315,7 @@ impl RhaiFilterMachine {
                     }
                     // Batch 64KB.
                     if log_batch.approximate_size() >= 64 * 1024 {
-                        log_batch.finish_populate(0 /* compression_threshold */)?;
+                        log_batch.finish_populate(0 /* compression_threshold */, None)?;
                         log_batch.prepare_write(&log_file_context)?;
                         writer.write(
                             log_batch.encoded_bytes(),
@@ -325,7 +325,7 @@ impl RhaiFilterMachine {
                 }
             }
             if !log_batch.is_empty() {
-                log_batch.finish_populate(0 /* compression_threshold */)?;
+                log_batch.finish_populate(0 /* compression_threshold */, None)?;
                 log_batch.prepare_write(&log_file_context)?;
                 writer.write(
                     log_batch.encoded_bytes(),
diff --git a/src/log_batch.rs b/src/log_batch.rs
index b12cf727..33587ea9 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -768,7 +768,11 @@
     ///
     /// Internally, encodes and optionally compresses log entries. Sets the
    /// compression type to each entry index.
-    pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result<usize> {
+    pub(crate) fn finish_populate(
+        &mut self,
+        compression_threshold: usize,
+        compression_level: Option<usize>,
+    ) -> Result<usize> {
         let _t = StopWatch::new(perf_context!(log_populating_duration));
         debug_assert!(self.buf_state == BufState::Open);
         if self.is_empty() {
@@ -782,7 +786,11 @@
             && self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
         {
             let buf_len = self.buf.len();
-            lz4::append_compress_block(&mut self.buf, LOG_BATCH_HEADER_LEN)?;
+            lz4::append_compress_block(
+                &mut self.buf,
+                LOG_BATCH_HEADER_LEN,
+                compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
+            )?;
             (buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
         } else {
             (0, CompressionType::None)
@@ -1325,7 +1333,7 @@
             offset: 0,
         };
         let old_approximate_size = batch.approximate_size();
-        let len = batch.finish_populate(usize::from(compress)).unwrap();
+        let len = batch.finish_populate(usize::from(compress), None).unwrap();
         assert!(old_approximate_size >= len);
         assert_eq!(batch.approximate_size(), len);
         let mut batch_handle = mocked_file_block_handle;
@@ -1490,7 +1498,7 @@
         batch1.merge(&mut batch2).unwrap();
         assert!(batch2.is_empty());
 
-        let len = batch1.finish_populate(0).unwrap();
+        let len = batch1.finish_populate(0, None).unwrap();
         batch1.prepare_write(&file_context).unwrap();
         let encoded = batch1.encoded_bytes();
         assert_eq!(len, encoded.len());
@@ -1546,7 +1554,7 @@
             offset: 0,
         };
         let buf_len = batch.buf.len();
-        let len = batch.finish_populate(1).unwrap();
+        let len = batch.finish_populate(1, None).unwrap();
         assert!(len == 0);
         assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
         let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
@@ -1585,7 +1593,7 @@
             .put(region_id, b"key".to_vec(), b"value".to_vec())
             .unwrap();
         // enable compression so that len_and_type > len.
-        batch.finish_populate(1).unwrap();
+        batch.finish_populate(1, None).unwrap();
         let file_context = LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
         batch.prepare_write(&file_context).unwrap();
         let encoded = batch.encoded_bytes();
@@ -1626,7 +1634,7 @@
                    .add_entries::<Entry>(thread_rng().gen(), entries)
                    .unwrap();
            }
-            log_batch.finish_populate(0).unwrap();
+            log_batch.finish_populate(0, None).unwrap();
            let _ = log_batch.drain();
        }
        let data: Vec<u8> = (0..128).map(|_| thread_rng().gen()).collect();
@@ -1668,7 +1676,7 @@
            },
        ];
        let old_approximate_size = batch.approximate_size();
-        let len = batch.finish_populate(1).unwrap();
+        let len = batch.finish_populate(1, None).unwrap();
        assert!(old_approximate_size >= len);
        assert_eq!(batch.approximate_size(), len);
        let checksum = batch.item_batch.checksum;
diff --git a/src/purge.rs b/src/purge.rs
index 7fd3b777..b1183438 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -433,7 +433,10 @@ where
             self.pipe_log.sync(LogQueue::Rewrite)?;
             return Ok(None);
         }
-        log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
+        log_batch.finish_populate(
+            self.cfg.batch_compression_threshold.0 as usize,
+            self.cfg.compression_level,
+        )?;
         let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
         if sync {
             self.pipe_log.sync(LogQueue::Rewrite)?
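
To summarize the gating in `finish_populate` above: compression still happens only when the threshold is positive and the encoded payload reaches it; the new argument merely selects the acceleration factor once compression is already chosen. A condensed sketch of that decision (the free function and parameter names are hypothetical simplifications of the diff):

    /// Returns the LZ4 acceleration factor to compress with, or `None` when
    /// the batch should stay uncompressed.
    fn compression_params(
        buf_len: usize,
        header_len: usize, // stands in for LOG_BATCH_HEADER_LEN
        compression_threshold: usize,
        compression_level: Option<usize>,
    ) -> Option<usize> {
        // A zero threshold disables compression outright; otherwise the
        // payload past the header must reach the threshold.
        if compression_threshold > 0 && buf_len >= header_len + compression_threshold {
            // 1 stands in for lz4::DEFAULT_LZ4_COMPRESSION_LEVEL.
            Some(compression_level.unwrap_or(1))
        } else {
            None
        }
    }
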
diff --git a/src/util.rs b/src/util.rs
index b86d7813..363ace71 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -223,8 +223,10 @@ pub mod lz4 {
     use crate::{Error, Result};
     use std::{i32, ptr};
 
+    pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;
+
     /// Compress content in `buf[skip..]`, and append output to `buf`.
-    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize) -> Result<()> {
+    pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
         let buf_len = buf.len();
         let content_len = buf_len - skip;
         if content_len > 0 {
@@ -244,11 +246,12 @@ pub mod lz4 {
             let le_len = content_len.to_le_bytes();
             ptr::copy_nonoverlapping(le_len.as_ptr(), buf_ptr.add(buf_len), 4);
-            let compressed = lz4_sys::LZ4_compress_default(
+            let compressed = lz4_sys::LZ4_compress_fast(
                 buf_ptr.add(skip) as _,
                 buf_ptr.add(buf_len + 4) as _,
                 content_len as i32,
                 bound,
+                level as i32,
             );
             if compressed == 0 {
                 return Err(Error::Other(box_err!("Compression failed")));
             }
@@ -298,7 +301,8 @@ pub mod lz4 {
         let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
         for mut vec in vecs.into_iter() {
             let uncompressed_len = vec.len();
-            super::append_compress_block(&mut vec, 0).unwrap();
+            super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
+                .unwrap();
             let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
             assert_eq!(res, vec[..uncompressed_len].to_owned());
         }
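
The round-trip test above only exercises the default factor. A companion case for a non-default factor would look the same, since acceleration trades ratio for speed without changing the compressed format; a sketch, with the factor `8` chosen arbitrarily:

    let mut vec = b"12345678910".to_vec();
    let uncompressed_len = vec.len();
    // Compress in place at acceleration factor 8, then verify the appended
    // block decompresses back to the original bytes.
    super::append_compress_block(&mut vec, 0, 8).unwrap();
    let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
    assert_eq!(res, vec[..uncompressed_len].to_owned());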