Skip to content

Commit

Permalink
feat(batch): use get_keyed_row to implement point get for _rw_timestamp (#19415)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 18, 2024
1 parent a4d96ec commit 084fff8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl<S: StateStore> LookupExecutorBuilder for InnerSideExecutorBuilder<S> {

let pk_prefix = OwnedRow::new(scan_range.eq_conds);

if self.lookup_prefix_len == self.table.pk_indices().len() && !self.table.has_epoch_idx() {
if self.lookup_prefix_len == self.table.pk_indices().len() {
let row = self.table.get_row(&pk_prefix, self.epoch.into()).await?;

if let Some(row) = row {
Expand Down
31 changes: 4 additions & 27 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,40 +396,17 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());

let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let res = if table.has_epoch_idx() {
// has epoch_idx means we need to select `_rw_timestamp` column which is unsupported by `get_row` interface, so use iterator interface instead.
let range_bounds = (Bound::<OwnedRow>::Unbounded, Bound::Unbounded);
let iter = table
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
range_bounds,
false,
1,
PrefetchOptions::new(false, false),
)
.await?;
pin_mut!(iter);
let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
if let Some(chunk) = chunk {
let row = chunk.row_at(0).0.to_owned_row();
Ok(Some(row))
} else {
Ok(None)
}
} else {
// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;
Ok(row)
};
// Point Get.
let row = table.get_row(&pk_prefix, epoch.into()).await?;

if let Some(timer) = timer {
timer.observe_duration()
}

res
Ok(row)
}

#[try_stream(ok = DataChunk, error = BatchError)]
Expand Down
47 changes: 38 additions & 9 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
pk: impl Row,
wait_epoch: HummockReadEpoch,
) -> StorageResult<Option<OwnedRow>> {
// `get_row` doesn't support select `_rw_timestamp` yet.
assert!(self.epoch_idx.is_none());
let epoch = wait_epoch.get_epoch();
let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
let read_committed = wait_epoch.is_read_committed();
Expand Down Expand Up @@ -406,7 +404,11 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
};
if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? {
if let Some((full_key, value)) = self
.store
.get_keyed_row(serialized_pk, epoch, read_options)
.await?
{
let row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));

Expand All @@ -416,7 +418,13 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
pk.project(&self.output_row_in_key_indices).into_owned_row();
let mut result_row_vec = vec![];
for idx in &self.output_indices {
if self.value_output_indices.contains(idx) {
if let Some(epoch_idx) = self.epoch_idx
&& *idx == epoch_idx
{
let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
result_row_vec
.push(risingwave_common::types::Datum::from(epoch.as_scalar()));
} else if self.value_output_indices.contains(idx) {
let item_position_in_value_indices = &self
.value_output_indices
.iter()
Expand All @@ -440,7 +448,32 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => Ok(Some(result_row_in_value.into_owned_row())),
None => match &self.epoch_idx {
Some(epoch_idx) => {
let mut result_row_vec = vec![];
for idx in &self.output_indices {
if idx == epoch_idx {
let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
result_row_vec
.push(risingwave_common::types::Datum::from(epoch.as_scalar()));
} else {
let item_position_in_value_indices = &self
.value_output_indices
.iter()
.position(|p| idx == p)
.unwrap();
result_row_vec.push(
result_row_in_value
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
}
}
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => Ok(Some(result_row_in_value.into_owned_row())),
},
}
} else {
Ok(None)
Expand All @@ -452,10 +485,6 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.distribution.update_vnode_bitmap(new_vnodes)
}

pub fn has_epoch_idx(&self) -> bool {
self.epoch_idx.is_some()
}
}

pub trait PkAndRowStream = Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send;
Expand Down

0 comments on commit 084fff8

Please sign in to comment.