Skip to content

Commit

Permalink
fix: LogReader task was not cancelled on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
chubei committed Sep 26, 2023
1 parent 66074f9 commit 7f19d8e
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion dozer-log/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use dozer_types::models::api_endpoint::{
use dozer_types::tonic::transport::Channel;
use dozer_types::tonic::Streaming;
use dozer_types::{bincode, serde_json};
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -265,10 +266,27 @@ async fn call_get_log_once(
}

async fn log_reader_worker(
log_client: LogClient,
pos: u64,
options: LogReaderOptions,
op_sender: Sender<OpAndPos>,
) -> Result<(), ReaderError> {
select! {
_ = op_sender.closed() => {
debug!("Log reader thread quit because LogReader was dropped");
return Ok(());
}
result = log_reader_worker_loop(log_client, pos, options, &op_sender) => {
result
}
}
}

async fn log_reader_worker_loop(
mut log_client: LogClient,
mut pos: u64,
options: LogReaderOptions,
op_sender: Sender<OpAndPos>,
op_sender: &Sender<OpAndPos>,
) -> Result<(), ReaderError> {
loop {
// Request ops.
Expand Down

0 comments on commit 7f19d8e

Please sign in to comment.