From 062dd8ed6ebd9062cef31698f366b09391d238e8 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Fri, 19 Jan 2024 00:15:35 +0800 Subject: [PATCH 1/8] perf(stream): concurrently lookup remote in temporal join Signed-off-by: TennyZhuang --- src/stream/src/executor/temporal_join.rs | 177 +++++++++++++++++------ 1 file changed, 134 insertions(+), 43 deletions(-) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 099abe658f615..d4d7afd7a1ec6 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -21,11 +21,11 @@ use std::sync::Arc; use either::Either; use futures::stream::{self, PollNext}; -use futures::{pin_mut, StreamExt, TryStreamExt}; +use futures::{future, pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; @@ -148,9 +148,8 @@ struct TemporalSide { } impl TemporalSide { - /// Lookup the temporal side table and return a `JoinEntry` which could be empty if there are no - /// matched records. - async fn lookup(&mut self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { + /// Lookup the temporal side table cache and return a `Some(JoinEntry)`. If `None` was returned, then the result can't be found in cache. + fn lookup_cache(&mut self, key: &K) -> Option { let table_id_str = self.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.id.to_string(); @@ -159,10 +158,9 @@ impl TemporalSide { .temporal_join_total_query_cache_count .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); - - let res = if self.cache.contains(key) { + if self.cache.contains(key) { let mut state = self.cache.peek_mut(key).unwrap(); - state.take() + Some(state.take()) } else { // cache miss self.ctx @@ -170,36 +168,39 @@ impl TemporalSide { .temporal_join_cache_miss_count .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); + None + } + } - let pk_prefix = key.deserialize(&self.join_key_data_types)?; - - let iter = self - .source - .batch_iter_with_pk_bounds( - HummockReadEpoch::NoWait(epoch), - &pk_prefix, - .., - false, - PrefetchOptions::default(), - ) - .await?; - - let mut entry = JoinEntry::default(); - - pin_mut!(iter); - while let Some(row) = iter.next_row().await? { - entry.insert( - row.as_ref() - .project(&self.table_stream_key_indices) - .into_owned_row(), - row.project(&self.table_output_indices).into_owned_row(), - ); - } - - entry - }; + /// Lookup the temporal side table and return a `JoinEntry` which could be empty if there are no + /// matched records. The lookup will bypass the cache, it's better to call [`Self::lookup_cache`] first. + async fn lookup_table(&self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { + let pk_prefix = key.deserialize(&self.join_key_data_types)?; + + let iter = self + .source + .batch_iter_with_pk_bounds( + HummockReadEpoch::NoWait(epoch), + &pk_prefix, + .., + false, + PrefetchOptions::default(), + ) + .await?; + + let mut entry = JoinEntry::default(); + + pin_mut!(iter); + while let Some(row) = iter.next_row().await? 
{ + entry.insert( + row.as_ref() + .project(&self.table_stream_key_indices) + .into_owned_row(), + row.project(&self.table_output_indices).into_owned_row(), + ); + } - Ok(res) + Ok(entry) } fn update( @@ -427,12 +428,103 @@ impl TemporalJoinExecutor ); let epoch = prev_epoch.expect("Chunk data should come after some barrier."); let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - if key.null_bitmap().is_subset(&null_matched) - && let join_entry = self.right_table.lookup(&key, epoch).await? + + // Fetch the local cache. + struct FetchingCacheItem<'a, K> { + op: Op, + left_row: RowRef<'a>, + key: K, + inner: FetchingCacheItemInner, + } + enum FetchingCacheItemInner { + NotSubsetOfNullMatched, + Cached(JoinEntry), + NotFoundInCache, + } + let fetching_cache_items: Vec> = chunk + .rows_with_holes() + .zip_eq_debug(keys.into_iter()) + .filter_map(|(r, key)| { + use FetchingCacheItemInner::*; + let Some((op, left_row)) = r else { + return None; + }; + if !key.null_bitmap().is_subset(&null_matched) { + return Some(FetchingCacheItem { + op, + left_row, + key, + inner: NotSubsetOfNullMatched, + }); + } + let inner = match self.right_table.lookup_cache(&key) { + Some(entry) => Cached(entry), + None => NotFoundInCache, + }; + Some(FetchingCacheItem { + op, + left_row, + key, + inner, + }) + }) + .collect(); + + // Step 2: Fetch remote. + struct FetchingTableItem<'a, K> { + op: Op, + left_row: RowRef<'a>, + key: K, + inner: FetchingTableItemInner, + } + enum FetchingTableItemInner { + NotSubsetOfNullMatched, + Entry(JoinEntry), + } + let fetching_table_futs = fetching_cache_items.into_iter().map( + |FetchingCacheItem { + op, + left_row, + key, + inner, + }| { + let right_table = &self.right_table; + async move { + let inner = match inner { + FetchingCacheItemInner::NotSubsetOfNullMatched => { + Ok(FetchingTableItemInner::NotSubsetOfNullMatched) + as StreamExecutorResult<_> + } + FetchingCacheItemInner::Cached(entry) => { + Ok(FetchingTableItemInner::Entry(entry)) + as StreamExecutorResult<_> + } + FetchingCacheItemInner::NotFoundInCache => right_table + .lookup_table(&key, epoch) + .await + .map(FetchingTableItemInner::Entry), + }; + Ok(FetchingTableItem { + op, + left_row, + key, + inner: inner?, + }) as StreamExecutorResult<_> + } + }, + ); + // Fetching table can be concurrent. + let fetching_table_items: Vec> = + future::try_join_all(fetching_table_futs).await?; + + for FetchingTableItem { + op, + left_row, + key, + inner, + } in fetching_table_items + { + if let FetchingTableItemInner::Entry(join_entry) = inner && !join_entry.is_empty() { for right_row in join_entry.cached.values() { @@ -446,7 +538,6 @@ impl TemporalJoinExecutor } else { true }; - if ok { if let Some(chunk) = builder.append_row(op, left_row, right_row) { From b3269890bced5f72cc81b479cf280e1504a91813 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Sat, 20 Jan 2024 23:55:07 +0800 Subject: [PATCH 2/8] Revert "perf(stream): concurrently lookup remote in temporal join" This reverts commit 062dd8ed6ebd9062cef31698f366b09391d238e8. 
--- src/stream/src/executor/temporal_join.rs | 177 ++++++----------------- 1 file changed, 43 insertions(+), 134 deletions(-) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index d4d7afd7a1ec6..099abe658f615 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -21,11 +21,11 @@ use std::sync::Arc; use either::Either; use futures::stream::{self, PollNext}; -use futures::{future, pin_mut, StreamExt, TryStreamExt}; +use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; -use risingwave_common::array::{Op, RowRef, StreamChunk}; +use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; @@ -148,8 +148,9 @@ struct TemporalSide { } impl TemporalSide { - /// Lookup the temporal side table cache and return a `Some(JoinEntry)`. If `None` was returned, then the result can't be found in cache. - fn lookup_cache(&mut self, key: &K) -> Option { + /// Lookup the temporal side table and return a `JoinEntry` which could be empty if there are no + /// matched records. + async fn lookup(&mut self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { let table_id_str = self.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.id.to_string(); @@ -158,9 +159,10 @@ impl TemporalSide { .temporal_join_total_query_cache_count .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); - if self.cache.contains(key) { + + let res = if self.cache.contains(key) { let mut state = self.cache.peek_mut(key).unwrap(); - Some(state.take()) + state.take() } else { // cache miss self.ctx @@ -168,39 +170,36 @@ impl TemporalSide { .temporal_join_cache_miss_count .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); - None - } - } - /// Lookup the temporal side table and return a `JoinEntry` which could be empty if there are no - /// matched records. The lookup will bypass the cache, it's better to call [`Self::lookup_cache`] first. - async fn lookup_table(&self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { - let pk_prefix = key.deserialize(&self.join_key_data_types)?; - - let iter = self - .source - .batch_iter_with_pk_bounds( - HummockReadEpoch::NoWait(epoch), - &pk_prefix, - .., - false, - PrefetchOptions::default(), - ) - .await?; - - let mut entry = JoinEntry::default(); - - pin_mut!(iter); - while let Some(row) = iter.next_row().await? { - entry.insert( - row.as_ref() - .project(&self.table_stream_key_indices) - .into_owned_row(), - row.project(&self.table_output_indices).into_owned_row(), - ); - } + let pk_prefix = key.deserialize(&self.join_key_data_types)?; + + let iter = self + .source + .batch_iter_with_pk_bounds( + HummockReadEpoch::NoWait(epoch), + &pk_prefix, + .., + false, + PrefetchOptions::default(), + ) + .await?; + + let mut entry = JoinEntry::default(); + + pin_mut!(iter); + while let Some(row) = iter.next_row().await? 
{ + entry.insert( + row.as_ref() + .project(&self.table_stream_key_indices) + .into_owned_row(), + row.project(&self.table_output_indices).into_owned_row(), + ); + } + + entry + }; - Ok(entry) + Ok(res) } fn update( @@ -428,103 +427,12 @@ impl TemporalJoinExecutor ); let epoch = prev_epoch.expect("Chunk data should come after some barrier."); let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; - - // Fetch the local cache. - struct FetchingCacheItem<'a, K> { - op: Op, - left_row: RowRef<'a>, - key: K, - inner: FetchingCacheItemInner, - } - enum FetchingCacheItemInner { - NotSubsetOfNullMatched, - Cached(JoinEntry), - NotFoundInCache, - } - let fetching_cache_items: Vec> = chunk - .rows_with_holes() - .zip_eq_debug(keys.into_iter()) - .filter_map(|(r, key)| { - use FetchingCacheItemInner::*; - let Some((op, left_row)) = r else { - return None; - }; - if !key.null_bitmap().is_subset(&null_matched) { - return Some(FetchingCacheItem { - op, - left_row, - key, - inner: NotSubsetOfNullMatched, - }); - } - let inner = match self.right_table.lookup_cache(&key) { - Some(entry) => Cached(entry), - None => NotFoundInCache, - }; - Some(FetchingCacheItem { - op, - left_row, - key, - inner, - }) - }) - .collect(); - - // Step 2: Fetch remote. - struct FetchingTableItem<'a, K> { - op: Op, - left_row: RowRef<'a>, - key: K, - inner: FetchingTableItemInner, - } - enum FetchingTableItemInner { - NotSubsetOfNullMatched, - Entry(JoinEntry), - } - let fetching_table_futs = fetching_cache_items.into_iter().map( - |FetchingCacheItem { - op, - left_row, - key, - inner, - }| { - let right_table = &self.right_table; - async move { - let inner = match inner { - FetchingCacheItemInner::NotSubsetOfNullMatched => { - Ok(FetchingTableItemInner::NotSubsetOfNullMatched) - as StreamExecutorResult<_> - } - FetchingCacheItemInner::Cached(entry) => { - Ok(FetchingTableItemInner::Entry(entry)) - as StreamExecutorResult<_> - } - FetchingCacheItemInner::NotFoundInCache => right_table - .lookup_table(&key, epoch) - .await - .map(FetchingTableItemInner::Entry), - }; - Ok(FetchingTableItem { - op, - left_row, - key, - inner: inner?, - }) as StreamExecutorResult<_> - } - }, - ); - // Fetching table can be concurrent. - let fetching_table_items: Vec> = - future::try_join_all(fetching_table_futs).await?; - - for FetchingTableItem { - op, - left_row, - key, - inner, - } in fetching_table_items - { - if let FetchingTableItemInner::Entry(join_entry) = inner + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; + }; + if key.null_bitmap().is_subset(&null_matched) + && let join_entry = self.right_table.lookup(&key, epoch).await? 
&& !join_entry.is_empty() { for right_row in join_entry.cached.values() { @@ -538,6 +446,7 @@ impl TemporalJoinExecutor } else { true }; + if ok { if let Some(chunk) = builder.append_row(op, left_row, right_row) { From f6d65aacd5aa4e8ca5b78f00b4f487809a733792 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 22 Jan 2024 01:05:00 +0800 Subject: [PATCH 3/8] reimplement using Mutex Signed-off-by: TennyZhuang --- src/stream/Cargo.toml | 2 +- src/stream/src/executor/temporal_join.rs | 159 +++++++++++++++-------- 2 files changed, 107 insertions(+), 54 deletions(-) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 7cce4531be55f..7f2b52a678412 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -41,7 +41,7 @@ lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } maplit = "1.0.2" memcomparable = "0.2" multimap = "0.10" -parking_lot = "0.12" +parking_lot = { version = "0.12", features = ["arc_lock"] } pin-project = "1" prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 099abe658f615..eb6f100bbfdad 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use either::Either; use futures::stream::{self, PollNext}; -use futures::{pin_mut, StreamExt, TryStreamExt}; +use futures::{future, pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; @@ -29,7 +29,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; -use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::row::{self, OwnedRow, Row, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::NonStrictExpression; @@ -38,6 +38,7 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::TableIter; use risingwave_storage::StateStore; +use tokio::sync::Mutex; use super::{ Barrier, Executor, ExecutorInfo, Message, MessageStream, StreamExecutorError, @@ -55,7 +56,8 @@ pub struct TemporalJoinExecutor, + // Mutex here acts as a `RefCell` that implements `Sync`, it'll never be blocked. 
+ right_table: Mutex>, left_join_keys: Vec, right_join_keys: Vec, null_safe: Vec, @@ -368,14 +370,14 @@ impl TemporalJoinExecutor info, left, right, - right_table: TemporalSide { + right_table: Mutex::new(TemporalSide { source: table, table_stream_key_indices, table_output_indices, cache, ctx, join_key_data_types, - }, + }), left_join_keys, right_join_keys, null_safe, @@ -387,7 +389,7 @@ impl TemporalJoinExecutor } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn into_stream(mut self) { + async fn into_stream(self) { let (left_map, right_map) = JoinStreamChunkBuilder::get_i2o_mapping( &self.output_indices, self.left.schema().len(), @@ -402,17 +404,23 @@ impl TemporalJoinExecutor let mut prev_epoch = None; - let table_id_str = self.right_table.source.table_id().to_string(); + let table_id_str = { + let right_table = self.right_table.try_lock().unwrap(); + right_table.source.table_id().to_string() + }; let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.fragment_id.to_string(); #[for_await] for msg in align_input(self.left, self.right) { - self.right_table.cache.evict(); - self.ctx - .streaming_metrics - .temporal_join_cached_entry_count - .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) - .set(self.right_table.cache.len() as i64); + { + let mut right_table = self.right_table.try_lock().unwrap(); + right_table.cache.evict(); + self.ctx + .streaming_metrics + .temporal_join_cached_entry_count + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) + .set(right_table.cache.len() as i64); + } match msg? { InternalMessage::WaterMark(watermark) => { let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap(); @@ -425,61 +433,106 @@ impl TemporalJoinExecutor left_map.clone(), right_map.clone(), ); + let epoch = prev_epoch.expect("Chunk data should come after some barrier."); let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - if key.null_bitmap().is_subset(&null_matched) - && let join_entry = self.right_table.lookup(&key, epoch).await? - && !join_entry.is_empty() - { - for right_row in join_entry.cached.values() { - // check join condition - let ok = if let Some(ref mut cond) = self.condition { - let concat_row = left_row.chain(&right_row).into_owned_row(); - cond.eval_row_infallible(&concat_row) - .await - .map(|s| *s.as_bool()) - .unwrap_or(false) - } else { - true - }; - if ok { - if let Some(chunk) = builder.append_row(op, left_row, right_row) + enum RowAction { + Append { op: Op, left_row: R1, right_row: R2 }, + Update { op: Op, left_row: R1 }, + } + let futs = chunk + .rows_with_holes() + .zip_eq_debug(keys.into_iter()) + .filter_map(|(r, key)| { + let (op, left_row) = r?; + let right_table = &self.right_table; + let condition = &self.condition; + let null_matched = &null_matched; + + Some(async move { + let actions = if key.null_bitmap().is_subset(null_matched) + && let join_entry = { + let mut right_table = right_table.try_lock().unwrap(); + right_table.lookup(&key, epoch).await? 
+ } + && !join_entry.is_empty() + { + let mut actions = + Vec::with_capacity(join_entry.cached.values().len()); + for right_row in join_entry.cached.values() { + // check join condition + let ok = if let Some(ref cond) = condition { + let concat_row = + left_row.chain(&right_row).into_owned_row(); + cond.eval_row_infallible(&concat_row) + .await + .map(|s| *s.as_bool()) + .unwrap_or(false) + } else { + true + }; + + if ok { + actions.push(RowAction::Append { + op, + left_row, + right_row: right_row.to_owned_row(), + }); + } + } + + // Insert back the state taken from ht. { - yield Message::Chunk(chunk); + let mut right_table = right_table.try_lock().unwrap(); + right_table.insert_back(key.clone(), join_entry); } - } - } - // Insert back the state taken from ht. - self.right_table.insert_back(key.clone(), join_entry); - } else if T == JoinType::LeftOuter { - if let Some(chunk) = builder.append_row_update(op, left_row) { - yield Message::Chunk(chunk); + + actions + } else if T == JoinType::LeftOuter { + vec![RowAction::Update { op, left_row }] + } else { + vec![] + }; + Ok(actions) as StreamExecutorResult<_> + }) + }); + for row_action in future::try_join_all(futs).await?.into_iter().flatten() { + if let Some(chunk) = match row_action { + RowAction::Append { + op, + left_row, + right_row, + } => builder.append_row(op, left_row, right_row), + RowAction::Update { op, left_row } => { + builder.append_row_update(op, left_row) } + } { + yield Message::Chunk(chunk); } } + if let Some(chunk) = builder.take() { yield Message::Chunk(chunk); } } InternalMessage::Barrier(updates, barrier) => { - if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { - let prev_vnodes = - self.right_table.source.update_vnode_bitmap(vnodes.clone()); - if cache_may_stale(&prev_vnodes, &vnodes) { - self.right_table.cache.clear(); + { + let mut right_table = self.right_table.try_lock().unwrap(); + if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { + let prev_vnodes = + right_table.source.update_vnode_bitmap(vnodes.clone()); + if cache_may_stale(&prev_vnodes, &vnodes) { + right_table.cache.clear(); + } } + right_table.cache.update_epoch(barrier.epoch.curr); + right_table.update( + updates, + &self.right_join_keys, + &right_stream_key_indices, + )?; } - self.right_table.cache.update_epoch(barrier.epoch.curr); - self.right_table.update( - updates, - &self.right_join_keys, - &right_stream_key_indices, - )?; prev_epoch = Some(barrier.epoch.curr); yield Message::Barrier(barrier) } From 9a8aa127ab3ae54b6d42774fa9df69e982dc3b67 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 22 Jan 2024 13:13:03 +0800 Subject: [PATCH 4/8] revert an unnecessary dep change Signed-off-by: TennyZhuang --- src/stream/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 7f2b52a678412..7cce4531be55f 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -41,7 +41,7 @@ lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } maplit = "1.0.2" memcomparable = "0.2" multimap = "0.10" -parking_lot = { version = "0.12", features = ["arc_lock"] } +parking_lot = "0.12" pin-project = "1" prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true } From 6433579da6c26f4c41fe2eea278af49bdba51a74 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 22 Jan 2024 17:30:22 +0800 Subject: [PATCH 5/8] Revert "revert an unnecessary dep change" This reverts commit 9a8aa127ab3ae54b6d42774fa9df69e982dc3b67. 
--- src/stream/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 7cce4531be55f..7f2b52a678412 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -41,7 +41,7 @@ lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } maplit = "1.0.2" memcomparable = "0.2" multimap = "0.10" -parking_lot = "0.12" +parking_lot = { version = "0.12", features = ["arc_lock"] } pin-project = "1" prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true } From f36958565328531b75a67066c923f54126f90379 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 22 Jan 2024 17:30:25 +0800 Subject: [PATCH 6/8] Revert "reimplement using Mutex" This reverts commit f6d65aacd5aa4e8ca5b78f00b4f487809a733792. --- src/stream/Cargo.toml | 2 +- src/stream/src/executor/temporal_join.rs | 159 ++++++++--------------- 2 files changed, 54 insertions(+), 107 deletions(-) diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 7f2b52a678412..7cce4531be55f 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -41,7 +41,7 @@ lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "cb2d7c7" } maplit = "1.0.2" memcomparable = "0.2" multimap = "0.10" -parking_lot = { version = "0.12", features = ["arc_lock"] } +parking_lot = "0.12" pin-project = "1" prometheus = { version = "0.13", features = ["process"] } prost = { workspace = true } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index eb6f100bbfdad..099abe658f615 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use either::Either; use futures::stream::{self, PollNext}; -use futures::{future, pin_mut, StreamExt, TryStreamExt}; +use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; @@ -29,7 +29,7 @@ use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; -use risingwave_common::row::{self, OwnedRow, Row, RowExt}; +use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::NonStrictExpression; @@ -38,7 +38,6 @@ use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::TableIter; use risingwave_storage::StateStore; -use tokio::sync::Mutex; use super::{ Barrier, Executor, ExecutorInfo, Message, MessageStream, StreamExecutorError, @@ -56,8 +55,7 @@ pub struct TemporalJoinExecutor>, + right_table: TemporalSide, left_join_keys: Vec, right_join_keys: Vec, null_safe: Vec, @@ -370,14 +368,14 @@ impl TemporalJoinExecutor info, left, right, - right_table: Mutex::new(TemporalSide { + right_table: TemporalSide { source: table, table_stream_key_indices, table_output_indices, cache, ctx, join_key_data_types, - }), + }, left_join_keys, right_join_keys, null_safe, @@ -389,7 +387,7 @@ impl TemporalJoinExecutor } #[try_stream(ok = Message, error = StreamExecutorError)] - async fn into_stream(self) { + async fn into_stream(mut self) { let (left_map, right_map) = JoinStreamChunkBuilder::get_i2o_mapping( &self.output_indices, 
self.left.schema().len(), @@ -404,23 +402,17 @@ impl TemporalJoinExecutor let mut prev_epoch = None; - let table_id_str = { - let right_table = self.right_table.try_lock().unwrap(); - right_table.source.table_id().to_string() - }; + let table_id_str = self.right_table.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.fragment_id.to_string(); #[for_await] for msg in align_input(self.left, self.right) { - { - let mut right_table = self.right_table.try_lock().unwrap(); - right_table.cache.evict(); - self.ctx - .streaming_metrics - .temporal_join_cached_entry_count - .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) - .set(right_table.cache.len() as i64); - } + self.right_table.cache.evict(); + self.ctx + .streaming_metrics + .temporal_join_cached_entry_count + .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) + .set(self.right_table.cache.len() as i64); match msg? { InternalMessage::WaterMark(watermark) => { let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap(); @@ -433,106 +425,61 @@ impl TemporalJoinExecutor left_map.clone(), right_map.clone(), ); - let epoch = prev_epoch.expect("Chunk data should come after some barrier."); let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; + }; + if key.null_bitmap().is_subset(&null_matched) + && let join_entry = self.right_table.lookup(&key, epoch).await? + && !join_entry.is_empty() + { + for right_row in join_entry.cached.values() { + // check join condition + let ok = if let Some(ref mut cond) = self.condition { + let concat_row = left_row.chain(&right_row).into_owned_row(); + cond.eval_row_infallible(&concat_row) + .await + .map(|s| *s.as_bool()) + .unwrap_or(false) + } else { + true + }; - enum RowAction { - Append { op: Op, left_row: R1, right_row: R2 }, - Update { op: Op, left_row: R1 }, - } - let futs = chunk - .rows_with_holes() - .zip_eq_debug(keys.into_iter()) - .filter_map(|(r, key)| { - let (op, left_row) = r?; - let right_table = &self.right_table; - let condition = &self.condition; - let null_matched = &null_matched; - - Some(async move { - let actions = if key.null_bitmap().is_subset(null_matched) - && let join_entry = { - let mut right_table = right_table.try_lock().unwrap(); - right_table.lookup(&key, epoch).await? - } - && !join_entry.is_empty() - { - let mut actions = - Vec::with_capacity(join_entry.cached.values().len()); - for right_row in join_entry.cached.values() { - // check join condition - let ok = if let Some(ref cond) = condition { - let concat_row = - left_row.chain(&right_row).into_owned_row(); - cond.eval_row_infallible(&concat_row) - .await - .map(|s| *s.as_bool()) - .unwrap_or(false) - } else { - true - }; - - if ok { - actions.push(RowAction::Append { - op, - left_row, - right_row: right_row.to_owned_row(), - }); - } - } - - // Insert back the state taken from ht. 
+ if ok { + if let Some(chunk) = builder.append_row(op, left_row, right_row) { - let mut right_table = right_table.try_lock().unwrap(); - right_table.insert_back(key.clone(), join_entry); + yield Message::Chunk(chunk); } - - actions - } else if T == JoinType::LeftOuter { - vec![RowAction::Update { op, left_row }] - } else { - vec![] - }; - Ok(actions) as StreamExecutorResult<_> - }) - }); - for row_action in future::try_join_all(futs).await?.into_iter().flatten() { - if let Some(chunk) = match row_action { - RowAction::Append { - op, - left_row, - right_row, - } => builder.append_row(op, left_row, right_row), - RowAction::Update { op, left_row } => { - builder.append_row_update(op, left_row) + } + } + // Insert back the state taken from ht. + self.right_table.insert_back(key.clone(), join_entry); + } else if T == JoinType::LeftOuter { + if let Some(chunk) = builder.append_row_update(op, left_row) { + yield Message::Chunk(chunk); } - } { - yield Message::Chunk(chunk); } } - if let Some(chunk) = builder.take() { yield Message::Chunk(chunk); } } InternalMessage::Barrier(updates, barrier) => { - { - let mut right_table = self.right_table.try_lock().unwrap(); - if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { - let prev_vnodes = - right_table.source.update_vnode_bitmap(vnodes.clone()); - if cache_may_stale(&prev_vnodes, &vnodes) { - right_table.cache.clear(); - } + if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { + let prev_vnodes = + self.right_table.source.update_vnode_bitmap(vnodes.clone()); + if cache_may_stale(&prev_vnodes, &vnodes) { + self.right_table.cache.clear(); } - right_table.cache.update_epoch(barrier.epoch.curr); - right_table.update( - updates, - &self.right_join_keys, - &right_stream_key_indices, - )?; } + self.right_table.cache.update_epoch(barrier.epoch.curr); + self.right_table.update( + updates, + &self.right_join_keys, + &right_stream_key_indices, + )?; prev_epoch = Some(barrier.epoch.curr); yield Message::Barrier(barrier) } From 5c0f661984bf1ff8127c942181ccfc2a93cfcf36 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 22 Jan 2024 17:31:34 +0800 Subject: [PATCH 7/8] Reapply "perf(stream): concurrently lookup remote in temporal join" This reverts commit b3269890bced5f72cc81b479cf280e1504a91813. --- src/stream/src/executor/temporal_join.rs | 177 +++++++++++++++++------ 1 file changed, 134 insertions(+), 43 deletions(-) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 099abe658f615..d4d7afd7a1ec6 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -21,11 +21,11 @@ use std::sync::Arc; use either::Either; use futures::stream::{self, PollNext}; -use futures::{pin_mut, StreamExt, TryStreamExt}; +use futures::{future, pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; -use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::array::{Op, RowRef, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; @@ -148,9 +148,8 @@ struct TemporalSide { } impl TemporalSide { - /// Lookup the temporal side table and return a `JoinEntry` which could be empty if there are no - /// matched records. 
- async fn lookup(&mut self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { + /// Lookup the temporal side table cache and return a `Some(JoinEntry)`. If `None` was returned, then the result can't be found in cache. + fn lookup_cache(&mut self, key: &K) -> Option { let table_id_str = self.source.table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.id.to_string(); @@ -159,10 +158,9 @@ impl TemporalSide { .temporal_join_total_query_cache_count .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); - - let res = if self.cache.contains(key) { + if self.cache.contains(key) { let mut state = self.cache.peek_mut(key).unwrap(); - state.take() + Some(state.take()) } else { // cache miss self.ctx @@ -170,36 +168,39 @@ impl TemporalSide { .temporal_join_cache_miss_count .with_label_values(&[&table_id_str, &actor_id_str, &fragment_id_str]) .inc(); + None + } + } - let pk_prefix = key.deserialize(&self.join_key_data_types)?; - - let iter = self - .source - .batch_iter_with_pk_bounds( - HummockReadEpoch::NoWait(epoch), - &pk_prefix, - .., - false, - PrefetchOptions::default(), - ) - .await?; - - let mut entry = JoinEntry::default(); - - pin_mut!(iter); - while let Some(row) = iter.next_row().await? { - entry.insert( - row.as_ref() - .project(&self.table_stream_key_indices) - .into_owned_row(), - row.project(&self.table_output_indices).into_owned_row(), - ); - } - - entry - }; + /// Lookup the temporal side table and return a `JoinEntry` which could be empty if there are no + /// matched records. The lookup will bypass the cache, it's better to call [`Self::lookup_cache`] first. + async fn lookup_table(&self, key: &K, epoch: HummockEpoch) -> StreamExecutorResult { + let pk_prefix = key.deserialize(&self.join_key_data_types)?; + + let iter = self + .source + .batch_iter_with_pk_bounds( + HummockReadEpoch::NoWait(epoch), + &pk_prefix, + .., + false, + PrefetchOptions::default(), + ) + .await?; + + let mut entry = JoinEntry::default(); + + pin_mut!(iter); + while let Some(row) = iter.next_row().await? { + entry.insert( + row.as_ref() + .project(&self.table_stream_key_indices) + .into_owned_row(), + row.project(&self.table_output_indices).into_owned_row(), + ); + } - Ok(res) + Ok(entry) } fn update( @@ -427,12 +428,103 @@ impl TemporalJoinExecutor ); let epoch = prev_epoch.expect("Chunk data should come after some barrier."); let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - if key.null_bitmap().is_subset(&null_matched) - && let join_entry = self.right_table.lookup(&key, epoch).await? + + // Fetch the local cache. 
+ struct FetchingCacheItem<'a, K> { + op: Op, + left_row: RowRef<'a>, + key: K, + inner: FetchingCacheItemInner, + } + enum FetchingCacheItemInner { + NotSubsetOfNullMatched, + Cached(JoinEntry), + NotFoundInCache, + } + let fetching_cache_items: Vec> = chunk + .rows_with_holes() + .zip_eq_debug(keys.into_iter()) + .filter_map(|(r, key)| { + use FetchingCacheItemInner::*; + let Some((op, left_row)) = r else { + return None; + }; + if !key.null_bitmap().is_subset(&null_matched) { + return Some(FetchingCacheItem { + op, + left_row, + key, + inner: NotSubsetOfNullMatched, + }); + } + let inner = match self.right_table.lookup_cache(&key) { + Some(entry) => Cached(entry), + None => NotFoundInCache, + }; + Some(FetchingCacheItem { + op, + left_row, + key, + inner, + }) + }) + .collect(); + + // Step 2: Fetch remote. + struct FetchingTableItem<'a, K> { + op: Op, + left_row: RowRef<'a>, + key: K, + inner: FetchingTableItemInner, + } + enum FetchingTableItemInner { + NotSubsetOfNullMatched, + Entry(JoinEntry), + } + let fetching_table_futs = fetching_cache_items.into_iter().map( + |FetchingCacheItem { + op, + left_row, + key, + inner, + }| { + let right_table = &self.right_table; + async move { + let inner = match inner { + FetchingCacheItemInner::NotSubsetOfNullMatched => { + Ok(FetchingTableItemInner::NotSubsetOfNullMatched) + as StreamExecutorResult<_> + } + FetchingCacheItemInner::Cached(entry) => { + Ok(FetchingTableItemInner::Entry(entry)) + as StreamExecutorResult<_> + } + FetchingCacheItemInner::NotFoundInCache => right_table + .lookup_table(&key, epoch) + .await + .map(FetchingTableItemInner::Entry), + }; + Ok(FetchingTableItem { + op, + left_row, + key, + inner: inner?, + }) as StreamExecutorResult<_> + } + }, + ); + // Fetching table can be concurrent. + let fetching_table_items: Vec> = + future::try_join_all(fetching_table_futs).await?; + + for FetchingTableItem { + op, + left_row, + key, + inner, + } in fetching_table_items + { + if let FetchingTableItemInner::Entry(join_entry) = inner && !join_entry.is_empty() { for right_row in join_entry.cached.values() { @@ -446,7 +538,6 @@ impl TemporalJoinExecutor } else { true }; - if ok { if let Some(chunk) = builder.append_row(op, left_row, right_row) { From f64f75a8e740b9995e6f72b0f2e935d0da2dfac2 Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 22 Jan 2024 17:42:05 +0800 Subject: [PATCH 8/8] Signed-off-by: TennyZhuang
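
For reference, a minimal standalone sketch of the lookup pattern this series converges on (introduced in patch 1, reverted, and reapplied as patch 7): probe the in-memory cache synchronously with a `lookup_cache`-style method, then resolve all cache misses for the chunk concurrently with `futures::future::try_join_all` against a `&self` table-lookup method. The types below (`TemporalSide` holding a plain `HashMap`, a `lookup_table` that returns canned rows, and the `Key`/`JoinEntry`/`Error` aliases) are simplified stand-ins for the executor's `ManagedLruCache` and `StorageTable` scan, not the actual RisingWave code.

use std::collections::HashMap;

use futures::executor::block_on;
use futures::future::try_join_all;

// Simplified stand-ins for the executor's real key, join-entry, and error types.
type Key = u64;
type JoinEntry = Vec<String>;
type Error = String;

struct TemporalSide {
    // Stand-in for the executor's ManagedLruCache of join entries.
    cache: HashMap<Key, JoinEntry>,
}

impl TemporalSide {
    /// Probe only the cache; `None` means the key has to be fetched from the table.
    fn lookup_cache(&mut self, key: &Key) -> Option<JoinEntry> {
        self.cache.get(key).cloned()
    }

    /// Bypass the cache and read from the (mocked) storage table. Taking `&self`
    /// is what allows several of these lookups to run concurrently.
    async fn lookup_table(&self, key: &Key) -> Result<JoinEntry, Error> {
        // Placeholder for the real `batch_iter_with_pk_bounds` scan.
        Ok(vec![format!("row-for-{key}")])
    }
}

async fn lookup_chunk(side: &mut TemporalSide, keys: &[Key]) -> Result<Vec<JoinEntry>, Error> {
    // Step 1: synchronous cache pass, recording hit or miss per key in input order.
    let probed: Vec<(Key, Option<JoinEntry>)> =
        keys.iter().map(|k| (*k, side.lookup_cache(k))).collect();

    // Step 2: turn every entry into a future; hits resolve immediately, misses hit
    // the table. `try_join_all` drives them concurrently and preserves input order.
    let side_ref = &*side;
    let futs = probed.into_iter().map(|(key, hit)| async move {
        match hit {
            Some(entry) => Ok(entry),
            None => side_ref.lookup_table(&key).await,
        }
    });
    try_join_all(futs).await
}

fn main() -> Result<(), Error> {
    let mut side = TemporalSide {
        cache: HashMap::from([(1, vec!["cached-row".to_string()])]),
    };
    let entries = block_on(lookup_chunk(&mut side, &[1, 2, 3]))?;
    assert_eq!(entries.len(), 3);
    println!("{entries:?}");
    Ok(())
}

Splitting the cache probe from the table scan is what makes the concurrent fetch possible: only the probe (and the later `insert_back` in the real executor) needs mutable access to the LRU cache, so the slow remote reads no longer serialize on it.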