Skip to content

Commit

Permalink
propagate refactor
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed Dec 5, 2023
1 parent 541a11c commit 79d7176
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ impl CountdownTask {
let request = RegionRequest::Close(RegionCloseRequest {});
match self
.region_server
.handle_request(self.region_id, request)
.handle_execution(self.region_id, request)
.await
{
Ok(_) => return true,
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl DatanodeBuilder {
tasks.push(async move {
let _permit = semaphore_moved.acquire().await;
region_server
.handle_request(
.handle_execution(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: engine.clone(),
Expand Down
12 changes: 6 additions & 6 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl RegionHeartbeatResponseHandler {
region_dir: region_dir(&region_storage_path, region_id),
options,
});
let result = region_server.handle_request(region_id, request).await;
let result = region_server.handle_execution(region_id, request).await;

let success = result.is_ok();
let error = result.as_ref().map_err(|e| e.to_string()).err();
Expand All @@ -77,7 +77,7 @@ impl RegionHeartbeatResponseHandler {
Box::pin(async move {
let region_id = Self::region_ident_to_region_id(&region_ident);
let request = RegionRequest::Close(RegionCloseRequest {});
let result = region_server.handle_request(region_id, request).await;
let result = region_server.handle_execution(region_id, request).await;

match result {
Ok(_) => InstructionReply::CloseRegion(SimpleReply {
Expand Down Expand Up @@ -257,7 +257,7 @@ mod tests {
let builder = CreateRequestBuilder::new();
let create_req = builder.build();
region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.handle_execution(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

Expand Down Expand Up @@ -306,12 +306,12 @@ mod tests {
create_req.region_dir = region_dir(storage_path, region_id);

region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.handle_execution(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

region_server
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.handle_execution(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();
let mut heartbeat_env = HeartbeatResponseTestEnv::new();
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
create_req.region_dir = region_dir(storage_path, region_id);

region_server
.handle_request(region_id, RegionRequest::Create(create_req))
.handle_execution(region_id, RegionRequest::Create(create_req))
.await
.unwrap();

Expand Down
20 changes: 7 additions & 13 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ impl RegionServer {
self.inner.register_engine(engine);
}

pub async fn handle_request(
pub async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
self.inner.handle_request(region_id, request).await
) -> Result<usize> {
self.inner.handle_execution(region_id, request).await
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -194,7 +194,7 @@ impl RegionServerHandler for RegionServer {
));
async move {
self_to_move
.handle_request(region_id, req)
.handle_execution(region_id, req)
.trace(span)
.await
}
Expand All @@ -209,13 +209,7 @@ impl RegionServerHandler for RegionServer {
// only insert/delete will have multiple results.
let mut affected_rows = 0;
for result in results {
match result {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
// TODO: change the output type to only contains `affected_rows`
unreachable!()
}
}
affected_rows += result;
}

Ok(RegionResponse {
Expand Down Expand Up @@ -290,11 +284,11 @@ impl RegionServerInner {
.insert(engine_name.to_string(), engine);
}

pub async fn handle_request(
pub async fn handle_execution(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
) -> Result<usize> {
let request_type = request.request_type();
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&[request_type])
Expand Down

0 comments on commit 79d7176

Please sign in to comment.