Skip to content

Commit

Permalink
feat: Add block cache size to rocksdb config
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Sep 28, 2023
1 parent 1c9a1dc commit f87e7fc
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 13 deletions.
11 changes: 8 additions & 3 deletions dozer-recordstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ impl ProcessorRecordStore {
pub fn new(record_store: RecordStore) -> Result<Self, RecordStoreError> {
match record_store {
RecordStore::InMemory => Ok(Self::InMemory(in_memory::ProcessorRecordStore::new()?)),
RecordStore::Rocksdb => Ok(Self::Rocksdb(rocksdb::ProcessorRecordStore::new()?)),
RecordStore::Rocksdb(config) => {
Ok(Self::Rocksdb(rocksdb::ProcessorRecordStore::new(config)?))
}
}
}

Expand Down Expand Up @@ -178,7 +180,9 @@ impl ProcessorRecordStoreDeserializer {
RecordStore::InMemory => Ok(Self::InMemory(
in_memory::ProcessorRecordStoreDeserializer::new()?,
)),
RecordStore::Rocksdb => Ok(Self::Rocksdb(rocksdb::ProcessorRecordStore::new()?)),
RecordStore::Rocksdb(config) => {
Ok(Self::Rocksdb(rocksdb::ProcessorRecordStore::new(config)?))
}
}
}

Expand Down Expand Up @@ -253,6 +257,7 @@ struct ProcessorRecordForSerialization {

#[cfg(test)]
mod tests {
use dozer_types::models::app_config::RocksdbConfig;
use std::time::Duration;

use dozer_types::types::Timestamp;
Expand Down Expand Up @@ -283,7 +288,7 @@ mod tests {
#[test]
fn test_record_roundtrip() {
test_record_roundtrip_impl(RecordStore::InMemory);
test_record_roundtrip_impl(RecordStore::Rocksdb);
test_record_roundtrip_impl(RecordStore::Rocksdb(RocksdbConfig::default()));
}

fn test_record_serialization_roundtrip_impl(record_store_kind: RecordStore) {
Expand Down
5 changes: 3 additions & 2 deletions dozer-recordstore/src/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::atomic::AtomicU64;

use dozer_storage::RocksdbMap;
use dozer_types::models::app_config::RocksdbConfig;
use dozer_types::types::Field;
use tempdir::TempDir;

Expand All @@ -14,10 +15,10 @@ pub struct ProcessorRecordStore {
}

impl ProcessorRecordStore {
pub fn new() -> Result<Self, RecordStoreError> {
pub fn new(config: RocksdbConfig) -> Result<Self, RecordStoreError> {
let temp_dir = TempDir::new("rocksdb_processor_record_store")
.map_err(RecordStoreError::FailedToCreateTempDir)?;
let records = RocksdbMap::<u64, Vec<Field>>::create(temp_dir.path())?;
let records = RocksdbMap::<u64, Vec<Field>>::create(temp_dir.path(), config)?;

Ok(Self {
_temp_dir: temp_dir,
Expand Down
18 changes: 15 additions & 3 deletions dozer-storage/src/rocksdb_map.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::path::Path;

use rocksdb::DB;
use rocksdb::{BlockBasedOptions, Cache, Options, DB};

use dozer_types::borrow::IntoOwned;
use dozer_types::models::app_config::RocksdbConfig;

use crate::{errors::StorageError, BorrowEncode, Encode, LmdbVal};

Expand All @@ -17,8 +18,19 @@ impl<K: BorrowEncode, V: LmdbVal> RocksdbMap<K, V>
where
for<'a> V::Borrowed<'a>: IntoOwned<V>,
{
pub fn create(path: &Path) -> Result<Self, StorageError> {
let db = DB::open_default(path)?;
pub fn create(path: &Path, config: RocksdbConfig) -> Result<Self, StorageError> {
let mut options = Options::default();
options.create_if_missing(true);

if let Some(block_cache_size) = config.block_cache_size {
let mut block_options = BlockBasedOptions::default();
let cache = Cache::new_lru_cache(block_cache_size);
block_options.set_block_cache(&cache);

options.set_block_based_table_factory(&block_options);
}

let db = DB::open(&options, path)?;
Ok(Self {
db,
_key: std::marker::PhantomData,
Expand Down
8 changes: 7 additions & 1 deletion dozer-types/src/models/app_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ pub struct S3Storage {
pub enum RecordStore {
#[default]
InMemory,
Rocksdb,
Rocksdb(RocksdbConfig),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)]
#[serde(deny_unknown_fields)]
pub struct RocksdbConfig {
pub block_cache_size: Option<usize>,
}

pub fn default_persist_queue_capacity() -> u32 {
Expand Down
37 changes: 33 additions & 4 deletions json_schemas/dozer.json
Original file line number Diff line number Diff line change
Expand Up @@ -1307,10 +1307,25 @@
"additionalProperties": false
},
"RecordStore": {
"type": "string",
"enum": [
"InMemory",
"Rocksdb"
"oneOf": [
{
"type": "string",
"enum": [
"InMemory"
]
},
{
"type": "object",
"required": [
"Rocksdb"
],
"properties": {
"Rocksdb": {
"$ref": "#/definitions/RocksdbConfig"
}
},
"additionalProperties": false
}
]
},
"RefreshConfig": {
Expand Down Expand Up @@ -1351,6 +1366,20 @@
},
"additionalProperties": false
},
"RocksdbConfig": {
"type": "object",
"properties": {
"block_cache_size": {
"type": [
"integer",
"null"
],
"format": "uint",
"minimum": 0.0
}
},
"additionalProperties": false
},
"S3Details": {
"type": "object",
"required": [
Expand Down

0 comments on commit f87e7fc

Please sign in to comment.