diff --git a/e2e_test/batch/basic/rw_timestamp.slt.part b/e2e_test/batch/basic/rw_timestamp.slt.part index a3280c9ec8197..ce3427f786ac7 100644 --- a/e2e_test/batch/basic/rw_timestamp.slt.part +++ b/e2e_test/batch/basic/rw_timestamp.slt.part @@ -37,5 +37,8 @@ select ABS(EXTRACT(EPOCH FROM (_rw_timestamp - now()))) < 2, a, id from t where ---- t 11 1 +statement ok +delete from t; + statement ok drop table t; diff --git a/src/batch/src/executor/join/distributed_lookup_join.rs b/src/batch/src/executor/join/distributed_lookup_join.rs index 0a328e2f985f1..139717e06af36 100644 --- a/src/batch/src/executor/join/distributed_lookup_join.rs +++ b/src/batch/src/executor/join/distributed_lookup_join.rs @@ -355,7 +355,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder { let pk_prefix = OwnedRow::new(scan_range.eq_conds); - if self.lookup_prefix_len == self.table.pk_indices().len() { + if self.lookup_prefix_len == self.table.pk_indices().len() && !self.table.has_epoch_idx() { let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?; if let Some(row) = row { diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs index 25d8ecf82883b..1e0fa6318a2cd 100644 --- a/src/batch/src/executor/row_seq_scan.rs +++ b/src/batch/src/executor/row_seq_scan.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use prometheus::Histogram; use risingwave_common::array::DataChunk; use risingwave_common::bitmap::Bitmap; -use risingwave_common::catalog::{ColumnId, Schema, RW_TIMESTAMP_COLUMN_NAME}; +use risingwave_common::catalog::{ColumnId, Schema}; use risingwave_common::hash::VnodeCountCompat; use risingwave_common::row::{OwnedRow, Row}; use risingwave_common::types::{DataType, Datum}; @@ -322,11 +322,6 @@ impl RowSeqScanExecutor { assert_eq!(scan_ranges.len(), 1); } - let select_rw_timestamp = - table.schema().fields().iter().any(|x| { - x.name == RW_TIMESTAMP_COLUMN_NAME && x.data_type == DataType::Timestamptz - }); - let (point_gets, range_scans): (Vec, Vec) = scan_ranges .into_iter() .partition(|x| x.pk_prefix.len() == table.pk_indices().len()); @@ -342,14 +337,8 @@ impl RowSeqScanExecutor { // Point Get for point_get in point_gets { let table = table.clone(); - if let Some(row) = Self::execute_point_get( - table, - point_get, - query_epoch, - select_rw_timestamp, - histogram, - ) - .await? + if let Some(row) = + Self::execute_point_get(table, point_get, query_epoch, histogram).await? { if let Some(chunk) = data_chunk_builder.append_one_row(row) { returned += chunk.cardinality() as u64; @@ -403,14 +392,14 @@ impl RowSeqScanExecutor { table: Arc>, scan_range: ScanRange, epoch: BatchQueryEpoch, - select_rw_timestamp: bool, histogram: Option>, ) -> Result> { let pk_prefix = scan_range.pk_prefix; assert!(pk_prefix.len() == table.pk_indices().len()); let timer = histogram.as_ref().map(|histogram| histogram.start_timer()); - if select_rw_timestamp { + if table.has_epoch_idx() { + // has epoch_idx means we need to select `_rw_timestamp` column which is unsupported by `get_row` interface, so use iterator interface instead. let range_bounds = (Bound::::Unbounded, Bound::Unbounded); let iter = table .batch_chunk_iter_with_pk_bounds( diff --git a/src/frontend/planner_test/tests/testdata/output/basic_query.yaml b/src/frontend/planner_test/tests/testdata/output/basic_query.yaml index c80afc68dde7d..7d43fceef8306 100644 --- a/src/frontend/planner_test/tests/testdata/output/basic_query.yaml +++ b/src/frontend/planner_test/tests/testdata/output/basic_query.yaml @@ -113,7 +113,7 @@ BatchExchange { order: [], dist: Single } └─BatchDelete { table: t } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) } + └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 int); delete from t where v1 = 1; @@ -122,7 +122,7 @@ └─BatchDelete { table: t } └─BatchExchange { order: [], dist: Single } └─BatchFilter { predicate: (t.v1 = 1:Int32) } - └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) } + └─BatchScan { table: t, columns: [t.v1, t.v2, t._row_id], distribution: UpstreamHashShard(t._row_id) } - sql: | create table t (v1 int, v2 int); delete from t where v1; diff --git a/src/frontend/planner_test/tests/testdata/output/delete.yaml b/src/frontend/planner_test/tests/testdata/output/delete.yaml index 0bb96471175d6..67a61a6ea4e60 100644 --- a/src/frontend/planner_test/tests/testdata/output/delete.yaml +++ b/src/frontend/planner_test/tests/testdata/output/delete.yaml @@ -6,13 +6,14 @@ logical_plan: |- LogicalProject { exprs: [t.a, t.b, t.a, (t.a + t.b) as $expr1] } └─LogicalDelete { table: t, returning: true } - └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] } + └─LogicalProject { exprs: [t.a, t.b, t._row_id] } + └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [t.a, t.b, t.a, (t.a + t.b) as $expr1] } └─BatchDelete { table: t, returning: true } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) } + └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } - name: delete with returning constant, should keep `Delete` sql: | create table t (v int); @@ -20,13 +21,14 @@ logical_plan: |- LogicalProject { exprs: [114514:Int32] } └─LogicalDelete { table: t, returning: true } - └─LogicalScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp] } + └─LogicalProject { exprs: [t.v, t._row_id] } + └─LogicalScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [114514:Int32] } └─BatchDelete { table: t, returning: true } └─BatchExchange { order: [], dist: Single } - └─BatchScan { table: t, columns: [t.v, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) } + └─BatchScan { table: t, columns: [t.v, t._row_id], distribution: UpstreamHashShard(t._row_id) } - name: insert with returning agg functions, should not run sql: | create table t (a int, b int); @@ -41,5 +43,5 @@ BatchSimpleAgg { aggs: [sum()] } └─BatchExchange { order: [], dist: Single } └─BatchDelete { table: t } - └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id, t._rw_timestamp) } - └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) } + └─BatchExchange { order: [], dist: HashShard(t.a, t.b, t._row_id) } + └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } diff --git a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml index 7ea1c5709951e..a6240c69f395f 100644 --- a/src/frontend/planner_test/tests/testdata/output/index_selection.yaml +++ b/src/frontend/planner_test/tests/testdata/output/index_selection.yaml @@ -197,12 +197,12 @@ BatchExchange { order: [], dist: Single } └─BatchDelete { table: t1 } └─BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 } └─BatchExchange { order: [], dist: UpstreamHashShard(idx2.t1._row_id) } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } batch_local_plan: |- BatchDelete { table: t1 } - └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id, t1._rw_timestamp], lookup table: t1 } + └─BatchLookupJoin { type: Inner, predicate: idx2.t1._row_id IS NOT DISTINCT FROM t1._row_id, output: [t1.a, t1.b, t1.c, t1._row_id], lookup table: t1 } └─BatchExchange { order: [], dist: Single } └─BatchScan { table: idx2, columns: [idx2.t1._row_id], scan_ranges: [idx2.b = Decimal(Normalized(2))], distribution: SomeShard } - sql: | diff --git a/src/frontend/planner_test/tests/testdata/output/update.yaml b/src/frontend/planner_test/tests/testdata/output/update.yaml index ec596ed2fc097..6e26252840629 100644 --- a/src/frontend/planner_test/tests/testdata/output/update.yaml +++ b/src/frontend/planner_test/tests/testdata/output/update.yaml @@ -141,17 +141,18 @@ delete from t where a not in (select b from t); logical_plan: |- LogicalDelete { table: t } - └─LogicalApply { type: LeftAnti, on: (t.a = t.b), correlated_id: 1 } - ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] } - └─LogicalProject { exprs: [t.b] } - └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] } + └─LogicalProject { exprs: [t.a, t.b, t._row_id] } + └─LogicalApply { type: LeftAnti, on: (t.a = t.b), correlated_id: 1 } + ├─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] } + └─LogicalProject { exprs: [t.b] } + └─LogicalScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchDelete { table: t } └─BatchExchange { order: [], dist: Single } └─BatchHashJoin { type: LeftAnti, predicate: t.a = t.b, output: all } ├─BatchExchange { order: [], dist: HashShard(t.a) } - │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id, t._rw_timestamp], distribution: UpstreamHashShard(t._row_id) } + │ └─BatchScan { table: t, columns: [t.a, t.b, t._row_id], distribution: UpstreamHashShard(t._row_id) } └─BatchExchange { order: [], dist: HashShard(t.b) } └─BatchScan { table: t, columns: [t.b], distribution: SomeShard } - name: distributed update diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 4a52b5f0326cb..66f926bf8213e 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -452,6 +452,10 @@ impl StorageTableInner { pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> Arc { self.distribution.update_vnode_bitmap(new_vnodes) } + + pub fn has_epoch_idx(&self) -> bool { + self.epoch_idx.is_some() + } } pub trait PkAndRowStream = Stream>> + Send;