diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 2a70002c42af..7659db79962e 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -65,7 +65,8 @@ pub fn to_owned_item((key, value): StateStoreIterItemRef<'_>) -> StorageResult: StateStoreIter + Sized { - type ItemStream: Stream> + Send; + type ItemStream Fn(T::ItemRef<'a>) -> StorageResult>: Stream> + + Send; fn into_stream Fn(T::ItemRef<'a>) -> StorageResult + Send>( self, @@ -150,7 +151,8 @@ impl> FusedStateStoreIter { } impl> StateStoreIterExt for I { - type ItemStream = impl Stream> + Send; + type ItemStream Fn(T::ItemRef<'a>) -> StorageResult> = + impl Stream> + Send; fn into_stream Fn(T::ItemRef<'a>) -> StorageResult + Send>( self, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 67167f466a50..10f7a0eb99c8 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -43,6 +43,7 @@ use risingwave_storage::row_serde::row_serde_util::{serialize_pk, serialize_pk_w use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::{StateStoreIterExt, StateStoreReadIter}; use risingwave_storage::table::{compute_vnode, TableDistribution, SINGLETON_VNODE}; +use risingwave_storage::StateStoreIter; use rw_futures_util::select_all; use crate::common::log_store_impl::kv_log_store::{ @@ -544,7 +545,7 @@ impl LogStoreRowOpStream { } } -pub(crate) type LogStoreItemMergeStream = +pub(crate) type LogStoreItemMergeStream = impl Stream>; pub(crate) fn merge_log_store_item_stream( iters: Vec,