Skip to content

Commit

Permalink
support configuring compression level (#311)
Browse files Browse the repository at this point in the history
* support configuring compression level

Signed-off-by: tabokie <[email protected]>

* lock version

Signed-off-by: tabokie <[email protected]>

* update doc

Signed-off-by: tabokie <[email protected]>

* update changelog

Signed-off-by: tabokie <[email protected]>

* address comment

Signed-off-by: tabokie <[email protected]>

---------

Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie authored Sep 1, 2023
1 parent 2dcaf5b commit d4943a9
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* Support preparing prefilled logs to enable log recycling when start-up. The amount of logs to prepare is controlled by `Config::prefill_limit`.
* Add a new configuration `spill-dir` to allow automatic placement of logs into an auxiliary directory when `dir` is full.
* Add a new method `Engine::fork` to duplicate an `Engine` to a new place, with a few disk file copies.
* Support configuring lz4 acceleration factor with `compression-level`.

## [0.3.0] - 2022-09-14

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ strum = { version = "0.25.0", features = ["derive"] }
thiserror = "1.0"

[dev-dependencies]
criterion = "0.5"
criterion = "0.4"
ctor = "0.2"
env_logger = "0.10"
kvproto = { git = "https://github.com/pingcap/kvproto.git", default-features = false, features = ["protobuf-codec"] }
Expand Down
7 changes: 7 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ pub struct Config {
///
/// Default: "8KB"
pub batch_compression_threshold: ReadableSize,
/// Acceleration factor for LZ4 compression. It can be fine tuned, with each
/// successive value providing roughly +~3% to speed. The value will be
/// capped within [1, 65537] by LZ4.
///
/// Default: 1.
pub compression_level: Option<usize>,
/// Deprecated.
/// Incrementally sync log files after specified bytes have been written.
/// Setting it to zero disables incremental sync.
Expand Down Expand Up @@ -127,6 +133,7 @@ impl Default for Config {
recovery_read_block_size: ReadableSize::kb(16),
recovery_threads: 4,
batch_compression_threshold: ReadableSize::kb(8),
compression_level: None,
bytes_per_sync: None,
format_version: Version::V2,
target_file_size: ReadableSize::mb(128),
Expand Down
9 changes: 6 additions & 3 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ where
return Ok(0);
}
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
let len = log_batch.finish_populate(
self.cfg.batch_compression_threshold.0 as usize,
self.cfg.compression_level,
)?;
debug_assert!(len > 0);

let mut attempt_count = 0_u64;
Expand Down Expand Up @@ -2430,7 +2433,7 @@ pub(crate) mod tests {
// Directly write to pipe log.
let mut log_batch = LogBatch::default();
let flush = |lb: &mut LogBatch| {
lb.finish_populate(0).unwrap();
lb.finish_populate(0, None).unwrap();
engine.pipe_log.append(LogQueue::Rewrite, lb).unwrap();
lb.drain();
};
Expand Down Expand Up @@ -2580,7 +2583,7 @@ pub(crate) mod tests {

log_batch.put_unchecked(3, crate::make_internal_key(&[1]), value.clone());
log_batch.put_unchecked(4, crate::make_internal_key(&[1]), value);
log_batch.finish_populate(0).unwrap();
log_batch.finish_populate(0, None).unwrap();
let block_handle = engine
.pipe_log
.append(LogQueue::Rewrite, &mut log_batch)
Expand Down
2 changes: 1 addition & 1 deletion src/file_pipe_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ pub mod debug {
for batch in bs.iter_mut() {
let offset = writer.offset() as u64;
let len = batch
.finish_populate(1 /* compression_threshold */)
.finish_populate(1 /* compression_threshold */, None)
.unwrap();
batch.prepare_write(&log_file_format).unwrap();
writer
Expand Down
4 changes: 2 additions & 2 deletions src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl RhaiFilterMachine {
}
// Batch 64KB.
if log_batch.approximate_size() >= 64 * 1024 {
log_batch.finish_populate(0 /* compression_threshold */)?;
log_batch.finish_populate(0 /* compression_threshold */, None)?;
log_batch.prepare_write(&log_file_context)?;
writer.write(
log_batch.encoded_bytes(),
Expand All @@ -325,7 +325,7 @@ impl RhaiFilterMachine {
}
}
if !log_batch.is_empty() {
log_batch.finish_populate(0 /* compression_threshold */)?;
log_batch.finish_populate(0 /* compression_threshold */, None)?;
log_batch.prepare_write(&log_file_context)?;
writer.write(
log_batch.encoded_bytes(),
Expand Down
24 changes: 16 additions & 8 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,11 @@ impl LogBatch {
///
/// Internally, encodes and optionally compresses log entries. Sets the
/// compression type to each entry index.
pub(crate) fn finish_populate(&mut self, compression_threshold: usize) -> Result<usize> {
pub(crate) fn finish_populate(
&mut self,
compression_threshold: usize,
compression_level: Option<usize>,
) -> Result<usize> {
let _t = StopWatch::new(perf_context!(log_populating_duration));
debug_assert!(self.buf_state == BufState::Open);
if self.is_empty() {
Expand All @@ -782,7 +786,11 @@ impl LogBatch {
&& self.buf.len() >= LOG_BATCH_HEADER_LEN + compression_threshold
{
let buf_len = self.buf.len();
lz4::append_compress_block(&mut self.buf, LOG_BATCH_HEADER_LEN)?;
lz4::append_compress_block(
&mut self.buf,
LOG_BATCH_HEADER_LEN,
compression_level.unwrap_or(lz4::DEFAULT_LZ4_COMPRESSION_LEVEL),
)?;
(buf_len - LOG_BATCH_HEADER_LEN, CompressionType::Lz4)
} else {
(0, CompressionType::None)
Expand Down Expand Up @@ -1325,7 +1333,7 @@ mod tests {
offset: 0,
};
let old_approximate_size = batch.approximate_size();
let len = batch.finish_populate(usize::from(compress)).unwrap();
let len = batch.finish_populate(usize::from(compress), None).unwrap();
assert!(old_approximate_size >= len);
assert_eq!(batch.approximate_size(), len);
let mut batch_handle = mocked_file_block_handle;
Expand Down Expand Up @@ -1490,7 +1498,7 @@ mod tests {
batch1.merge(&mut batch2).unwrap();
assert!(batch2.is_empty());

let len = batch1.finish_populate(0).unwrap();
let len = batch1.finish_populate(0, None).unwrap();
batch1.prepare_write(&file_context).unwrap();
let encoded = batch1.encoded_bytes();
assert_eq!(len, encoded.len());
Expand Down Expand Up @@ -1546,7 +1554,7 @@ mod tests {
offset: 0,
};
let buf_len = batch.buf.len();
let len = batch.finish_populate(1).unwrap();
let len = batch.finish_populate(1, None).unwrap();
assert!(len == 0);
assert_eq!(batch.buf_state, BufState::Encoded(buf_len, 0));
let file_context = LogFileContext::new(mocked_file_block_handles.id, Version::V2);
Expand Down Expand Up @@ -1585,7 +1593,7 @@ mod tests {
.put(region_id, b"key".to_vec(), b"value".to_vec())
.unwrap();
// enable compression so that len_and_type > len.
batch.finish_populate(1).unwrap();
batch.finish_populate(1, None).unwrap();
let file_context = LogFileContext::new(FileId::dummy(LogQueue::Append), Version::default());
batch.prepare_write(&file_context).unwrap();
let encoded = batch.encoded_bytes();
Expand Down Expand Up @@ -1626,7 +1634,7 @@ mod tests {
.add_entries::<Entry>(thread_rng().gen(), entries)
.unwrap();
}
log_batch.finish_populate(0).unwrap();
log_batch.finish_populate(0, None).unwrap();
let _ = log_batch.drain();
}
let data: Vec<u8> = (0..128).map(|_| thread_rng().gen()).collect();
Expand Down Expand Up @@ -1668,7 +1676,7 @@ mod tests {
},
];
let old_approximate_size = batch.approximate_size();
let len = batch.finish_populate(1).unwrap();
let len = batch.finish_populate(1, None).unwrap();
assert!(old_approximate_size >= len);
assert_eq!(batch.approximate_size(), len);
let checksum = batch.item_batch.checksum;
Expand Down
5 changes: 4 additions & 1 deletion src/purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,10 @@ where
self.pipe_log.sync(LogQueue::Rewrite)?;
return Ok(None);
}
log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
log_batch.finish_populate(
self.cfg.batch_compression_threshold.0 as usize,
self.cfg.compression_level,
)?;
let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
if sync {
self.pipe_log.sync(LogQueue::Rewrite)?
Expand Down
10 changes: 7 additions & 3 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ pub mod lz4 {
use crate::{Error, Result};
use std::{i32, ptr};

pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1;

/// Compress content in `buf[skip..]`, and append output to `buf`.
pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize) -> Result<()> {
pub fn append_compress_block(buf: &mut Vec<u8>, skip: usize, level: usize) -> Result<()> {
let buf_len = buf.len();
let content_len = buf_len - skip;
if content_len > 0 {
Expand All @@ -244,11 +246,12 @@ pub mod lz4 {
let le_len = content_len.to_le_bytes();
ptr::copy_nonoverlapping(le_len.as_ptr(), buf_ptr.add(buf_len), 4);

let compressed = lz4_sys::LZ4_compress_default(
let compressed = lz4_sys::LZ4_compress_fast(
buf_ptr.add(skip) as _,
buf_ptr.add(buf_len + 4) as _,
content_len as i32,
bound,
level as i32,
);
if compressed == 0 {
return Err(Error::Other(box_err!("Compression failed")));
Expand Down Expand Up @@ -298,7 +301,8 @@ pub mod lz4 {
let vecs: Vec<Vec<u8>> = vec![b"".to_vec(), b"123".to_vec(), b"12345678910".to_vec()];
for mut vec in vecs.into_iter() {
let uncompressed_len = vec.len();
super::append_compress_block(&mut vec, 0).unwrap();
super::append_compress_block(&mut vec, 0, super::DEFAULT_LZ4_COMPRESSION_LEVEL)
.unwrap();
let res = super::decompress_block(&vec[uncompressed_len..]).unwrap();
assert_eq!(res, vec[..uncompressed_len].to_owned());
}
Expand Down

0 comments on commit d4943a9

Please sign in to comment.