Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Sep 14, 2024
1 parent 72396b2 commit 5796347
Show file tree
Hide file tree
Showing 6 changed files with 249 additions and 183 deletions.
14 changes: 5 additions & 9 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -463,22 +463,18 @@ message AsOfJoinNode {
catalog.Table left_table = 4;
// Used for internal table states.
catalog.Table right_table = 5;
// Used for internal table states.
catalog.Table left_degree_table = 6;
// Used for internal table states.
catalog.Table right_degree_table = 7;
// The output indices of current node
repeated uint32 output_indices = 8;
repeated uint32 output_indices = 6;
// Left deduped input pk indices. The pk of the left_table and
// The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices]
// left_inequality_key is not used but for forward compatibility.
repeated uint32 left_deduped_input_pk_indices = 9;
repeated uint32 left_deduped_input_pk_indices = 7;
// Right deduped input pk indices.
// The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices]
// right_inequality_key is not used but for forward compatibility.
repeated uint32 right_deduped_input_pk_indices = 10;
repeated bool null_safe = 11;
optional plan_common.AsOfJoinDesc asof_desc = 12;
repeated uint32 right_deduped_input_pk_indices = 8;
repeated bool null_safe = 9;
optional plan_common.AsOfJoinDesc asof_desc = 10;
}

message TemporalJoinNode {
Expand Down
157 changes: 157 additions & 0 deletions src/connector/src/parser/protobuf/recursive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ComplexRecursiveMessage {
#[prost(string, tag = "1")]
pub node_name: ::prost::alloc::string::String,
#[prost(int32, tag = "2")]
pub node_id: i32,
#[prost(message, repeated, tag = "3")]
pub attributes: ::prost::alloc::vec::Vec<complex_recursive_message::Attributes>,
#[prost(message, optional, tag = "4")]
pub parent: ::core::option::Option<complex_recursive_message::Parent>,
#[prost(message, repeated, tag = "5")]
pub children: ::prost::alloc::vec::Vec<ComplexRecursiveMessage>,
}
/// Nested message and enum types in `ComplexRecursiveMessage`.
pub mod complex_recursive_message {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Attributes {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub value: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Parent {
#[prost(string, tag = "1")]
pub parent_name: ::prost::alloc::string::String,
#[prost(int32, tag = "2")]
pub parent_id: i32,
#[prost(message, repeated, tag = "3")]
pub siblings: ::prost::alloc::vec::Vec<super::ComplexRecursiveMessage>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AllTypes {
/// standard types
#[prost(double, tag = "1")]
pub double_field: f64,
#[prost(float, tag = "2")]
pub float_field: f32,
#[prost(int32, tag = "3")]
pub int32_field: i32,
#[prost(int64, tag = "4")]
pub int64_field: i64,
#[prost(uint32, tag = "5")]
pub uint32_field: u32,
#[prost(uint64, tag = "6")]
pub uint64_field: u64,
#[prost(sint32, tag = "7")]
pub sint32_field: i32,
#[prost(sint64, tag = "8")]
pub sint64_field: i64,
#[prost(fixed32, tag = "9")]
pub fixed32_field: u32,
#[prost(fixed64, tag = "10")]
pub fixed64_field: u64,
#[prost(sfixed32, tag = "11")]
pub sfixed32_field: i32,
#[prost(sfixed64, tag = "12")]
pub sfixed64_field: i64,
#[prost(bool, tag = "13")]
pub bool_field: bool,
#[prost(string, tag = "14")]
pub string_field: ::prost::alloc::string::String,
#[prost(bytes = "vec", tag = "15")]
pub bytes_field: ::prost::alloc::vec::Vec<u8>,
#[prost(enumeration = "all_types::EnumType", tag = "16")]
pub enum_field: i32,
#[prost(message, optional, tag = "17")]
pub nested_message_field: ::core::option::Option<all_types::NestedMessage>,
/// repeated field
#[prost(int32, repeated, tag = "18")]
pub repeated_int_field: ::prost::alloc::vec::Vec<i32>,
/// timestamp
#[prost(message, optional, tag = "23")]
pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>,
/// duration
#[prost(message, optional, tag = "24")]
pub duration_field: ::core::option::Option<::prost_types::Duration>,
/// any
#[prost(message, optional, tag = "25")]
pub any_field: ::core::option::Option<::prost_types::Any>,
/// wrapper types
#[prost(message, optional, tag = "27")]
pub int32_value_field: ::core::option::Option<i32>,
#[prost(message, optional, tag = "28")]
pub string_value_field: ::core::option::Option<::prost::alloc::string::String>,
/// oneof field
#[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")]
pub example_oneof: ::core::option::Option<all_types::ExampleOneof>,
}
/// Nested message and enum types in `AllTypes`.
pub mod all_types {
/// nested message
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NestedMessage {
#[prost(int32, tag = "1")]
pub id: i32,
#[prost(string, tag = "2")]
pub name: ::prost::alloc::string::String,
}
/// enum
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[repr(i32)]
pub enum EnumType {
Default = 0,
Option1 = 1,
Option2 = 2,
}
impl EnumType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
EnumType::Default => "DEFAULT",
EnumType::Option1 => "OPTION1",
EnumType::Option2 => "OPTION2",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"DEFAULT" => Some(Self::Default),
"OPTION1" => Some(Self::Option1),
"OPTION2" => Some(Self::Option2),
_ => None,
}
}
}
/// oneof field
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ExampleOneof {
#[prost(string, tag = "19")]
OneofString(::prost::alloc::string::String),
#[prost(int32, tag = "20")]
OneofInt32(i32),
#[prost(enumeration = "EnumType", tag = "21")]
OneofEnum(i32),
}
}
79 changes: 11 additions & 68 deletions src/stream/src/executor/asof_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
null_safe: Vec<bool>,
output_indices: Vec<usize>,
state_table_l: StateTable<S>,
degree_state_table_l: StateTable<S>,
state_table_r: StateTable<S>,
degree_state_table_r: StateTable<S>,
watermark_epoch: AtomicU64Ref,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
Expand Down Expand Up @@ -219,22 +217,9 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
let state_pk_indices_l = input_l.pk_indices().to_vec();
let state_pk_indices_r = input_r.pk_indices().to_vec();

let state_order_key_indices_l = state_table_l.pk_indices();
let state_order_key_indices_r = state_table_r.pk_indices();

let state_join_key_indices_l = params_l.join_key_indices;
let state_join_key_indices_r = params_r.join_key_indices;

let degree_join_key_indices_l = (0..state_join_key_indices_l.len()).collect_vec();
let degree_join_key_indices_r = (0..state_join_key_indices_r.len()).collect_vec();

let degree_pk_indices_l = (state_join_key_indices_l.len()
..state_join_key_indices_l.len() + params_l.deduped_pk_indices.len())
.collect_vec();
let degree_pk_indices_r = (state_join_key_indices_r.len()
..state_join_key_indices_r.len() + params_r.deduped_pk_indices.len())
.collect_vec();

// If pk is contained in join key.
let pk_contained_in_jk_l =
is_subset(state_pk_indices_l.clone(), state_join_key_indices_l.clone());
Expand All @@ -253,20 +238,8 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor

assert_eq!(join_key_data_types_l, join_key_data_types_r);

let degree_all_data_types_l = state_order_key_indices_l
.iter()
.map(|idx| state_all_data_types_l[*idx].clone())
.collect_vec();
let degree_all_data_types_r = state_order_key_indices_r
.iter()
.map(|idx| state_all_data_types_r[*idx].clone())
.collect_vec();

let null_matched = K::Bitmap::from_bool_vec(null_safe);

let need_degree_table_l = false;
let need_degree_table_r = false;

let (left_to_output, right_to_output) = {
let (left_len, right_len) = if is_left_semi_or_anti(T) {
(state_all_data_types_l.len(), 0usize)
Expand Down Expand Up @@ -303,12 +276,8 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
state_all_data_types_l.clone(),
state_table_l,
params_l.deduped_pk_indices,
degree_join_key_indices_l,
degree_all_data_types_l,
degree_state_table_l,
degree_pk_indices_l,
None,
null_matched.clone(),
need_degree_table_l,
pk_contained_in_jk_l,
inequal_key_idx_l,
metrics.clone(),
Expand All @@ -321,7 +290,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
i2o_mapping: left_to_output,
i2o_mapping_indexed: l2o_indexed,
start_pos: 0,
need_degree_table: need_degree_table_l,
need_degree_table: false,
},
side_r: JoinSide {
ht: JoinHashMap::new(
Expand All @@ -331,12 +300,8 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
state_all_data_types_r.clone(),
state_table_r,
params_r.deduped_pk_indices,
degree_join_key_indices_r,
degree_all_data_types_r,
degree_state_table_r,
degree_pk_indices_r,
None,
null_matched,
need_degree_table_r,
pk_contained_in_jk_r,
inequal_key_idx_r,
metrics.clone(),
Expand All @@ -349,7 +314,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
start_pos: side_l_column_n,
i2o_mapping: right_to_output,
i2o_mapping_indexed: r2o_indexed,
need_degree_table: need_degree_table_r,
need_degree_table: false,
},
metrics,
chunk_size,
Expand Down Expand Up @@ -675,7 +640,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor
{
yield chunk;
}
side_update.ht.insert_row(key, row).await?;
side_update.ht.insert_row(key, row)?;
}
Op::Delete | Op::UpdateDelete => {
if let Some(matched_row_by_inequality) = matched_row_by_inequality {
Expand Down Expand Up @@ -924,7 +889,7 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor

match op {
Op::Insert | Op::UpdateInsert => {
side_update.ht.insert_row(key, row).await?;
side_update.ht.insert_row(key, row)?;
}
Op::Delete | Op::UpdateDelete => {
side_update.ht.delete_row(key, row)?;
Expand Down Expand Up @@ -976,7 +941,7 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
) -> StateTable<MemoryStateStore> {
let column_descs = data_types
.iter()
.enumerate()
Expand All @@ -991,27 +956,7 @@ mod tests {
)
.await;

// Create degree table
let mut degree_table_column_descs = vec![];
pk_indices.iter().enumerate().for_each(|(pk_id, idx)| {
degree_table_column_descs.push(ColumnDesc::unnamed(
ColumnId::new(pk_id as i32),
data_types[*idx].clone(),
))
});
degree_table_column_descs.push(ColumnDesc::unnamed(
ColumnId::new(pk_indices.len() as i32),
DataType::Int64,
));
let degree_state_table = StateTable::new_without_distribution(
mem_state,
TableId::new(table_id + 1),
degree_table_column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
)
.await;
(state_table, degree_state_table)
state_table
}

async fn create_executor<const T: AsOfJoinTypePrimitive>(
Expand All @@ -1033,7 +978,7 @@ mod tests {

let mem_state = MemoryStateStore::new();

let (state_l, degree_state_l) = create_in_memory_state_table(
let state_l = create_in_memory_state_table(
mem_state.clone(),
&[DataType::Int64, DataType::Int64, DataType::Int64],
&[
Expand All @@ -1046,7 +991,7 @@ mod tests {
)
.await;

let (state_r, degree_state_r) = create_in_memory_state_table(
let state_r = create_in_memory_state_table(
mem_state,
&[DataType::Int64, DataType::Int64, DataType::Int64],
&[
Expand All @@ -1055,7 +1000,7 @@ mod tests {
OrderType::ascending(),
],
&[0, asof_desc.right_idx, 1],
2,
1,
)
.await;

Expand All @@ -1080,9 +1025,7 @@ mod tests {
vec![false],
(0..schema_len).collect_vec(),
state_l,
degree_state_l,
state_r,
degree_state_r,
Arc::new(AtomicU64::new(0)),
Arc::new(StreamingMetrics::unused()),
1024,
Expand Down
Loading

0 comments on commit 5796347

Please sign in to comment.