Skip to content

Commit

Permalink
feat: Store SourceStates in log and cache
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Oct 16, 2023
1 parent e6e12d2 commit 7e09ce7
Show file tree
Hide file tree
Showing 20 changed files with 212 additions and 96 deletions.
12 changes: 6 additions & 6 deletions dozer-api/src/cache_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub async fn build_cache(
labels: LabelsAndProgress,
) -> Result<(), CacheError> {
// Create log reader.
let starting_pos = cache.get_metadata()?.map(|pos| pos + 1).unwrap_or(0);
let starting_pos = cache.get_log_position()?.map(|pos| pos + 1).unwrap_or(0);
debug!(
"Starting log reader {} from position {starting_pos}",
log_reader_builder.options.endpoint
Expand Down Expand Up @@ -207,9 +207,11 @@ fn build_cache_task(
}
}
},
LogOperation::Commit { decision_instant } => {
cache.set_metadata(op_and_pos.pos)?;
cache.commit()?;
LogOperation::Commit {
source_states,
decision_instant,
} => {
cache.commit(&source_states, op_and_pos.pos)?;
if let Ok(duration) = decision_instant.elapsed() {
histogram!(
DATA_LATENCY_HISTOGRAM_NAME,
Expand All @@ -219,9 +221,7 @@ fn build_cache_task(
}
}
LogOperation::SnapshottingDone { connection_name } => {
cache.set_metadata(op_and_pos.pos)?;
cache.set_connection_snapshotting_done(&connection_name)?;
cache.commit()?;
snapshotting = !cache.is_snapshotting_done()?;
}
}
Expand Down
2 changes: 1 addition & 1 deletion dozer-api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub fn initialize_cache(
for record in records {
cache.insert(&record.record).unwrap();
}
cache.commit().unwrap();
cache.commit(&Default::default(), 0).unwrap();
cache_manager.wait_until_indexing_catchup();

Box::new(cache_manager)
Expand Down
2 changes: 1 addition & 1 deletion dozer-cache/benches/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn insert(cache: &Mutex<Box<dyn RwCache>>, n: usize, commit_size: usize) {
cache.insert(&record).unwrap();

if n % commit_size == 0 {
cache.commit().unwrap();
cache.commit(&Default::default(), 0).unwrap();
}
}

Expand Down
4 changes: 2 additions & 2 deletions dozer-cache/src/cache/lmdb/cache/dump_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn begin_dump_txn<C: LmdbCache>(
) -> Result<DumpTransaction<RoTransaction>, CacheError> {
let main_env = cache.main_env();
let main_txn = main_env.begin_txn()?;
let main_env_metadata = main_env.metadata_with_txn(&main_txn)?;
let main_env_metadata = main_env.log_positions_with_txn(&main_txn)?;

let mut secondary_txns = vec![];
let mut secondary_metadata = vec![];
Expand Down Expand Up @@ -139,7 +139,7 @@ mod tests {
insert_rec_1(&mut cache, (0, Some("a".to_string()), None));
insert_rec_1(&mut cache, (1, None, Some(2)));
insert_rec_1(&mut cache, (2, Some("b".to_string()), Some(3)));
cache.commit().unwrap();
cache.commit(&Default::default(), 0).unwrap();
indexing_thread_pool.lock().wait_until_catchup();

let mut data = vec![];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn ignore_insert_error_when_type_nothing() {
lifetime: None,
};
env.insert(&record).unwrap();
env.commit().unwrap();
env.commit(&Default::default(), 0).unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
Expand Down Expand Up @@ -67,7 +67,7 @@ fn update_after_insert_error_when_type_update() {
lifetime: None,
};
env.insert(&record).unwrap();
env.commit().unwrap();
env.commit(&Default::default(), 0).unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
Expand All @@ -85,7 +85,7 @@ fn update_after_insert_error_when_type_update() {
};

env.insert(&second_record).unwrap();
env.commit().unwrap();
env.commit(&Default::default(), 0).unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
Expand Down Expand Up @@ -113,7 +113,7 @@ fn return_insert_error_when_type_panic() {
lifetime: None,
};
env.insert(&record).unwrap();
env.commit().unwrap();
env.commit(&Default::default(), 0).unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
Expand Down Expand Up @@ -179,7 +179,7 @@ fn update_after_update_error_when_type_upsert() {
lifetime: None,
};
env.update(&initial_record, &update_record).unwrap();
env.commit().unwrap();
env.commit(&Default::default(), 0).unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
Expand Down
36 changes: 27 additions & 9 deletions dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use crate::{

use super::{
MainEnvironment, MainEnvironmentCommon, OperationLog, RwMainEnvironment,
CONNECTION_SNAPSHOTTING_DONE_DB_NAME, METADATA_DB_NAME, SCHEMA_DB_NAME,
CONNECTION_SNAPSHOTTING_DONE_DB_NAME, LOG_POSITION_DB_NAME, SCHEMA_DB_NAME,
SOURCE_STATES_DB_NAME,
};

pub async fn dump<'txn, E: MainEnvironment, T: Transaction>(
Expand All @@ -32,8 +33,15 @@ pub async fn dump<'txn, E: MainEnvironment, T: Transaction>(
.await?;
dozer_storage::dump(
txn,
METADATA_DB_NAME,
env.common().metadata.database(),
SOURCE_STATES_DB_NAME,
env.common().source_states.database(),
context,
)
.await?;
dozer_storage::dump(
txn,
LOG_POSITION_DB_NAME,
env.common().log_position.database(),
context,
)
.await?;
Expand All @@ -57,15 +65,18 @@ pub async fn restore(

info!("Restoring schema");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring metadata");
info!("Restoring source states");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring log position");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring connection snapshotting done");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring operation log");
let operation_log = OperationLog::restore(&mut env, reader, labels).await?;

let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let metadata = LmdbOption::open(&env, Some(METADATA_DB_NAME))?;
let source_states = LmdbOption::open(&env, Some(SOURCE_STATES_DB_NAME))?;
let log_position = LmdbOption::open(&env, Some(LOG_POSITION_DB_NAME))?;
let connection_snapshotting_done =
LmdbMap::open(&env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;

Expand All @@ -80,7 +91,8 @@ pub async fn restore(
base_path,
schema,
schema_option,
metadata,
source_states,
log_position,
connection_snapshotting_done,
operation_log,
intersection_chunk_size: options.intersection_chunk_size,
Expand Down Expand Up @@ -118,9 +130,15 @@ pub mod tests {
);
assert_database_equal(
&txn1,
env1.common().metadata.database(),
env1.common().source_states.database(),
&txn2,
env2.common().source_states.database(),
);
assert_database_equal(
&txn1,
env1.common().log_position.database(),
&txn2,
env2.common().metadata.database(),
env2.common().log_position.database(),
);
assert_database_equal(
&txn1,
Expand Down Expand Up @@ -151,7 +169,7 @@ pub mod tests {
env.insert(&record).unwrap();
env.insert(&record).unwrap();
env.delete(&record).unwrap();
env.commit().unwrap();
env.commit(&Default::default(), 0).unwrap();

let mut data = vec![];
{
Expand Down
68 changes: 47 additions & 21 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use dozer_storage::{
};
use dozer_tracing::Labels;
use dozer_types::{
borrow::IntoOwned,
bincode,
borrow::{Borrow, IntoOwned},
node::SourceStates,
types::{Field, FieldType, Record, Schema, SchemaWithIndex},
};
use dozer_types::{
Expand Down Expand Up @@ -78,8 +80,21 @@ pub trait MainEnvironment: LmdbEnvironment {
.ok_or(CacheError::PrimaryKeyNotFound)
}

fn metadata(&self) -> Result<Option<u64>, CacheError> {
self.metadata_with_txn(&self.begin_txn()?)
/// Loads the source states stored in this environment, if any.
///
/// Begins a transaction, loads the raw value from the `source_states` database and
/// decodes it with `bincode::deserialize`.
///
/// Returns `Ok(None)` when the load yields no value.
///
/// # Errors
///
/// Errors from beginning the transaction or loading the value are propagated. A decoding
/// failure is converted with `CacheError::map_deserialization_error`.
fn source_states(&self) -> Result<Option<SourceStates>, CacheError> {
let txn = self.begin_txn()?;
self.common()
.source_states
.load(&txn)?
.map(|source_states| {
bincode::deserialize(source_states.borrow())
.map_err(CacheError::map_deserialization_error)
})
// Turn the `Option<Result<_, _>>` produced by `map` into `Result<Option<_>, _>`.
.transpose()
}

/// Loads the stored log position, if any.
///
/// Begins a new transaction and delegates to `log_positions_with_txn`.
fn log_position(&self) -> Result<Option<u64>, CacheError> {
let txn = self.begin_txn()?;
self.log_positions_with_txn(&txn)
}

fn is_snapshotting_done(&self) -> Result<bool, CacheError> {
Expand All @@ -92,17 +107,18 @@ pub trait MainEnvironment: LmdbEnvironment {
Ok(true)
}

fn metadata_with_txn<T: Transaction>(&self, txn: &T) -> Result<Option<u64>, CacheError> {
fn log_positions_with_txn<T: Transaction>(&self, txn: &T) -> Result<Option<u64>, CacheError> {
self.common()
.metadata
.log_position
.load(txn)
.map(|data| data.map(IntoOwned::into_owned))
.map_err(Into::into)
}
}

const SCHEMA_DB_NAME: &str = "schema";
const METADATA_DB_NAME: &str = "metadata";
const SOURCE_STATES_DB_NAME: &str = "source_states";
const LOG_POSITION_DB_NAME: &str = "log_position";
const CONNECTION_SNAPSHOTTING_DONE_DB_NAME: &str = "connection_snapshotting_done";

#[derive(Debug, Clone)]
Expand All @@ -113,8 +129,10 @@ pub struct MainEnvironmentCommon {
schema: SchemaWithIndex,
/// The schema database, used for dumping.
schema_option: LmdbOption<SchemaWithIndex>,
/// The metadata.
metadata: LmdbOption<u64>,
/// The serialized source states.
source_states: LmdbOption<Vec<u8>>,
/// The log position.
log_position: LmdbOption<u64>,
/// The source status.
connection_snapshotting_done: LmdbMap<String, bool>,
/// The operation log.
Expand Down Expand Up @@ -153,7 +171,8 @@ impl RwMainEnvironment {

let operation_log = OperationLog::create(&mut env, labels.clone())?;
let schema_option = LmdbOption::create(&mut env, Some(SCHEMA_DB_NAME))?;
let metadata = LmdbOption::create(&mut env, Some(METADATA_DB_NAME))?;
let source_states = LmdbOption::create(&mut env, Some(SOURCE_STATES_DB_NAME))?;
let log_position = LmdbOption::create(&mut env, Some(LOG_POSITION_DB_NAME))?;
let connection_snapshotting_done =
LmdbMap::create(&mut env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;

Expand Down Expand Up @@ -213,7 +232,8 @@ impl RwMainEnvironment {
base_path,
schema,
schema_option,
metadata,
log_position,
source_states,
connection_snapshotting_done,
operation_log,
intersection_chunk_size: options.intersection_chunk_size,
Expand Down Expand Up @@ -389,14 +409,6 @@ impl RwMainEnvironment {
}
}

pub fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError> {
let txn = self.env.txn_mut()?;
self.common
.metadata
.store(txn, &metadata)
.map_err(Into::into)
}

pub fn set_connection_snapshotting_done(
&mut self,
connection_name: &str,
Expand All @@ -408,7 +420,19 @@ impl RwMainEnvironment {
.map_err(Into::into)
}

pub fn commit(&mut self) -> Result<(), CacheError> {
/// Stores `source_states` and `log_position`, then commits the environment.
///
/// `source_states` is serialized with `bincode::serialize` before being stored. Both
/// values are written through the same transaction handle obtained from `txn_mut`, and
/// only then is `self.env.commit()` called.
///
/// # Errors
///
/// A serialization failure is converted with `CacheError::map_serialization_error`.
/// Errors from obtaining the transaction, from either store, or from the commit are
/// propagated.
pub fn commit(
&mut self,
source_states: &SourceStates,
log_position: u64,
) -> Result<(), CacheError> {
let txn = self.env.txn_mut()?;
self.common.source_states.store(
txn,
bincode::serialize(source_states)
.map_err(CacheError::map_serialization_error)?
.as_slice(),
)?;
self.common.log_position.store(txn, &log_position)?;
self.env.commit().map_err(Into::into)
}
}
Expand Down Expand Up @@ -585,7 +609,8 @@ impl RoMainEnvironment {

let operation_log = OperationLog::open(&env, labels.clone())?;
let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let metadata = LmdbOption::open(&env, Some(METADATA_DB_NAME))?;
let source_states = LmdbOption::open(&env, Some(SOURCE_STATES_DB_NAME))?;
let log_position = LmdbOption::open(&env, Some(LOG_POSITION_DB_NAME))?;
let connection_snapshotting_done =
LmdbMap::open(&env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;

Expand All @@ -600,7 +625,8 @@ impl RoMainEnvironment {
base_path: base_path.to_path_buf(),
schema,
schema_option,
metadata,
source_states,
log_position,
connection_snapshotting_done,
operation_log,
intersection_chunk_size: options.intersection_chunk_size,
Expand Down
21 changes: 13 additions & 8 deletions dozer-cache/src/cache/lmdb/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use dozer_tracing::Labels;
use dozer_types::node::SourceStates;
use dozer_types::parking_lot::Mutex;
use std::collections::HashSet;
use std::path::PathBuf;
Expand Down Expand Up @@ -145,8 +146,12 @@ impl<C: LmdbCache> RoCache for C {
self.main_env().schema()
}

fn get_metadata(&self) -> Result<Option<u64>, CacheError> {
self.main_env().metadata()
/// Returns the source states held by the main environment, if any.
///
/// Delegates to `source_states()` on `self.main_env()`.
fn get_source_states(&self) -> Result<Option<SourceStates>, CacheError> {
self.main_env().source_states()
}

/// Returns the log position held by the main environment, if any.
///
/// Delegates to `log_position()` on `self.main_env()`.
fn get_log_position(&self) -> Result<Option<u64>, CacheError> {
self.main_env().log_position()
}

fn is_snapshotting_done(&self) -> Result<bool, CacheError> {
Expand All @@ -169,10 +174,6 @@ impl RwCache for LmdbRwCache {
self.main_env.update(old, new)
}

fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError> {
self.main_env.set_metadata(metadata)
}

fn set_connection_snapshotting_done(
&mut self,
connection_name: &str,
Expand All @@ -181,8 +182,12 @@ impl RwCache for LmdbRwCache {
.set_connection_snapshotting_done(connection_name)
}

fn commit(&mut self) -> Result<(), CacheError> {
self.main_env.commit()?;
/// Commits the main environment with the given source states and log position, then
/// calls `wake` on the indexing thread pool.
///
/// If the main environment commit fails, the error is returned and `wake` is not called.
fn commit(
&mut self,
source_states: &SourceStates,
log_position: u64,
) -> Result<(), CacheError> {
self.main_env.commit(source_states, log_position)?;
// Reached only after the main environment commit succeeded.
self.indexing_thread_pool.lock().wake(self.labels());
Ok(())
}
Expand Down
Loading

0 comments on commit 7e09ce7

Please sign in to comment.