Skip to content

Commit

Permalink
refactor: remove send_result
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Sep 12, 2023
1 parent a7978e9 commit 54f1871
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 28 deletions.
3 changes: 1 addition & 2 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::request::{
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
use crate::worker::send_result;

const MAX_PARALLEL_COMPACTION: usize = 8;

Expand Down Expand Up @@ -158,7 +157,7 @@ impl Picker for TwcsPicker {

if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact.
send_result(waiter, Ok(Output::AffectedRows(0)));
waiter.send(Ok(Output::AffectedRows(0)));
return None;
}
let task = TwcsCompactionTask {
Expand Down
19 changes: 7 additions & 12 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::send_result;

/// Global write buffer (memtable) manager.
///
Expand Down Expand Up @@ -560,20 +559,16 @@ impl FlushStatus {
task.on_failure(err.clone());
}
for ddl in self.pending_ddls {
send_result(
ddl.sender,
Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}),
);
ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}));
}
for write_req in self.pending_writes {
send_result(
write_req.sender,
Err(err.clone()).context(FlushRegionSnafu {
write_req
.sender
.send(Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}),
);
}));
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use common_query::Output;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
Expand All @@ -47,8 +46,7 @@ use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, OptionOutputTx, SenderDdlRequest, SenderWriteRequest,
WorkerRequest,
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::wal::Wal;
Expand Down Expand Up @@ -178,11 +176,6 @@ impl WorkerGroup {
}
}

/// Send result to the sender.
pub(crate) fn send_result(sender: OptionOutputTx, res: Result<Output>) {
sender.send(res);
}

// Tests methods.
#[cfg(test)]
impl WorkerGroup {
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/worker/handle_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::memtable::MemtableBuilderRef;
use crate::region::version::Version;
use crate::region::MitoRegionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::worker::RegionWorkerLoop;

impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_alter_request(
Expand All @@ -57,7 +57,7 @@ impl<S> RegionWorkerLoop<S> {
let task = self.new_flush_task(&region, FlushReason::Alter);
if let Err(e) = self.flush_scheduler.schedule_flush(&region, task) {
// Unable to flush the region, send error to waiter.
send_result(sender, Err(e));
sender.send(Err(e));
return;
}

Expand All @@ -77,7 +77,7 @@ impl<S> RegionWorkerLoop<S> {
alter_region_schema(&region, &version, request, &self.memtable_builder).await
{
error!(e; "Failed to alter region schema, region_id: {}", region_id);
send_result(sender, Err(e));
sender.send(Err(e));
return;
}

Expand All @@ -89,7 +89,7 @@ impl<S> RegionWorkerLoop<S> {
);

// Notifies waiters.
send_result(sender, Ok(Output::AffectedRows(0)));
sender.send(Ok(Output::AffectedRows(0)));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/worker/handle_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use store_api::storage::RegionId;
use crate::error::{RejectWriteSnafu, Result};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderWriteRequest, WriteRequest};
use crate::worker::{send_result, RegionWorkerLoop};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
/// Takes and handles all write requests.
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<S> RegionWorkerLoop<S> {
if let Err(e) =
maybe_fill_missing_columns(&mut sender_req.request, &region_ctx.version().metadata)
{
send_result(sender_req.sender, Err(e));
sender_req.sender.send(Err(e));

continue;
}
Expand Down

0 comments on commit 54f1871

Please sign in to comment.