diff --git a/src/batch/src/exchange_source.rs b/src/batch/src/exchange_source.rs index 5c34922a7c6df..e5aaa295e5dc2 100644 --- a/src/batch/src/exchange_source.rs +++ b/src/batch/src/exchange_source.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::future::Future; use risingwave_common::array::DataChunk; +use risingwave_common::error::Result; use crate::execution::grpc_exchange::GrpcExchangeSource; use crate::execution::local_exchange::LocalExchangeSource; @@ -24,11 +25,7 @@ use crate::task::TaskId; /// Each `ExchangeSource` maps to one task, it takes the execution result from task chunk by chunk. pub trait ExchangeSource: Send + Debug { - type TakeDataFuture<'a>: Future>> - + 'a - where - Self: 'a; - fn take_data(&mut self) -> Self::TakeDataFuture<'_>; + fn take_data(&mut self) -> impl Future>> + '_; /// Get upstream task id. fn get_task_id(&self) -> TaskId; @@ -42,9 +39,7 @@ pub enum ExchangeSourceImpl { } impl ExchangeSourceImpl { - pub(crate) async fn take_data( - &mut self, - ) -> risingwave_common::error::Result> { + pub(crate) async fn take_data(&mut self) -> Result> { match self { ExchangeSourceImpl::Grpc(grpc) => grpc.take_data().await, ExchangeSourceImpl::Local(local) => local.take_data().await, diff --git a/src/batch/src/execution/grpc_exchange.rs b/src/batch/src/execution/grpc_exchange.rs index 21705ab634c29..1ec24e5b440fb 100644 --- a/src/batch/src/execution/grpc_exchange.rs +++ b/src/batch/src/execution/grpc_exchange.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::future::Future; use futures::StreamExt; use risingwave_common::array::DataChunk; @@ -73,26 +72,22 @@ impl Debug for GrpcExchangeSource { } impl ExchangeSource for GrpcExchangeSource { - type TakeDataFuture<'a> = impl Future>> + 'a; - - fn take_data(&mut self) -> Self::TakeDataFuture<'_> { - async { - let res = match self.stream.next().await { - None => { - return Ok(None); - } - Some(r) => r, - }; - let task_data = res?; - let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact(); - trace!( - "Receiver taskOutput = {:?}, data = {:?}", - self.task_output_id, - data - ); + async fn take_data(&mut self) -> Result> { + let res = match self.stream.next().await { + None => { + return Ok(None); + } + Some(r) => r, + }; + let task_data = res?; + let data = DataChunk::from_protobuf(task_data.get_record_batch()?)?.compact(); + trace!( + "Receiver taskOutput = {:?}, data = {:?}", + self.task_output_id, + data + ); - Ok(Some(data)) - } + Ok(Some(data)) } fn get_task_id(&self) -> TaskId { diff --git a/src/batch/src/execution/local_exchange.rs b/src/batch/src/execution/local_exchange.rs index b28687c5d25c2..c08bd6a7ef145 100644 --- a/src/batch/src/execution/local_exchange.rs +++ b/src/batch/src/execution/local_exchange.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; -use std::future::Future; use risingwave_common::array::DataChunk; use risingwave_common::error::Result; @@ -52,23 +51,19 @@ impl Debug for LocalExchangeSource { } impl ExchangeSource for LocalExchangeSource { - type TakeDataFuture<'a> = impl Future>> + 'a; - - fn take_data(&mut self) -> Self::TakeDataFuture<'_> { - async { - let ret = self.task_output.direct_take_data().await?; - if let Some(data) = ret { - let data = data.compact(); - trace!( - "Receiver task: {:?}, source task output: {:?}, data: {:?}", - self.task_id, - self.task_output.id(), - data - ); - Ok(Some(data)) - } else { - Ok(None) - } + async fn take_data(&mut self) -> Result> { + let ret = self.task_output.direct_take_data().await?; + if let Some(data) = ret { + let data = data.compact(); + trace!( + "Receiver task: {:?}, source task output: {:?}, data: {:?}", + self.task_id, + self.task_output.id(), + data + ); + Ok(Some(data)) + } else { + Ok(None) } } diff --git a/src/batch/src/executor/test_utils.rs b/src/batch/src/executor/test_utils.rs index e6840ff3ea396..0b7e684348338 100644 --- a/src/batch/src/executor/test_utils.rs +++ b/src/batch/src/executor/test_utils.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::VecDeque; -use std::future::Future; use assert_matches::assert_matches; use futures::StreamExt; @@ -246,15 +245,11 @@ impl FakeExchangeSource { } impl ExchangeSource for FakeExchangeSource { - type TakeDataFuture<'a> = impl Future>> + 'a; - - fn take_data(&mut self) -> Self::TakeDataFuture<'_> { - async { - if let Some(chunk) = self.chunks.pop() { - Ok(chunk) - } else { - Ok(None) - } + async fn take_data(&mut self) -> Result> { + if let Some(chunk) = self.chunks.pop() { + Ok(chunk) + } else { + Ok(None) } } diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index fe85ecab3223f..ac062a16c1c10 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -34,6 +34,7 @@ #![feature(result_option_inspect)] #![feature(assert_matches)] #![feature(lazy_cell)] +#![feature(return_position_impl_trait_in_trait)] mod error; pub mod exchange_source; diff --git a/src/connector/src/source/external.rs b/src/connector/src/source/external.rs index d4293fca1d6f9..3a47dea0b13ff 100644 --- a/src/connector/src/source/external.rs +++ b/src/connector/src/source/external.rs @@ -200,13 +200,9 @@ impl MySqlOffset { } pub trait ExternalTableReader { - type CdcOffsetFuture<'a>: Future> + Send + 'a - where - Self: 'a; - fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String; - fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_>; + fn current_cdc_offset(&self) -> impl Future> + Send + '_; fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult; @@ -248,32 +244,28 @@ pub struct ExternalTableConfig { } impl ExternalTableReader for MySqlExternalTableReader { - type CdcOffsetFuture<'a> = impl Future> + 'a; - fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String { format!("`{}`", table_name.table_name) } - fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_> { - async move { - let mut conn = self - .pool - .get_conn() - .await - .map_err(|e| ConnectorError::Connection(anyhow!(e)))?; - - let sql = "SHOW MASTER STATUS".to_string(); - let mut rs = conn.query::(sql).await?; - let row = rs - .iter_mut() - .exactly_one() - .map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?; - - Ok(CdcOffset::MySql(MySqlOffset { - filename: row.take("File").unwrap(), - position: row.take("Position").unwrap(), - })) - } + async fn current_cdc_offset(&self) -> ConnectorResult { + let mut conn = self + .pool + .get_conn() + .await + .map_err(|e| ConnectorError::Connection(anyhow!(e)))?; + + let sql = "SHOW MASTER STATUS".to_string(); + let mut rs = conn.query::(sql).await?; + let row = rs + .iter_mut() + .exactly_one() + .map_err(|e| ConnectorError::Internal(anyhow!("read binlog error: {}", e)))?; + + Ok(CdcOffset::MySql(MySqlOffset { + filename: row.take("File").unwrap(), + position: row.take("Position").unwrap(), + })) } fn parse_binlog_offset(&self, offset: &str) -> ConnectorResult { @@ -478,8 +470,6 @@ impl MySqlExternalTableReader { } impl ExternalTableReader for ExternalTableReaderImpl { - type CdcOffsetFuture<'a> = impl Future> + 'a; - fn get_normalized_table_name(&self, table_name: &SchemaTableName) -> String { match self { ExternalTableReaderImpl::MySql(mysql) => mysql.get_normalized_table_name(table_name), @@ -487,12 +477,10 @@ impl ExternalTableReader for ExternalTableReaderImpl { } } - fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_> { - async move { - match self { - ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await, - ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await, - } + async fn current_cdc_offset(&self) -> ConnectorResult { + match self { + ExternalTableReaderImpl::MySql(mysql) => mysql.current_cdc_offset().await, + ExternalTableReaderImpl::Mock(mock) => mock.current_cdc_offset().await, } } diff --git a/src/connector/src/source/mock_external_table.rs b/src/connector/src/source/mock_external_table.rs index c4e16aae6ae85..7224a32b9e571 100644 --- a/src/connector/src/source/mock_external_table.rs +++ b/src/connector/src/source/mock_external_table.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::future::Future; use std::sync::atomic::AtomicUsize; use futures::stream::BoxStream; @@ -91,24 +90,21 @@ impl MockExternalTableReader { } impl ExternalTableReader for MockExternalTableReader { - type CdcOffsetFuture<'a> = impl Future> + 'a; - fn get_normalized_table_name(&self, _table_name: &SchemaTableName) -> String { "`mock_table`".to_string() } - fn current_cdc_offset(&self) -> Self::CdcOffsetFuture<'_> { + async fn current_cdc_offset(&self) -> ConnectorResult { static IDX: AtomicUsize = AtomicUsize::new(0); - async move { - let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if idx < self.binlog_watermarks.len() { - Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone())) - } else { - Ok(CdcOffset::MySql(MySqlOffset { - filename: "1.binlog".to_string(), - position: u64::MAX, - })) - } + + let idx = IDX.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if idx < self.binlog_watermarks.len() { + Ok(CdcOffset::MySql(self.binlog_watermarks[idx].clone())) + } else { + Ok(CdcOffset::MySql(MySqlOffset { + filename: "1.binlog".to_string(), + position: u64::MAX, + })) } } diff --git a/src/stream/src/executor/backfill/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/upstream_table/snapshot.rs index 806d78700154e..0e17ba7e722c4 100644 --- a/src/stream/src/executor/backfill/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/upstream_table/snapshot.rs @@ -24,21 +24,17 @@ use risingwave_connector::source::external::{CdcOffset, ExternalTableReader}; use crate::executor::backfill::upstream_table::external::ExternalStorageTable; use crate::executor::backfill::utils::iter_chunks; -use crate::executor::{StreamExecutorResult, INVALID_EPOCH}; +use crate::executor::{StreamExecutorError, StreamExecutorResult, INVALID_EPOCH}; pub trait UpstreamTableRead { - type BinlogOffsetFuture<'a>: Future>> - + Send - + 'a - where - Self: 'a; - type SnapshotStream<'a>: Stream>> + Send + 'a - where - Self: 'a; - - fn snapshot_read(&self, args: SnapshotReadArgs) -> Self::SnapshotStream<'_>; - - fn current_binlog_offset(&self) -> Self::BinlogOffsetFuture<'_>; + fn snapshot_read( + &self, + args: SnapshotReadArgs, + ) -> impl Stream>> + Send + '_; + + fn current_binlog_offset( + &self, + ) -> impl Future>> + Send + '_; } #[derive(Debug, Default)] @@ -92,52 +88,43 @@ impl UpstreamTableReader { } impl UpstreamTableRead for UpstreamTableReader { - type BinlogOffsetFuture<'a> = - impl Future>> + 'a; - type SnapshotStream<'a> = impl Stream>> + 'a; - - fn snapshot_read(&self, args: SnapshotReadArgs) -> Self::SnapshotStream<'_> { - #[try_stream] - async move { - let primary_keys = self - .inner - .pk_indices() - .iter() - .map(|idx| { - let f = &self.inner.schema().fields[*idx]; - f.name.clone() - }) - .collect_vec(); - - tracing::debug!( - "snapshot_read primary keys: {:?}, current_pos: {:?}", - primary_keys, - args.current_pos - ); - - let row_stream = self.inner.table_reader().snapshot_read( - self.inner.schema_table_name(), - args.current_pos, - primary_keys, - ); - - pin_mut!(row_stream); - - let mut builder = - DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size); - let chunk_stream = iter_chunks(row_stream, &mut builder); - #[for_await] - for chunk in chunk_stream { - yield chunk?; - } + #[try_stream(ok = Option, error = StreamExecutorError)] + async fn snapshot_read(&self, args: SnapshotReadArgs) { + let primary_keys = self + .inner + .pk_indices() + .iter() + .map(|idx| { + let f = &self.inner.schema().fields[*idx]; + f.name.clone() + }) + .collect_vec(); + + tracing::debug!( + "snapshot_read primary keys: {:?}, current_pos: {:?}", + primary_keys, + args.current_pos + ); + + let row_stream = self.inner.table_reader().snapshot_read( + self.inner.schema_table_name(), + args.current_pos, + primary_keys, + ); + + pin_mut!(row_stream); + + let mut builder = DataChunkBuilder::new(self.inner.schema().data_types(), args.chunk_size); + let chunk_stream = iter_chunks(row_stream, &mut builder); + #[for_await] + for chunk in chunk_stream { + yield chunk?; } } - fn current_binlog_offset(&self) -> Self::BinlogOffsetFuture<'_> { - async move { - let binlog = self.inner.table_reader().current_cdc_offset(); - let binlog = binlog.await?; - Ok(Some(binlog)) - } + async fn current_binlog_offset(&self) -> StreamExecutorResult> { + let binlog = self.inner.table_reader().current_cdc_offset(); + let binlog = binlog.await?; + Ok(Some(binlog)) } } diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 17b8866543c6f..024b830161ccf 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -433,27 +433,15 @@ macro_rules! for_all_dispatcher_variants { for_all_dispatcher_variants! { impl_dispatcher } -macro_rules! define_dispatcher_associated_types { - () => { - type DataFuture<'a> = impl DispatchFuture<'a>; - type BarrierFuture<'a> = impl DispatchFuture<'a>; - type WatermarkFuture<'a> = impl DispatchFuture<'a>; - }; -} - pub trait DispatchFuture<'a> = Future> + Send; pub trait Dispatcher: Debug + 'static { - type DataFuture<'a>: DispatchFuture<'a>; - type BarrierFuture<'a>: DispatchFuture<'a>; - type WatermarkFuture<'a>: DispatchFuture<'a>; - /// Dispatch a data chunk to downstream actors. - fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_>; + fn dispatch_data(&mut self, chunk: StreamChunk) -> impl DispatchFuture<'_>; /// Dispatch a barrier to downstream actors, generally by broadcasting it. - fn dispatch_barrier(&mut self, barrier: Barrier) -> Self::BarrierFuture<'_>; + fn dispatch_barrier(&mut self, barrier: Barrier) -> impl DispatchFuture<'_>; /// Dispatch a watermark to downstream actors, generally by broadcasting it. - fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_>; + fn dispatch_watermark(&mut self, watermark: Watermark) -> impl DispatchFuture<'_>; /// Add new outputs to the dispatcher. fn add_outputs(&mut self, outputs: impl IntoIterator); @@ -493,38 +481,30 @@ impl RoundRobinDataDispatcher { } impl Dispatcher for RoundRobinDataDispatcher { - define_dispatcher_associated_types!(); - - fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { - async move { - let chunk = chunk.project(&self.output_indices); - self.outputs[self.cur].send(Message::Chunk(chunk)).await?; - self.cur += 1; - self.cur %= self.outputs.len(); - Ok(()) - } + async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { + let chunk = chunk.project(&self.output_indices); + self.outputs[self.cur].send(Message::Chunk(chunk)).await?; + self.cur += 1; + self.cur %= self.outputs.len(); + Ok(()) } - fn dispatch_barrier(&mut self, barrier: Barrier) -> Self::BarrierFuture<'_> { - async move { - // always broadcast barrier - for output in &mut self.outputs { - output.send(Message::Barrier(barrier.clone())).await?; - } - Ok(()) + async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + // always broadcast barrier + for output in &mut self.outputs { + output.send(Message::Barrier(barrier.clone())).await?; } + Ok(()) } - fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { - async move { - if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - // always broadcast watermark - for output in &mut self.outputs { - output.send(Message::Watermark(watermark.clone())).await?; - } + async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + // always broadcast watermark + for output in &mut self.outputs { + output.send(Message::Watermark(watermark.clone())).await?; } - Ok(()) } + Ok(()) } fn add_outputs(&mut self, outputs: impl IntoIterator) { @@ -586,112 +566,102 @@ impl HashDataDispatcher { } impl Dispatcher for HashDataDispatcher { - define_dispatcher_associated_types!(); - fn add_outputs(&mut self, outputs: impl IntoIterator) { self.outputs.extend(outputs); } - fn dispatch_barrier(&mut self, barrier: Barrier) -> Self::BarrierFuture<'_> { - async move { - // always broadcast barrier - for output in &mut self.outputs { - output.send(Message::Barrier(barrier.clone())).await?; - } - Ok(()) + async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + // always broadcast barrier + for output in &mut self.outputs { + output.send(Message::Barrier(barrier.clone())).await?; } + Ok(()) } - fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { - async move { - if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - // always broadcast watermark - for output in &mut self.outputs { - output.send(Message::Watermark(watermark.clone())).await?; - } + async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + // always broadcast watermark + for output in &mut self.outputs { + output.send(Message::Watermark(watermark.clone())).await?; } - Ok(()) } + Ok(()) } - fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { - async move { - // A chunk can be shuffled into multiple output chunks that to be sent to downstreams. - // In these output chunks, the only difference are visibility map, which is calculated - // by the hash value of each line in the input chunk. - let num_outputs = self.outputs.len(); + async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { + // A chunk can be shuffled into multiple output chunks that to be sent to downstreams. + // In these output chunks, the only difference are visibility map, which is calculated + // by the hash value of each line in the input chunk. + let num_outputs = self.outputs.len(); - // get hash value of every line by its key - let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys); + // get hash value of every line by its key + let vnodes = VirtualNode::compute_chunk(chunk.data_chunk(), &self.keys); - tracing::trace!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes); + tracing::trace!(target: "events::stream::dispatch::hash", "\n{}\n keys {:?} => {:?}", chunk.to_pretty(), self.keys, vnodes); - let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity())) - .take(num_outputs) - .collect_vec(); - let mut last_vnode_when_update_delete = None; - let mut new_ops: Vec = Vec::with_capacity(chunk.capacity()); + let mut vis_maps = repeat_with(|| BitmapBuilder::with_capacity(chunk.capacity())) + .take(num_outputs) + .collect_vec(); + let mut last_vnode_when_update_delete = None; + let mut new_ops: Vec = Vec::with_capacity(chunk.capacity()); - // Apply output indices after calculating the vnode. - let chunk = chunk.project(&self.output_indices); + // Apply output indices after calculating the vnode. + let chunk = chunk.project(&self.output_indices); - for ((vnode, &op), visible) in vnodes - .iter() - .copied() - .zip_eq_fast(chunk.ops()) - .zip_eq_fast(chunk.vis().iter()) - { - // Build visibility map for every output chunk. - for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) { - vis_map.append( - visible && self.hash_mapping[vnode.to_index()] == output.actor_id(), - ); - } + for ((vnode, &op), visible) in vnodes + .iter() + .copied() + .zip_eq_fast(chunk.ops()) + .zip_eq_fast(chunk.vis().iter()) + { + // Build visibility map for every output chunk. + for (output, vis_map) in self.outputs.iter().zip_eq_fast(vis_maps.iter_mut()) { + vis_map.append(visible && self.hash_mapping[vnode.to_index()] == output.actor_id()); + } - if !visible { - new_ops.push(op); - continue; - } + if !visible { + new_ops.push(op); + continue; + } - // The 'update' message, noted by an `UpdateDelete` and a successive `UpdateInsert`, - // need to be rewritten to common `Delete` and `Insert` if they were dispatched to - // different actors. - if op == Op::UpdateDelete { - last_vnode_when_update_delete = Some(vnode); - } else if op == Op::UpdateInsert { - if vnode != last_vnode_when_update_delete.unwrap() { - new_ops.push(Op::Delete); - new_ops.push(Op::Insert); - } else { - new_ops.push(Op::UpdateDelete); - new_ops.push(Op::UpdateInsert); - } + // The 'update' message, noted by an `UpdateDelete` and a successive `UpdateInsert`, + // need to be rewritten to common `Delete` and `Insert` if they were dispatched to + // different actors. + if op == Op::UpdateDelete { + last_vnode_when_update_delete = Some(vnode); + } else if op == Op::UpdateInsert { + if vnode != last_vnode_when_update_delete.unwrap() { + new_ops.push(Op::Delete); + new_ops.push(Op::Insert); } else { - new_ops.push(op); + new_ops.push(Op::UpdateDelete); + new_ops.push(Op::UpdateInsert); } + } else { + new_ops.push(op); } + } - let ops = new_ops; - - // individually output StreamChunk integrated with vis_map - for (vis_map, output) in vis_maps.into_iter().zip_eq_fast(self.outputs.iter_mut()) { - let vis_map = vis_map.finish(); - // columns is not changed in this function - let new_stream_chunk = - StreamChunk::new(ops.clone(), chunk.columns().into(), Some(vis_map)); - if new_stream_chunk.cardinality() > 0 { - event!( - tracing::Level::TRACE, - msg = "chunk", - downstream = output.actor_id(), - "send = \n{:#?}", - new_stream_chunk - ); - output.send(Message::Chunk(new_stream_chunk)).await?; - } + let ops = new_ops; + + // individually output StreamChunk integrated with vis_map + for (vis_map, output) in vis_maps.into_iter().zip_eq_fast(self.outputs.iter_mut()) { + let vis_map = vis_map.finish(); + // columns is not changed in this function + let new_stream_chunk = + StreamChunk::new(ops.clone(), chunk.columns().into(), Some(vis_map)); + if new_stream_chunk.cardinality() > 0 { + event!( + tracing::Level::TRACE, + msg = "chunk", + downstream = output.actor_id(), + "send = \n{:#?}", + new_stream_chunk + ); + output.send(Message::Chunk(new_stream_chunk)).await?; } - Ok(()) } + Ok(()) } fn remove_outputs(&mut self, actor_ids: &HashSet) { @@ -740,37 +710,29 @@ impl BroadcastDispatcher { } impl Dispatcher for BroadcastDispatcher { - define_dispatcher_associated_types!(); - - fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { - async move { - let chunk = chunk.project(&self.output_indices); - for output in self.outputs.values_mut() { - output.send(Message::Chunk(chunk.clone())).await?; - } - Ok(()) + async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { + let chunk = chunk.project(&self.output_indices); + for output in self.outputs.values_mut() { + output.send(Message::Chunk(chunk.clone())).await?; } + Ok(()) } - fn dispatch_barrier(&mut self, barrier: Barrier) -> Self::BarrierFuture<'_> { - async move { - for output in self.outputs.values_mut() { - output.send(Message::Barrier(barrier.clone())).await?; - } - Ok(()) + async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + for output in self.outputs.values_mut() { + output.send(Message::Barrier(barrier.clone())).await?; } + Ok(()) } - fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { - async move { - if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - // always broadcast watermark - for output in self.outputs.values_mut() { - output.send(Message::Watermark(watermark.clone())).await?; - } + async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + // always broadcast watermark + for output in self.outputs.values_mut() { + output.send(Message::Watermark(watermark.clone())).await?; } - Ok(()) } + Ok(()) } fn add_outputs(&mut self, outputs: impl IntoIterator) { @@ -828,49 +790,41 @@ impl SimpleDispatcher { } impl Dispatcher for SimpleDispatcher { - define_dispatcher_associated_types!(); - fn add_outputs(&mut self, outputs: impl IntoIterator) { self.output.extend(outputs); assert!(self.output.len() <= 2); } - fn dispatch_barrier(&mut self, barrier: Barrier) -> Self::BarrierFuture<'_> { - async move { - // Only barrier is allowed to be dispatched to multiple outputs during migration. - for output in &mut self.output { - output.send(Message::Barrier(barrier.clone())).await?; - } - Ok(()) + async fn dispatch_barrier(&mut self, barrier: Barrier) -> StreamResult<()> { + // Only barrier is allowed to be dispatched to multiple outputs during migration. + for output in &mut self.output { + output.send(Message::Barrier(barrier.clone())).await?; } + Ok(()) } - fn dispatch_data(&mut self, chunk: StreamChunk) -> Self::DataFuture<'_> { - async move { - let output = self - .output - .iter_mut() - .exactly_one() - .expect("expect exactly one output"); + async fn dispatch_data(&mut self, chunk: StreamChunk) -> StreamResult<()> { + let output = self + .output + .iter_mut() + .exactly_one() + .expect("expect exactly one output"); - let chunk = chunk.project(&self.output_indices); - output.send(Message::Chunk(chunk)).await - } + let chunk = chunk.project(&self.output_indices); + output.send(Message::Chunk(chunk)).await } - fn dispatch_watermark(&mut self, watermark: Watermark) -> Self::WatermarkFuture<'_> { - async move { - let output = self - .output - .iter_mut() - .exactly_one() - .expect("expect exactly one output"); + async fn dispatch_watermark(&mut self, watermark: Watermark) -> StreamResult<()> { + let output = self + .output + .iter_mut() + .exactly_one() + .expect("expect exactly one output"); - if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { - output.send(Message::Watermark(watermark)).await?; - } - Ok(()) + if let Some(watermark) = watermark.transform_with_indices(&self.output_indices) { + output.send(Message::Watermark(watermark)).await?; } + Ok(()) } fn remove_outputs(&mut self, actor_ids: &HashSet) {