From 5796347d9f807f6b2a529ac333e1c7be9ec68c6a Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Sat, 14 Sep 2024 16:52:29 +0800 Subject: [PATCH 1/3] init --- proto/stream_plan.proto | 14 +- .../src/parser/protobuf/recursive.rs | 157 ++++++++++++++++++ src/stream/src/executor/asof_join.rs | 79 ++------- src/stream/src/executor/hash_join.rs | 52 +++--- src/stream/src/executor/join/hash_join.rs | 116 ++++++------- src/stream/src/from_proto/asof_join.rs | 14 -- 6 files changed, 249 insertions(+), 183 deletions(-) create mode 100644 src/connector/src/parser/protobuf/recursive.rs diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index ca67737aeafe0..d6a6ae0ed67e9 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -463,22 +463,18 @@ message AsOfJoinNode { catalog.Table left_table = 4; // Used for internal table states. catalog.Table right_table = 5; - // Used for internal table states. - catalog.Table left_degree_table = 6; - // Used for internal table states. - catalog.Table right_degree_table = 7; // The output indices of current node - repeated uint32 output_indices = 8; + repeated uint32 output_indices = 6; // Left deduped input pk indices. The pk of the left_table and // The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices] // left_inequality_key is not used but for forward compatibility. - repeated uint32 left_deduped_input_pk_indices = 9; + repeated uint32 left_deduped_input_pk_indices = 7; // Right deduped input pk indices. // The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices] // right_inequality_key is not used but for forward compatibility. - repeated uint32 right_deduped_input_pk_indices = 10; - repeated bool null_safe = 11; - optional plan_common.AsOfJoinDesc asof_desc = 12; + repeated uint32 right_deduped_input_pk_indices = 8; + repeated bool null_safe = 9; + optional plan_common.AsOfJoinDesc asof_desc = 10; } message TemporalJoinNode { diff --git a/src/connector/src/parser/protobuf/recursive.rs b/src/connector/src/parser/protobuf/recursive.rs new file mode 100644 index 0000000000000..dc367eb5f70cd --- /dev/null +++ b/src/connector/src/parser/protobuf/recursive.rs @@ -0,0 +1,157 @@ +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ComplexRecursiveMessage { + #[prost(string, tag = "1")] + pub node_name: ::prost::alloc::string::String, + #[prost(int32, tag = "2")] + pub node_id: i32, + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "4")] + pub parent: ::core::option::Option, + #[prost(message, repeated, tag = "5")] + pub children: ::prost::alloc::vec::Vec, +} +/// Nested message and enum types in `ComplexRecursiveMessage`. +pub mod complex_recursive_message { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Attributes { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub value: ::prost::alloc::string::String, + } + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Parent { + #[prost(string, tag = "1")] + pub parent_name: ::prost::alloc::string::String, + #[prost(int32, tag = "2")] + pub parent_id: i32, + #[prost(message, repeated, tag = "3")] + pub siblings: ::prost::alloc::vec::Vec, + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AllTypes { + /// standard types + #[prost(double, tag = "1")] + pub double_field: f64, + #[prost(float, tag = "2")] + pub float_field: f32, + #[prost(int32, tag = "3")] + pub int32_field: i32, + #[prost(int64, tag = "4")] + pub int64_field: i64, + #[prost(uint32, tag = "5")] + pub uint32_field: u32, + #[prost(uint64, tag = "6")] + pub uint64_field: u64, + #[prost(sint32, tag = "7")] + pub sint32_field: i32, + #[prost(sint64, tag = "8")] + pub sint64_field: i64, + #[prost(fixed32, tag = "9")] + pub fixed32_field: u32, + #[prost(fixed64, tag = "10")] + pub fixed64_field: u64, + #[prost(sfixed32, tag = "11")] + pub sfixed32_field: i32, + #[prost(sfixed64, tag = "12")] + pub sfixed64_field: i64, + #[prost(bool, tag = "13")] + pub bool_field: bool, + #[prost(string, tag = "14")] + pub string_field: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "15")] + pub bytes_field: ::prost::alloc::vec::Vec, + #[prost(enumeration = "all_types::EnumType", tag = "16")] + pub enum_field: i32, + #[prost(message, optional, tag = "17")] + pub nested_message_field: ::core::option::Option, + /// repeated field + #[prost(int32, repeated, tag = "18")] + pub repeated_int_field: ::prost::alloc::vec::Vec, + /// timestamp + #[prost(message, optional, tag = "23")] + pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>, + /// duration + #[prost(message, optional, tag = "24")] + pub duration_field: ::core::option::Option<::prost_types::Duration>, + /// any + #[prost(message, optional, tag = "25")] + pub any_field: ::core::option::Option<::prost_types::Any>, + /// wrapper types + #[prost(message, optional, tag = "27")] + pub int32_value_field: ::core::option::Option, + #[prost(message, optional, tag = "28")] + pub string_value_field: ::core::option::Option<::prost::alloc::string::String>, + /// oneof field + #[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")] + pub example_oneof: ::core::option::Option, +} +/// Nested message and enum types in `AllTypes`. +pub mod all_types { + /// nested message + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct NestedMessage { + #[prost(int32, tag = "1")] + pub id: i32, + #[prost(string, tag = "2")] + pub name: ::prost::alloc::string::String, + } + /// enum + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum EnumType { + Default = 0, + Option1 = 1, + Option2 = 2, + } + impl EnumType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + EnumType::Default => "DEFAULT", + EnumType::Option1 => "OPTION1", + EnumType::Option2 => "OPTION2", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "DEFAULT" => Some(Self::Default), + "OPTION1" => Some(Self::Option1), + "OPTION2" => Some(Self::Option2), + _ => None, + } + } + } + /// oneof field + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum ExampleOneof { + #[prost(string, tag = "19")] + OneofString(::prost::alloc::string::String), + #[prost(int32, tag = "20")] + OneofInt32(i32), + #[prost(enumeration = "EnumType", tag = "21")] + OneofEnum(i32), + } +} diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index cb8a141481f28..97a2e5af78162 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -186,9 +186,7 @@ impl AsOfJoinExecutor null_safe: Vec, output_indices: Vec, state_table_l: StateTable, - degree_state_table_l: StateTable, state_table_r: StateTable, - degree_state_table_r: StateTable, watermark_epoch: AtomicU64Ref, metrics: Arc, chunk_size: usize, @@ -219,22 +217,9 @@ impl AsOfJoinExecutor let state_pk_indices_l = input_l.pk_indices().to_vec(); let state_pk_indices_r = input_r.pk_indices().to_vec(); - let state_order_key_indices_l = state_table_l.pk_indices(); - let state_order_key_indices_r = state_table_r.pk_indices(); - let state_join_key_indices_l = params_l.join_key_indices; let state_join_key_indices_r = params_r.join_key_indices; - let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec(); - let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec(); - - let degree_pk_indices_l = (state_join_key_indices_l.len() - ..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len()) - .collect_vec(); - let degree_pk_indices_r = (state_join_key_indices_r.len() - ..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len()) - .collect_vec(); - // If pk is contained in join key. let pk_contained_in_jk_l = is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone()); @@ -253,20 +238,8 @@ impl AsOfJoinExecutor assert_eq!(join_key_data_types_l, join_key_data_types_r); - let degree_all_data_types_l = state_order_key_indices_l - .iter() - .map(|idx| state_all_data_types_l[*idx].clone()) - .collect_vec(); - let degree_all_data_types_r = state_order_key_indices_r - .iter() - .map(|idx| state_all_data_types_r[*idx].clone()) - .collect_vec(); - let null_matched = K::Bitmap::from_bool_vec(null_safe); - let need_degree_table_l = false; - let need_degree_table_r = false; - let (left_to_output, right_to_output) = { let (left_len, right_len) = if is_left_semi_or_anti(T) { (state_all_data_types_l.len(), 0usize) @@ -303,12 +276,8 @@ impl AsOfJoinExecutor state_all_data_types_l.clone(), state_table_l, params_l.deduped_pk_indices, - degree_join_key_indices_l, - degree_all_data_types_l, - degree_state_table_l, - degree_pk_indices_l, + None, null_matched.clone(), - need_degree_table_l, pk_contained_in_jk_l, inequal_key_idx_l, metrics.clone(), @@ -321,7 +290,7 @@ impl AsOfJoinExecutor i2o_mapping: left_to_output, i2o_mapping_indexed: l2o_indexed, start_pos: 0, - need_degree_table: need_degree_table_l, + need_degree_table: false, }, side_r: JoinSide { ht: JoinHashMap::new( @@ -331,12 +300,8 @@ impl AsOfJoinExecutor state_all_data_types_r.clone(), state_table_r, params_r.deduped_pk_indices, - degree_join_key_indices_r, - degree_all_data_types_r, - degree_state_table_r, - degree_pk_indices_r, + None, null_matched, - need_degree_table_r, pk_contained_in_jk_r, inequal_key_idx_r, metrics.clone(), @@ -349,7 +314,7 @@ impl AsOfJoinExecutor start_pos: side_l_column_n, i2o_mapping: right_to_output, i2o_mapping_indexed: r2o_indexed, - need_degree_table: need_degree_table_r, + need_degree_table: false, }, metrics, chunk_size, @@ -675,7 +640,7 @@ impl AsOfJoinExecutor { yield chunk; } - side_update.ht.insert_row(key, row).await?; + side_update.ht.insert_row(key, row)?; } Op::Delete | Op::UpdateDelete => { if let Some(matched_row_by_inequality) = matched_row_by_inequality { @@ -924,7 +889,7 @@ impl AsOfJoinExecutor match op { Op::Insert | Op::UpdateInsert => { - side_update.ht.insert_row(key, row).await?; + side_update.ht.insert_row(key, row)?; } Op::Delete | Op::UpdateDelete => { side_update.ht.delete_row(key, row)?; @@ -976,7 +941,7 @@ mod tests { order_types: &[OrderType], pk_indices: &[usize], table_id: u32, - ) -> (StateTable, StateTable) { + ) -> StateTable { let column_descs = data_types .iter() .enumerate() @@ -991,27 +956,7 @@ mod tests { ) .await; - // Create degree table - let mut degree_table_column_descs = vec![]; - pk_indices.iter().enumerate().for_each(|(pk_id, idx)| { - degree_table_column_descs.push(ColumnDesc::unnamed( - ColumnId::new(pk_id as i32), - data_types[*idx].clone(), - )) - }); - degree_table_column_descs.push(ColumnDesc::unnamed( - ColumnId::new(pk_indices.len() as i32), - DataType::Int64, - )); - let degree_state_table = StateTable::new_without_distribution( - mem_state, - TableId::new(table_id + 1), - degree_table_column_descs, - order_types.to_vec(), - pk_indices.to_vec(), - ) - .await; - (state_table, degree_state_table) + state_table } async fn create_executor( @@ -1033,7 +978,7 @@ mod tests { let mem_state = MemoryStateStore::new(); - let (state_l, degree_state_l) = create_in_memory_state_table( + let state_l = create_in_memory_state_table( mem_state.clone(), &[DataType::Int64, DataType::Int64, DataType::Int64], &[ @@ -1046,7 +991,7 @@ mod tests { ) .await; - let (state_r, degree_state_r) = create_in_memory_state_table( + let state_r = create_in_memory_state_table( mem_state, &[DataType::Int64, DataType::Int64, DataType::Int64], &[ @@ -1055,7 +1000,7 @@ mod tests { OrderType::ascending(), ], &[0, asof_desc.right_idx, 1], - 2, + 1, ) .await; @@ -1080,9 +1025,7 @@ mod tests { vec![false], (0..schema_len).collect_vec(), state_l, - degree_state_l, state_r, - degree_state_r, Arc::new(AtomicU64::new(0)), Arc::new(StreamingMetrics::unused()), 1024, diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index e23c17724be02..9e974cb5071f9 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -248,9 +248,6 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor { /// - Full Outer: both side /// - Left Outer/Semi/Anti: left side /// - Right Outer/Semi/Anti: right side - /// - Inner: None. - degree_state: TableInner, + /// - Inner: neither side. + /// + /// Should be set to `None` if `need_degree_table` was set to `false`. + degree_state: Option>, /// If degree table is need need_degree_table: bool, /// Pk is part of the join key. @@ -206,20 +208,27 @@ pub struct JoinHashMap { metrics: JoinHashMapMetrics, } -struct TableInner { +pub struct TableInner { /// Indices of the (cache) pk in a state row pk_indices: Vec, /// Indices of the join key in a state row join_key_indices: Vec, // This should be identical to the pk in state table. order_key_indices: Vec, - // This should be identical to the data types in table schema. - #[expect(dead_code)] - all_data_types: Vec, pub(crate) table: StateTable, } impl TableInner { + pub fn new(pk_indices: Vec, join_key_indices: Vec, table: StateTable) -> Self { + let order_key_indices = table.pk_indices().to_vec(); + Self { + pk_indices, + join_key_indices, + order_key_indices, + table, + } + } + fn error_context(&self, row: &impl Row) -> String { let pk = row.project(&self.pk_indices); let jk = row.project(&self.join_key_indices); @@ -243,12 +252,8 @@ impl JoinHashMap { state_all_data_types: Vec, state_table: StateTable, state_pk_indices: Vec, - degree_join_key_indices: Vec, - degree_all_data_types: Vec, - degree_table: StateTable, - degree_pk_indices: Vec, + degree_state: Option>, null_matched: K::Bitmap, - need_degree_table: bool, pk_contained_in_jk: bool, inequality_key_idx: Option, metrics: Arc, @@ -280,17 +285,10 @@ impl JoinHashMap { pk_indices: state_pk_indices, join_key_indices: state_join_key_indices, order_key_indices: state_table.pk_indices().to_vec(), - all_data_types: state_all_data_types, table: state_table, }; - let degree_state = TableInner { - pk_indices: degree_pk_indices, - join_key_indices: degree_join_key_indices, - order_key_indices: degree_table.pk_indices().to_vec(), - all_data_types: degree_all_data_types, - table: degree_table, - }; + let need_degree_table = degree_state.is_some(); let metrics_info = MetricsInfo::new( metrics.clone(), @@ -322,14 +320,19 @@ impl JoinHashMap { pub fn init(&mut self, epoch: EpochPair) { self.state.table.init_epoch(epoch); - self.degree_state.table.init_epoch(epoch); + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.init_epoch(epoch); + } } /// Update the vnode bitmap and manipulate the cache if necessary. pub fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) -> bool { let (_previous_vnode_bitmap, cache_may_stale) = self.state.table.update_vnode_bitmap(vnode_bitmap.clone()); - let _ = self.degree_state.table.update_vnode_bitmap(vnode_bitmap); + let _ = self + .degree_state + .as_mut() + .map(|degree_state| degree_state.table.update_vnode_bitmap(vnode_bitmap.clone())); if cache_may_stale { self.inner.clear(); @@ -341,7 +344,9 @@ impl JoinHashMap { pub fn update_watermark(&mut self, watermark: ScalarImpl) { // TODO: remove data in cache. self.state.table.update_watermark(watermark.clone()); - self.degree_state.table.update_watermark(watermark); + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.update_watermark(watermark); + } } /// Take the state for the given `key` out of the hash table and return it. One **MUST** call @@ -380,11 +385,11 @@ impl JoinHashMap { self.state .table .iter_with_prefix(&key, sub_range, PrefetchOptions::default()); - let degree_table_iter_fut = self.degree_state.table.iter_with_prefix( - &key, - sub_range, - PrefetchOptions::default(), - ); + let degree_state = self.degree_state.as_ref().unwrap(); + let degree_table_iter_fut = + degree_state + .table + .iter_with_prefix(&key, sub_range, PrefetchOptions::default()); let (table_iter, degree_table_iter) = try_join(table_iter_fut, degree_table_iter_fut).await?; @@ -542,19 +547,22 @@ impl JoinHashMap { pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { self.metrics.flush(); self.state.table.commit(epoch).await?; - self.degree_state.table.commit(epoch).await?; + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.commit(epoch).await?; + } Ok(()) } pub async fn try_flush(&mut self) -> StreamExecutorResult<()> { self.state.table.try_flush().await?; - self.degree_state.table.try_flush().await?; + if let Some(degree_state) = &mut self.degree_state { + degree_state.table.try_flush().await?; + } Ok(()) } /// Insert a join row - #[allow(clippy::unused_async)] - pub async fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { + pub fn insert(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { let pk = self.serialize_pk_from_row(&value.row); let inequality_key = self @@ -581,42 +589,21 @@ impl JoinHashMap { } // Update the flush buffer. - let (row, degree) = value.to_table_rows(&self.state.order_key_indices); - self.state.table.insert(row); - self.degree_state.table.insert(degree); + if let Some(degree_state) = self.degree_state.as_mut() { + let (row, degree) = value.to_table_rows(&self.state.order_key_indices); + self.state.table.insert(row); + degree_state.table.insert(degree); + } else { + self.state.table.insert(value.row); + } Ok(()) } /// Insert a row. /// Used when the side does not need to update degree. - #[allow(clippy::unused_async)] - pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { + pub fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { let join_row = JoinRow::new(&value, 0); - let pk = self.serialize_pk_from_row(&value); - let inequality_key = self - .inequality_key_desc - .as_ref() - .map(|desc| desc.serialize_inequal_key_from_row(&value)); - // TODO(yuhao): avoid this `contains`. - // https://github.com/risingwavelabs/risingwave/issues/9233 - if self.inner.contains(key) { - // Update cache - let mut entry = self.inner.get_mut(key).unwrap(); - entry - .insert(pk, join_row.encode(), inequality_key) - .with_context(|| self.state.error_context(&value))?; - } else if self.pk_contained_in_jk { - // Refill cache when the join key exist in neither cache or storage. - self.metrics.insert_cache_miss_count += 1; - let mut state = JoinEntryState::default(); - state - .insert(pk, join_row.encode(), inequality_key) - .with_context(|| self.state.error_context(&value))?; - self.update_state(key, state.into()); - } - - // Update the flush buffer. - self.state.table.insert(value); + self.insert(key, join_row)?; Ok(()) } @@ -638,7 +625,8 @@ impl JoinHashMap { // If no cache maintained, only update the state table. let (row, degree) = value.to_table_rows(&self.state.order_key_indices); self.state.table.delete(row); - self.degree_state.table.delete(degree); + let degree_state = self.degree_state.as_mut().unwrap(); + degree_state.table.delete(degree); Ok(()) } @@ -687,8 +675,8 @@ impl JoinHashMap { action(&mut join_row.degree); let new_degree = join_row.to_table_rows(&self.state.order_key_indices).1; - - self.degree_state.table.update(old_degree, new_degree); + let degree_state = self.degree_state.as_mut().unwrap(); + degree_state.table.update(old_degree, new_degree); } /// Increment the degree of the given [`JoinRow`] and [`EncodedJoinRow`] with `action`, both in diff --git a/src/stream/src/from_proto/asof_join.rs b/src/stream/src/from_proto/asof_join.rs index 3d74ac884b4f0..f82b4ed490117 100644 --- a/src/stream/src/from_proto/asof_join.rs +++ b/src/stream/src/from_proto/asof_join.rs @@ -44,10 +44,7 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap(); let table_l = node.get_left_table()?; - let degree_table_l = node.get_left_degree_table()?; - let table_r = node.get_right_table()?; - let degree_table_r = node.get_right_degree_table()?; let params_l = JoinParams::new( node.get_left_key() @@ -84,14 +81,9 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { let state_table_l = StateTable::from_table_catalog(table_l, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_l = - StateTable::from_table_catalog(degree_table_l, store.clone(), Some(vnodes.clone())) - .await; let state_table_r = StateTable::from_table_catalog(table_r, store.clone(), Some(vnodes.clone())).await; - let degree_state_table_r = - StateTable::from_table_catalog(degree_table_r, store, Some(vnodes)).await; let join_type_proto = node.get_join_type()?; let as_of_desc_proto = node.get_asof_desc()?; @@ -107,9 +99,7 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { null_safe, output_indices, state_table_l, - degree_state_table_l, state_table_r, - degree_state_table_r, lru_manager: params.watermark_epoch, metrics: params.executor_stats, join_type_proto, @@ -138,9 +128,7 @@ struct AsOfJoinExecutorDispatcherArgs { null_safe: Vec, output_indices: Vec, state_table_l: StateTable, - degree_state_table_l: StateTable, state_table_r: StateTable, - degree_state_table_r: StateTable, lru_manager: AtomicU64Ref, metrics: Arc, join_type_proto: JoinTypeProto, @@ -167,9 +155,7 @@ impl HashKeyDispatcher for AsOfJoinExecutorDispatcherArgs { self.null_safe, self.output_indices, self.state_table_l, - self.degree_state_table_l, self.state_table_r, - self.degree_state_table_r, self.lru_manager, self.metrics, self.chunk_size, From bc5543ad1831d2837ca21cad076dc3eb878c4d30 Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Sat, 14 Sep 2024 17:09:12 +0800 Subject: [PATCH 2/3] remove file --- .../src/parser/protobuf/recursive.rs | 157 ------------------ 1 file changed, 157 deletions(-) delete mode 100644 src/connector/src/parser/protobuf/recursive.rs diff --git a/src/connector/src/parser/protobuf/recursive.rs b/src/connector/src/parser/protobuf/recursive.rs deleted file mode 100644 index dc367eb5f70cd..0000000000000 --- a/src/connector/src/parser/protobuf/recursive.rs +++ /dev/null @@ -1,157 +0,0 @@ -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ComplexRecursiveMessage { - #[prost(string, tag = "1")] - pub node_name: ::prost::alloc::string::String, - #[prost(int32, tag = "2")] - pub node_id: i32, - #[prost(message, repeated, tag = "3")] - pub attributes: ::prost::alloc::vec::Vec, - #[prost(message, optional, tag = "4")] - pub parent: ::core::option::Option, - #[prost(message, repeated, tag = "5")] - pub children: ::prost::alloc::vec::Vec, -} -/// Nested message and enum types in `ComplexRecursiveMessage`. -pub mod complex_recursive_message { - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Attributes { - #[prost(string, tag = "1")] - pub key: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub value: ::prost::alloc::string::String, - } - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct Parent { - #[prost(string, tag = "1")] - pub parent_name: ::prost::alloc::string::String, - #[prost(int32, tag = "2")] - pub parent_id: i32, - #[prost(message, repeated, tag = "3")] - pub siblings: ::prost::alloc::vec::Vec, - } -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct AllTypes { - /// standard types - #[prost(double, tag = "1")] - pub double_field: f64, - #[prost(float, tag = "2")] - pub float_field: f32, - #[prost(int32, tag = "3")] - pub int32_field: i32, - #[prost(int64, tag = "4")] - pub int64_field: i64, - #[prost(uint32, tag = "5")] - pub uint32_field: u32, - #[prost(uint64, tag = "6")] - pub uint64_field: u64, - #[prost(sint32, tag = "7")] - pub sint32_field: i32, - #[prost(sint64, tag = "8")] - pub sint64_field: i64, - #[prost(fixed32, tag = "9")] - pub fixed32_field: u32, - #[prost(fixed64, tag = "10")] - pub fixed64_field: u64, - #[prost(sfixed32, tag = "11")] - pub sfixed32_field: i32, - #[prost(sfixed64, tag = "12")] - pub sfixed64_field: i64, - #[prost(bool, tag = "13")] - pub bool_field: bool, - #[prost(string, tag = "14")] - pub string_field: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "15")] - pub bytes_field: ::prost::alloc::vec::Vec, - #[prost(enumeration = "all_types::EnumType", tag = "16")] - pub enum_field: i32, - #[prost(message, optional, tag = "17")] - pub nested_message_field: ::core::option::Option, - /// repeated field - #[prost(int32, repeated, tag = "18")] - pub repeated_int_field: ::prost::alloc::vec::Vec, - /// timestamp - #[prost(message, optional, tag = "23")] - pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>, - /// duration - #[prost(message, optional, tag = "24")] - pub duration_field: ::core::option::Option<::prost_types::Duration>, - /// any - #[prost(message, optional, tag = "25")] - pub any_field: ::core::option::Option<::prost_types::Any>, - /// wrapper types - #[prost(message, optional, tag = "27")] - pub int32_value_field: ::core::option::Option, - #[prost(message, optional, tag = "28")] - pub string_value_field: ::core::option::Option<::prost::alloc::string::String>, - /// oneof field - #[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")] - pub example_oneof: ::core::option::Option, -} -/// Nested message and enum types in `AllTypes`. -pub mod all_types { - /// nested message - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] - pub struct NestedMessage { - #[prost(int32, tag = "1")] - pub id: i32, - #[prost(string, tag = "2")] - pub name: ::prost::alloc::string::String, - } - /// enum - #[derive( - Clone, - Copy, - Debug, - PartialEq, - Eq, - Hash, - PartialOrd, - Ord, - ::prost::Enumeration - )] - #[repr(i32)] - pub enum EnumType { - Default = 0, - Option1 = 1, - Option2 = 2, - } - impl EnumType { - /// String value of the enum field names used in the ProtoBuf definition. - /// - /// The values are not transformed in any way and thus are considered stable - /// (if the ProtoBuf definition does not change) and safe for programmatic use. - pub fn as_str_name(&self) -> &'static str { - match self { - EnumType::Default => "DEFAULT", - EnumType::Option1 => "OPTION1", - EnumType::Option2 => "OPTION2", - } - } - /// Creates an enum from field names used in the ProtoBuf definition. - pub fn from_str_name(value: &str) -> ::core::option::Option { - match value { - "DEFAULT" => Some(Self::Default), - "OPTION1" => Some(Self::Option1), - "OPTION2" => Some(Self::Option2), - _ => None, - } - } - } - /// oneof field - #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum ExampleOneof { - #[prost(string, tag = "19")] - OneofString(::prost::alloc::string::String), - #[prost(int32, tag = "20")] - OneofInt32(i32), - #[prost(enumeration = "EnumType", tag = "21")] - OneofEnum(i32), - } -} From 6b497d25092842da6d81d6cdd902343394dbed7b Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Wed, 18 Sep 2024 16:52:30 +0800 Subject: [PATCH 3/3] fix --- src/stream/src/executor/asof_join.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index 97a2e5af78162..bd7aa9c1831bd 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -947,16 +947,14 @@ mod tests { .enumerate() .map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())) .collect_vec(); - let state_table = StateTable::new_without_distribution( + StateTable::new_without_distribution( mem_state.clone(), TableId::new(table_id), column_descs, order_types.to_vec(), pk_indices.to_vec(), ) - .await; - - state_table + .await } async fn create_executor(