Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): support system column _rw_timestamp for tables #19232

Merged
merged 27 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions e2e_test/batch/basic/rw_timestamp.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (a int, id int primary key);

statement ok
create index idx on t(a);

statement ok
insert into t values (1, 1), (2, 2);

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t;
----
t 1 1
t 2 2

sleep 3s

statement ok
update t set a = 11 where id = 1;

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t;
----
f 2 2
t 11 1

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where id = 1;
----
t 11 1

query ?? rowsort
select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where a = 11;
----
t 11 1

statement ok
delete from t;

statement ok
drop table t;
3 changes: 3 additions & 0 deletions e2e_test/ddl/show.slt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ v1 integer false turpis vehicula
v2 integer false Lorem ipsum dolor sit amet
v3 integer false NULL
_row_id serial true consectetur adipiscing elit
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description t3 NULL volutpat vitae
Expand All @@ -37,6 +38,7 @@ v1 integer false turpis vehicula
v2 integer false Lorem ipsum dolor sit amet
v3 integer false NULL
_row_id serial true consectetur adipiscing elit
_rw_timestamp timestamp with time zone true NULL

statement ok
create index idx1 on t3 (v1,v2);
Expand Down Expand Up @@ -65,6 +67,7 @@ v1 integer false Nemo enim ipsam
v2 integer false NULL
v3 integer false NULL
_row_id serial true NULL
_rw_timestamp timestamp with time zone true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
idx1 index(v1 ASC, v2 ASC) include(v3) distributed by(v1) NULL NULL
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {

let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() {
if self.lookup_prefix_len == self.table.pk_indices().len() && !self.table.has_epoch_idx() {
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
Expand Down
36 changes: 29 additions & 7 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,17 +396,39 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());

let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;
if table.has_epoch_idx() {
// has epoch_idx means we need to select `_rw_timestamp` column which is unsupported by `get_row` interface, so use iterator interface instead.
let range_bounds = (Bound::<OwnedRow>::Unbounded, Bound::Unbounded);
let iter = table
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
range_bounds,
false,
1,
PrefetchOptions::new(false, false),
)
.await?;
pin_mut!(iter);
let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
if let Some(chunk) = chunk {
let row = chunk.row_at(0).0.to_owned_row();
Ok(Some(row))
} else {
Ok(None)
}
Comment on lines +405 to +421
Copy link
Contributor

@kwannoel kwannoel Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactoring this into a separate method like get_row_with_rw_timestamp, and calling it here seems more readable.

} else {
// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;

if let Some(timer) = timer {
timer.observe_duration()
}
if let Some(timer) = timer {
timer.observe_duration()
}
chenzl25 marked this conversation as resolved.
Show resolved Hide resolved

Ok(row)
Ok(row)
}
}

#[try_stream(ok = DataChunk, error = BatchError)]
Expand Down
17 changes: 16 additions & 1 deletion src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, PbColumnCatalog, PbColumnDesc,
};

use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
use super::{row_id_column_desc, rw_timestamp_column_desc, USER_COLUMN_ID_OFFSET};
use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID};
use crate::types::DataType;
use crate::util::value_encoding::DatumToProtoExt;
Expand Down Expand Up @@ -372,6 +372,10 @@ impl ColumnCatalog {
self.column_desc.is_generated()
}

pub fn can_dml(&self) -> bool {
!self.is_generated() && !self.is_rw_timestamp_column()
}

/// If the column is a generated column
pub fn generated_expr(&self) -> Option<&ExprNode> {
if let Some(GeneratedOrDefaultColumn::GeneratedColumn(desc)) =
Expand Down Expand Up @@ -424,6 +428,17 @@ impl ColumnCatalog {
}
}

pub fn rw_timestamp_column() -> Self {
Self {
column_desc: rw_timestamp_column_desc(),
is_hidden: true,
}
}

pub fn is_rw_timestamp_column(&self) -> bool {
self == &Self::rw_timestamp_column()
}

pub fn offset_column() -> Self {
Self {
column_desc: offset_column_desc(),
Expand Down
10 changes: 10 additions & 0 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ pub fn row_id_column_desc() -> ColumnDesc {
ColumnDesc::named(ROWID_PREFIX, ROW_ID_COLUMN_ID, DataType::Serial)
}

pub const RW_TIMESTAMP_COLUMN_NAME: &str = "_rw_timestamp";
pub const RW_TIMESTAMP_COLUMN_ID: ColumnId = ColumnId::new(-1);
pub fn rw_timestamp_column_desc() -> ColumnDesc {
ColumnDesc::named(
RW_TIMESTAMP_COLUMN_NAME,
RW_TIMESTAMP_COLUMN_ID,
DataType::Timestamptz,
)
}

pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";

// The number of columns output by the cdc source job
Expand Down
22 changes: 22 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/rw_timestamp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
- sql: |
create table t (a int);
select _rw_timestamp from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_error
- sql: |
create table t (a int);
select t.*, _rw_timestamp from t;
expected_outputs:
- logical_plan
- batch_plan
- stream_error
- sql: |
create table t (a int);
create index idx on t(a);
select _rw_timestamp from t where a = 1;
expected_outputs:
- logical_plan
- batch_plan
- stream_error
Loading
Loading