diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 772777d3ad53..af6001e4308e 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -21,6 +21,8 @@ mod append_mode_test; #[cfg(test)] mod basic_test; #[cfg(test)] +mod batch_open_test; +#[cfg(test)] mod catchup_test; #[cfg(test)] mod close_test; @@ -299,8 +301,8 @@ impl EngineInner { &self, topic: String, region_requests: Vec<(RegionId, RegionOpenRequest)>, - ) -> Result>> { - let mut response_receivers = Vec::with_capacity(region_requests.len()); + ) -> Result)>> { + let mut responses = Vec::with_capacity(region_requests.len()); let region_ids = region_requests .iter() .map(|(region_id, _)| *region_id) @@ -325,21 +327,18 @@ impl EngineInner { }, ) { self.workers.submit_to_worker(region_id, request).await?; - response_receivers.push((region_id, receiver)); + responses.push((region_id, async move { receiver.await.context(RecvSnafu)? })); } // Waits for entries distribution. let distribution = common_runtime::spawn_read(async move { distributor.distribute().await }); // Waits for worker returns. - let responses = join_all(response_receivers.into_iter().map( - |(region_id, receiver)| async move { - receiver - .await - .context(RecvSnafu)? - .map(|response| (region_id, response)) - }, - )) + let responses = join_all( + responses + .into_iter() + .map(|(region_id, rows)| async move { (region_id, rows.await) }), + ) .await; let _ = distribution.await.context(JoinSnafu)?; @@ -350,7 +349,7 @@ impl EngineInner { &self, parallelism: usize, requests: Vec<(RegionId, RegionOpenRequest)>, - ) -> Result>> { + ) -> Result)>> { let semaphore = Arc::new(Semaphore::new(parallelism)); let (topic_to_region_requests, remaining_region_requests) = prepare_batch_open_requests(requests)?; @@ -375,20 +374,23 @@ impl EngineInner { if !remaining_region_requests.is_empty() { let mut tasks = Vec::with_capacity(remaining_region_requests.len()); for (region_id, request) in remaining_region_requests { - tasks.push(async move { + tasks.push((region_id, async move { let (request, receiver) = WorkerRequest::new_open_region_request(region_id, request, None); self.workers.submit_to_worker(region_id, request).await?; - receiver - .await - .context(RecvSnafu)? - .map(|response| (region_id, response)) - }) + receiver.await.context(RecvSnafu)? + })) } - let r = join_all(tasks).await; + let r = join_all( + tasks + .into_iter() + .map(|(region_id, rows)| async move { (region_id, rows.await) }), + ) + .await; + responses.extend(r); } @@ -487,10 +489,11 @@ impl RegionEngine for MitoEngine { .map(|responses| { responses .into_iter() - .map(|response| { - response - .map(|(region_id, row)| (region_id, RegionResponse::new(row))) - .map_err(BoxedError::new) + .map(|(region_id, response)| { + ( + region_id, + response.map(RegionResponse::new).map_err(BoxedError::new), + ) }) .collect::>() }) diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index ba6c928d097e..3b3ed3b696e8 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -179,7 +179,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send + Sync { pub type RegionScannerRef = Arc; -pub type BatchResponses = Vec>; +pub type BatchResponses = Vec<(RegionId, Result)>; #[async_trait] pub trait RegionEngine: Send + Sync { @@ -201,9 +201,10 @@ pub trait RegionEngine: Send + Sync { tasks.push(async move { // Safety: semaphore must exist let _permit = semaphore_moved.acquire().await.unwrap(); - self.handle_request(region_id, RegionRequest::Open(request)) - .await - .map(|response| (region_id, response)) + let result = self + .handle_request(region_id, RegionRequest::Open(request)) + .await; + (region_id, result) }); }