diff --git a/dozer-api/src/cache_builder/mod.rs b/dozer-api/src/cache_builder/mod.rs
index d0fe2fbd28..095d4b0e65 100644
--- a/dozer-api/src/cache_builder/mod.rs
+++ b/dozer-api/src/cache_builder/mod.rs
@@ -35,7 +35,7 @@ pub async fn build_cache(
     labels: LabelsAndProgress,
 ) -> Result<(), CacheError> {
     // Create log reader.
-    let starting_pos = cache.get_log_position()?.map(|pos| pos + 1).unwrap_or(0);
+    let starting_pos = cache.get_metadata()?.map(|pos| pos + 1).unwrap_or(0);
     debug!(
         "Starting log reader {} from position {starting_pos}",
         log_reader_builder.options.endpoint
@@ -207,11 +207,9 @@ fn build_cache_task(
                 }
             }
         },
-        LogOperation::Commit {
-            source_states,
-            decision_instant,
-        } => {
-            cache.commit(&source_states, op_and_pos.pos)?;
+        LogOperation::Commit { decision_instant } => {
+            cache.set_metadata(op_and_pos.pos)?;
+            cache.commit()?;
             if let Ok(duration) = decision_instant.elapsed() {
                 histogram!(
                     DATA_LATENCY_HISTOGRAM_NAME,
@@ -221,7 +219,9 @@
             }
         }
         LogOperation::SnapshottingDone { connection_name } => {
+            cache.set_metadata(op_and_pos.pos)?;
             cache.set_connection_snapshotting_done(&connection_name)?;
+            cache.commit()?;
             snapshotting = !cache.is_snapshotting_done()?;
         }
     }
diff --git a/dozer-api/src/test_utils.rs b/dozer-api/src/test_utils.rs
index 260294f522..404d5ee5e6 100644
--- a/dozer-api/src/test_utils.rs
+++ b/dozer-api/src/test_utils.rs
@@ -121,7 +121,7 @@ pub fn initialize_cache(
     for record in records {
         cache.insert(&record.record).unwrap();
     }
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     cache_manager.wait_until_indexing_catchup();

     Box::new(cache_manager)
diff --git a/dozer-cache/benches/cache.rs b/dozer-cache/benches/cache.rs
index 6db28ce3c3..d0e8c3c87c 100644
--- a/dozer-cache/benches/cache.rs
+++ b/dozer-cache/benches/cache.rs
@@ -18,7 +18,7 @@ fn insert(cache: &Mutex<LmdbRwCache>, n: usize, commit_size: usize) {
     cache.insert(&record).unwrap();

     if n % commit_size == 0 {
-        cache.commit(&Default::default(), 0).unwrap();
+        cache.commit().unwrap();
     }
 }
diff --git a/dozer-cache/src/cache/lmdb/cache/dump_restore.rs b/dozer-cache/src/cache/lmdb/cache/dump_restore.rs
index ad9cb11b84..9ef50ff963 100644
--- a/dozer-cache/src/cache/lmdb/cache/dump_restore.rs
+++ b/dozer-cache/src/cache/lmdb/cache/dump_restore.rs
@@ -41,7 +41,7 @@ pub fn begin_dump_txn(
 ) -> Result<DumpTransaction, CacheError> {
     let main_env = cache.main_env();
     let main_txn = main_env.begin_txn()?;
-    let main_env_metadata = main_env.log_positions_with_txn(&main_txn)?;
+    let main_env_metadata = main_env.metadata_with_txn(&main_txn)?;

     let mut secondary_txns = vec![];
     let mut secondary_metadata = vec![];
@@ -139,7 +139,7 @@ mod tests {
         insert_rec_1(&mut cache, (0, Some("a".to_string()), None));
         insert_rec_1(&mut cache, (1, None, Some(2)));
         insert_rec_1(&mut cache, (2, Some("b".to_string()), Some(3)));
-        cache.commit(&Default::default(), 0).unwrap();
+        cache.commit().unwrap();
         indexing_thread_pool.lock().wait_until_catchup();

         let mut data = vec![];
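With `source_states` gone from the cache, the consumer in `build_cache_task` now persists the log position itself: it stages the position with `set_metadata` and then commits it in the same transaction as the buffered records, both on `Commit` and on `SnapshottingDone`. A minimal sketch of that protocol, using a hypothetical `Cache` trait and `LogOp` enum rather than the real dozer types:

```rust
/// Hypothetical stand-ins for `RwCache` and `LogOperation`; only the shape of
/// the calls mirrors the patch.
trait Cache {
    fn insert(&mut self, record: &str);
    fn set_metadata(&mut self, pos: u64);
    fn set_connection_snapshotting_done(&mut self, connection_name: &str);
    fn commit(&mut self);
    fn get_metadata(&self) -> Option<u64>;
}

enum LogOp {
    Op { record: String },
    Commit,
    SnapshottingDone { connection_name: String },
}

fn consume(cache: &mut impl Cache, log: &[LogOp]) {
    // Resume one past the last position that was committed together with its
    // data, exactly like `starting_pos` in `build_cache`.
    let starting_pos = cache.get_metadata().map(|pos| pos + 1).unwrap_or(0);
    for (pos, op) in log.iter().enumerate().skip(starting_pos as usize) {
        match op {
            LogOp::Op { record } => cache.insert(record),
            LogOp::Commit => {
                // Stage the position, then make it durable atomically with
                // the writes accumulated since the last commit.
                cache.set_metadata(pos as u64);
                cache.commit();
            }
            LogOp::SnapshottingDone { connection_name } => {
                // Snapshotting progress is persisted the same way, so a crash
                // can't leave the flag and the position out of sync.
                cache.set_metadata(pos as u64);
                cache.set_connection_snapshotting_done(connection_name);
                cache.commit();
            }
        }
    }
}
```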
diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs
index dde7048e13..335e66bed2 100644
--- a/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs
+++ b/dozer-cache/src/cache/lmdb/cache/main_environment/conflict_resolution_tests.rs
@@ -36,7 +36,7 @@ fn ignore_insert_error_when_type_nothing() {
         lifetime: None,
     };
     env.insert(&record).unwrap();
-    env.commit(&Default::default(), 0).unwrap();
+    env.commit().unwrap();

     let key = index::get_primary_key(&schema.primary_index, &initial_values);
     let record = env.get(&key).unwrap();
@@ -67,7 +67,7 @@ fn update_after_insert_error_when_type_update() {
         lifetime: None,
     };
     env.insert(&record).unwrap();
-    env.commit(&Default::default(), 0).unwrap();
+    env.commit().unwrap();

     let key = index::get_primary_key(&schema.primary_index, &initial_values);
     let record = env.get(&key).unwrap();
@@ -85,7 +85,7 @@ fn update_after_insert_error_when_type_update() {
     };
     env.insert(&second_record).unwrap();
-    env.commit(&Default::default(), 0).unwrap();
+    env.commit().unwrap();

     let key = index::get_primary_key(&schema.primary_index, &initial_values);
     let record = env.get(&key).unwrap();
@@ -113,7 +113,7 @@ fn return_insert_error_when_type_panic() {
         lifetime: None,
     };
     env.insert(&record).unwrap();
-    env.commit(&Default::default(), 0).unwrap();
+    env.commit().unwrap();

     let key = index::get_primary_key(&schema.primary_index, &initial_values);
     let record = env.get(&key).unwrap();
@@ -179,7 +179,7 @@ fn update_after_update_error_when_type_upsert() {
         lifetime: None,
     };
     env.update(&initial_record, &update_record).unwrap();
-    env.commit(&Default::default(), 0).unwrap();
+    env.commit().unwrap();

     let key = index::get_primary_key(&schema.primary_index, &initial_values);
     let record = env.get(&key).unwrap();
diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs
index 787a900438..8e6b856401 100644
--- a/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs
+++ b/dozer-cache/src/cache/lmdb/cache/main_environment/dump_restore.rs
@@ -15,8 +15,7 @@ use crate::{

 use super::{
     MainEnvironment, MainEnvironmentCommon, OperationLog, RwMainEnvironment,
-    CONNECTION_SNAPSHOTTING_DONE_DB_NAME, LOG_POSITION_DB_NAME, SCHEMA_DB_NAME,
-    SOURCE_STATES_DB_NAME,
+    CONNECTION_SNAPSHOTTING_DONE_DB_NAME, METADATA_DB_NAME, SCHEMA_DB_NAME,
 };

 pub async fn dump<'txn, E: MainEnvironment, T: Transaction>(
@@ -33,15 +32,8 @@ pub async fn dump<'txn, E: MainEnvironment, T: Transaction>(
     .await?;
     dozer_storage::dump(
         txn,
-        SOURCE_STATES_DB_NAME,
-        env.common().source_states.database(),
-        context,
-    )
-    .await?;
-    dozer_storage::dump(
-        txn,
-        LOG_POSITION_DB_NAME,
-        env.common().log_position.database(),
+        METADATA_DB_NAME,
+        env.common().metadata.database(),
         context,
     )
     .await?;
@@ -65,9 +57,7 @@ pub async fn restore(
     info!("Restoring schema");
     dozer_storage::restore(&mut env, reader).await?;
-    info!("Restoring source states");
-    dozer_storage::restore(&mut env, reader).await?;
-    info!("Restoring log position");
+    info!("Restoring metadata");
     dozer_storage::restore(&mut env, reader).await?;
     info!("Restoring connection snapshotting done");
     dozer_storage::restore(&mut env, reader).await?;
@@ -75,8 +65,7 @@ pub async fn restore(
     let operation_log = OperationLog::restore(&mut env, reader, labels).await?;

     let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
-    let source_states = LmdbOption::open(&env, Some(SOURCE_STATES_DB_NAME))?;
-    let log_position = LmdbOption::open(&env, Some(LOG_POSITION_DB_NAME))?;
+    let metadata = LmdbOption::open(&env, Some(METADATA_DB_NAME))?;
     let connection_snapshotting_done =
         LmdbMap::open(&env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;
@@ -91,8 +80,7 @@ pub async fn restore(
             base_path,
             schema,
             schema_option,
-            source_states,
-            log_position,
+            metadata,
             connection_snapshotting_done,
             operation_log,
             intersection_chunk_size: options.intersection_chunk_size,
@@ -130,15 +118,9 @@ pub mod tests {
         );
         assert_database_equal(
             &txn1,
-            env1.common().source_states.database(),
-            &txn2,
-            env2.common().source_states.database(),
-        );
-        assert_database_equal(
-            &txn1,
-            env1.common().log_position.database(),
+            env1.common().metadata.database(),
             &txn2,
-            env2.common().log_position.database(),
+            env2.common().metadata.database(),
         );
         assert_database_equal(
             &txn1,
@@ -169,7 +151,7 @@ pub mod tests {
         env.insert(&record).unwrap();
         env.insert(&record).unwrap();
         env.delete(&record).unwrap();
-        env.commit(&Default::default(), 0).unwrap();
+        env.commit().unwrap();

         let mut data = vec![];
         {
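The dump format is purely positional: `restore` has to issue one read per dumped section, in exactly the order `dump` wrote them, which is why collapsing `source_states` and `log_position` into `metadata` removes one `dozer_storage::dump` call and one `dozer_storage::restore` call together. A toy length-prefixed stream (not dozer_storage's actual wire format) makes the invariant visible:

```rust
// Order-sensitivity sketch. This is NOT dozer_storage's format; it only shows
// why every `dump` call must be matched by a `restore` in the same position.
use std::io::{self, Read, Write};

fn dump_section(w: &mut impl Write, payload: &[u8]) -> io::Result<()> {
    w.write_all(&(payload.len() as u64).to_le_bytes())?;
    w.write_all(payload)
}

fn restore_section(r: &mut impl Read) -> io::Result<Vec<u8>> {
    let mut len = [0u8; 8];
    r.read_exact(&mut len)?;
    let mut payload = vec![0u8; u64::from_le_bytes(len) as usize];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    // Writer side, mirroring the new dump order.
    dump_section(&mut buf, b"schema")?;
    dump_section(&mut buf, b"metadata")?;
    dump_section(&mut buf, b"connection_snapshotting_done")?;

    // Reader side: one restore per dumped section, in the same order. An
    // extra or missing restore here would misalign every section after it.
    let mut r = buf.as_slice();
    assert_eq!(restore_section(&mut r)?, b"schema");
    assert_eq!(restore_section(&mut r)?, b"metadata");
    assert_eq!(restore_section(&mut r)?, b"connection_snapshotting_done");
    Ok(())
}
```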
diff --git a/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs b/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
index 638f0c49fb..aa44051893 100644
--- a/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
+++ b/dozer-cache/src/cache/lmdb/cache/main_environment/mod.rs
@@ -12,9 +12,7 @@ use dozer_storage::{
 };
 use dozer_tracing::Labels;
 use dozer_types::{
-    borrow::{Borrow, IntoOwned},
-    node::SourceStates,
-    serde_json,
+    borrow::IntoOwned,
     types::{Field, FieldType, Record, Schema, SchemaWithIndex},
 };
 use dozer_types::{
@@ -80,21 +78,8 @@ pub trait MainEnvironment: LmdbEnvironment {
             .ok_or(CacheError::PrimaryKeyNotFound)
     }

-    fn source_states(&self) -> Result<Option<SourceStates>, CacheError> {
-        let txn = self.begin_txn()?;
-        self.common()
-            .source_states
-            .load(&txn)?
-            .map(|source_states| {
-                serde_json::from_str(source_states.borrow())
-                    .map_err(|e| CacheError::DeserializeSourceStates(e))
-            })
-            .transpose()
-    }
-
-    fn log_position(&self) -> Result<Option<u64>, CacheError> {
-        let txn = self.begin_txn()?;
-        self.log_positions_with_txn(&txn)
+    fn metadata(&self) -> Result<Option<u64>, CacheError> {
+        self.metadata_with_txn(&self.begin_txn()?)
     }

     fn is_snapshotting_done(&self) -> Result<bool, CacheError> {
@@ -107,9 +92,9 @@ pub trait MainEnvironment: LmdbEnvironment {
         Ok(true)
     }

-    fn log_positions_with_txn<T: Transaction>(&self, txn: &T) -> Result<Option<u64>, CacheError> {
+    fn metadata_with_txn<T: Transaction>(&self, txn: &T) -> Result<Option<u64>, CacheError> {
         self.common()
-            .log_position
+            .metadata
             .load(txn)
             .map(|data| data.map(IntoOwned::into_owned))
             .map_err(Into::into)
@@ -117,8 +102,7 @@ pub trait MainEnvironment: LmdbEnvironment {
 }

 const SCHEMA_DB_NAME: &str = "schema";
-const SOURCE_STATES_DB_NAME: &str = "source_states";
-const LOG_POSITION_DB_NAME: &str = "log_position";
+const METADATA_DB_NAME: &str = "metadata";
 const CONNECTION_SNAPSHOTTING_DONE_DB_NAME: &str = "connection_snapshotting_done";

 #[derive(Debug, Clone)]
@@ -129,10 +113,8 @@ pub struct MainEnvironmentCommon {
     schema: SchemaWithIndex,
     /// The schema database, used for dumping.
     schema_option: LmdbOption<SchemaWithIndex>,
-    /// The serialized source states.
-    source_states: LmdbOption<String>,
-    /// The log position.
-    log_position: LmdbOption<u64>,
+    /// The metadata.
+    metadata: LmdbOption<u64>,
     /// The source status.
     connection_snapshotting_done: LmdbMap<String, bool>,
     /// The operation log.
@@ -171,8 +153,7 @@ impl RwMainEnvironment {
         let operation_log = OperationLog::create(&mut env, labels.clone())?;

         let schema_option = LmdbOption::create(&mut env, Some(SCHEMA_DB_NAME))?;
-        let source_states = LmdbOption::create(&mut env, Some(SOURCE_STATES_DB_NAME))?;
-        let log_position = LmdbOption::create(&mut env, Some(LOG_POSITION_DB_NAME))?;
+        let metadata = LmdbOption::create(&mut env, Some(METADATA_DB_NAME))?;
         let connection_snapshotting_done =
             LmdbMap::create(&mut env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;
@@ -232,8 +213,7 @@ impl RwMainEnvironment {
                 base_path,
                 schema,
                 schema_option,
-                log_position,
-                source_states,
+                metadata,
                 connection_snapshotting_done,
                 operation_log,
                 intersection_chunk_size: options.intersection_chunk_size,
@@ -409,6 +389,14 @@ impl RwMainEnvironment {
         }
     }

+    pub fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError> {
+        let txn = self.env.txn_mut()?;
+        self.common
+            .metadata
+            .store(txn, &metadata)
+            .map_err(Into::into)
+    }
+
     pub fn set_connection_snapshotting_done(
         &mut self,
         connection_name: &str,
@@ -420,19 +408,7 @@ impl RwMainEnvironment {
             .map_err(Into::into)
     }

-    pub fn commit(
-        &mut self,
-        source_states: &SourceStates,
-        log_position: u64,
-    ) -> Result<(), CacheError> {
-        let txn = self.env.txn_mut()?;
-        self.common.source_states.store(
-            txn,
-            serde_json::to_string(source_states)
-                .expect("`SourceStates` must be serializable to JSON")
-                .as_str(),
-        )?;
-        self.common.log_position.store(txn, &log_position)?;
+    pub fn commit(&mut self) -> Result<(), CacheError> {
         self.env.commit().map_err(Into::into)
     }
 }
@@ -609,8 +585,7 @@ impl RoMainEnvironment {
         let operation_log = OperationLog::open(&env, labels.clone())?;

         let schema_option = LmdbOption::open(&env, Some(SCHEMA_DB_NAME))?;
-        let source_states = LmdbOption::open(&env, Some(SOURCE_STATES_DB_NAME))?;
-        let log_position = LmdbOption::open(&env, Some(LOG_POSITION_DB_NAME))?;
+        let metadata = LmdbOption::open(&env, Some(METADATA_DB_NAME))?;
         let connection_snapshotting_done =
             LmdbMap::open(&env, Some(CONNECTION_SNAPSHOTTING_DONE_DB_NAME))?;
@@ -625,8 +600,7 @@ impl RoMainEnvironment {
             base_path: base_path.to_path_buf(),
             schema,
             schema_option,
-            source_states,
-            log_position,
+            metadata,
             connection_snapshotting_done,
             operation_log,
             intersection_chunk_size: options.intersection_chunk_size,
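`set_metadata` writes through `txn_mut`, so it lands in whatever write transaction is already open and only becomes durable when `commit` closes that transaction. That is the point of splitting the old bundled `commit(source_states, log_position)`: callers can now stage the position at different moments (on `Commit`, or alongside the snapshotting flag) while still getting a single atomic commit. A self-contained model of the pattern, with `Txn`/`Env` as hypothetical stand-ins for the LMDB wrappers:

```rust
// Staged-write sketch: `set_metadata` joins the open transaction, `commit`
// makes everything in it durable at once. Not the real dozer_storage types.
#[derive(Default)]
struct Txn {
    metadata: Option<u64>,
    ops: Vec<String>,
}

#[derive(Default)]
struct Env {
    txn: Option<Txn>,
    durable_metadata: Option<u64>,
    durable_ops: Vec<String>,
}

impl Env {
    // Like `self.env.txn_mut()?`: opens a write txn if none is active.
    fn txn_mut(&mut self) -> &mut Txn {
        self.txn.get_or_insert_with(Txn::default)
    }
    fn insert(&mut self, op: &str) {
        self.txn_mut().ops.push(op.to_string());
    }
    fn set_metadata(&mut self, metadata: u64) {
        self.txn_mut().metadata = Some(metadata);
    }
    fn commit(&mut self) {
        if let Some(txn) = self.txn.take() {
            self.durable_ops.extend(txn.ops);
            if let Some(m) = txn.metadata {
                self.durable_metadata = Some(m);
            }
        }
    }
}

fn main() {
    let mut env = Env::default();
    env.insert("record");
    env.set_metadata(7);
    // Nothing is durable until commit; then record and position land together.
    assert_eq!(env.durable_metadata, None);
    env.commit();
    assert_eq!(env.durable_metadata, Some(7));
    assert_eq!(env.durable_ops, vec!["record".to_string()]);
}
```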
diff --git a/dozer-cache/src/cache/lmdb/cache/mod.rs b/dozer-cache/src/cache/lmdb/cache/mod.rs
index 1fd904441c..84ac7bd41f 100644
--- a/dozer-cache/src/cache/lmdb/cache/mod.rs
+++ b/dozer-cache/src/cache/lmdb/cache/mod.rs
@@ -1,5 +1,4 @@
 use dozer_tracing::Labels;
-use dozer_types::node::SourceStates;
 use dozer_types::parking_lot::Mutex;
 use std::collections::HashSet;
 use std::path::PathBuf;
@@ -146,12 +145,8 @@ impl<C: LmdbCache> RoCache for C {
         self.main_env().schema()
     }

-    fn get_source_states(&self) -> Result<Option<SourceStates>, CacheError> {
-        self.main_env().source_states()
-    }
-
-    fn get_log_position(&self) -> Result<Option<u64>, CacheError> {
-        self.main_env().log_position()
+    fn get_metadata(&self) -> Result<Option<u64>, CacheError> {
+        self.main_env().metadata()
     }

     fn is_snapshotting_done(&self) -> Result<bool, CacheError> {
@@ -174,6 +169,10 @@ impl RwCache for LmdbRwCache {
         self.main_env.update(old, new)
     }

+    fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError> {
+        self.main_env.set_metadata(metadata)
+    }
+
     fn set_connection_snapshotting_done(
         &mut self,
         connection_name: &str,
@@ -182,12 +181,8 @@ impl RwCache for LmdbRwCache {
             .set_connection_snapshotting_done(connection_name)
     }

-    fn commit(
-        &mut self,
-        source_states: &SourceStates,
-        log_position: u64,
-    ) -> Result<(), CacheError> {
-        self.main_env.commit(source_states, log_position)?;
+    fn commit(&mut self) -> Result<(), CacheError> {
+        self.main_env.commit()?;
         self.indexing_thread_pool.lock().wake(self.labels());
         Ok(())
     }
diff --git a/dozer-cache/src/cache/lmdb/cache/query/tests.rs b/dozer-cache/src/cache/lmdb/cache/query/tests.rs
index ad7997af09..4594aadeb4 100644
--- a/dozer-cache/src/cache/lmdb/cache/query/tests.rs
+++ b/dozer-cache/src/cache/lmdb/cache/query/tests.rs
@@ -20,7 +20,7 @@ fn query_secondary_sorted_inverted() {
     ]);

     cache.insert(&record).unwrap();
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     let filter = FilterExpression::And(vec![
@@ -51,7 +51,7 @@ fn query_secondary_full_text() {
     ]);

     cache.insert(&record).unwrap();
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     let filter = FilterExpression::Simple("foo".into(), Operator::Contains, "good".into());
@@ -89,7 +89,7 @@ fn query_secondary_vars() {
     for val in items {
         insert_rec_1(&mut cache, val);
     }
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     test_query(json!({}), 8, &cache);
@@ -198,7 +198,7 @@ fn query_secondary_multi_indices() {
         };
         cache.insert(&record).unwrap();
     }
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     let query = query_from_filter(FilterExpression::And(vec![
diff --git a/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs b/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs
index 1b392ead6d..58b4a3642c 100644
--- a/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs
+++ b/dozer-cache/src/cache/lmdb/cache/secondary_environment/dump_restore.rs
@@ -155,7 +155,7 @@ pub mod tests {
         main_env.insert(&record_a).unwrap();
         main_env.insert(&record_b).unwrap();
         main_env.delete(&record_a).unwrap();
-        main_env.commit(&Default::default(), 0).unwrap();
+        main_env.commit().unwrap();

         let mut env = RwSecondaryEnvironment::new(
             &IndexDefinition::SortedInverted(vec![0]),
diff --git a/dozer-cache/src/cache/lmdb/cache/secondary_environment/indexer.rs b/dozer-cache/src/cache/lmdb/cache/secondary_environment/indexer.rs
index 40484e1283..fb3a1e8ac6 100644
--- a/dozer-cache/src/cache/lmdb/cache/secondary_environment/indexer.rs
+++ b/dozer-cache/src/cache/lmdb/cache/secondary_environment/indexer.rs
@@ -116,7 +116,7 @@ mod tests {
     for val in items.clone() {
         lmdb_utils::insert_rec_1(&mut cache, val);
     }
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     // No of index dbs
@@ -137,7 +137,7 @@ mod tests {
         };
         cache.delete(&record).unwrap();
     }
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     assert_eq!(
@@ -187,7 +187,7 @@ mod tests {
         cache.delete(&record).unwrap();
     }

-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     assert_eq!(
diff --git a/dozer-cache/src/cache/lmdb/tests/basic.rs b/dozer-cache/src/cache/lmdb/tests/basic.rs
index 16131bd3be..f36d1d1888 100644
--- a/dozer-cache/src/cache/lmdb/tests/basic.rs
+++ b/dozer-cache/src/cache/lmdb/tests/basic.rs
@@ -50,7 +50,7 @@ fn insert_get_and_delete_record() {
     let UpsertResult::Inserted { meta } = cache.insert(&record).unwrap() else {
         panic!("Must be inserted")
     };
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     assert_eq!(cache.count(&QueryExpression::with_no_limit()).unwrap(), 1);
@@ -70,7 +70,7 @@ fn insert_get_and_delete_record() {
         .unwrap(),
         meta
     );
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     assert_eq!(cache.count(&QueryExpression::with_no_limit()).unwrap(), 0);
@@ -106,7 +106,7 @@ fn insert_and_query_record_impl(
     let record = Record::new(vec![Field::String(val)]);

     cache.insert(&record).unwrap();
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     indexing_thread_pool.lock().wait_until_catchup();

     // Query with an expression
@@ -152,7 +152,7 @@ fn update_record_when_primary_changes() {
     };

     cache.insert(&initial_record).unwrap();
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();

     let key = index::get_primary_key(&schema.primary_index, &initial_values);
     let record = cache.get(&key).unwrap().record;
@@ -160,7 +160,7 @@ fn update_record_when_primary_changes() {
     assert_eq!(initial_values, record.values);

     cache.update(&initial_record, &updated_record).unwrap();
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();

     // Primary key with old values
     let key = index::get_primary_key(&schema.primary_index, &initial_values);
@@ -177,20 +177,10 @@ fn update_record_when_primary_changes() {
 }

 #[test]
-fn test_cache_source_states() {
+fn test_cache_metadata() {
     let (mut cache, _, _) = _setup();
-    assert!(cache.get_source_states().unwrap().is_none());
-    cache.commit(&Default::default(), 0).unwrap();
-    assert_eq!(
-        cache.get_source_states().unwrap().unwrap(),
-        Default::default()
-    );
-}
-
-#[test]
-fn test_cache_log_position() {
-    let (mut cache, _, _) = _setup();
-    assert!(cache.get_log_position().unwrap().is_none());
-    cache.commit(&Default::default(), 32).unwrap();
-    assert_eq!(cache.get_log_position().unwrap().unwrap(), 32);
+    assert!(cache.get_metadata().unwrap().is_none());
+    cache.set_metadata(32).unwrap();
+    cache.commit().unwrap();
+    assert_eq!(cache.get_metadata().unwrap().unwrap(), 32);
 }
diff --git a/dozer-cache/src/cache/lmdb/tests/read_write.rs b/dozer-cache/src/cache/lmdb/tests/read_write.rs
index 964159cee3..2d321cb4ef 100644
--- a/dozer-cache/src/cache/lmdb/tests/read_write.rs
+++ b/dozer-cache/src/cache/lmdb/tests/read_write.rs
@@ -43,7 +43,7 @@ fn read_and_write() {
     for val in items.clone() {
         lmdb_utils::insert_rec_1(&mut cache_writer, val.clone());
     }
-    cache_writer.commit(&Default::default(), 0).unwrap();
+    cache_writer.commit().unwrap();

     indexing_thread_pool.lock().wait_until_catchup();

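Note the behavioral change the merged test pins down: under the old API a bare `commit` always wrote default source states and the given position, so the old tests could assert right after committing; now a bare `commit` writes no metadata at all, and a position only appears after an explicit `set_metadata` followed by `commit`. The shape of the new test, written against the hypothetical `Cache` trait from the first sketch:

```rust
// Metadata is absent until explicitly staged and committed; `Cache` is the
// hypothetical trait introduced in the earlier sketch, not the real RwCache.
fn check_metadata_round_trip(cache: &mut impl Cache) {
    assert_eq!(cache.get_metadata(), None);
    cache.set_metadata(32);
    cache.commit();
    assert_eq!(cache.get_metadata(), Some(32));
}
```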
diff --git a/dozer-cache/src/cache/mod.rs b/dozer-cache/src/cache/mod.rs
index a7343f9ecf..512e81556b 100644
--- a/dozer-cache/src/cache/mod.rs
+++ b/dozer-cache/src/cache/mod.rs
@@ -8,7 +8,6 @@ use dozer_tracing::Labels;
 use dozer_types::models::api_endpoint::{
     OnDeleteResolutionTypes, OnInsertResolutionTypes, OnUpdateResolutionTypes,
 };
-use dozer_types::node::SourceStates;
 use dozer_types::{
     serde::{Deserialize, Serialize},
     types::{IndexDefinition, Record, Schema, SchemaWithIndex},
@@ -93,8 +92,7 @@ pub trait RoCache: Send + Sync + Debug {
     fn query(&self, query: &QueryExpression) -> Result<Vec<CacheRecord>, CacheError>;

     // Cache metadata
-    fn get_source_states(&self) -> Result<Option<SourceStates>, CacheError>;
-    fn get_log_position(&self) -> Result<Option<u64>, CacheError>;
+    fn get_metadata(&self) -> Result<Option<u64>, CacheError>;
     fn is_snapshotting_done(&self) -> Result<bool, CacheError>;
 }
@@ -143,11 +141,11 @@ pub trait RwCache: RoCache {
     /// If the schema has primary index, only fields that are part of the primary index are used to identify the old record.
     fn update(&mut self, old: &Record, record: &Record) -> Result<UpsertResult, CacheError>;

-    /// Marks a connection as snapshotting done. Implicitly starts a transaction if there's no active transaction.
+    /// Sets the metadata of the cache. Implicitly starts a transaction if there's no active transaction.
+    fn set_metadata(&mut self, metadata: u64) -> Result<(), CacheError>;
     fn set_connection_snapshotting_done(&mut self, connection_name: &str) -> Result<(), CacheError>;

     /// Commits the current transaction.
-    fn commit(&mut self, source_states: &SourceStates, log_position: u64)
-        -> Result<(), CacheError>;
+    fn commit(&mut self) -> Result<(), CacheError>;
 }
diff --git a/dozer-cache/src/errors.rs b/dozer-cache/src/errors.rs
index 36e96fa33a..7e80d19b85 100644
--- a/dozer-cache/src/errors.rs
+++ b/dozer-cache/src/errors.rs
@@ -3,8 +3,8 @@ use std::path::PathBuf;

 use dozer_storage::errors::StorageError;
 use dozer_storage::RestoreError;
+use dozer_types::thiserror;
 use dozer_types::thiserror::Error;
-use dozer_types::{serde_json, thiserror};

 use dozer_log::errors::ReaderError;
 use dozer_types::errors::types::{DeserializationError, SerializationError, TypeError};
@@ -69,8 +69,6 @@ pub enum CacheError {
         meta: RecordMeta,
         insert_operation_id: u64,
     },
-    #[error("Failed to deserialize source states: {0}")]
-    DeserializeSourceStates(#[from] serde_json::Error),
     #[error("Internal thread panic: {0}")]
     InternalThreadPanic(#[source] tokio::task::JoinError),
 }
diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs
index a2cee3664d..06fafc9169 100644
--- a/dozer-cli/src/pipeline/log_sink.rs
+++ b/dozer-cli/src/pipeline/log_sink.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, fmt::Debug, ops::Deref, sync::Arc};
+use std::{collections::HashMap, fmt::Debug, sync::Arc};

 use dozer_cache::dozer_log::{
     replication::{Log, LogOperation},
@@ -104,7 +104,6 @@ impl Sink for LogSink {
             .runtime
             .block_on(self.log.lock())
             .write(LogOperation::Commit {
-                source_states: epoch_details.common_info.source_states.deref().clone(),
                 decision_instant: epoch_details.decision_instant,
             });
         self.pb.set_position(end as u64);
diff --git a/dozer-log/src/errors.rs b/dozer-log/src/errors.rs
index 159b8a935b..0070659da7 100644
--- a/dozer-log/src/errors.rs
+++ b/dozer-log/src/errors.rs
@@ -1,8 +1,6 @@
 use dozer_types::thiserror::Error;
 use dozer_types::{bincode, serde_json, thiserror, tonic};

-use crate::replication::LoadPersistedLogEntryError;
-
 #[derive(Error, Debug)]
 pub enum ReaderBuilderError {
     #[error("Tonic transport error: {0:?}")]
@@ -19,8 +17,10 @@ pub enum ReaderError {
     #[error("Failed to deserialize log response: {0}")]
     DeserializeLogResponse(#[source] bincode::Error),
-    #[error("Failed to load persisted log entry: {0}")]
-    LoadPersistedLogEntry(#[from] LoadPersistedLogEntryError),
+    #[error("Failed to deserialize log entry: {0}")]
+    DeserializeLogEntry(#[source] bincode::Error),
+    #[error("Storage error: {0}")]
+    Storage(#[from] crate::storage::Error),
     #[error("Reader thread has quit: {0:?}")]
     ReaderThreadQuit(#[source] Option<tokio::task::JoinError>),
 }
diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs
index 4e705deeba..dff4965a9a 100644
--- a/dozer-log/src/reader.rs
+++ b/dozer-log/src/reader.rs
@@ -1,5 +1,5 @@
 use crate::errors::ReaderBuilderError;
-use crate::replication::{load_persisted_log_entry, LogOperation};
+use crate::replication::LogOperation;
 use crate::schemas::EndpointSchema;
 use crate::storage::{LocalStorage, S3Storage, Storage};

@@ -222,7 +222,9 @@ impl LogClient {
             persisted.key, persisted.range, request_range
         );
         // Load the persisted log entry.
-        let mut ops = load_persisted_log_entry(&*self.storage, &persisted).await?;
+        let data = self.storage.download_object(persisted.key).await?;
+        let mut ops: Vec<LogOperation> =
+            bincode::deserialize(&data).map_err(ReaderError::DeserializeLogEntry)?;
         // Discard the ops that are before the requested range.
         ops.drain(..request_range.start as usize - persisted.range.start);
         Ok(ops)
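`load_persisted_log_entry` was only a thin wrapper, so the reader now downloads and deserializes inline, mapping each failure to its own `ReaderError` variant (`Storage` via `#[from]`, `DeserializeLogEntry` via `map_err`). The one subtle part left is the trim: `ops[0]` corresponds to log position `persisted.range.start`, so the requested start maps to index `request_range.start - persisted.range.start`. A worked example with simplified types (`Vec<u64>` standing in for `Vec<LogOperation>`):

```rust
// Worked example of the trimming in `LogClient`: a persisted entry covers
// positions `persisted_start..`, but the caller asked for `request_start..`.
fn trim_to_request(mut ops: Vec<u64>, persisted_start: usize, request_start: usize) -> Vec<u64> {
    // `ops[0]` sits at log position `persisted_start`, so the op at
    // `request_start` lives at index `request_start - persisted_start`.
    ops.drain(..request_start - persisted_start);
    ops
}

fn main() {
    // Entry persisted for positions 100..200; the reader wants 150 onwards.
    let ops: Vec<u64> = (100..200).collect();
    let trimmed = trim_to_request(ops, 100, 150);
    assert_eq!(trimmed.first(), Some(&150));
    assert_eq!(trimmed.len(), 50);
}
```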
diff --git a/dozer-log/src/replication/mod.rs b/dozer-log/src/replication/mod.rs
index 1b32ea22d2..c6e27588d7 100644
--- a/dozer-log/src/replication/mod.rs
+++ b/dozer-log/src/replication/mod.rs
@@ -5,7 +5,6 @@ use std::time::{Duration, SystemTime};

 use dozer_types::grpc_types::internal::storage_response;
 use dozer_types::log::{debug, error};
-use dozer_types::node::SourceStates;
 use dozer_types::serde::{Deserialize, Serialize};
 use dozer_types::types::Operation;
 use dozer_types::{bincode, thiserror};
@@ -44,8 +43,6 @@ pub enum Error {
     LogEntryNotConsecutive(PersistedLogEntry, PersistedLogEntry),
     #[error("Serialization error: {0}")]
     Serialization(#[from] bincode::Error),
-    #[error("Load persisted log entry error: {0}")]
-    LoadPersistedLogEntry(#[from] LoadPersistedLogEntryError),
     #[error("Persisting thread has quit")]
     PersistingThreadQuit,
 }
@@ -64,8 +61,6 @@ pub struct Log {
     watchers: Vec<Watcher>,
     storage: storage_response::Storage,
     prefix: String,
-    /// The checkpoint state this `Log` was restored from.
-    from_checkpoint: Option<SourceStates>,
 }

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -94,10 +89,6 @@ impl Log {
         self.storage.clone()
     }

-    pub fn from_checkpoint(&self) -> Option<&SourceStates> {
-        self.from_checkpoint.as_ref()
-    }
-
     pub async fn new(
         storage: &dyn Storage,
         prefix: String,
@@ -115,17 +106,6 @@ impl Log {
         };
         let watchers = vec![];
         let storage_description = storage.describe();
-
-        let from_checkpoint = if let Some(persisted) = persisted.last() {
-            let mut ops = load_persisted_log_entry(storage, persisted).await?;
-            ops.pop().map(|op| match op {
-                LogOperation::Commit { source_states, .. } => source_states,
-                _ => panic!("Last operation in a log entry must be a commit"),
-            })
-        } else {
-            None
-        };
-
         Ok(Self {
             persisted,
             in_memory,
@@ -133,7 +113,6 @@ impl Log {
             watchers,
             storage: storage_description,
             prefix,
-            from_checkpoint,
         })
     }
@@ -178,12 +157,6 @@ impl Log {

         // Persist this entry.
         let ops = &self.in_memory.ops[self.in_memory.next_persist_start..];
-        if let Some(op) = ops.last() {
-            assert!(
-                matches!(op, LogOperation::Commit { .. }),
-                "Last operation in a log entry must be a commit"
-            );
-        }
         let start = self.in_memory.start + self.in_memory.next_persist_start;
         let end = self.in_memory.end();
         let range = start..end;
@@ -307,16 +280,9 @@ impl Log {
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 #[serde(crate = "dozer_types::serde")]
 pub enum LogOperation {
-    Op {
-        op: Operation,
-    },
-    Commit {
-        source_states: SourceStates,
-        decision_instant: SystemTime,
-    },
-    SnapshottingDone {
-        connection_name: String,
-    },
+    Op { op: Operation },
+    Commit { decision_instant: SystemTime },
+    SnapshottingDone { connection_name: String },
 }

 #[derive(Debug, PartialEq, Serialize, Deserialize)]
@@ -326,22 +292,6 @@ pub enum LogResponse {
     Persisted(PersistedLogEntry),
     Operations(Vec<LogOperation>),
 }

-#[derive(Debug, thiserror::Error)]
-pub enum LoadPersistedLogEntryError {
-    #[error("Storage error: {0}")]
-    Storage(#[from] super::storage::Error),
-    #[error("Deserialization error: {0}")]
-    DeserializeLogEntry(#[from] bincode::Error),
-}
-
-pub async fn load_persisted_log_entry(
-    storage: &dyn Storage,
-    persisted: &PersistedLogEntry,
-) -> Result<Vec<LogOperation>, LoadPersistedLogEntryError> {
-    let data = storage.download_object(persisted.key.clone()).await?;
-    bincode::deserialize(&data).map_err(Into::into)
-}
-
 #[pin_project(project = LogResponseFutureProj)]
 pub enum LogResponseFuture {
     Persisted(PersistedLogEntry),
diff --git a/dozer-tests/src/cache_tests/film/load_database.rs b/dozer-tests/src/cache_tests/film/load_database.rs
index 973c5fbf4e..4295f54f3a 100644
--- a/dozer-tests/src/cache_tests/film/load_database.rs
+++ b/dozer-tests/src/cache_tests/film/load_database.rs
@@ -65,7 +65,7 @@ pub async fn load_database(
             .await
             .unwrap();
     }
-    cache.commit(&Default::default(), 0).unwrap();
+    cache.commit().unwrap();
     cache_manager.wait_until_indexing_catchup();
     drop(cache);
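Dropping `source_states` from `Commit` also drops the startup scan that read the last persisted entry back just to recover a checkpoint, along with the "last operation must be a commit" invariant that scan relied on; the cache's own metadata is now the single source of truth for resume positions. One consequence worth flagging: the bincode layout of `LogOperation` changes, so entries persisted by the old enum will not deserialize into the new one. A round-trip sketch of the new shape, using `serde`/`bincode` directly (dozer re-exports them through `dozer_types`) and a placeholder payload for `Op`:

```rust
// Round-trip of the slimmed-down `Commit` variant. `Op`'s payload is a
// placeholder; `dozer_types::types::Operation` is not reproduced here.
use serde::{Deserialize, Serialize};
use std::time::SystemTime;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
enum LogOperation {
    Op { op: String },
    Commit { decision_instant: SystemTime },
    SnapshottingDone { connection_name: String },
}

fn main() {
    let op = LogOperation::Commit {
        decision_instant: SystemTime::now(),
    };
    let bytes = bincode::serialize(&op).unwrap();
    let back: LogOperation = bincode::deserialize(&bytes).unwrap();
    // Bytes written by the old enum (with `source_states` inside `Commit`)
    // will not parse into this shape; persisted logs are effectively
    // versioned by the enum layout.
    assert_eq!(op, back);
}
```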