diff --git a/ci/scripts/run-backfill-tests.sh b/ci/scripts/run-backfill-tests.sh index 8b003c7e6454c..ffa1bebfdc7e6 100755 --- a/ci/scripts/run-backfill-tests.sh +++ b/ci/scripts/run-backfill-tests.sh @@ -86,7 +86,7 @@ test_replication_with_column_pruning() { run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/insert_seed.sql # Provide snapshot - for i in $(seq 1 19) + for i in $(seq 1 18) do run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/insert_recurse.sql done diff --git a/ci/scripts/sql/backfill/basic/drop.sql b/ci/scripts/sql/backfill/basic/drop.sql index e69de29bb2d1d..304929c7d7ee0 100644 --- a/ci/scripts/sql/backfill/basic/drop.sql +++ b/ci/scripts/sql/backfill/basic/drop.sql @@ -0,0 +1,2 @@ +DROP MATERIALIZED VIEW mv1; +DROP TABLE t1; \ No newline at end of file diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 4c5cb806d02e8..d2372569d9ed2 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1199,32 +1199,6 @@ where Ok(self.local_store.iter(key_range, read_options).await?) } - /// Replicated tables might not have all columns in the output, so we need to project the - /// output. - /// Instead of doing this in the `iter_with_pk_range` function, we do it in a separate function, - /// since `RowStream` is an TAIT, hence we can't return a `RowStream` with a different type. - pub async fn iter_with_pk_range_and_output_indices( - &self, - pk_range: &(Bound, Bound), - // Optional vnode that returns an iterator only over the given range under that vnode. - // For now, we require this parameter, and will panic. In the future, when `None`, we can - // iterate over each vnode that the `StateTableInner` owns. - vnode: VirtualNode, - // TODO(kwannoel): Refactor `PrefetchOptions` -> `StorageIterOptions`, so we can include - // epoch? - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> - { - assert!( - IS_REPLICATED, - "Only replicated tables can use this function" - ); - Ok(self - .iter_row_with_pk_range(pk_range, vnode, prefetch_options) - .await? - .map(|row| row.map(|r| r.into_owned_row().project(&self.output_indices)))) - } - async fn iter_kv_with_pk_prefix( &self, pk_prefix: impl Row, @@ -1342,14 +1316,6 @@ where } } -pub type ProjectedRowStream< - 'a, - S: StateStore, - SD: ValueRowSerde + 'a, - const IS_REPLICATED: bool, - W: WatermarkBufferStrategy + 'a, - const USE_WATERMARK_CACHE: bool, -> = impl Stream>> + 'a; pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> = impl Stream>> + 'a;