Skip to content

Commit

Permalink
refactor(utils): make rw_futures_util a standalone crate (#14595)
Browse files Browse the repository at this point in the history
Signed-off-by: TennyZhuang <[email protected]>
Co-authored-by: TennyZhuang <[email protected]>
  • Loading branch information
2 people authored and Little-Wallace committed Jan 20, 2024
1 parent 467e3db commit 2bb2133
Show file tree
Hide file tree
Showing 30 changed files with 157 additions and 70 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ members = [
"src/tests/sqlsmith",
"src/tests/state_cleaning_test",
"src/utils/delta_btree_map",
"src/utils/futures_util",
"src/utils/local_stats_alloc",
"src/utils/pgwire",
"src/utils/runtime",
Expand Down Expand Up @@ -187,6 +188,7 @@ risingwave_udf = { path = "./src/expr/udf" }
risingwave_variables = { path = "./src/utils/variables" }
risingwave_java_binding = { path = "./src/java_binding" }
risingwave_jni_core = { path = "src/jni_core" }
rw_futures_util = { path = "src/utils/futures_util" }
tokio-util = "0.7"

[workspace.lints.rust]
Expand Down
1 change: 1 addition & 0 deletions src/batch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
scopeguard = "1"
serde_json = "1"
thiserror = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::select_all;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::PbExchangeSource;
use risingwave_pb::plan_common::Field as NodeField;
use risingwave_rpc_client::ComputeClientPoolRef;
use rw_futures_util::select_all;

use crate::error::{BatchError, Result};
use crate::exchange_source::ExchangeSourceImpl;
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use risingwave_common::catalog::{ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::select_all;
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, PbScanRange};
Expand All @@ -34,6 +33,7 @@ use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};
use rw_futures_util::select_all;

use crate::error::{BatchError, Result};
use crate::executor::{
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::util::select_all;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use rw_futures_util::select_all;

use crate::error::{BatchError, Result};
use crate::executor::{
Expand Down
1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ risingwave_common_proc_macro = { path = "./proc_macro" }
risingwave_error = { workspace = true }
risingwave_pb = { workspace = true }
rust_decimal = { version = "1", features = ["db-postgres", "maths"] }
rw_futures_util = { workspace = true }
ryu = "1.0"
serde = { version = "1", features = ["derive"] }
serde_bytes = "0.11"
Expand Down
5 changes: 0 additions & 5 deletions src/common/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub mod compress;
pub mod deployment;
pub mod env_var;
pub mod epoch;
mod future_utils;
pub mod hash_util;
pub mod iter_util;
pub mod memcmp_encoding;
Expand All @@ -42,9 +41,5 @@ pub mod tracing;
pub mod value_encoding;
pub mod worker_util;

pub use future_utils::{
await_future_with_monitor_error_stream, drop_either_future, pending_on_none, select_all,
RwFutureExt, RwTryStreamExt,
};
#[macro_use]
pub mod match_util;
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rust_decimal = "1"
rw_futures_util = { workspace = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
serde_json = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId};
use risingwave_common::error::anyhow_error;
use risingwave_common::types::DataType;
use risingwave_common::util::drop_either_future;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::{
call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest,
Expand All @@ -49,6 +48,7 @@ use risingwave_rpc_client::{
BidiStreamReceiver, BidiStreamSender, SinkCoordinatorStreamHandle, SinkWriterStreamHandle,
DEFAULT_BUFFER_SIZE,
};
use rw_futures_util::drop_either_future;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender};
use tokio::task::spawn_blocking;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::future::{select, Either};
use futures::TryFuture;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::util::drop_either_future;
use rw_futures_util::drop_either_future;

use crate::sink::encoder::SerTo;
use crate::sink::formatter::SinkFormatter;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ risingwave_sqlparser = { workspace = true }
risingwave_storage = { workspace = true }
risingwave_udf = { workspace = true }
risingwave_variables = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10.7"
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_common::array::DataChunk;
use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::select_all;
use risingwave_connector::source::SplitMetaData;
use risingwave_expr::expr_context::expr_context_scope;
use risingwave_pb::batch_plan::plan_node::NodeBody;
Expand All @@ -45,6 +44,7 @@ use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse};
use risingwave_rpc_client::ComputeClientPoolRef;
use rw_futures_util::select_all;
use thiserror_ext::AsReport;
use tokio::spawn;
use tokio::sync::mpsc::{Receiver, Sender};
Expand Down
1 change: 1 addition & 0 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ risingwave_hummock_sdk = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::OwnedRow;
use risingwave_common::util::select_all;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
Expand All @@ -39,6 +38,7 @@ use risingwave_storage::hummock::{
use risingwave_storage::monitor::HummockStateStoreMetrics;
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
use rw_futures_util::select_all;
use tokio::sync::mpsc::unbounded_channel;

type SelectAllIterStream = impl StateStoreReadIterStream + Unpin;
Expand Down
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_sqlparser = { workspace = true }
rw_futures_util = { workspace = true }
scopeguard = "1.2.0"
sea-orm = { version = "0.12.0", features = [
"sqlx-mysql",
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::util::pending_on_none;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest};
use risingwave_rpc_client::StreamClientPoolRef;
use rw_futures_util::pending_on_none;
use tokio::sync::oneshot;
use uuid::Uuid;

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use risingwave_common::config::default::compaction_config;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::monitor::rwlock::MonitoredRwLock;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_common::util::{pending_on_none, select_all};
use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact_task};
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
build_version_delta_after_version, get_compaction_group_ids,
Expand Down Expand Up @@ -64,6 +63,7 @@ use risingwave_pb::hummock::{
PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use rw_futures_util::{pending_on_none, select_all};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/manager/sink_coordination/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use futures::future::{select, BoxFuture, Either};
use futures::stream::FuturesUnordered;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use risingwave_common::buffer::Bitmap;
use risingwave_common::util::pending_on_none;
use risingwave_connector::sink::catalog::SinkId;
use risingwave_connector::sink::SinkParam;
use risingwave_pb::connector_service::coordinate_request::Msg;
use risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse};
use rw_futures_util::pending_on_none;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{channel, Receiver, Sender};
Expand Down
1 change: 1 addition & 0 deletions src/rpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ risingwave_common = { workspace = true }
risingwave_error = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_pb = { workspace = true }
rw_futures_util = { workspace = true }
static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef}
pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
use risingwave_common::util::await_future_with_monitor_error_stream;
use rw_futures_util::await_future_with_monitor_error_stream;
pub use sink_coordinate_client::CoordinatorStreamHandle;
pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef};

Expand Down
1 change: 1 addition & 0 deletions src/source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rand = "0.8"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_pb = { workspace = true }
rw_futures_util = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] }
tracing = { version = "0.1" }

Expand Down
2 changes: 1 addition & 1 deletion src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_common::bail;
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::ErrorCode::ConnectorError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::select_all;
use risingwave_connector::dispatch_source_prop;
use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
Expand All @@ -37,6 +36,7 @@ use risingwave_connector::source::{
create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
ConnectorState, FsFilterCtrlCtx, SourceColumnDesc, SourceContext, SplitReader,
};
use rw_futures_util::select_all;
use tokio::time;
use tokio::time::Duration;

Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
risingwave_storage = { workspace = true }
rw_futures_util = { workspace = true }
serde_json = "1"
smallvec = "1"
static_assertions = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use risingwave_common::catalog::Schema;
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::{RwFutureExt, RwTryStreamExt};
use risingwave_expr::expr::NonStrictExpression;
use rw_futures_util::{RwFutureExt, RwTryStreamExt};

use super::*;

Expand Down
22 changes: 22 additions & 0 deletions src/utils/futures_util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "rw_futures_util"
edition = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[package.metadata.cargo-machete]
ignored = ["workspace-hack"]

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]

[dependencies]
futures = "0.3"
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }

[lints]
workspace = true
Loading

0 comments on commit 2bb2133

Please sign in to comment.