Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): clean spill directory at the start #17155

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub mod execution;
pub mod executor;
pub mod monitor;
pub mod rpc;
mod spill;
pub mod spill;
pub mod task;
pub mod worker_manager;

Expand Down
20 changes: 20 additions & 0 deletions src/batch/src/spill/spill_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::hash::BuildHasher;
use std::ops::{Deref, DerefMut};
use std::sync::LazyLock;

use anyhow::anyhow;
use futures_async_stream::try_stream;
Expand All @@ -25,6 +26,7 @@ use prost::Message;
use risingwave_common::array::DataChunk;
use risingwave_pb::data::DataChunk as PbDataChunk;
use thiserror_ext::AsReport;
use tokio::sync::Mutex;
use twox_hash::XxHash64;

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -58,6 +60,24 @@ impl SpillOp {
Ok(SpillOp { op })
}

/// Removes everything under the managed batch spill directory.
///
/// The base directory is taken from the environment variable named by
/// `RW_BATCH_SPILL_DIR_ENV`, falling back to `DEFAULT_SPILL_DIR` when that variable
/// cannot be read. `RW_MANAGED_SPILL_DIR` is appended to it to form the root of an
/// `Fs` operator wrapped in the default `RetryLayer`.
///
/// Calls are serialized through a function-local static async mutex, so only one
/// clean-up runs at a time within the process.
///
/// # Errors
///
/// Returns the `opendal` error produced while building the operator or while
/// removing the directory contents.
pub async fn clean_spill_directory() -> opendal::Result<()> {
    // Only the guard matters here; the `usize` stored in the mutex is never touched.
    static CLEAN_LOCK: LazyLock<Mutex<usize>> = LazyLock::new(|| Mutex::new(0));
    let _serialized = CLEAN_LOCK.lock().await;

    let base_dir = match std::env::var(RW_BATCH_SPILL_DIR_ENV) {
        Ok(dir) => dir,
        Err(_) => DEFAULT_SPILL_DIR.to_string(),
    };
    let managed_root = format!("/{}/{}/", base_dir, RW_MANAGED_SPILL_DIR);

    let mut fs_builder = Fs::default();
    fs_builder.root(&managed_root);

    let operator: Operator = Operator::new(fs_builder)?
        .layer(RetryLayer::default())
        .finish();

    // The operator is rooted at `managed_root`, so "/" addresses that whole directory.
    operator.remove_all("/").await
}

pub async fn writer_with(&self, name: &str) -> Result<opendal::Writer> {
Ok(self
.op
Expand Down
4 changes: 4 additions & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_batch::monitor::{
GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_TASK_METRICS,
};
use risingwave_batch::rpc::service::task_service::BatchServiceImpl;
use risingwave_batch::spill::spill_op::SpillOp;
use risingwave_batch::task::{BatchEnvironment, BatchManager};
use risingwave_common::config::{
load_config, AsyncStackTraceOption, MetricLevel, StorageMemoryConfig,
Expand Down Expand Up @@ -391,6 +392,9 @@ pub async fn compute_node_serve(
tracing::info!("Telemetry didn't start due to config");
}

// Clean up the spill directory.
SpillOp::clean_spill_directory().await.unwrap();

let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel::<()>();
let join_handle = tokio::spawn(async move {
tonic::transport::Server::builder()
Expand Down
7 changes: 7 additions & 0 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use bytes::Bytes;
use either::Either;
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
Expand All @@ -34,6 +35,7 @@ use pgwire::pg_server::{
};
use pgwire::types::{Format, FormatIterator};
use rand::RngCore;
use risingwave_batch::spill::spill_op::SpillOp;
use risingwave_batch::task::{ShutdownSender, ShutdownToken};
use risingwave_batch::worker_manager::worker_node_manager::{
WorkerNodeManager, WorkerNodeManagerRef,
Expand Down Expand Up @@ -401,6 +403,11 @@ impl FrontendEnv {
});
join_handles.push(join_handle);

// Clean up the spill directory.
SpillOp::clean_spill_directory()
.await
.map_err(|err| anyhow!(err))?;

let total_memory_bytes = resource_util::memory::system_memory_available_bytes();
let heap_profiler =
HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
Expand Down
Loading