Skip to content

Commit

Permalink
fix e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 11, 2024
1 parent 84154f3 commit 290ba9d
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 54 deletions.
4 changes: 4 additions & 0 deletions e2e_test/time_travel/syntax.slt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ SET QUERY_MODE TO local;
statement ok
CREATE TABLE t (k INT);

query I
SELECT * FROM t;
----

query error
SELECT * FROM t FOR SYSTEM_TIME AS OF 963716300;
----
Expand Down
2 changes: 2 additions & 0 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub struct TracedReadOptions {
pub retention_seconds: Option<u32>,
pub table_id: TracedTableId,
pub read_version_from_backup: bool,
pub read_committed: bool,
}

impl TracedReadOptions {
Expand All @@ -124,6 +125,7 @@ impl TracedReadOptions {
retention_seconds: None,
table_id: TracedTableId { table_id },
read_version_from_backup: false,
read_committed: false,
}
}
}
Expand Down
70 changes: 54 additions & 16 deletions src/storage/src/hummock/local_version/recent_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,49 +104,83 @@ impl RecentVersions {
table_id: TableId,
epoch: HummockEpoch,
) -> Option<PinnedVersion> {
let Some(info) = self
if let Some(info) = self
.latest_version
.version()
.state_table_info
.info()
.get(&table_id)
else {
return Some(self.latest_version.clone());
};
if info.committed_epoch <= epoch {
return Some(self.latest_version.clone());
{
if info.committed_epoch <= epoch {
Some(self.latest_version.clone())
} else {
self.get_safe_version_from_recent(table_id, epoch)
}
} else {
None
}
self.get_safe_version_from_recent(table_id, epoch)
}

fn get_safe_version_from_recent(
&self,
table_id: TableId,
epoch: HummockEpoch,
) -> Option<PinnedVersion> {
if cfg!(debug_assertions) {
assert!(
epoch
< self
.latest_version
.version()
.state_table_info
.info()
.get(&table_id)
.expect("should exist")
.committed_epoch
);
}
let result = self.recent_versions.binary_search_by(|version| {
let committed_epoch = version
.version()
.state_table_info
.info()
.get(&table_id)
.map(|info| info.committed_epoch)
.unwrap_or(0);
committed_epoch.cmp(&epoch)
.map(|info| info.committed_epoch);
if let Some(committed_epoch) = committed_epoch {
committed_epoch.cmp(&epoch)
} else {
// We have ensured that the table_id exists in the latest version, so if the table_id does not exist in a
// previous version, the table must not have been created yet, and that version therefore orders as `Less`.
Ordering::Less
}
});
match result {
Ok(index) => Some(self.recent_versions[index].clone()),
Err(index) => {
// `index` is the index of the first version whose `committed_epoch` is greater than `epoch`,
// or `index` equals `recent_versions.len()` when `epoch` is greater than every `committed_epoch`
if index >= self.recent_versions.len() {
let version = if index >= self.recent_versions.len() {
assert_eq!(index, self.recent_versions.len());
self.recent_versions.last().cloned()
} else if index == 0 {
// The earliest version has a higher committed epoch
None
} else {
self.recent_versions.get(index - 1).cloned()
}
};
version.and_then(|version| {
if version
.version()
.state_table_info
.info()
.contains_key(&table_id)
{
Some(version)
} else {
// if the table does not exist in the version, return `None` so that the caller can try to get a time travel version
None
}
})
}
}
}
Expand Down Expand Up @@ -196,8 +230,12 @@ mod tests {
recent_version: &RecentVersions,
expected: &[(TableId, u64, Option<&PinnedVersion>)],
) {
for (table_id, epoch, expected_version) in expected {
let version = recent_version.get_safe_version(*table_id, *epoch);
for (table_id, epoch, expected_version) in expected
.iter()
.cloned()
.chain([(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)])
{
let version = recent_version.get_safe_version(table_id, epoch);
assert_eq!(
version.as_ref().map(|version| version.id()),
expected_version.map(|version| version.id())
Expand Down Expand Up @@ -256,7 +294,7 @@ mod tests {
(TEST_TABLE_ID1, epoch1, Some(&version1)),
(TEST_TABLE_ID1, epoch2, Some(&version4)),
(TEST_TABLE_ID1, epoch3, Some(&version4)),
(TEST_TABLE_ID2, epoch0, Some(&version3)),
(TEST_TABLE_ID2, epoch0, None),
(TEST_TABLE_ID2, epoch1, Some(&version4)),
(TEST_TABLE_ID2, epoch2, Some(&version4)),
],
Expand All @@ -274,7 +312,7 @@ mod tests {
(TEST_TABLE_ID1, epoch2, Some(&version4)),
(TEST_TABLE_ID1, epoch3, Some(&version5)),
(TEST_TABLE_ID1, epoch4, Some(&version5)),
(TEST_TABLE_ID2, epoch0, Some(&version3)),
(TEST_TABLE_ID2, epoch0, None),
(TEST_TABLE_ID2, epoch1, Some(&version5)),
(TEST_TABLE_ID2, epoch2, Some(&version5)),
],
Expand Down
94 changes: 56 additions & 38 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,9 @@ impl HummockStorage {
) -> StorageResult<Option<Bytes>> {
let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone()));

let (key_range, read_version_tuple) = if read_options.read_version_from_backup {
self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range)
.await?
} else {
self.build_read_version_tuple(epoch, read_options.table_id, key_range)
.await?
};
let (key_range, read_version_tuple) = self
.build_read_version_tuple(epoch, key_range, &read_options)
.await?;

if is_empty_key_range(&key_range) {
return Ok(None);
Expand All @@ -284,13 +280,9 @@ impl HummockStorage {
epoch: u64,
read_options: ReadOptions,
) -> StorageResult<HummockStorageIterator> {
let (key_range, read_version_tuple) = if read_options.read_version_from_backup {
self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range)
.await?
} else {
self.build_read_version_tuple(epoch, read_options.table_id, key_range)
.await?
};
let (key_range, read_version_tuple) = self
.build_read_version_tuple(epoch, key_range, &read_options)
.await?;

self.hummock_version_reader
.iter(key_range, epoch, read_options, read_version_tuple)
Expand All @@ -303,13 +295,9 @@ impl HummockStorage {
epoch: u64,
read_options: ReadOptions,
) -> StorageResult<HummockStorageRevIterator> {
let (key_range, read_version_tuple) = if read_options.read_version_from_backup {
self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range)
.await?
} else {
self.build_read_version_tuple(epoch, read_options.table_id, key_range)
.await?
};
let (key_range, read_version_tuple) = self
.build_read_version_tuple(epoch, key_range, &read_options)
.await?;

self.hummock_version_reader
.rev_iter(key_range, epoch, read_options, read_version_tuple, None)
Expand Down Expand Up @@ -339,6 +327,23 @@ impl HummockStorage {
Ok(version)
}

/// Dispatches to the read-version builder selected by `read_options`.
///
/// `read_version_from_backup` is checked first and takes precedence over `read_committed`;
/// when neither flag is set, the tuple is built by `build_read_version_tuple_from_all`.
async fn build_read_version_tuple(
    &self,
    epoch: u64,
    key_range: TableKeyRange,
    read_options: &ReadOptions,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    let table_id = read_options.table_id;
    match (
        read_options.read_version_from_backup,
        read_options.read_committed,
    ) {
        // Backup reads win regardless of the `read_committed` flag.
        (true, _) => {
            self.build_read_version_tuple_from_backup(epoch, table_id, key_range)
                .await
        }
        (false, true) => {
            self.build_read_version_tuple_from_committed(epoch, table_id, key_range)
                .await
        }
        // The only synchronous path: no await needed.
        (false, false) => self.build_read_version_tuple_from_all(epoch, table_id, key_range),
    }
}

async fn build_read_version_tuple_from_backup(
&self,
epoch: u64,
Expand All @@ -365,34 +370,47 @@ impl HummockStorage {
}
}

async fn build_read_version_tuple(
/// Builds a read-version tuple from a committed version for `table_id` at `epoch`.
///
/// The locally held recent versions are consulted first through `get_safe_version`; when that
/// returns `None`, the version is obtained from `get_time_travel_version` instead. Errors from
/// the time travel lookup are propagated to the caller.
async fn build_read_version_tuple_from_committed(
    &self,
    epoch: u64,
    table_id: TableId,
    key_range: TableKeyRange,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
    // Resolve the local lookup in its own statement: a `match` scrutinee keeps its temporaries
    // alive until the end of the `match`, which would hold the value returned by
    // `recent_versions.load()` across the `.await` below. Binding the owned result first
    // releases it before awaiting.
    let local_version = self
        .recent_versions
        .load()
        .get_safe_version(table_id, epoch);
    let version = match local_version {
        Some(version) => version,
        None => self.get_time_travel_version(epoch, table_id).await?,
    };
    Ok(get_committed_read_version_tuple(
        version, table_id, key_range, epoch,
    ))
}

fn build_read_version_tuple_from_all(
&self,
epoch: u64,
table_id: TableId,
key_range: TableKeyRange,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
let recent_versions = self.recent_versions.load();
let pinned_version = recent_versions.latest_version().clone();
let state_table_info = pinned_version
let pinned_version = self.recent_versions.load().latest_version().clone();
let info = pinned_version
.version()
.state_table_info
.info()
.get(&table_id);

// check epoch if lower mce
let ret = if let Some(info) = state_table_info
let ret = if let Some(info) = info
&& epoch <= info.committed_epoch
{
let version = if epoch >= info.safe_epoch {
recent_versions.latest_version().clone()
} else if let Some(version) = recent_versions.get_safe_version(table_id, epoch) {
version
} else {
drop(recent_versions);
self.get_time_travel_version(epoch, table_id).await?
};
if epoch < info.safe_epoch {
return Err(HummockError::expired_epoch(table_id, info.safe_epoch, epoch).into());
}
// read committed_version directly without build snapshot
get_committed_read_version_tuple(version, table_id, key_range, epoch)
get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
} else {
let vnode = vnode(&key_range);
let mut matched_replicated_read_version_cnt = 0;
Expand Down Expand Up @@ -425,22 +443,23 @@ impl HummockStorage {
// When the system has just started and no state has been created, the memory state
// may be empty
if read_version_vec.is_empty() {
let table_committed_epoch = info.map(|info| info.committed_epoch);
if matched_replicated_read_version_cnt > 0 {
tracing::warn!(
"Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})",
table_id,
vnode.to_index(),
epoch,
matched_replicated_read_version_cnt,
state_table_info,
table_committed_epoch,
);
} else {
tracing::debug!(
"No read version found for read(table_id={} vnode={} epoch={}). Fall back to committed version (epoch={:?})",
table_id,
vnode.to_index(),
epoch,
state_table_info
table_committed_epoch
);
}
get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch)
Expand Down Expand Up @@ -526,7 +545,6 @@ impl HummockStorage {
self.buffer_tracker.get_memory_limiter().clone()
}

#[cfg(any(test, feature = "test"))]
/// Returns a clone of the latest version currently held in `recent_versions`.
pub fn get_pinned_version(&self) -> PinnedVersion {
self.recent_versions.load().latest_version().clone()
}
Expand Down
3 changes: 3 additions & 0 deletions src/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ pub struct ReadOptions {
/// Read from historical hummock version of meta snapshot backup.
/// It should only be used by `StorageTable` for batch query.
pub read_version_from_backup: bool,
pub read_committed: bool,
}

impl From<TracedReadOptions> for ReadOptions {
Expand All @@ -514,6 +515,7 @@ impl From<TracedReadOptions> for ReadOptions {
retention_seconds: value.retention_seconds,
table_id: value.table_id.into(),
read_version_from_backup: value.read_version_from_backup,
read_committed: value.read_committed,
}
}
}
Expand All @@ -528,6 +530,7 @@ impl From<ReadOptions> for TracedReadOptions {
retention_seconds: value.retention_seconds,
table_id: value.table_id.into(),
read_version_from_backup: value.read_version_from_backup,
read_committed: value.read_committed,
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
) -> StorageResult<Option<OwnedRow>> {
let epoch = wait_epoch.get_epoch();
let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
let read_committed = matches!(
wait_epoch,
HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_)
);
self.store.try_wait_epoch(wait_epoch).await?;
let serialized_pk = serialize_pk_with_vnode(
&pk,
Expand All @@ -381,6 +385,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
read_version_from_backup: read_backup,
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
};
Expand Down Expand Up @@ -485,12 +490,17 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| {
let prefix_hint = prefix_hint.clone();
let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
let read_committed = matches!(
wait_epoch,
HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_)
);
async move {
let read_options = ReadOptions {
prefix_hint,
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
read_version_from_backup: read_backup,
read_committed,
prefetch_options,
cache_policy,
..Default::default()
Expand Down

0 comments on commit 290ba9d

Please sign in to comment.