Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 28, 2023
1 parent d38837e commit a03e0c5
Showing 1 changed file with 77 additions and 31 deletions.
108 changes: 77 additions & 31 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,40 +329,38 @@ impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {
) -> impl Future<Output = Result<TruncateOffset, F::Error>> + '_ {
poll_fn(move |cx| {
let mut latest_offset: Option<TruncateOffset> = None;
'outer: loop {
if let Some((epoch, item)) = self.items.front_mut() {
match item {
DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
while let Some(future) = futures.front_mut() {
match future.try_poll_unpin(cx) {
Poll::Ready(result) => match result {
Ok(()) => {
self.future_count -= 1;
futures.pop_front();
}
Err(e) => {
return Poll::Ready(Err(e));
}
},
Poll::Pending => {
break 'outer;
'outer: while let Some((epoch, item)) = self.items.front_mut() {
match item {
DeliveryFutureManagerItem::Chunk { chunk_id, futures } => {
while let Some(future) = futures.front_mut() {
match future.try_poll_unpin(cx) {
Poll::Ready(result) => match result {
Ok(()) => {
self.future_count -= 1;
futures.pop_front();
}
Err(e) => {
return Poll::Ready(Err(e));
}
},
Poll::Pending => {
break 'outer;
}
}

// when we reach here, there must not be any pending or error future.
// Which means all futures of this stream chunk have been finished
assert!(futures.is_empty());
latest_offset = Some(TruncateOffset::Chunk {
epoch: *epoch,
chunk_id: *chunk_id,
});
self.items.pop_front().expect("items not empty");
}
DeliveryFutureManagerItem::Barrier => {
// Barrier will be yielded anyway
return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch }));
}

// when we reach here, there must not be any pending or error future.
// Which means all futures of this stream chunk have been finished
assert!(futures.is_empty());
latest_offset = Some(TruncateOffset::Chunk {
epoch: *epoch,
chunk_id: *chunk_id,
});
self.items.pop_front().expect("items not empty");
}
DeliveryFutureManagerItem::Barrier => {
// Barrier will be yielded anyway
return Poll::Ready(Ok(TruncateOffset::Barrier { epoch: *epoch }));
}
}
}
Expand All @@ -377,7 +375,15 @@ impl<F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManager<F> {

#[cfg(test)]
mod tests {
use crate::sink::log_store::TruncateOffset;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::task::Poll;

use futures::{FutureExt, TryFuture};
use tokio::sync::oneshot;
use tokio::sync::oneshot::Receiver;

use crate::sink::log_store::{DeliveryFutureManager, TruncateOffset};

#[test]
fn test_truncate_offset_cmp() {
Expand Down Expand Up @@ -428,4 +434,44 @@ mod tests {
);
assert!(TruncateOffset::Barrier { epoch: 2 } > TruncateOffset::Barrier { epoch: 1 });
}

type TestFuture = impl TryFuture<Ok = (), Error = anyhow::Error> + Unpin + 'static;
fn to_test_future(rx: Receiver<anyhow::Result<()>>) -> TestFuture {
async move { rx.await.unwrap() }.boxed()
}

#[tokio::test]
async fn test_future_delivery_manager() {
let mut manager = DeliveryFutureManager::new(2);
let epoch1 = 233;
let chunk_id1 = 1;
let (tx1_1, rx1_1) = oneshot::channel();
let mut write_chunk = manager.start_write_chunk(epoch1, chunk_id1);
assert!(!write_chunk
.add_future_may_await(to_test_future(rx1_1))
.await
.unwrap());
assert_eq!(manager.future_count, 1);
{
let mut next_truncate_offset = pin!(manager.next_truncate_offset());
assert!(
poll_fn(|cx| Poll::Ready(next_truncate_offset.as_mut().poll(cx)))
.await
.is_pending()
);
tx1_1.send(Ok(())).unwrap();
assert_eq!(
next_truncate_offset.await.unwrap(),
TruncateOffset::Chunk {
epoch: epoch1,
chunk_id: chunk_id1
}
);
}
manager.add_barrier(epoch1);
assert_eq!(
manager.next_truncate_offset().await.unwrap(),
TruncateOffset::Barrier { epoch: epoch1 }
);
}
}

0 comments on commit a03e0c5

Please sign in to comment.