From a03e0c52dd8c2162a432b34905454b616d18464a Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 28 Sep 2023 18:38:50 +0800 Subject: [PATCH] fix bug --- src/connector/src/sink/log_store.rs | 108 ++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/src/connector/src/sink/log_store.rs b/src/connector/src/sink/log_store.rs index 8ff02e7268121..2debbb9ae711c 100644 --- a/src/connector/src/sink/log_store.rs +++ b/src/connector/src/sink/log_store.rs @@ -329,40 +329,38 @@ impl + Unpin + 'static> DeliveryFutureManager { ) -> impl Future> + '_ { poll_fn(move |cx| { let mut latest_offset: Option = None; - 'outer: loop { - if let Some((epoch, item)) = self.items.front_mut() { - match item { - DeliveryFutureManagerItem::Chunk { chunk_id, futures } => { - while let Some(future) = futures.front_mut() { - match future.try_poll_unpin(cx) { - Poll::Ready(result) => match result { - Ok(()) => { - self.future_count -= 1; - futures.pop_front(); - } - Err(e) => { - return Poll::Ready(Err(e)); - } - }, - Poll::Pending => { - break 'outer; + 'outer: while let Some((epoch, item)) = self.items.front_mut() { + match item { + DeliveryFutureManagerItem::Chunk { chunk_id, futures } => { + while let Some(future) = futures.front_mut() { + match future.try_poll_unpin(cx) { + Poll::Ready(result) => match result { + Ok(()) => { + self.future_count -= 1; + futures.pop_front(); } + Err(e) => { + return Poll::Ready(Err(e)); + } + }, + Poll::Pending => { + break 'outer; } } - - // when we reach here, there must not be any pending or error future. - // Which means all futures of this stream chunk have been finished - assert!(futures.is_empty()); - latest_offset = Some(TruncateOffset::Chunk { - epoch: *epoch, - chunk_id: *chunk_id, - }); - self.items.pop_front().expect("items not empty"); - } - DeliveryFutureManagerItem::Barrier => { - // Barrier will be yielded anyway - return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch })); } + + // when we reach here, there must not be any pending or error future. + // Which means all futures of this stream chunk have been finished + assert!(futures.is_empty()); + latest_offset = Some(TruncateOffset::Chunk { + epoch: *epoch, + chunk_id: *chunk_id, + }); + self.items.pop_front().expect("items not empty"); + } + DeliveryFutureManagerItem::Barrier => { + // Barrier will be yielded anyway + return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch })); } } } @@ -377,7 +375,15 @@ impl + Unpin + 'static> DeliveryFutureManager { #[cfg(test)] mod tests { - use crate::sink::log_store::TruncateOffset; + use std::future::{poll_fn, Future}; + use std::pin::pin; + use std::task::Poll; + + use futures::{FutureExt, TryFuture}; + use tokio::sync::oneshot; + use tokio::sync::oneshot::Receiver; + + use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset}; #[test] fn test_truncate_offset_cmp() { @@ -428,4 +434,44 @@ mod tests { ); assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 }); } + + type TestFuture = impl TryFuture + Unpin + 'static; + fn to_test_future(rx: Receiver>) -> TestFuture { + async move { rx.await.unwrap() }.boxed() + } + + #[tokio::test] + async fn test_future_delivery_manager() { + let mut manager = DeliveryFutureManager::new(2); + let epoch1 = 233; + let chunk_id1 = 1; + let (tx1_1, rx1_1) = oneshot::channel(); + let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1); + assert!(!write_chunk + .add_future_may_await(to_test_future(rx1_1)) + .await + .unwrap()); + assert_eq!(manager.future_count, 1); + { + let mut next_truncate_offset = pin!(manager.next_truncate_offset()); + assert!( + poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx))) + .await + .is_pending() + ); + tx1_1.send(Ok(())).unwrap(); + assert_eq!( + next_truncate_offset.await.unwrap(), + TruncateOffset::Chunk { + epoch: epoch1, + chunk_id: chunk_id1 + } + ); + } + manager.add_barrier(epoch1); + assert_eq!( + manager.next_truncate_offset().await.unwrap(), + TruncateOffset::Barrier { epoch: epoch1 } + ); + } }