diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index 40545aae1175b..59e159bb5ca70 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -233,6 +233,10 @@ impl> KeyedRow { self.vnode_prefixed_key.key_part() } + pub fn row(&self) -> &OwnedRow { + &self.row + } + pub fn into_parts(self) -> (TableKey, OwnedRow) { (self.vnode_prefixed_key, self.row) } diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 838d4cf5e1fda..b3ad2f2471cd6 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -480,29 +480,32 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor Result<&mut StateValueType, JoinEntryError> { self.kv_heap_size.add(&key, &value); - self.cached.try_insert(key, value).unwrap(); + self.cached + .try_insert(key, value) + .map_err(|_| JoinEntryError::OccupiedError) } /// Delete from the cache. - pub fn remove(&mut self, pk: PkType) { + pub fn remove(&mut self, pk: PkType) -> Result<(), JoinEntryError> { if let Some(value) = self.cached.remove(&pk) { self.kv_heap_size.sub(&pk, &value); + Ok(()) } else { - panic!("pk {:?} should be in the cache", pk); + Err(JoinEntryError::RemoveError) } } @@ -98,7 +114,7 @@ mod tests { // Pk is only a `i64` here, so encoding method does not matter. let pk = OwnedRow::new(pk).project(&value_indices).value_serialize(); let join_row = JoinRow { row, degree: 0 }; - managed_state.insert(pk, join_row.encode()); + managed_state.insert(pk, join_row.encode()).unwrap(); } } diff --git a/src/stream/src/executor/managed_state/join/mod.rs b/src/stream/src/executor/managed_state/join/mod.rs index 90839df2e3792..ff20e5346fc34 100644 --- a/src/stream/src/executor/managed_state/join/mod.rs +++ b/src/stream/src/executor/managed_state/join/mod.rs @@ -19,6 +19,7 @@ use std::alloc::Global; use std::ops::{Bound, Deref, DerefMut}; use std::sync::Arc; +use anyhow::Context; use futures::future::try_join; use futures::StreamExt; use futures_async_stream::for_await; @@ -267,7 +268,10 @@ pub struct JoinHashMap { } struct TableInner { + /// Indices of the (cache) pk in a state row pk_indices: Vec, + /// Indices of the join key in a state row + join_key_indices: Vec, // This should be identical to the pk in state table. order_key_indices: Vec, // This should be identical to the data types in table schema. @@ -276,15 +280,31 @@ struct TableInner { pub(crate) table: StateTable, } +impl TableInner { + fn error_context(&self, row: &impl Row) -> String { + let pk = row.project(&self.pk_indices); + let jk = row.project(&self.join_key_indices); + format!( + "join key: {}, pk: {}, row: {}, state_table_id: {}", + jk.display(), + pk.display(), + row.display(), + self.table.table_id() + ) + } +} + impl JoinHashMap { /// Create a [`JoinHashMap`] with the given LRU capacity. #[allow(clippy::too_many_arguments)] pub fn new( watermark_epoch: AtomicU64Ref, join_key_data_types: Vec, + state_join_key_indices: Vec, state_all_data_types: Vec, state_table: StateTable, state_pk_indices: Vec, + degree_join_key_indices: Vec, degree_all_data_types: Vec, degree_table: StateTable, degree_pk_indices: Vec, @@ -311,6 +331,7 @@ impl JoinHashMap { let degree_table_id = degree_table.table_id(); let state = TableInner { pk_indices: state_pk_indices, + join_key_indices: state_join_key_indices, order_key_indices: state_table.pk_indices().to_vec(), all_data_types: state_all_data_types, table: state_table, @@ -318,6 +339,7 @@ impl JoinHashMap { let degree_state = TableInner { pk_indices: degree_pk_indices, + join_key_indices: degree_join_key_indices, order_key_indices: degree_table.pk_indices().to_vec(), all_data_types: degree_all_data_types, table: degree_table, @@ -445,10 +467,12 @@ impl JoinHashMap { let degree_i64 = degree_row .datum_at(degree_row.len() - 1) .expect("degree should not be NULL"); - entry_state.insert( - pk, - JoinRow::new(row.into_owned_row(), degree_i64.into_int64() as u64).encode(), - ); + entry_state + .insert( + pk, + JoinRow::new(row.row(), degree_i64.into_int64() as u64).encode(), + ) + .with_context(|| self.state.error_context(row.row()))?; } } else { let sub_range: &(Bound, Bound) = @@ -466,7 +490,9 @@ impl JoinHashMap { .as_ref() .project(&self.state.pk_indices) .memcmp_serialize(&self.pk_serializer); - entry_state.insert(pk, JoinRow::new(row.into_owned_row(), 0).encode()); + entry_state + .insert(pk, JoinRow::new(row.row(), 0).encode()) + .with_context(|| self.state.error_context(row.row()))?; } }; @@ -498,12 +524,16 @@ impl JoinHashMap { if self.inner.contains(key) { // Update cache let mut entry = self.inner.get_mut(key).unwrap(); - entry.insert(pk, value.encode()); + entry + .insert(pk, value.encode()) + .with_context(|| self.state.error_context(&value.row))?; } else if self.pk_contained_in_jk { // Refill cache when the join key exist in neither cache or storage. self.metrics.insert_cache_miss_count += 1; let mut state = JoinEntryState::default(); - state.insert(pk, value.encode()); + state + .insert(pk, value.encode()) + .with_context(|| self.state.error_context(&value.row))?; self.update_state(key, state.into()); } @@ -528,12 +558,16 @@ impl JoinHashMap { if self.inner.contains(key) { // Update cache let mut entry = self.inner.get_mut(key).unwrap(); - entry.insert(pk, join_row.encode()); + entry + .insert(pk, join_row.encode()) + .with_context(|| self.state.error_context(&value))?; } else if self.pk_contained_in_jk { // Refill cache when the join key exist in neither cache or storage. self.metrics.insert_cache_miss_count += 1; let mut state = JoinEntryState::default(); - state.insert(pk, join_row.encode()); + state + .insert(pk, join_row.encode()) + .with_context(|| self.state.error_context(&value))?; self.update_state(key, state.into()); } @@ -543,32 +577,38 @@ impl JoinHashMap { } /// Delete a join row - pub fn delete(&mut self, key: &K, value: JoinRow) { + pub fn delete(&mut self, key: &K, value: JoinRow) -> StreamExecutorResult<()> { if let Some(mut entry) = self.inner.get_mut(key) { let pk = (&value.row) .project(&self.state.pk_indices) .memcmp_serialize(&self.pk_serializer); - entry.remove(pk); + entry + .remove(pk) + .with_context(|| self.state.error_context(&value.row))?; } // If no cache maintained, only update the state table. let (row, degree) = value.to_table_rows(&self.state.order_key_indices); self.state.table.delete(row); self.degree_state.table.delete(degree); + Ok(()) } /// Delete a row /// Used when the side does not need to update degree. - pub fn delete_row(&mut self, key: &K, value: impl Row) { + pub fn delete_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> { if let Some(mut entry) = self.inner.get_mut(key) { let pk = (&value) .project(&self.state.pk_indices) .memcmp_serialize(&self.pk_serializer); - entry.remove(pk); + entry + .remove(pk) + .with_context(|| self.state.error_context(&value))?; } // If no cache maintained, only update the state table. self.state.table.delete(value); + Ok(()) } /// Update a [`JoinEntryState`] into the hash table.