From 290ba9d7582e41552bdd8c583fabcfe13242deee Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 11 Sep 2024 15:14:45 +0800 Subject: [PATCH] fix e2e --- e2e_test/time_travel/syntax.slt | 4 + src/storage/hummock_trace/src/opts.rs | 2 + .../hummock/local_version/recent_versions.rs | 70 ++++++++++---- .../src/hummock/store/hummock_storage.rs | 94 +++++++++++-------- src/storage/src/store.rs | 3 + .../src/table/batch_table/storage_table.rs | 10 ++ 6 files changed, 129 insertions(+), 54 deletions(-) diff --git a/e2e_test/time_travel/syntax.slt b/e2e_test/time_travel/syntax.slt index 6c3408a276763..5895f6d9b9e8b 100644 --- a/e2e_test/time_travel/syntax.slt +++ b/e2e_test/time_travel/syntax.slt @@ -7,6 +7,10 @@ SET QUERY_MODE TO local; statement ok CREATE TABLE t (k INT); +query I +SELECT * FROM t; +---- + query error SELECT * FROM t FOR SYSTEM_TIME AS OF 963716300; ---- diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 13b4b49b2022b..ff8b43c15c458 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -109,6 +109,7 @@ pub struct TracedReadOptions { pub retention_seconds: Option, pub table_id: TracedTableId, pub read_version_from_backup: bool, + pub read_committed: bool, } impl TracedReadOptions { @@ -124,6 +125,7 @@ impl TracedReadOptions { retention_seconds: None, table_id: TracedTableId { table_id }, read_version_from_backup: false, + read_committed: false, } } } diff --git a/src/storage/src/hummock/local_version/recent_versions.rs b/src/storage/src/hummock/local_version/recent_versions.rs index ff90bc0c9bbe2..8d3f1a015ad6a 100644 --- a/src/storage/src/hummock/local_version/recent_versions.rs +++ b/src/storage/src/hummock/local_version/recent_versions.rs @@ -104,19 +104,21 @@ impl RecentVersions { table_id: TableId, epoch: HummockEpoch, ) -> Option { - let Some(info) = self + if let Some(info) = self .latest_version .version() .state_table_info .info() .get(&table_id) - else { - return Some(self.latest_version.clone()); - }; - if info.committed_epoch <= epoch { - return Some(self.latest_version.clone()); + { + if info.committed_epoch <= epoch { + Some(self.latest_version.clone()) + } else { + self.get_safe_version_from_recent(table_id, epoch) + } + } else { + None } - self.get_safe_version_from_recent(table_id, epoch) } fn get_safe_version_from_recent( @@ -124,29 +126,61 @@ impl RecentVersions { table_id: TableId, epoch: HummockEpoch, ) -> Option { + if cfg!(debug_assertions) { + assert!( + epoch + < self + .latest_version + .version() + .state_table_info + .info() + .get(&table_id) + .expect("should exist") + .committed_epoch + ); + } let result = self.recent_versions.binary_search_by(|version| { let committed_epoch = version .version() .state_table_info .info() .get(&table_id) - .map(|info| info.committed_epoch) - .unwrap_or(0); - committed_epoch.cmp(&epoch) + .map(|info| info.committed_epoch); + if let Some(committed_epoch) = committed_epoch { + committed_epoch.cmp(&epoch) + } else { + // We have ensured that the table_id exists in the latest version, so if the table_id does not exist in a + // previous version, the table must have not created yet, and therefore has less ordering. + Ordering::Less + } }); match result { Ok(index) => Some(self.recent_versions[index].clone()), Err(index) => { // `index` is index of the first version that has `committed_epoch` greater than `epoch` // or `index` equals `recent_version.len()` when `epoch` is greater than all `committed_epoch` - if index >= self.recent_versions.len() { + let version = if index >= self.recent_versions.len() { assert_eq!(index, self.recent_versions.len()); self.recent_versions.last().cloned() } else if index == 0 { + // The earliest version has a higher committed epoch None } else { self.recent_versions.get(index - 1).cloned() - } + }; + version.and_then(|version| { + if version + .version() + .state_table_info + .info() + .contains_key(&table_id) + { + Some(version) + } else { + // if the table does not exist in the version, return `None` to try get a time travel version + None + } + }) } } } @@ -196,8 +230,12 @@ mod tests { recent_version: &RecentVersions, expected: &[(TableId, u64, Option<&PinnedVersion>)], ) { - for (table_id, epoch, expected_version) in expected { - let version = recent_version.get_safe_version(*table_id, *epoch); + for (table_id, epoch, expected_version) in expected + .iter() + .cloned() + .chain([(TEST_TABLE_ID1, 0, None), (TEST_TABLE_ID2, 0, None)]) + { + let version = recent_version.get_safe_version(table_id, epoch); assert_eq!( version.as_ref().map(|version| version.id()), expected_version.map(|version| version.id()) @@ -256,7 +294,7 @@ mod tests { (TEST_TABLE_ID1, epoch1, Some(&version1)), (TEST_TABLE_ID1, epoch2, Some(&version4)), (TEST_TABLE_ID1, epoch3, Some(&version4)), - (TEST_TABLE_ID2, epoch0, Some(&version3)), + (TEST_TABLE_ID2, epoch0, None), (TEST_TABLE_ID2, epoch1, Some(&version4)), (TEST_TABLE_ID2, epoch2, Some(&version4)), ], @@ -274,7 +312,7 @@ mod tests { (TEST_TABLE_ID1, epoch2, Some(&version4)), (TEST_TABLE_ID1, epoch3, Some(&version5)), (TEST_TABLE_ID1, epoch4, Some(&version5)), - (TEST_TABLE_ID2, epoch0, Some(&version3)), + (TEST_TABLE_ID2, epoch0, None), (TEST_TABLE_ID2, epoch1, Some(&version5)), (TEST_TABLE_ID2, epoch2, Some(&version5)), ], diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index bd321197be45d..b4924a5dca60f 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -261,13 +261,9 @@ impl HummockStorage { ) -> StorageResult> { let key_range = (Bound::Included(key.clone()), Bound::Included(key.clone())); - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) - .await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range) - .await? - }; + let (key_range, read_version_tuple) = self + .build_read_version_tuple(epoch, key_range, &read_options) + .await?; if is_empty_key_range(&key_range) { return Ok(None); @@ -284,13 +280,9 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) - .await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range) - .await? - }; + let (key_range, read_version_tuple) = self + .build_read_version_tuple(epoch, key_range, &read_options) + .await?; self.hummock_version_reader .iter(key_range, epoch, read_options, read_version_tuple) @@ -303,13 +295,9 @@ impl HummockStorage { epoch: u64, read_options: ReadOptions, ) -> StorageResult { - let (key_range, read_version_tuple) = if read_options.read_version_from_backup { - self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) - .await? - } else { - self.build_read_version_tuple(epoch, read_options.table_id, key_range) - .await? - }; + let (key_range, read_version_tuple) = self + .build_read_version_tuple(epoch, key_range, &read_options) + .await?; self.hummock_version_reader .rev_iter(key_range, epoch, read_options, read_version_tuple, None) @@ -339,6 +327,23 @@ impl HummockStorage { Ok(version) } + async fn build_read_version_tuple( + &self, + epoch: u64, + key_range: TableKeyRange, + read_options: &ReadOptions, + ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + if read_options.read_version_from_backup { + self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) + .await + } else if read_options.read_committed { + self.build_read_version_tuple_from_committed(epoch, read_options.table_id, key_range) + .await + } else { + self.build_read_version_tuple_from_all(epoch, read_options.table_id, key_range) + } + } + async fn build_read_version_tuple_from_backup( &self, epoch: u64, @@ -365,34 +370,47 @@ impl HummockStorage { } } - async fn build_read_version_tuple( + async fn build_read_version_tuple_from_committed( + &self, + epoch: u64, + table_id: TableId, + key_range: TableKeyRange, + ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { + let version = match self + .recent_versions + .load() + .get_safe_version(table_id, epoch) + { + Some(version) => version, + None => self.get_time_travel_version(epoch, table_id).await?, + }; + Ok(get_committed_read_version_tuple( + version, table_id, key_range, epoch, + )) + } + + fn build_read_version_tuple_from_all( &self, epoch: u64, table_id: TableId, key_range: TableKeyRange, ) -> StorageResult<(TableKeyRange, ReadVersionTuple)> { - let recent_versions = self.recent_versions.load(); - let pinned_version = recent_versions.latest_version().clone(); - let state_table_info = pinned_version + let pinned_version = self.recent_versions.load().latest_version().clone(); + let info = pinned_version .version() .state_table_info .info() .get(&table_id); // check epoch if lower mce - let ret = if let Some(info) = state_table_info + let ret = if let Some(info) = info && epoch <= info.committed_epoch { - let version = if epoch >= info.safe_epoch { - recent_versions.latest_version().clone() - } else if let Some(version) = recent_versions.get_safe_version(table_id, epoch) { - version - } else { - drop(recent_versions); - self.get_time_travel_version(epoch, table_id).await? - }; + if epoch < info.safe_epoch { + return Err(HummockError::expired_epoch(table_id, info.safe_epoch, epoch).into()); + } // read committed_version directly without build snapshot - get_committed_read_version_tuple(version, table_id, key_range, epoch) + get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch) } else { let vnode = vnode(&key_range); let mut matched_replicated_read_version_cnt = 0; @@ -425,6 +443,7 @@ impl HummockStorage { // When the system has just started and no state has been created, the memory state // may be empty if read_version_vec.is_empty() { + let table_committed_epoch = info.map(|info| info.committed_epoch); if matched_replicated_read_version_cnt > 0 { tracing::warn!( "Read(table_id={} vnode={} epoch={}) is not allowed on replicated read version ({} found). Fall back to committed version (epoch={:?})", @@ -432,7 +451,7 @@ impl HummockStorage { vnode.to_index(), epoch, matched_replicated_read_version_cnt, - state_table_info, + table_committed_epoch, ); } else { tracing::debug!( @@ -440,7 +459,7 @@ impl HummockStorage { table_id, vnode.to_index(), epoch, - state_table_info + table_committed_epoch ); } get_committed_read_version_tuple(pinned_version, table_id, key_range, epoch) @@ -526,7 +545,6 @@ impl HummockStorage { self.buffer_tracker.get_memory_limiter().clone() } - #[cfg(any(test, feature = "test"))] pub fn get_pinned_version(&self) -> PinnedVersion { self.recent_versions.load().latest_version().clone() } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 480b24d5defb9..ab80f712570ca 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -502,6 +502,7 @@ pub struct ReadOptions { /// Read from historical hummock version of meta snapshot backup. /// It should only be used by `StorageTable` for batch query. pub read_version_from_backup: bool, + pub read_committed: bool, } impl From for ReadOptions { @@ -514,6 +515,7 @@ impl From for ReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, + read_committed: value.read_committed, } } } @@ -528,6 +530,7 @@ impl From for TracedReadOptions { retention_seconds: value.retention_seconds, table_id: value.table_id.into(), read_version_from_backup: value.read_version_from_backup, + read_committed: value.read_committed, } } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 52a7655288c13..8c5f432f46c57 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -361,6 +361,10 @@ impl StorageTableInner { ) -> StorageResult> { let epoch = wait_epoch.get_epoch(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); + let read_committed = matches!( + wait_epoch, + HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) + ); self.store.try_wait_epoch(wait_epoch).await?; let serialized_pk = serialize_pk_with_vnode( &pk, @@ -381,6 +385,7 @@ impl StorageTableInner { retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, + read_committed, cache_policy: CachePolicy::Fill(CacheContext::Default), ..Default::default() }; @@ -485,12 +490,17 @@ impl StorageTableInner { let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| { let prefix_hint = prefix_hint.clone(); let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_)); + let read_committed = matches!( + wait_epoch, + HummockReadEpoch::TimeTravel(_) | HummockReadEpoch::Committed(_) + ); async move { let read_options = ReadOptions { prefix_hint, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, read_version_from_backup: read_backup, + read_committed, prefetch_options, cache_policy, ..Default::default()