Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: Don't use blocking Mutex in async context #2073

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dozer-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ http-body = "0.4.5"
bytes = "1.4.0"
http = "0.2.9"
pin-project = "1.1.3"
async-stream = "0.3.5"

[dev-dependencies]
tempdir = "0.3.7"
54 changes: 31 additions & 23 deletions dozer-api/src/grpc/internal/internal_pipeline_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_stream::stream;
use dozer_cache::dozer_log::home_dir::BuildId;
use dozer_cache::dozer_log::replication::{Log, LogResponseFuture};
use dozer_types::bincode;
Expand All @@ -11,16 +12,15 @@ use dozer_types::grpc_types::internal::{
use dozer_types::log::info;
use dozer_types::models::api_config::AppGrpcOptions;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::parking_lot::Mutex;
use dozer_types::tonic::transport::server::TcpIncoming;
use dozer_types::tonic::transport::Server;
use dozer_types::tonic::{self, Request, Response, Status, Streaming};
use futures_util::future::Either;
use futures_util::stream::BoxStream;
use futures_util::{Future, StreamExt, TryStreamExt};
use futures_util::{Future, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

use crate::errors::GrpcError;
use crate::grpc::run_server;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl InternalPipelineService for InternalPipelineServer {
) -> Result<Response<StorageResponse>, Status> {
let endpoint = request.into_inner().endpoint;
let log = &find_log_endpoint(&self.endpoints, &endpoint)?.log;
let storage = log.lock().describe_storage();
let storage = log.lock().await.describe_storage();
Ok(Response::new(StorageResponse {
storage: Some(storage),
}))
Expand Down Expand Up @@ -103,26 +103,34 @@ impl InternalPipelineService for InternalPipelineServer {
requests: Request<Streaming<LogRequest>>,
) -> Result<Response<Self::GetLogStream>, Status> {
let endpoints = self.endpoints.clone();
Ok(Response::new(
requests
.into_inner()
.and_then(move |request| {
let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
Ok(log) => log,
Err(e) => return Either::Left(std::future::ready(Err(e))),
let requests = requests.into_inner();
let stream = stream! {
for await request in requests {
let request = match request {
Ok(request) => request,
Err(e) => {
yield Err(e);
continue;
}
.log;

let response = log.lock().read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
);

Either::Right(serialize_log_response(response))
})
.boxed(),
))
};

let log = &match find_log_endpoint(&endpoints, &request.endpoint) {
Ok(log) => log,
Err(e) => {
yield Err(e);
continue;
}
}.log;

let response = log.lock().await.read(
request.start as usize..request.end as usize,
Duration::from_millis(request.timeout_in_millis as u64),
log.clone(),
).await;
yield serialize_log_response(response).await;
}
};
Ok(Response::new(stream.boxed()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion dozer-cli/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ use dozer_types::models::connection::Connection;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;
use dozer_types::parking_lot::Mutex;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
Expand Down
50 changes: 21 additions & 29 deletions dozer-cli/src/pipeline/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use dozer_core::{
};
use dozer_recordstore::ProcessorRecordStore;
use dozer_tracing::LabelsAndProgress;
use dozer_types::errors::internal::BoxedError;
use dozer_types::indicatif::ProgressBar;
use dozer_types::types::Schema;
use dozer_types::{errors::internal::BoxedError, parking_lot::Mutex};
use tokio::runtime::Runtime;
use tokio::{runtime::Runtime, sync::Mutex};

#[derive(Debug)]
pub struct LogSinkFactory {
Expand Down Expand Up @@ -69,7 +69,6 @@ pub struct LogSink {
runtime: Arc<Runtime>,
log: Arc<Mutex<Log>>,
pb: ProgressBar,
counter: u64,
}

impl LogSink {
Expand All @@ -80,18 +79,7 @@ impl LogSink {
labels: LabelsAndProgress,
) -> Self {
let pb = labels.create_progress_bar(endpoint_name);
let counter = log.lock().end() as u64;
Self {
runtime,
log,
pb,
counter,
}
}

fn update_counter(&mut self) {
self.counter += 1;
self.pb.set_position(self.counter);
Self { runtime, log, pb }
}
}

Expand All @@ -102,35 +90,39 @@ impl Sink for LogSink {
record_store: &ProcessorRecordStore,
op: ProcessorOperation,
) -> Result<(), BoxedError> {
self.log
.lock()
.write(dozer_cache::dozer_log::replication::LogOperation::Op {
let end = self.runtime.block_on(self.log.lock()).write(
dozer_cache::dozer_log::replication::LogOperation::Op {
op: op.load(record_store)?,
});
self.update_counter();
},
);
self.pb.set_position(end as u64);
Ok(())
}

fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError> {
self.log.lock().write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.update_counter();
let end = self
.runtime
.block_on(self.log.lock())
.write(LogOperation::Commit {
decision_instant: epoch_details.decision_instant,
});
self.pb.set_position(end as u64);
Ok(())
}

fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError> {
self.log
.lock()
self.runtime
.block_on(self.log.lock())
.persist(queue, self.log.clone(), &self.runtime)?;
Ok(())
}

fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError> {
self.log
.lock()
let end = self
.runtime
.block_on(self.log.lock())
.write(LogOperation::SnapshottingDone { connection_name });
self.update_counter();
self.pb.set_position(end as u64);
Ok(())
}
}
2 changes: 1 addition & 1 deletion dozer-cli/src/simple/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use dozer_core::checkpoint::{CheckpointFactory, CheckpointFactoryOptions, Option
use dozer_tracing::LabelsAndProgress;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::flags::Flags;
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use std::sync::{atomic::AtomicBool, Arc};

Expand Down
15 changes: 10 additions & 5 deletions dozer-log/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use std::time::{Duration, SystemTime};

use dozer_types::grpc_types::internal::storage_response;
use dozer_types::log::{debug, error};
use dozer_types::parking_lot::Mutex;
use dozer_types::serde::{Deserialize, Serialize};
use dozer_types::types::Operation;
use dozer_types::{bincode, thiserror};
use pin_project::pin_project;
use tokio::runtime::Runtime;
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::storage::{Queue, Storage};
Expand Down Expand Up @@ -120,7 +120,8 @@ impl Log {
self.in_memory.end()
}

pub fn write(&mut self, op: LogOperation) {
/// Returns the log length after this write.
pub fn write(&mut self, op: LogOperation) -> usize {
// Record operation.
self.in_memory.ops.push(op);

Expand All @@ -138,6 +139,8 @@ impl Log {
true
}
});

self.end()
}

pub fn persist(
Expand Down Expand Up @@ -169,7 +172,7 @@ impl Log {
}
};

let mut this = this.lock();
let mut this = this.lock().await;
let this = this.deref_mut();
debug_assert!(persisted_log_entries_end(&this.persisted).unwrap_or(0) == range.start);
debug_assert!(this.in_memory.start == range.start);
Expand Down Expand Up @@ -198,7 +201,9 @@ impl Log {
}

/// Returned `LogResponse` is guaranteed to contain `request.start`, but may actually contain less than `request.end`.
pub fn read(
///
/// Function is marked as `async` because it needs to run in a tokio runtime.
pub async fn read(
&mut self,
request: Range<usize>,
timeout: Duration,
Expand Down Expand Up @@ -230,7 +235,7 @@ impl Log {
tokio::spawn(async move {
// Try to trigger watcher when timeout.
tokio::time::sleep(timeout).await;
let mut this = this.lock();
let mut this = this.lock().await;
let this = this.deref_mut();
// Find the watcher. It may have been triggered by slice fulfillment or persisting.
if let Some((index, watcher)) = this
Expand Down
Loading
Loading