Skip to content

Commit

Permalink
fix: Don't use blocking Mutex in async context
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Sep 22, 2023
1 parent 1951776 commit 5779581
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 79 deletions.
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ http-body = "0.4.5"
bytes = "1.4.0"
http = "0.2.9"
pin-project = "1.1.3"
async-stream = "0.3.5"

[dev-dependencies]
tempdir = "0.3.7"
54 changes: 31 additions & 23 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_stream::stream;
use dozer_cache::dozer_log::home_dir::BuildId;
use dozer_cache::dozer_log::replication::{Log, LogResponseFuture};
use dozer_types::bincode;
Expand All @@ -11,16 +12,15 @@ use dozer_types::grpc_types::internal::{
use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::parking_lot::Mutex;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tonic::{self, Request, Response, Status, Streaming};
use futures_util::future::Either;
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use futures_util::{Future, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::errors::GrpcError;
use crate::grpc::run_server;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl InternalPipelineService for InternalPipelineServer {
) -> Result<Response<StorageResponse>, Status> {
let endpoint = request.into_inner().endpoint;
let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log;
let storage = log.lock().describe_storage();
let storage = log.lock().await.describe_storage();
Ok(Response::new(StorageResponse {
storage: Some(storage),
}))
Expand Down Expand Up @@ -103,26 +103,34 @@ impl InternalPipelineService for InternalPipelineServer {
requests: Request<Streaming<LogRequest>>,
) -> Result<Response<Self::GetLogStream>, Status> {
let endpoints = self.endpoints.clone();
Ok(Response::new(
requests
.into_inner()
.and_then(move |request| {
let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
Ok(log) => log,
Err(e) => return Either::Left(std::future::ready(Err(e))),
let requests = requests.into_inner();
let stream = stream! {
for await request in requests {
let request = match request {
Ok(request) => request,
Err(e) => {
yield Err(e);
continue;
}
.log;

let response = log.lock().read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
);

Either::Right(serialize_log_response(response))
})
.boxed(),
))
};

let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
Ok(log) => log,
Err(e) => {
yield Err(e);
continue;
}
}.log;

let response = log.lock().await.read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
).await;
yield serialize_log_response(response).await;
}
};
Ok(Response::new(stream.boxed()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use dozer_types::models::connection::Connection;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;
use dozer_types::parking_lot::Mutex;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
Expand Down
30 changes: 16 additions & 14 deletions dozer-cli/src/pipeline/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use dozer_core::{
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;
use dozer_types::errors::internal::BoxedError;
use dozer_types::indicatif::ProgressBar;
use dozer_types::types::Schema;
use dozer_types::{errors::internal::BoxedError, parking_lot::Mutex};
use tokio::runtime::Runtime;
use tokio::{runtime::Runtime, sync::Mutex};

#[derive(Debug)]
pub struct LogSinkFactory {
Expand Down Expand Up @@ -80,7 +80,7 @@ impl LogSink {
labels: LabelsAndProgress,
) -> Self {
let pb = labels.create_progress_bar(endpoint_name);
let counter = log.lock().end() as u64;
let counter = runtime.block_on(log.lock()).end() as u64;
Self {
runtime,
log,
Expand All @@ -102,33 +102,35 @@ impl Sink for LogSink {
record_store: &ProcessorRecordStore,
op: ProcessorOperation,
) -> Result<(), BoxedError> {
self.log
.lock()
.write(dozer_cache::dozer_log::replication::LogOperation::Op {
self.runtime.block_on(self.log.lock()).write(
dozer_cache::dozer_log::replication::LogOperation::Op {
op: op.load(record_store)?,
});
},
);
self.update_counter();
Ok(())
}

fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError> {
self.log.lock().write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.runtime
.block_on(self.log.lock())
.write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.update_counter();
Ok(())
}

fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError> {
self.log
.lock()
self.runtime
.block_on(self.log.lock())
.persist(queue, self.log.clone(), &self.runtime)?;
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
self.log
.lock()
self.runtime
.block_on(self.log.lock())
.write(LogOperation::SnapshottingDone { connection_name });
self.update_counter();
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, Option
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::flags::Flags;
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use std::sync::{atomic::AtomicBool, Arc};

Expand Down
10 changes: 6 additions & 4 deletions dozer-log/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::time::{Duration, SystemTime};

use dozer_types::grpc_types::internal::storage_response;
use dozer_types::log::{debug, error};
use dozer_types::parking_lot::Mutex;
use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::types::Operation;
use dozer_types::{bincode, thiserror};
use pin_project::pin_project;
use tokio::runtime::Runtime;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::storage::{Queue, Storage};
Expand Down Expand Up @@ -169,7 +169,7 @@ impl Log {
}
};

let mut this = this.lock();
let mut this = this.lock().await;
let this = this.deref_mut();
debug_assert!(persisted_log_entries_end(&this.persisted).unwrap_or(0) == range.start);
debug_assert!(this.in_memory.start == range.start);
Expand Down Expand Up @@ -198,7 +198,9 @@ impl Log {
}

/// Returned `LogResponse` is guaranteed to contain `request.start`, but may actually contain less then `request.end`.
pub fn read(
///
/// Function is marked as `async` because it needs to run in a tokio runtime.
pub async fn read(
&mut self,
request: Range<usize>,
timeout: Duration,
Expand Down Expand Up @@ -230,7 +232,7 @@ impl Log {
tokio::spawn(async move {
// Try to trigger watcher when timeout.
tokio::time::sleep(timeout).await;
let mut this = this.lock();
let mut this = this.lock().await;
let this = this.deref_mut();
// Find the watcher. It may have been triggered by slice fulfillment or persisting.
if let Some((index, watcher)) = this
Expand Down
Loading

0 comments on commit 5779581

Please sign in to comment.