From f73dbe4fc76dfb8204df8cb85a0a0e15dd489425 Mon Sep 17 00:00:00 2001 From: ZENOTME <43447882+ZENOTME@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:14:52 +0800 Subject: [PATCH 1/7] fix: update icelake (#14588) Co-authored-by: ZENOTME --- Cargo.lock | 6 +++--- Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fb63420db1f9..01792a8b108d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,7 +198,7 @@ dependencies = [ [[package]] name = "apache-avro" version = "0.17.0" -source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" +source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ "apache-avro-derive", "bigdecimal 0.4.2", @@ -221,7 +221,7 @@ dependencies = [ [[package]] name = "apache-avro-derive" version = "0.17.0" -source = "git+https://github.com/icelake-io/avro.git?rev=4866a4ad0ed5d6af7160c9b52af898ab6d0551f9#4866a4ad0ed5d6af7160c9b52af898ab6d0551f9" +source = "git+https://github.com/icelake-io/avro.git?branch=icelake-dev#4b828e9283e7248fd3ca42f5b590c2160b201785" dependencies = [ "darling 0.20.3", "proc-macro2", @@ -5051,7 +5051,7 @@ dependencies = [ [[package]] name = "icelake" version = "0.0.10" -source = "git+https://github.com/icelake-io/icelake?rev=b4f4ca3c6d29092bd331925ead0bcceaa38bdd57#b4f4ca3c6d29092bd331925ead0bcceaa38bdd57" +source = "git+https://github.com/icelake-io/icelake?rev=32c0bbf242f5c47b1e743f10577012fe7436c770#32c0bbf242f5c47b1e743f10577012fe7436c770" dependencies = [ "anyhow", "apache-avro 0.17.0", diff --git a/Cargo.toml b/Cargo.toml index d3fec07a9c3d..555e6a7d2a1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ tonic = { package = "madsim-tonic", version = "0.4.1" } tonic-build = { package = "madsim-tonic-build", version = "0.4.2" } otlp-embedded = { git = "https://github.com/risingwavelabs/otlp-embedded", rev = "58c1f003484449d7c6dd693b348bf19dd44889cb" } prost = { version = "0.12" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "b4f4ca3c6d29092bd331925ead0bcceaa38bdd57", features = [ +icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [ "prometheus", ] } arrow-array = "49" From 1a70c3a5f32f2746333b44e682a735e11b52b6df Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Tue, 16 Jan 2024 15:22:19 +0800 Subject: [PATCH 2/7] feat: pre-release pubsub (#14531) --- e2e_test/source/basic/pubsub.slt | 22 ++++--------------- .../source/google_pubsub/enumerator/client.rs | 9 ++------ src/connector/src/source/google_pubsub/mod.rs | 13 +++-------- src/connector/with_options_source.yaml | 9 +++----- 4 files changed, 12 insertions(+), 41 deletions(-) diff --git a/e2e_test/source/basic/pubsub.slt b/e2e_test/source/basic/pubsub.slt index e5d991140579..b245d9b2aea8 100644 --- a/e2e_test/source/basic/pubsub.slt +++ b/e2e_test/source/basic/pubsub.slt @@ -2,16 +2,14 @@ statement error CREATE TABLE s1 (v1 int, v2 varchar) WITH ( pubsub.subscription = 'test-subscription-1', - pubsub.emulator_host = 'localhost:5981', - pubsub.split_count = 3 + pubsub.emulator_host = 'invalid_host:5981' ) FORMAT PLAIN ENCODE JSON; statement ok CREATE TABLE s1 (v1 int, v2 varchar) WITH ( connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', - pubsub.emulator_host = 'localhost:5980', - pubsub.split_count = 3 + pubsub.emulator_host = 'localhost:5980' ) FORMAT PLAIN ENCODE 
JSON;

statement ok
@@ -25,25 +23,14 @@
statement error
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
    connector = 'google_pubsub',
    pubsub.subscription = 'test-subscription-not-2',
-   pubsub.emulator_host = 'localhost:5980',
-   pubsub.split_count = 3
+   pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
    connector = 'google_pubsub',
    pubsub.subscription = 'test-subscription-2',
-   pubsub.emulator_host = 'localhost:5980',
-   pubsub.split_count = 3
+   pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

-# fail with invalid split count
-statement error
-CREATE TABLE s3 (v1 int, v2 varchar) WITH (
-    connector = 'google_pubsub',
-    pubsub.subscription = 'test-subscription-3',
-    pubsub.emulator_host = 'localhost:5980',
-    pubsub.split_count = 0
-) FORMAT PLAIN ENCODE JSON;
-
# fail if both start_offset and start_snapshot are provided
statement error
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
    connector = 'google_pubsub',
    pubsub.subscription = 'test-subscription-3',
    pubsub.emulator_host = 'localhost:5980',
-   pubsub.split_count = 2,
    pubsub.start_offset = "121212",
    pubsub.start_snapshot = "snapshot-that-doesnt-exist"
) FORMAT PLAIN ENCODE JSON;
diff --git a/src/connector/src/source/google_pubsub/enumerator/client.rs b/src/connector/src/source/google_pubsub/enumerator/client.rs
index e957bac8a641..01809a3c773b 100644
--- a/src/connector/src/source/google_pubsub/enumerator/client.rs
+++ b/src/connector/src/source/google_pubsub/enumerator/client.rs
@@ -37,13 +37,8 @@ impl SplitEnumerator for PubsubSplitEnumerator {
        properties: Self::Properties,
        _context: SourceEnumeratorContextRef,
    ) -> anyhow::Result<Self> {
-        let split_count = properties.split_count;
        let subscription = properties.subscription.to_owned();

-        if split_count < 1 {
-            bail!("split_count must be >= 1")
-        }
-
        if properties.credentials.is_none() && properties.emulator_host.is_none() {
            bail!("credentials must be set if not using the pubsub emulator")
        }
@@ -87,7 +82,7 @@ impl SplitEnumerator for PubsubSplitEnumerator {
            }
            (None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
            (Some(_), Some(_)) => {
-                bail!("specify atmost one of start_offset or start_snapshot")
+                bail!("specify at most one of start_offset or start_snapshot")
            }
        };

@@ -99,7 +94,7 @@
        Ok(Self {
            subscription,
-            split_count,
+            split_count: 1,
        })
    }
diff --git a/src/connector/src/source/google_pubsub/mod.rs b/src/connector/src/source/google_pubsub/mod.rs
index 18fd11b1e3ba..aeec1accd820 100644
--- a/src/connector/src/source/google_pubsub/mod.rs
+++ b/src/connector/src/source/google_pubsub/mod.rs
@@ -21,7 +21,6 @@ pub mod source;
pub mod split;

pub use enumerator::*;
-use serde_with::{serde_as, DisplayFromStr};
pub use source::*;
pub use split::*;
use with_options::WithOptions;
@@ -30,13 +29,8 @@ use crate::source::SourceProperties;

pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";

-#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PubsubProperties {
-    #[serde_as(as = "DisplayFromStr")]
-    #[serde(rename = "pubsub.split_count")]
-    pub split_count: u32,
-
    /// pubsub subscription to consume messages from
    /// The subscription should be configured with the `retain-on-ack` property to enable
    /// message recovery within risingwave.
@@ -48,19 +42,19 @@ pub struct PubsubProperties {
    #[serde(rename = "pubsub.emulator_host")]
    pub emulator_host: Option<String>,

-    /// credentials JSON object encoded with base64
+    /// `credentials` is a JSON string containing the service account credentials.
    /// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account).
    /// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
    #[serde(rename = "pubsub.credentials")]
    pub credentials: Option<String>,

-    /// `start_offset` is a numeric timestamp, ideallly the publish timestamp of a message
+    /// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message
    /// in the subscription. If present, the connector will attempt to seek the subscription
    /// to the timestamp and start consuming from there. Note that the seek operation is
    /// subject to limitations around the message retention policy of the subscription. See
    /// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
    /// more details.
-    #[serde(rename = "pubsub.start_offset")]
+    #[serde(rename = "pubsub.start_offset.nanos")]
    pub start_offset: Option<String>,

    /// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek
@@ -127,7 +121,6 @@ mod tests {
        let default_properties = PubsubProperties {
            credentials: None,
            emulator_host: None,
-            split_count: 1,
            start_offset: None,
            start_snapshot: None,
            subscription: String::from("test-subscription"),
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index 187780cd2382..84a125813733 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -471,9 +471,6 @@ PosixFsProperties:
      default: Default::default
PubsubProperties:
  fields:
-  - name: pubsub.split_count
-    field_type: u32
-    required: true
  - name: pubsub.subscription
    field_type: String
    comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave.
@@ -484,11 +481,11 @@ PubsubProperties:
    required: false
  - name: pubsub.credentials
    field_type: String
-    comments: credentials JSON object encoded with base64 See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
+    comments: '`credentials` is a JSON string containing the service account credentials. See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).'
    required: false
-  - name: pubsub.start_offset
+  - name: pubsub.start_offset.nanos
    field_type: String
-    comments: '`start_offset` is a numeric timestamp, ideallly the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription.
See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' + comments: '`start_offset` is a numeric timestamp, ideally the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.' required: false - name: pubsub.start_snapshot field_type: String From 222bbd1800359c54dd429b511554d1642614ed8d Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:50:50 +0800 Subject: [PATCH 3/7] feat(streaming): query global max watermark via storage table (#14591) --- src/common/src/catalog/physical_table.rs | 27 ++++ src/common/src/hash/consistent_hash/vnode.rs | 6 +- .../src/table/batch_table/storage_table.rs | 4 + src/storage/src/table/mod.rs | 13 +- src/stream/src/executor/watermark_filter.rs | 146 +++++++++++++----- src/stream/src/from_proto/watermark_filter.rs | 16 ++ 6 files changed, 167 insertions(+), 45 deletions(-) diff --git a/src/common/src/catalog/physical_table.rs b/src/common/src/catalog/physical_table.rs index 48ead96874e7..92c71d37e2aa 100644 --- a/src/common/src/catalog/physical_table.rs +++ b/src/common/src/catalog/physical_table.rs @@ -17,10 +17,13 @@ use std::collections::HashMap; use anyhow::anyhow; use fixedbitset::FixedBitSet; use itertools::Itertools; +use risingwave_pb::catalog::Table; use risingwave_pb::common::PbColumnOrder; use risingwave_pb::plan_common::StorageTableDesc; use super::{ColumnDesc, ColumnId, TableId}; +use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND; +use crate::catalog::TableOption; use crate::util::sort_util::ColumnOrder; /// Includes necessary information for compute node to access data of the table. 
@@ -138,4 +141,28 @@ impl TableDesc {
        });
        id_to_idx
    }
+
+    pub fn from_pb_table(table: &Table) -> Self {
+        let table_options = TableOption::build_table_option(&table.properties);
+        Self {
+            table_id: TableId::new(table.id),
+            pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
+            columns: table
+                .columns
+                .iter()
+                .map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
+                .collect(),
+            distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
+            stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
+            vnode_col_index: table.vnode_col_index.map(|i| i as _),
+            append_only: table.append_only,
+            retention_seconds: table_options
+                .retention_seconds
+                .unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
+            value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
+            read_prefix_len_hint: table.read_prefix_len_hint as _,
+            watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
+            versioned: table.version.is_some(),
+        }
+    }
}
diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs
index 19b8975f775a..9bc49a9372ac 100644
--- a/src/common/src/hash/consistent_hash/vnode.rs
+++ b/src/common/src/hash/consistent_hash/vnode.rs
@@ -18,7 +18,7 @@ use parse_display::Display;
use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
-use crate::types::{DataType, DatumRef, ScalarRefImpl};
+use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

@@ -96,6 +96,10 @@ impl VirtualNode {
        self.0 as _
    }

+    pub const fn to_datum(self) -> Datum {
+        Some(ScalarImpl::Int16(self.to_scalar()))
+    }
+
    /// Creates a virtual node from the given big-endian bytes representation.
    pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
        let inner = VirtualNodeInner::from_be_bytes(bytes);
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 922f0c22060e..0c55bf79ffa4 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -335,6 +335,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
    pub fn table_id(&self) -> TableId {
        self.table_id
    }
+
+    pub fn vnodes(&self) -> &Arc<Bitmap> {
+        self.distribution.vnodes()
+    }
}

/// Point get
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs
index f2822ca88022..105a2f1f1860 100644
--- a/src/storage/src/table/mod.rs
+++ b/src/storage/src/table/mod.rs
@@ -115,11 +115,15 @@ impl TableDistribution {
        Self::singleton_vnode_bitmap_ref().clone()
    }

-    pub fn all_vnodes() -> Arc<Bitmap> {
+    pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
        /// A bitmap that all vnodes are set.
        static ALL_VNODES: LazyLock<Arc<Bitmap>> =
            LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
-        ALL_VNODES.clone()
+        &ALL_VNODES
+    }
+
+    pub fn all_vnodes() -> Arc<Bitmap> {
+        Self::all_vnodes_ref().clone()
    }

    /// Distribution that accesses all vnodes, mainly used for tests.
@@ -272,10 +276,9 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu
    vnode
}

-pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode {
+pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode {
    let vnode = VirtualNode::from_datum(row.datum_at(index));
-    // TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark
-    // check_vnode_is_set(vnode, vnodes);
+    check_vnode_is_set(vnode, vnodes);

    tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode);

diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs
index 454f69582981..bca8f6f872dd 100644
--- a/src/stream/src/executor/watermark_filter.rs
+++ b/src/stream/src/executor/watermark_filter.rs
@@ -13,12 +13,13 @@
// limitations under the License.

use std::cmp;
+use std::ops::Deref;
+use std::sync::Arc;

-use futures::future::join_all;
+use futures::future::{try_join, try_join_all};
use futures::StreamExt;
use futures_async_stream::try_stream;
-use itertools::Itertools;
-use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
+use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl};
use risingwave_common::{bail, row};
@@ -27,7 +28,10 @@ use risingwave_expr::expr::{
    NonStrictExpression,
};
use risingwave_expr::Result as ExprResult;
+use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::expr::expr_node::Type;
+use risingwave_storage::table::batch_table::storage_table::StorageTable;
+use risingwave_storage::table::TableDistribution;
use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
@@ -52,6 +56,7 @@ pub struct WatermarkFilterExecutor<S: StateStore> {
    /// The column we should generate watermark and filter on.
    event_time_col_idx: usize,
    table: StateTable<S>,
+    global_watermark_table: StorageTable<S>,
}

impl<S: StateStore> WatermarkFilterExecutor<S> {
@@ -62,6 +67,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
        watermark_expr: NonStrictExpression,
        event_time_col_idx: usize,
        table: StateTable<S>,
+        global_watermark_table: StorageTable<S>,
    ) -> Self {
        Self {
            ctx,
@@ -70,6 +76,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
            watermark_expr,
            event_time_col_idx,
            table,
+            global_watermark_table,
        }
    }
}
@@ -106,6 +113,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
            ctx,
            info,
            mut table,
+            mut global_watermark_table,
        } = *self;

        let eval_error_report = ActorEvalErrorReport {
@@ -126,7 +134,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
        yield Message::Barrier(first_barrier);

        // Initiate and yield the first watermark.
-        let mut current_watermark = Self::get_global_max_watermark(&table).await?;
+        let mut current_watermark =
+            Self::get_global_max_watermark(&table, &global_watermark_table).await?;

        let mut last_checkpoint_watermark = None;

@@ -231,12 +240,19 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
                Message::Barrier(barrier) => {
                    // Update the vnode bitmap for state tables of all agg calls if asked.
                    if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
+                        let other_vnodes_bitmap = Arc::new(
+                            (!(*vnode_bitmap).clone())
+                                & TableDistribution::all_vnodes_ref().deref(),
+                        );
+                        let _ = global_watermark_table.update_vnode_bitmap(other_vnodes_bitmap);
                        let (previous_vnode_bitmap, _cache_may_stale) =
                            table.update_vnode_bitmap(vnode_bitmap.clone());

                        // Take the global max watermark when scaling happens.
                        if previous_vnode_bitmap != vnode_bitmap {
-                            current_watermark = Self::get_global_max_watermark(&table).await?;
+                            current_watermark =
+                                Self::get_global_max_watermark(&table, &global_watermark_table)
+                                    .await?;
                        }
                    }
@@ -262,7 +278,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
                    if idle_input {
                        // Align watermark
                        let global_max_watermark =
-                            Self::get_global_max_watermark(&table).await?;
+                            Self::get_global_max_watermark(&table, &global_watermark_table)
+                                .await?;

                        current_watermark = if let Some(global_max_watermark) =
                            global_max_watermark.clone()
@@ -314,29 +331,45 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
    /// If the returned if `Ok(None)`, it means there is no global max watermark.
    async fn get_global_max_watermark(
        table: &StateTable<S>,
+        global_watermark_table: &StorageTable<S>,
    ) -> StreamExecutorResult<Option<ScalarImpl>> {
-        let watermark_iter_futures = (0..VirtualNode::COUNT).map(|vnode| async move {
-            let pk = row::once(Some(ScalarImpl::Int16(vnode as _)));
-            let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
-            match watermark_row {
-                Some(row) => {
-                    if row.len() == 1 {
-                        Ok::<_, StreamExecutorError>(row[0].to_owned())
-                    } else {
-                        bail!("The watermark row should only contains 1 datum");
-                    }
+        let epoch = table.epoch();
+        let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
+            Some(row) => {
+                if row.len() == 1 {
+                    Ok::<_, StreamExecutorError>(row[0].to_owned())
+                } else {
+                    bail!("The watermark row should only contains 1 datum");
                }
-                _ => Ok(None),
            }
+            _ => Ok(None),
        };
+        let global_watermark_iter_futures =
+            global_watermark_table
+                .vnodes()
+                .iter_vnodes()
+                .map(|vnode| async move {
+                    let pk = row::once(vnode.to_datum());
+                    let watermark_row: Option<OwnedRow> = global_watermark_table
+                        .get_row(pk, HummockReadEpoch::NoWait(epoch))
+                        .await?;
+                    handle_watermark_row(watermark_row)
+                });
+        let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {
+            let pk = row::once(vnode.to_datum());
+            let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
+            handle_watermark_row(watermark_row)
        });
-        let watermarks: Vec<_> = join_all(watermark_iter_futures)
-            .await
-            .into_iter()
-            .try_collect()?;
+        let (global_watermarks, local_watermarks) = try_join(
+            try_join_all(global_watermark_iter_futures),
+            try_join_all(local_watermark_iter_futures),
+        )
+        .await?;

        // Return the minimal value if the remote max watermark is Null.
-        let watermark = watermarks
+        let watermark = global_watermarks
            .into_iter()
+            .chain(local_watermarks.into_iter())
            .flatten()
            .max_by(DefaultOrd::default_cmp);

@@ -346,11 +379,15 @@

#[cfg(test)]
mod tests {
+    use itertools::Itertools;
    use risingwave_common::array::StreamChunk;
-    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
+    use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableDesc};
    use risingwave_common::test_prelude::StreamChunkTestExt;
    use risingwave_common::types::Date;
    use risingwave_common::util::sort_util::OrderType;
+    use risingwave_pb::catalog::Table;
+    use risingwave_pb::common::ColumnOrder;
+    use risingwave_pb::plan_common::PbColumnCatalog;
    use risingwave_storage::memory::MemoryStateStore;
    use risingwave_storage::table::TableDistribution;

@@ -368,24 +405,54 @@ mod tests {
        pk_indices: &[usize],
        val_indices: &[usize],
        table_id: u32,
-    ) -> StateTable<MemoryStateStore> {
-        let column_descs = data_types
-            .iter()
-            .enumerate()
-            .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
-            .collect_vec();
+    ) -> (StorageTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
+        let table = Table {
+            id: table_id,
+            columns: data_types
+                .iter()
+                .enumerate()
+                .map(|(id, data_type)| PbColumnCatalog {
+                    column_desc: Some(
+                        ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())
+                            .to_protobuf(),
+                    ),
+                    is_hidden: false,
+                })
+                .collect(),
+            pk: pk_indices
+                .iter()
+                .zip_eq(order_types.iter())
+                .map(|(pk, order)| ColumnOrder {
+                    column_index: *pk as _,
+                    order_type: Some(order.to_protobuf()),
+                })
+                .collect(),
+            distribution_key: vec![],
+            stream_key: vec![0],
+            append_only: false,
+            vnode_col_index: Some(0),
+            value_indices: val_indices.iter().map(|i| *i as _).collect(),
+            read_prefix_len_hint: 0,
+            ..Default::default()
+        };

        // TODO: use consistent operations for watermark filter after we have upsert.
-        StateTable::new_with_distribution_inconsistent_op(
-            mem_state,
-            TableId::new(table_id),
-            column_descs,
-            order_types.to_vec(),
-            pk_indices.to_vec(),
-            TableDistribution::all(vec![0]),
-            Some(val_indices.to_vec()),
+        let state_table = StateTable::from_table_catalog_inconsistent_op(
+            &table,
+            mem_state.clone(),
+            Some(TableDistribution::all_vnodes()),
        )
-        .await
+        .await;
+
+        let desc = TableDesc::from_pb_table(&table).try_to_protobuf().unwrap();
+
+        let storage_table = StorageTable::new_partial(
+            mem_state,
+            val_indices.iter().map(|i| ColumnId::new(*i as _)).collect(),
+            Some(TableDistribution::all_vnodes()),
+            &desc,
+        );
+        (storage_table, state_table)
    }

    async fn create_watermark_filter_executor(
@@ -400,7 +467,7 @@
        let watermark_expr = build_from_pretty("(subtract:timestamp $1:timestamp 1day:interval)");

-        let table = create_in_memory_state_table(
+        let (storage_table, table) = create_in_memory_state_table(
            mem_state,
            &[DataType::Int16, WATERMARK_TYPE],
            &[OrderType::ascending()],
@@ -426,6 +493,7 @@
                watermark_expr,
                1,
                table,
+                storage_table,
            )
            .boxed(),
            tx,
diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs
index ed44a90480a6..44618e812fd8 100644
--- a/src/stream/src/from_proto/watermark_filter.rs
+++ b/src/stream/src/from_proto/watermark_filter.rs
@@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::ops::Deref; use std::sync::Arc; +use risingwave_common::catalog::{ColumnId, TableDesc}; use risingwave_expr::expr::build_non_strict_from_prost; use risingwave_pb::stream_plan::WatermarkFilterNode; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::table::TableDistribution; use super::*; use crate::common::table::state_table::StateTable; @@ -46,6 +50,17 @@ impl ExecutorBuilder for WatermarkFilterBuilder { // TODO: may use consistent op for watermark filter after we have upsert. let [table]: [_; 1] = node.get_tables().clone().try_into().unwrap(); + let desc = TableDesc::from_pb_table(&table).try_to_protobuf()?; + let column_ids = desc + .value_indices + .iter() + .map(|i| ColumnId::new(*i as _)) + .collect_vec(); + let other_vnodes = + Arc::new((!(*vnodes).clone()) & TableDistribution::all_vnodes_ref().deref()); + let global_watermark_table = + StorageTable::new_partial(store.clone(), column_ids, Some(other_vnodes), &desc); + let table = StateTable::from_table_catalog_inconsistent_op(&table, store, Some(vnodes)).await; @@ -56,6 +71,7 @@ impl ExecutorBuilder for WatermarkFilterBuilder { watermark_expr, event_time_col_idx, table, + global_watermark_table, ) .boxed()) } From 9ebf73443d198268fb845cf2e028ee208e6f68ec Mon Sep 17 00:00:00 2001 From: xfz <73645462+xuefengze@users.noreply.github.com> Date: Tue, 16 Jan 2024 16:39:27 +0800 Subject: [PATCH 4/7] test: add `upsert` test for `doris/starrocks-sink` integration test (#14587) --- integration_tests/doris-sink/README.md | 14 +++------- integration_tests/doris-sink/create_mv.sql | 8 ++++++ integration_tests/doris-sink/create_sink.sql | 15 ++++++++++- .../doris-sink/create_source.sql | 17 ++++++++++++ .../doris-sink/docker-compose.yml | 9 +++++++ .../doris-sink/doris_prepare.sql | 10 +++++++ integration_tests/doris-sink/sink_check.py | 26 ++++++++++++++++++- .../doris-sink/update_delete.sql | 5 ++++ .../doris-sink/upsert/create_mv.sql | 7 ----- .../doris-sink/upsert/create_sink.sql | 12 --------- .../doris-sink/upsert/create_table.sql | 10 ------- .../upsert/insert_update_delete.sql | 8 ------ integration_tests/starrocks-sink/README.md | 14 +++------- .../starrocks-sink/create_mv.sql | 8 ++++++ .../starrocks-sink/create_sink.sql | 17 +++++++++++- .../starrocks-sink/create_source.sql | 17 ++++++++++++ .../starrocks-sink/docker-compose.yml | 6 +++++ .../starrocks-sink/sink_check.py | 26 ++++++++++++++++++- .../starrocks-sink/starrocks_prepare.sql | 8 ++++++ .../starrocks-sink/update_delete.sql | 5 ++++ .../starrocks-sink/upsert/create_mv.sql | 7 ----- .../starrocks-sink/upsert/create_sink.sql | 14 ---------- .../starrocks-sink/upsert/create_table.sql | 10 ------- .../upsert/insert_update_delete.sql | 8 ------ 24 files changed, 181 insertions(+), 100 deletions(-) create mode 100644 integration_tests/doris-sink/update_delete.sql delete mode 100644 integration_tests/doris-sink/upsert/create_mv.sql delete mode 100644 integration_tests/doris-sink/upsert/create_sink.sql delete mode 100644 integration_tests/doris-sink/upsert/create_table.sql delete mode 100644 integration_tests/doris-sink/upsert/insert_update_delete.sql create mode 100644 integration_tests/starrocks-sink/update_delete.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_mv.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_sink.sql delete mode 100644 integration_tests/starrocks-sink/upsert/create_table.sql delete mode 100644 integration_tests/starrocks-sink/upsert/insert_update_delete.sql diff 
--git a/integration_tests/doris-sink/README.md b/integration_tests/doris-sink/README.md index 75baa2d2449f..b62c2d2e3adc 100644 --- a/integration_tests/doris-sink/README.md +++ b/integration_tests/doris-sink/README.md @@ -45,16 +45,10 @@ GRANT ALL ON *.* TO 'users'@'%'; 4. Execute the SQL queries in sequence: -- append-only sql: - - create_source.sql - - create_mv.sql - - create_sink.sql - -- upsert sql: - - upsert/create_table.sql - - upsert/create_mv.sql - - upsert/create_sink.sql - - upsert/insert_update_delete.sql +- create_source.sql +- create_mv.sql +- create_sink.sql +- update_delete.sql We only support `upsert` with doris' `UNIQUE KEY` diff --git a/integration_tests/doris-sink/create_mv.sql b/integration_tests/doris-sink/create_mv.sql index c367e6f2baa9..6e466703b076 100644 --- a/integration_tests/doris-sink/create_mv.sql +++ b/integration_tests/doris-sink/create_mv.sql @@ -5,3 +5,11 @@ SELECT event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local FROM user_behaviors; + +CREATE MATERIALIZED VIEW upsert_bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local +FROM + upsert_user_behaviors; diff --git a/integration_tests/doris-sink/create_sink.sql b/integration_tests/doris-sink/create_sink.sql index fa0cfddf7bf1..7cd1ac24857e 100644 --- a/integration_tests/doris-sink/create_sink.sql +++ b/integration_tests/doris-sink/create_sink.sql @@ -9,4 +9,17 @@ FROM doris.database = 'demo', doris.table='demo_bhv_table', force_append_only='true' -); \ No newline at end of file +); + +CREATE SINK upsert_doris_sink +FROM + upsert_bhv_mv WITH ( + connector = 'doris', + type = 'upsert', + doris.url = 'http://fe:8030', + doris.user = 'users', + doris.password = '123456', + doris.database = 'demo', + doris.table='upsert_table', + primary_key = 'user_id' +); diff --git a/integration_tests/doris-sink/create_source.sql b/integration_tests/doris-sink/create_source.sql index ed7c02341638..0e4230851112 100644 --- a/integration_tests/doris-sink/create_source.sql +++ b/integration_tests/doris-sink/create_source.sql @@ -14,3 +14,20 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', datagen.rows.per.second = '100' ) FORMAT PLAIN ENCODE JSON; + +CREATE table upsert_user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + parent_target_type VARCHAR, + parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); + +INSERT INTO upsert_user_behaviors VALUES + (1,'1','1','2020-01-01T01:01:01Z','1','1','1'), + (2,'2','2','2020-01-01T01:01:02Z','2','2','2'), + (3,'3','3','2020-01-01T01:01:03Z','3','3','3'), + (4,'4','4','2020-01-01T01:01:04Z','4','4','4'); diff --git a/integration_tests/doris-sink/docker-compose.yml b/integration_tests/doris-sink/docker-compose.yml index 74fecbee2baa..fc7cfd751e98 100644 --- a/integration_tests/doris-sink/docker-compose.yml +++ b/integration_tests/doris-sink/docker-compose.yml @@ -74,6 +74,15 @@ services: networks: mynetwork: ipv4_address: 172.21.0.9 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update_delete.sql:/update_delete.sql" + restart: on-failure + networks: + mynetwork: + ipv4_address: 172.21.0.11 volumes: risingwave-standalone: external: false diff --git a/integration_tests/doris-sink/doris_prepare.sql b/integration_tests/doris-sink/doris_prepare.sql index c95e8ac3f9b3..b65e419999ca 100644 --- a/integration_tests/doris-sink/doris_prepare.sql +++ 
b/integration_tests/doris-sink/doris_prepare.sql @@ -11,5 +11,15 @@ PROPERTIES ( "replication_allocation" = "tag.location.default: 1" ); +CREATE table upsert_table( + user_id int, + target_id text, + event_timestamp_local datetime +) UNIQUE KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 +PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" +); + CREATE USER 'users'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'users'@'%'; diff --git a/integration_tests/doris-sink/sink_check.py b/integration_tests/doris-sink/sink_check.py index 39109f4194fe..510cc867dcda 100644 --- a/integration_tests/doris-sink/sink_check.py +++ b/integration_tests/doris-sink/sink_check.py @@ -1,7 +1,7 @@ import subprocess import sys -relations = ['demo.demo_bhv_table'] +relations = ['demo.demo_bhv_table', 'demo.upsert_table'] failed_cases = [] for rel in relations: @@ -18,6 +18,30 @@ if rows < 1: failed_cases.append(rel) +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True) + +# delete +sql = f"SELECT COUNT(*) FROM demo.upsert_table;" +command = f'mysql -uroot -P9030 -hfe -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "mysql", "bash", "-c", command]) +rows = int(output.decode('utf-8').split('\n')[1]) +print(f"{rows} rows in demo.upsert_table") +if rows != 3: + print(f"rows expected 3, get {rows}") + failed_cases.append("delete demo.upsert_table") + +# update +sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;" +command = f'mysql -uroot -P9030 -hfe -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "mysql", "bash", "-c", command]) +id = int(output.decode('utf-8').split('\n')[1]) +if id != 30: + print(f"target_id expected 30, get {id}") + failed_cases.append("update demo.upsert_table") + if len(failed_cases) != 0: print(f"Data check failed for case {failed_cases}") sys.exit(1) diff --git a/integration_tests/doris-sink/update_delete.sql b/integration_tests/doris-sink/update_delete.sql new file mode 100644 index 000000000000..adabd5163ef4 --- /dev/null +++ b/integration_tests/doris-sink/update_delete.sql @@ -0,0 +1,5 @@ +DELETE FROM upsert_user_behaviors WHERE user_id = 2; + +UPDATE upsert_user_behaviors SET target_id = 30 WHERE user_id = 3; + +FLUSH; diff --git a/integration_tests/doris-sink/upsert/create_mv.sql b/integration_tests/doris-sink/upsert/create_mv.sql deleted file mode 100644 index c367e6f2baa9..000000000000 --- a/integration_tests/doris-sink/upsert/create_mv.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE MATERIALIZED VIEW bhv_mv AS -SELECT - user_id, - target_id, - event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local -FROM - user_behaviors; diff --git a/integration_tests/doris-sink/upsert/create_sink.sql b/integration_tests/doris-sink/upsert/create_sink.sql deleted file mode 100644 index e7bd5445ba55..000000000000 --- a/integration_tests/doris-sink/upsert/create_sink.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE SINK bhv_doris_sink -FROM - bhv_mv WITH ( - connector = 'doris', - type = 'upsert', - doris.url = 'http://fe:8030', - doris.user = 'users', - doris.password = '123456', - doris.database = 'demo', - doris.table='demo_bhv_table', - primary_key = 'user_id' -); \ No newline at end of file diff --git a/integration_tests/doris-sink/upsert/create_table.sql b/integration_tests/doris-sink/upsert/create_table.sql deleted file mode 100644 index c6cfa87eed3c..000000000000 --- 
a/integration_tests/doris-sink/upsert/create_table.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE table user_behaviors ( - user_id int, - target_id VARCHAR, - target_type VARCHAR, - event_timestamp TIMESTAMPTZ, - behavior_type VARCHAR, - parent_target_type VARCHAR, - parent_target_id VARCHAR, - PRIMARY KEY(user_id) -); diff --git a/integration_tests/doris-sink/upsert/insert_update_delete.sql b/integration_tests/doris-sink/upsert/insert_update_delete.sql deleted file mode 100644 index f21353c16115..000000000000 --- a/integration_tests/doris-sink/upsert/insert_update_delete.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01T01:01:01Z','1','1','1'), -(2,'2','2','2020-01-01T01:01:02Z','2','2','2'), -(3,'3','3','2020-01-01T01:01:03Z','3','3','3'), -(4,'4','4','2020-01-01T01:01:04Z','4','4','4'); - -DELETE FROM user_behaviors WHERE user_id = 2; - -UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; diff --git a/integration_tests/starrocks-sink/README.md b/integration_tests/starrocks-sink/README.md index 817ab57481e4..30cb79623d1e 100644 --- a/integration_tests/starrocks-sink/README.md +++ b/integration_tests/starrocks-sink/README.md @@ -37,16 +37,10 @@ GRANT ALL ON *.* TO 'users'@'%'; 3. Execute the SQL queries in sequence: -- append-only sql: - - create_source.sql - - create_mv.sql - - create_sink.sql - -- upsert sql: - - upsert/create_table.sql - - upsert/create_mv.sql - - upsert/create_sink.sql - - upsert/insert_update_delete.sql +- create_source.sql +- create_mv.sql +- create_sink.sql +- update_delete.sql We only support `upsert` with starrocks' `PRIMARY KEY` diff --git a/integration_tests/starrocks-sink/create_mv.sql b/integration_tests/starrocks-sink/create_mv.sql index c367e6f2baa9..6e466703b076 100644 --- a/integration_tests/starrocks-sink/create_mv.sql +++ b/integration_tests/starrocks-sink/create_mv.sql @@ -5,3 +5,11 @@ SELECT event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local FROM user_behaviors; + +CREATE MATERIALIZED VIEW upsert_bhv_mv AS +SELECT + user_id, + target_id, + event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local +FROM + upsert_user_behaviors; diff --git a/integration_tests/starrocks-sink/create_sink.sql b/integration_tests/starrocks-sink/create_sink.sql index 56d1b227512d..f2f5b5eac965 100644 --- a/integration_tests/starrocks-sink/create_sink.sql +++ b/integration_tests/starrocks-sink/create_sink.sql @@ -11,4 +11,19 @@ FROM starrocks.database = 'demo', starrocks.table = 'demo_bhv_table', force_append_only='true' -); \ No newline at end of file +); + +CREATE SINK upsert_starrocks_sink +FROM + upsert_bhv_mv WITH ( + connector = 'starrocks', + type = 'upsert', + starrocks.host = 'starrocks-fe', + starrocks.mysqlport = '9030', + starrocks.httpport = '8030', + starrocks.user = 'users', + starrocks.password = '123456', + starrocks.database = 'demo', + starrocks.table = 'upsert_table', + primary_key = 'user_id' +); diff --git a/integration_tests/starrocks-sink/create_source.sql b/integration_tests/starrocks-sink/create_source.sql index ed7c02341638..0e4230851112 100644 --- a/integration_tests/starrocks-sink/create_source.sql +++ b/integration_tests/starrocks-sink/create_source.sql @@ -14,3 +14,20 @@ CREATE table user_behaviors ( fields.user_id.end = '1000', datagen.rows.per.second = '100' ) FORMAT PLAIN ENCODE JSON; + +CREATE table upsert_user_behaviors ( + user_id int, + target_id VARCHAR, + target_type VARCHAR, + event_timestamp TIMESTAMPTZ, + behavior_type VARCHAR, + parent_target_type VARCHAR, + 
parent_target_id VARCHAR, + PRIMARY KEY(user_id) +); + +INSERT INTO upsert_user_behaviors VALUES + (1,'1','1','2020-01-01T01:01:01Z','1','1','1'), + (2,'2','2','2020-01-01T01:01:02Z','2','2','2'), + (3,'3','3','2020-01-01T01:01:03Z','3','3','3'), + (4,'4','4','2020-01-01T01:01:04Z','4','4','4'); diff --git a/integration_tests/starrocks-sink/docker-compose.yml b/integration_tests/starrocks-sink/docker-compose.yml index 41dabac20dc7..4210206aa770 100644 --- a/integration_tests/starrocks-sink/docker-compose.yml +++ b/integration_tests/starrocks-sink/docker-compose.yml @@ -52,6 +52,12 @@ services: extends: file: ../../docker/docker-compose.yml service: prometheus-0 + postgres: + image: postgres:latest + command: tail -f /dev/null + volumes: + - "./update_delete.sql:/update_delete.sql" + restart: on-failure volumes: risingwave-standalone: external: false diff --git a/integration_tests/starrocks-sink/sink_check.py b/integration_tests/starrocks-sink/sink_check.py index 699304854dc1..7ab27e1e01cd 100644 --- a/integration_tests/starrocks-sink/sink_check.py +++ b/integration_tests/starrocks-sink/sink_check.py @@ -1,7 +1,7 @@ import subprocess import sys -relations = ['demo.demo_bhv_table'] +relations = ['demo.demo_bhv_table', 'demo.upsert_table'] failed_cases = [] for rel in relations: @@ -18,6 +18,30 @@ if rows < 1: failed_cases.append(rel) +# update data +subprocess.run(["docker", "compose", "exec", "postgres", "bash", "-c", "psql -h risingwave-standalone -p 4566 -d dev -U root -f update_delete.sql"], check=True) + +# delete +sql = f"SELECT COUNT(*) FROM demo.upsert_table;" +command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command]) +rows = int(output.decode('utf-8').split('\n')[1]) +print(f"{rows} rows in demo.upsert_table") +if rows != 3: + print(f"rows expected 3, get {rows}") + failed_cases.append("delete demo.upsert_table") + +# update +sql = f"SELECT target_id FROM demo.upsert_table WHERE user_id = 3;" +command = f'mysql -uroot -P9030 -h127.0.0.1 -e "{sql}"' +output = subprocess.check_output( + ["docker", "compose", "exec", "starrocks-fe", "bash", "-c", command]) +id = int(output.decode('utf-8').split('\n')[1]) +if id != 30: + print(f"target_id expected 30, get {id}") + failed_cases.append("update demo.upsert_table") + if len(failed_cases) != 0: print(f"Data check failed for case {failed_cases}") sys.exit(1) diff --git a/integration_tests/starrocks-sink/starrocks_prepare.sql b/integration_tests/starrocks-sink/starrocks_prepare.sql index aadaf85289b3..6b304534061f 100644 --- a/integration_tests/starrocks-sink/starrocks_prepare.sql +++ b/integration_tests/starrocks-sink/starrocks_prepare.sql @@ -9,5 +9,13 @@ CREATE table demo_bhv_table( PRIMARY KEY(`user_id`) DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); +CREATE table upsert_table( + user_id int, + target_id text, + event_timestamp_local datetime +) ENGINE=OLAP +PRIMARY KEY(`user_id`) +DISTRIBUTED BY HASH(`user_id`) properties("replication_num" = "1"); + CREATE USER 'users'@'%' IDENTIFIED BY '123456'; GRANT ALL ON *.* TO 'users'@'%'; diff --git a/integration_tests/starrocks-sink/update_delete.sql b/integration_tests/starrocks-sink/update_delete.sql new file mode 100644 index 000000000000..adabd5163ef4 --- /dev/null +++ b/integration_tests/starrocks-sink/update_delete.sql @@ -0,0 +1,5 @@ +DELETE FROM upsert_user_behaviors WHERE user_id = 2; + +UPDATE upsert_user_behaviors SET target_id = 30 WHERE user_id = 3; + +FLUSH; 
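For a quick manual check of the upsert path, the queries below mirror what sink_check.py asserts once update_delete.sql has run on the RisingWave side and the sink has synced. This is only a sketch: it assumes the compose setup above (StarRocks FE reachable via mysql -uroot -P9030 inside the starrocks-fe container), and the expected values follow from create_source.sql plus the DELETE/UPDATE in update_delete.sql.

SELECT COUNT(*) FROM demo.upsert_table;
-- expected: 3, since the row with user_id = 2 was deleted

SELECT target_id FROM demo.upsert_table WHERE user_id = 3;
-- expected: 30, the value written by the UPDATE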
diff --git a/integration_tests/starrocks-sink/upsert/create_mv.sql b/integration_tests/starrocks-sink/upsert/create_mv.sql deleted file mode 100644 index c367e6f2baa9..000000000000 --- a/integration_tests/starrocks-sink/upsert/create_mv.sql +++ /dev/null @@ -1,7 +0,0 @@ -CREATE MATERIALIZED VIEW bhv_mv AS -SELECT - user_id, - target_id, - event_timestamp AT TIME ZONE 'Asia/Shanghai' as event_timestamp_local -FROM - user_behaviors; diff --git a/integration_tests/starrocks-sink/upsert/create_sink.sql b/integration_tests/starrocks-sink/upsert/create_sink.sql deleted file mode 100644 index d7557bc1bd4f..000000000000 --- a/integration_tests/starrocks-sink/upsert/create_sink.sql +++ /dev/null @@ -1,14 +0,0 @@ -CREATE SINK bhv_starrocks_sink -FROM - bhv_mv WITH ( - connector = 'starrocks', - type = 'upsert', - starrocks.host = 'starrocks-fe', - starrocks.mysqlport = '9030', - starrocks.httpport = '8030', - starrocks.user = 'users', - starrocks.password = '123456', - starrocks.database = 'demo', - starrocks.table = 'demo_bhv_table', - primary_key = 'user_id' -); \ No newline at end of file diff --git a/integration_tests/starrocks-sink/upsert/create_table.sql b/integration_tests/starrocks-sink/upsert/create_table.sql deleted file mode 100644 index c6cfa87eed3c..000000000000 --- a/integration_tests/starrocks-sink/upsert/create_table.sql +++ /dev/null @@ -1,10 +0,0 @@ -CREATE table user_behaviors ( - user_id int, - target_id VARCHAR, - target_type VARCHAR, - event_timestamp TIMESTAMPTZ, - behavior_type VARCHAR, - parent_target_type VARCHAR, - parent_target_id VARCHAR, - PRIMARY KEY(user_id) -); diff --git a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql b/integration_tests/starrocks-sink/upsert/insert_update_delete.sql deleted file mode 100644 index f21353c16115..000000000000 --- a/integration_tests/starrocks-sink/upsert/insert_update_delete.sql +++ /dev/null @@ -1,8 +0,0 @@ -INSERT INTO user_behaviors VALUES(1,'1','1','2020-01-01T01:01:01Z','1','1','1'), -(2,'2','2','2020-01-01T01:01:02Z','2','2','2'), -(3,'3','3','2020-01-01T01:01:03Z','3','3','3'), -(4,'4','4','2020-01-01T01:01:04Z','4','4','4'); - -DELETE FROM user_behaviors WHERE user_id = 2; - -UPDATE user_behaviors SET target_id = 30 WHERE user_id = 3; From c7dada8405af2df0d12cb795320f7ec48cb45a87 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Tue, 16 Jan 2024 16:53:46 +0800 Subject: [PATCH 5/7] refactor(utils): make rw_futures_util a standalone crate (#14595) Signed-off-by: TennyZhuang Co-authored-by: TennyZhuang --- Cargo.lock | 19 +++++ Cargo.toml | 2 + src/batch/Cargo.toml | 1 + src/batch/src/executor/generic_exchange.rs | 2 +- src/batch/src/executor/row_seq_scan.rs | 2 +- src/batch/src/executor/union.rs | 2 +- src/common/Cargo.toml | 1 + src/common/src/util/mod.rs | 5 -- src/connector/Cargo.toml | 1 + src/connector/src/sink/remote.rs | 2 +- src/connector/src/sink/writer.rs | 2 +- src/frontend/Cargo.toml | 1 + .../src/scheduler/distributed/stage.rs | 2 +- src/jni_core/Cargo.toml | 1 + src/jni_core/src/hummock_iterator.rs | 2 +- src/meta/Cargo.toml | 1 + src/meta/src/barrier/rpc.rs | 2 +- src/meta/src/hummock/manager/mod.rs | 2 +- .../src/manager/sink_coordination/manager.rs | 2 +- src/rpc_client/Cargo.toml | 1 + src/rpc_client/src/lib.rs | 2 +- src/source/Cargo.toml | 1 + src/source/src/connector_source.rs | 2 +- src/stream/Cargo.toml | 1 + src/stream/src/executor/project.rs | 2 +- src/utils/futures_util/Cargo.toml | 22 ++++++ .../futures_util/src}/buffered_with_fence.rs | 71 +++++++------------ 
src/utils/futures_util/src/lib.rs | 63 ++++++++++++++++ .../mod.rs => utils/futures_util/src/misc.rs} | 9 +-- src/workspace-hack/Cargo.toml | 1 + 30 files changed, 157 insertions(+), 70 deletions(-) create mode 100644 src/utils/futures_util/Cargo.toml rename src/{common/src/util/future_utils => utils/futures_util/src}/buffered_with_fence.rs (80%) create mode 100644 src/utils/futures_util/src/lib.rs rename src/{common/src/util/future_utils/mod.rs => utils/futures_util/src/misc.rs} (94%) diff --git a/Cargo.lock b/Cargo.lock index 01792a8b108d..1262bfb13bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8552,6 +8552,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "rw_futures_util", "scopeguard", "serde_json", "task_stats_alloc", @@ -8730,6 +8731,7 @@ dependencies = [ "risingwave_pb", "rust_decimal", "rusty-fork", + "rw_futures_util", "ryu", "serde", "serde_bytes", @@ -8974,6 +8976,7 @@ dependencies = [ "risingwave_pb", "risingwave_rpc_client", "rust_decimal", + "rw_futures_util", "serde", "serde_derive", "serde_json", @@ -9216,6 +9219,7 @@ dependencies = [ "risingwave_storage", "risingwave_udf", "risingwave_variables", + "rw_futures_util", "serde", "serde_json", "sha2", @@ -9334,6 +9338,7 @@ dependencies = [ "risingwave_object_store", "risingwave_pb", "risingwave_storage", + "rw_futures_util", "serde", "serde_json", "thiserror", @@ -9411,6 +9416,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_sqlparser", "risingwave_test_runner", + "rw_futures_util", "scopeguard", "sea-orm", "serde", @@ -9616,6 +9622,7 @@ dependencies = [ "risingwave_error", "risingwave_hummock_sdk", "risingwave_pb", + "rw_futures_util", "static_assertions", "thiserror", "thiserror-ext", @@ -9723,6 +9730,7 @@ dependencies = [ "risingwave_common", "risingwave_connector", "risingwave_pb", + "rw_futures_util", "tempfile", "tracing", "workspace-hack", @@ -9914,6 +9922,7 @@ dependencies = [ "risingwave_rpc_client", "risingwave_source", "risingwave_storage", + "rw_futures_util", "serde", "serde_json", "serde_yaml", @@ -10230,6 +10239,15 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rw_futures_util" +version = "0.0.0" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "ryu" version = "1.0.15" @@ -13604,6 +13622,7 @@ dependencies = [ "futures", "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", diff --git a/Cargo.toml b/Cargo.toml index 555e6a7d2a1e..7bd67bc58374 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ members = [ "src/tests/sqlsmith", "src/tests/state_cleaning_test", "src/utils/delta_btree_map", + "src/utils/futures_util", "src/utils/local_stats_alloc", "src/utils/pgwire", "src/utils/runtime", @@ -187,6 +188,7 @@ risingwave_udf = { path = "./src/expr/udf" } risingwave_variables = { path = "./src/utils/variables" } risingwave_java_binding = { path = "./src/java_binding" } risingwave_jni_core = { path = "src/jni_core" } +rw_futures_util = { path = "src/utils/futures_util" } tokio-util = "0.7" [workspace.lints.rust] diff --git a/src/batch/Cargo.toml b/src/batch/Cargo.toml index 3660318180c1..93656967b801 100644 --- a/src/batch/Cargo.toml +++ b/src/batch/Cargo.toml @@ -37,6 +37,7 @@ risingwave_pb = { workspace = true } risingwave_rpc_client = { workspace = true } risingwave_source = { workspace = true } risingwave_storage = { workspace = true } +rw_futures_util = { workspace = true } scopeguard = "1" serde_json = "1" thiserror = "1" diff --git 
a/src/batch/src/executor/generic_exchange.rs b/src/batch/src/executor/generic_exchange.rs index e54cb9069a39..704a085fec24 100644 --- a/src/batch/src/executor/generic_exchange.rs +++ b/src/batch/src/executor/generic_exchange.rs @@ -18,11 +18,11 @@ use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_common::util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::PbExchangeSource; use risingwave_pb::plan_common::Field as NodeField; use risingwave_rpc_client::ComputeClientPoolRef; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::exchange_source::ExchangeSourceImpl; diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 6a5ec3cdf704..bf2fb9613b7e 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -24,7 +24,6 @@ use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_common::util::select_all; use risingwave_common::util::value_encoding::deserialize_datum; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{scan_range, PbScanRange}; @@ -34,6 +33,7 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ diff --git a/src/batch/src/executor/union.rs b/src/batch/src/executor/union.rs index 00d01f93448f..e37baed08deb 100644 --- a/src/batch/src/executor/union.rs +++ b/src/batch/src/executor/union.rs @@ -17,8 +17,8 @@ use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; -use risingwave_common::util::select_all; use risingwave_pb::batch_plan::plan_node::NodeBody; +use rw_futures_util::select_all; use crate::error::{BatchError, Result}; use crate::executor::{ diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index cb859665fb6d..f6741129d242 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -82,6 +82,7 @@ risingwave_common_proc_macro = { path = "./proc_macro" } risingwave_error = { workspace = true } risingwave_pb = { workspace = true } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } +rw_futures_util = { workspace = true } ryu = "1.0" serde = { version = "1", features = ["derive"] } serde_bytes = "0.11" diff --git a/src/common/src/util/mod.rs b/src/common/src/util/mod.rs index a629512b3d63..917d982be3db 100644 --- a/src/common/src/util/mod.rs +++ b/src/common/src/util/mod.rs @@ -21,7 +21,6 @@ pub mod compress; pub mod deployment; pub mod env_var; pub mod epoch; -mod future_utils; pub mod hash_util; pub mod iter_util; pub mod memcmp_encoding; @@ -42,9 +41,5 @@ pub mod tracing; pub mod value_encoding; pub mod worker_util; -pub use future_utils::{ - await_future_with_monitor_error_stream, drop_either_future, pending_on_none, select_all, - RwFutureExt, RwTryStreamExt, -}; #[macro_use] pub mod match_util; diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 
791cc076d12e..3f469e5ad65a 100644
--- a/src/connector/Cargo.toml
+++ b/src/connector/Cargo.toml
@@ -110,6 +110,7 @@ risingwave_jni_core = { workspace = true }
 risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 rust_decimal = "1"
+rw_futures_util = { workspace = true }
 serde = { version = "1", features = ["derive", "rc"] }
 serde_derive = "1"
 serde_json = "1"
diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs
index 320f77c6a47b..6c4e12c5997a 100644
--- a/src/connector/src/sink/remote.rs
+++ b/src/connector/src/sink/remote.rs
@@ -29,7 +29,6 @@ use risingwave_common::array::StreamChunk;
 use risingwave_common::catalog::{ColumnDesc, ColumnId};
 use risingwave_common::error::anyhow_error;
 use risingwave_common::types::DataType;
-use risingwave_common::util::drop_either_future;
 use risingwave_jni_core::jvm_runtime::JVM;
 use risingwave_jni_core::{
     call_static_method, gen_class_name, JniReceiverType, JniSenderType, JniSinkWriterStreamRequest,
@@ -49,6 +48,7 @@ use risingwave_rpc_client::{
     BidiStreamReceiver, BidiStreamSender, SinkCoordinatorStreamHandle, SinkWriterStreamHandle,
     DEFAULT_BUFFER_SIZE,
 };
+use rw_futures_util::drop_either_future;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::{unbounded_channel, Receiver, Sender};
 use tokio::task::spawn_blocking;
diff --git a/src/connector/src/sink/writer.rs b/src/connector/src/sink/writer.rs
index fdfe1acd4301..1d8142061f35 100644
--- a/src/connector/src/sink/writer.rs
+++ b/src/connector/src/sink/writer.rs
@@ -22,7 +22,7 @@ use futures::future::{select, Either};
 use futures::TryFuture;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::buffer::Bitmap;
-use risingwave_common::util::drop_either_future;
+use rw_futures_util::drop_either_future;
 
 use crate::sink::encoder::SerTo;
 use crate::sink::formatter::SinkFormatter;
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index d6996008507a..f692531c6470 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -67,6 +67,7 @@ risingwave_sqlparser = { workspace = true }
 risingwave_storage = { workspace = true }
 risingwave_udf = { workspace = true }
 risingwave_variables = { workspace = true }
+rw_futures_util = { workspace = true }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sha2 = "0.10.7"
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 2d0df049da3f..568f6de613a9 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -33,7 +33,6 @@ use risingwave_common::array::DataChunk;
 use risingwave_common::hash::ParallelUnitMapping;
 use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_common::util::select_all;
 use risingwave_connector::source::SplitMetaData;
 use risingwave_expr::expr_context::expr_context_scope;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -45,6 +44,7 @@ use risingwave_pb::common::{BatchQueryEpoch, HostAddress, WorkerNode};
 use risingwave_pb::plan_common::ExprContext;
 use risingwave_pb::task_service::{CancelTaskRequest, TaskInfoResponse};
 use risingwave_rpc_client::ComputeClientPoolRef;
+use rw_futures_util::select_all;
 use thiserror_ext::AsReport;
 use tokio::spawn;
 use tokio::sync::mpsc::{Receiver, Sender};
diff --git a/src/jni_core/Cargo.toml b/src/jni_core/Cargo.toml
index 736c94e58060..bb2f8bda463f 100644
--- a/src/jni_core/Cargo.toml
+++ b/src/jni_core/Cargo.toml
@@ -24,6 +24,7 @@ risingwave_hummock_sdk = { workspace = true }
 risingwave_object_store = { workspace = true }
 risingwave_pb = { workspace = true }
 risingwave_storage = { workspace = true }
+rw_futures_util = { workspace = true }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 thiserror = "1"
diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs
index 009dce22c9dd..c66669d55915 100644
--- a/src/jni_core/src/hummock_iterator.rs
+++ b/src/jni_core/src/hummock_iterator.rs
@@ -20,7 +20,6 @@ use risingwave_common::catalog::ColumnDesc;
 use risingwave_common::config::ObjectStoreConfig;
 use risingwave_common::hash::VirtualNode;
 use risingwave_common::row::OwnedRow;
-use risingwave_common::util::select_all;
 use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
 use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, ValueRowDeserializer};
 use risingwave_hummock_sdk::key::{prefixed_range_with_vnode, TableKeyRange};
@@ -39,6 +38,7 @@ use risingwave_storage::hummock::{
 use risingwave_storage::monitor::HummockStateStoreMetrics;
 use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
 use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
+use rw_futures_util::select_all;
 use tokio::sync::mpsc::unbounded_channel;
 
 type SelectAllIterStream = impl StateStoreReadIterStream + Unpin;
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index c97dfab2d429..013ce2200f0d 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -59,6 +59,7 @@ risingwave_object_store = { workspace = true }
 risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 risingwave_sqlparser = { workspace = true }
+rw_futures_util = { workspace = true }
 scopeguard = "1.2.0"
 sea-orm = { version = "0.12.0", features = [
     "sqlx-mysql",
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index e79ffadf3d99..b9661a37d8e8 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -24,11 +24,11 @@ use futures::stream::FuturesUnordered;
 use futures::{FutureExt, StreamExt};
 use itertools::Itertools;
 use risingwave_common::bail;
-use risingwave_common::util::pending_on_none;
 use risingwave_common::util::tracing::TracingContext;
 use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
 use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest};
 use risingwave_rpc_client::StreamClientPoolRef;
+use rw_futures_util::pending_on_none;
 use tokio::sync::oneshot;
 use uuid::Uuid;
 
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index 5b07ff5be512..66d6b1aaa14c 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -32,7 +32,6 @@ use risingwave_common::config::default::compaction_config;
 use risingwave_common::config::ObjectStoreConfig;
 use risingwave_common::monitor::rwlock::MonitoredRwLock;
 use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
-use risingwave_common::util::{pending_on_none, select_all};
 use risingwave_hummock_sdk::compact::{compact_task_to_string, statistics_compact_task};
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
     build_version_delta_after_version, get_compaction_group_ids,
@@ -64,6 +63,7 @@ use risingwave_pb::hummock::{
     PbCompactionGroupInfo, SstableInfo, SubscribeCompactionEventRequest, TableOption,
 };
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
+use rw_futures_util::{pending_on_none, select_all};
 use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot::Sender;
diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs
index 34b4073916e6..2c1d248565d4 100644
--- a/src/meta/src/manager/sink_coordination/manager.rs
+++ b/src/meta/src/manager/sink_coordination/manager.rs
@@ -20,11 +20,11 @@ use futures::future::{select, BoxFuture, Either};
 use futures::stream::FuturesUnordered;
 use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
 use risingwave_common::buffer::Bitmap;
-use risingwave_common::util::pending_on_none;
 use risingwave_connector::sink::catalog::SinkId;
 use risingwave_connector::sink::SinkParam;
 use risingwave_pb::connector_service::coordinate_request::Msg;
 use risingwave_pb::connector_service::{coordinate_request, CoordinateRequest, CoordinateResponse};
+use rw_futures_util::pending_on_none;
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot::{channel, Receiver, Sender};
diff --git a/src/rpc_client/Cargo.toml b/src/rpc_client/Cargo.toml
index 450bc894586e..7a43b6359cd6 100644
--- a/src/rpc_client/Cargo.toml
+++ b/src/rpc_client/Cargo.toml
@@ -29,6 +29,7 @@ risingwave_common = { workspace = true }
 risingwave_error = { workspace = true }
 risingwave_hummock_sdk = { workspace = true }
 risingwave_pb = { workspace = true }
+rw_futures_util = { workspace = true }
 static_assertions = "1"
 thiserror = "1"
 thiserror-ext = { workspace = true }
diff --git a/src/rpc_client/src/lib.rs b/src/rpc_client/src/lib.rs
index 726d2e4c6c98..17168d94f3ac 100644
--- a/src/rpc_client/src/lib.rs
+++ b/src/rpc_client/src/lib.rs
@@ -61,7 +61,7 @@ pub use compute_client::{ComputeClient, ComputeClientPool, ComputeClientPoolRef};
 pub use connector_client::{ConnectorClient, SinkCoordinatorStreamHandle, SinkWriterStreamHandle};
 pub use hummock_meta_client::{CompactionEventItem, HummockMetaClient};
 pub use meta_client::{MetaClient, SinkCoordinationRpcClient};
-use risingwave_common::util::await_future_with_monitor_error_stream;
+use rw_futures_util::await_future_with_monitor_error_stream;
 pub use sink_coordinate_client::CoordinatorStreamHandle;
 pub use stream_client::{StreamClient, StreamClientPool, StreamClientPoolRef};
 
diff --git a/src/source/Cargo.toml b/src/source/Cargo.toml
index 735ca5f10d9b..9949fd8ab11f 100644
--- a/src/source/Cargo.toml
+++ b/src/source/Cargo.toml
@@ -23,6 +23,7 @@ rand = "0.8"
 risingwave_common = { workspace = true }
 risingwave_connector = { workspace = true }
 risingwave_pb = { workspace = true }
+rw_futures_util = { workspace = true }
 tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal", "fs"] }
 tracing = { version = "0.1" }
 
diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs
index 441a91836bb0..f126c2692a77 100644
--- a/src/source/src/connector_source.rs
+++ b/src/source/src/connector_source.rs
@@ -25,7 +25,6 @@ use risingwave_common::bail;
 use risingwave_common::catalog::ColumnId;
 use risingwave_common::error::ErrorCode::ConnectorError;
 use risingwave_common::error::{Result, RwError};
-use risingwave_common::util::select_all;
 use risingwave_connector::dispatch_source_prop;
 use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
 use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator;
@@ -37,6 +36,7 @@ use risingwave_connector::source::{
     create_split_reader, BoxSourceWithStateStream, BoxTryStream, Column, ConnectorProperties,
     ConnectorState, FsFilterCtrlCtx, SourceColumnDesc, SourceContext, SplitReader,
 };
+use rw_futures_util::select_all;
 use tokio::time;
 use tokio::time::Duration;
 
diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml
index d3a035fa8b59..dcc2a6f3a4cb 100644
--- a/src/stream/Cargo.toml
+++ b/src/stream/Cargo.toml
@@ -54,6 +54,7 @@ risingwave_pb = { workspace = true }
 risingwave_rpc_client = { workspace = true }
 risingwave_source = { workspace = true }
 risingwave_storage = { workspace = true }
+rw_futures_util = { workspace = true }
 serde_json = "1"
 smallvec = "1"
 static_assertions = "1"
diff --git a/src/stream/src/executor/project.rs b/src/stream/src/executor/project.rs
index 07052f140818..8cbd7e66e489 100644
--- a/src/stream/src/executor/project.rs
+++ b/src/stream/src/executor/project.rs
@@ -22,8 +22,8 @@ use risingwave_common::catalog::Schema;
 use risingwave_common::row::{Row, RowExt};
 use risingwave_common::types::ToOwnedDatum;
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_common::util::{RwFutureExt, RwTryStreamExt};
 use risingwave_expr::expr::NonStrictExpression;
+use rw_futures_util::{RwFutureExt, RwTryStreamExt};
 
 use super::*;
 
diff --git a/src/utils/futures_util/Cargo.toml b/src/utils/futures_util/Cargo.toml
new file mode 100644
index 000000000000..97bd794daaf8
--- /dev/null
+++ b/src/utils/futures_util/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "rw_futures_util"
+edition = { workspace = true }
+license = { workspace = true }
+repository = { workspace = true }
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[package.metadata.cargo-machete]
+ignored = ["workspace-hack"]
+
+[package.metadata.cargo-udeps.ignore]
+normal = ["workspace-hack"]
+
+[dependencies]
+futures = "0.3"
+pin-project-lite = "0.2"
+
+[dev-dependencies]
+tokio = { version = "1", features = ["full"] }
+
+[lints]
+workspace = true
diff --git a/src/common/src/util/future_utils/buffered_with_fence.rs b/src/utils/futures_util/src/buffered_with_fence.rs
similarity index 80%
rename from src/common/src/util/future_utils/buffered_with_fence.rs
rename to src/utils/futures_util/src/buffered_with_fence.rs
index 30b1938fda99..6271fd189a58 100644
--- a/src/common/src/util/future_utils/buffered_with_fence.rs
+++ b/src/utils/futures_util/src/buffered_with_fence.rs
@@ -45,6 +45,21 @@ pin_project! {
     }
 }
 
+impl<St> TryBufferedWithFence<St>
+where
+    St: TryStream,
+    St::Ok: TryFuture + MaybeFence,
+{
+    pub(crate) fn new(stream: St, n: usize) -> Self {
+        Self {
+            stream: stream.into_stream().fuse(),
+            in_progress_queue: FuturesOrdered::new(),
+            syncing: false,
+            max: n,
+        }
+    }
+}
+
 impl<St> Stream for TryBufferedWithFence<St>
 where
     St: TryStream,
@@ -100,6 +115,15 @@ pin_project! {
     }
 }
 
+impl<Fut> Fenced<Fut>
+where
+    Fut: Future,
+{
+    pub(crate) fn new(inner: Fut, is_fence: bool) -> Self {
+        Self { inner, is_fence }
+    }
+}
+
 impl<Fut> Future for Fenced<Fut>
 where
     Fut: Future,
@@ -131,51 +155,6 @@ where
     }
 }
 
-pub trait RwFutureExt: Future {
-    fn with_fence(self, is_fence: bool) -> Fenced<Self>
-    where
-        Self: Sized;
-}
-
-impl<Fut: Future> RwFutureExt for Fut {
-    fn with_fence(self, is_fence: bool) -> Fenced<Self> {
-        Fenced {
-            inner: self,
-            is_fence,
-        }
-    }
-}
-
-pub trait RwTryStreamExt: TryStream {
-    /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but respect to fence.
-    ///
-    /// Fence is provided by [`Future`] that implements [`MaybeFence`] and returns `true`.
-    /// When the stream receive a fenced future, it'll not do a sync operation. In brief, don't poll later futures until the current
-    /// buffer is cleared.
-    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
-    where
-        Self: Sized,
-        Self::Ok: TryFuture + MaybeFence;
-}
-
-impl<St> RwTryStreamExt for St
-where
-    St: TryStream,
-{
-    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
-    where
-        Self: Sized,
-        Self::Ok: TryFuture + MaybeFence,
-    {
-        TryBufferedWithFence {
-            stream: self.into_stream().fuse(),
-            in_progress_queue: FuturesOrdered::new(),
-            syncing: false,
-            max: n,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::{Arc, Mutex};
@@ -183,7 +162,7 @@ mod tests {
 
     use futures::stream::StreamExt;
 
-    use super::{RwFutureExt, RwTryStreamExt};
+    use crate::{RwFutureExt, RwTryStreamExt};
 
     #[tokio::test]
     async fn test_buffered_with_fence() {
diff --git a/src/utils/futures_util/src/lib.rs b/src/utils/futures_util/src/lib.rs
new file mode 100644
index 000000000000..e4dfe78c922f
--- /dev/null
+++ b/src/utils/futures_util/src/lib.rs
@@ -0,0 +1,63 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#![feature(lint_reasons)]
+
+use std::future::Future;
+
+use futures::stream::TryStream;
+use futures::TryFuture;
+
+mod buffered_with_fence;
+mod misc;
+
+use buffered_with_fence::{Fenced, MaybeFence, TryBufferedWithFence};
+pub use misc::*;
+
+pub trait RwTryStreamExt: TryStream {
+    /// Similar to [`TryStreamExt::try_buffered`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html#method.try_buffered), but fence-aware.
+    ///
+    /// A fence is a [`Future`] that implements [`MaybeFence`] and returns `true`.
+    /// When the stream yields a fenced future, later futures are not polled until the
+    /// current buffer is cleared.
+    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
+    where
+        Self: Sized,
+        Self::Ok: TryFuture + MaybeFence;
+}
+
+impl<St> RwTryStreamExt for St
+where
+    St: TryStream,
+{
+    fn try_buffered_with_fence(self, n: usize) -> TryBufferedWithFence<Self>
+    where
+        Self: Sized,
+        Self::Ok: TryFuture + MaybeFence,
+    {
+        TryBufferedWithFence::new(self, n)
+    }
+}
+
+pub trait RwFutureExt: Future {
+    fn with_fence(self, is_fence: bool) -> Fenced<Self>
+    where
+        Self: Sized;
+}
+
+impl<Fut: Future> RwFutureExt for Fut {
+    fn with_fence(self, is_fence: bool) -> Fenced<Self> {
+        Fenced::new(self, is_fence)
+    }
+}
diff --git a/src/common/src/util/future_utils/mod.rs b/src/utils/futures_util/src/misc.rs
similarity index 94%
rename from src/common/src/util/future_utils/mod.rs
rename to src/utils/futures_util/src/misc.rs
index d71ebfa7d676..01317b3e85f2 100644
--- a/src/common/src/util/future_utils/mod.rs
+++ b/src/utils/futures_util/src/misc.rs
@@ -12,15 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-mod buffered_with_fence;
-
-use std::future::pending;
+use std::future::Future;
 use std::pin::{pin, Pin};
 
-pub use buffered_with_fence::*;
-use futures::future::{select, Either};
+use futures::future::{pending, select, Either};
 use futures::stream::Peekable;
-use futures::{Future, FutureExt, Stream, StreamExt};
+use futures::{FutureExt, Stream, StreamExt};
 
 /// Convert a list of streams into a [`Stream`] of results from the streams.
 pub fn select_all<S: Stream + Unpin>(
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 6b5996fe81d5..fbd86300c73b 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -52,6 +52,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] }
 futures = { version = "0.3" }
 futures-channel = { version = "0.3", features = ["sink"] }
 futures-core = { version = "0.3" }
+futures-executor = { version = "0.3" }
 futures-io = { version = "0.3" }
 futures-sink = { version = "0.3" }
 futures-task = { version = "0.3" }

From 0c577c861d16668cc31cb91f85ccc14a3e7ea05a Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Tue, 16 Jan 2024 08:56:18 +0000
Subject: [PATCH 6/7] chore(deps): Bump smallvec from 1.11.1 to 1.12.0 (#14582)

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: dependabot[bot]
Co-authored-by: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com>
---
 Cargo.lock                    | 10 +++++-----
 src/frontend/Cargo.toml       |  2 +-
 src/workspace-hack/Cargo.toml |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1262bfb13bc8..5c4d757a6d11 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7798,7 +7798,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac"
 dependencies = [
  "bytes",
  "heck 0.4.1",
- "itertools 0.10.5",
+ "itertools 0.11.0",
  "log",
  "multimap 0.8.3",
  "once_cell",
@@ -7832,7 +7832,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32"
 dependencies = [
  "anyhow",
- "itertools 0.10.5",
+ "itertools 0.11.0",
  "proc-macro2",
  "quote",
  "syn 2.0.48",
@@ -11027,9 +11027,9 @@ dependencies = [
 
 [[package]]
 name = "smallvec"
-version = "1.11.1"
+version = "1.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a"
+checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e"
 dependencies = [
  "serde",
 ]
@@ -13636,7 +13636,7 @@ dependencies = [
  "hyper",
  "indexmap 1.9.3",
  "indexmap 2.0.0",
- "itertools 0.10.5",
+ "itertools 0.11.0",
  "jni",
  "lazy_static",
  "lexical-core",
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index f692531c6470..8963dab80abf 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -71,7 +71,7 @@ rw_futures_util = { workspace = true }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
 sha2 = "0.10.7"
-smallvec = { version = "1.11.1", features = ["serde"] }
+smallvec = { version = "1.12.0", features = ["serde"] }
 tempfile = "3"
 thiserror = "1"
 thiserror-ext = { workspace = true }
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index fbd86300c73b..fe84476bea3c 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -66,7 +66,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] }
 hyper = { version = "0.14", features = ["full"] }
 indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["serde", "std"] }
 indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
-itertools = { version = "0.10" }
+itertools = { version = "0.11" }
 jni = { version = "0.21", features = ["invocation"] }
 lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
 lexical-core = { version = "0.8", features = ["format"] }
@@ -170,7 +170,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] }
 generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] }
 hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] }
 indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] }
-itertools = { version = "0.10" }
+itertools = { version = "0.11" }
 lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
 libc = { version = "0.2", features = ["extra_traits"] }
 log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }

From 83e829ed9eaebdf30da6610386b3f2b42d18accc Mon Sep 17 00:00:00 2001
From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com>
Date: Tue, 16 Jan 2024 16:59:11 +0800
Subject: [PATCH 7/7] chore: rename system catalog field total_memory_bytes
 total_cpu_cores (#14594)

---
 .../src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
index e975a2665d9a..c3f270cd422f 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_worker_nodes.rs
@@ -35,8 +35,8 @@ pub const RW_WORKER_NODES: BuiltinTable = BuiltinTable {
         (DataType::Boolean, "is_serving"),
         (DataType::Boolean, "is_unschedulable"),
         (DataType::Varchar, "rw_version"),
-        (DataType::Int64, "total_memory_bytes"),
-        (DataType::Int64, "total_cpu_cores"),
+        (DataType::Int64, "system_total_memory_bytes"),
+        (DataType::Int64, "system_total_cpu_cores"),
        (DataType::Timestamptz, "started_at"),
     ],
     pk: &[0],