From 7f19d8eb80b948cc8f91db09c1b552d90ed12253 Mon Sep 17 00:00:00 2001 From: chubei <914745487@qq.com> Date: Wed, 27 Sep 2023 00:30:40 +0800 Subject: [PATCH] fix: `LogReader` task was not cancelled on drop --- dozer-log/src/reader.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/dozer-log/src/reader.rs b/dozer-log/src/reader.rs index 627c529b27..ba0620935d 100644 --- a/dozer-log/src/reader.rs +++ b/dozer-log/src/reader.rs @@ -16,6 +16,7 @@ use dozer_types::models::api_endpoint::{ use dozer_types::tonic::transport::Channel; use dozer_types::tonic::Streaming; use dozer_types::{bincode, serde_json}; +use tokio::select; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; @@ -265,10 +266,27 @@ async fn call_get_log_once( } async fn log_reader_worker( + log_client: LogClient, + pos: u64, + options: LogReaderOptions, + op_sender: Sender, +) -> Result<(), ReaderError> { + select! { + _ = op_sender.closed() => { + debug!("Log reader thread quit because LogReader was dropped"); + return Ok(()); + } + result = log_reader_worker_loop(log_client, pos, options, &op_sender) => { + result + } + } +} + +async fn log_reader_worker_loop( mut log_client: LogClient, mut pos: u64, options: LogReaderOptions, - op_sender: Sender, + op_sender: &Sender, ) -> Result<(), ReaderError> { loop { // Request ops.