diff --git a/src/batch/src/task/context.rs b/src/batch/src/task/context.rs index 886eeb7d9753d..f633439675eb1 100644 --- a/src/batch/src/task/context.rs +++ b/src/batch/src/task/context.rs @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use prometheus::core::Atomic; @@ -63,10 +62,6 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static { fn source_metrics(&self) -> Arc; - fn store_mem_usage(&self, val: usize); - - fn mem_usage(&self) -> usize; - fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext; fn worker_node_manager(&self) -> Option; @@ -80,12 +75,6 @@ pub struct ComputeNodeContext { batch_metrics: Option, mem_context: MemoryContext, - - // Last mem usage value. Init to be 0. Should be the last value of `cur_mem_val`. - last_mem_val: Arc, - // How many memory bytes have been used in this task for the latest report value. Will be moved - // to `last_mem_val` if new value comes in. - cur_mem_val: Arc, } impl BatchTaskContext for ComputeNodeContext { @@ -127,22 +116,6 @@ impl BatchTaskContext for ComputeNodeContext { self.env.source_metrics() } - fn store_mem_usage(&self, val: usize) { - // Record the last mem val. - // Calculate the difference between old val and new value, and apply the diff to total - // memory usage value. - let old_value = self.cur_mem_val.load(Ordering::Relaxed); - self.last_mem_val.store(old_value, Ordering::Relaxed); - let diff = val as i64 - old_value as i64; - self.env.task_manager().apply_mem_diff(diff); - - self.cur_mem_val.store(val, Ordering::Relaxed); - } - - fn mem_usage(&self) -> usize { - self.cur_mem_val.load(Ordering::Relaxed) - } - fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext { if let Some(metrics) = &self.batch_metrics { let executor_mem_usage = metrics @@ -167,8 +140,6 @@ impl ComputeNodeContext { Self { env: BatchEnvironment::for_test(), batch_metrics: None, - cur_mem_val: Arc::new(0.into()), - last_mem_val: Arc::new(0.into()), mem_context: MemoryContext::none(), } } @@ -189,8 +160,6 @@ impl ComputeNodeContext { Self { env, batch_metrics: Some(batch_metrics), - cur_mem_val: Arc::new(0.into()), - last_mem_val: Arc::new(0.into()), mem_context, } } else { @@ -198,8 +167,6 @@ impl ComputeNodeContext { Self { env, batch_metrics: None, - cur_mem_val: Arc::new(0.into()), - last_mem_val: Arc::new(0.into()), mem_context: batch_mem_context, } } @@ -210,13 +177,7 @@ impl ComputeNodeContext { Self { env, batch_metrics: None, - cur_mem_val: Arc::new(0.into()), - last_mem_val: Arc::new(0.into()), mem_context: batch_mem_context, } } - - pub fn mem_usage(&self) -> usize { - self.cur_mem_val.load(Ordering::Relaxed) - } } diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 7b5e00df18c03..87d94bc1d90b9 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -691,10 +691,6 @@ impl BatchTaskExecution { } } - pub fn mem_usage(&self) -> usize { - self.context.mem_usage() - } - /// Check the task status: whether has ended. pub fn is_end(&self) -> bool { let guard = self.state.lock(); diff --git a/src/batch/src/task/task_manager.rs b/src/batch/src/task/task_manager.rs index d8ef54bc23764..70af7e520b46c 100644 --- a/src/batch/src/task/task_manager.rs +++ b/src/batch/src/task/task_manager.rs @@ -17,7 +17,6 @@ use std::net::SocketAddr; use std::sync::Arc; use anyhow::Context; -use hytra::TrAdder; use parking_lot::Mutex; use risingwave_common::config::BatchConfig; use risingwave_common::memory::MemoryContext; @@ -50,11 +49,6 @@ pub struct BatchManager { /// Batch configuration config: BatchConfig, - /// Total batch memory usage in this CN. - /// When each task context report their own usage, it will apply the diff into this total mem - /// value for all tasks. - total_mem_val: Arc>, - /// Memory context used for batch tasks in cn. mem_context: MemoryContext, @@ -81,7 +75,6 @@ impl BatchManager { tasks: Arc::new(Mutex::new(HashMap::new())), runtime: Arc::new(runtime.into()), config, - total_mem_val: TrAdder::new().into(), metrics, mem_context, } @@ -288,45 +281,6 @@ impl BatchManager { pub fn config(&self) -> &BatchConfig { &self.config } - - /// Kill batch queries with larges memory consumption per task. Required to maintain task level - /// memory usage in the struct. Will be called by global memory manager. - pub fn kill_queries(&self, reason: String) { - let mut max_mem_task_id = None; - let mut max_mem = usize::MIN; - let guard = self.tasks.lock(); - for (t_id, t) in &*guard { - // If the task has been stopped, we should not count this. - if t.is_end() { - continue; - } - // Alternatively, we can use a bool flag to indicate end of execution. - // Now we use only store 0 bytes in Context after execution ends. - let mem_usage = t.mem_usage(); - if mem_usage > max_mem { - max_mem = mem_usage; - max_mem_task_id = Some(t_id.clone()); - } - } - if let Some(id) = max_mem_task_id { - let t = guard.get(&id).unwrap(); - // FIXME: `Abort` will not report error but truncated results to user. We should - // consider throw error. - t.abort(reason); - } - } - - /// Called by global memory manager for total usage of batch tasks. This op is designed to be - /// light-weight - pub fn total_mem_usage(&self) -> usize { - self.total_mem_val.get() as usize - } - - /// Calculate the diff between this time and last time memory usage report, apply the diff for - /// the global counter. Due to the limitation of hytra, we need to use i64 type here. - pub fn apply_mem_diff(&self, diff: i64) { - self.total_mem_val.inc(diff) - } } #[cfg(test)] diff --git a/src/frontend/src/scheduler/task_context.rs b/src/frontend/src/scheduler/task_context.rs index 35f20a6daeffd..e8aafc440f289 100644 --- a/src/frontend/src/scheduler/task_context.rs +++ b/src/frontend/src/scheduler/task_context.rs @@ -94,14 +94,6 @@ impl BatchTaskContext for FrontendBatchTaskContext { self.session.env().source_metrics() } - fn store_mem_usage(&self, _val: usize) { - todo!() - } - - fn mem_usage(&self) -> usize { - todo!() - } - fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext { MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0)) }