From 90a6b992c2816b30ca5ca5d059cba3af9685bbd9 Mon Sep 17 00:00:00 2001 From: cchudant Date: Fri, 29 Mar 2024 13:09:24 +0000 Subject: [PATCH] kvdb-rocksdb: support the rocksdb/multi-threaded-cf feature --- kvdb-rocksdb/src/iter.rs | 4 ++-- kvdb-rocksdb/src/lib.rs | 20 +++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/kvdb-rocksdb/src/iter.rs b/kvdb-rocksdb/src/iter.rs index 08ed32022..2287bc21f 100644 --- a/kvdb-rocksdb/src/iter.rs +++ b/kvdb-rocksdb/src/iter.rs @@ -39,7 +39,7 @@ impl<'a> IterationHandler for &'a DBAndColumns { fn iter(self, col: u32, read_opts: ReadOptions) -> Self::Iterator { match self.cf(col as usize) { - Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(cf, read_opts, IteratorMode::Start))), + Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt(&cf, read_opts, IteratorMode::Start))), Err(e) => EitherIter::B(std::iter::once(Err(e))), } } @@ -47,7 +47,7 @@ impl<'a> IterationHandler for &'a DBAndColumns { fn iter_with_prefix(self, col: u32, prefix: &[u8], read_opts: ReadOptions) -> Self::Iterator { match self.cf(col as usize) { Ok(cf) => EitherIter::A(KvdbAdapter(self.db.iterator_cf_opt( - cf, + &cf, read_opts, IteratorMode::From(prefix, Direction::Forward), ))), diff --git a/kvdb-rocksdb/src/lib.rs b/kvdb-rocksdb/src/lib.rs index 0af25f6b9..c7f6bcffe 100644 --- a/kvdb-rocksdb/src/lib.rs +++ b/kvdb-rocksdb/src/lib.rs @@ -17,7 +17,7 @@ use std::{ }; use rocksdb::{ - BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Options, ReadOptions, WriteBatch, WriteOptions, DB, + BlockBasedOptions, ColumnFamilyDescriptor, ColumnFamilyRef, Options, ReadOptions, WriteBatch, WriteOptions, DB }; use kvdb::{DBKeyValue, DBOp, DBTransaction, DBValue, KeyValueDB}; @@ -251,7 +251,7 @@ struct DBAndColumns { } impl DBAndColumns { - fn cf(&self, i: usize) -> io::Result<&ColumnFamily> { + fn cf(&self, i: usize) -> io::Result> { let name = self.column_names.get(i).ok_or_else(|| invalid_column(i as u32))?; self.db .cf_handle(&name) @@ -377,6 +377,7 @@ impl Database { Err(_) => { // retry and create CFs match DB::open_cf(&opts, path.as_ref(), &[] as &[&str]) { + #[allow(unused_mut)] // warns when `multi-threaded-cf` feature is enabled on rocksdb, as `create_cf` takes an &self. Ok(mut db) => { for (i, name) in column_names.iter().enumerate() { let _ = db @@ -436,23 +437,23 @@ impl Database { match op { DBOp::Insert { col: _, key, value } => { stats_total_bytes += key.len() + value.len(); - batch.put_cf(cf, &key, &value); + batch.put_cf(&cf, &key, &value); }, DBOp::Delete { col: _, key } => { // We count deletes as writes. stats_total_bytes += key.len(); - batch.delete_cf(cf, &key); + batch.delete_cf(&cf, &key); }, DBOp::DeletePrefix { col, prefix } => { let end_prefix = kvdb::end_prefix(&prefix[..]); let no_end = end_prefix.is_none(); let end_range = end_prefix.unwrap_or_else(|| vec![u8::max_value(); 16]); - batch.delete_range_cf(cf, &prefix[..], &end_range[..]); + batch.delete_range_cf(&cf, &prefix[..], &end_range[..]); if no_end { let prefix = if prefix.len() > end_range.len() { &prefix[..] } else { &end_range[..] }; for result in self.iter_with_prefix(col, prefix) { let (key, _) = result?; - batch.delete_cf(cf, &key[..]); + batch.delete_cf(&cf, &key[..]); } } }, @@ -460,7 +461,8 @@ impl Database { } self.stats.tally_bytes_written(stats_total_bytes as u64); - cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err) + cfs.db.write_opt(batch, &self.write_opts).map_err(other_io_err)?; + Ok(()) } /// Get value by key. @@ -470,7 +472,7 @@ impl Database { self.stats.tally_reads(1); let value = cfs .db - .get_pinned_cf_opt(cf, key, &self.read_opts) + .get_pinned_cf_opt(&cf, key, &self.read_opts) .map(|r| r.map(|v| v.to_vec())) .map_err(other_io_err); @@ -521,7 +523,7 @@ impl Database { const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys"; let cfs = &self.inner; let cf = cfs.cf(col as usize)?; - match cfs.db.property_int_value_cf(cf, ESTIMATE_NUM_KEYS) { + match cfs.db.property_int_value_cf(&cf, ESTIMATE_NUM_KEYS) { Ok(estimate) => Ok(estimate.unwrap_or_default()), Err(err_string) => Err(other_io_err(err_string)), }