Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Sep 12, 2023
1 parent 050bb70 commit bb6f3fa
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 35 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ test_replication_with_column_pruning() {
run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/insert_seed.sql

# Provide snapshot
for i in $(seq 1 19)
for i in $(seq 1 18)
do
run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/insert_recurse.sql
done
Expand Down
2 changes: 2 additions & 0 deletions ci/scripts/sql/backfill/basic/drop.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP MATERIALIZED VIEW mv1;
DROP TABLE t1;
34 changes: 0 additions & 34 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,32 +1199,6 @@ where
Ok(self.local_store.iter(key_range, read_options).await?)
}

/// Replicated tables might not have all columns in the output, so we need to project the
/// output.
/// Instead of doing this in the `iter_with_pk_range` function, we do it in a separate function,
/// since `RowStream` is an TAIT, hence we can't return a `RowStream` with a different type.
pub async fn iter_with_pk_range_and_output_indices(
&self,
pk_range: &(Bound<OwnedRow>, Bound<OwnedRow>),
// Optional vnode that returns an iterator only over the given range under that vnode.
// For now, we require this parameter, and will panic. In the future, when `None`, we can
// iterate over each vnode that the `StateTableInner` owns.
vnode: VirtualNode,
// TODO(kwannoel): Refactor `PrefetchOptions` -> `StorageIterOptions`, so we can include
// epoch?
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<ProjectedRowStream<'_, S, SD, IS_REPLICATED, W, USE_WATERMARK_CACHE>>
{
assert!(
IS_REPLICATED,
"Only replicated tables can use this function"
);
Ok(self
.iter_row_with_pk_range(pk_range, vnode, prefetch_options)
.await?
.map(|row| row.map(|r| r.into_owned_row().project(&self.output_indices))))
}

async fn iter_kv_with_pk_prefix(
&self,
pk_prefix: impl Row,
Expand Down Expand Up @@ -1342,14 +1316,6 @@ where
}
}

pub type ProjectedRowStream<
'a,
S: StateStore,
SD: ValueRowSerde + 'a,
const IS_REPLICATED: bool,
W: WatermarkBufferStrategy + 'a,
const USE_WATERMARK_CACHE: bool,
> = impl Stream<Item = StreamExecutorResult<Project<'a, OwnedRow>>> + 'a;
pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> =
impl Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;

Expand Down

0 comments on commit bb6f3fa

Please sign in to comment.