Skip to content

Commit

Permalink
refactor: refactor handle_batch_open_requests method signature
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 4, 2024
1 parent 4d71a52 commit feec1fd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
49 changes: 26 additions & 23 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ mod append_mode_test;
#[cfg(test)]
mod basic_test;
#[cfg(test)]
mod batch_open_test;
#[cfg(test)]
mod catchup_test;
#[cfg(test)]
mod close_test;
Expand Down Expand Up @@ -299,8 +301,8 @@ impl EngineInner {
&self,
topic: String,
region_requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<Result<(RegionId, AffectedRows)>>> {
let mut response_receivers = Vec::with_capacity(region_requests.len());
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let mut responses = Vec::with_capacity(region_requests.len());
let region_ids = region_requests
.iter()
.map(|(region_id, _)| *region_id)
Expand All @@ -325,21 +327,18 @@ impl EngineInner {
},
) {
self.workers.submit_to_worker(region_id, request).await?;
response_receivers.push((region_id, receiver));
responses.push((region_id, async move { receiver.await.context(RecvSnafu)? }));
}
// Waits for entries distribution.
let distribution =
common_runtime::spawn_read(async move { distributor.distribute().await });

// Waits for worker returns.
let responses = join_all(response_receivers.into_iter().map(
|(region_id, receiver)| async move {
receiver
.await
.context(RecvSnafu)?
.map(|response| (region_id, response))
},
))
let responses = join_all(
responses
.into_iter()
.map(|(region_id, rows)| async move { (region_id, rows.await) }),
)
.await;

let _ = distribution.await.context(JoinSnafu)?;
Expand All @@ -350,7 +349,7 @@ impl EngineInner {
&self,
parallelism: usize,
requests: Vec<(RegionId, RegionOpenRequest)>,
) -> Result<Vec<Result<(RegionId, AffectedRows)>>> {
) -> Result<Vec<(RegionId, Result<AffectedRows>)>> {
let semaphore = Arc::new(Semaphore::new(parallelism));
let (topic_to_region_requests, remaining_region_requests) =
prepare_batch_open_requests(requests)?;
Expand All @@ -375,20 +374,23 @@ impl EngineInner {
if !remaining_region_requests.is_empty() {
let mut tasks = Vec::with_capacity(remaining_region_requests.len());
for (region_id, request) in remaining_region_requests {
tasks.push(async move {
tasks.push((region_id, async move {
let (request, receiver) =
WorkerRequest::new_open_region_request(region_id, request, None);

self.workers.submit_to_worker(region_id, request).await?;

receiver
.await
.context(RecvSnafu)?
.map(|response| (region_id, response))
})
receiver.await.context(RecvSnafu)?
}))
}

let r = join_all(tasks).await;
let r = join_all(
tasks
.into_iter()
.map(|(region_id, rows)| async move { (region_id, rows.await) }),
)
.await;

responses.extend(r);
}

Expand Down Expand Up @@ -487,10 +489,11 @@ impl RegionEngine for MitoEngine {
.map(|responses| {
responses
.into_iter()
.map(|response| {
response
.map(|(region_id, row)| (region_id, RegionResponse::new(row)))
.map_err(BoxedError::new)
.map(|(region_id, response)| {
(
region_id,
response.map(RegionResponse::new).map_err(BoxedError::new),
)
})
.collect::<Vec<_>>()
})
Expand Down
9 changes: 5 additions & 4 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send + Sync {

pub type RegionScannerRef = Arc<dyn RegionScanner>;

pub type BatchResponses = Vec<Result<(RegionId, RegionResponse), BoxedError>>;
pub type BatchResponses = Vec<(RegionId, Result<RegionResponse, BoxedError>)>;

#[async_trait]
pub trait RegionEngine: Send + Sync {
Expand All @@ -201,9 +201,10 @@ pub trait RegionEngine: Send + Sync {
tasks.push(async move {
// Safety: semaphore must exist
let _permit = semaphore_moved.acquire().await.unwrap();
self.handle_request(region_id, RegionRequest::Open(request))
.await
.map(|response| (region_id, response))
let result = self
.handle_request(region_id, RegionRequest::Open(request))
.await;
(region_id, result)
});
}

Expand Down

0 comments on commit feec1fd

Please sign in to comment.