From f73dbe4fc76dfb8204df8cb85a0a0e15dd489425 Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:14:52 +0800 Subject: [PATCH 1/5] fix: update icelake (#14588) Co-authored-by: ZENOTME --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb63420db1f9a..01792a8b108d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,7 +198,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.17.0" -source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" +source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ "apache-avro-derive", "bigdecimal 0.4.2", @@ -221,7 +221,7 @@ dependencies = [ [[package]] name = "apache-avro-derive" version = "0.17.0" -source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" +source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ "darling 0.20.3", "proc-macro2", @@ -5051,7 +5051,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=b4f4ca3c6d29092bd331925ead0bcceaa38bdd57#b4f4ca3c6d29092bd331925ead0bcceaa38bdd57" +source = "git+https://github.com/icelake-io/icelake?rev=32c0bbf242f5c47b1e743f10577012fe7436c770#32c0bbf242f5c47b1e743f10577012fe7436c770" dependencies = [ "anyhow", "apache-avro 0.17.0", diff --git a/Cargo.toml b/Cargo.toml index d3fec07a9c3d2..555e6a7d2a1e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [ "prometheus", ] } arrow-array = "49" From 1a70c3a5f32f2746333b44e682a735e11b52b6df Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 16 Jan 2024 15:22:19 +0800 Subject: [PATCH 2/5] feat: pre-release pubsub (#14531) --- e2e_test/source/basic/pubsub.slt | 22 ++++--------------- .../source/google_pubsub/enumerator/client.rs | 9 ++------ src/connector/src/source/google_pubsub/mod.rs | 13 +++-------- src/connector/with_options_source.yaml | 9 +++----- 4 files changed, 12 insertions(+), 41 deletions(-) diff --git a/e2e_test/source/basic/pubsub.slt b/e2e_test/source/basic/pubsub.slt index e5d9911405797..b245d9b2aea89 100644 --- a/e2e_test/source/basic/pubsub.slt +++ b/e2e_test/source/basic/pubsub.slt @@ -2,16 +2,14 @@ statement error CREATE TABLE s1 (v1 int, v2 varchar) WITH ( pubsub.subscription = 'test-subscription-1', - pubsub.emulator_host = 'localhost:5981', - pubsub.split_count = 3 + pubsub.emulator_host = 'invalid_host:5981' ) FORMAT PLAIN ENCODE JSON; statement ok CREATE TABLE s1 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN 
ENCODE JSON; statement ok @@ -25,25 +23,14 @@ statement error CREATE TABLE s2 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-not-2', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN ENCODE JSON; statement ok CREATE TABLE s2 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-2', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 -) FORMAT PLAIN ENCODE JSON; - -# fail with invalid split count -statement error -CREATE TABLE s3 (v1 int, v2 varchar) WITH ( - connector = 'google_pubsub', - pubsub.subscription = 'test-subscription-3', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 0 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN ENCODE JSON; # fail if both start_offset and start_snapshot are provided @@ -52,7 +39,6 @@ CREATE TABLE s3 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-3', pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 2, pubsub.start_offset = "121212", pubsub.start_snapshot = "snapshot-that-doesnt-exist" ) FORMAT PLAIN ENCODE JSON; diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs index e957bac8a641d..01809a3c773b0 100644 --- a/src/connector/src/source/google_pubsub/enumerator/client.rs +++ b/src/connector/src/source/google_pubsub/enumerator/client.rs @@ -37,13 +37,8 @@ impl SplitEnumerator for PubsubSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> anyhow::Result { - let split_count = properties.split_count; let subscription = properties.subscription.to_owned(); - if split_count < 1 { - bail!("split_count must be >= 1") - } - if properties.credentials.is_none() && properties.emulator_host.is_none() { bail!("credentials must be set if not using the pubsub emulator") } @@ -87,7 +82,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { } (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)), (Some(_), Some(_)) => { - bail!("specify atmost one of start_offset or start_snapshot") + bail!("specify at most one of start_offset or start_snapshot") } }; @@ -99,7 +94,7 @@ impl SplitEnumerator for PubsubSplitEnumerator { Ok(Self { subscription, - split_count, + split_count: 1, }) } diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs index 18fd11b1e3ba0..aeec1accd820b 100644 --- a/src/connector/src/source/google_pubsub/mod.rs +++ b/src/connector/src/source/google_pubsub/mod.rs @@ -21,7 +21,6 @@ pub mod source; pub mod split; pub use enumerator::*; -use serde_with::{serde_as, DisplayFromStr}; pub use source::*; pub use split::*; use with_options::WithOptions; @@ -30,13 +29,8 @@ use crate::source::SourceProperties; pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub"; -#[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] pub struct PubsubProperties { - #[serde_as(as = "DisplayFromStr")] - #[serde(rename = "pubsub.split_count")] - pub split_count: u32, - /// pubsub subscription to consume messages from /// The subscription should be configured with the `retain-on-ack` property to enable /// message recovery within risingwave. 
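An aside on how these options are wired up: the `pubsub.*` keys map onto struct fields purely through `serde` rename attributes, as in `PubsubProperties` below. A minimal, self-contained sketch of that pattern (the struct, field names, and JSON input are illustrative stand-ins, not the connector's real types):

    use serde::Deserialize;

    // Illustrative only: mirrors how `pubsub.*` option keys bind to fields
    // via `#[serde(rename = ...)]`. Assumes `serde` (with the `derive`
    // feature) and `serde_json` as dependencies.
    #[derive(Debug, Deserialize)]
    struct DemoProps {
        #[serde(rename = "pubsub.subscription")]
        subscription: String,
        #[serde(rename = "pubsub.emulator_host")]
        emulator_host: Option<String>,
    }

    fn main() {
        let props: DemoProps = serde_json::from_str(
            r#"{"pubsub.subscription": "sub-1", "pubsub.emulator_host": "localhost:8681"}"#,
        )
        .unwrap();
        println!("{props:?}");
    }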
@@ -48,19 +42,19 @@ pub struct PubsubProperties {
     #[serde(rename = "pubsub.emulator_host")]
     pub emulator_host: Option<String>,
 
-    /// credentials JSON object encoded with base64
+    /// `credentials` is a JSON string containing the service account credentials.
     /// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account).
     /// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
     #[serde(rename = "pubsub.credentials")]
     pub credentials: Option<String>,
 
-    /// `start_offset` is a numeric timestamp, ideallly the publish timestamp of a message
+    /// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message
     /// in the subscription. If present, the connector will attempt to seek the subscription
     /// to the timestamp and start consuming from there. Note that the seek operation is
     /// subject to limitations around the message retention policy of the subscription. See
     /// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
     /// more details.
-    #[serde(rename = "pubsub.start_offset")]
+    #[serde(rename = "pubsub.start_offset.nanos")]
    pub start_offset: Option<String>,
 
     /// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek
@@ -127,7 +121,6 @@ mod tests {
         let default_properties = PubsubProperties {
             credentials: None,
             emulator_host: None,
-            split_count: 1,
             start_offset: None,
             start_snapshot: None,
             subscription: String::from("test-subscription"),
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index 187780cd23826..84a125813733e 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -471,9 +471,6 @@ PosixFsProperties:
     default: Default::default
 PubsubProperties:
   fields:
-  - name: pubsub.split_count
-    field_type: u32
-    required: true
   - name: pubsub.subscription
     field_type: String
     comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave.
@@ -484,11 +481,11 @@ PubsubProperties:
     required: false
   - name: pubsub.credentials
     field_type: String
-    comments: credentials JSON object encoded with base64 See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
+    comments: '`credentials` is a JSON string containing the service account credentials. See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).'
     required: false
-  - name: pubsub.start_offset
+  - name: pubsub.start_offset.nanos
     field_type: String
-    comments: '`start_offset` is a numeric timestamp, ideallly the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription.
See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' + comments: '`start_offset` is a numeric timestamp, ideally the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' required: false - name: pubsub.start_snapshot field_type: String From 222bbd1800359c54dd429b511554d1642614ed8d Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:50:50 +0800 Subject: [PATCH 3/5] feat(streaming): query global max watermark via storage table (#14591) --- src/common/src/catalog/physical_table.rs | 27 ++++ src/common/src/hash/consistent_hash/vnode.rs | 6 +- .../src/table/batch_table/storage_table.rs | 4 + src/storage/src/table/mod.rs | 13 +- src/stream/src/executor/watermark_filter.rs | 146 +++++++++++++----- src/stream/src/from_proto/watermark_filter.rs | 16 ++ 6 files changed, 167 insertions(+), 45 deletions(-) diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 48ead96874e75..92c71d37e2aa1 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -17,10 +17,13 @@ use std::collections::HashMap; use anyhow::anyhow; use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_pb::catalog::Table; use risingwave_pb::common::PbColumnOrder; use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; +use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; +use crate::catalog::TableOption; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. 
@@ -138,4 +141,28 @@ impl TableDesc {
         });
         id_to_idx
     }
+
+    pub fn from_pb_table(table: &Table) -> Self {
+        let table_options = TableOption::build_table_option(&table.properties);
+        Self {
+            table_id: TableId::new(table.id),
+            pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
+            columns: table
+                .columns
+                .iter()
+                .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
+                .collect(),
+            distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
+            stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
+            vnode_col_index: table.vnode_col_index.map(|i| i as _),
+            append_only: table.append_only,
+            retention_seconds: table_options
+                .retention_seconds
+                .unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
+            value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
+            read_prefix_len_hint: table.read_prefix_len_hint as _,
+            watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
+            versioned: table.version.is_some(),
+        }
+    }
 }
diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs
index 19b8975f775af..9bc49a9372ac0 100644
--- a/src/common/src/hash/consistent_hash/vnode.rs
+++ b/src/common/src/hash/consistent_hash/vnode.rs
@@ -18,7 +18,7 @@ use parse_display::Display;
 use crate::array::{Array, ArrayImpl, DataChunk};
 use crate::hash::Crc32HashCode;
 use crate::row::{Row, RowExt};
-use crate::types::{DataType, DatumRef, ScalarRefImpl};
+use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
 use crate::util::hash_util::Crc32FastBuilder;
 use crate::util::row_id::extract_vnode_id_from_row_id;
 
@@ -96,6 +96,10 @@ impl VirtualNode {
         self.0 as _
     }
 
+    pub const fn to_datum(self) -> Datum {
+        Some(ScalarImpl::Int16(self.to_scalar()))
+    }
+
     /// Creates a virtual node from the given big-endian bytes representation.
     pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
         let inner = VirtualNodeInner::from_be_bytes(bytes);
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 922f0c22060e2..0c55bf79ffa4b 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -335,6 +335,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
     pub fn table_id(&self) -> TableId {
         self.table_id
     }
+
+    pub fn vnodes(&self) -> &Arc<Bitmap> {
+        self.distribution.vnodes()
+    }
 }
 /// Point get
 impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs
index f2822ca88022f..105a2f1f18604 100644
--- a/src/storage/src/table/mod.rs
+++ b/src/storage/src/table/mod.rs
@@ -115,11 +115,15 @@ impl TableDistribution {
         Self::singleton_vnode_bitmap_ref().clone()
     }
 
-    pub fn all_vnodes() -> Arc<Bitmap> {
+    pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
         /// A bitmap that all vnodes are set.
         static ALL_VNODES: LazyLock<Arc<Bitmap>> =
             LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
-        ALL_VNODES.clone()
+        &ALL_VNODES
+    }
+
+    pub fn all_vnodes() -> Arc<Bitmap> {
+        Self::all_vnodes_ref().clone()
     }
 
     /// Distribution that accesses all vnodes, mainly used for tests.
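For context on how the new `all_vnodes_ref` accessor gets used later in this series: the watermark filter reads its own vnodes through the local state table and every other vnode through the storage table, where "every other vnode" is the complement of the actor's vnode bitmap (with RisingWave's types, `(!bitmap) & TableDistribution::all_vnodes_ref()`, as in the executor hunks below). A minimal sketch of that set computation, with plain `Vec<bool>` standing in for `Bitmap` and all names illustrative:

    // NOT(local): the vnodes this actor does not own. Complementing against
    // the full vnode set is implicit here because every slot is present.
    fn other_vnodes(local: &[bool]) -> Vec<bool> {
        local.iter().map(|owned| !owned).collect()
    }

    fn main() {
        let local = [true, false, true, false];
        assert_eq!(other_vnodes(&local), [false, true, false, true]);
    }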
@@ -272,10 +276,9 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu vnode } -pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode { +pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode { let vnode = VirtualNode::from_datum(row.datum_at(index)); - // TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark - // check_vnode_is_set(vnode, vnodes); + check_vnode_is_set(vnode, vnodes); tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode); diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 454f69582981b..bca8f6f872dda 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::cmp; +use std::ops::Deref; +use std::sync::Arc; -use futures::future::join_all; +use futures::future::{try_join, try_join_all}; use futures::StreamExt; use futures_async_stream::try_stream; -use itertools::Itertools; -use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl}; use risingwave_common::{bail, row}; @@ -27,7 +28,10 @@ use risingwave_expr::expr::{ NonStrictExpression, }; use risingwave_expr::Result as ExprResult; +use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::expr::expr_node::Type; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::table::TableDistribution; use risingwave_storage::StateStore; use super::error::StreamExecutorError; @@ -52,6 +56,7 @@ pub struct WatermarkFilterExecutor { /// The column we should generate watermark and filter on. event_time_col_idx: usize, table: StateTable, + global_watermark_table: StorageTable, } impl WatermarkFilterExecutor { @@ -62,6 +67,7 @@ impl WatermarkFilterExecutor { watermark_expr: NonStrictExpression, event_time_col_idx: usize, table: StateTable, + global_watermark_table: StorageTable, ) -> Self { Self { ctx, @@ -70,6 +76,7 @@ impl WatermarkFilterExecutor { watermark_expr, event_time_col_idx, table, + global_watermark_table, } } } @@ -106,6 +113,7 @@ impl WatermarkFilterExecutor { ctx, info, mut table, + mut global_watermark_table, } = *self; let eval_error_report = ActorEvalErrorReport { @@ -126,7 +134,8 @@ impl WatermarkFilterExecutor { yield Message::Barrier(first_barrier); // Initiate and yield the first watermark. - let mut current_watermark = Self::get_global_max_watermark(&table).await?; + let mut current_watermark = + Self::get_global_max_watermark(&table, &global_watermark_table).await?; let mut last_checkpoint_watermark = None; @@ -231,12 +240,19 @@ impl WatermarkFilterExecutor { Message::Barrier(barrier) => { // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) { + let other_vnodes_bitmap = Arc::new( + (!(*vnode_bitmap).clone()) + & TableDistribution::all_vnodes_ref().deref(), + ); + let _ = global_watermark_table.update_vnode_bitmap(other_vnodes_bitmap); let (previous_vnode_bitmap, _cache_may_stale) = table.update_vnode_bitmap(vnode_bitmap.clone()); // Take the global max watermark when scaling happens. 
                         if previous_vnode_bitmap != vnode_bitmap {
-                            current_watermark = Self::get_global_max_watermark(&table).await?;
+                            current_watermark =
+                                Self::get_global_max_watermark(&table, &global_watermark_table)
+                                    .await?;
                         }
                     }
@@ -262,7 +278,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
                 if idle_input {
                     // Align watermark
                     let global_max_watermark =
-                        Self::get_global_max_watermark(&table).await?;
+                        Self::get_global_max_watermark(&table, &global_watermark_table)
+                            .await?;
 
                     current_watermark = if let Some(global_max_watermark) =
                         global_max_watermark.clone()
@@ -314,29 +331,45 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
     /// If the returned if `Ok(None)`, it means there is no global max watermark.
     async fn get_global_max_watermark(
         table: &StateTable<S>,
+        global_watermark_table: &StorageTable<S>,
     ) -> StreamExecutorResult<Option<ScalarImpl>> {
-        let watermark_iter_futures = (0..VirtualNode::COUNT).map(|vnode| async move {
-            let pk = row::once(Some(ScalarImpl::Int16(vnode as _)));
-            let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
-            match watermark_row {
-                Some(row) => {
-                    if row.len() == 1 {
-                        Ok::<_, StreamExecutorError>(row[0].to_owned())
-                    } else {
-                        bail!("The watermark row should only contains 1 datum");
-                    }
+        let epoch = table.epoch();
+        let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
+            Some(row) => {
+                if row.len() == 1 {
+                    Ok::<_, StreamExecutorError>(row[0].to_owned())
+                } else {
+                    bail!("The watermark row should only contains 1 datum");
                 }
-                _ => Ok(None),
             }
+            _ => Ok(None),
+        };
+        let global_watermark_iter_futures =
+            global_watermark_table
+                .vnodes()
+                .iter_vnodes()
+                .map(|vnode| async move {
+                    let pk = row::once(vnode.to_datum());
+                    let watermark_row: Option<OwnedRow> = global_watermark_table
+                        .get_row(pk, HummockReadEpoch::NoWait(epoch))
+                        .await?;
+                    handle_watermark_row(watermark_row)
+                });
+        let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {
+            let pk = row::once(vnode.to_datum());
+            let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
+            handle_watermark_row(watermark_row)
         });
-        let watermarks: Vec<_> = join_all(watermark_iter_futures)
-            .await
-            .into_iter()
-            .try_collect()?;
+        let (global_watermarks, local_watermarks) = try_join(
+            try_join_all(global_watermark_iter_futures),
+            try_join_all(local_watermark_iter_futures),
+        )
+        .await?;
 
         // Return the minimal value if the remote max watermark is Null.
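The `try_join`/`try_join_all` combination above fans out one point get per vnode against both tables concurrently and fails fast on the first error. A self-contained sketch of the same pattern using only the `futures` and `tokio` crates; `fetch` is a hypothetical stand-in for the per-vnode `get_row` calls, not a RisingWave API:

    use futures::future::{try_join, try_join_all};

    // Hypothetical per-vnode lookup: returns the stored watermark, if any.
    async fn fetch(vnode: u32) -> Result<Option<u32>, &'static str> {
        Ok(Some(vnode * 10))
    }

    // Assumes `futures` and `tokio` (features "macros", "rt") as dependencies.
    #[tokio::main]
    async fn main() -> Result<(), &'static str> {
        // Two batches of lookups run concurrently; the per-vnode results are
        // then folded into one global maximum, mirroring the executor above.
        let (global, local) = try_join(
            try_join_all((0..2u32).map(fetch)),
            try_join_all((2..4u32).map(fetch)),
        )
        .await?;
        let max = global.into_iter().chain(local).flatten().max();
        assert_eq!(max, Some(30));
        Ok(())
    }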
- let watermark = watermarks + let watermark = global_watermarks .into_iter() + .chain(local_watermarks.into_iter()) .flatten() .max_by(DefaultOrd::default_cmp); @@ -346,11 +379,15 @@ impl WatermarkFilterExecutor { #[cfg(test)] mod tests { + use itertools::Itertools; use risingwave_common::array::StreamChunk; - use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId}; + use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableDesc}; use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::Date; use risingwave_common::util::sort_util::OrderType; + use risingwave_pb::catalog::Table; + use risingwave_pb::common::ColumnOrder; + use risingwave_pb::plan_common::PbColumnCatalog; use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::TableDistribution; @@ -368,24 +405,54 @@ mod tests { pk_indices: &[usize], val_indices: &[usize], table_id: u32, - ) -> StateTable { - let column_descs = data_types - .iter() - .enumerate() - .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) - .collect_vec(); + ) -> (StorageTable, StateTable) { + let table = Table { + id: table_id, + columns: data_types + .iter() + .enumerate() + .map(|(id, data_type)| PbColumnCatalog { + column_desc: Some( + ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()) + .to_protobuf(), + ), + is_hidden: false, + }) + .collect(), + pk: pk_indices + .iter() + .zip_eq(order_types.iter()) + .map(|(pk, order)| ColumnOrder { + column_index: *pk as _, + order_type: Some(order.to_protobuf()), + }) + .collect(), + distribution_key: vec![], + stream_key: vec![0], + append_only: false, + vnode_col_index: Some(0), + value_indices: val_indices.iter().map(|i| *i as _).collect(), + read_prefix_len_hint: 0, + ..Default::default() + }; // TODO: use consistent operations for watermark filter after we have upsert. - StateTable::new_with_distribution_inconsistent_op( - mem_state, - TableId::new(table_id), - column_descs, - order_types.to_vec(), - pk_indices.to_vec(), - TableDistribution::all(vec![0]), - Some(val_indices.to_vec()), + let state_table = StateTable::from_table_catalog_inconsistent_op( + &table, + mem_state.clone(), + Some(TableDistribution::all_vnodes()), ) - .await + .await; + + let desc = TableDesc::from_pb_table(&table).try_to_protobuf().unwrap(); + + let storage_table = StorageTable::new_partial( + mem_state, + val_indices.iter().map(|i| ColumnId::new(*i as _)).collect(), + Some(TableDistribution::all_vnodes()), + &desc, + ); + (storage_table, state_table) } async fn create_watermark_filter_executor( @@ -400,7 +467,7 @@ mod tests { let watermark_expr = build_from_pretty("(subtract:timestamp $1:timestamp 1day:interval)"); - let table = create_in_memory_state_table( + let (storage_table, table) = create_in_memory_state_table( mem_state, &[DataType::Int16, WATERMARK_TYPE], &[OrderType::ascending()], @@ -426,6 +493,7 @@ mod tests { watermark_expr, 1, table, + storage_table, ) .boxed(), tx, diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs index ed44a90480a6d..44618e812fd82 100644 --- a/src/stream/src/from_proto/watermark_filter.rs +++ b/src/stream/src/from_proto/watermark_filter.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::ops::Deref; use std::sync::Arc; +use risingwave_common::catalog::{ColumnId, TableDesc}; use risingwave_expr::expr::build_non_strict_from_prost; use risingwave_pb::stream_plan::WatermarkFilterNode; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::table::TableDistribution; use super::*; use crate::common::table::state_table::StateTable; @@ -46,6 +50,17 @@ impl ExecutorBuilder for WatermarkFilterBuilder { // TODO: may use consistent op for watermark filter after we have upsert. let [table]: [_; 1] = node.get_tables().clone().try_into().unwrap(); + let desc = TableDesc::from_pb_table(&table).try_to_protobuf()?; + let column_ids = desc + .value_indices + .iter() + .map(|i| ColumnId::new(*i as _)) + .collect_vec(); + let other_vnodes = + Arc::new((!(*vnodes).clone()) & TableDistribution::all_vnodes_ref().deref()); + let global_watermark_table = + StorageTable::new_partial(store.clone(), column_ids, Some(other_vnodes), &desc); + let table = StateTable::from_table_catalog_inconsistent_op(&table, store, Some(vnodes)).await; @@ -56,6 +71,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder { watermark_expr, event_time_col_idx, table, + global_watermark_table, ) .boxed()) } From 9ebf73443d198268fb845cf2e028ee208e6f68ec Mon Sep 17 00:00:00 2001 From: xfz <73645462+xuefengze@users.noreply.github.com> Date: Tue, 16 Jan 2024 16:39:27 +0800 Subject: [PATCH 4/5] test: add `upsert` test for `doris/starrocks-sink` integration test (#14587) --- integration_tests/doris-sink/README.md | 14 +++------- integration_tests/doris-sink/create_mv.sql | 8 ++++++ integration_tests/doris-sink/create_sink.sql | 15 ++++++++++- .../doris-sink/create_source.sql | 17 ++++++++++++ .../doris-sink/docker-compose.yml | 9 +++++++ .../doris-sink/doris_prepare.sql | 10 +++++++ integration_tests/doris-sink/sink_check.py | 26 ++++++++++++++++++- .../doris-sink/update_delete.sql | 5 ++++ .../doris-sink/upsert/create_mv.sql | 7 ----- .../doris-sink/upsert/create_sink.sql | 12 --------- .../doris-sink/upsert/create_table.sql | 10 ------- .../upsert/insert_update_delete.sql | 8 ------ integration_tests/starrocks-sink/README.md | 14 +++------- .../starrocks-sink/create_mv.sql | 8 ++++++ .../starrocks-sink/create_sink.sql | 17 +++++++++++- .../starrocks-sink/create_source.sql | 17 ++++++++++++ .../starrocks-sink/docker-compose.yml | 6 +++++ .../starrocks-sink/sink_check.py | 26 ++++++++++++++++++- .../starrocks-sink/starrocks_prepare.sql | 8 ++++++ .../starrocks-sink/update_delete.sql | 5 ++++ .../starrocks-sink/upsert/create_mv.sql | 7 ----- .../starrocks-sink/upsert/create_sink.sql | 14 ---------- .../starrocks-sink/upsert/create_table.sql | 10 ------- .../upsert/insert_update_delete.sql | 8 ------ 24 files changed, 181 insertions(+), 100 deletions(-) create mode 100644 integration_tests/doris-sink/update_delete.sql delete mode 100644 integration_tests/doris-sink/upsert/create_mv.sql delete mode 100644 integration_tests/doris-sink/upsert/create_sink.sql delete mode 100644 integration_tests/doris-sink/upsert/create_table.sql delete mode 100644 integration_tests/doris-sink/upsert/insert_update_delete.sql create mode 100644 integration_tests/starrocks-sink/update_delete.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_mv.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_sink.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_table.sql delete mode 100644 integration_tests/starrocks-sink/upsert/insert_update_delete.sql diff 
--git a/integration_tests/doris-sink/README.md b/integration_tests/doris-sink/README.md index 75baa2d2449f1..b62c2d2e3adcf 100644 --- a/integration_tests/doris-sink/README.md +++ b/integration_tests/doris-sink/README.md @@ -45,16 +45,10 @@ GRANT ALL ON *.* TO 'users'@'%'; 4. Execute the SQL queries in sequence: -- append-only sql: - - create_source.sql - - create_mv.sql - - create_sink.sql - -- upsert sql: - - upsert/create_table.sql - - upsert/create_mv.sql - - upsert/create_sink.sql - - upsert/insert_update_delete.sql +- create_source.sql +- create_mv.sql +- create_sink.sql +- update_delete.sql We only support `upsert` with doris' `UNIQUE KEY` diff --git a/integration_tests/doris-sink/create_mv.sql b/integration_tests/doris-sink/create_mv.sql index c367e6f2baa94..6e466703b0769 100644 --- a/integration_tests/doris-sink/create_mv.sql +++ b/integration_tests/doris-sink/create_mv.sql @@ -5,3 +5,11 @@ SELECT event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local FROM user_behaviors; + +CREATE MATERIALIZED VIEW upsert_bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local +FROM + upsert_user_behaviors; diff --git a/integration_tests/doris-sink/create_sink.sql b/integration_tests/doris-sink/create_sink.sql index fa0cfddf7bf16..7cd1ac24857e9 100644 --- a/integration_tests/doris-sink/create_sink.sql +++ b/integration_tests/doris-sink/create_sink.sql @@ -9,4 +9,17 @@ FROM doris.database = 'demo', doris.table='demo_bhv_table', force_append_only='true' -); \ No newline at end of file +); + +CREATE SINK upsert_doris_sink +FROM + upsert_bhv_mv WITH ( + connector = 'doris', + type = 'upsert', + doris.url = 'http://fe:8030', + doris.user = 'users', + doris.password = '123456', + doris.database = 'demo', + doris.table='upsert_table', + primary_key = 'user_id' +); diff --git a/integration_tests/doris-sink/create_source.sql b/integration_tests/doris-sink/create_source.sql index ed7c02341638a..0e42308511121 100644 --- a/integration_tests/doris-sink/create_source.sql +++ b/integration_tests/doris-sink/create_source.sql @@ -14,3 +14,20 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', datagen.rows.per.second = '100' ) FORMAT PLAIN ENCODE JSON; + +CREATE table upsert_user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); + +INSERT INTO upsert_user_behaviors VALUES + (1,'1','1','2020-01-01T01:01:01Z','1','1','1'), + (2,'2','2','2020-01-01T01:01:02Z','2','2','2'), + (3,'3','3','2020-01-01T01:01:03Z','3','3','3'), + (4,'4','4','2020-01-01T01:01:04Z','4','4','4'); diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index 74fecbee2baab..fc7cfd751e989 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -74,6 +74,15 @@ services: networks: mynetwork: ipv4_address: 172.21.0.9 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update_delete.sql:/update_delete.sql" + restart: on-failure + networks: + mynetwork: + ipv4_address: 172.21.0.11 volumes: risingwave-standalone: external: false diff --git a/integration_tests/doris-sink/doris_prepare.sql b/integration_tests/doris-sink/doris_prepare.sql index c95e8ac3f9b32..b65e419999caf 100644 --- a/integration_tests/doris-sink/doris_prepare.sql +++ 
b/integration_tests/doris-sink/doris_prepare.sql @@ -11,5 +11,15 @@ PROPERTIES ( "replication_allocation" = "tag.location.default: 1" ); +CREATE table upsert_table( + user_id int, + target_id text, + event_timestamp_local datetime +) UNIQUE KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 +PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" +); + CREATE USER 'users'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'users'@'%'; diff --git a/integration_tests/doris-sink/sink_check.py b/integration_tests/doris-sink/sink_check.py index 39109f4194fef..510cc867dcda4 100644 --- a/integration_tests/doris-sink/sink_check.py +++ b/integration_tests/doris-sink/sink_check.py @@ -1,7 +1,7 @@ import subprocess import sys -relations = ['demo.demo_bhv_table'] +relations = ['demo.demo_bhv_table', 'demo.upsert_table'] failed_cases = [] for rel in relations: @@ -18,6 +18,30 @@ if rows < 1: failed_cases.append(rel) +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True) + +# delete +sql = f"SELECT COUNT(*) FROM demo.upsert_table;" +command = f'mysql -uroot -P9030 -hfe -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "mysql", "bash", "-c", command]) +rows = int(output.decode('utf-8').split('\n')[1]) +print(f"{rows} rows in demo.upsert_table") +if rows != 3: + print(f"rows expected 3, get {rows}") + failed_cases.append("delete demo.upsert_table") + +# update +sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;" +command = f'mysql -uroot -P9030 -hfe -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "mysql", "bash", "-c", command]) +id = int(output.decode('utf-8').split('\n')[1]) +if id != 30: + print(f"target_id expected 30, get {id}") + failed_cases.append("update demo.upsert_table") + if len(failed_cases) != 0: print(f"Data check failed for case {failed_cases}") sys.exit(1) diff --git a/integration_tests/doris-sink/update_delete.sql b/integration_tests/doris-sink/update_delete.sql new file mode 100644 index 0000000000000..adabd5163ef44 --- /dev/null +++ b/integration_tests/doris-sink/update_delete.sql @@ -0,0 +1,5 @@ +DELETE FROM upsert_user_behaviors WHERE user_id = 2; + +UPDATE upsert_user_behaviors SET target_id = 30 WHERE user_id = 3; + +FLUSH; diff --git a/integration_tests/doris-sink/upsert/create_mv.sql b/integration_tests/doris-sink/upsert/create_mv.sql deleted file mode 100644 index c367e6f2baa94..0000000000000 --- a/integration_tests/doris-sink/upsert/create_mv.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE MATERIALIZED VIEW bhv_mv AS -SELECT - user_id, - target_id, - event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local -FROM - user_behaviors; diff --git a/integration_tests/doris-sink/upsert/create_sink.sql b/integration_tests/doris-sink/upsert/create_sink.sql deleted file mode 100644 index e7bd5445ba557..0000000000000 --- a/integration_tests/doris-sink/upsert/create_sink.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE SINK bhv_doris_sink -FROM - bhv_mv WITH ( - connector = 'doris', - type = 'upsert', - doris.url = 'http://fe:8030', - doris.user = 'users', - doris.password = '123456', - doris.database = 'demo', - doris.table='demo_bhv_table', - primary_key = 'user_id' -); \ No newline at end of file diff --git a/integration_tests/doris-sink/upsert/create_table.sql b/integration_tests/doris-sink/upsert/create_table.sql deleted file mode 100644 index c6cfa87eed3c8..0000000000000 
--- a/integration_tests/doris-sink/upsert/create_table.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE table user_behaviors ( - user_id int, - target_id VARCHAR, - target_type VARCHAR, - event_timestamp TIMESTAMPTZ, - behavior_type VARCHAR, - parent_target_type VARCHAR, - parent_target_id VARCHAR, - PRIMARY KEY(user_id) -); diff --git a/integration_tests/doris-sink/upsert/insert_update_delete.sql b/integration_tests/doris-sink/upsert/insert_update_delete.sql deleted file mode 100644 index f21353c161154..0000000000000 --- a/integration_tests/doris-sink/upsert/insert_update_delete.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01T01:01:01Z','1','1','1'), -(2,'2','2','2020-01-01T01:01:02Z','2','2','2'), -(3,'3','3','2020-01-01T01:01:03Z','3','3','3'), -(4,'4','4','2020-01-01T01:01:04Z','4','4','4'); - -DELETE FROM user_behaviors WHERE user_id = 2; - -UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md index 817ab57481e43..30cb79623d1e8 100644 --- a/integration_tests/starrocks-sink/README.md +++ b/integration_tests/starrocks-sink/README.md @@ -37,16 +37,10 @@ GRANT ALL ON *.* TO 'users'@'%'; 3. Execute the SQL queries in sequence: -- append-only sql: - - create_source.sql - - create_mv.sql - - create_sink.sql - -- upsert sql: - - upsert/create_table.sql - - upsert/create_mv.sql - - upsert/create_sink.sql - - upsert/insert_update_delete.sql +- create_source.sql +- create_mv.sql +- create_sink.sql +- update_delete.sql We only support `upsert` with starrocks' `PRIMARY KEY` diff --git a/integration_tests/starrocks-sink/create_mv.sql b/integration_tests/starrocks-sink/create_mv.sql index c367e6f2baa94..6e466703b0769 100644 --- a/integration_tests/starrocks-sink/create_mv.sql +++ b/integration_tests/starrocks-sink/create_mv.sql @@ -5,3 +5,11 @@ SELECT event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local FROM user_behaviors; + +CREATE MATERIALIZED VIEW upsert_bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local +FROM + upsert_user_behaviors; diff --git a/integration_tests/starrocks-sink/create_sink.sql b/integration_tests/starrocks-sink/create_sink.sql index 56d1b227512de..f2f5b5eac9653 100644 --- a/integration_tests/starrocks-sink/create_sink.sql +++ b/integration_tests/starrocks-sink/create_sink.sql @@ -11,4 +11,19 @@ FROM starrocks.database = 'demo', starrocks.table = 'demo_bhv_table', force_append_only='true' -); \ No newline at end of file +); + +CREATE SINK upsert_starrocks_sink +FROM + upsert_bhv_mv WITH ( + connector = 'starrocks', + type = 'upsert', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'upsert_table', + primary_key = 'user_id' +); diff --git a/integration_tests/starrocks-sink/create_source.sql b/integration_tests/starrocks-sink/create_source.sql index ed7c02341638a..0e42308511121 100644 --- a/integration_tests/starrocks-sink/create_source.sql +++ b/integration_tests/starrocks-sink/create_source.sql @@ -14,3 +14,20 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', datagen.rows.per.second = '100' ) FORMAT PLAIN ENCODE JSON; + +CREATE table upsert_user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + 
parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); + +INSERT INTO upsert_user_behaviors VALUES + (1,'1','1','2020-01-01T01:01:01Z','1','1','1'), + (2,'2','2','2020-01-01T01:01:02Z','2','2','2'), + (3,'3','3','2020-01-01T01:01:03Z','3','3','3'), + (4,'4','4','2020-01-01T01:01:04Z','4','4','4'); diff --git a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml index 41dabac20dc7f..4210206aa7705 100644 --- a/integration_tests/starrocks-sink/docker-compose.yml +++ b/integration_tests/starrocks-sink/docker-compose.yml @@ -52,6 +52,12 @@ services: extends: file: ../../docker/docker-compose.yml service: prometheus-0 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update_delete.sql:/update_delete.sql" + restart: on-failure volumes: risingwave-standalone: external: false diff --git a/integration_tests/starrocks-sink/sink_check.py b/integration_tests/starrocks-sink/sink_check.py index 699304854dc1f..7ab27e1e01cd1 100644 --- a/integration_tests/starrocks-sink/sink_check.py +++ b/integration_tests/starrocks-sink/sink_check.py @@ -1,7 +1,7 @@ import subprocess import sys -relations = ['demo.demo_bhv_table'] +relations = ['demo.demo_bhv_table', 'demo.upsert_table'] failed_cases = [] for rel in relations: @@ -18,6 +18,30 @@ if rows < 1: failed_cases.append(rel) +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True) + +# delete +sql = f"SELECT COUNT(*) FROM demo.upsert_table;" +command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command]) +rows = int(output.decode('utf-8').split('\n')[1]) +print(f"{rows} rows in demo.upsert_table") +if rows != 3: + print(f"rows expected 3, get {rows}") + failed_cases.append("delete demo.upsert_table") + +# update +sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;" +command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command]) +id = int(output.decode('utf-8').split('\n')[1]) +if id != 30: + print(f"target_id expected 30, get {id}") + failed_cases.append("update demo.upsert_table") + if len(failed_cases) != 0: print(f"Data check failed for case {failed_cases}") sys.exit(1) diff --git a/integration_tests/starrocks-sink/starrocks_prepare.sql b/integration_tests/starrocks-sink/starrocks_prepare.sql index aadaf85289b3c..6b304534061fe 100644 --- a/integration_tests/starrocks-sink/starrocks_prepare.sql +++ b/integration_tests/starrocks-sink/starrocks_prepare.sql @@ -9,5 +9,13 @@ CREATE table demo_bhv_table( PRIMARY KEY(`user_id`) DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); +CREATE table upsert_table( + user_id int, + target_id text, + event_timestamp_local datetime +) ENGINE=OLAP +PRIMARY KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); + CREATE USER 'users'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'users'@'%'; diff --git a/integration_tests/starrocks-sink/update_delete.sql b/integration_tests/starrocks-sink/update_delete.sql new file mode 100644 index 0000000000000..adabd5163ef44 --- /dev/null +++ b/integration_tests/starrocks-sink/update_delete.sql @@ -0,0 +1,5 @@ +DELETE FROM upsert_user_behaviors WHERE user_id = 2; + +UPDATE upsert_user_behaviors SET 
target_id = 30 WHERE user_id = 3; + +FLUSH; diff --git a/integration_tests/starrocks-sink/upsert/create_mv.sql b/integration_tests/starrocks-sink/upsert/create_mv.sql deleted file mode 100644 index c367e6f2baa94..0000000000000 --- a/integration_tests/starrocks-sink/upsert/create_mv.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE MATERIALIZED VIEW bhv_mv AS -SELECT - user_id, - target_id, - event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local -FROM - user_behaviors; diff --git a/integration_tests/starrocks-sink/upsert/create_sink.sql b/integration_tests/starrocks-sink/upsert/create_sink.sql deleted file mode 100644 index d7557bc1bd4fc..0000000000000 --- a/integration_tests/starrocks-sink/upsert/create_sink.sql +++ /dev/null @@ -1,14 +0,0 @@ -CREATE SINK bhv_starrocks_sink -FROM - bhv_mv WITH ( - connector = 'starrocks', - type = 'upsert', - starrocks.host = 'starrocks-fe', - starrocks.mysqlport = '9030', - starrocks.httpport = '8030', - starrocks.user = 'users', - starrocks.password = '123456', - starrocks.database = 'demo', - starrocks.table = 'demo_bhv_table', - primary_key = 'user_id' -); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_table.sql b/integration_tests/starrocks-sink/upsert/create_table.sql deleted file mode 100644 index c6cfa87eed3c8..0000000000000 --- a/integration_tests/starrocks-sink/upsert/create_table.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE table user_behaviors ( - user_id int, - target_id VARCHAR, - target_type VARCHAR, - event_timestamp TIMESTAMPTZ, - behavior_type VARCHAR, - parent_target_type VARCHAR, - parent_target_id VARCHAR, - PRIMARY KEY(user_id) -); diff --git a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql deleted file mode 100644 index f21353c161154..0000000000000 --- a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01T01:01:01Z','1','1','1'), -(2,'2','2','2020-01-01T01:01:02Z','2','2','2'), -(3,'3','3','2020-01-01T01:01:03Z','3','3','3'), -(4,'4','4','2020-01-01T01:01:04Z','4','4','4'); - -DELETE FROM user_behaviors WHERE user_id = 2; - -UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; From c7dada8405af2df0d12cb795320f7ec48cb45a87 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Tue, 16 Jan 2024 16:53:46 +0800 Subject: [PATCH 5/5] refactor(utils): make rw_futures_util a standalone crate (#14595) Signed-off-by: TennyZhuang Co-authored-by: TennyZhuang --- Cargo.lock | 19 +++++ Cargo.toml | 2 + src/batch/Cargo.toml | 1 + src/batch/src/executor/generic_exchange.rs | 2 +- src/batch/src/executor/row_seq_scan.rs | 2 +- src/batch/src/executor/union.rs | 2 +- src/common/Cargo.toml | 1 + src/common/src/util/mod.rs | 5 -- src/connector/Cargo.toml | 1 + src/connector/src/sink/remote.rs | 2 +- src/connector/src/sink/writer.rs | 2 +- src/frontend/Cargo.toml | 1 + .../src/scheduler/distributed/stage.rs | 2 +- src/jni_core/Cargo.toml | 1 + src/jni_core/src/hummock_iterator.rs | 2 +- src/meta/Cargo.toml | 1 + src/meta/src/barrier/rpc.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 2 +- .../src/manager/sink_coordination/manager.rs | 2 +- src/rpc_client/Cargo.toml | 1 + src/rpc_client/src/lib.rs | 2 +- src/source/Cargo.toml | 1 + src/source/src/connector_source.rs | 2 +- src/stream/Cargo.toml | 1 + src/stream/src/executor/project.rs | 2 +- src/utils/futures_util/Cargo.toml | 22 ++++++ 
.../futures_util/src}/buffered_with_fence.rs | 71 +++++++------------ src/utils/futures_util/src/lib.rs | 63 ++++++++++++++++ .../mod.rs => utils/futures_util/src/misc.rs} | 9 +-- src/workspace-hack/Cargo.toml | 1 + 30 files changed, 157 insertions(+), 70 deletions(-) create mode 100644 src/utils/futures_util/Cargo.toml rename src/{common/src/util/future_utils => utils/futures_util/src}/buffered_with_fence.rs (80%) create mode 100644 src/utils/futures_util/src/lib.rs rename src/{common/src/util/future_utils/mod.rs => utils/futures_util/src/misc.rs} (94%) diff --git a/Cargo.lock b/Cargo.lock index 01792a8b108d9..1262bfb13bc87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8552,6 +8552,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "rw_futures_util", "scopeguard", "serde_json", "task_stats_alloc", @@ -8730,6 +8731,7 @@ dependencies = [ "risingwave_pb", "rust_decimal", "rusty-fork", + "rw_futures_util", "ryu", "serde", "serde_bytes", @@ -8974,6 +8976,7 @@ dependencies = [ "risingwave_pb", "risingwave_rpc_client", "rust_decimal", + "rw_futures_util", "serde", "serde_derive", "serde_json", @@ -9216,6 +9219,7 @@ dependencies = [ "risingwave_storage", "risingwave_udf", "risingwave_variables", + "rw_futures_util", "serde", "serde_json", "sha2", @@ -9334,6 +9338,7 @@ dependencies = [ "risingwave_object_store", "risingwave_pb", "risingwave_storage", + "rw_futures_util", "serde", "serde_json", "thiserror", @@ -9411,6 +9416,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_sqlparser", "risingwave_test_runner", + "rw_futures_util", "scopeguard", "sea-orm", "serde", @@ -9616,6 +9622,7 @@ dependencies = [ "risingwave_error", "risingwave_hummock_sdk", "risingwave_pb", + "rw_futures_util", "static_assertions", "thiserror", "thiserror-ext", @@ -9723,6 +9730,7 @@ dependencies = [ "risingwave_common", "risingwave_connector", "risingwave_pb", + "rw_futures_util", "tempfile", "tracing", "workspace-hack", @@ -9914,6 +9922,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "rw_futures_util", "serde", "serde_json", "serde_yaml", @@ -10230,6 +10239,15 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rw_futures_util" +version = "0.0.0" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "ryu" version = "1.0.15" @@ -13604,6 +13622,7 @@ dependencies = [ "futures", "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", diff --git a/Cargo.toml b/Cargo.toml index 555e6a7d2a1e0..7bd67bc583745 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "src/tests/sqlsmith", "src/tests/state_cleaning_test", "src/utils/delta_btree_map", + "src/utils/futures_util", "src/utils/local_stats_alloc", "src/utils/pgwire", "src/utils/runtime", @@ -187,6 +188,7 @@ risingwave_udf = { path = "./src/expr/udf" } risingwave_variables = { path = "./src/utils/variables" } risingwave_java_binding = { path = "./src/java_binding" } risingwave_jni_core = { path = "src/jni_core" } +rw_futures_util = { path = "src/utils/futures_util" } tokio-util = "0.7" [workspace.lints.rust] diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 3660318180c16..93656967b801f 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -37,6 +37,7 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } risingwave_storage = { workspace = true } +rw_futures_util = { workspace = true } 
scopeguard = "1" serde_json = "1" thiserror = "1" diff --git a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index e54cb9069a393..704a085fec245 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -18,11 +18,11 @@ use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::PbExchangeSource; use risingwave_pb::plan_common::Field as NodeField; use risingwave_rpc_client::ComputeClientPoolRef; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::exchange_source::ExchangeSourceImpl; diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 6a5ec3cdf704f..bf2fb9613b7eb 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -24,7 +24,6 @@ use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, PbScanRange}; @@ -34,6 +33,7 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ diff --git a/src/batch/src/executor/union.rs b/src/batch/src/executor/union.rs index 00d01f93448f7..e37baed08debc 100644 --- a/src/batch/src/executor/union.rs +++ b/src/batch/src/executor/union.rs @@ -17,8 +17,8 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; -use risingwave_common::util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index cb859665fb6d6..f6741129d2428 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -82,6 +82,7 @@ risingwave_common_proc_macro = { path = "./proc_macro" } risingwave_error = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } +rw_futures_util = { workspace = true } ryu = "1.0" serde = { version = "1", features = ["derive"] } serde_bytes = "0.11" diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index a629512b3d63e..917d982be3db6 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -21,7 +21,6 @@ pub mod compress; pub mod deployment; pub mod env_var; pub mod epoch; -mod future_utils; pub mod hash_util; pub mod iter_util; pub mod memcmp_encoding; @@ -42,9 +41,5 @@ pub mod tracing; pub mod value_encoding; pub mod worker_util; -pub use future_utils::{ - await_future_with_monitor_error_stream, drop_either_future, pending_on_none, select_all, - RwFutureExt, RwTryStreamExt, -}; #[macro_use] pub mod match_util; diff --git 
a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 791cc076d12e2..3f469e5ad65ae 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -110,6 +110,7 @@ risingwave_jni_core = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } rust_decimal = "1" +rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive", "rc"] } serde_derive = "1" serde_json = "1" diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index 320f77c6a47ba..6c4e12c5997a0 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -29,7 +29,6 @@ use risingwave_common::array::StreamChunk; use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::error::anyhow_error; use risingwave_common::types::DataType; -use risingwave_common::util::drop_either_future; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{ call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest, @@ -49,6 +48,7 @@ use risingwave_rpc_client::{ BidiStreamReceiver, BidiStreamSender, SinkCoordinatorStreamHandle, SinkWriterStreamHandle, DEFAULT_BUFFER_SIZE, }; +use rw_futures_util::drop_either_future; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender}; use tokio::task::spawn_blocking; diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs index fdfe1acd4301e..1d8142061f35b 100644 --- a/src/connector/src/sink/writer.rs +++ b/src/connector/src/sink/writer.rs @@ -22,7 +22,7 @@ use futures::future::{select, Either}; use futures::TryFuture; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; -use risingwave_common::util::drop_either_future; +use rw_futures_util::drop_either_future; use crate::sink::encoder::SerTo; use crate::sink::formatter::SinkFormatter; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index d6996008507a9..f692531c6470e 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -67,6 +67,7 @@ risingwave_sqlparser = { workspace = true } risingwave_storage = { workspace = true } risingwave_udf = { workspace = true } risingwave_variables = { workspace = true } +rw_futures_util = { workspace = true } serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10.7" diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 2d0df049da3fa..568f6de613a95 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -33,7 +33,6 @@ use risingwave_common::array::DataChunk; use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::select_all; use risingwave_connector::source::SplitMetaData; use risingwave_expr::expr_context::expr_context_scope; use risingwave_pb::batch_plan::plan_node::NodeBody; @@ -45,6 +44,7 @@ use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode}; use risingwave_pb::plan_common::ExprContext; use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse}; use risingwave_rpc_client::ComputeClientPoolRef; +use rw_futures_util::select_all; use thiserror_ext::AsReport; use tokio::spawn; use tokio::sync::mpsc::{Receiver, Sender}; diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml index 736c94e58060d..bb2f8bda463f4 100644 --- 
a/src/jni_core/Cargo.toml +++ b/src/jni_core/Cargo.toml @@ -24,6 +24,7 @@ risingwave_hummock_sdk = { workspace = true } risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_storage = { workspace = true } +rw_futures_util = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1" diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 009dce22c9dd9..c66669d559154 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -20,7 +20,6 @@ use risingwave_common::catalog::ColumnDesc; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::hash::VirtualNode; use risingwave_common::row::OwnedRow; -use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde; use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer}; use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange}; @@ -39,6 +38,7 @@ use risingwave_storage::hummock::{ use risingwave_storage::monitor::HummockStateStoreMetrics; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter}; +use rw_futures_util::select_all; use tokio::sync::mpsc::unbounded_channel; type SelectAllIterStream = impl StateStoreReadIterStream + Unpin; diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index c97dfab2d429a..013ce2200f0d1 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -59,6 +59,7 @@ risingwave_object_store = { workspace = true } risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_sqlparser = { workspace = true } +rw_futures_util = { workspace = true } scopeguard = "1.2.0" sea-orm = { version = "0.12.0", features = [ "sqlx-mysql", diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index e79ffadf3d991..b9661a37d8e83 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -24,11 +24,11 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::util::pending_on_none; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest}; use risingwave_rpc_client::StreamClientPoolRef; +use rw_futures_util::pending_on_none; use tokio::sync::oneshot; use uuid::Uuid; diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 5b07ff5be5123..66d6b1aaa14cf 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -32,7 +32,6 @@ use risingwave_common::config::default::compaction_config; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::monitor::rwlock::MonitoredRwLock; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; -use risingwave_common::util::{pending_on_none, select_all}; use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact_task}; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ build_version_delta_after_version, get_compaction_group_ids, @@ -64,6 +63,7 @@ use risingwave_pb::hummock::{ PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption, }; use 
risingwave_pb::meta::subscribe_response::{Info, Operation}; +use rw_futures_util::{pending_on_none, select_all}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::Sender; diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs index 34b4073916e6c..2c1d248565d48 100644 --- a/src/meta/src/manager/sink_coordination/manager.rs +++ b/src/meta/src/manager/sink_coordination/manager.rs @@ -20,11 +20,11 @@ use futures::future::{select, BoxFuture, Either}; use futures::stream::FuturesUnordered; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use risingwave_common::buffer::Bitmap; -use risingwave_common::util::pending_on_none; use risingwave_connector::sink::catalog::SinkId; use risingwave_connector::sink::SinkParam; use risingwave_pb::connector_service::coordinate_request::Msg; use risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse}; +use rw_futures_util::pending_on_none; use tokio::sync::mpsc; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{channel, Receiver, Sender}; diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml index 450bc894586ef..7a43b6359cd68 100644 --- a/src/rpc_client/Cargo.toml +++ b/src/rpc_client/Cargo.toml @@ -29,6 +29,7 @@ risingwave_common = { workspace = true } risingwave_error = { workspace = true } risingwave_hummock_sdk = { workspace = true } risingwave_pb = { workspace = true } +rw_futures_util = { workspace = true } static_assertions = "1" thiserror = "1" thiserror-ext = { workspace = true } diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs index 726d2e4c6c986..17168d94f3ac6 100644 --- a/src/rpc_client/src/lib.rs +++ b/src/rpc_client/src/lib.rs @@ -61,7 +61,7 @@ pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef} pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle}; pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient}; pub use meta_client::{MetaClient, SinkCoordinationRpcClient}; -use risingwave_common::util::await_future_with_monitor_error_stream; +use rw_futures_util::await_future_with_monitor_error_stream; pub use sink_coordinate_client::CoordinatorStreamHandle; pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef}; diff --git a/src/source/Cargo.toml b/src/source/Cargo.toml index 735ca5f10d9b6..9949fd8ab11fa 100644 --- a/src/source/Cargo.toml +++ b/src/source/Cargo.toml @@ -23,6 +23,7 @@ rand = "0.8" risingwave_common = { workspace = true } risingwave_connector = { workspace = true } risingwave_pb = { workspace = true } +rw_futures_util = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] } tracing = { version = "0.1" } diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index 441a91836bb0a..f126c2692a77b 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -25,7 +25,6 @@ use risingwave_common::bail; use risingwave_common::catalog::ColumnId; use risingwave_common::error::ErrorCode::ConnectorError; use risingwave_common::error::{Result, RwError}; -use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; 
use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
@@ -37,6 +36,7 @@ use risingwave_connector::source::{
     create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
     ConnectorState, FsFilterCtrlCtx, SourceColumnDesc, SourceContext, SplitReader,
 };
+use rw_futures_util::select_all;
 use tokio::time;
 use tokio::time::Duration;
 
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index d3a035fa8b594..dcc2a6f3a4cb5 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -54,6 +54,7 @@ risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 risingwave_source = { workspace = true }
 risingwave_storage = { workspace = true }
+rw_futures_util = { workspace = true }
 serde_json = "1"
 smallvec = "1"
 static_assertions = "1"
diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs
index 07052f1408185..8cbd7e66e4897 100644
--- a/src/stream/src/executor/project.rs
+++ b/src/stream/src/executor/project.rs
@@ -22,8 +22,8 @@ use risingwave_common::catalog::Schema;
 use risingwave_common::row::{Row, RowExt};
 use risingwave_common::types::ToOwnedDatum;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_common::util::{RwFutureExt, RwTryStreamExt};
 use risingwave_expr::expr::NonStrictExpression;
+use rw_futures_util::{RwFutureExt, RwTryStreamExt};
 
 use super::*;
 
diff --git a/src/utils/futures_util/Cargo.toml b/src/utils/futures_util/Cargo.toml
new file mode 100644
index 0000000000000..97bd794daaf8d
--- /dev/null
+++ b/src/utils/futures_util/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "rw_futures_util"
+edition = { workspace = true }
+license = { workspace = true }
+repository = { workspace = true }
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[package.metadata.cargo-machete]
+ignored = ["workspace-hack"]
+
+[package.metadata.cargo-udeps.ignore]
+normal = ["workspace-hack"]
+
+[dependencies]
+futures = "0.3"
+pin-project-lite = "0.2"
+
+[dev-dependencies]
+tokio = { version = "1", features = ["full"] }
+
+[lints]
+workspace = true
diff --git a/src/common/src/util/future_utils/buffered_with_fence.rs b/src/utils/futures_util/src/buffered_with_fence.rs
similarity index 80%
rename from src/common/src/util/future_utils/buffered_with_fence.rs
rename to src/utils/futures_util/src/buffered_with_fence.rs
index 30b1938fda991..6271fd189a587 100644
--- a/src/common/src/util/future_utils/buffered_with_fence.rs
+++ b/src/utils/futures_util/src/buffered_with_fence.rs
@@ -45,6 +45,21 @@ pin_project! {
     }
 }
 
+impl<St> TryBufferedWithFence<St>
+where
+    St: TryStream,
+    St::Ok: TryFuture + MaybeFence,
+{
+    pub(crate) fn new(stream: St, n: usize) -> Self {
+        Self {
+            stream: stream.into_stream().fuse(),
+            in_progress_queue: FuturesOrdered::new(),
+            syncing: false,
+            max: n,
+        }
+    }
+}
+
 impl<St> Stream for TryBufferedWithFence<St>
 where
     St: TryStream,
@@ -100,6 +115,15 @@ pin_project! {
     }
 }
 
+impl<Fut> Fenced<Fut>
+where
+    Fut: Future,
+{
+    pub(crate) fn new(inner: Fut, is_fence: bool) -> Self {
+        Self { inner, is_fence }
+    }
+}
+
 impl<Fut> Future for Fenced<Fut>
 where
     Fut: Future,
@@ -131,51 +155,6 @@ where
     }
 }
 
-pub trait RwFutureExt: Future {
-    fn with_fence(self, is_fence: bool) -> Fenced<Self>
-    where
-        Self: Sized;
-}
-
-impl<Fut: Future> RwFutureExt for Fut {
-    fn with_fence(self, is_fence: bool) -> Fenced<Self> {
-        Fenced {
-            inner: self,
-            is_fence,
-        }
-    }
-}
-
-pub trait RwTryStreamExt: TryStream {
-    /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence.
-    ///
-    /// Fence is provided by [`Future`] that implements [`MaybeFence`] and returns `true`.
-    /// When the stream receive a fenced future, it'll not do a sync operation. In brief, don't poll later futures until the current
-    /// buffer is cleared.
-    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
-    where
-        Self: Sized,
-        Self::Ok: TryFuture + MaybeFence;
-}
-
-impl<St> RwTryStreamExt for St
-where
-    St: TryStream,
-{
-    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
-    where
-        Self: Sized,
-        Self::Ok: TryFuture + MaybeFence,
-    {
-        TryBufferedWithFence {
-            stream: self.into_stream().fuse(),
-            in_progress_queue: FuturesOrdered::new(),
-            syncing: false,
-            max: n,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::{Arc, Mutex};
@@ -183,7 +162,7 @@ mod tests {
 
     use futures::stream::StreamExt;
 
-    use super::{RwFutureExt, RwTryStreamExt};
+    use crate::{RwFutureExt, RwTryStreamExt};
 
     #[tokio::test]
     async fn test_buffered_with_fence() {
diff --git a/src/utils/futures_util/src/lib.rs b/src/utils/futures_util/src/lib.rs
new file mode 100644
index 0000000000000..e4dfe78c922f8
--- /dev/null
+++ b/src/utils/futures_util/src/lib.rs
@@ -0,0 +1,63 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![feature(lint_reasons)]
+
+use std::future::Future;
+
+use futures::stream::TryStream;
+use futures::TryFuture;
+
+mod buffered_with_fence;
+mod misc;
+
+use buffered_with_fence::{Fenced, MaybeFence, TryBufferedWithFence};
+pub use misc::*;
+
+pub trait RwTryStreamExt: TryStream {
+    /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but with respect to fences.
+    ///
+    /// A fence is provided by a [`Future`] that implements [`MaybeFence`] and returns `true`.
+    /// When the stream receives a fenced future, it does not poll any later futures until
+    /// the current buffer is cleared.
+    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
+    where
+        Self: Sized,
+        Self::Ok: TryFuture + MaybeFence;
+}
+
+impl<St> RwTryStreamExt for St
+where
+    St: TryStream,
+{
+    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
+    where
+        Self: Sized,
+        Self::Ok: TryFuture + MaybeFence,
+    {
+        TryBufferedWithFence::new(self, n)
+    }
+}
+
+pub trait RwFutureExt: Future {
+    fn with_fence(self, is_fence: bool) -> Fenced<Self>
+    where
+        Self: Sized;
+}
+
+impl<Fut: Future> RwFutureExt for Fut {
+    fn with_fence(self, is_fence: bool) -> Fenced<Self> {
+        Fenced::new(self, is_fence)
+    }
+}
diff --git a/src/common/src/util/future_utils/mod.rs b/src/utils/futures_util/src/misc.rs
similarity index 94%
rename from src/common/src/util/future_utils/mod.rs
rename to src/utils/futures_util/src/misc.rs
index d71ebfa7d6765..01317b3e85f24 100644
--- a/src/common/src/util/future_utils/mod.rs
+++ b/src/utils/futures_util/src/misc.rs
@@ -12,15 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod buffered_with_fence;
-
-use std::future::pending;
+use std::future::Future;
 use std::pin::{pin, Pin};
 
-pub use buffered_with_fence::*;
-use futures::future::{select, Either};
+use futures::future::{pending, select, Either};
 use futures::stream::Peekable;
-use futures::{Future, FutureExt, Stream, StreamExt};
+use futures::{FutureExt, Stream, StreamExt};
 
 /// Convert a list of streams into a [`Stream`] of results from the streams.
 pub fn select_all<S: Stream + Unpin>(
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 6b5996fe81d56..fbd86300c73b1 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -52,6 +52,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] }
 futures = { version = "0.3" }
 futures-channel = { version = "0.3", features = ["sink"] }
 futures-core = { version = "0.3" }
+futures-executor = { version = "0.3" }
 futures-io = { version = "0.3" }
 futures-sink = { version = "0.3" }
 futures-task = { version = "0.3" }
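
For reviewers unfamiliar with these combinators, the two extension traits moved into rw_futures_util compose as follows. This is a minimal sketch, assuming only what the patch itself shows: with_fence wraps any future in Fenced (which implements MaybeFence), and try_buffered_with_fence keeps up to n futures in flight but stops polling later futures while a fence is draining. The values, error types, and boxing are illustrative, not part of the patch.

    use futures::stream::{self, StreamExt};
    use futures::{pin_mut, FutureExt};
    use rw_futures_util::{RwFutureExt, RwTryStreamExt};

    #[tokio::main]
    async fn main() {
        // Three result futures; the second is marked as a fence, so the third
        // is not polled until everything buffered up to the fence completes.
        // Boxing unifies the distinct async block types so they fit in one Vec.
        let inputs = stream::iter(vec![
            Ok::<_, ()>(async { Ok::<_, ()>(1) }.boxed().with_fence(false)),
            Ok(async { Ok(2) }.boxed().with_fence(true)), // the fence
            Ok(async { Ok(3) }.boxed().with_fence(false)),
        ]);

        // At most two futures in flight; ordering is preserved, as with
        // try_buffered, and the fence is respected.
        let outputs = inputs.try_buffered_with_fence(2);
        pin_mut!(outputs);
        while let Some(item) = outputs.next().await {
            println!("{item:?}"); // Ok(1), Ok(2), Ok(3)
        }
    }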
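
Likewise for the two misc.rs helpers that drive most of the import churn above. A sketch under stated assumptions: select_all merges a list of Unpin streams into a single stream of their items, and pending_on_none awaits an Option-returning future, yielding the value on Some and pending forever on None, which is what lets the barrier and sink-coordination loops above use it directly inside select! arms.

    use std::time::Duration;

    use futures::stream::{self, StreamExt};
    use rw_futures_util::{pending_on_none, select_all};
    use tokio::sync::mpsc::unbounded_channel;

    #[tokio::main]
    async fn main() {
        // select_all: merge several streams into one (interleaving across
        // streams is unspecified, so sort before asserting).
        let merged = select_all(vec![
            stream::iter([1, 3]).boxed(),
            stream::iter([2, 4]).boxed(),
        ]);
        let mut items: Vec<i32> = merged.collect().await;
        items.sort();
        assert_eq!(items, vec![1, 2, 3, 4]);

        // pending_on_none: None (e.g. a closed channel) pends forever instead
        // of resolving, keeping shutdown handling out of the happy path.
        let (tx, mut rx) = unbounded_channel();
        tx.send(42).unwrap();
        tokio::select! {
            v = pending_on_none(rx.recv()) => assert_eq!(v, 42),
            _ = tokio::time::sleep(Duration::from_secs(1)) => unreachable!(),
        }
    }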