Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Jun 21, 2024
1 parent 26bd7e1 commit 7a02f4a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::executor::{
};
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, SpillType, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

Expand Down Expand Up @@ -325,7 +325,7 @@ impl AggSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir, SpillType::Disk)?;
let op = SpillOp::create(dir, SpillBackend::Disk)?;
let agg_state_writers = Vec::with_capacity(partition_num);
let agg_state_chunk_builder = Vec::with_capacity(partition_num);
let input_writers = Vec::with_capacity(partition_num);
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::executor::{
use crate::monitor::BatchSpillMetrics;
use crate::risingwave_common::hash::NullBitmap;
use crate::spill::spill_op::{
SpillBuildHasher, SpillOp, SpillType, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillBuildHasher, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::{BatchTaskContext, ShutdownToken};

Expand Down Expand Up @@ -273,7 +273,7 @@ impl JoinSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", join_identity, suffix_uuid);
let op = SpillOp::create(dir, SpillType::Disk)?;
let op = SpillOp::create(dir, SpillBackend::Disk)?;
let probe_side_writers = Vec::with_capacity(partition_num);
let build_side_writers = Vec::with_capacity(partition_num);
let probe_side_chunk_builders = Vec::with_capacity(partition_num);
Expand Down
28 changes: 14 additions & 14 deletions src/batch/src/executor/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use super::{
use crate::error::{BatchError, Result};
use crate::executor::merge_sort::MergeSortExecutor;
use crate::monitor::BatchSpillMetrics;
use crate::spill::spill_op::SpillType::Disk;
use crate::spill::spill_op::SpillBackend::Disk;
use crate::spill::spill_op::{
SpillOp, SpillType, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
SpillBackend, SpillOp, DEFAULT_SPILL_PARTITION_NUM, SPILL_AT_LEAST_MEMORY,
};
use crate::task::BatchTaskContext;

Expand All @@ -56,7 +56,7 @@ pub struct SortExecutor {
schema: Schema,
chunk_size: usize,
mem_context: MemoryContext,
spill_type: Option<SpillType>,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
/// The upper bound of memory usage for this executor.
memory_upper_bound: Option<u64>,
Expand Down Expand Up @@ -131,7 +131,7 @@ impl SortExecutor {
let chunk_estimated_heap_size = chunk.estimated_heap_size();
chunks.push(chunk);
if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
if self.spill_type.is_some() {
if self.spill_backend.is_some() {
need_to_spill = true;
break;
} else {
Expand All @@ -156,7 +156,7 @@ impl SortExecutor {
.map(|(row_id, row)| (chunk.row_at_unchecked_vis(row_id), row)),
);
if !self.mem_context.add(chunk_estimated_heap_size as i64) && check_memory {
if self.spill_type.is_some() {
if self.spill_backend.is_some() {
need_to_spill = true;
break;
} else {
Expand All @@ -174,7 +174,7 @@ impl SortExecutor {
// If memory is still not enough in the sub SortExecutor, it will spill its inputs recursively.
info!("batch sort executor {} starts to spill out", &self.identity);
let mut sort_spill_manager = SortSpillManager::new(
self.spill_type.clone().unwrap(),
self.spill_backend.clone().unwrap(),
&self.identity,
DEFAULT_SPILL_PARTITION_NUM,
child_schema.data_types(),
Expand Down Expand Up @@ -214,7 +214,7 @@ impl SortExecutor {
format!("{}-sub{}", self.identity.clone(), i),
self.chunk_size,
self.mem_context.clone(),
self.spill_type.clone(),
self.spill_backend.clone(),
self.spill_metrics.clone(),
Some(partition_size),
);
Expand Down Expand Up @@ -263,7 +263,7 @@ impl SortExecutor {
identity: String,
chunk_size: usize,
mem_context: MemoryContext,
spill_type: Option<SpillType>,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
) -> Self {
Self::new_inner(
Expand All @@ -272,7 +272,7 @@ impl SortExecutor {
identity,
chunk_size,
mem_context,
spill_type,
spill_backend,
spill_metrics,
None,
)
Expand All @@ -284,7 +284,7 @@ impl SortExecutor {
identity: String,
chunk_size: usize,
mem_context: MemoryContext,
spill_type: Option<SpillType>,
spill_backend: Option<SpillBackend>,
spill_metrics: Arc<BatchSpillMetrics>,
memory_upper_bound: Option<u64>,
) -> Self {
Expand All @@ -296,7 +296,7 @@ impl SortExecutor {
schema,
chunk_size,
mem_context,
spill_type,
spill_backend,
spill_metrics,
memory_upper_bound,
}
Expand Down Expand Up @@ -330,7 +330,7 @@ struct SortSpillManager {

impl SortSpillManager {
fn new(
spill_type: SpillType,
spill_backend: SpillBackend,
agg_identity: &String,
partition_num: usize,
child_data_types: Vec<DataType>,
Expand All @@ -339,7 +339,7 @@ impl SortSpillManager {
) -> Result<Self> {
let suffix_uuid = uuid::Uuid::new_v4();
let dir = format!("/{}-{}/", agg_identity, suffix_uuid);
let op = SpillOp::create(dir, spill_type)?;
let op = SpillOp::create(dir, spill_backend)?;
let input_writers = Vec::with_capacity(partition_num);
let input_chunk_builders = Vec::with_capacity(partition_num);
Ok(Self {
Expand Down Expand Up @@ -1030,7 +1030,7 @@ mod tests {
"SortExecutor2".to_string(),
CHUNK_SIZE,
MemoryContext::for_spill_test(),
Some(SpillType::Memory),
Some(SpillBackend::Memory),
BatchSpillMetrics::for_test(),
));
let fields = &order_by_executor.schema().fields;
Expand Down
11 changes: 6 additions & 5 deletions src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ const DEFAULT_IO_BUFFER_SIZE: usize = 256 * 1024;
const DEFAULT_IO_CONCURRENT_TASK: usize = 8;

#[derive(Clone)]
pub enum SpillType {
pub enum SpillBackend {
Disk,
    /// Only for testing purposes
Memory,
}

Expand All @@ -51,22 +52,22 @@ pub struct SpillOp {
}

impl SpillOp {
pub fn create(path: String, spill_type: SpillType) -> Result<SpillOp> {
pub fn create(path: String, spill_backend: SpillBackend) -> Result<SpillOp> {
assert!(path.ends_with('/'));

let spill_dir =
std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string());
let root = format!("/{}/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR, path);

let op = match spill_type {
SpillType::Disk => {
let op = match spill_backend {
SpillBackend::Disk => {
let mut builder = Fs::default();
builder.root(&root);
Operator::new(builder)?
.layer(RetryLayer::default())
.finish()
}
SpillType::Memory => {
SpillBackend::Memory => {
let mut builder = Memory::default();
builder.root(&root);
Operator::new(builder)?
Expand Down

0 comments on commit 7a02f4a

Please sign in to comment.