Revert "feat: Store SourceStates in log and cache"
This reverts commit d069b25.
chubei committed Oct 16, 2023
1 parent d069b25 commit e6e12d2
Showing 20 changed files with 90 additions and 202 deletions.
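In short, this revert takes SourceStates back out of the cache commit path: RwCache::commit(&SourceStates, u64) becomes a plain commit(), the last applied log position is stored separately through a new set_metadata(u64) call, and the get_log_position/log_position naming goes back to get_metadata/metadata. The sketch below is not code from this repository; it is a hedged reconstruction of the post-revert call pattern visible in the build_cache_task hunk further down, and CacheError, RwCache, LogOperation, MemCache and apply are simplified stand-ins rather than the real dozer types.

// Hedged sketch of the post-revert call pattern in build_cache_task
// (dozer-api/src/cache_builder/mod.rs); simplified stand-in types.
use std::time::SystemTime;

type CacheError = Box<dyn std::error::Error>;

trait RwCache {
    // After the revert the log position is persisted on its own ...
    fn set_metadata(&mut self, log_position: u64) -> Result<(), CacheError>;
    // ... and commit takes no arguments (previously commit(&SourceStates, u64)).
    fn commit(&mut self) -> Result<(), CacheError>;
    fn set_connection_snapshotting_done(&mut self, connection_name: &str) -> Result<(), CacheError>;
}

enum LogOperation {
    Commit { decision_instant: SystemTime },
    SnapshottingDone { connection_name: String },
}

fn apply(cache: &mut dyn RwCache, op: LogOperation, pos: u64) -> Result<(), CacheError> {
    match op {
        LogOperation::Commit { decision_instant: _ } => {
            cache.set_metadata(pos)?; // remember how far the log has been applied
            cache.commit()?;
        }
        LogOperation::SnapshottingDone { connection_name } => {
            cache.set_metadata(pos)?;
            cache.set_connection_snapshotting_done(&connection_name)?;
            cache.commit()?;
        }
    }
    Ok(())
}

// A throwaway in-memory cache, only so the sketch runs end to end.
struct MemCache {
    metadata: Option<u64>,
    snapshotting_done: Vec<String>,
    committed: bool,
}

impl RwCache for MemCache {
    fn set_metadata(&mut self, log_position: u64) -> Result<(), CacheError> {
        self.metadata = Some(log_position);
        Ok(())
    }
    fn commit(&mut self) -> Result<(), CacheError> {
        self.committed = true;
        Ok(())
    }
    fn set_connection_snapshotting_done(&mut self, connection_name: &str) -> Result<(), CacheError> {
        self.snapshotting_done.push(connection_name.to_string());
        Ok(())
    }
}

fn main() -> Result<(), CacheError> {
    let mut cache = MemCache { metadata: None, snapshotting_done: Vec::new(), committed: false };
    apply(&mut cache, LogOperation::Commit { decision_instant: SystemTime::now() }, 42)?;
    assert_eq!(cache.metadata, Some(42));
    assert!(cache.committed);
    Ok(())
}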
12 changes: 6 additions & 6 deletions dozer-api/src/cache_builder/mod.rs
@@ -35,7 +35,7 @@ pub async fn build_cache(
labels: LabelsAndProgress,
) -> Result<(), CacheError> {
// Create log reader.
let starting_pos = cache.get_log_position()?.map(|pos| pos + 1).unwrap_or(0);
let starting_pos = cache.get_metadata()?.map(|pos| pos + 1).unwrap_or(0);
debug!(
"Starting log reader {} from position {starting_pos}",
log_reader_builder.options.endpoint
@@ -207,11 +207,9 @@ fn build_cache_task(
}
}
},
LogOperation::Commit {
source_states,
decision_instant,
} => {
cache.commit(&source_states, op_and_pos.pos)?;
LogOperation::Commit { decision_instant } => {
cache.set_metadata(op_and_pos.pos)?;
cache.commit()?;
if let Ok(duration) = decision_instant.elapsed() {
histogram!(
DATA_LATENCY_HISTOGRAM_NAME,
@@ -221,7 +219,9 @@
}
}
LogOperation::SnapshottingDone { connection_name } => {
cache.set_metadata(op_and_pos.pos)?;
cache.set_connection_snapshotting_done(&connection_name)?;
cache.commit()?;
snapshotting = !cache.is_snapshotting_done()?;
}
}
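On the read side of the same file, build_cache (first hunk above) now resumes from the position stored through set_metadata rather than from a SourceStates-aware position. A minimal, hypothetical illustration of that resume computation follows; starting_position is a made-up helper and its argument mirrors what cache.get_metadata() returns in the hunk above.

// Made-up helper mirroring the resume logic in build_cache above.
fn starting_position(last_applied: Option<u64>) -> u64 {
    // Resume one past the last applied operation, or from the start of the log.
    last_applied.map(|pos| pos + 1).unwrap_or(0)
}

fn main() {
    assert_eq!(starting_position(None), 0); // fresh cache: read the log from position 0
    assert_eq!(starting_position(Some(41)), 42); // otherwise resume after the last applied op
}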
2 changes: 1 addition & 1 deletion dozer-api/src/test_utils.rs
@@ -121,7 +121,7 @@ pub fn initialize_cache(
for record in records {
cache.insert(&record.record).unwrap();
}
cache.commit(&Default::default(), 0).unwrap();
cache.commit().unwrap();
cache_manager.wait_until_indexing_catchup();

Box::new(cache_manager)
2 changes: 1 addition & 1 deletion dozer-cache/benches/cache.rs
@@ -18,7 +18,7 @@ fn insert(cache: &Mutex<Box<dyn RwCache>>, n: usize, commit_size: usize) {
cache.insert(&record).unwrap();

if n % commit_size == 0 {
cache.commit(&Default::default(), 0).unwrap();
cache.commit().unwrap();
}
}

4 changes: 2 additions & 2 deletions dozer-cache/src/cache/lmdb/cache/dump_restore.rs
@@ -41,7 +41,7 @@ pub fn begin_dump_txn<C: LmdbCache>(
) -> Result<DumpTransaction<RoTransaction>, CacheError> {
let main_env = cache.main_env();
let main_txn = main_env.begin_txn()?;
let main_env_metadata = main_env.log_positions_with_txn(&main_txn)?;
let main_env_metadata = main_env.metadata_with_txn(&main_txn)?;

let mut secondary_txns = vec![];
let mut secondary_metadata = vec![];
@@ -139,7 +139,7 @@ mod tests {
insert_rec_1(&mut cache, (0, Some("a".to_string()), None));
insert_rec_1(&mut cache, (1, None, Some(2)));
insert_rec_1(&mut cache, (2, Some("b".to_string()), Some(3)));
cache.commit(&Default::default(), 0).unwrap();
cache.commit().unwrap();
indexing_thread_pool.lock().wait_until_catchup();

let mut data = vec![];
@@ -36,7 +36,7 @@ fn ignore_insert_error_when_type_nothing() {
lifetime: None,
};
env.insert(&record).unwrap();
env.commit(&Default::default(), 0).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
@@ -67,7 +67,7 @@ fn update_after_insert_error_when_type_update() {
lifetime: None,
};
env.insert(&record).unwrap();
env.commit(&Default::default(), 0).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
@@ -85,7 +85,7 @@ fn update_after_insert_error_when_type_update() {
};

env.insert(&second_record).unwrap();
env.commit(&Default::default(), 0).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
@@ -113,7 +113,7 @@ fn return_insert_error_when_type_panic() {
lifetime: None,
};
env.insert(&record).unwrap();
env.commit(&Default::default(), 0).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
@@ -179,7 +179,7 @@ fn update_after_update_error_when_type_upsert() {
lifetime: None,
};
env.update(&initial_record, &update_record).unwrap();
env.commit(&Default::default(), 0).unwrap();
env.commit().unwrap();

let key = index::get_primary_key(&schema.primary_index, &initial_values);
let record = env.get(&key).unwrap();
36 changes: 9 additions & 27 deletions dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs
@@ -15,8 +15,7 @@ use crate::{

use super::{
MainEnvironment, MainEnvironmentCommon, OperationLog, RwMainEnvironment,
CONNECTION_SNAPSHOTTING_DONE_DB_NAME, LOG_POSITION_DB_NAME, SCHEMA_DB_NAME,
SOURCE_STATES_DB_NAME,
CONNECTION_SNAPSHOTTING_DONE_DB_NAME, METADATA_DB_NAME, SCHEMA_DB_NAME,
};

pub async fn dump<'txn, E: MainEnvironment, T: Transaction>(
@@ -33,15 +32,8 @@
.await?;
dozer_storage::dump(
txn,
SOURCE_STATES_DB_NAME,
env.common().source_states.database(),
context,
)
.await?;
dozer_storage::dump(
txn,
LOG_POSITION_DB_NAME,
env.common().log_position.database(),
METADATA_DB_NAME,
env.common().metadata.database(),
context,
)
.await?;
@@ -65,18 +57,15 @@

info!("Restoring schema");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring source states");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring log position");
info!("Restoring metadata");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring connection snapshotting done");
dozer_storage::restore(&mut env, reader).await?;
info!("Restoring operation log");
let operation_log = OperationLog::restore(&mut env, reader, labels).await?;

let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let source_states = LmdbOption::open(&env, Some(SOURCE_STATES_DB_NAME))?;
let log_position = LmdbOption::open(&env, Some(LOG_POSITION_DB_NAME))?;
let metadata = LmdbOption::open(&env, Some(METADATA_DB_NAME))?;
let connection_snapshotting_done =
LmdbMap::open(&env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;

@@ -91,8 +80,7 @@
base_path,
schema,
schema_option,
source_states,
log_position,
metadata,
connection_snapshotting_done,
operation_log,
intersection_chunk_size: options.intersection_chunk_size,
@@ -130,15 +118,9 @@ pub mod tests {
);
assert_database_equal(
&txn1,
env1.common().source_states.database(),
&txn2,
env2.common().source_states.database(),
);
assert_database_equal(
&txn1,
env1.common().log_position.database(),
env1.common().metadata.database(),
&txn2,
env2.common().log_position.database(),
env2.common().metadata.database(),
);
assert_database_equal(
&txn1,
@@ -169,7 +151,7 @@
env.insert(&record).unwrap();
env.insert(&record).unwrap();
env.delete(&record).unwrap();
env.commit(&Default::default(), 0).unwrap();
env.commit().unwrap();

let mut data = vec![];
{
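One detail behind the dump_restore.rs changes above: the dump stream is positional, so dropping the source_states database from dump has to be mirrored by dropping exactly one restore call on the other side, which is why the "Restoring source states" step disappears along with it. A toy sketch of that invariant follows; dump and restore here are made-up stand-ins, not dozer_storage's actual API.

// Toy sketch (not dozer_storage's API): databases are dumped in a fixed order
// and restored back positionally, so both sides must agree on the section list.
use std::collections::VecDeque;

fn dump() -> VecDeque<(&'static str, Vec<u8>)> {
    // Post-revert order: schema, metadata, connection_snapshotting_done, operation log.
    VecDeque::from(vec![
        ("schema", vec![1]),
        ("metadata", vec![2]),
        ("connection_snapshotting_done", vec![3]),
        ("operation_log", vec![4]),
    ])
}

fn restore(stream: &mut VecDeque<(&'static str, Vec<u8>)>, expected: &str) -> Vec<u8> {
    let (name, data) = stream.pop_front().expect("dump stream ended early");
    assert_eq!(name, expected, "dump and restore orders have drifted");
    data
}

fn main() {
    let mut stream = dump();
    for section in ["schema", "metadata", "connection_snapshotting_done", "operation_log"] {
        let _bytes = restore(&mut stream, section);
    }
    assert!(stream.is_empty());
}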
68 changes: 21 additions & 47 deletions dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
@@ -12,9 +12,7 @@ use dozer_storage::{
};
use dozer_tracing::Labels;
use dozer_types::{
borrow::{Borrow, IntoOwned},
node::SourceStates,
serde_json,
borrow::IntoOwned,
types::{Field, FieldType, Record, Schema, SchemaWithIndex},
};
use dozer_types::{
@@ -80,21 +78,8 @@ pub trait MainEnvironment: LmdbEnvironment {
.ok_or(CacheError::PrimaryKeyNotFound)
}

fn source_states(&self) -> Result<Option<SourceStates>, CacheError> {
let txn = self.begin_txn()?;
self.common()
.source_states
.load(&txn)?
.map(|source_states| {
serde_json::from_str(source_states.borrow())
.map_err(|e| CacheError::DeserializeSourceStates(e))
})
.transpose()
}

fn log_position(&self) -> Result<Option<u64>, CacheError> {
let txn = self.begin_txn()?;
self.log_positions_with_txn(&txn)
fn metadata(&self) -> Result<Option<u64>, CacheError> {
self.metadata_with_txn(&self.begin_txn()?)
}

fn is_snapshotting_done(&self) -> Result<bool, CacheError> {
@@ -107,18 +92,17 @@
Ok(true)
}

fn log_positions_with_txn<T: Transaction>(&self, txn: &T) -> Result<Option<u64>, CacheError> {
fn metadata_with_txn<T: Transaction>(&self, txn: &T) -> Result<Option<u64>, CacheError> {
self.common()
.log_position
.metadata
.load(txn)
.map(|data| data.map(IntoOwned::into_owned))
.map_err(Into::into)
}
}

const SCHEMA_DB_NAME: &str = "schema";
const SOURCE_STATES_DB_NAME: &str = "source_states";
const LOG_POSITION_DB_NAME: &str = "log_position";
const METADATA_DB_NAME: &str = "metadata";
const CONNECTION_SNAPSHOTTING_DONE_DB_NAME: &str = "connection_snapshotting_done";

#[derive(Debug, Clone)]
@@ -129,10 +113,8 @@ pub struct MainEnvironmentCommon {
schema: SchemaWithIndex,
/// The schema database, used for dumping.
schema_option: LmdbOption<SchemaWithIndex>,
/// The serialized source states.
source_states: LmdbOption<String>,
/// The log position.
log_position: LmdbOption<u64>,
/// The metadata.
metadata: LmdbOption<u64>,
/// The source status.
connection_snapshotting_done: LmdbMap<String, bool>,
/// The operation log.
@@ -171,8 +153,7 @@ impl RwMainEnvironment {

let operation_log = OperationLog::create(&mut env, labels.clone())?;
let schema_option = LmdbOption::create(&mut env, Some(SCHEMA_DB_NAME))?;
let source_states = LmdbOption::create(&mut env, Some(SOURCE_STATES_DB_NAME))?;
let log_position = LmdbOption::create(&mut env, Some(LOG_POSITION_DB_NAME))?;
let metadata = LmdbOption::create(&mut env, Some(METADATA_DB_NAME))?;
let connection_snapshotting_done =
LmdbMap::create(&mut env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;

@@ -232,8 +213,7 @@ impl RwMainEnvironment {
base_path,
schema,
schema_option,
log_position,
source_states,
metadata,
connection_snapshotting_done,
operation_log,
intersection_chunk_size: options.intersection_chunk_size,
@@ -409,6 +389,14 @@ impl RwMainEnvironment {
}
}

pub fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError> {
let txn = self.env.txn_mut()?;
self.common
.metadata
.store(txn, &metadata)
.map_err(Into::into)
}

pub fn set_connection_snapshotting_done(
&mut self,
connection_name: &str,
@@ -420,19 +408,7 @@
.map_err(Into::into)
}

pub fn commit(
&mut self,
source_states: &SourceStates,
log_position: u64,
) -> Result<(), CacheError> {
let txn = self.env.txn_mut()?;
self.common.source_states.store(
txn,
serde_json::to_string(source_states)
.expect("`SourceStates` must be serializable to JSON")
.as_str(),
)?;
self.common.log_position.store(txn, &log_position)?;
pub fn commit(&mut self) -> Result<(), CacheError> {
self.env.commit().map_err(Into::into)
}
}
@@ -609,8 +585,7 @@ impl RoMainEnvironment {

let operation_log = OperationLog::open(&env, labels.clone())?;
let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
let source_states = LmdbOption::open(&env, Some(SOURCE_STATES_DB_NAME))?;
let log_position = LmdbOption::open(&env, Some(LOG_POSITION_DB_NAME))?;
let metadata = LmdbOption::open(&env, Some(METADATA_DB_NAME))?;
let connection_snapshotting_done =
LmdbMap::open(&env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;

@@ -625,8 +600,7 @@
base_path: base_path.to_path_buf(),
schema,
schema_option,
source_states,
log_position,
metadata,
connection_snapshotting_done,
operation_log,
intersection_chunk_size: options.intersection_chunk_size,
21 changes: 8 additions & 13 deletions dozer-cache/src/cache/lmdb/cache/mod.rs
@@ -1,5 +1,4 @@
use dozer_tracing::Labels;
use dozer_types::node::SourceStates;
use dozer_types::parking_lot::Mutex;
use std::collections::HashSet;
use std::path::PathBuf;
@@ -146,12 +145,8 @@ impl<C: LmdbCache> RoCache for C {
self.main_env().schema()
}

fn get_source_states(&self) -> Result<Option<SourceStates>, CacheError> {
self.main_env().source_states()
}

fn get_log_position(&self) -> Result<Option<u64>, CacheError> {
self.main_env().log_position()
fn get_metadata(&self) -> Result<Option<u64>, CacheError> {
self.main_env().metadata()
}

fn is_snapshotting_done(&self) -> Result<bool, CacheError> {
@@ -174,6 +169,10 @@ impl RwCache for LmdbRwCache {
self.main_env.update(old, new)
}

fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError> {
self.main_env.set_metadata(metadata)
}

fn set_connection_snapshotting_done(
&mut self,
connection_name: &str,
@@ -182,12 +181,8 @@
.set_connection_snapshotting_done(connection_name)
}

fn commit(
&mut self,
source_states: &SourceStates,
log_position: u64,
) -> Result<(), CacheError> {
self.main_env.commit(source_states, log_position)?;
fn commit(&mut self) -> Result<(), CacheError> {
self.main_env.commit()?;
self.indexing_thread_pool.lock().wake(self.labels());
Ok(())
}