From 904f1e662f24c34b3683276b44a6fc1569627d54 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Tue, 16 Jan 2024 15:37:22 +0800 Subject: [PATCH 1/3] refactor(utils): make rw_futures_util a standalone crate Signed-off-by: TennyZhuang --- Cargo.lock | 18 +++++ Cargo.toml | 2 + src/batch/Cargo.toml | 1 + src/batch/src/executor/generic_exchange.rs | 2 +- src/batch/src/executor/row_seq_scan.rs | 2 +- src/batch/src/executor/union.rs | 2 +- src/common/Cargo.toml | 1 + src/common/src/util/mod.rs | 5 -- src/connector/Cargo.toml | 1 + src/connector/src/sink/remote.rs | 2 +- src/connector/src/sink/writer.rs | 2 +- src/frontend/Cargo.toml | 1 + .../src/scheduler/distributed/stage.rs | 2 +- src/jni_core/Cargo.toml | 1 + src/jni_core/src/hummock_iterator.rs | 2 +- src/meta/Cargo.toml | 1 + src/meta/src/barrier/rpc.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 2 +- .../src/manager/sink_coordination/manager.rs | 2 +- src/rpc_client/Cargo.toml | 1 + src/rpc_client/src/lib.rs | 2 +- src/source/Cargo.toml | 1 + src/source/src/connector_source.rs | 2 +- src/stream/Cargo.toml | 1 + src/stream/src/executor/project.rs | 2 +- src/utils/futures_util/Cargo.toml | 22 ++++++ .../futures_util/src}/buffered_with_fence.rs | 71 +++++++------------ src/utils/futures_util/src/lib.rs | 63 ++++++++++++++++ .../mod.rs => utils/futures_util/src/misc.rs} | 9 +-- 29 files changed, 155 insertions(+), 70 deletions(-) create mode 100644 src/utils/futures_util/Cargo.toml rename src/{common/src/util/future_utils => utils/futures_util/src}/buffered_with_fence.rs (80%) create mode 100644 src/utils/futures_util/src/lib.rs rename src/{common/src/util/future_utils/mod.rs => utils/futures_util/src/misc.rs} (94%) diff --git a/Cargo.lock b/Cargo.lock index 01792a8b108d9..79d9374fec73e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8552,6 +8552,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "rw_futures_util", "scopeguard", "serde_json", "task_stats_alloc", @@ -8730,6 +8731,7 @@ dependencies = [ "risingwave_pb", "rust_decimal", "rusty-fork", + "rw_futures_util", "ryu", "serde", "serde_bytes", @@ -8974,6 +8976,7 @@ dependencies = [ "risingwave_pb", "risingwave_rpc_client", "rust_decimal", + "rw_futures_util", "serde", "serde_derive", "serde_json", @@ -9216,6 +9219,7 @@ dependencies = [ "risingwave_storage", "risingwave_udf", "risingwave_variables", + "rw_futures_util", "serde", "serde_json", "sha2", @@ -9334,6 +9338,7 @@ dependencies = [ "risingwave_object_store", "risingwave_pb", "risingwave_storage", + "rw_futures_util", "serde", "serde_json", "thiserror", @@ -9411,6 +9416,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_sqlparser", "risingwave_test_runner", + "rw_futures_util", "scopeguard", "sea-orm", "serde", @@ -9616,6 +9622,7 @@ dependencies = [ "risingwave_error", "risingwave_hummock_sdk", "risingwave_pb", + "rw_futures_util", "static_assertions", "thiserror", "thiserror-ext", @@ -9723,6 +9730,7 @@ dependencies = [ "risingwave_common", "risingwave_connector", "risingwave_pb", + "rw_futures_util", "tempfile", "tracing", "workspace-hack", @@ -9914,6 +9922,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "rw_futures_util", "serde", "serde_json", "serde_yaml", @@ -10230,6 +10239,15 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rw_futures_util" +version = "0.0.0" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "ryu" version = "1.0.15" diff --git a/Cargo.toml b/Cargo.toml index 555e6a7d2a1e0..7bd67bc583745 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "src/tests/sqlsmith", "src/tests/state_cleaning_test", "src/utils/delta_btree_map", + "src/utils/futures_util", "src/utils/local_stats_alloc", "src/utils/pgwire", "src/utils/runtime", @@ -187,6 +188,7 @@ risingwave_udf = { path = "./src/expr/udf" } risingwave_variables = { path = "./src/utils/variables" } risingwave_java_binding = { path = "./src/java_binding" } risingwave_jni_core = { path = "src/jni_core" } +rw_futures_util = { path = "src/utils/futures_util" } tokio-util = "0.7" [workspace.lints.rust] diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 3660318180c16..93656967b801f 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -37,6 +37,7 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } risingwave_storage = { workspace = true } +rw_futures_util = { workspace = true } scopeguard = "1" serde_json = "1" thiserror = "1" diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index e54cb9069a393..c2cb928340cb4 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -18,7 +18,7 @@ use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::select_all; +use rw_futures_util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::PbExchangeSource; use risingwave_pb::plan_common::Field as NodeField; diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 6a5ec3cdf704f..e0ca044086103 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -24,7 +24,7 @@ use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::select_all; +use rw_futures_util::select_all; use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, PbScanRange}; diff --git a/src/batch/src/executor/union.rs b/src/batch/src/executor/union.rs index 00d01f93448f7..2d39d963ed92d 100644 --- a/src/batch/src/executor/union.rs +++ b/src/batch/src/executor/union.rs @@ -17,7 +17,7 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; -use risingwave_common::util::select_all; +use rw_futures_util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; use crate::error::{BatchError, Result}; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index cb859665fb6d6..f6741129d2428 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -82,6 +82,7 @@ risingwave_common_proc_macro = { path = "./proc_macro" } risingwave_error = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } +rw_futures_util = { workspace = true } ryu = "1.0" serde = { version = "1", features = ["derive"] } serde_bytes = "0.11" diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index a629512b3d63e..917d982be3db6 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -21,7 +21,6 @@ pub mod compress; pub mod deployment; pub mod env_var; pub mod epoch; -mod future_utils; pub mod hash_util; pub mod iter_util; pub mod memcmp_encoding; @@ -42,9 +41,5 @@ pub mod tracing; pub mod value_encoding; pub mod worker_util; -pub use future_utils::{ - await_future_with_monitor_error_stream, drop_either_future, pending_on_none, select_all, - RwFutureExt, RwTryStreamExt, -}; #[macro_use] pub mod match_util; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 791cc076d12e2..3f469e5ad65ae 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -110,6 +110,7 @@ risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } rust_decimal = "1" +rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 320f77c6a47ba..260f948b4200a 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -29,7 +29,7 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; -use risingwave_common::util::drop_either_future; +use rw_futures_util::drop_either_future; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{ call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index fdfe1acd4301e..1d8142061f35b 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -22,7 +22,7 @@ use futures::future::{select, Either}; use futures::TryFuture; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; -use risingwave_common::util::drop_either_future; +use rw_futures_util::drop_either_future; use crate::sink::encoder::SerTo; use crate::sink::formatter::SinkFormatter; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index d6996008507a9..f692531c6470e 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -67,6 +67,7 @@ risingwave_sqlparser = { workspace = true } risingwave_storage = { workspace = true } risingwave_udf = { workspace = true } risingwave_variables = { workspace = true } +rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10.7" diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 2d0df049da3fa..e205f8fd175da 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -33,7 +33,7 @@ use risingwave_common::array::DataChunk; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::select_all; +use rw_futures_util::select_all; use risingwave_connector::source::SplitMetaData; use risingwave_expr::expr_context::expr_context_scope; use risingwave_pb::batch_plan::plan_node::NodeBody; diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml index 736c94e58060d..bb2f8bda463f4 100644 --- a/src/jni_core/Cargo.toml +++ b/src/jni_core/Cargo.toml @@ -24,6 +24,7 @@ risingwave_hummock_sdk = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_storage = { workspace = true } +rw_futures_util = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1" diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 009dce22c9dd9..c66669d559154 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -20,7 +20,6 @@ use risingwave_common::catalog::ColumnDesc; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; -use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; @@ -39,6 +38,7 @@ use risingwave_storage::hummock::{ use risingwave_storage::monitor::HummockStateStoreMetrics; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter}; +use rw_futures_util::select_all; use tokio::sync::mpsc::unbounded_channel; type SelectAllIterStream = impl StateStoreReadIterStream + Unpin; diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index c97dfab2d429a..013ce2200f0d1 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -59,6 +59,7 @@ risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } +rw_futures_util = { workspace = true } scopeguard = "1.2.0" sea-orm = { version = "0.12.0", features = [ "sqlx-mysql", diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index e79ffadf3d991..b4ef87252cc84 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -24,7 +24,7 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::util::pending_on_none; +use rw_futures_util::pending_on_none; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest}; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 5b07ff5be5123..f842518a2ec92 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -32,7 +32,7 @@ use risingwave_common::config::default::compaction_config; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::rwlock::MonitoredRwLock; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; -use risingwave_common::util::{pending_on_none, select_all}; +use rw_futures_util::{pending_on_none, select_all}; use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact_task}; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_version_delta_after_version, get_compaction_group_ids, diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 34b4073916e6c..3919df81f6f75 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -20,7 +20,7 @@ use futures::future::{select, BoxFuture, Either}; use futures::stream::FuturesUnordered; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use risingwave_common::buffer::Bitmap; -use risingwave_common::util::pending_on_none; +use rw_futures_util::pending_on_none; use risingwave_connector::sink::catalog::SinkId; use risingwave_connector::sink::SinkParam; use risingwave_pb::connector_service::coordinate_request::Msg; diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 450bc894586ef..7a43b6359cd68 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -29,6 +29,7 @@ risingwave_common = { workspace = true } risingwave_error = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } +rw_futures_util = { workspace = true } static_assertions = "1" thiserror = "1" thiserror-ext = { workspace = true } diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 726d2e4c6c986..17168d94f3ac6 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -61,7 +61,7 @@ pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef} pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; -use risingwave_common::util::await_future_with_monitor_error_stream; +use rw_futures_util::await_future_with_monitor_error_stream; pub use sink_coordinate_client::CoordinatorStreamHandle; pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; diff --git a/src/source/Cargo.toml b/src/source/Cargo.toml index 735ca5f10d9b6..9949fd8ab11fa 100644 --- a/src/source/Cargo.toml +++ b/src/source/Cargo.toml @@ -23,6 +23,7 @@ rand = "0.8" risingwave_common = { workspace = true } risingwave_connector = { workspace = true } risingwave_pb = { workspace = true } +rw_futures_util = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] } tracing = { version = "0.1" } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 441a91836bb0a..f126c2692a77b 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -25,7 +25,6 @@ use risingwave_common::bail; use risingwave_common::catalog::ColumnId; use risingwave_common::error::ErrorCode::ConnectorError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; @@ -37,6 +36,7 @@ use risingwave_connector::source::{ create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties, ConnectorState, FsFilterCtrlCtx, SourceColumnDesc, SourceContext, SplitReader, }; +use rw_futures_util::select_all; use tokio::time; use tokio::time::Duration; diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index d3a035fa8b594..dcc2a6f3a4cb5 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -54,6 +54,7 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } risingwave_storage = { workspace = true } +rw_futures_util = { workspace = true } serde_json = "1" smallvec = "1" static_assertions = "1" diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 07052f1408185..559178bd4cdd1 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -22,7 +22,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::{RwFutureExt, RwTryStreamExt}; +use rw_futures_util::{RwFutureExt, RwTryStreamExt}; use risingwave_expr::expr::NonStrictExpression; use super::*; diff --git a/src/utils/futures_util/Cargo.toml b/src/utils/futures_util/Cargo.toml new file mode 100644 index 0000000000000..97bd794daaf8d --- /dev/null +++ b/src/utils/futures_util/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "rw_futures_util" +edition = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[package.metadata.cargo-machete] +ignored = ["workspace-hack"] + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] + +[dependencies] +futures = "0.3" +pin-project-lite = "0.2" + +[dev-dependencies] +tokio = { version = "1", features = ["full"] } + +[lints] +workspace = true diff --git a/src/common/src/util/future_utils/buffered_with_fence.rs b/src/utils/futures_util/src/buffered_with_fence.rs similarity index 80% rename from src/common/src/util/future_utils/buffered_with_fence.rs rename to src/utils/futures_util/src/buffered_with_fence.rs index 30b1938fda991..6271fd189a587 100644 --- a/src/common/src/util/future_utils/buffered_with_fence.rs +++ b/src/utils/futures_util/src/buffered_with_fence.rs @@ -45,6 +45,21 @@ pin_project! { } } +impl TryBufferedWithFence +where + St: TryStream, + St::Ok: TryFuture + MaybeFence, +{ + pub(crate) fn new(stream: St, n: usize) -> Self { + Self { + stream: stream.into_stream().fuse(), + in_progress_queue: FuturesOrdered::new(), + syncing: false, + max: n, + } + } +} + impl Stream for TryBufferedWithFence where St: TryStream, @@ -100,6 +115,15 @@ pin_project! { } } +impl Fenced +where + Fut: Future, +{ + pub(crate) fn new(inner: Fut, is_fence: bool) -> Self { + Self { inner, is_fence } + } +} + impl Future for Fenced where Fut: Future, @@ -131,51 +155,6 @@ where } } -pub trait RwFutureExt: Future { - fn with_fence(self, is_fence: bool) -> Fenced - where - Self: Sized; -} - -impl RwFutureExt for Fut { - fn with_fence(self, is_fence: bool) -> Fenced { - Fenced { - inner: self, - is_fence, - } - } -} - -pub trait RwTryStreamExt: TryStream { - /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence. - /// - /// Fence is provided by [`Future`] that implements [`MaybeFence`] and returns `true`. - /// When the stream receive a fenced future, it'll not do a sync operation. In brief, don't poll later futures until the current - /// buffer is cleared. - fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence - where - Self: Sized, - Self::Ok: TryFuture + MaybeFence; -} - -impl RwTryStreamExt for St -where - St: TryStream, -{ - fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence - where - Self: Sized, - Self::Ok: TryFuture + MaybeFence, - { - TryBufferedWithFence { - stream: self.into_stream().fuse(), - in_progress_queue: FuturesOrdered::new(), - syncing: false, - max: n, - } - } -} - #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; @@ -183,7 +162,7 @@ mod tests { use futures::stream::StreamExt; - use super::{RwFutureExt, RwTryStreamExt}; + use crate::{RwFutureExt, RwTryStreamExt}; #[tokio::test] async fn test_buffered_with_fence() { diff --git a/src/utils/futures_util/src/lib.rs b/src/utils/futures_util/src/lib.rs new file mode 100644 index 0000000000000..a1fbb7470bd65 --- /dev/null +++ b/src/utils/futures_util/src/lib.rs @@ -0,0 +1,63 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![feature(lint_reasons)] + +use std::future::Future; + +use futures::stream::TryStream; +use futures::TryFuture; + +mod buffered_with_fence; +mod misc; + +pub use misc::*; +use buffered_with_fence::{MaybeFence, Fenced, TryBufferedWithFence}; + +pub trait RwTryStreamExt: TryStream { + /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence. + /// + /// Fence is provided by [`Future`] that implements [`MaybeFence`] and returns `true`. + /// When the stream receive a fenced future, it'll not do a sync operation. In brief, don't poll later futures until the current + /// buffer is cleared. + fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence + where + Self: Sized, + Self::Ok: TryFuture + MaybeFence; +} + +impl RwTryStreamExt for St +where + St: TryStream, +{ + fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence + where + Self: Sized, + Self::Ok: TryFuture + MaybeFence, + { + TryBufferedWithFence::new(self, n) + } +} + +pub trait RwFutureExt: Future { + fn with_fence(self, is_fence: bool) -> Fenced + where + Self: Sized; +} + +impl RwFutureExt for Fut { + fn with_fence(self, is_fence: bool) -> Fenced { + Fenced::new(self, is_fence) + } +} diff --git a/src/common/src/util/future_utils/mod.rs b/src/utils/futures_util/src/misc.rs similarity index 94% rename from src/common/src/util/future_utils/mod.rs rename to src/utils/futures_util/src/misc.rs index d71ebfa7d6765..01317b3e85f24 100644 --- a/src/common/src/util/future_utils/mod.rs +++ b/src/utils/futures_util/src/misc.rs @@ -12,15 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod buffered_with_fence; - -use std::future::pending; +use std::future::Future; use std::pin::{pin, Pin}; -pub use buffered_with_fence::*; -use futures::future::{select, Either}; +use futures::future::{pending, select, Either}; use futures::stream::Peekable; -use futures::{Future, FutureExt, Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt}; /// Convert a list of streams into a [`Stream`] of results from the streams. pub fn select_all( From d35d28c9646201ffa014c8705d10bae519f8d82e Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Tue, 16 Jan 2024 15:38:51 +0800 Subject: [PATCH 2/3] fmt Signed-off-by: TennyZhuang --- src/batch/src/executor/generic_exchange.rs | 2 +- src/batch/src/executor/row_seq_scan.rs | 2 +- src/batch/src/executor/union.rs | 2 +- src/connector/src/sink/remote.rs | 2 +- src/frontend/src/scheduler/distributed/stage.rs | 2 +- src/meta/src/barrier/rpc.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 2 +- src/meta/src/manager/sink_coordination/manager.rs | 2 +- src/stream/src/executor/project.rs | 2 +- src/utils/futures_util/src/lib.rs | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index c2cb928340cb4..704a085fec245 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -18,11 +18,11 @@ use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::util::iter_util::ZipEqFast; -use rw_futures_util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::PbExchangeSource; use risingwave_pb::plan_common::Field as NodeField; use risingwave_rpc_client::ComputeClientPoolRef; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::exchange_source::ExchangeSourceImpl; diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index e0ca044086103..bf2fb9613b7eb 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -24,7 +24,6 @@ use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use rw_futures_util::select_all; use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, PbScanRange}; @@ -34,6 +33,7 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ diff --git a/src/batch/src/executor/union.rs b/src/batch/src/executor/union.rs index 2d39d963ed92d..e37baed08debc 100644 --- a/src/batch/src/executor/union.rs +++ b/src/batch/src/executor/union.rs @@ -17,8 +17,8 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; -use rw_futures_util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 260f948b4200a..6c4e12c5997a0 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -29,7 +29,6 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; -use rw_futures_util::drop_either_future; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{ call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, @@ -49,6 +48,7 @@ use risingwave_rpc_client::{ BidiStreamReceiver, BidiStreamSender, SinkCoordinatorStreamHandle, SinkWriterStreamHandle, DEFAULT_BUFFER_SIZE, }; +use rw_futures_util::drop_either_future; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index e205f8fd175da..568f6de613a95 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -33,7 +33,6 @@ use risingwave_common::array::DataChunk; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; -use rw_futures_util::select_all; use risingwave_connector::source::SplitMetaData; use risingwave_expr::expr_context::expr_context_scope; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -45,6 +44,7 @@ use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode}; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse}; use risingwave_rpc_client::ComputeClientPoolRef; +use rw_futures_util::select_all; use thiserror_ext::AsReport; use tokio::spawn; use tokio::sync::mpsc::{Receiver, Sender}; diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index b4ef87252cc84..b9661a37d8e83 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -24,11 +24,11 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; -use rw_futures_util::pending_on_none; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest}; use risingwave_rpc_client::StreamClientPoolRef; +use rw_futures_util::pending_on_none; use tokio::sync::oneshot; use uuid::Uuid; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index f842518a2ec92..66d6b1aaa14cf 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -32,7 +32,6 @@ use risingwave_common::config::default::compaction_config; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::rwlock::MonitoredRwLock; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; -use rw_futures_util::{pending_on_none, select_all}; use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact_task}; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_version_delta_after_version, get_compaction_group_ids, @@ -64,6 +63,7 @@ use risingwave_pb::hummock::{ PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption, }; use risingwave_pb::meta::subscribe_response::{Info, Operation}; +use rw_futures_util::{pending_on_none, select_all}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::Sender; diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 3919df81f6f75..2c1d248565d48 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -20,11 +20,11 @@ use futures::future::{select, BoxFuture, Either}; use futures::stream::FuturesUnordered; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use risingwave_common::buffer::Bitmap; -use rw_futures_util::pending_on_none; use risingwave_connector::sink::catalog::SinkId; use risingwave_connector::sink::SinkParam; use risingwave_pb::connector_service::coordinate_request::Msg; use risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse}; +use rw_futures_util::pending_on_none; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{channel, Receiver, Sender}; diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs index 559178bd4cdd1..8cbd7e66e4897 100644 --- a/src/stream/src/executor/project.rs +++ b/src/stream/src/executor/project.rs @@ -22,8 +22,8 @@ use risingwave_common::catalog::Schema; use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ToOwnedDatum; use risingwave_common::util::iter_util::ZipEqFast; -use rw_futures_util::{RwFutureExt, RwTryStreamExt}; use risingwave_expr::expr::NonStrictExpression; +use rw_futures_util::{RwFutureExt, RwTryStreamExt}; use super::*; diff --git a/src/utils/futures_util/src/lib.rs b/src/utils/futures_util/src/lib.rs index a1fbb7470bd65..e4dfe78c922f8 100644 --- a/src/utils/futures_util/src/lib.rs +++ b/src/utils/futures_util/src/lib.rs @@ -22,8 +22,8 @@ use futures::TryFuture; mod buffered_with_fence; mod misc; +use buffered_with_fence::{Fenced, MaybeFence, TryBufferedWithFence}; pub use misc::*; -use buffered_with_fence::{MaybeFence, Fenced, TryBufferedWithFence}; pub trait RwTryStreamExt: TryStream { /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence. From fad25d44121bc15fda1883f7afa478d1a7cd5d1d Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Tue, 16 Jan 2024 07:42:04 +0000 Subject: [PATCH 3/3] Fix "cargo-hakari" --- Cargo.lock | 1 + src/workspace-hack/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 79d9374fec73e..1262bfb13bc87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13622,6 +13622,7 @@ dependencies = [ "futures", "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 6b5996fe81d56..fbd86300c73b1 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -52,6 +52,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] } futures = { version = "0.3" } futures-channel = { version = "0.3", features = ["sink"] } futures-core = { version = "0.3" } +futures-executor = { version = "0.3" } futures-io = { version = "0.3" } futures-sink = { version = "0.3" } futures-task = { version = "0.3" }