From 483a7c49043f56227bfd00529858bea0c5456534 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 6 Jun 2024 17:46:09 +0800 Subject: [PATCH 1/2] clean dpill directory at start --- src/batch/src/lib.rs | 2 +- src/batch/src/spill/spill_op.rs | 20 ++++++++++++++++++++ src/compute/src/server.rs | 4 ++++ src/frontend/src/session.rs | 7 +++++++ 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 2cf92d6ab08be..3a29e2a90b27e 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -38,7 +38,7 @@ pub mod execution; pub mod executor; pub mod monitor; pub mod rpc; -mod spill; +pub mod spill; pub mod task; pub mod worker_manager; diff --git a/src/batch/src/spill/spill_op.rs b/src/batch/src/spill/spill_op.rs index 70334c4c89795..d04c5ac26019a 100644 --- a/src/batch/src/spill/spill_op.rs +++ b/src/batch/src/spill/spill_op.rs @@ -14,6 +14,7 @@ use std::hash::BuildHasher; use std::ops::{Deref, DerefMut}; +use std::sync::LazyLock; use anyhow::anyhow; use futures_async_stream::try_stream; @@ -25,6 +26,7 @@ use prost::Message; use risingwave_common::array::DataChunk; use risingwave_pb::data::DataChunk as PbDataChunk; use thiserror_ext::AsReport; +use tokio::sync::Mutex; use twox_hash::XxHash64; use crate::error::{BatchError, Result}; @@ -58,6 +60,24 @@ impl SpillOp { Ok(SpillOp { op }) } + pub async fn clean_spill_directory() -> opendal::Result<()> { + static LOCK: LazyLock> = LazyLock::new(|| Mutex::new(0)); + let _guard = LOCK.lock().await; + + let spill_dir = + std::env::var(RW_BATCH_SPILL_DIR_ENV).unwrap_or_else(|_| DEFAULT_SPILL_DIR.to_string()); + let root = format!("/{}/{}/", spill_dir, RW_MANAGED_SPILL_DIR); + + let mut builder = Fs::default(); + builder.root(&root); + + let op: Operator = Operator::new(builder)? + .layer(RetryLayer::default()) + .finish(); + + op.remove_all("/").await + } + pub async fn writer_with(&self, name: &str) -> Result { Ok(self .op diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index a9cb9afa9736f..5231bcbb184cd 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -20,6 +20,7 @@ use risingwave_batch::monitor::{ GLOBAL_BATCH_EXECUTOR_METRICS, GLOBAL_BATCH_MANAGER_METRICS, GLOBAL_BATCH_TASK_METRICS, }; use risingwave_batch::rpc::service::task_service::BatchServiceImpl; +use risingwave_batch::spill::spill_op::SpillOp; use risingwave_batch::task::{BatchEnvironment, BatchManager}; use risingwave_common::config::{ load_config, AsyncStackTraceOption, MetricLevel, StorageMemoryConfig, @@ -391,6 +392,9 @@ pub async fn compute_node_serve( tracing::info!("Telemetry didn't start due to config"); } + // Clean up the spill directory. + SpillOp::clean_spill_directory().await.unwrap(); + let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel::<()>(); let join_handle = tokio::spawn(async move { tonic::transport::Server::builder() diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 04e6d7fb4e294..5d85de84ae364 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use anyhow::anyhow; use bytes::Bytes; use either::Either; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; @@ -34,6 +35,7 @@ use pgwire::pg_server::{ }; use pgwire::types::{Format, FormatIterator}; use rand::RngCore; +use risingwave_batch::spill::spill_op::SpillOp; use risingwave_batch::task::{ShutdownSender, ShutdownToken}; use risingwave_batch::worker_manager::worker_node_manager::{ WorkerNodeManager, WorkerNodeManagerRef, @@ -401,6 +403,11 @@ impl FrontendEnv { }); join_handles.push(join_handle); + // Clean up the spill directory. + SpillOp::clean_spill_directory() + .await + .map_err(|err| anyhow!(err))?; + let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); From ee64fada734b783601cc74db49cca5a4a3f1674a Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 6 Jun 2024 18:25:51 +0800 Subject: [PATCH 2/2] workaround madsim --- src/compute/src/server.rs | 1 + src/frontend/src/session.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 5231bcbb184cd..21ec9de730228 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -393,6 +393,7 @@ pub async fn compute_node_serve( } // Clean up the spill directory. + #[cfg(not(madsim))] SpillOp::clean_spill_directory().await.unwrap(); let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel::<()>(); diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 5d85de84ae364..7853f97bc9d4a 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -404,6 +404,7 @@ impl FrontendEnv { join_handles.push(join_handle); // Clean up the spill directory. + #[cfg(not(madsim))] SpillOp::clean_spill_directory() .await .map_err(|err| anyhow!(err))?;