From 442a08771606397c7c75c557f9369c6e36f21394 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Sun, 14 Jul 2024 16:49:52 +0800
Subject: [PATCH] feat(storage): decouple spill task from epoch (#17539)

---
 .../event_handler/hummock_event_handler.rs    |   2 +-
 .../src/hummock/event_handler/uploader/mod.rs | 473 ++++--------------
 .../hummock/event_handler/uploader/spiller.rs | 427 ++++++++++++++++
 .../event_handler/uploader/test_utils.rs      | 346 +++++++++++++
 .../src/common/table/test_state_table.rs      |   6 -
 .../src/common/table/test_storage_table.rs    |   3 -
 6 files changed, 865 insertions(+), 392 deletions(-)
 create mode 100644 src/storage/src/hummock/event_handler/uploader/spiller.rs
 create mode 100644 src/storage/src/hummock/event_handler/uploader/test_utils.rs

diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
index f4038f0d7d527..addb2d08d5fc4 100644
--- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs
+++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -936,7 +936,7 @@ mod tests {
     use tokio::sync::oneshot;
 
     use crate::hummock::event_handler::refiller::CacheRefiller;
-    use crate::hummock::event_handler::uploader::tests::{gen_imm, TEST_TABLE_ID};
+    use crate::hummock::event_handler::uploader::test_utils::{gen_imm, TEST_TABLE_ID};
    use crate::hummock::event_handler::uploader::UploadTaskOutput;
    use crate::hummock::event_handler::{HummockEvent, HummockEventHandler, HummockVersionUpdate};
    use crate::hummock::iterator::test_utils::mock_sstable_store;
diff --git a/src/storage/src/hummock/event_handler/uploader/mod.rs b/src/storage/src/hummock/event_handler/uploader/mod.rs
index f0a18aa9eca65..101f54541fede 100644
--- a/src/storage/src/hummock/event_handler/uploader/mod.rs
+++ b/src/storage/src/hummock/event_handler/uploader/mod.rs
@@ -12,7 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
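 
+// Spill tasks are no longer tied to whole-epoch order; the new `spiller`
+// module picks what to spill by pending payload size.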
+mod spiller; mod task_manager; +pub(crate) mod test_utils; use std::cmp::Ordering; use std::collections::btree_map::Entry; @@ -43,6 +45,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info, warn}; use crate::hummock::event_handler::hummock_event_handler::{send_sync_result, BufferTracker}; +use crate::hummock::event_handler::uploader::spiller::Spiller; use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::local_version::pinned_version::PinnedVersion; @@ -657,6 +660,7 @@ impl TableUnsyncData { impl Iterator)>, )>, impl Iterator, + BTreeMap, ) { if let Some(prev_epoch) = self.max_sync_epoch() { assert_gt!(epoch, prev_epoch) @@ -687,6 +691,7 @@ impl TableUnsyncData { take_before_epoch(&mut self.spill_tasks, epoch) .into_values() .flat_map(|tasks| tasks.into_iter()), + epochs, ) } @@ -735,7 +740,7 @@ impl TableUnsyncData { fn max_epoch(&self) -> Option { self.unsync_epochs - .first_key_value() + .last_key_value() .map(|(epoch, _)| *epoch) .or_else(|| self.max_sync_epoch()) } @@ -747,6 +752,22 @@ impl TableUnsyncData { } } +#[derive(Eq, Hash, PartialEq, Copy, Clone)] +struct UnsyncEpochId(HummockEpoch, TableId); + +impl UnsyncEpochId { + fn epoch(&self) -> HummockEpoch { + self.0 + } +} + +fn get_unsync_epoch_id(epoch: HummockEpoch, table_ids: &HashSet) -> Option { + table_ids + .iter() + .min() + .map(|table_id| UnsyncEpochId(epoch, *table_id)) +} + #[derive(Default)] /// Unsync data, can be either imm or spilled sst, and some aggregated epoch information. /// @@ -756,9 +777,7 @@ struct UnsyncData { table_data: HashMap, // An index as a mapping from instance id to its table id instance_table_id: HashMap, - // TODO: this is only used in spill to get existing epochs and can be removed - // when we support spill not based on epoch - epochs: BTreeMap>, + unsync_epochs: HashMap>, } impl UnsyncData { @@ -869,49 +888,56 @@ impl UploaderData { table_ids: HashSet, sync_result_sender: oneshot::Sender>, ) { - // clean old epochs - let epochs = take_before_epoch(&mut self.unsync_data.epochs, epoch); - if cfg!(debug_assertions) { - for epoch_table_ids in epochs.into_values() { - assert_eq!(epoch_table_ids, table_ids); - } - } - let mut all_table_watermarks = HashMap::new(); let mut uploading_tasks = HashSet::new(); let mut spilled_tasks = BTreeSet::new(); let mut flush_payload = HashMap::new(); - for (table_id, table_data) in &self.unsync_data.table_data { - if !table_ids.contains(table_id) { - table_data.assert_after_epoch(epoch); - } - } - for table_id in &table_ids { - let table_data = self + + if let Some(UnsyncEpochId(_, min_table_id)) = get_unsync_epoch_id(epoch, &table_ids) { + let min_table_id_data = self .unsync_data .table_data - .get_mut(table_id) + .get_mut(&min_table_id) .expect("should exist"); - let (unflushed_payload, table_watermarks, task_ids) = table_data.sync(epoch); - for (instance_id, payload) in unflushed_payload { - if !payload.is_empty() { - flush_payload.insert(instance_id, payload); - } - } - if let Some((direction, watermarks)) = table_watermarks { - Self::add_table_watermarks( - &mut all_table_watermarks, - *table_id, - direction, - watermarks, + let epochs = take_before_epoch(&mut min_table_id_data.unsync_epochs.clone(), epoch); + for epoch in epochs.keys() { + assert_eq!( + self.unsync_data + .unsync_epochs + .remove(&UnsyncEpochId(*epoch, min_table_id)) + .expect("should exist"), + table_ids ); } - for task_id in task_ids { - if self.spilled_data.contains_key(&task_id) { - 
spilled_tasks.insert(task_id); - } else { - uploading_tasks.insert(task_id); + for table_id in &table_ids { + let table_data = self + .unsync_data + .table_data + .get_mut(table_id) + .expect("should exist"); + let (unflushed_payload, table_watermarks, task_ids, table_unsync_epochs) = + table_data.sync(epoch); + assert_eq!(table_unsync_epochs, epochs); + for (instance_id, payload) in unflushed_payload { + if !payload.is_empty() { + flush_payload.insert(instance_id, payload); + } + } + if let Some((direction, watermarks)) = table_watermarks { + Self::add_table_watermarks( + &mut all_table_watermarks, + *table_id, + direction, + watermarks, + ); + } + for task_id in task_ids { + if self.spilled_data.contains_key(&task_id) { + spilled_tasks.insert(task_id); + } else { + uploading_tasks.insert(task_id); + } } } } @@ -1153,7 +1179,13 @@ impl HummockUploader { }); table_data.new_epoch(epoch); } - data.unsync_data.epochs.insert(epoch, table_ids); + if let Some(unsync_epoch_id) = get_unsync_epoch_id(epoch, &table_ids) { + assert!(data + .unsync_data + .unsync_epochs + .insert(unsync_epoch_id, table_ids) + .is_none()); + } } pub(super) fn start_sync_epoch( @@ -1209,43 +1241,28 @@ impl HummockUploader { return false; }; if self.context.buffer_tracker.need_flush() { + let mut spiller = Spiller::new(&mut data.unsync_data); let mut curr_batch_flush_size = 0; // iterate from older epoch to newer epoch - for (epoch, table_ids) in &data.unsync_data.epochs { - if !self - .context - .buffer_tracker - .need_more_flush(curr_batch_flush_size) + while self + .context + .buffer_tracker + .need_more_flush(curr_batch_flush_size) + && let Some((epoch, payload, spilled_table_ids)) = spiller.next_spilled_payload() + { + assert!(!payload.is_empty()); { - break; - } - let mut spilled_table_ids = HashSet::new(); - let mut payload = HashMap::new(); - for table_id in table_ids { - let table_data = data - .unsync_data - .table_data - .get_mut(table_id) - .expect("should exist"); - for (instance_id, instance_data) in &mut table_data.instance_data { - let instance_payload = instance_data.spill(*epoch); - if !instance_payload.is_empty() { - payload.insert(*instance_id, instance_payload); - spilled_table_ids.insert(*table_id); - } - } - } - if !payload.is_empty() { let (task_id, task_size, spilled_table_ids) = data.task_manager .spill(&self.context, spilled_table_ids, payload); for table_id in spilled_table_ids { - data.unsync_data + spiller + .unsync_data() .table_data .get_mut(table_id) .expect("should exist") .spill_tasks - .entry(*epoch) + .entry(epoch) .or_default() .push_front(task_id); } @@ -1459,257 +1476,25 @@ impl HummockUploader { #[cfg(test)] pub(crate) mod tests { - use std::collections::{HashMap, HashSet, VecDeque}; + use std::collections::{HashMap, HashSet}; use std::future::{poll_fn, Future}; use std::ops::Deref; use std::pin::pin; use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering::{Relaxed, SeqCst}; - use std::sync::{Arc, LazyLock}; + use std::sync::atomic::Ordering::SeqCst; + use std::sync::Arc; use std::task::Poll; - use bytes::Bytes; - use futures::future::BoxFuture; use futures::FutureExt; - use itertools::Itertools; - use prometheus::core::GenericGauge; - use risingwave_common::catalog::TableId; - use risingwave_common::must_match; - use risingwave_common::util::epoch::{test_epoch, EpochExt}; - use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; - use risingwave_hummock_sdk::key::{FullKey, TableKey}; - use risingwave_hummock_sdk::version::HummockVersion; - use 
risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; - use risingwave_pb::hummock::{KeyRange, SstableInfo, StateTableInfoDelta}; - use spin::Mutex; - use tokio::spawn; - use tokio::sync::mpsc::unbounded_channel; + use risingwave_common::util::epoch::EpochExt; + use risingwave_hummock_sdk::HummockEpoch; use tokio::sync::oneshot; - use tokio::task::yield_now; - - use crate::hummock::event_handler::hummock_event_handler::BufferTracker; - use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; - use crate::hummock::event_handler::uploader::{ - get_payload_imm_ids, HummockUploader, SyncedData, TableUnsyncData, UploadTaskInfo, - UploadTaskOutput, UploadTaskPayload, UploaderContext, UploaderData, UploaderState, - UploadingTask, UploadingTaskId, - }; - use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; - use crate::hummock::local_version::pinned_version::PinnedVersion; - use crate::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatch, SharedBufferBatchId, SharedBufferValue, - }; - use crate::hummock::{HummockError, HummockResult, MemoryLimiter}; - use crate::mem_table::{ImmId, ImmutableMemtable}; - use crate::monitor::HummockStateStoreMetrics; - use crate::opts::StorageOpts; - use crate::store::SealCurrentEpochOptions; - - const INITIAL_EPOCH: HummockEpoch = test_epoch(5); - pub(crate) const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; - - pub trait UploadOutputFuture = - Future> + Send + 'static; - pub trait UploadFn = - Fn(UploadTaskPayload, UploadTaskInfo) -> Fut + Send + Sync + 'static; - - impl HummockUploader { - fn data(&self) -> &UploaderData { - must_match!(&self.state, UploaderState::Working(data) => data) - } - fn table_data(&self) -> &TableUnsyncData { - self.data() - .unsync_data - .table_data - .get(&TEST_TABLE_ID) - .expect("should exist") - } - - fn test_max_syncing_epoch(&self) -> HummockEpoch { - self.table_data().max_sync_epoch().unwrap() - } - - fn test_max_synced_epoch(&self) -> HummockEpoch { - self.table_data().max_synced_epoch.unwrap() - } - } - - fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { - let mut version = HummockVersion::default(); - version.id = epoch; - version.max_committed_epoch = epoch; - version.state_table_info.apply_delta( - &HashMap::from_iter([( - TEST_TABLE_ID, - StateTableInfoDelta { - committed_epoch: epoch, - safe_epoch: epoch, - compaction_group_id: StaticCompactionGroupId::StateDefault as _, - }, - )]), - &HashSet::new(), - ); - version - } - - fn initial_pinned_version() -> PinnedVersion { - PinnedVersion::new(test_hummock_version(INITIAL_EPOCH), unbounded_channel().0) - } - - fn dummy_table_key() -> Vec { - vec![b't', b'e', b's', b't'] - } - - async fn gen_imm_with_limiter( - epoch: HummockEpoch, - limiter: Option<&MemoryLimiter>, - ) -> ImmutableMemtable { - let sorted_items = vec![( - TableKey(Bytes::from(dummy_table_key())), - SharedBufferValue::Delete, - )]; - let size = SharedBufferBatch::measure_batch_size(&sorted_items, None).0; - let tracker = match limiter { - Some(limiter) => Some(limiter.require_memory(size as u64).await), - None => None, - }; - SharedBufferBatch::build_shared_buffer_batch( - epoch, - 0, - sorted_items, - None, - size, - TEST_TABLE_ID, - tracker, - ) - } - - pub(crate) async fn gen_imm(epoch: HummockEpoch) -> ImmutableMemtable { - gen_imm_with_limiter(epoch, None).await - } - - fn gen_sstable_info( - start_epoch: HummockEpoch, - end_epoch: HummockEpoch, - ) -> Vec { - let start_full_key = FullKey::new(TEST_TABLE_ID, 
TableKey(dummy_table_key()), start_epoch); - let end_full_key = FullKey::new(TEST_TABLE_ID, TableKey(dummy_table_key()), end_epoch); - let gen_sst_object_id = (start_epoch << 8) + end_epoch; - vec![LocalSstableInfo::for_test(SstableInfo { - object_id: gen_sst_object_id, - sst_id: gen_sst_object_id, - key_range: Some(KeyRange { - left: start_full_key.encode(), - right: end_full_key.encode(), - right_exclusive: true, - }), - table_ids: vec![TEST_TABLE_ID.table_id], - ..Default::default() - })] - } - - fn test_uploader_context(upload_fn: F) -> UploaderContext - where - Fut: UploadOutputFuture, - F: UploadFn, - { - let config = StorageOpts::default(); - UploaderContext::new( - initial_pinned_version(), - Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), - BufferTracker::for_test(), - &config, - Arc::new(HummockStateStoreMetrics::unused()), - ) - } - - fn test_uploader(upload_fn: F) -> HummockUploader - where - Fut: UploadOutputFuture, - F: UploadFn, - { - let config = StorageOpts { - ..Default::default() - }; - HummockUploader::new( - Arc::new(HummockStateStoreMetrics::unused()), - initial_pinned_version(), - Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), - BufferTracker::for_test(), - &config, - ) - } - - fn dummy_success_upload_output() -> UploadTaskOutput { - UploadTaskOutput { - new_value_ssts: gen_sstable_info(INITIAL_EPOCH, INITIAL_EPOCH), - old_value_ssts: vec![], - wait_poll_timer: None, - } - } - - #[allow(clippy::unused_async)] - async fn dummy_success_upload_future( - _: UploadTaskPayload, - _: UploadTaskInfo, - ) -> HummockResult { - Ok(dummy_success_upload_output()) - } - - #[allow(clippy::unused_async)] - async fn dummy_fail_upload_future( - _: UploadTaskPayload, - _: UploadTaskInfo, - ) -> HummockResult { - Err(HummockError::other("failed")) - } - - impl UploadingTask { - fn from_vec(imms: Vec, context: &UploaderContext) -> Self { - let input = HashMap::from_iter([( - TEST_LOCAL_INSTANCE_ID, - imms.into_iter().map(UploaderImm::for_test).collect_vec(), - )]); - static NEXT_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); - Self::new( - UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)), - input, - context, - ) - } - } - - fn get_imm_ids<'a>( - imms: impl IntoIterator, - ) -> HashMap> { - HashMap::from_iter([( - TEST_LOCAL_INSTANCE_ID, - imms.into_iter().map(|imm| imm.batch_id()).collect_vec(), - )]) - } - - impl HummockUploader { - fn local_seal_epoch_for_test(&mut self, instance_id: LocalInstanceId, epoch: HummockEpoch) { - self.local_seal_epoch( - instance_id, - epoch.next_epoch(), - SealCurrentEpochOptions::for_test(), - ); - } - - fn start_epochs_for_test(&mut self, epochs: impl IntoIterator) { - let mut last_epoch = None; - for epoch in epochs { - last_epoch = Some(epoch); - self.start_epoch(epoch, HashSet::from_iter([TEST_TABLE_ID])); - } - self.start_epoch( - last_epoch.unwrap().next_epoch(), - HashSet::from_iter([TEST_TABLE_ID]), - ); - } - } + use super::test_utils::*; + use crate::hummock::event_handler::uploader::{get_payload_imm_ids, SyncedData, UploadingTask}; + use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID; + use crate::hummock::HummockError; + use crate::opts::StorageOpts; #[tokio::test] pub async fn test_uploading_task_future() { @@ -1973,82 +1758,6 @@ pub(crate) mod tests { assert_eq!(epoch6, uploader.test_max_syncing_epoch()); } - fn prepare_uploader_order_test( - config: &StorageOpts, - skip_schedule: bool, - ) -> ( - BufferTracker, - HummockUploader, - impl Fn(HashMap>) -> 
(BoxFuture<'static, ()>, oneshot::Sender<()>), - ) { - let gauge = GenericGauge::new("test", "test").unwrap(); - let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); - // (the started task send the imm ids of payload, the started task wait for finish notify) - #[allow(clippy::type_complexity)] - let task_notifier_holder: Arc< - Mutex, oneshot::Receiver<()>)>>, - > = Arc::new(Mutex::new(VecDeque::new())); - - let new_task_notifier = { - let task_notifier_holder = task_notifier_holder.clone(); - move |imm_ids: HashMap>| { - let (start_tx, start_rx) = oneshot::channel(); - let (finish_tx, finish_rx) = oneshot::channel(); - task_notifier_holder - .lock() - .push_front((start_tx, finish_rx)); - let await_start_future = async move { - let task_info = start_rx.await.unwrap(); - assert_eq!(imm_ids, task_info.imm_ids); - } - .boxed(); - (await_start_future, finish_tx) - } - }; - - let config = StorageOpts::default(); - let uploader = HummockUploader::new( - Arc::new(HummockStateStoreMetrics::unused()), - initial_pinned_version(), - Arc::new({ - move |_, task_info: UploadTaskInfo| { - let task_notifier_holder = task_notifier_holder.clone(); - let task_item = task_notifier_holder.lock().pop_back(); - let start_epoch = *task_info.epochs.last().unwrap(); - let end_epoch = *task_info.epochs.first().unwrap(); - assert!(end_epoch >= start_epoch); - spawn(async move { - let ssts = gen_sstable_info(start_epoch, end_epoch); - if !skip_schedule { - let (start_tx, finish_rx) = task_item.unwrap(); - start_tx.send(task_info).unwrap(); - finish_rx.await.unwrap(); - } - Ok(UploadTaskOutput { - new_value_ssts: ssts, - old_value_ssts: vec![], - wait_poll_timer: None, - }) - }) - } - }), - buffer_tracker.clone(), - &config, - ); - (buffer_tracker, uploader, new_task_notifier) - } - - async fn assert_uploader_pending(uploader: &mut HummockUploader) { - for _ in 0..10 { - yield_now().await; - } - assert!( - poll_fn(|cx| Poll::Ready(uploader.next_uploaded_sst().poll_unpin(cx))) - .await - .is_pending() - ) - } - #[tokio::test] async fn test_uploader_finish_in_order() { let config = StorageOpts { diff --git a/src/storage/src/hummock/event_handler/uploader/spiller.rs b/src/storage/src/hummock/event_handler/uploader/spiller.rs new file mode 100644 index 0000000000000..a4caa3c05fe33 --- /dev/null +++ b/src/storage/src/hummock/event_handler/uploader/spiller.rs @@ -0,0 +1,427 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
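+
+//! Size-driven spill selection. Spill candidates are keyed by
+//! `UnsyncEpochId` (an epoch paired with the smallest table id started in
+//! that epoch), and `Spiller::next_spilled_payload` always drains the
+//! candidate with the largest pending payload, so spilling is driven by
+//! memory pressure rather than by strict epoch order.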
+
+use std::collections::{HashMap, HashSet};
+
+use risingwave_common::catalog::TableId;
+use risingwave_hummock_sdk::HummockEpoch;
+
+use crate::hummock::event_handler::uploader::{
+    LocalInstanceUnsyncData, UnsyncData, UnsyncEpochId, UploadTaskInput,
+};
+use crate::hummock::event_handler::LocalInstanceId;
+
+#[derive(Default)]
+struct EpochSpillableDataInfo {
+    instance_ids: HashSet<LocalInstanceId>,
+    payload_size: usize,
+}
+
+pub(super) struct Spiller<'a> {
+    unsync_data: &'a mut UnsyncData,
+    epoch_info: HashMap<UnsyncEpochId, EpochSpillableDataInfo>,
+    unsync_epoch_id_map: HashMap<(HummockEpoch, TableId), UnsyncEpochId>,
+}
+
+impl<'a> Spiller<'a> {
+    pub(super) fn new(unsync_data: &'a mut UnsyncData) -> Self {
+        let unsync_epoch_id_map: HashMap<_, _> = unsync_data
+            .unsync_epochs
+            .iter()
+            .flat_map(|(unsync_epoch_id, table_ids)| {
+                let epoch = unsync_epoch_id.epoch();
+                let unsync_epoch_id = *unsync_epoch_id;
+                table_ids
+                    .iter()
+                    .map(move |table_id| ((epoch, *table_id), unsync_epoch_id))
+            })
+            .collect();
+        let mut epoch_info: HashMap<_, EpochSpillableDataInfo> = HashMap::new();
+        for instance_data in unsync_data
+            .table_data
+            .values()
+            .flat_map(|table_data| table_data.instance_data.values())
+        {
+            if let Some((epoch, spill_size)) = instance_data.spillable_data_info() {
+                let unsync_epoch_id = unsync_epoch_id_map
+                    .get(&(epoch, instance_data.table_id))
+                    .expect("should exist");
+                let epoch_info = epoch_info.entry(*unsync_epoch_id).or_default();
+                assert!(epoch_info.instance_ids.insert(instance_data.instance_id));
+                epoch_info.payload_size += spill_size;
+            }
+        }
+        Self {
+            unsync_data,
+            epoch_info,
+            unsync_epoch_id_map,
+        }
+    }
+
+    pub(super) fn next_spilled_payload(
+        &mut self,
+    ) -> Option<(HummockEpoch, UploadTaskInput, HashSet<TableId>)> {
+        if let Some(unsync_epoch_id) = self
+            .epoch_info
+            .iter()
+            .max_by_key(|(_, info)| info.payload_size)
+            .map(|(unsync_epoch_id, _)| *unsync_epoch_id)
+        {
+            let spill_epoch = unsync_epoch_id.epoch();
+            let spill_info = self
+                .epoch_info
+                .remove(&unsync_epoch_id)
+                .expect("should exist");
+            let epoch_info = &mut self.epoch_info;
+            let mut payload = HashMap::new();
+            let mut spilled_table_ids = HashSet::new();
+            for instance_id in spill_info.instance_ids {
+                let table_id = *self
+                    .unsync_data
+                    .instance_table_id
+                    .get(&instance_id)
+                    .expect("should exist");
+                let instance_data = self
+                    .unsync_data
+                    .table_data
+                    .get_mut(&table_id)
+                    .expect("should exist")
+                    .instance_data
+                    .get_mut(&instance_id)
+                    .expect("should exist");
+                let instance_payload = instance_data.spill(spill_epoch);
+                assert!(!instance_payload.is_empty());
+                payload.insert(instance_id, instance_payload);
+                spilled_table_ids.insert(table_id);
+
+                // update the spill info
+                if let Some((new_spill_epoch, size)) = instance_data.spillable_data_info() {
+                    let new_unsync_epoch_id = self
+                        .unsync_epoch_id_map
+                        .get(&(new_spill_epoch, instance_data.table_id))
+                        .expect("should exist");
+                    let info = epoch_info.entry(*new_unsync_epoch_id).or_default();
+                    assert!(info.instance_ids.insert(instance_id));
+                    info.payload_size += size;
+                }
+            }
+            Some((spill_epoch, payload, spilled_table_ids))
+        } else {
+            None
+        }
+    }
+
+    pub(super) fn unsync_data(&mut self) -> &mut UnsyncData {
+        self.unsync_data
+    }
+}
+
+impl LocalInstanceUnsyncData {
+    fn spillable_data_info(&self) -> Option<(HummockEpoch, usize)> {
+        self.sealed_data
+            .back()
+            .or(self.current_epoch_data.as_ref())
+            .and_then(|epoch_data| {
+                if !epoch_data.is_empty() {
+                    Some((
+                        epoch_data.epoch,
+                        epoch_data.imms.iter().map(|imm| imm.size()).sum(),
+                    ))
+                } else
{ + None + } + }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::ops::Deref; + + use futures::future::join_all; + use futures::FutureExt; + use itertools::Itertools; + use risingwave_common::catalog::TableId; + use risingwave_common::util::epoch::EpochExt; + use tokio::sync::oneshot; + + use crate::hummock::event_handler::uploader::test_utils::*; + use crate::opts::StorageOpts; + use crate::store::SealCurrentEpochOptions; + + #[tokio::test] + async fn test_spill_in_order() { + let config = StorageOpts { + shared_buffer_capacity_mb: 1024 * 1024, + shared_buffer_flush_ratio: 0.0, + ..Default::default() + }; + let (buffer_tracker, mut uploader, new_task_notifier) = + prepare_uploader_order_test(&config, false); + + let table_id1 = TableId::new(1); + let table_id2 = TableId::new(2); + + let instance_id1_1 = 1; + let instance_id1_2 = 2; + let instance_id2 = 3; + + let epoch1 = INITIAL_EPOCH.next_epoch(); + let epoch2 = epoch1.next_epoch(); + let epoch3 = epoch2.next_epoch(); + let epoch4 = epoch3.next_epoch(); + let memory_limiter = buffer_tracker.get_memory_limiter().clone(); + let memory_limiter = Some(memory_limiter.deref()); + + // epoch1 + uploader.start_epoch(epoch1, HashSet::from_iter([table_id1])); + uploader.start_epoch(epoch1, HashSet::from_iter([table_id2])); + + uploader.init_instance(instance_id1_1, table_id1, epoch1); + uploader.init_instance(instance_id1_2, table_id1, epoch1); + uploader.init_instance(instance_id2, table_id2, epoch1); + + // naming: imm__ + let imm1_1_1 = gen_imm_inner(table_id1, epoch1, 0, memory_limiter).await; + uploader.add_imm(instance_id1_1, imm1_1_1.clone()); + let imm1_2_1 = gen_imm_inner(table_id1, epoch1, 0, memory_limiter).await; + uploader.add_imm(instance_id1_2, imm1_2_1.clone()); + let imm2_1 = gen_imm_inner(table_id2, epoch1, 0, memory_limiter).await; + uploader.add_imm(instance_id2, imm2_1.clone()); + + // epoch2 + uploader.start_epoch(epoch2, HashSet::from_iter([table_id1])); + uploader.start_epoch(epoch2, HashSet::from_iter([table_id2])); + + uploader.local_seal_epoch(instance_id1_1, epoch2, SealCurrentEpochOptions::for_test()); + uploader.local_seal_epoch(instance_id1_2, epoch2, SealCurrentEpochOptions::for_test()); + uploader.local_seal_epoch(instance_id2, epoch2, SealCurrentEpochOptions::for_test()); + + let imms1_1_2 = join_all( + [0, 1, 2].map(|offset| gen_imm_inner(table_id1, epoch2, offset, memory_limiter)), + ) + .await; + for imm in imms1_1_2.clone() { + uploader.add_imm(instance_id1_1, imm); + } + + // epoch3 + uploader.start_epoch(epoch3, HashSet::from_iter([table_id1])); + uploader.start_epoch(epoch3, HashSet::from_iter([table_id2])); + + uploader.local_seal_epoch(instance_id1_1, epoch3, SealCurrentEpochOptions::for_test()); + uploader.local_seal_epoch(instance_id1_2, epoch3, SealCurrentEpochOptions::for_test()); + uploader.local_seal_epoch(instance_id2, epoch3, SealCurrentEpochOptions::for_test()); + + let imms1_2_3 = join_all( + [0, 1, 2, 3].map(|offset| gen_imm_inner(table_id1, epoch3, offset, memory_limiter)), + ) + .await; + for imm in imms1_2_3.clone() { + uploader.add_imm(instance_id1_2, imm); + } + + // epoch4 + uploader.start_epoch(epoch4, HashSet::from_iter([table_id1, table_id2])); + + uploader.local_seal_epoch(instance_id1_1, epoch4, SealCurrentEpochOptions::for_test()); + uploader.local_seal_epoch(instance_id1_2, epoch4, SealCurrentEpochOptions::for_test()); + uploader.local_seal_epoch(instance_id2, epoch4, SealCurrentEpochOptions::for_test()); + + let imm1_1_4 = 
gen_imm_inner(table_id1, epoch4, 0, memory_limiter).await; + uploader.add_imm(instance_id1_1, imm1_1_4.clone()); + let imm1_2_4 = gen_imm_inner(table_id1, epoch4, 0, memory_limiter).await; + uploader.add_imm(instance_id1_2, imm1_2_4.clone()); + let imm2_4_1 = gen_imm_inner(table_id2, epoch4, 0, memory_limiter).await; + uploader.add_imm(instance_id2, imm2_4_1.clone()); + + // uploader state: + // table_id1: + // instance_id1_1: instance_id1_2: instance_id2 + // epoch1 imm1_1_1 imm1_2_1 | imm2_1 | + // epoch2 imms1_1_2(size 3) | | + // epoch3 imms_1_2_3(size 4) | | + // epoch4 imm1_1_4 imm1_2_4 imm2_4_1 | + + let (await_start1_1, finish_tx1_1) = new_task_notifier(HashMap::from_iter([ + (instance_id1_1, vec![imm1_1_1.batch_id()]), + (instance_id1_2, vec![imm1_2_1.batch_id()]), + ])); + let (await_start3, finish_tx3) = new_task_notifier(HashMap::from_iter([( + instance_id1_2, + imms1_2_3 + .iter() + .rev() + .map(|imm| imm.batch_id()) + .collect_vec(), + )])); + let (await_start2, finish_tx2) = new_task_notifier(HashMap::from_iter([( + instance_id1_1, + imms1_1_2 + .iter() + .rev() + .map(|imm| imm.batch_id()) + .collect_vec(), + )])); + let (await_start1_4, finish_tx1_4) = new_task_notifier(HashMap::from_iter([ + (instance_id1_1, vec![imm1_1_4.batch_id()]), + (instance_id1_2, vec![imm1_2_4.batch_id()]), + ])); + let (await_start2_1, finish_tx2_1) = new_task_notifier(HashMap::from_iter([( + instance_id2, + vec![imm2_1.batch_id()], + )])); + let (await_start2_4_1, finish_tx2_4_1) = new_task_notifier(HashMap::from_iter([( + instance_id2, + vec![imm2_4_1.batch_id()], + )])); + + uploader.may_flush(); + await_start1_1.await; + await_start3.await; + await_start2.await; + await_start1_4.await; + await_start2_1.await; + await_start2_4_1.await; + + assert_uploader_pending(&mut uploader).await; + + let imm2_4_2 = gen_imm_inner(table_id2, epoch4, 1, memory_limiter).await; + uploader.add_imm(instance_id2, imm2_4_2.clone()); + + uploader.local_seal_epoch( + instance_id1_1, + u64::MAX, + SealCurrentEpochOptions::for_test(), + ); + uploader.local_seal_epoch( + instance_id1_2, + u64::MAX, + SealCurrentEpochOptions::for_test(), + ); + uploader.local_seal_epoch(instance_id2, u64::MAX, SealCurrentEpochOptions::for_test()); + + // uploader state: + // table_id1: + // instance_id1_1: instance_id1_2: instance_id2 + // epoch1 spill(imm1_1_1, imm1_2_1, size 2) | spill(imm2_1, size 1) | + // epoch2 spill(imms1_1_2, size 3) | | + // epoch3 spill(imms_1_2_3, size 4) | | + // epoch4 spill(imm1_1_4, imm1_2_4, size 2) | spill(imm2_4_1, size 1), imm2_4_2 | + + let (sync_tx1_1, sync_rx1_1) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx1_1, HashSet::from_iter([table_id1])); + let (sync_tx2_1, sync_rx2_1) = oneshot::channel(); + uploader.start_sync_epoch(epoch2, sync_tx2_1, HashSet::from_iter([table_id1])); + let (sync_tx3_1, sync_rx3_1) = oneshot::channel(); + uploader.start_sync_epoch(epoch3, sync_tx3_1, HashSet::from_iter([table_id1])); + let (sync_tx1_2, sync_rx1_2) = oneshot::channel(); + uploader.start_sync_epoch(epoch1, sync_tx1_2, HashSet::from_iter([table_id2])); + let (sync_tx2_2, sync_rx2_2) = oneshot::channel(); + uploader.start_sync_epoch(epoch2, sync_tx2_2, HashSet::from_iter([table_id2])); + let (sync_tx3_2, sync_rx3_2) = oneshot::channel(); + uploader.start_sync_epoch(epoch3, sync_tx3_2, HashSet::from_iter([table_id2])); + + let (await_start2_4_2, finish_tx2_4_2) = new_task_notifier(HashMap::from_iter([( + instance_id2, + vec![imm2_4_2.batch_id()], + )])); + + let (sync_tx4, mut sync_rx4) = 
oneshot::channel(); + uploader.start_sync_epoch(epoch4, sync_tx4, HashSet::from_iter([table_id1, table_id2])); + await_start2_4_2.await; + + finish_tx2_4_1.send(()).unwrap(); + finish_tx3.send(()).unwrap(); + finish_tx2.send(()).unwrap(); + finish_tx1_4.send(()).unwrap(); + assert_uploader_pending(&mut uploader).await; + + finish_tx1_1.send(()).unwrap(); + { + let imm_ids = HashMap::from_iter([ + (instance_id1_1, vec![imm1_1_1.batch_id()]), + (instance_id1_2, vec![imm1_2_1.batch_id()]), + ]); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids, sst.imm_ids()); + let synced_data = sync_rx1_1.await.unwrap().unwrap(); + assert_eq!(synced_data.uploaded_ssts.len(), 1); + assert_eq!(&imm_ids, synced_data.uploaded_ssts[0].imm_ids()); + } + { + let imm_ids3 = HashMap::from_iter([( + instance_id1_2, + imms1_2_3 + .iter() + .rev() + .map(|imm| imm.batch_id()) + .collect_vec(), + )]); + let imm_ids2 = HashMap::from_iter([( + instance_id1_1, + imms1_1_2 + .iter() + .rev() + .map(|imm| imm.batch_id()) + .collect_vec(), + )]); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids3, sst.imm_ids()); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids2, sst.imm_ids()); + let synced_data = sync_rx2_1.await.unwrap().unwrap(); + assert_eq!(synced_data.uploaded_ssts.len(), 1); + assert_eq!(&imm_ids2, synced_data.uploaded_ssts[0].imm_ids()); + let synced_data = sync_rx3_1.await.unwrap().unwrap(); + assert_eq!(synced_data.uploaded_ssts.len(), 1); + assert_eq!(&imm_ids3, synced_data.uploaded_ssts[0].imm_ids()); + } + { + let imm_ids1_4 = HashMap::from_iter([ + (instance_id1_1, vec![imm1_1_4.batch_id()]), + (instance_id1_2, vec![imm1_2_4.batch_id()]), + ]); + let imm_ids2_1 = HashMap::from_iter([(instance_id2, vec![imm2_1.batch_id()])]); + let imm_ids2_4_1 = HashMap::from_iter([(instance_id2, vec![imm2_4_1.batch_id()])]); + finish_tx2_1.send(()).unwrap(); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids1_4, sst.imm_ids()); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids2_1, sst.imm_ids()); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids2_4_1, sst.imm_ids()); + let synced_data = sync_rx1_2.await.unwrap().unwrap(); + assert_eq!(synced_data.uploaded_ssts.len(), 1); + assert_eq!(&imm_ids2_1, synced_data.uploaded_ssts[0].imm_ids()); + let synced_data = sync_rx2_2.await.unwrap().unwrap(); + assert!(synced_data.uploaded_ssts.is_empty()); + let synced_data = sync_rx3_2.await.unwrap().unwrap(); + assert!(synced_data.uploaded_ssts.is_empty()); + + let imm_ids2_4_2 = HashMap::from_iter([(instance_id2, vec![imm2_4_2.batch_id()])]); + + assert!((&mut sync_rx4).now_or_never().is_none()); + finish_tx2_4_2.send(()).unwrap(); + let sst = uploader.next_uploaded_sst().await; + assert_eq!(&imm_ids2_4_2, sst.imm_ids()); + let synced_data = sync_rx4.await.unwrap().unwrap(); + assert_eq!(synced_data.uploaded_ssts.len(), 3); + assert_eq!(&imm_ids2_4_2, synced_data.uploaded_ssts[0].imm_ids()); + assert_eq!(&imm_ids2_4_1, synced_data.uploaded_ssts[1].imm_ids()); + assert_eq!(&imm_ids1_4, synced_data.uploaded_ssts[2].imm_ids()); + } + } +} diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs new file mode 100644 index 0000000000000..2fa574c72fc28 --- /dev/null +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -0,0 +1,346 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg(test)] + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::future::{poll_fn, Future}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{Arc, LazyLock}; +use std::task::Poll; + +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; +use itertools::Itertools; +use prometheus::core::GenericGauge; +use risingwave_common::catalog::TableId; +use risingwave_common::must_match; +use risingwave_common::util::epoch::{test_epoch, EpochExt}; +use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; +use risingwave_hummock_sdk::key::{FullKey, TableKey}; +use risingwave_hummock_sdk::version::HummockVersion; +use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; +use risingwave_pb::hummock::{KeyRange, SstableInfo, StateTableInfoDelta}; +use spin::Mutex; +use tokio::spawn; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::oneshot; +use tokio::task::yield_now; + +use crate::hummock::event_handler::hummock_event_handler::BufferTracker; +use crate::hummock::event_handler::uploader::uploader_imm::UploaderImm; +use crate::hummock::event_handler::uploader::{ + HummockUploader, TableUnsyncData, UploadTaskInfo, UploadTaskOutput, UploadTaskPayload, + UploaderContext, UploaderData, UploaderState, UploadingTask, UploadingTaskId, +}; +use crate::hummock::event_handler::{LocalInstanceId, TEST_LOCAL_INSTANCE_ID}; +use crate::hummock::local_version::pinned_version::PinnedVersion; +use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferBatchId, SharedBufferValue, +}; +use crate::hummock::{HummockError, HummockResult, MemoryLimiter}; +use crate::mem_table::{ImmId, ImmutableMemtable}; +use crate::monitor::HummockStateStoreMetrics; +use crate::opts::StorageOpts; +use crate::store::SealCurrentEpochOptions; + +pub(crate) const INITIAL_EPOCH: HummockEpoch = test_epoch(5); +pub(crate) const TEST_TABLE_ID: TableId = TableId { table_id: 233 }; + +pub trait UploadOutputFuture = Future> + Send + 'static; +pub trait UploadFn = + Fn(UploadTaskPayload, UploadTaskInfo) -> Fut + Send + Sync + 'static; + +impl HummockUploader { + pub(super) fn data(&self) -> &UploaderData { + must_match!(&self.state, UploaderState::Working(data) => data) + } + + pub(super) fn table_data(&self) -> &TableUnsyncData { + self.data() + .unsync_data + .table_data + .get(&TEST_TABLE_ID) + .expect("should exist") + } + + pub(super) fn test_max_syncing_epoch(&self) -> HummockEpoch { + self.table_data().max_sync_epoch().unwrap() + } + + pub(super) fn test_max_synced_epoch(&self) -> HummockEpoch { + self.table_data().max_synced_epoch.unwrap() + } +} + +pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { + let mut version = HummockVersion::default(); + version.id = epoch; + version.max_committed_epoch = epoch; + version.state_table_info.apply_delta( + &HashMap::from_iter([( + TEST_TABLE_ID, + StateTableInfoDelta { + committed_epoch: epoch, + safe_epoch: epoch, + compaction_group_id: 
StaticCompactionGroupId::StateDefault as _, + }, + )]), + &HashSet::new(), + ); + version +} + +pub(super) fn initial_pinned_version() -> PinnedVersion { + PinnedVersion::new(test_hummock_version(INITIAL_EPOCH), unbounded_channel().0) +} + +pub(super) fn dummy_table_key() -> Vec { + vec![b't', b'e', b's', b't'] +} + +pub(super) async fn gen_imm_with_limiter( + epoch: HummockEpoch, + limiter: Option<&MemoryLimiter>, +) -> ImmutableMemtable { + gen_imm_inner(TEST_TABLE_ID, epoch, 0, limiter).await +} + +pub(super) async fn gen_imm_inner( + table_id: TableId, + epoch: HummockEpoch, + spill_offset: u16, + limiter: Option<&MemoryLimiter>, +) -> ImmutableMemtable { + let sorted_items = vec![( + TableKey(Bytes::from(dummy_table_key())), + SharedBufferValue::Delete, + )]; + let size = SharedBufferBatch::measure_batch_size(&sorted_items, None).0; + let tracker = match limiter { + Some(limiter) => Some(limiter.require_memory(size as u64).await), + None => None, + }; + SharedBufferBatch::build_shared_buffer_batch( + epoch, + spill_offset, + sorted_items, + None, + size, + table_id, + tracker, + ) +} + +pub(crate) async fn gen_imm(epoch: HummockEpoch) -> ImmutableMemtable { + gen_imm_with_limiter(epoch, None).await +} + +pub(super) fn gen_sstable_info( + start_epoch: HummockEpoch, + end_epoch: HummockEpoch, +) -> Vec { + let start_full_key = FullKey::new(TEST_TABLE_ID, TableKey(dummy_table_key()), start_epoch); + let end_full_key = FullKey::new(TEST_TABLE_ID, TableKey(dummy_table_key()), end_epoch); + let gen_sst_object_id = (start_epoch << 8) + end_epoch; + vec![LocalSstableInfo::for_test(SstableInfo { + object_id: gen_sst_object_id, + sst_id: gen_sst_object_id, + key_range: Some(KeyRange { + left: start_full_key.encode(), + right: end_full_key.encode(), + right_exclusive: true, + }), + table_ids: vec![TEST_TABLE_ID.table_id], + ..Default::default() + })] +} + +pub(super) fn test_uploader_context(upload_fn: F) -> UploaderContext +where + Fut: UploadOutputFuture, + F: UploadFn, +{ + let config = StorageOpts::default(); + UploaderContext::new( + initial_pinned_version(), + Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), + BufferTracker::for_test(), + &config, + Arc::new(HummockStateStoreMetrics::unused()), + ) +} + +pub(super) fn test_uploader(upload_fn: F) -> HummockUploader +where + Fut: UploadOutputFuture, + F: UploadFn, +{ + let config = StorageOpts { + ..Default::default() + }; + HummockUploader::new( + Arc::new(HummockStateStoreMetrics::unused()), + initial_pinned_version(), + Arc::new(move |payload, task_info| spawn(upload_fn(payload, task_info))), + BufferTracker::for_test(), + &config, + ) +} + +pub(super) fn dummy_success_upload_output() -> UploadTaskOutput { + UploadTaskOutput { + new_value_ssts: gen_sstable_info(INITIAL_EPOCH, INITIAL_EPOCH), + old_value_ssts: vec![], + wait_poll_timer: None, + } +} + +#[allow(clippy::unused_async)] +pub(super) async fn dummy_success_upload_future( + _: UploadTaskPayload, + _: UploadTaskInfo, +) -> HummockResult { + Ok(dummy_success_upload_output()) +} + +#[allow(clippy::unused_async)] +pub(super) async fn dummy_fail_upload_future( + _: UploadTaskPayload, + _: UploadTaskInfo, +) -> HummockResult { + Err(HummockError::other("failed")) +} + +impl UploadingTask { + pub(super) fn from_vec(imms: Vec, context: &UploaderContext) -> Self { + let input = HashMap::from_iter([( + TEST_LOCAL_INSTANCE_ID, + imms.into_iter().map(UploaderImm::for_test).collect_vec(), + )]); + static NEXT_TASK_ID: LazyLock = LazyLock::new(|| AtomicUsize::new(0)); 
+ Self::new( + UploadingTaskId(NEXT_TASK_ID.fetch_add(1, Relaxed)), + input, + context, + ) + } +} + +pub(super) fn get_imm_ids<'a>( + imms: impl IntoIterator, +) -> HashMap> { + HashMap::from_iter([( + TEST_LOCAL_INSTANCE_ID, + imms.into_iter().map(|imm| imm.batch_id()).collect_vec(), + )]) +} + +impl HummockUploader { + pub(super) fn local_seal_epoch_for_test( + &mut self, + instance_id: LocalInstanceId, + epoch: HummockEpoch, + ) { + self.local_seal_epoch( + instance_id, + epoch.next_epoch(), + SealCurrentEpochOptions::for_test(), + ); + } + + pub(super) fn start_epochs_for_test(&mut self, epochs: impl IntoIterator) { + for epoch in epochs { + self.start_epoch(epoch, HashSet::from_iter([TEST_TABLE_ID])); + } + } +} + +pub(crate) fn prepare_uploader_order_test( + config: &StorageOpts, + skip_schedule: bool, +) -> ( + BufferTracker, + HummockUploader, + impl Fn(HashMap>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), +) { + let gauge = GenericGauge::new("test", "test").unwrap(); + let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); + // (the started task send the imm ids of payload, the started task wait for finish notify) + #[allow(clippy::type_complexity)] + let task_notifier_holder: Arc< + Mutex, oneshot::Receiver<()>)>>, + > = Arc::new(Mutex::new(VecDeque::new())); + + let new_task_notifier = { + let task_notifier_holder = task_notifier_holder.clone(); + move |imm_ids: HashMap>| { + let (start_tx, start_rx) = oneshot::channel(); + let (finish_tx, finish_rx) = oneshot::channel(); + task_notifier_holder + .lock() + .push_front((start_tx, finish_rx)); + let await_start_future = async move { + let task_info = start_rx.await.unwrap(); + assert_eq!(imm_ids, task_info.imm_ids); + } + .boxed(); + (await_start_future, finish_tx) + } + }; + + let config = StorageOpts::default(); + let uploader = HummockUploader::new( + Arc::new(HummockStateStoreMetrics::unused()), + initial_pinned_version(), + Arc::new({ + move |_, task_info: UploadTaskInfo| { + let task_notifier_holder = task_notifier_holder.clone(); + let task_item = task_notifier_holder.lock().pop_back(); + let start_epoch = *task_info.epochs.last().unwrap(); + let end_epoch = *task_info.epochs.first().unwrap(); + assert!(end_epoch >= start_epoch); + spawn(async move { + let ssts = gen_sstable_info(start_epoch, end_epoch); + if !skip_schedule { + let (start_tx, finish_rx) = task_item.unwrap(); + start_tx.send(task_info).unwrap(); + finish_rx.await.unwrap(); + } + Ok(UploadTaskOutput { + new_value_ssts: ssts, + old_value_ssts: vec![], + wait_poll_timer: None, + }) + }) + } + }), + buffer_tracker.clone(), + &config, + ); + (buffer_tracker, uploader, new_task_notifier) +} + +pub(crate) async fn assert_uploader_pending(uploader: &mut HummockUploader) { + for _ in 0..10 { + yield_now().await; + } + assert!( + poll_fn(|cx| Poll::Ready(uploader.next_uploaded_sst().poll_unpin(cx))) + .await + .is_pending() + ) +} diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index 72ffa72479cf2..d6e9c9bed5b94 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -1963,9 +1963,6 @@ async fn test_replicated_state_table_replication() { ])); epoch.inc_for_test(); - test_env - .storage - .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); test_env .storage .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID])); @@ -2029,9 +2026,6 @@ async fn test_replicated_state_table_replication() { 
     replicated_state_table.write_chunk(replicate_chunk);
 
     epoch.inc_for_test();
-    test_env
-        .storage
-        .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
diff --git a/src/stream/src/common/table/test_storage_table.rs b/src/stream/src/common/table/test_storage_table.rs
index 1f130330e3be4..1eb552271dced 100644
--- a/src/stream/src/common/table/test_storage_table.rs
+++ b/src/stream/src/common/table/test_storage_table.rs
@@ -583,9 +583,6 @@ async fn test_batch_scan_chunk_with_value_indices() {
         .collect_vec();
 
     epoch.inc_for_test();
-    test_env
-        .storage
-        .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
     test_env
         .storage
         .start_epoch(epoch.curr, HashSet::from_iter([TEST_TABLE_ID]));
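
A minimal, self-contained sketch of the selection policy the new `Spiller` implements, for readers of the diff above. The names here (`Candidate`, `next_spill`, plain `u64` ids) are illustrative stand-ins, not the crate's API; the real implementation keys candidates by `UnsyncEpochId` and tracks per-instance payloads.

use std::collections::HashMap;

// Hypothetical stand-ins for the uploader's types.
type Epoch = u64;
type InstanceId = u64;

#[derive(Default)]
struct Candidate {
    payload_size: usize,
    instances: Vec<InstanceId>,
}

struct Spiller {
    candidates: HashMap<Epoch, Candidate>,
}

impl Spiller {
    // Record `size` bytes of spillable data for `instance` at `epoch`.
    fn add(&mut self, epoch: Epoch, instance: InstanceId, size: usize) {
        let c = self.candidates.entry(epoch).or_default();
        c.instances.push(instance);
        c.payload_size += size;
    }

    // Drain the epoch holding the most data, regardless of epoch order.
    fn next_spill(&mut self) -> Option<(Epoch, Candidate)> {
        let epoch = *self
            .candidates
            .iter()
            .max_by_key(|(_, c)| c.payload_size)
            .map(|(e, _)| e)?;
        let c = self.candidates.remove(&epoch).expect("just selected");
        Some((epoch, c))
    }
}

fn main() {
    let mut spiller = Spiller {
        candidates: HashMap::new(),
    };
    spiller.add(5, 1, 100);
    spiller.add(6, 1, 300);
    spiller.add(5, 2, 50);
    // Epoch 6 spills first: it holds 300 bytes vs. 150 for the older epoch 5.
    assert_eq!(spiller.next_spill().map(|(e, _)| e), Some(6));
    assert_eq!(spiller.next_spill().map(|(e, _)| e), Some(5));
    assert!(spiller.next_spill().is_none());
}

In the patch itself, the uploader's flush path loops over `Spiller::next_spilled_payload` while the `BufferTracker` still asks for more flush, replacing the previous oldest-epoch-first scan.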