Skip to content

Commit

Permalink
refactor(stream): remove TopNStateError (#3142)
Browse files Browse the repository at this point in the history
  • Loading branch information
lmatz authored Jun 12, 2022
1 parent f9ddfbc commit 3acee84
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 117 deletions.
8 changes: 0 additions & 8 deletions src/stream/src/executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ enum StreamExecutorErrorInner {
#[error("Serialize/deserialize error: {0}")]
SerdeError(BoxedError),

// TODO: remove this
#[error("TopN state error: {0}")]
TopNStateError(RwError),

// TODO: remove this
#[error("Hash join error: {0}")]
HashJoinError(RwError),
Expand Down Expand Up @@ -78,10 +74,6 @@ impl StreamExecutorError {
StreamExecutorErrorInner::SerdeError(error.into()).into()
}

pub fn top_n_state_error(error: impl Into<RwError>) -> Self {
StreamExecutorErrorInner::TopNStateError(error.into()).into()
}

pub fn hash_join_error(error: impl Into<RwError>) -> Self {
StreamExecutorErrorInner::HashJoinError(error.into()).into()
}
Expand Down
26 changes: 18 additions & 8 deletions src/stream/src/executor/managed_state/top_n/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ mod top_n_state;

use bytes::Bytes;
use risingwave_common::array::Row;
use risingwave_common::error::Result;
use risingwave_common::util::ordered::{OrderedRow, OrderedRowDeserializer};
use risingwave_storage::cell_based_row_deserializer::CellBasedRowDeserializer;
use risingwave_storage::StateStoreIter;
pub use top_n_bottom_n_state::ManagedTopNBottomNState;
pub use top_n_state::ManagedTopNState;

use crate::executor::error::{StreamExecutorError, StreamExecutorResult};

pub mod variants {
pub const TOP_N_MIN: usize = 0;
pub const TOP_N_MAX: usize = 1;
Expand All @@ -32,12 +33,14 @@ pub mod variants {
fn deserialize_pk<const TOP_N_TYPE: usize>(
pk_buf: &mut [u8],
ordered_row_deserializer: &mut OrderedRowDeserializer,
) -> Result<OrderedRow> {
) -> StreamExecutorResult<OrderedRow> {
if TOP_N_TYPE == variants::TOP_N_MAX {
pk_buf.iter_mut().for_each(|byte| *byte = !*byte);
}
// We have just encountered the start of a new row, so we finalize the previous one.
let pk = ordered_row_deserializer.deserialize(pk_buf)?;
let pk = ordered_row_deserializer
.deserialize(pk_buf)
.map_err(StreamExecutorError::serde_error)?;
Ok(pk)
}

Expand All @@ -62,13 +65,19 @@ impl<'a, I: StateStoreIter<Item = (Bytes, Bytes)>, const TOP_N_TYPE: usize>
}
}

async fn deserialize_bytes_to_pk_and_row(&mut self) -> Result<Option<(OrderedRow, Row)>> {
async fn deserialize_bytes_to_pk_and_row(
&mut self,
) -> StreamExecutorResult<Option<(OrderedRow, Row)>> {
while let Some((key, value)) = self.iter.next().await? {
let pk_buf_and_row = self.cell_based_row_deserializer.deserialize(&key, &value)?;
let pk_buf_and_row = self
.cell_based_row_deserializer
.deserialize(&key, &value)
.map_err(StreamExecutorError::serde_error)?;
match pk_buf_and_row {
Some((mut pk_buf, row)) => {
let pk =
deserialize_pk::<TOP_N_TYPE>(&mut pk_buf, self.ordered_row_deserializer)?;
deserialize_pk::<TOP_N_TYPE>(&mut pk_buf, self.ordered_row_deserializer)
.map_err(StreamExecutorError::serde_error)?;
return Ok(Some((pk, row)));
}
None => {}
Expand All @@ -81,14 +90,15 @@ impl<'a, I: StateStoreIter<Item = (Bytes, Bytes)>, const TOP_N_TYPE: usize>
let pk_buf_and_row = self.cell_based_row_deserializer.take();
if let Some(mut pk_buf_and_row) = pk_buf_and_row {
let pk =
deserialize_pk::<TOP_N_TYPE>(&mut pk_buf_and_row.0, self.ordered_row_deserializer)?;
deserialize_pk::<TOP_N_TYPE>(&mut pk_buf_and_row.0, self.ordered_row_deserializer)
.map_err(StreamExecutorError::serde_error)?;
Ok(Some((pk, pk_buf_and_row.1)))
} else {
Ok(None)
}
}

pub async fn next(&mut self) -> Result<Option<(OrderedRow, Row)>> {
pub async fn next(&mut self) -> StreamExecutorResult<Option<(OrderedRow, Row)>> {
let pk_and_row = self.deserialize_bytes_to_pk_and_row().await?;
Ok(pk_and_row)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::vec::Drain;
use madsim::collections::BTreeMap;
use risingwave_common::array::Row;
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_common::util::ordered::*;
use risingwave_storage::cell_based_row_deserializer::CellBasedRowDeserializer;
Expand All @@ -30,6 +29,7 @@ use risingwave_storage::{Keyspace, StateStore};
use super::super::flush_status::BtreeMapFlushStatus as FlushStatus;
use super::variants::TOP_N_MIN;
use super::PkAndRowIterator;
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};

/// This state is used for `[offset, offset+limit)` part in the `TopNExecutor`.
///
Expand Down Expand Up @@ -114,7 +114,10 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {
}
}

pub async fn pop_top_element(&mut self, epoch: u64) -> Result<Option<(OrderedRow, Row)>> {
pub async fn pop_top_element(
&mut self,
epoch: u64,
) -> StreamExecutorResult<Option<(OrderedRow, Row)>> {
if self.total_count == 0 {
Ok(None)
} else {
Expand All @@ -129,7 +132,10 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {
}
}

pub async fn pop_bottom_element(&mut self, epoch: u64) -> Result<Option<(OrderedRow, Row)>> {
pub async fn pop_bottom_element(
&mut self,
epoch: u64,
) -> StreamExecutorResult<Option<(OrderedRow, Row)>> {
if self.total_count == 0 {
Ok(None)
} else {
Expand Down Expand Up @@ -187,7 +193,11 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {
self.total_count += 1;
}

pub async fn delete(&mut self, key: &OrderedRow, epoch: u64) -> Result<Option<Row>> {
pub async fn delete(
&mut self,
key: &OrderedRow,
epoch: u64,
) -> StreamExecutorResult<Option<Row>> {
let prev_top_n_entry = self.top_n.remove(key);
let prev_bottom_n_entry = self.bottom_n.remove(key);
FlushStatus::do_delete(self.flush_buffer.entry(key.clone()));
Expand All @@ -205,7 +215,7 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {
}

/// The same as the one in `ManagedTopNState`.
pub async fn scan_and_merge(&mut self, epoch: u64) -> Result<()> {
pub async fn scan_and_merge(&mut self, epoch: u64) -> StreamExecutorResult<()> {
let iter = self.keyspace.iter(epoch).await?;
let mut pk_and_row_iter = PkAndRowIterator::<_, TOP_N_MIN>::new(
iter,
Expand Down Expand Up @@ -264,7 +274,7 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {

/// We can fill in the cache from storage only when state is not dirty, i.e. right after
/// `flush`.
pub async fn fill_in_cache(&mut self, epoch: u64) -> Result<()> {
pub async fn fill_in_cache(&mut self, epoch: u64) -> StreamExecutorResult<()> {
debug_assert!(!self.is_dirty());
let mut pk_and_row_iter = PkAndRowIterator::<_, TOP_N_MIN>::new(
self.keyspace.iter(epoch).await?,
Expand All @@ -280,7 +290,7 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {

/// `Flush` can be called by the executor when it receives a barrier and thus needs to
/// checkpoint.
pub async fn flush(&mut self, epoch: u64) -> Result<()> {
pub async fn flush(&mut self, epoch: u64) -> StreamExecutorResult<()> {
if !self.is_dirty() {
// We don't retain `n` elements as we have an all-or-nothing policy for now.
return Ok(());
Expand All @@ -296,7 +306,8 @@ impl<S: StateStore> ManagedTopNBottomNState<S> {
let column_ids = (0..self.data_types.len() as i32)
.map(ColumnId::from)
.collect::<Vec<_>>();
let bytes = serialize_pk_and_row_state(&pk_buf, &row, &column_ids)?;
let bytes = serialize_pk_and_row_state(&pk_buf, &row, &column_ids)
.map_err(StreamExecutorError::serde_error)?;
for (key, value) in bytes {
match value {
// TODO(Yuanxin): Implement value meta
Expand Down
31 changes: 22 additions & 9 deletions src/stream/src/executor/managed_state/top_n/top_n_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::cmp::Ordering;
use madsim::collections::BTreeMap;
use risingwave_common::array::Row;
use risingwave_common::catalog::ColumnId;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
use risingwave_common::util::ordered::*;
use risingwave_storage::cell_based_row_deserializer::CellBasedRowDeserializer;
Expand All @@ -27,6 +26,7 @@ use risingwave_storage::{Keyspace, StateStore};
use super::super::flush_status::BtreeMapFlushStatus as FlushStatus;
use super::variants::*;
use super::PkAndRowIterator;
use crate::executor::error::{StreamExecutorError, StreamExecutorResult};

/// This state is used for several ranges (e.g `[0, offset)`, `[offset+limit, +inf)` of elements in
/// the `AppendOnlyTopNExecutor` and `TopNExecutor`. For these ranges, we only care about one of the
Expand Down Expand Up @@ -104,7 +104,10 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
}
}

pub async fn pop_top_element(&mut self, epoch: u64) -> Result<Option<(OrderedRow, Row)>> {
pub async fn pop_top_element(
&mut self,
epoch: u64,
) -> StreamExecutorResult<Option<(OrderedRow, Row)>> {
if self.total_count == 0 {
Ok(None)
} else {
Expand Down Expand Up @@ -146,7 +149,12 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
}
}

pub async fn insert(&mut self, key: OrderedRow, value: Row, _epoch: u64) -> Result<()> {
pub async fn insert(
&mut self,
key: OrderedRow,
value: Row,
_epoch: u64,
) -> StreamExecutorResult<()> {
let have_key_on_storage = self.total_count > self.top_n.len();
let need_to_flush = if have_key_on_storage {
// It is impossible that the cache is empty.
Expand Down Expand Up @@ -176,7 +184,7 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
///
/// This function scans kv pairs from the storage, and properly deals with them
/// according to the flush buffer.
pub async fn scan_and_merge(&mut self, epoch: u64) -> Result<()> {
pub async fn scan_and_merge(&mut self, epoch: u64) -> StreamExecutorResult<()> {
// For a key scanned from the storage,
// 1. Not touched by flush buffer. Do nothing.
// 2. Deleted by flush buffer. Do not go into cache.
Expand Down Expand Up @@ -283,7 +291,11 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
Ok(())
}

pub async fn delete(&mut self, key: &OrderedRow, epoch: u64) -> Result<Option<Row>> {
pub async fn delete(
&mut self,
key: &OrderedRow,
epoch: u64,
) -> StreamExecutorResult<Option<Row>> {
let prev_entry = self.top_n.remove(key);
FlushStatus::do_delete(self.flush_buffer.entry(key.clone()));
self.total_count -= 1;
Expand All @@ -301,7 +313,7 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
/// We don't need to care about whether `self.top_n` is empty or not as the key is unique.
/// An element with duplicated key scanned from the storage would just override the element with
/// the same key in the cache, and their value must be the same.
pub async fn fill_in_cache(&mut self, epoch: u64) -> Result<()> {
pub async fn fill_in_cache(&mut self, epoch: u64) -> StreamExecutorResult<()> {
debug_assert!(!self.is_dirty());
let iter = self.keyspace.iter(epoch).await?;
let mut pk_and_row_iter = PkAndRowIterator::<_, TOP_N_TYPE>::new(
Expand All @@ -325,7 +337,7 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
&mut self,
iterator: impl Iterator<Item = (OrderedRow, FlushStatus<Row>)>,
epoch: u64,
) -> Result<()> {
) -> StreamExecutorResult<()> {
let mut write_batch = self.keyspace.state_store().start_write_batch();
let mut local = write_batch.prefixify(&self.keyspace);
for (pk, cells) in iterator {
Expand All @@ -338,7 +350,8 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
let column_ids = (0..self.data_types.len() as i32)
.map(ColumnId::from)
.collect::<Vec<_>>();
let bytes = serialize_pk_and_row_state(&pk_buf, &row, &column_ids)?;
let bytes = serialize_pk_and_row_state(&pk_buf, &row, &column_ids)
.map_err(StreamExecutorError::serde_error)?;
for (key, value) in bytes {
match value {
// TODO(Yuanxin): Implement value meta
Expand All @@ -356,7 +369,7 @@ impl<S: StateStore, const TOP_N_TYPE: usize> ManagedTopNState<S, TOP_N_TYPE> {
///
/// TODO: `Flush` should also be called internally when `top_n` and `flush_buffer` exceeds
/// certain limit.
pub async fn flush(&mut self, epoch: u64) -> Result<()> {
pub async fn flush(&mut self, epoch: u64) -> StreamExecutorResult<()> {
if !self.is_dirty() {
self.retain_top_n();
return Ok(());
Expand Down
Loading

0 comments on commit 3acee84

Please sign in to comment.