diff --git a/Cargo.lock b/Cargo.lock index 1c72db0c6f..c6dc4cee48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2666,6 +2666,7 @@ dependencies = [ "tower", "tower-http", "tracing-actix-web", + "uuid", ] [[package]] @@ -8808,9 +8809,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.3.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b55a3fef2a1e3b3a00ce878640918820d3c51081576ac657d23af9fc7928fdb" +checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "atomic", "getrandom", diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index ee39e8f577..8049744536 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -51,6 +51,7 @@ bytes = "1.4.0" http = "0.2.9" pin-project = "1.1.3" async-stream = "0.3.5" +uuid = "1.4.1" [dev-dependencies] tempdir = "0.3.7" diff --git a/dozer-api/src/cache_builder/builder_impl.rs b/dozer-api/src/cache_builder/builder_impl.rs new file mode 100644 index 0000000000..7893dc3f4e --- /dev/null +++ b/dozer-api/src/cache_builder/builder_impl.rs @@ -0,0 +1,417 @@ +use std::sync::Arc; + +use arc_swap::ArcSwap; +use dozer_cache::{ + cache::{CacheRecord, CacheWriteOptions, CommitState, RwCache, RwCacheManager, UpsertResult}, + dozer_log::{reader::OpAndPos, replication::LogOperation}, + errors::CacheError, + CacheReader, +}; +use dozer_tracing::Labels; +use dozer_types::{ + grpc_types::types::Operation as GrpcOperation, + indicatif::ProgressBar, + log::error, + types::{Field, Operation, Record, Schema}, +}; +use metrics::{describe_counter, describe_histogram, histogram, increment_counter}; +use tokio::sync::broadcast::Sender; + +use crate::grpc::types_helper; + +use super::endpoint_meta::EndpointMeta; + +#[derive(Debug, PartialEq, Eq)] +struct CatchUpInfo { + /// The log position that's being served. Need to catch up with it. + serving_cache_next_log_position: u64, + /// The endpoint meta that triggered the catch up. + endpoint_meta: EndpointMeta, +} + +#[derive(Debug)] +pub struct CacheBuilderImpl { + cache_manager: Arc, + labels: Labels, + building: Box, + next_log_position: u64, + /// The cache that's being served. + serving: Arc>, + /// `Some`: The cache that's being built is behind the one being served, + /// because the data source (log) has a different id and we have to rebuild the cache. + /// + /// `None`: The one that's being served is the one being built. + catch_up_info: Option, + progress_bar: ProgressBar, +} + +impl CacheBuilderImpl { + /// Checks if `serving` is consistent with `endpoint_meta`. + /// If so, build the serving cache. + /// If not, build a new cache and replace `serving` when the new cache catches up with `serving`. + /// + /// Returns the builder and the log position that the building cache starts with. + pub fn new( + cache_manager: Arc, + serving: Arc>, + endpoint_meta: EndpointMeta, + labels: Labels, + cache_write_options: CacheWriteOptions, + progress_bar: ProgressBar, + ) -> Result<(CacheBuilderImpl, u64), CacheError> { + // Compare cache and log id. 
+ let this = if serving.load().cache_name() != endpoint_meta.log_id { + let building = super::create_cache( + &*cache_manager, + endpoint_meta.clone(), + labels.clone(), + cache_write_options, + )?; + let serving_cache_next_log_position = + next_log_position(serving.load().get_commit_state()?.as_ref()); + Self { + cache_manager, + labels, + building, + next_log_position: 0, + serving, + catch_up_info: Some(CatchUpInfo { + serving_cache_next_log_position, + endpoint_meta, + }), + progress_bar, + } + } else { + let building = + super::open_cache(&*cache_manager, endpoint_meta.clone(), labels.clone())? + .expect("cache should exist"); + let next_log_position = next_log_position(building.get_commit_state()?.as_ref()); + Self { + cache_manager, + labels, + building, + next_log_position, + serving, + catch_up_info: None, + progress_bar, + } + }; + + let next_log_position = this.next_log_position; + this.progress_bar.set_position(next_log_position); + + Ok((this, next_log_position)) + } + + pub fn process_op( + &mut self, + op_and_pos: OpAndPos, + operations_sender: Option<&(String, Sender)>, + ) -> Result<(), CacheError> { + const CACHE_OPERATION_COUNTER_NAME: &str = "cache_operation"; + describe_counter!( + CACHE_OPERATION_COUNTER_NAME, + "Number of message processed by cache builder" + ); + + const DATA_LATENCY_HISTOGRAM_NAME: &str = "data_latency"; + describe_histogram!( + DATA_LATENCY_HISTOGRAM_NAME, + "End-to-end data latency in seconds" + ); + + const OPERATION_TYPE_LABEL: &str = "operation_type"; + const SNAPSHOTTING_LABEL: &str = "snapshotting"; + + assert!(op_and_pos.pos == self.next_log_position); + self.next_log_position += 1; + + self.progress_bar.set_position(self.next_log_position); + match op_and_pos.op { + LogOperation::Op { op } => match op { + Operation::Delete { old } => { + if let Some(meta) = self.building.delete(&old)? 
{ + if let Some((endpoint_name, operations_sender)) = operations_sender { + let operation = types_helper::map_delete_operation( + endpoint_name.clone(), + CacheRecord::new(meta.id, meta.version, old), + ); + send_and_log_error(operations_sender, operation); + } + } + let mut labels = self.building.labels().clone(); + labels.push(OPERATION_TYPE_LABEL, "delete"); + labels.push( + SNAPSHOTTING_LABEL, + snapshotting_str(!self.building.is_snapshotting_done()?), + ); + increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels); + } + Operation::Insert { new } => { + let result = self.building.insert(&new)?; + let mut labels = self.building.labels().clone(); + labels.push(OPERATION_TYPE_LABEL, "insert"); + labels.push( + SNAPSHOTTING_LABEL, + snapshotting_str(!self.building.is_snapshotting_done()?), + ); + increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels); + + if let Some((endpoint_name, operations_sender)) = operations_sender { + send_upsert_result( + endpoint_name, + operations_sender, + result, + &self.building.get_schema().0, + None, + new, + ); + } + } + Operation::Update { old, new } => { + let upsert_result = self.building.update(&old, &new)?; + let mut labels = self.building.labels().clone(); + labels.push(OPERATION_TYPE_LABEL, "update"); + labels.push( + SNAPSHOTTING_LABEL, + snapshotting_str(!self.building.is_snapshotting_done()?), + ); + increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels); + + if let Some((endpoint_name, operations_sender)) = operations_sender { + send_upsert_result( + endpoint_name, + operations_sender, + upsert_result, + &self.building.get_schema().0, + Some(old), + new, + ); + } + } + }, + LogOperation::Commit { + source_states, + decision_instant, + } => { + self.building.commit(&CommitState { + source_states, + log_position: op_and_pos.pos, + })?; + if let Ok(duration) = decision_instant.elapsed() { + histogram!( + DATA_LATENCY_HISTOGRAM_NAME, + duration, + self.building.labels().clone() + ); + } + + // See if we have caught up with the cache being served. + if let Some(catchup_info) = &self.catch_up_info { + if self.next_log_position >= catchup_info.serving_cache_next_log_position { + self.serving.store(Arc::new( + super::open_cache_reader( + &*self.cache_manager, + catchup_info.endpoint_meta.clone(), + self.labels.clone(), + )? + .expect("cache should exist"), + )); + self.catch_up_info = None; + } + } + } + LogOperation::SnapshottingDone { connection_name } => { + self.building + .set_connection_snapshotting_done(&connection_name)?; + } + } + + Ok(()) + } +} + +fn next_log_position(commit_state: Option<&CommitState>) -> u64 { + commit_state + .map(|commit_state| commit_state.log_position + 1) + .unwrap_or(0) +} + +fn send_and_log_error(sender: &Sender, msg: T) { + if let Err(e) = sender.send(msg) { + error!("Failed to send broadcast message: {}", e); + } +} + +fn snapshotting_str(snapshotting: bool) -> &'static str { + if snapshotting { + "true" + } else { + "false" + } +} + +fn send_upsert_result( + endpoint_name: &str, + operations_sender: &Sender, + upsert_result: UpsertResult, + schema: &Schema, + old: Option, + new: Record, +) { + match upsert_result { + UpsertResult::Inserted { meta } => { + let op = types_helper::map_insert_operation( + endpoint_name.to_string(), + CacheRecord::new(meta.id, meta.version, new), + ); + send_and_log_error(operations_sender, op); + } + UpsertResult::Updated { old_meta, new_meta } => { + // If `old` is `None`, it means `Updated` comes from `Insert` operation. 
+ // In this case, we can't get the full old record, but the fields in the primary index must be the same with the new record. + // So we create the old record with only the fields in the primary index, cloned from `new`. + let old = old.unwrap_or_else(|| { + let mut record = Record::new(vec![Field::Null; new.values.len()]); + for index in schema.primary_index.iter() { + record.values[*index] = new.values[*index].clone(); + } + record + }); + let op = types_helper::map_update_operation( + endpoint_name.to_string(), + CacheRecord::new(old_meta.id, old_meta.version, old), + CacheRecord::new(new_meta.id, new_meta.version, new), + ); + send_and_log_error(operations_sender, op); + } + UpsertResult::Ignored => {} + } +} + +#[cfg(test)] +mod tests { + use std::time::SystemTime; + + use dozer_cache::{ + cache::{LmdbRwCacheManager, RwCacheManager}, + dozer_log::schemas::EndpointSchema, + }; + + use super::*; + + const INITIAL_CACHE_NAME: &str = "initial_cache_name"; + const INITIAL_LOG_POSITION: u64 = 42; + + fn create_build_impl(log_id: String) -> (CacheBuilderImpl, u64) { + let cache_manager: Arc = + Arc::new(LmdbRwCacheManager::new(Default::default()).unwrap()); + let mut cache = super::super::create_cache( + &*cache_manager, + test_endpoint_meta(INITIAL_CACHE_NAME.to_string()), + Default::default(), + Default::default(), + ) + .unwrap(); + cache + .commit(&CommitState { + source_states: Default::default(), + log_position: INITIAL_LOG_POSITION, + }) + .unwrap(); + drop(cache); + let serving = super::super::open_cache_reader( + &*cache_manager, + test_endpoint_meta(INITIAL_CACHE_NAME.to_string()), + Default::default(), + ) + .unwrap() + .unwrap(); + let (builder, start) = CacheBuilderImpl::new( + cache_manager, + Arc::new(ArcSwap::from_pointee(serving)), + test_endpoint_meta(log_id), + Default::default(), + Default::default(), + ProgressBar::hidden(), + ) + .unwrap(); + (builder, start) + } + + fn test_endpoint_meta(log_id: String) -> EndpointMeta { + EndpointMeta { + name: Default::default(), + log_id, + build_name: Default::default(), + schema: EndpointSchema { + path: Default::default(), + schema: Default::default(), + secondary_indexes: Default::default(), + enable_token: Default::default(), + enable_on_event: Default::default(), + connections: Default::default(), + }, + descriptor_bytes: Default::default(), + } + } + + #[test] + fn test_builder_impl_new() { + let (builder, start) = create_build_impl(INITIAL_CACHE_NAME.to_string()); + assert_eq!(start, INITIAL_LOG_POSITION + 1); + assert_eq!(builder.building.name(), INITIAL_CACHE_NAME); + assert_eq!(builder.next_log_position, INITIAL_LOG_POSITION + 1); + assert!(builder.catch_up_info.is_none()); + + let new_log_id = "new_log_id"; + let (builder, start) = create_build_impl(new_log_id.to_string()); + assert_eq!(start, 0); + assert_eq!(builder.building.name(), new_log_id); + assert_eq!(builder.next_log_position, 0); + assert_eq!( + builder.catch_up_info, + Some(CatchUpInfo { + serving_cache_next_log_position: INITIAL_LOG_POSITION + 1, + endpoint_meta: test_endpoint_meta(new_log_id.to_string()), + }) + ); + } + + #[test] + fn test_builder_impl_process_op() { + let new_log_id = "new_log_id"; + let (mut builder, _) = create_build_impl(new_log_id.to_string()); + + for pos in 0..INITIAL_LOG_POSITION { + builder + .process_op( + OpAndPos { + op: LogOperation::Commit { + source_states: Default::default(), + decision_instant: SystemTime::now(), + }, + pos, + }, + None, + ) + .unwrap(); + assert!(builder.catch_up_info.is_some()); + 
assert_eq!(builder.serving.load().cache_name(), INITIAL_CACHE_NAME); + } + builder + .process_op( + OpAndPos { + op: LogOperation::Commit { + source_states: Default::default(), + decision_instant: SystemTime::now(), + }, + pos: INITIAL_LOG_POSITION, + }, + None, + ) + .unwrap(); + assert!(builder.catch_up_info.is_none()); + assert_eq!(builder.serving.load().cache_name(), new_log_id); + } +} diff --git a/dozer-api/src/cache_builder/endpoint_meta.rs b/dozer-api/src/cache_builder/endpoint_meta.rs new file mode 100644 index 0000000000..a48327df88 --- /dev/null +++ b/dozer-api/src/cache_builder/endpoint_meta.rs @@ -0,0 +1,56 @@ +use dozer_cache::dozer_log::{reader::LogClient, schemas::EndpointSchema}; +use dozer_tracing::Labels; +use dozer_types::{ + grpc_types::internal::{ + internal_pipeline_service_client::InternalPipelineServiceClient, BuildRequest, + }, + serde_json, + tonic::transport::Channel, +}; + +use crate::{cache_alias_and_labels, errors::ApiInitError}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EndpointMeta { + pub name: String, + pub log_id: String, + pub build_name: String, + pub schema: EndpointSchema, + pub descriptor_bytes: Vec, +} + +impl EndpointMeta { + pub async fn load_from_client( + client: &mut InternalPipelineServiceClient, + endpoint: String, + ) -> Result<(Self, LogClient), ApiInitError> { + // We establish the log stream first to avoid tonic auto-reconnecting without us knowing. + let log_client = LogClient::new(client, endpoint.clone()).await?; + let log_id = client.get_id(()).await?.into_inner().id; + let build = client + .describe_build(BuildRequest { + endpoint: endpoint.clone(), + }) + .await? + .into_inner(); + let schema = serde_json::from_str(&build.schema_string)?; + + Ok(( + Self { + name: endpoint, + log_id, + build_name: build.name, + schema, + descriptor_bytes: build.descriptor_bytes, + }, + log_client, + )) + } + + pub fn cache_alias_and_labels(&self, extra_labels: Labels) -> (String, Labels) { + let (alias, mut labels) = + cache_alias_and_labels(self.name.clone(), self.build_name.clone()); + labels.extend(extra_labels); + (alias, labels) + } +} diff --git a/dozer-api/src/cache_builder/mod.rs b/dozer-api/src/cache_builder/mod.rs index 7f1ee38ca2..0320e09150 100644 --- a/dozer-api/src/cache_builder/mod.rs +++ b/dozer-api/src/cache_builder/mod.rs @@ -1,295 +1,295 @@ -use std::collections::HashSet; +use std::sync::Arc; use std::time::Duration; -use crate::grpc::types_helper; -use dozer_cache::cache::CommitState; -use dozer_cache::dozer_log::reader::{LogReader, LogReaderBuilder, OpAndPos}; -use dozer_cache::dozer_log::replication::LogOperation; +use crate::errors::ApiInitError; +use arc_swap::ArcSwap; +use dozer_cache::cache::{RoCache, RwCache}; +use dozer_cache::dozer_log::reader::{LogClient, LogReader, LogReaderOptions}; +use dozer_cache::CacheReader; use dozer_cache::{ - cache::{CacheRecord, CacheWriteOptions, RwCache, RwCacheManager, UpsertResult}, + cache::{CacheWriteOptions, RwCacheManager}, errors::CacheError, }; use dozer_tracing::{Labels, LabelsAndProgress}; +use dozer_types::grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient; use dozer_types::indicatif::ProgressBar; -use dozer_types::log::debug; -use dozer_types::types::SchemaWithIndex; -use dozer_types::{ - grpc_types::types::Operation as GrpcOperation, - log::error, - types::{Field, Operation, Record, Schema}, +use dozer_types::models::api_endpoint::{ + default_log_reader_batch_size, default_log_reader_buffer_size, + 
default_log_reader_timeout_in_millis, ApiEndpoint, ConflictResolution, }; -use futures_util::stream::FuturesUnordered; +use dozer_types::tonic::transport::Channel; +use dozer_types::types::SchemaWithIndex; +use dozer_types::{grpc_types::types::Operation as GrpcOperation, log::error}; use futures_util::{ future::{select, Either}, Future, }; -use metrics::{describe_counter, describe_histogram, histogram, increment_counter}; +use tokio::runtime::Runtime; use tokio::sync::broadcast::Sender; -use tokio::sync::mpsc; -use tokio_stream::StreamExt; -pub async fn build_cache( - cache: Box, - cancel: impl Future + Unpin + Send + 'static, - log_reader_builder: LogReaderBuilder, - operations_sender: Option<(String, Sender)>, - labels: LabelsAndProgress, -) -> Result<(), CacheError> { - // Create log reader. - let starting_pos = cache - .get_commit_state()? - .map(|commit_state| commit_state.log_position + 1) - .unwrap_or(0); - debug!( - "Starting log reader {} from position {starting_pos}", - log_reader_builder.options.endpoint - ); - let pb = labels.create_progress_bar(format!("cache: {}", log_reader_builder.options.endpoint)); - pb.set_position(starting_pos); - let log_reader = log_reader_builder.build(starting_pos); +const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1); - // Spawn tasks - let mut futures = FuturesUnordered::new(); - let (sender, receiver) = mpsc::channel(1); - futures.push(tokio::spawn(async move { - read_log_task(cancel, log_reader, sender).await; - Ok(()) - })); - futures.push({ - tokio::task::spawn_blocking(|| build_cache_task(cache, receiver, operations_sender, pb)) - }); +#[derive(Debug)] +pub struct CacheBuilder { + client: InternalPipelineServiceClient, + cache_manager: Arc, + serving: Arc>, + labels: Labels, + cache_write_options: CacheWriteOptions, + progress_bar: ProgressBar, + log_reader_options: LogReaderOptions, +} - while let Some(result) = futures.next().await { - match result { - Ok(Ok(())) => (), - Ok(Err(e)) => return Err(e), - Err(e) => return Err(CacheError::InternalThreadPanic(e)), - } +mod builder_impl; +mod endpoint_meta; + +use builder_impl::CacheBuilderImpl; +use endpoint_meta::EndpointMeta; + +impl CacheBuilder { + pub async fn new( + cache_manager: Arc, + app_server_url: String, + endpoint: &ApiEndpoint, + labels: LabelsAndProgress, + ) -> Result<(Self, Vec), ApiInitError> { + // Connect to the endpoint's log. + let mut client = InternalPipelineServiceClient::connect(app_server_url.clone()) + .await + .map_err(|error| ApiInitError::ConnectToAppServer { + url: app_server_url, + error, + })?; + let (endpoint_meta, _) = + EndpointMeta::load_from_client(&mut client, endpoint.name.clone()).await?; + + // Open or create cache. 
+ let cache_write_options = cache_write_options(endpoint.conflict_resolution); + let serving = open_or_create_cache_reader( + &*cache_manager, + endpoint_meta.clone(), + labels.labels().clone(), + cache_write_options, + ) + .map_err(ApiInitError::OpenOrCreateCache)?; + let progress_bar = labels.create_progress_bar(format!("cache: {}", endpoint_meta.name)); + + let log_reader_options = get_log_reader_options(endpoint); + + Ok(( + Self { + client, + cache_manager, + serving: Arc::new(ArcSwap::from_pointee(serving)), + labels: labels.labels().clone(), + cache_write_options, + progress_bar, + log_reader_options, + }, + endpoint_meta.descriptor_bytes, + )) } - Ok(()) -} + pub fn cache_reader(&self) -> &Arc> { + &self.serving + } -pub fn open_or_create_cache( - cache_manager: &dyn RwCacheManager, - labels: Labels, - schema: SchemaWithIndex, - connections: &HashSet, - write_options: CacheWriteOptions, -) -> Result, CacheError> { - match cache_manager.open_rw_cache( - labels.to_non_empty_string().into_owned(), - labels.clone(), - write_options, - )? { - Some(cache) => { - debug_assert!(cache.get_schema() == &schema); - Ok(cache) - } - None => { - let cache = cache_manager.create_cache( - labels.to_non_empty_string().into_owned(), - labels, - schema.0, - schema.1, - connections, - write_options, + pub fn run( + mut self, + runtime: Arc, + mut cancel: impl Future + Unpin + Send + 'static, + operations_sender: Option<(String, Sender)>, + ) -> Result<(), CacheError> { + loop { + // Connect to the endpoint's log. + let Some(connect_result) = runtime.block_on(with_cancel( + connect_until_success(&mut self.client, &self.log_reader_options.endpoint), + cancel, + )) else { + return Ok(()); + }; + cancel = connect_result.1; + + // Create `CacheBuilderImpl` and `LogReader`. + let (endpoint_meta, log_client) = connect_result.0; + let (mut builder, start) = CacheBuilderImpl::new( + self.cache_manager.clone(), + self.serving.clone(), + endpoint_meta.clone(), + self.labels.clone(), + self.cache_write_options, + self.progress_bar.clone(), )?; - Ok(cache) - } - } -} + let mut log_reader = LogReader::new( + endpoint_meta.schema, + log_client, + self.log_reader_options.clone(), + start, + ); -const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1); + // Loop over the log until error. 
+ loop { + let Some(read_one_result) = + runtime.block_on(with_cancel(log_reader.read_one(), cancel)) + else { + return Ok(()); + }; + cancel = read_one_result.1; -async fn read_log_task( - mut cancel: impl Future + Unpin + Send + 'static, - mut log_reader: LogReader, - sender: mpsc::Sender, -) { - loop { - let next_op = std::pin::pin!(log_reader.read_one()); - match select(cancel, next_op).await { - Either::Left(_) => break, - Either::Right((op, c)) => { - let op = match op { - Ok(op) => op, + match read_one_result.0 { + Ok(op_and_pos) => { + builder.process_op(op_and_pos, operations_sender.as_ref())?; + } Err(e) => { - error!( - "Failed to read log: {e}, retrying after {READ_LOG_RETRY_INTERVAL:?}" - ); - tokio::time::sleep(READ_LOG_RETRY_INTERVAL).await; - cancel = c; - continue; + error!("Failed to read log: {e}, reconnecting"); + break; } - }; - - cancel = c; - if sender.send(op).await.is_err() { - debug!("Stop reading log because receiver is dropped"); - break; } } } } } -fn build_cache_task( - mut cache: Box, - mut receiver: mpsc::Receiver, - operations_sender: Option<(String, Sender)>, - progress_bar: ProgressBar, -) -> Result<(), CacheError> { - let schema = cache.get_schema().0.clone(); - - const CACHE_OPERATION_COUNTER_NAME: &str = "cache_operation"; - describe_counter!( - CACHE_OPERATION_COUNTER_NAME, - "Number of message processed by cache builder" - ); - - const DATA_LATENCY_HISTOGRAM_NAME: &str = "data_latency"; - describe_histogram!( - DATA_LATENCY_HISTOGRAM_NAME, - "End-to-end data latency in seconds" - ); - - const OPERATION_TYPE_LABEL: &str = "operation_type"; - const SNAPSHOTTING_LABEL: &str = "snapshotting"; +async fn connect_until_success( + client: &mut InternalPipelineServiceClient, + endpoint: &str, +) -> (EndpointMeta, LogClient) { + loop { + match EndpointMeta::load_from_client(client, endpoint.to_string()).await { + Ok(endpoint_meta_and_log_client) => return endpoint_meta_and_log_client, + Err(e) => { + error!("Failed to reconnect: {e}, retrying after {READ_LOG_RETRY_INTERVAL:?}"); + tokio::time::sleep(READ_LOG_RETRY_INTERVAL).await; + } + } + } +} - let mut snapshotting = !cache.is_snapshotting_done()?; +async fn with_cancel, C: Future + Unpin>( + future: F, + cancel: C, +) -> Option<(T, C)> { + let future = std::pin::pin!(future); + match select(cancel, future).await { + Either::Left(_) => None, + Either::Right((result, c)) => Some((result, c)), + } +} - while let Some(op_and_pos) = receiver.blocking_recv() { - progress_bar.set_position(op_and_pos.pos + 1); - match op_and_pos.op { - LogOperation::Op { op } => match op { - Operation::Delete { old } => { - if let Some(meta) = cache.delete(&old)? 
{ - if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() - { - let operation = types_helper::map_delete_operation( - endpoint_name.clone(), - CacheRecord::new(meta.id, meta.version, old), - ); - send_and_log_error(operations_sender, operation); - } - } - let mut labels = cache.labels().clone(); - labels.push(OPERATION_TYPE_LABEL, "delete"); - labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting)); - increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels); - } - Operation::Insert { new } => { - let result = cache.insert(&new)?; - let mut labels = cache.labels().clone(); - labels.push(OPERATION_TYPE_LABEL, "insert"); - labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting)); - increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels); +fn cache_write_options(conflict_resolution: ConflictResolution) -> CacheWriteOptions { + CacheWriteOptions { + insert_resolution: conflict_resolution.on_insert, + delete_resolution: conflict_resolution.on_delete, + update_resolution: conflict_resolution.on_update, + ..Default::default() + } +} - if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() { - send_upsert_result( - endpoint_name, - operations_sender, - result, - &schema, - None, - new, - ); - } - } - Operation::Update { old, new } => { - let upsert_result = cache.update(&old, &new)?; - let mut labels = cache.labels().clone(); - labels.push(OPERATION_TYPE_LABEL, "update"); - labels.push(SNAPSHOTTING_LABEL, snapshotting_str(snapshotting)); - increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels); +fn open_or_create_cache_reader( + cache_manager: &dyn RwCacheManager, + endpoint_meta: EndpointMeta, + labels: Labels, + write_options: CacheWriteOptions, +) -> Result { + if let Some(reader) = open_cache_reader(cache_manager, endpoint_meta.clone(), labels.clone())? { + Ok(reader) + } else { + create_cache( + cache_manager, + endpoint_meta.clone(), + labels.clone(), + write_options, + )?; + Ok(open_cache_reader(cache_manager, endpoint_meta, labels)? 
+ .expect("cache was just created")) + } +} - if let Some((endpoint_name, operations_sender)) = operations_sender.as_ref() { - send_upsert_result( - endpoint_name, - operations_sender, - upsert_result, - &schema, - Some(old), - new, - ); - } - } - }, - LogOperation::Commit { - source_states, - decision_instant, - } => { - cache.commit(&CommitState { - source_states, - log_position: op_and_pos.pos, - })?; - if let Ok(duration) = decision_instant.elapsed() { - histogram!( - DATA_LATENCY_HISTOGRAM_NAME, - duration, - cache.labels().clone() - ); - } - } - LogOperation::SnapshottingDone { connection_name } => { - cache.set_connection_snapshotting_done(&connection_name)?; - snapshotting = !cache.is_snapshotting_done()?; - } - } +fn open_cache( + cache_manager: &dyn RwCacheManager, + endpoint_meta: EndpointMeta, + labels: Labels, +) -> Result>, CacheError> { + let (alias, cache_labels) = endpoint_meta.cache_alias_and_labels(labels); + let cache = cache_manager.open_rw_cache(alias.clone(), cache_labels, Default::default())?; + if let Some(cache) = cache.as_ref() { + check_cache_schema( + cache.as_ro(), + ( + endpoint_meta.schema.schema, + endpoint_meta.schema.secondary_indexes, + ), + )?; } + Ok(cache) +} - Ok(()) +fn create_cache( + cache_manager: &dyn RwCacheManager, + endpoint_meta: EndpointMeta, + labels: Labels, + write_options: CacheWriteOptions, +) -> Result, CacheError> { + let (alias, cache_labels) = endpoint_meta.cache_alias_and_labels(labels); + let cache = cache_manager.create_cache( + endpoint_meta.log_id.clone(), + cache_labels, + ( + endpoint_meta.schema.schema, + endpoint_meta.schema.secondary_indexes, + ), + &endpoint_meta.schema.connections, + write_options, + )?; + cache_manager.create_alias(&endpoint_meta.log_id, &alias)?; + Ok(cache) } -fn send_upsert_result( - endpoint_name: &str, - operations_sender: &Sender, - upsert_result: UpsertResult, - schema: &Schema, - old: Option, - new: Record, -) { - match upsert_result { - UpsertResult::Inserted { meta } => { - let op = types_helper::map_insert_operation( - endpoint_name.to_string(), - CacheRecord::new(meta.id, meta.version, new), - ); - send_and_log_error(operations_sender, op); - } - UpsertResult::Updated { old_meta, new_meta } => { - // If `old` is `None`, it means `Updated` comes from `Insert` operation. - // In this case, we can't get the full old record, but the fields in the primary index must be the same with the new record. - // So we create the old record with only the fields in the primary index, cloned from `new`. 
- let old = old.unwrap_or_else(|| { - let mut record = Record::new(vec![Field::Null; new.values.len()]); - for index in schema.primary_index.iter() { - record.values[*index] = new.values[*index].clone(); - } - record - }); - let op = types_helper::map_update_operation( - endpoint_name.to_string(), - CacheRecord::new(old_meta.id, old_meta.version, old), - CacheRecord::new(new_meta.id, new_meta.version, new), - ); - send_and_log_error(operations_sender, op); - } - UpsertResult::Ignored => {} +fn open_cache_reader( + cache_manager: &dyn RwCacheManager, + endpoint_meta: EndpointMeta, + labels: Labels, +) -> Result, CacheError> { + let (alias, cache_labels) = endpoint_meta.cache_alias_and_labels(labels); + let cache = cache_manager.open_ro_cache(alias.clone(), cache_labels)?; + if let Some(cache) = cache.as_ref() { + check_cache_schema( + &**cache, + ( + endpoint_meta.schema.schema, + endpoint_meta.schema.secondary_indexes, + ), + )?; } + Ok(cache.map(CacheReader::new)) } -fn send_and_log_error(sender: &Sender, msg: T) { - if let Err(e) = sender.send(msg) { - error!("Failed to send broadcast message: {}", e); +fn check_cache_schema(cache: &dyn RoCache, given: SchemaWithIndex) -> Result<(), CacheError> { + let stored = cache.get_schema(); + if &given != stored { + return Err(CacheError::SchemaMismatch { + name: cache.name().to_string(), + given: Box::new(given), + stored: Box::new(stored.clone()), + }); } + Ok(()) } -fn snapshotting_str(snapshotting: bool) -> &'static str { - if snapshotting { - "true" - } else { - "false" +fn get_log_reader_options(endpoint: &ApiEndpoint) -> LogReaderOptions { + LogReaderOptions { + endpoint: endpoint.name.clone(), + batch_size: endpoint + .log_reader_options + .batch_size + .unwrap_or_else(default_log_reader_batch_size), + timeout_in_millis: endpoint + .log_reader_options + .timeout_in_millis + .unwrap_or_else(default_log_reader_timeout_in_millis), + buffer_size: endpoint + .log_reader_options + .buffer_size + .unwrap_or_else(default_log_reader_buffer_size), } } diff --git a/dozer-api/src/errors.rs b/dozer-api/src/errors.rs index 33a1206474..5a7363fbaa 100644 --- a/dozer-api/src/errors.rs +++ b/dozer-api/src/errors.rs @@ -1,4 +1,3 @@ -#![allow(clippy::enum_variant_names)] use std::net::{AddrParseError, SocketAddr}; use std::path::PathBuf; @@ -10,7 +9,7 @@ use dozer_tracing::Labels; use dozer_types::errors::internal::BoxedError; use dozer_types::errors::types::{CannotConvertF64ToJson, TypeError}; use dozer_types::thiserror::Error; -use dozer_types::{serde_json, thiserror}; +use dozer_types::{bincode, serde_json, thiserror, tonic}; use dozer_cache::errors::CacheError; use handlebars::{RenderError, TemplateError}; @@ -22,8 +21,22 @@ pub enum ApiInitError { Grpc(#[from] GrpcError), #[error("Generation error: {0}")] Generation(#[from] GenerationError), - #[error("Failed to create log reader builder: {0}")] - CreateLogReaderBuilder(#[from] ReaderBuilderError), + #[error("Failed to create log reader")] + ReaderBuilder(#[from] ReaderBuilderError), + #[error("Failed to connect to app server {url}: {error}")] + ConnectToAppServer { + url: String, + #[source] + error: tonic::transport::Error, + }, + #[error("Failed to get log metadata: {0}")] + GetLogMetadata(#[from] tonic::Status), + #[error("Query cache: {0}")] + GetCacheCommitState(#[source] CacheError), + #[error("Failed to parse endpoint schema: {0}")] + ParseEndpointSchema(#[from] serde_json::Error), + #[error("Failed to parse checkpoint: {0}")] + ParseCheckpoint(#[from] bincode::Error), #[error("Failed to open or 
create cache: {0}")] OpenOrCreateCache(#[source] CacheError), #[error("Failed to find cache: {0}")] diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs index 0f5ddb27f2..4befe9b7e9 100644 --- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs +++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs @@ -7,7 +7,8 @@ use dozer_types::grpc_types::internal::internal_pipeline_service_server::{ }; use dozer_types::grpc_types::internal::{ BuildRequest, BuildResponse, DescribeApplicationRequest, DescribeApplicationResponse, - EndpointResponse, EndpointsResponse, LogRequest, LogResponse, StorageRequest, StorageResponse, + EndpointResponse, EndpointsResponse, GetIdResponse, LogRequest, LogResponse, StorageRequest, + StorageResponse, }; use dozer_types::log::info; use dozer_types::models::api_config::{ @@ -38,17 +39,27 @@ pub struct LogEndpoint { #[derive(Debug)] pub struct InternalPipelineServer { + id: String, endpoints: HashMap, } impl InternalPipelineServer { pub fn new(endpoints: HashMap) -> Self { - Self { endpoints } + Self { + id: uuid::Uuid::new_v4().to_string(), + endpoints, + } } } #[tonic::async_trait] impl InternalPipelineService for InternalPipelineServer { + async fn get_id(&self, _: Request<()>) -> Result, Status> { + Ok(Response::new(GetIdResponse { + id: self.id.clone(), + })) + } + async fn describe_storage( &self, request: Request, diff --git a/dozer-api/src/lib.rs b/dozer-api/src/lib.rs index 1fda4ede40..67ac65a145 100644 --- a/dozer-api/src/lib.rs +++ b/dozer-api/src/lib.rs @@ -1,19 +1,8 @@ use arc_swap::ArcSwap; -use cache_builder::open_or_create_cache; -use dozer_cache::{ - cache::{CacheWriteOptions, RwCacheManager}, - dozer_log::reader::{LogReaderBuilder, LogReaderOptions}, - errors::CacheError, - CacheReader, -}; +use cache_builder::CacheBuilder; +use dozer_cache::{cache::RwCacheManager, errors::CacheError, CacheReader}; use dozer_tracing::{Labels, LabelsAndProgress}; -use dozer_types::{ - grpc_types::types::Operation, - models::api_endpoint::{ - default_log_reader_batch_size, default_log_reader_buffer_size, - default_log_reader_timeout_in_millis, ApiEndpoint, - }, -}; +use dozer_types::{grpc_types::types::Operation, models::api_endpoint::ApiEndpoint}; use futures_util::Future; use std::{ops::Deref, sync::Arc}; @@ -25,7 +14,7 @@ pub use api_helper::get_api_security; #[derive(Debug)] pub struct CacheEndpoint { - cache_reader: ArcSwap, + cache_reader: Arc>, descriptor: Vec, endpoint: ApiEndpoint, } @@ -35,61 +24,29 @@ const BUILD_LABEL: &str = "build"; impl CacheEndpoint { pub async fn new( - app_server_addr: String, - cache_manager: &dyn RwCacheManager, + runtime: Arc, + app_server_url: String, + cache_manager: Arc, endpoint: ApiEndpoint, cancel: impl Future + Unpin + Send + 'static, operations_sender: Option>, labels: LabelsAndProgress, ) -> Result<(Self, JoinHandle>), ApiInitError> { - // Create log reader builder. - let log_reader_builder = - LogReaderBuilder::new(app_server_addr, get_log_reader_options(&endpoint)).await?; - let descriptor = log_reader_builder.descriptor.clone(); - - // Open or create cache. 
- let mut cache_labels = - cache_labels(endpoint.name.clone(), log_reader_builder.build_name.clone()); - cache_labels.extend(labels.labels().clone()); - let schema = log_reader_builder.schema.clone(); - let conflict_resolution = endpoint.conflict_resolution; - let write_options = CacheWriteOptions { - insert_resolution: conflict_resolution.on_insert, - delete_resolution: conflict_resolution.on_delete, - update_resolution: conflict_resolution.on_update, - ..Default::default() - }; - let cache = open_or_create_cache( - cache_manager, - cache_labels.clone(), - (schema.schema, schema.secondary_indexes), - &schema.connections, - write_options, - ) - .map_err(ApiInitError::OpenOrCreateCache)?; - - // Open cache reader. - let cache_reader = - open_cache_reader(cache_manager, cache_labels)?.expect("We just created the cache"); + let (cache_builder, descriptor) = + CacheBuilder::new(cache_manager, app_server_url, &endpoint, labels).await?; + let cache_reader = cache_builder.cache_reader().clone(); // Start cache builder. let handle = { let operations_sender = operations_sender.map(|sender| (endpoint.name.clone(), sender)); - tokio::spawn(async move { - cache_builder::build_cache( - cache, - cancel, - log_reader_builder, - operations_sender, - labels, - ) - .await - }) + let runtime_clone = runtime.clone(); + runtime_clone + .spawn_blocking(move || cache_builder.run(runtime, cancel, operations_sender)) }; Ok(( Self { - cache_reader: ArcSwap::from_pointee(cache_reader), + cache_reader, descriptor, endpoint, }, @@ -105,7 +62,10 @@ impl CacheEndpoint { let mut labels = Labels::new(); labels.push(endpoint.name.clone(), endpoint.name.clone()); Ok(Self { - cache_reader: ArcSwap::from_pointee(open_existing_cache_reader(cache_manager, labels)?), + cache_reader: Arc::new(ArcSwap::from_pointee(open_existing_cache_reader( + cache_manager, + labels, + )?)), descriptor, endpoint, }) @@ -124,11 +84,11 @@ impl CacheEndpoint { } } -pub fn cache_labels(endpoint: String, build: String) -> Labels { +pub fn cache_alias_and_labels(endpoint: String, build: String) -> (String, Labels) { let mut labels = Labels::new(); labels.push(ENDPOINT_LABEL, endpoint); labels.push(BUILD_LABEL, build); - labels + (labels.to_non_empty_string().into_owned(), labels) } fn open_cache_reader( @@ -149,24 +109,6 @@ fn open_existing_cache_reader( .ok_or_else(|| ApiInitError::CacheNotFound(labels)) } -fn get_log_reader_options(endpoint: &ApiEndpoint) -> LogReaderOptions { - LogReaderOptions { - endpoint: endpoint.name.clone(), - batch_size: endpoint - .log_reader_options - .batch_size - .unwrap_or_else(default_log_reader_batch_size), - timeout_in_millis: endpoint - .log_reader_options - .timeout_in_millis - .unwrap_or_else(default_log_reader_timeout_in_millis), - buffer_size: endpoint - .log_reader_options - .buffer_size - .unwrap_or_else(default_log_reader_buffer_size), - } -} - // Exports pub mod auth; mod cache_builder; @@ -186,7 +128,7 @@ pub use dozer_types::tonic; use errors::ApiInitError; pub use openapiv3; pub use tokio; -use tokio::{sync::broadcast::Sender, task::JoinHandle}; +use tokio::{runtime::Runtime, sync::broadcast::Sender, task::JoinHandle}; pub use tracing_actix_web; #[cfg(test)] mod test_utils; diff --git a/dozer-api/src/test_utils.rs b/dozer-api/src/test_utils.rs index 48ef76e4e6..ea5af4c09f 100644 --- a/dozer-api/src/test_utils.rs +++ b/dozer-api/src/test_utils.rs @@ -112,8 +112,7 @@ pub fn initialize_cache( .create_cache( labels.to_non_empty_string().into_owned(), labels, - schema, - secondary_indexes, + (schema, 
secondary_indexes), &Default::default(), Default::default(), ) diff --git a/dozer-cache/benches/cache.rs b/dozer-cache/benches/cache.rs index 9b38d73b90..7fb773e83c 100644 --- a/dozer-cache/benches/cache.rs +++ b/dozer-cache/benches/cache.rs @@ -60,8 +60,7 @@ fn cache(c: &mut Criterion) { .create_cache( "temp".to_string(), Default::default(), - schema, - secondary_indexes, + (schema, secondary_indexes), &Default::default(), Default::default(), ) diff --git a/dozer-cache/src/cache/lmdb/cache/mod.rs b/dozer-cache/src/cache/lmdb/cache/mod.rs index 3b79165828..925cdf88f7 100644 --- a/dozer-cache/src/cache/lmdb/cache/mod.rs +++ b/dozer-cache/src/cache/lmdb/cache/mod.rs @@ -193,6 +193,10 @@ impl RwCache for LmdbRwCache { self.indexing_thread_pool.lock().wake(self.labels()); Ok(()) } + + fn as_ro(&self) -> &dyn RoCache { + self + } } pub trait LmdbCache: Send + Sync + Debug { diff --git a/dozer-cache/src/cache/lmdb/cache_manager.rs b/dozer-cache/src/cache/lmdb/cache_manager.rs index 96d6c7aa29..ee23a6fcc9 100644 --- a/dozer-cache/src/cache/lmdb/cache_manager.rs +++ b/dozer-cache/src/cache/lmdb/cache_manager.rs @@ -6,11 +6,9 @@ use dozer_storage::{lmdb_storage::LmdbEnvironmentManager, LmdbMap, RwLmdbEnviron use dozer_storage::{LmdbEnvironment, RoLmdbEnvironment}; use dozer_tracing::Labels; use dozer_types::borrow::IntoOwned; +use dozer_types::parking_lot::Mutex; use dozer_types::parking_lot::RwLock; -use dozer_types::{ - parking_lot::Mutex, - types::{IndexDefinition, Schema}, -}; +use dozer_types::types::SchemaWithIndex; use tempdir::TempDir; use tokio::io::AsyncRead; @@ -240,8 +238,7 @@ impl RwCacheManager for LmdbRwCacheManager { &self, name: String, labels: Labels, - schema: Schema, - indexes: Vec, + schema: SchemaWithIndex, connections: &HashSet, write_options: CacheWriteOptions, ) -> Result, CacheError> { @@ -250,7 +247,7 @@ impl RwCacheManager for LmdbRwCacheManager { } let cache = LmdbRwCache::new( - Some(&(schema, indexes)), + Some(&schema), Some(connections), cache_options(&self.options, self.base_path.clone(), name, labels), write_options, @@ -326,8 +323,7 @@ mod tests { .create_cache( "temp".to_string(), Default::default(), - Schema::default(), - vec![], + Default::default(), &Default::default(), Default::default(), ) diff --git a/dozer-cache/src/cache/mod.rs b/dozer-cache/src/cache/mod.rs index e8cf085cae..5a50e2efcd 100644 --- a/dozer-cache/src/cache/mod.rs +++ b/dozer-cache/src/cache/mod.rs @@ -11,7 +11,7 @@ use dozer_types::models::api_endpoint::{ use dozer_types::node::SourceStates; use dozer_types::{ serde::{Deserialize, Serialize}, - types::{IndexDefinition, Record, Schema, SchemaWithIndex}, + types::{Record, SchemaWithIndex}, }; pub use lmdb::cache_manager::{ begin_dump_txn, dump, CacheManagerOptions, LmdbRoCacheManager, LmdbRwCacheManager, @@ -74,8 +74,7 @@ pub trait RwCacheManager: RoCacheManager { &self, name: String, labels: Labels, - schema: Schema, - indexes: Vec, + schema: SchemaWithIndex, connections: &HashSet, write_options: CacheWriteOptions, ) -> Result, CacheError>; @@ -163,4 +162,7 @@ pub trait RwCache: RoCache { /// Commits the current transaction. fn commit(&mut self, state: &CommitState) -> Result<(), CacheError>; + + /// Upcast. 
+ fn as_ro(&self) -> &dyn RoCache; } diff --git a/dozer-cache/src/reader.rs b/dozer-cache/src/reader.rs index 5095a703c4..d1e7c645f1 100644 --- a/dozer-cache/src/reader.rs +++ b/dozer-cache/src/reader.rs @@ -1,4 +1,4 @@ -use crate::cache::{expression::QueryExpression, CacheRecord, RoCache}; +use crate::cache::{expression::QueryExpression, CacheRecord, CommitState, RoCache}; use super::cache::expression::FilterExpression; use crate::errors::CacheError; @@ -44,6 +44,14 @@ impl CacheReader { Ok(()) } + pub fn cache_name(&self) -> &str { + self.cache.name() + } + + pub fn get_commit_state(&self) -> Result, CacheError> { + self.cache.get_commit_state() + } + pub fn get_schema(&self) -> &SchemaWithIndex { self.cache.get_schema() } diff --git a/dozer-cli/src/simple/build/contract/mod.rs b/dozer-cli/src/simple/build/contract/mod.rs index 0f043b40c6..e2cb860bc1 100644 --- a/dozer-cli/src/simple/build/contract/mod.rs +++ b/dozer-cli/src/simple/build/contract/mod.rs @@ -229,7 +229,6 @@ where impl PartialEq for PipelineContract { fn eq(&self, other: &PipelineContract) -> bool { - dbg!(self, other); is_isomorphic_matching( self.0.graph(), other.0.graph(), diff --git a/dozer-cli/src/simple/orchestrator.rs b/dozer-cli/src/simple/orchestrator.rs index e6a0cdf449..67e784389e 100644 --- a/dozer-cli/src/simple/orchestrator.rs +++ b/dozer-cli/src/simple/orchestrator.rs @@ -94,7 +94,7 @@ impl SimpleOrchestrator { }; let internal_grpc_config = &self.config.api.app_grpc; - let app_server_addr = format!( + let app_server_url = format!( "http://{}:{}", internal_grpc_config .host @@ -115,8 +115,9 @@ impl SimpleOrchestrator { // If we're shutting down, the cache endpoint will fail to connect _shutdown_future = shutdown.create_shutdown_future() => return Ok(()), result = CacheEndpoint::new( - app_server_addr.clone(), - &*cache_manager, + self.runtime.clone(), + app_server_url.clone(), + cache_manager.clone(), endpoint.clone(), Box::pin(shutdown.create_shutdown_future()), operations_sender.clone(), diff --git a/dozer-log/src/errors.rs b/dozer-log/src/errors.rs index 159b8a935b..a44a73a73f 100644 --- a/dozer-log/src/errors.rs +++ b/dozer-log/src/errors.rs @@ -17,6 +17,10 @@ pub enum ReaderBuilderError { #[derive(Debug, Error)] pub enum ReaderError { + #[error("Log stream ended")] + LogStreamEnded, + #[error("Tonic error: {0}")] + Tonic(#[from] tonic::Status), #[error("Failed to deserialize log response: {0}")] DeserializeLogResponse(#[source] bincode::Error), #[error("Failed to load persisted log entry: {0}")] diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs index 4e705deeba..acdf6f5745 100644 --- a/dozer-log/src/reader.rs +++ b/dozer-log/src/reader.rs @@ -8,7 +8,7 @@ use dozer_types::grpc_types::internal::internal_pipeline_service_client::Interna use dozer_types::grpc_types::internal::{ storage_response, BuildRequest, LogRequest, LogResponse, StorageRequest, }; -use dozer_types::log::{debug, error}; +use dozer_types::log::debug; use dozer_types::models::api_endpoint::{ default_log_reader_batch_size, default_log_reader_buffer_size, default_log_reader_timeout_in_millis, @@ -22,7 +22,7 @@ use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct LogReaderOptions { pub endpoint: String, pub batch_size: u32, @@ -53,13 +53,10 @@ pub struct LogReaderBuilder { client: LogClient, } +#[derive(Debug)] pub struct LogReader { - /// Log server runs on a specific build of the endpoint. This is the name of the build. 
- pub build_name: String, /// Schema of this endpoint. pub schema: EndpointSchema, - /// Protobuf descriptor of this endpoint's API. - pub descriptor: Vec, op_receiver: Receiver, worker: Option>>, } @@ -69,7 +66,7 @@ impl LogReaderBuilder { server_addr: String, options: LogReaderOptions, ) -> Result { - let mut client = Self::get_client(server_addr).await?; + let mut client = InternalPipelineServiceClient::connect(server_addr).await?; let build = client .describe_build(BuildRequest { endpoint: options.endpoint.clone(), @@ -79,7 +76,7 @@ impl LogReaderBuilder { let build_name = build.name; let schema = serde_json::from_str(&build.schema_string)?; - let client = LogClient::new(client, options.endpoint.clone()).await?; + let client = LogClient::new(&mut client, options.endpoint.clone()).await?; Ok(Self { build_name, @@ -90,31 +87,15 @@ impl LogReaderBuilder { }) } - pub async fn get_client( - server_addr: String, - ) -> Result, ReaderBuilderError> { - let client = InternalPipelineServiceClient::connect(server_addr).await?; - Ok(client) - } - pub fn build(self, start: u64) -> LogReader { let LogReaderBuilder { - build_name, schema, - descriptor, client, options, + .. } = self; - let (op_sender, op_receiver) = tokio::sync::mpsc::channel(options.buffer_size as usize); - let worker = tokio::spawn(log_reader_worker(client, start, options, op_sender)); - LogReader { - build_name, - schema, - descriptor, - op_receiver, - worker: Some(worker), - } + LogReader::new(schema, client, options, start) } } @@ -125,6 +106,21 @@ pub struct OpAndPos { } impl LogReader { + pub fn new( + schema: EndpointSchema, + client: LogClient, + options: LogReaderOptions, + start: u64, + ) -> Self { + let (op_sender, op_receiver) = tokio::sync::mpsc::channel(options.buffer_size as usize); + let worker = tokio::spawn(log_reader_worker(client, start, options, op_sender)); + LogReader { + schema, + op_receiver, + worker: Some(worker), + } + } + pub async fn read_one(&mut self) -> Result { if let Some(result) = self.op_receiver.recv().await { Ok(result) @@ -143,18 +139,19 @@ impl LogReader { } #[derive(Debug)] -struct LogClient { - client: InternalPipelineServiceClient, +pub struct LogClient { request_sender: Sender, response_stream: Streaming, storage: Box, } impl LogClient { - async fn new( - mut client: InternalPipelineServiceClient, + pub async fn new( + client: &mut InternalPipelineServiceClient, endpoint: String, ) -> Result { + let (request_sender, response_stream) = create_get_log_stream(client).await?; + let storage = client .describe_storage(StorageRequest { endpoint }) .await? @@ -168,10 +165,7 @@ impl LogClient { } }; - let (request_sender, response_stream) = create_get_log_stream(&mut client).await?; - Ok(Self { - client, request_sender, response_stream, storage, @@ -180,35 +174,12 @@ impl LogClient { async fn get_log(&mut self, request: LogRequest) -> Result, ReaderError> { // Send the request. - let response = loop { - match call_get_log_once( - &self.request_sender, - request.clone(), - &mut self.response_stream, - ) - .await - { - Ok(response) => break response, - Err(e) => { - error!("Error getting log: {:?}", e); - (self.request_sender, self.response_stream) = loop { - match create_get_log_stream(&mut self.client).await { - Ok((request_sender, response_stream)) => { - break (request_sender, response_stream) - } - Err(e) => { - const RETRY_INTERVAL: std::time::Duration = - std::time::Duration::from_secs(5); - error!( - "Error creating log stream: {e}, retrying after {RETRY_INTERVAL:?}..." 
- ); - tokio::time::sleep(RETRY_INTERVAL).await; - } - } - } - } - } - }; + let response = call_get_log_once( + &self.request_sender, + request.clone(), + &mut self.response_stream, + ) + .await?; use crate::replication::LogResponse; let response: LogResponse = bincode::deserialize(&response.data).map_err(ReaderError::DeserializeLogResponse)?; @@ -252,14 +223,14 @@ async fn call_get_log_once( request_sender: &Sender, request: LogRequest, stream: &mut Streaming, -) -> Result> { +) -> Result { if request_sender.send(request).await.is_err() { - return Err(None); + return Err(ReaderError::LogStreamEnded); } match stream.next().await { Some(Ok(response)) => Ok(response), - Some(Err(e)) => Err(Some(e)), - None => Err(None), + Some(Err(e)) => Err(ReaderError::Tonic(e)), + None => Err(ReaderError::LogStreamEnded), } } diff --git a/dozer-tests/src/cache_tests/film/load_database.rs b/dozer-tests/src/cache_tests/film/load_database.rs index 4f96ad4132..bd917951a9 100644 --- a/dozer-tests/src/cache_tests/film/load_database.rs +++ b/dozer-tests/src/cache_tests/film/load_database.rs @@ -23,8 +23,7 @@ pub async fn load_database( .create_cache( labels.to_non_empty_string().into_owned(), labels.clone(), - schema.clone(), - secondary_indexes, + (schema.clone(), secondary_indexes), &Default::default(), Default::default(), ) diff --git a/dozer-types/protos/internal.proto b/dozer-types/protos/internal.proto index a3d5b06f48..54d7a3197d 100644 --- a/dozer-types/protos/internal.proto +++ b/dozer-types/protos/internal.proto @@ -5,6 +5,9 @@ package dozer.internal; import "google/protobuf/empty.proto"; service InternalPipelineService { + /// Get a running server's uuid. The id never changes. Different servers will have different ids. + rpc GetId(google.protobuf.Empty) returns (GetIdResponse); + rpc DescribeStorage(StorageRequest) returns (StorageResponse); rpc ListEndpoints(google.protobuf.Empty) returns (EndpointsResponse); rpc DescribeBuild(BuildRequest) returns (BuildResponse); @@ -13,6 +16,10 @@ service InternalPipelineService { rpc GetLog(stream LogRequest) returns (stream LogResponse); } +message GetIdResponse { + string id = 1; +} + message StorageRequest { string endpoint = 1; }
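
Note on the new `GetId` RPC defined above: the API server calls it (via `EndpointMeta::load_from_client`) to learn which app-server instance, and therefore which log, it is reading from. When the returned id no longer matches the serving cache's name, `CacheBuilderImpl` rebuilds the cache in the background and swaps it into the shared `ArcSwap<CacheReader>` once it has caught up. A minimal probe sketch follows; the connect URL is an assumption (use the configured `app_grpc` host/port), while `InternalPipelineServiceClient` and `get_id` are taken from this diff.

// Minimal sketch, assuming a tokio runtime and an app server listening on the
// URL below (replace it with the configured `app_grpc` host/port).
use dozer_types::grpc_types::internal::internal_pipeline_service_client::InternalPipelineServiceClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = InternalPipelineServiceClient::connect("http://127.0.0.1:50053").await?;
    // The id is a uuid generated once per server process; seeing a different id
    // on reconnect means the log may differ and the local cache must catch up.
    let id = client.get_id(()).await?.into_inner().id;
    println!("app server instance id: {id}");
    Ok(())
}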