Skip to content

Commit

Permalink
feat: Split cache name and label (#2161)
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei authored Oct 18, 2023
1 parent b43da60 commit 6c364e5
Show file tree
Hide file tree
Showing 23 changed files with 266 additions and 107 deletions.
7 changes: 6 additions & 1 deletion dozer-api/src/cache_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,18 @@ pub fn open_or_create_cache(
connections: &HashSet<String>,
write_options: CacheWriteOptions,
) -> Result<Box<dyn RwCache>, CacheError> {
match cache_manager.open_rw_cache(labels.clone(), write_options)? {
match cache_manager.open_rw_cache(
labels.to_non_empty_string().into_owned(),
labels.clone(),
write_options,
)? {
Some(cache) => {
debug_assert!(cache.get_schema() == &schema);
Ok(cache)
}
None => {
let cache = cache_manager.create_cache(
labels.to_non_empty_string().into_owned(),
labels,
schema.0,
schema.1,
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn open_cache_reader(
labels: Labels,
) -> Result<Option<CacheReader>, ApiInitError> {
let cache = cache_manager
.open_ro_cache(labels)
.open_ro_cache(labels.to_non_empty_string().into_owned(), labels)
.map_err(ApiInitError::OpenOrCreateCache)?;
Ok(cache.map(CacheReader::new))
}
Expand Down
1 change: 1 addition & 0 deletions dozer-api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub fn initialize_cache(
let (schema, secondary_indexes) = schema.unwrap_or_else(get_schema);
let mut cache = cache_manager
.create_cache(
labels.to_non_empty_string().into_owned(),
labels,
schema,
secondary_indexes,
Expand Down
1 change: 1 addition & 0 deletions dozer-cache/benches/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn cache(c: &mut Criterion) {
let cache = Mutex::new(
cache_manager
.create_cache(
"temp".to_string(),
Default::default(),
schema,
secondary_indexes,
Expand Down
12 changes: 6 additions & 6 deletions dozer-cache/src/cache/lmdb/cache/dump_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,21 @@ pub async fn dump<'txn, T: Transaction, C: LmdbCache>(
}

pub async fn restore(
options: &CacheOptions,
options: CacheOptions,
write_options: CacheWriteOptions,
indexing_thread_pool: Arc<Mutex<IndexingThreadPool>>,
reader: &mut (impl AsyncRead + Unpin),
) -> Result<LmdbRwCache, CacheError> {
info!("Restoring cache with options {options:?}");
let rw_main_env =
main_environment::dump_restore::restore(options, write_options, reader).await?;
main_environment::dump_restore::restore(&options, write_options, reader).await?;

let options = CacheOptions {
path: Some((
rw_main_env.base_path().to_path_buf(),
rw_main_env.labels().clone(),
rw_main_env.name().to_string(),
)),
..*options
..options
};
let ro_main_env = rw_main_env.share();

Expand All @@ -103,7 +103,7 @@ pub async fn restore(
for index in 0..ro_main_env.schema().1.len() {
let name = secondary_environment_name(index);
let rw_secondary_env =
secondary_environment::dump_restore::restore(name, &options, reader).await?;
secondary_environment::dump_restore::restore(name, options.clone(), reader).await?;
let ro_secondary_env = rw_secondary_env.share();

rw_secondary_envs.push(rw_secondary_env);
Expand Down Expand Up @@ -156,7 +156,7 @@ mod tests {
}

let restored_cache = restore(
&Default::default(),
Default::default(),
Default::default(),
indexing_thread_pool.clone(),
&mut data.as_slice(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn init_env(conflict_resolution: ConflictResolution) -> (RwMainEnvironment, Sche
..Default::default()
};
let main_env =
RwMainEnvironment::new(Some(&schema), None, &Default::default(), write_options).unwrap();
RwMainEnvironment::new(Some(&schema), None, Default::default(), write_options).unwrap();
(main_env, schema.0)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn restore(
reader: &mut (impl AsyncRead + Unpin),
) -> Result<RwMainEnvironment, CacheError> {
info!("Restoring main environment with options {options:?}");
let (mut env, (base_path, labels), temp_dir) = create_env(options)?;
let (mut env, (base_path, name), temp_dir) = create_env(options)?;

info!("Restoring schema");
dozer_storage::restore(&mut env, reader).await?;
Expand All @@ -62,7 +62,8 @@ pub async fn restore(
info!("Restoring connection snapshotting done");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring operation log");
let operation_log = OperationLog::restore(&mut env, reader, labels).await?;
let operation_log =
OperationLog::restore(&mut env, reader, name, options.labels.clone()).await?;

let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let commit_state = LmdbOption::open(&env, Some(COMMIT_STATE_DB_NAME))?;
Expand Down Expand Up @@ -142,7 +143,7 @@ pub mod tests {
let mut env = RwMainEnvironment::new(
Some(&(schema, vec![])),
None,
&Default::default(),
Default::default(),
Default::default(),
)
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn test_hash_insert_delete_insert() {
let mut env = RwMainEnvironment::new(
Some(&(schema, vec![])),
None,
&Default::default(),
Default::default(),
Default::default(),
)
.unwrap();
Expand Down
23 changes: 14 additions & 9 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub trait MainEnvironment: LmdbEnvironment {
&self.common().base_path
}

fn name(&self) -> &str {
self.common().operation_log.name()
}

fn labels(&self) -> &Labels {
self.common().operation_log.labels()
}
Expand Down Expand Up @@ -154,12 +158,12 @@ impl RwMainEnvironment {
pub fn new(
schema: Option<&SchemaWithIndex>,
connections: Option<&HashSet<String>>,
options: &CacheOptions,
options: CacheOptions,
write_options: CacheWriteOptions,
) -> Result<Self, CacheError> {
let (mut env, (base_path, labels), temp_dir) = create_env(options)?;
let (mut env, (base_path, name), temp_dir) = create_env(&options)?;

let operation_log = OperationLog::create(&mut env, labels.clone())?;
let operation_log = OperationLog::create(&mut env, name.clone(), options.labels)?;
let schema_option = LmdbOption::create(&mut env, Some(SCHEMA_DB_NAME))?;
let commit_state = LmdbOption::create(&mut env, Some(COMMIT_STATE_DB_NAME))?;
let connection_snapshotting_done =
Expand All @@ -173,7 +177,7 @@ impl RwMainEnvironment {
(Some(schema), Some(old_schema)) => {
if &old_schema != schema {
return Err(CacheError::SchemaMismatch {
name: labels.to_string(),
name: name.clone(),
given: Box::new(schema.clone()),
stored: Box::new(old_schema),
});
Expand Down Expand Up @@ -206,7 +210,7 @@ impl RwMainEnvironment {
if &existing_connections != connections {
return Err(CacheError::ConnectionsMismatch(Box::new(
ConnectionMismatch {
name: labels.to_string(),
name,
given: connections.clone(),
stored: existing_connections,
},
Expand Down Expand Up @@ -587,10 +591,11 @@ impl MainEnvironment for RoMainEnvironment {
}

impl RoMainEnvironment {
pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
let (env, (base_path, labels), _temp_dir) = open_env(options)?;
pub fn new(options: CacheOptions) -> Result<Self, CacheError> {
let (env, (base_path, name)) = open_env(&options)?;
let base_path = base_path.to_path_buf();

let operation_log = OperationLog::open(&env, labels.clone())?;
let operation_log = OperationLog::open(&env, name.to_string(), options.labels)?;
let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let commit_state = LmdbOption::open(&env, Some(COMMIT_STATE_DB_NAME))?;
let connection_snapshotting_done =
Expand All @@ -604,7 +609,7 @@ impl RoMainEnvironment {
Ok(Self {
env,
common: MainEnvironmentCommon {
base_path: base_path.to_path_buf(),
base_path,
schema,
schema_option,
commit_state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct OperationLog {
next_operation_id: LmdbCounter,
/// Operation_id -> operation.
operation_id_to_operation: LmdbMap<u64, Operation>,
/// The cache name.
name: String,
/// The cache labels.
labels: Labels,
}
Expand All @@ -69,7 +71,11 @@ const OPERATION_ID_TO_OPERATION_DB_NAME: &str = "operation_id_to_operation";
const CACHE_OPERATION_LOG_COUNTER_NAME: &str = "cache_operation_log";

impl OperationLog {
pub fn create(env: &mut RwLmdbEnvironment, labels: Labels) -> Result<Self, StorageError> {
pub fn create(
env: &mut RwLmdbEnvironment,
name: String,
labels: Labels,
) -> Result<Self, StorageError> {
describe_counter!(
CACHE_OPERATION_LOG_COUNTER_NAME,
"Number of operations stored in the cache"
Expand All @@ -87,11 +93,16 @@ impl OperationLog {
present_operation_ids,
next_operation_id,
operation_id_to_operation,
name,
labels,
})
}

pub fn open<E: LmdbEnvironment>(env: &E, labels: Labels) -> Result<Self, StorageError> {
pub fn open<E: LmdbEnvironment>(
env: &E,
name: String,
labels: Labels,
) -> Result<Self, StorageError> {
let primary_key_metadata = PrimaryKeyMetadata::open(env)?;
let hash_metadata = HashMetadata::open(env)?;
let present_operation_ids = LmdbSet::open(env, Some(PRESENT_OPERATION_IDS_DB_NAME))?;
Expand All @@ -104,10 +115,15 @@ impl OperationLog {
present_operation_ids,
next_operation_id,
operation_id_to_operation,
name,
labels,
})
}

pub fn name(&self) -> &str {
&self.name
}

pub fn labels(&self) -> &Labels {
&self.labels
}
Expand Down Expand Up @@ -506,6 +522,7 @@ impl OperationLog {
pub async fn restore<'txn, R: tokio::io::AsyncRead + Unpin>(
env: &mut RwLmdbEnvironment,
reader: &mut R,
name: String,
labels: Labels,
) -> Result<Self, dozer_storage::RestoreError> {
info!("Restoring primary key metadata");
Expand All @@ -518,7 +535,7 @@ impl OperationLog {
dozer_storage::restore(env, reader).await?;
info!("Restoring operation id to operation");
dozer_storage::restore(env, reader).await?;
Self::open(env, labels).map_err(Into::into)
Self::open(env, name, labels).map_err(Into::into)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn assert_operation_log_equal<T1: Transaction, T2: Transaction>(
#[test]
fn test_operation_log_append_only() {
let mut env = create_env(&Default::default()).unwrap().0;
let log = OperationLog::create(&mut env, Default::default()).unwrap();
let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap();
let txn = env.txn_mut().unwrap();
let append_only = true;

Expand Down Expand Up @@ -96,7 +96,7 @@ fn test_operation_log_append_only() {
#[test]
fn test_operation_log_with_primary_key() {
let mut env = create_env(&Default::default()).unwrap().0;
let log = OperationLog::create(&mut env, Default::default()).unwrap();
let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap();
let txn = env.txn_mut().unwrap();
let append_only = false;

Expand Down Expand Up @@ -219,7 +219,7 @@ fn test_operation_log_with_primary_key() {
#[test]
fn test_operation_log_without_primary_key() {
let mut env = create_env(&Default::default()).unwrap().0;
let log = OperationLog::create(&mut env, Default::default()).unwrap();
let log = OperationLog::create(&mut env, "temp".to_string(), Default::default()).unwrap();
let txn = env.txn_mut().unwrap();
let append_only = false;

Expand Down
31 changes: 21 additions & 10 deletions dozer-cache/src/cache/lmdb/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ pub struct CacheOptions {
pub intersection_chunk_size: usize,

/// Provide a path where db will be created. If nothing is provided, will default to a temp location.
/// Db path will be `PathBuf.join(Labels.to_non_empty_string())`.
pub path: Option<(PathBuf, Labels)>,
/// Db path will be `PathBuf.join(name)`.
pub path: Option<(PathBuf, String)>,

/// The labels to attach to the cache.
pub labels: Labels,
}

impl Default for CacheOptions {
Expand All @@ -51,6 +54,7 @@ impl Default for CacheOptions {
max_size: 1024 * 1024 * 1024,
intersection_chunk_size: 100,
path: None,
labels: Labels::default(),
}
}
}
Expand All @@ -62,10 +66,12 @@ pub struct LmdbRoCache {
}

impl LmdbRoCache {
pub fn new(options: &CacheOptions) -> Result<Self, CacheError> {
let main_env = RoMainEnvironment::new(options)?;
pub fn new(options: CacheOptions) -> Result<Self, CacheError> {
let main_env = RoMainEnvironment::new(options.clone())?;
let secondary_envs = (0..main_env.schema().1.len())
.map(|index| RoSecondaryEnvironment::new(secondary_environment_name(index), options))
.map(|index| {
RoSecondaryEnvironment::new(secondary_environment_name(index), options.clone())
})
.collect::<Result<_, _>>()?;
Ok(Self {
main_env,
Expand All @@ -85,18 +91,19 @@ impl LmdbRwCache {
pub fn new(
schema: Option<&SchemaWithIndex>,
connections: Option<&HashSet<String>>,
options: &CacheOptions,
options: CacheOptions,
write_options: CacheWriteOptions,
indexing_thread_pool: Arc<Mutex<IndexingThreadPool>>,
) -> Result<Self, CacheError> {
let rw_main_env = RwMainEnvironment::new(schema, connections, options, write_options)?;
let rw_main_env =
RwMainEnvironment::new(schema, connections, options.clone(), write_options)?;

let options = CacheOptions {
path: Some((
rw_main_env.base_path().to_path_buf(),
rw_main_env.labels().clone(),
rw_main_env.name().to_string(),
)),
..*options
..options
};
let ro_main_env = rw_main_env.share();

Expand All @@ -105,7 +112,7 @@ impl LmdbRwCache {
for (index, index_definition) in ro_main_env.schema().1.iter().enumerate() {
let name = secondary_environment_name(index);
let rw_secondary_env =
RwSecondaryEnvironment::new(index_definition, name.clone(), &options)?;
RwSecondaryEnvironment::new(index_definition, name.clone(), options.clone())?;
let ro_secondary_env = rw_secondary_env.share();

rw_secondary_envs.push(rw_secondary_env);
Expand All @@ -125,6 +132,10 @@ impl LmdbRwCache {
}

impl<C: LmdbCache> RoCache for C {
fn name(&self) -> &str {
self.main_env().name()
}

fn labels(&self) -> &Labels {
self.main_env().labels()
}
Expand Down
Loading

0 comments on commit 6c364e5

Please sign in to comment.