diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 5c04d75b2292..ef99a9a9a89d 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -50,6 +50,7 @@ mod set_readonly_test; mod truncate_test; use std::any::Any; +use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; @@ -58,22 +59,29 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::tracing; +use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; +use futures::future::try_join_all; use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; -use store_api::region_request::{AffectedRows, RegionRequest}; +use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, Semaphore}; use crate::config::MitoConfig; -use crate::error::{InvalidRequestSnafu, RecvSnafu, RegionNotFoundSnafu, Result}; +use crate::error::{ + InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu, +}; use crate::manifest::action::RegionEdit; use crate::metrics::HANDLE_REQUEST_ELAPSED; use crate::read::scan_region::{ScanParallism, ScanRegion, Scanner}; use crate::region::RegionUsage; use crate::request::WorkerRequest; +use crate::wal::entry_distributor::build_wal_entry_distributor_and_receivers; +use crate::wal::raw_entry_reader::{LogStoreRawEntryReader, RawEntryReader}; use crate::worker::WorkerGroup; pub const MITO_ENGINE_NAME: &str = "mito"; @@ -211,6 +219,45 @@ struct EngineInner { workers: WorkerGroup, /// Config of the engine. config: Arc, + /// The Wal raw entry reader. 
+ wal_raw_entry_reader: Arc, +} + +type TopicGroupedRegionOpenRequests = HashMap>; + +/// **For Kafka Remote Wal:** +/// Groups requests by topic. +/// +/// **For RaftEngine Wal:** +/// Returns the requests ungrouped, as the second element of the tuple. +fn prepare_batch_open_requests( + requests: Vec<(RegionId, RegionOpenRequest)>, +) -> Result<( + TopicGroupedRegionOpenRequests, + Vec<(RegionId, RegionOpenRequest)>, +)> { + let mut topic_to_regions: HashMap> = HashMap::new(); + let mut remaining_regions: Vec<(RegionId, RegionOpenRequest)> = Vec::new(); + for (region_id, request) in requests { + let options = if let Some(options) = request.options.get(WAL_OPTIONS_KEY) { + serde_json::from_str(options).context(SerdeJsonSnafu)? + } else { + WalOptions::RaftEngine + }; + match options { + WalOptions::Kafka(options) => { + topic_to_regions + .entry(options.topic) + .or_default() + .push((region_id, request)); + } + WalOptions::RaftEngine => { + remaining_regions.push((region_id, request)); + } + } + } + + Ok((topic_to_regions, remaining_regions)) } impl EngineInner { @@ -221,9 +268,11 @@ impl EngineInner { object_store_manager: ObjectStoreManagerRef, ) -> Result { let config = Arc::new(config); + let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); Ok(EngineInner { workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?, config, + wal_raw_entry_reader, }) } @@ -244,6 +293,96 @@ impl EngineInner { Ok(region.metadata()) } + async fn open_topic_regions( + &self, + topic: String, + region_requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result<()> { + let mut response_receivers = Vec::with_capacity(region_requests.len()); + let region_ids = region_requests + .iter() + .map(|(region_id, _)| *region_id) + .collect(); + let provider = Provider::kafka_provider(topic); + let (distributor, entry_receivers) = build_wal_entry_distributor_and_receivers( + provider, + self.wal_raw_entry_reader.clone(), + region_ids, + 2048, + ); + + for (region_id, request, receiver) in 
region_requests.into_iter().zip(entry_receivers).map( + |((region_id, request), entry_receiver)| { + let (request, receiver) = WorkerRequest::new_open_region_request( + region_id, + request, + Some(entry_receiver), + ); + + (region_id, request, receiver) + }, + ) { + self.workers.submit_to_worker(region_id, request).await?; + response_receivers.push((region_id, receiver)); + } + // Starts distributing entries to the receivers in a spawned task. + let distribution = + common_runtime::spawn_read(async move { distributor.distribute().await }); + + // Waits for all workers to respond. + try_join_all( + response_receivers + .into_iter() + .map(|(_, receiver)| async { receiver.await.context(RecvSnafu)? }), + ) + .await?; + + let _ = distribution.await.context(JoinSnafu)?; + Ok(()) + } + + async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result<()> { + let semaphore = Arc::new(Semaphore::new(parallelism)); + let (topic_to_region_requests, remaining_region_requests) = + prepare_batch_open_requests(requests)?; + + if !topic_to_region_requests.is_empty() { + let mut tasks = Vec::with_capacity(topic_to_region_requests.len()); + for (topic, region_requests) in topic_to_region_requests { + let self_ref = self; + let semaphore_moved = semaphore.clone(); + tasks.push(async move { + // Safety: the semaphore is never closed, so `acquire` cannot fail. + let _permit = semaphore_moved.acquire().await.unwrap(); + self_ref.open_topic_regions(topic, region_requests).await + }) + } + try_join_all(tasks).await?; + } + + if !remaining_region_requests.is_empty() { + let mut tasks = Vec::with_capacity(remaining_region_requests.len()); + for (region_id, request) in remaining_region_requests { + tasks.push(async move { + let (request, receiver) = + WorkerRequest::new_open_region_request(region_id, request, None); + + self.workers.submit_to_worker(region_id, request).await?; + + receiver.await.context(RecvSnafu)? 
+ }) + } + + try_join_all(tasks).await?; + } + + Ok(()) + } + /// Handles [RegionRequest] and return its executed result. async fn handle_request( &self, @@ -323,6 +462,19 @@ impl RegionEngine for MitoEngine { MITO_ENGINE_NAME } + #[tracing::instrument(skip_all)] + async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result<(), BoxedError> { + // TODO(weny): add metrics. + self.inner + .handle_batch_open_requests(parallelism, requests) + .await + .map_err(BoxedError::new) + } + #[tracing::instrument(skip_all)] async fn handle_request( &self, @@ -421,6 +573,7 @@ impl MitoEngine { config.sanitize(data_home)?; let config = Arc::new(config); + let wal_raw_entry_reader = Arc::new(LogStoreRawEntryReader::new(log_store.clone())); Ok(MitoEngine { inner: Arc::new(EngineInner { workers: WorkerGroup::start_for_test( @@ -433,6 +586,7 @@ impl MitoEngine { ) .await?, config, + wal_raw_entry_reader, }), }) } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ed9cbf037b30..5ed8e4f8e015 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use common_telemetry::{debug, error, info, warn}; use common_wal::options::WalOptions; +use futures::future::BoxFuture; use futures::StreamExt; use object_store::manager::ObjectStoreManagerRef; use object_store::util::{join_dir, normalize_dir}; @@ -48,6 +49,7 @@ use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::time_provider::{StdTimeProvider, TimeProviderRef}; +use crate::wal::entry_reader::WalEntryReader; use crate::wal::{EntryId, Wal}; /// Builder to create a new [MitoRegion] or open an existing one. 
@@ -64,6 +66,7 @@ pub(crate) struct RegionOpener { intermediate_manager: IntermediateManager, time_provider: Option, stats: ManifestStats, + wal_entry_reader: Option>, } impl RegionOpener { @@ -89,6 +92,7 @@ impl RegionOpener { intermediate_manager, time_provider: None, stats: Default::default(), + wal_entry_reader: None, } } @@ -104,6 +108,12 @@ impl RegionOpener { Ok(self) } + /// Sets the [WalEntryReader] used to replay the WAL when opening the region; if unset, the reader is obtained from the [Wal]. + pub(crate) fn wal_entry_reader(mut self, wal_entry_reader: Box) -> Self { + self.wal_entry_reader = Some(wal_entry_reader); + self + } + /// Sets options for the region. pub(crate) fn options(mut self, options: RegionOptions) -> Self { self.options = Some(options); @@ -165,8 +175,8 @@ impl RegionOpener { } } let options = self.options.take().unwrap(); - let provider = self.provider(&options.wal_options); let object_store = self.object_store(&options.storage)?.clone(); + let provider = self.provider(&options.wal_options); // Create a manifest manager for this region and writes regions to the manifest file. let region_manifest_options = self.manifest_options(config, &options)?; @@ -231,7 +241,7 @@ impl RegionOpener { /// /// Returns error if the region doesn't exist. pub(crate) async fn open( - self, + mut self, config: &MitoConfig, wal: &Wal, ) -> Result { @@ -267,7 +277,7 @@ impl RegionOpener { /// Tries to open the region and returns `None` if the region directory is empty. 
async fn maybe_open( - &self, + &mut self, config: &MitoConfig, wal: &Wal, ) -> Result> { @@ -288,6 +298,11 @@ impl RegionOpener { let region_id = self.region_id; let provider = self.provider(®ion_options.wal_options); + let wal_entry_reader = self + .wal_entry_reader + .take() + .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id)); + let on_region_opened = wal.on_region_opened(); let object_store = self.object_store(®ion_options.storage)?.clone(); debug!("Open region {} with options: {:?}", region_id, self.options); @@ -331,12 +346,13 @@ impl RegionOpener { region_id ); replay_memtable( - wal, &provider, + wal_entry_reader, region_id, flushed_entry_id, &version_control, config.allow_stale_entries, + on_region_opened, ) .await?; } else { @@ -357,7 +373,7 @@ impl RegionOpener { RegionState::ReadOnly, )), file_purger, - provider, + provider: provider.clone(), last_flush_millis: AtomicI64::new(time_provider.current_time_millis()), time_provider, memtable_builder, @@ -448,21 +464,25 @@ pub(crate) fn check_recovered_region( } /// Replays the mutations from WAL and inserts mutations to memtable of given region. -pub(crate) async fn replay_memtable( - wal: &Wal, +pub(crate) async fn replay_memtable( provider: &Provider, + mut wal_entry_reader: Box, region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, allow_stale_entries: bool, -) -> Result { + on_region_opened: F, +) -> Result +where + F: FnOnce(RegionId, EntryId, &Provider) -> BoxFuture> + Send, +{ let mut rows_replayed = 0; // Last entry id should start from flushed entry id since there might be no // data in the WAL. 
let mut last_entry_id = flushed_entry_id; let replay_from_entry_id = flushed_entry_id + 1; - let mut wal_stream = wal.scan(region_id, replay_from_entry_id, provider)?; + let mut wal_stream = wal_entry_reader.read(provider, replay_from_entry_id)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; if entry_id <= flushed_entry_id { @@ -496,7 +516,7 @@ pub(crate) async fn replay_memtable( // TODO(weny): We need to update `flushed_entry_id` in the region manifest // to avoid reading potentially incomplete entries in the future. - wal.obsolete(region_id, flushed_entry_id, provider).await?; + (on_region_opened)(region_id, flushed_entry_id, provider).await?; info!( "Replay WAL for region: {}, rows recovered: {}, last entry id: {}", diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index b7eed9327561..efd796b44db9 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -44,6 +44,7 @@ use crate::error::{ }; use crate::manifest::action::RegionEdit; use crate::metrics::COMPACTION_ELAPSED_TOTAL; +use crate::wal::entry_distributor::WalEntryReceiver; use crate::wal::EntryId; /// Request to write a region. @@ -497,6 +498,22 @@ pub(crate) enum WorkerRequest { } impl WorkerRequest { + pub(crate) fn new_open_region_request( + region_id: RegionId, + request: RegionOpenRequest, + entry_receiver: Option, + ) -> (WorkerRequest, Receiver>) { + let (sender, receiver) = oneshot::channel(); + + let worker_request = WorkerRequest::Ddl(SenderDdlRequest { + region_id, + sender: sender.into(), + request: DdlRequest::Open((request, entry_receiver)), + }); + + (worker_request, receiver) + } + /// Converts request from a [RegionRequest]. 
pub(crate) fn try_from_region_request( region_id: RegionId, @@ -531,7 +548,7 @@ impl WorkerRequest { RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, sender: sender.into(), - request: DdlRequest::Open(v), + request: DdlRequest::Open((v, None)), }), RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest { region_id, @@ -585,7 +602,7 @@ impl WorkerRequest { pub(crate) enum DdlRequest { Create(RegionCreateRequest), Drop(RegionDropRequest), - Open(RegionOpenRequest), + Open((RegionOpenRequest, Option)), Close(RegionCloseRequest), Alter(RegionAlterRequest), Flush(RegionFlushRequest), diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index c0cca52dcbbd..042fdf926478 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -30,6 +30,7 @@ use std::sync::Arc; use api::v1::WalEntry; use common_error::ext::BoxedError; +use futures::future::BoxFuture; use futures::stream::BoxStream; use prost::Message; use snafu::ResultExt; @@ -86,6 +87,39 @@ impl Wal { } } + /// Returns a one-shot callback that calls `obsolete` on the log store for the given provider and entry id. + pub(crate) fn on_region_opened( + &self, + ) -> impl FnOnce(RegionId, EntryId, &Provider) -> BoxFuture> { + let store = self.store.clone(); + move |region_id, last_entry_id, provider| -> BoxFuture<'_, Result<()>> { + Box::pin(async move { + store + .obsolete(provider, last_entry_id) + .await + .map_err(BoxedError::new) + .context(DeleteWalSnafu { region_id }) + }) + } + } + + /// Returns a [WalEntryReader] for the `provider`; for Kafka, the raw reader is wrapped in a [RegionRawEntryReader] bound to `region_id`. + pub(crate) fn wal_entry_reader( + &self, + provider: &Provider, + region_id: RegionId, + ) -> Box { + match provider { + Provider::RaftEngine(_) => Box::new(LogStoreEntryReader::new( + LogStoreRawEntryReader::new(self.store.clone()), + )), + Provider::Kafka(_) => Box::new(LogStoreEntryReader::new(RegionRawEntryReader::new( + LogStoreRawEntryReader::new(self.store.clone()), + region_id, + ))), + } + } + /// Scan entries of specific region starting from `start_id` (inclusive). 
pub fn scan<'a>( &'a self, diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index 7e45f9123633..658e0f1ad0d6 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -20,7 +20,7 @@ use api::v1::WalEntry; use async_stream::stream; use common_telemetry::{debug, error}; use futures::future::join_all; -use snafu::ensure; +use snafu::{ensure, OptionExt}; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; use store_api::storage::RegionId; @@ -109,9 +109,9 @@ impl WalEntryDistributor { pub(crate) struct WalEntryReceiver { region_id: RegionId, /// Receives the [Entry] from the [WalEntryDistributor]. - entry_receiver: Receiver, + entry_receiver: Option>, /// Sends the `start_id` to the [WalEntryDistributor]. - arg_sender: oneshot::Sender, + arg_sender: Option>, } impl WalEntryReceiver { @@ -122,19 +122,22 @@ impl WalEntryReceiver { ) -> Self { Self { region_id, - entry_receiver, - arg_sender, + entry_receiver: Some(entry_receiver), + arg_sender: Some(arg_sender), } } } impl WalEntryReader for WalEntryReceiver { - fn read(self, provider: &Provider, start_id: EntryId) -> Result> { - let WalEntryReceiver { - region_id: expected_region_id, - mut entry_receiver, - arg_sender, - } = self; + fn read(&mut self, provider: &Provider, start_id: EntryId) -> Result> { + let mut arg_sender = self + .arg_sender + .take() + .context(error::InvalidWalReadRequestSnafu { + reason: format!("Call WalEntryReceiver multiple time, start_id: {start_id}"), + })?; + // Safety: check via arg_sender + let mut entry_receiver = self.entry_receiver.take().unwrap(); if arg_sender.send(start_id).is_err() { return error::InvalidWalReadRequestSnafu { @@ -340,7 +343,7 @@ mod tests { drop(last); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 0).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); @@ -441,7 +444,7 @@ mod tests { ); 
assert_eq!(receivers.len(), 3); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 0).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); @@ -524,7 +527,7 @@ mod tests { ); assert_eq!(receivers.len(), 2); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 0).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); @@ -616,7 +619,7 @@ mod tests { ); assert_eq!(receivers.len(), 2); let mut streams = receivers - .into_iter() + .iter_mut() .map(|receiver| receiver.read(&provider, 4).unwrap()) .collect::>(); distributor.distribute().await.unwrap(); diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs index c88a7b76a02f..4b50e8fd6940 100644 --- a/src/mito2/src/wal/entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -38,7 +38,7 @@ pub(crate) fn decode_raw_entry(raw_entry: Entry) -> Result<(EntryId, WalEntry)> /// [WalEntryReader] provides the ability to read and decode entries from the underlying store. pub(crate) trait WalEntryReader: Send + Sync { - fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result>; + fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result>; } /// A Reader reads the [RawEntry] from [RawEntryReader] and decodes [RawEntry] into [WalEntry]. 
@@ -53,7 +53,7 @@ impl LogStoreEntryReader { } impl WalEntryReader for LogStoreEntryReader { - fn read(self, ns: &'_ Provider, start_id: EntryId) -> Result> { + fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result> { let LogStoreEntryReader { reader } = self; let mut stream = reader.read(ns, start_id)?; @@ -135,7 +135,7 @@ mod tests { ], }; - let reader = LogStoreEntryReader::new(raw_entry_stream); + let mut reader = LogStoreEntryReader::new(raw_entry_stream); let entries = reader .read(&provider, 0) .unwrap() @@ -171,7 +171,7 @@ mod tests { ], }; - let reader = LogStoreEntryReader::new(raw_entry_stream); + let mut reader = LogStoreEntryReader::new(raw_entry_stream); let err = reader .read(&provider, 0) .unwrap() diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 2122a052c839..b34d5b8566ec 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -721,8 +721,8 @@ impl RegionWorkerLoop { let res = match ddl.request { DdlRequest::Create(req) => self.handle_create_request(ddl.region_id, req).await, DdlRequest::Drop(_) => self.handle_drop_request(ddl.region_id).await, - DdlRequest::Open(req) => { - self.handle_open_request(ddl.region_id, req, ddl.sender) + DdlRequest::Open((req, wal_entry_receiver)) => { + self.handle_open_request(ddl.region_id, req, wal_entry_receiver, ddl.sender) .await; continue; } diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 595b6ee56635..93a84b92e272 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -73,13 +73,16 @@ impl RegionWorkerLoop { let flushed_entry_id = region.version_control.current().last_entry_id; info!("Trying to replay memtable for region: {region_id}, flushed entry id: {flushed_entry_id}"); let timer = Instant::now(); + let wal_entry_reader = self.wal.wal_entry_reader(®ion.provider, region_id); + let on_region_opened = self.wal.on_region_opened(); let last_entry_id = replay_memtable( - 
&self.wal, ®ion.provider, + wal_entry_reader, region_id, flushed_entry_id, ®ion.version_control, self.config.allow_stale_entries, + on_region_opened, ) .await?; info!( diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index 6c63abf1535c..ac804e394d95 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -29,6 +29,7 @@ use crate::error::{ use crate::metrics::REGION_COUNT; use crate::region::opener::RegionOpener; use crate::request::OptionOutputTx; +use crate::wal::entry_distributor::WalEntryReceiver; use crate::worker::handle_drop::remove_region_dir_once; use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE}; @@ -66,6 +67,7 @@ impl RegionWorkerLoop { &mut self, region_id: RegionId, request: RegionOpenRequest, + wal_entry_receiver: Option, sender: OptionOutputTx, ) { if self.regions.is_region_exists(region_id) { @@ -85,7 +87,7 @@ impl RegionWorkerLoop { info!("Try to open region {}", region_id); // Open region from specific region dir. 
- let opener = match RegionOpener::new( + let mut opener = match RegionOpener::new( region_id, &request.region_dir, self.memtable_builder_provider.clone(), @@ -103,6 +105,9 @@ impl RegionWorkerLoop { return; } }; + if let Some(wal_entry_receiver) = wal_entry_receiver { + opener = opener.wal_entry_reader(Box::new(wal_entry_receiver)); + }; let regions = self.regions.clone(); let wal = self.wal.clone(); diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 821f98c6fbf7..5b6f5ddc1603 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -26,8 +26,8 @@ serde.workspace = true serde_json.workspace = true snafu.workspace = true strum.workspace = true +tokio.workspace = true [dev-dependencies] async-stream.workspace = true serde_json.workspace = true -tokio.workspace = true diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 3f9d58a95588..0a3de1e389d8 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -26,12 +26,14 @@ use common_query::error::ExecuteRepeatedlySnafu; use common_recordbatch::SendableRecordBatchStream; use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; +use futures::future::try_join_all; use serde::{Deserialize, Serialize}; use snafu::OptionExt; +use tokio::sync::Semaphore; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::RegionRequest; +use crate::region_request::{RegionOpenRequest, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The result of setting readonly for the region. @@ -182,6 +184,32 @@ pub trait RegionEngine: Send + Sync { /// Name of this engine fn name(&self) -> &str; + /// Handles batch open region requests. 
+ async fn handle_batch_open_requests( + &self, + parallelism: usize, + requests: Vec<(RegionId, RegionOpenRequest)>, + ) -> Result<(), BoxedError> { + let semaphore = Arc::new(Semaphore::new(parallelism)); + let mut tasks = Vec::with_capacity(parallelism); + + for (region_id, request) in requests { + let semaphore_moved = semaphore.clone(); + + tasks.push(async move { + // Safety: semaphore must exist + let _permit = semaphore_moved.acquire().await.unwrap(); + self.handle_request(region_id, RegionRequest::Open(request)) + .await + .map(|response| (region_id, response)) + }); + } + + let _ = try_join_all(tasks).await?; + + Ok(()) + } + /// Handles non-query request to the region. Returns the count of affected rows. async fn handle_request( &self,