diff --git a/Cargo.lock b/Cargo.lock index 5944557aa0..ba4a79503a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -721,9 +721,9 @@ dependencies = [ [[package]] name = "async-stream" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad445822218ce64be7a341abfb0b1ea43b5c23aa83902542a4542e78309d8e5e" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ "async-stream-impl", "futures-core", @@ -732,13 +732,13 @@ dependencies = [ [[package]] name = "async-stream-impl" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4655ae1a7b0cdf149156f780c5bf3f1352bc53cbd9e0a361a7ef7b22947e965" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.29", ] [[package]] @@ -2598,6 +2598,7 @@ dependencies = [ "actix-web", "actix-web-httpauth", "arc-swap", + "async-stream", "async-trait", "bytes", "dozer-cache", diff --git a/dozer-api/Cargo.toml b/dozer-api/Cargo.toml index d0446b3c02..894eb9c158 100644 --- a/dozer-api/Cargo.toml +++ b/dozer-api/Cargo.toml @@ -50,6 +50,7 @@ http-body = "0.4.5" bytes = "1.4.0" http = "0.2.9" pin-project = "1.1.3" +async-stream = "0.3.5" [dev-dependencies] tempdir = "0.3.7" diff --git a/dozer-api/src/grpc/internal/internal_pipeline_server.rs b/dozer-api/src/grpc/internal/internal_pipeline_server.rs index f09053f2a6..1e7b4d9745 100644 --- a/dozer-api/src/grpc/internal/internal_pipeline_server.rs +++ b/dozer-api/src/grpc/internal/internal_pipeline_server.rs @@ -1,3 +1,4 @@ +use async_stream::stream; use dozer_cache::dozer_log::home_dir::BuildId; use dozer_cache::dozer_log::replication::{Log, LogResponseFuture}; use dozer_types::bincode; @@ -11,16 +12,15 @@ use dozer_types::grpc_types::internal::{ use dozer_types::log::info; use dozer_types::models::api_config::AppGrpcOptions; use dozer_types::models::api_endpoint::ApiEndpoint; -use dozer_types::parking_lot::Mutex; use dozer_types::tonic::transport::server::TcpIncoming; use dozer_types::tonic::transport::Server; use dozer_types::tonic::{self, Request, Response, Status, Streaming}; -use futures_util::future::Either; use futures_util::stream::BoxStream; -use futures_util::{Future, StreamExt, TryStreamExt}; +use futures_util::{Future, StreamExt}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use crate::errors::GrpcError; use crate::grpc::run_server; @@ -52,7 +52,7 @@ impl InternalPipelineService for InternalPipelineServer { ) -> Result, Status> { let endpoint = request.into_inner().endpoint; let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log; - let storage = log.lock().describe_storage(); + let storage = log.lock().await.describe_storage(); Ok(Response::new(StorageResponse { storage: Some(storage), })) @@ -103,26 +103,34 @@ impl InternalPipelineService for InternalPipelineServer { requests: Request>, ) -> Result, Status> { let endpoints = self.endpoints.clone(); - Ok(Response::new( - requests - .into_inner() - .and_then(move |request| { - let log = &match find_log_endpoint(&endpoints, &request.endpoint) { - Ok(log) => log, - Err(e) => return Either::Left(std::future::ready(Err(e))), + let requests = requests.into_inner(); + let stream = stream! { + for await request in requests { + let request = match request { + Ok(request) => request, + Err(e) => { + yield Err(e); + continue; } - .log; - - let response = log.lock().read( - request.start as usize..request.end as usize, - Duration::from_millis(request.timeout_in_millis as u64), - log.clone(), - ); - - Either::Right(serialize_log_response(response)) - }) - .boxed(), - )) + }; + + let log = &match find_log_endpoint(&endpoints, &request.endpoint) { + Ok(log) => log, + Err(e) => { + yield Err(e); + continue; + } + }.log; + + let response = log.lock().await.read( + request.start as usize..request.end as usize, + Duration::from_millis(request.timeout_in_millis as u64), + log.clone(), + ).await; + yield serialize_log_response(response).await; + } + }; + Ok(Response::new(stream.boxed())) } } diff --git a/dozer-cli/src/pipeline/builder.rs b/dozer-cli/src/pipeline/builder.rs index ce393ae261..4229804321 100644 --- a/dozer-cli/src/pipeline/builder.rs +++ b/dozer-cli/src/pipeline/builder.rs @@ -18,9 +18,9 @@ use dozer_types::models::connection::Connection; use dozer_types::models::flags::Flags; use dozer_types::models::source::Source; use dozer_types::models::udf_config::UdfConfig; -use dozer_types::parking_lot::Mutex; use std::hash::Hash; use tokio::runtime::Runtime; +use tokio::sync::Mutex; use crate::pipeline::dummy_sink::DummySinkFactory; use crate::pipeline::LogSinkFactory; diff --git a/dozer-cli/src/pipeline/log_sink.rs b/dozer-cli/src/pipeline/log_sink.rs index f04bf00952..06fafc9169 100644 --- a/dozer-cli/src/pipeline/log_sink.rs +++ b/dozer-cli/src/pipeline/log_sink.rs @@ -12,10 +12,10 @@ use dozer_core::{ }; use dozer_recordstore::ProcessorRecordStore; use dozer_tracing::LabelsAndProgress; +use dozer_types::errors::internal::BoxedError; use dozer_types::indicatif::ProgressBar; use dozer_types::types::Schema; -use dozer_types::{errors::internal::BoxedError, parking_lot::Mutex}; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::Mutex}; #[derive(Debug)] pub struct LogSinkFactory { @@ -69,7 +69,6 @@ pub struct LogSink { runtime: Arc, log: Arc>, pb: ProgressBar, - counter: u64, } impl LogSink { @@ -80,18 +79,7 @@ impl LogSink { labels: LabelsAndProgress, ) -> Self { let pb = labels.create_progress_bar(endpoint_name); - let counter = log.lock().end() as u64; - Self { - runtime, - log, - pb, - counter, - } - } - - fn update_counter(&mut self) { - self.counter += 1; - self.pb.set_position(self.counter); + Self { runtime, log, pb } } } @@ -102,35 +90,39 @@ impl Sink for LogSink { record_store: &ProcessorRecordStore, op: ProcessorOperation, ) -> Result<(), BoxedError> { - self.log - .lock() - .write(dozer_cache::dozer_log::replication::LogOperation::Op { + let end = self.runtime.block_on(self.log.lock()).write( + dozer_cache::dozer_log::replication::LogOperation::Op { op: op.load(record_store)?, - }); - self.update_counter(); + }, + ); + self.pb.set_position(end as u64); Ok(()) } fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError> { - self.log.lock().write(LogOperation::Commit { - decision_instant: epoch_details.decision_instant, - }); - self.update_counter(); + let end = self + .runtime + .block_on(self.log.lock()) + .write(LogOperation::Commit { + decision_instant: epoch_details.decision_instant, + }); + self.pb.set_position(end as u64); Ok(()) } fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError> { - self.log - .lock() + self.runtime + .block_on(self.log.lock()) .persist(queue, self.log.clone(), &self.runtime)?; Ok(()) } fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> { - self.log - .lock() + let end = self + .runtime + .block_on(self.log.lock()) .write(LogOperation::SnapshottingDone { connection_name }); - self.update_counter(); + self.pb.set_position(end as u64); Ok(()) } } diff --git a/dozer-cli/src/simple/executor.rs b/dozer-cli/src/simple/executor.rs index 30de921be0..09b12d89d3 100644 --- a/dozer-cli/src/simple/executor.rs +++ b/dozer-cli/src/simple/executor.rs @@ -6,8 +6,8 @@ use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, Option use dozer_tracing::LabelsAndProgress; use dozer_types::models::api_endpoint::ApiEndpoint; use dozer_types::models::flags::Flags; -use dozer_types::parking_lot::Mutex; use tokio::runtime::Runtime; +use tokio::sync::Mutex; use std::sync::{atomic::AtomicBool, Arc}; diff --git a/dozer-log/src/replication/mod.rs b/dozer-log/src/replication/mod.rs index 6d59d4d924..c6e27588d7 100644 --- a/dozer-log/src/replication/mod.rs +++ b/dozer-log/src/replication/mod.rs @@ -5,13 +5,13 @@ use std::time::{Duration, SystemTime}; use dozer_types::grpc_types::internal::storage_response; use dozer_types::log::{debug, error}; -use dozer_types::parking_lot::Mutex; use dozer_types::serde::{Deserialize, Serialize}; use dozer_types::types::Operation; use dozer_types::{bincode, thiserror}; use pin_project::pin_project; use tokio::runtime::Runtime; use tokio::sync::oneshot::error::RecvError; +use tokio::sync::Mutex; use tokio::task::JoinHandle; use crate::storage::{Queue, Storage}; @@ -120,7 +120,8 @@ impl Log { self.in_memory.end() } - pub fn write(&mut self, op: LogOperation) { + /// Returns the log length after this write. + pub fn write(&mut self, op: LogOperation) -> usize { // Record operation. self.in_memory.ops.push(op); @@ -138,6 +139,8 @@ impl Log { true } }); + + self.end() } pub fn persist( @@ -169,7 +172,7 @@ impl Log { } }; - let mut this = this.lock(); + let mut this = this.lock().await; let this = this.deref_mut(); debug_assert!(persisted_log_entries_end(&this.persisted).unwrap_or(0) == range.start); debug_assert!(this.in_memory.start == range.start); @@ -198,7 +201,9 @@ impl Log { } /// Returned `LogResponse` is guaranteed to contain `request.start`, but may actually contain less then `request.end`. - pub fn read( + /// + /// Function is marked as `async` because it needs to run in a tokio runtime. + pub async fn read( &mut self, request: Range, timeout: Duration, @@ -230,7 +235,7 @@ impl Log { tokio::spawn(async move { // Try to trigger watcher when timeout. tokio::time::sleep(timeout).await; - let mut this = this.lock(); + let mut this = this.lock().await; let this = this.deref_mut(); // Find the watcher. It may have been triggered by slice fulfillment or persisting. if let Some((index, watcher)) = this diff --git a/dozer-log/src/replication/tests.rs b/dozer-log/src/replication/tests.rs index dd7fc47be5..5b315d67af 100644 --- a/dozer-log/src/replication/tests.rs +++ b/dozer-log/src/replication/tests.rs @@ -1,8 +1,7 @@ use std::{sync::Arc, time::Duration}; -use dozer_types::parking_lot::Mutex; use tempdir::TempDir; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::Mutex}; use crate::{ replication::{Log, LogOperation, LogResponse}, @@ -24,7 +23,6 @@ fn create_runtime() -> Arc { .into() } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn write_read() { let (_temp_dir, log, _) = create_test_log().await; @@ -41,26 +39,30 @@ async fn write_read() { }, ]; - let mut log_mut = log.lock(); + let mut log_mut = log.lock().await; for op in &ops { log_mut.write(op.clone()); } let range = 1..ops.len(); - let ops_read_future = log_mut.read(range.clone(), Duration::from_secs(1), log.clone()); + let ops_read_future = log_mut + .read(range.clone(), Duration::from_secs(1), log.clone()) + .await; drop(log_mut); let ops_read = ops_read_future.await.unwrap(); assert_eq!(ops_read, LogResponse::Operations(ops[range].to_vec())); } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn watch_write() { let (_temp_dir, log, _) = create_test_log().await; let range = 1..3; - let mut log_mut = log.lock(); - let handle = tokio::spawn(log_mut.read(range.clone(), Duration::from_secs(1), log.clone())); + let mut log_mut = log.lock().await; + let ops_read_future = log_mut + .read(range.clone(), Duration::from_secs(1), log.clone()) + .await; + let handle = tokio::spawn(ops_read_future); let ops = vec![ LogOperation::SnapshottingDone { @@ -82,16 +84,14 @@ async fn watch_write() { assert_eq!(ops_read, LogResponse::Operations(ops[range].to_vec())); } -#[allow(clippy::async_yields_async)] #[test] fn watch_partial() { let runtime = create_runtime(); let (_temp_dir, log, queue) = runtime.block_on(create_test_log()); - let mut log_mut = log.lock(); - let future = - runtime.block_on(async { log_mut.read(1..3, Duration::from_secs(1), log.clone()) }); + let mut log_mut = runtime.block_on(log.lock()); + let future = runtime.block_on(log_mut.read(1..3, Duration::from_secs(1), log.clone())); let handle = runtime.spawn(future); let ops = vec![ @@ -109,7 +109,8 @@ fn watch_partial() { // Persist must be called outside of tokio runtime. let runtime_clone = runtime.clone(); std::thread::spawn(move || { - log.lock() + runtime_clone + .block_on(log.lock()) .persist(&queue, log.clone(), &runtime_clone) .unwrap(); }) @@ -120,7 +121,6 @@ fn watch_partial() { assert_eq!(ops_read, LogResponse::Operations(ops[1..].to_vec())); } -#[allow(clippy::async_yields_async)] #[test] fn watch_out_of_range() { let runtime = create_runtime(); @@ -128,9 +128,8 @@ fn watch_out_of_range() { let (_temp_dir, log, queue) = runtime.block_on(create_test_log()); let range = 2..3; - let mut log_mut = log.lock(); - let future = runtime - .block_on(async { log_mut.read(range.clone(), Duration::from_secs(1), log.clone()) }); + let mut log_mut = runtime.block_on(log.lock()); + let future = runtime.block_on(log_mut.read(range.clone(), Duration::from_secs(1), log.clone())); let handle = runtime.spawn(future); let ops = vec![ @@ -153,15 +152,15 @@ fn watch_out_of_range() { // Persist must be called outside of tokio runtime. let runtime_clone = runtime.clone(); std::thread::spawn(move || { - log_clone - .lock() + runtime_clone + .block_on(log_clone.lock()) .persist(&queue, log_clone.clone(), &runtime_clone) .unwrap(); }) .join() .unwrap(); - log.lock().write(ops[2].clone()); + runtime.block_on(log.lock()).write(ops[2].clone()); let ops_read = runtime.block_on(handle).unwrap().unwrap(); assert_eq!(ops_read, LogResponse::Operations(ops[range].to_vec())); @@ -184,7 +183,7 @@ fn in_memory_log_should_shrink_after_persist() { connection_name: "2".to_string(), }, ]; - let mut log_mut = log.lock(); + let mut log_mut = runtime.block_on(log.lock()); log_mut.write(ops[0].clone()); log_mut.write(ops[1].clone()); drop(log_mut); @@ -193,31 +192,39 @@ fn in_memory_log_should_shrink_after_persist() { // Persist must be called outside of tokio runtime. let runtime_clone = runtime.clone(); let handle = std::thread::spawn(move || { - log_clone - .lock() + runtime_clone + .block_on(log_clone.lock()) .persist(&queue, log_clone.clone(), &runtime_clone) .unwrap() }) .join() .unwrap(); - log.lock().write(ops[2].clone()); + runtime.block_on(log.lock()).write(ops[2].clone()); runtime.block_on(handle).unwrap().unwrap(); assert!(matches!( runtime - .block_on(log.lock().read(0..1, Duration::from_secs(1), log.clone())) + .block_on(async move { + log.lock() + .await + .read(0..1, Duration::from_secs(1), log.clone()) + .await + .await + }) .unwrap(), LogResponse::Persisted(_) )); } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn watch_partial_timeout() { let (_temp_dir, log, _) = create_test_log().await; - let mut log_mut = log.lock(); - let handle = tokio::spawn(log_mut.read(0..2, Duration::from_secs(0), log.clone())); + let mut log_mut = log.lock().await; + let ops_read_future = log_mut + .read(0..2, Duration::from_secs(0), log.clone()) + .await; + let handle = tokio::spawn(ops_read_future); let op = LogOperation::SnapshottingDone { connection_name: "0".to_string(), @@ -229,7 +236,6 @@ async fn watch_partial_timeout() { assert_eq!(ops_read, LogResponse::Operations(vec![op])); } -#[allow(clippy::await_holding_lock)] #[tokio::test] async fn write_watch_partial_timeout() { let (_temp_dir, log, _) = create_test_log().await; @@ -237,10 +243,12 @@ async fn write_watch_partial_timeout() { let op = LogOperation::SnapshottingDone { connection_name: "0".to_string(), }; - let mut log_mut = log.lock(); + let mut log_mut = log.lock().await; log_mut.write(op.clone()); - let ops_read_future = log_mut.read(0..2, Duration::from_secs(0), log.clone()); + let ops_read_future = log_mut + .read(0..2, Duration::from_secs(0), log.clone()) + .await; drop(log_mut); let ops_read = ops_read_future.await.unwrap(); assert_eq!(ops_read, LogResponse::Operations(vec![op]));