diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
index 8ac4682d3dd85..5a316eec209f7 100644
--- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
+++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
@@ -44,6 +44,7 @@ pub(crate) struct SharedBufferBatchInner {
     payload: Vec<SharedBufferItem>,
     range_tombstone_list: Vec<DeleteRangeTombstone>,
     largest_table_key: Vec<u8>,
+    smallest_table_key: Vec<u8>,
     size: usize,
     _tracker: Option<MemoryTracker>,
     batch_id: SharedBufferBatchId,
@@ -57,6 +58,8 @@ impl SharedBufferBatchInner {
         _tracker: Option<MemoryTracker>,
     ) -> Self {
         let mut largest_table_key = vec![];
+        let mut smallest_table_key = vec![];
+        let mut smallest_empty = true;
         if !range_tombstone_list.is_empty() {
             range_tombstone_list.sort();
             let mut range_tombstones: Vec<DeleteRangeTombstone> = vec![];
@@ -68,6 +71,11 @@
                     largest_table_key.clear();
                     largest_table_key.extend_from_slice(&tombstone.end_user_key.table_key.0);
                 }
+                if smallest_empty || smallest_table_key.gt(&tombstone.start_user_key.table_key.0) {
+                    smallest_table_key.clear();
+                    smallest_table_key.extend_from_slice(&tombstone.start_user_key.table_key.0);
+                    smallest_empty = false;
+                }
                 if let Some(last) = range_tombstones.last_mut() {
                     if last.end_user_key.gt(&tombstone.start_user_key) {
                         if last.end_user_key.lt(&tombstone.end_user_key) {
@@ -87,11 +95,18 @@
                 largest_table_key.extend_from_slice(item.0.as_ref());
             }
         }
+        if let Some(item) = payload.first() {
+            if smallest_empty || item.0.lt(&smallest_table_key) {
+                smallest_table_key.clear();
+                smallest_table_key.extend_from_slice(item.0.as_ref());
+            }
+        }
         SharedBufferBatchInner {
             payload,
             range_tombstone_list,
             size,
             largest_table_key,
+            smallest_table_key,
             _tracker,
             batch_id: SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed),
         }
@@ -242,39 +257,20 @@ impl SharedBufferBatch {
         &self.inner
     }
 
+    #[inline(always)]
     pub fn start_table_key(&self) -> TableKey<&[u8]> {
-        TableKey(&self.inner.first().unwrap().0)
+        TableKey(&self.inner.smallest_table_key)
     }
 
+    #[inline(always)]
     pub fn end_table_key(&self) -> TableKey<&[u8]> {
-        TableKey(&self.inner.last().unwrap().0)
+        TableKey(&self.inner.largest_table_key)
     }
 
     /// return inclusive left endpoint, which means that all data in this batch should be larger or
     /// equal than this key.
     pub fn start_user_key(&self) -> UserKey<&[u8]> {
-        if self.has_range_tombstone()
-            && (self.inner.is_empty()
-                || self
-                    .inner
-                    .range_tombstone_list
-                    .first()
-                    .unwrap()
-                    .start_user_key
-                    .table_key
-                    .0
-                    .as_slice()
-                    .le(&self.inner.first().unwrap().0))
-        {
-            self.inner
-                .range_tombstone_list
-                .first()
-                .unwrap()
-                .start_user_key
-                .as_ref()
-        } else {
-            UserKey::new(self.table_id, self.start_table_key())
-        }
+        UserKey::new(self.table_id, self.start_table_key())
     }
 
     #[inline(always)]
@@ -285,7 +281,7 @@ impl SharedBufferBatch {
     /// return inclusive right endpoint, which means that all data in this batch should be smaller
     /// or equal than this key.
     pub fn end_user_key(&self) -> UserKey<&[u8]> {
-        UserKey::new(self.table_id, TableKey(&self.inner.largest_table_key))
+        UserKey::new(self.table_id, self.end_table_key())
     }
 
     pub fn epoch(&self) -> u64 {
@@ -620,6 +616,20 @@ mod tests {
         }
         output.reverse();
         assert_eq!(output, shared_buffer_items);
+
+        let batch = SharedBufferBatch::build_shared_buffer_batch(
+            epoch,
+            vec![],
+            1,
+            vec![
+                (Bytes::from("a"), Bytes::from("c")),
+                (Bytes::from("b"), Bytes::from("d")),
+            ],
+            TableId::new(0),
+            None,
+        );
+        assert_eq!(batch.start_table_key().as_ref(), "a".as_bytes());
+        assert_eq!(batch.end_table_key().as_ref(), "d".as_bytes());
     }
 
     #[tokio::test]
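Side note (not part of the diff): a minimal sketch of how the user-key endpoints are expected to behave after this change. It reuses the constructor arguments from the test hunk above; the direct access to `UserKey`'s `table_key` field is an assumption about its layout, consistent with how `UserKey::new(self.table_id, self.start_table_key())` is built in the methods shown:

// Sketch only: constructor arguments copied from the test hunk above;
// the `table_key` field access is an assumption, not confirmed by this diff.
let batch = SharedBufferBatch::build_shared_buffer_batch(
    epoch,
    vec![],
    1,
    vec![
        (Bytes::from("a"), Bytes::from("c")),
        (Bytes::from("b"), Bytes::from("d")),
    ],
    TableId::new(0),
    None,
);
// start_user_key()/end_user_key() now simply wrap the precomputed
// smallest/largest table keys, even when only range tombstones exist.
assert_eq!(batch.start_user_key().table_key.as_ref(), "a".as_bytes());
assert_eq!(batch.end_user_key().table_key.as_ref(), "d".as_bytes());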