feat(streaming): query global max watermark via storage table (#14591)
wenym1 authored Jan 16, 2024
1 parent 1a70c3a commit 222bbd1
Showing 6 changed files with 167 additions and 45 deletions.
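At its core, this commit makes `WatermarkFilterExecutor::get_global_max_watermark` read the per-vnode watermark rows from two sources and take their maximum: the executor's own `StateTable` (the vnodes it owns) and a new `StorageTable` whose vnode bitmap is kept as the complement of the actor's own bitmap and is read at `HummockReadEpoch::NoWait(epoch)`. Below is a minimal, self-contained sketch of the merging step only, with plain `Option<i64>` values standing in for the per-vnode watermark rows and a hypothetical helper name:

// Sketch only: the real executor fetches these rows asynchronously from its
// StateTable (local vnodes) and a StorageTable (all other vnodes), and compares
// them with DefaultOrd::default_cmp rather than plain Ord.
fn global_max_watermark(
    local: impl IntoIterator<Item = Option<i64>>,  // vnodes owned by this actor
    remote: impl IntoIterator<Item = Option<i64>>, // remaining vnodes, via the storage table
) -> Option<i64> {
    // A vnode with no stored watermark yields `None`; `flatten` drops those,
    // so the result is `None` only if no vnode has written a watermark yet.
    remote.into_iter().chain(local).flatten().max()
}

fn main() {
    assert_eq!(
        global_max_watermark(vec![Some(10), None], vec![Some(42), None, Some(7)]),
        Some(42)
    );
}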
27 changes: 27 additions & 0 deletions src/common/src/catalog/physical_table.rs
@@ -17,10 +17,13 @@ use std::collections::HashMap;
use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
use crate::catalog::TableOption;
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
@@ -138,4 +141,28 @@ impl TableDesc {
});
id_to_idx
}

pub fn from_pb_table(table: &Table) -> Self {
let table_options = TableOption::build_table_option(&table.properties);
Self {
table_id: TableId::new(table.id),
pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
columns: table
.columns
.iter()
.map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
.collect(),
distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
vnode_col_index: table.vnode_col_index.map(|i| i as _),
append_only: table.append_only,
retention_seconds: table_options
.retention_seconds
.unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
read_prefix_len_hint: table.read_prefix_len_hint as _,
watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
versioned: table.version.is_some(),
}
}
}
6 changes: 5 additions & 1 deletion src/common/src/hash/consistent_hash/vnode.rs
@@ -18,7 +18,7 @@ use parse_display::Display;
use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
use crate::types::{DataType, DatumRef, ScalarRefImpl};
use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

@@ -96,6 +96,10 @@ impl VirtualNode {
self.0 as _
}

pub const fn to_datum(self) -> Datum {
Some(ScalarImpl::Int16(self.to_scalar()))
}

/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
4 changes: 4 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -335,6 +335,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
pub fn table_id(&self) -> TableId {
self.table_id
}

pub fn vnodes(&self) -> &Arc<Bitmap> {
self.distribution.vnodes()
}
}
/// Point get
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
13 changes: 8 additions & 5 deletions src/storage/src/table/mod.rs
@@ -115,11 +115,15 @@ impl TableDistribution {
Self::singleton_vnode_bitmap_ref().clone()
}

pub fn all_vnodes() -> Arc<Bitmap> {
pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
/// A bitmap in which all vnodes are set.
static ALL_VNODES: LazyLock<Arc<Bitmap>> =
LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
ALL_VNODES.clone()
&ALL_VNODES
}

pub fn all_vnodes() -> Arc<Bitmap> {
Self::all_vnodes_ref().clone()
}

/// Distribution that accesses all vnodes, mainly used for tests.
@@ -272,10 +276,9 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode {
vnode
}

pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode {
pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode {
let vnode = VirtualNode::from_datum(row.datum_at(index));
// TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark
// check_vnode_is_set(vnode, vnodes);
check_vnode_is_set(vnode, vnodes);

tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode);

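The executor changes in the next file rely on a simple split of responsibility: when a vnode-bitmap update arrives, the global watermark table is pointed at the complement of the actor's own bitmap (`(!vnode_bitmap) & all_vnodes`), so the local `StateTable` and the global `StorageTable` together cover every vnode exactly once. A rough sketch of that bitmap arithmetic, with `Vec<bool>` standing in for `Bitmap` and 256 vnodes assumed (names here are illustrative only):

// Sketch only: risingwave's Bitmap supports `!` and `&` directly (as seen in the
// diff below); Vec<bool> is used here just to show the complement idea.
const VNODE_COUNT: usize = 256;

fn other_vnodes(own: &[bool]) -> Vec<bool> {
    // `!own & all_vnodes`: every vnode this actor does not own.
    own.iter().map(|set| !*set).collect()
}

fn main() {
    let mut own = vec![false; VNODE_COUNT];
    own[0] = true;
    own[1] = true;
    let other = other_vnodes(&own);
    // Each vnode belongs to exactly one of the two bitmaps.
    assert!(own.iter().zip(&other).all(|(a, b)| *a ^ *b));
}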
146 changes: 107 additions & 39 deletions src/stream/src/executor/watermark_filter.rs
@@ -13,12 +13,13 @@
// limitations under the License.

use std::cmp;
use std::ops::Deref;
use std::sync::Arc;

use futures::future::join_all;
use futures::future::{try_join, try_join_all};
use futures::StreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, DefaultOrd, ScalarImpl};
use risingwave_common::{bail, row};
@@ -27,7 +28,10 @@ use risingwave_expr::expr::{
NonStrictExpression,
};
use risingwave_expr::Result as ExprResult;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::expr::expr_node::Type;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::TableDistribution;
use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
Expand All @@ -52,6 +56,7 @@ pub struct WatermarkFilterExecutor<S: StateStore> {
/// The column we should generate watermark and filter on.
event_time_col_idx: usize,
table: StateTable<S>,
global_watermark_table: StorageTable<S>,
}

impl<S: StateStore> WatermarkFilterExecutor<S> {
@@ -62,6 +67,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
watermark_expr: NonStrictExpression,
event_time_col_idx: usize,
table: StateTable<S>,
global_watermark_table: StorageTable<S>,
) -> Self {
Self {
ctx,
@@ -70,6 +76,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
watermark_expr,
event_time_col_idx,
table,
global_watermark_table,
}
}
}
@@ -106,6 +113,7 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
ctx,
info,
mut table,
mut global_watermark_table,
} = *self;

let eval_error_report = ActorEvalErrorReport {
@@ -126,7 +134,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
yield Message::Barrier(first_barrier);

// Initiate and yield the first watermark.
let mut current_watermark = Self::get_global_max_watermark(&table).await?;
let mut current_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table).await?;

let mut last_checkpoint_watermark = None;

@@ -231,12 +240,19 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
Message::Barrier(barrier) => {
// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(ctx.id) {
let other_vnodes_bitmap = Arc::new(
(!(*vnode_bitmap).clone())
& TableDistribution::all_vnodes_ref().deref(),
);
let _ = global_watermark_table.update_vnode_bitmap(other_vnodes_bitmap);
let (previous_vnode_bitmap, _cache_may_stale) =
table.update_vnode_bitmap(vnode_bitmap.clone());

// Take the global max watermark when scaling happens.
if previous_vnode_bitmap != vnode_bitmap {
current_watermark = Self::get_global_max_watermark(&table).await?;
current_watermark =
Self::get_global_max_watermark(&table, &global_watermark_table)
.await?;
}
}

@@ -262,7 +278,8 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
if idle_input {
// Align watermark
let global_max_watermark =
Self::get_global_max_watermark(&table).await?;
Self::get_global_max_watermark(&table, &global_watermark_table)
.await?;

current_watermark = if let Some(global_max_watermark) =
global_max_watermark.clone()
@@ -314,29 +331,45 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {
/// If the returned value is `Ok(None)`, it means there is no global max watermark.
async fn get_global_max_watermark(
table: &StateTable<S>,
global_watermark_table: &StorageTable<S>,
) -> StreamExecutorResult<Option<ScalarImpl>> {
let watermark_iter_futures = (0..VirtualNode::COUNT).map(|vnode| async move {
let pk = row::once(Some(ScalarImpl::Int16(vnode as _)));
let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
match watermark_row {
Some(row) => {
if row.len() == 1 {
Ok::<_, StreamExecutorError>(row[0].to_owned())
} else {
bail!("The watermark row should only contains 1 datum");
}
}
_ => Ok(None),
}
});
let epoch = table.epoch();
let handle_watermark_row = |watermark_row: Option<OwnedRow>| match watermark_row {
Some(row) => {
if row.len() == 1 {
Ok::<_, StreamExecutorError>(row[0].to_owned())
} else {
bail!("The watermark row should only contains 1 datum");
}
}
_ => Ok(None),
};
let global_watermark_iter_futures =
global_watermark_table
.vnodes()
.iter_vnodes()
.map(|vnode| async move {
let pk = row::once(vnode.to_datum());
let watermark_row: Option<OwnedRow> = global_watermark_table
.get_row(pk, HummockReadEpoch::NoWait(epoch))
.await?;
handle_watermark_row(watermark_row)
});
let local_watermark_iter_futures = table.vnodes().iter_vnodes().map(|vnode| async move {
let pk = row::once(vnode.to_datum());
let watermark_row: Option<OwnedRow> = table.get_row(pk).await?;
handle_watermark_row(watermark_row)
});
let watermarks: Vec<_> = join_all(watermark_iter_futures)
.await
.into_iter()
.try_collect()?;
let (global_watermarks, local_watermarks) = try_join(
try_join_all(global_watermark_iter_futures),
try_join_all(local_watermark_iter_futures),
)
.await?;

// Return the minimal value if the remote max watermark is Null.
let watermark = watermarks
let watermark = global_watermarks
.into_iter()
.chain(local_watermarks.into_iter())
.flatten()
.max_by(DefaultOrd::default_cmp);

@@ -346,11 +379,15 @@ impl<S: StateStore> WatermarkFilterExecutor<S> {

#[cfg(test)]
mod tests {
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableDesc};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::Date;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::ColumnOrder;
use risingwave_pb::plan_common::PbColumnCatalog;
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::TableDistribution;

@@ -368,24 +405,54 @@ mod tests {
pk_indices: &[usize],
val_indices: &[usize],
table_id: u32,
) -> StateTable<MemoryStateStore> {
let column_descs = data_types
.iter()
.enumerate()
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
.collect_vec();
) -> (StorageTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let table = Table {
id: table_id,
columns: data_types
.iter()
.enumerate()
.map(|(id, data_type)| PbColumnCatalog {
column_desc: Some(
ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone())
.to_protobuf(),
),
is_hidden: false,
})
.collect(),
pk: pk_indices
.iter()
.zip_eq(order_types.iter())
.map(|(pk, order)| ColumnOrder {
column_index: *pk as _,
order_type: Some(order.to_protobuf()),
})
.collect(),
distribution_key: vec![],
stream_key: vec![0],
append_only: false,
vnode_col_index: Some(0),
value_indices: val_indices.iter().map(|i| *i as _).collect(),
read_prefix_len_hint: 0,
..Default::default()
};

// TODO: use consistent operations for watermark filter after we have upsert.
StateTable::new_with_distribution_inconsistent_op(
mem_state,
TableId::new(table_id),
column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
TableDistribution::all(vec![0]),
Some(val_indices.to_vec()),
)
.await
let state_table = StateTable::from_table_catalog_inconsistent_op(
&table,
mem_state.clone(),
Some(TableDistribution::all_vnodes()),
)
.await;

let desc = TableDesc::from_pb_table(&table).try_to_protobuf().unwrap();

let storage_table = StorageTable::new_partial(
mem_state,
val_indices.iter().map(|i| ColumnId::new(*i as _)).collect(),
Some(TableDistribution::all_vnodes()),
&desc,
);
(storage_table, state_table)
}

async fn create_watermark_filter_executor(
@@ -400,7 +467,7 @@ mod tests {

let watermark_expr = build_from_pretty("(subtract:timestamp $1:timestamp 1day:interval)");

let table = create_in_memory_state_table(
let (storage_table, table) = create_in_memory_state_table(
mem_state,
&[DataType::Int16, WATERMARK_TYPE],
&[OrderType::ascending()],
@@ -426,6 +493,7 @@ mod tests {
watermark_expr,
1,
table,
storage_table,
)
.boxed(),
tx,