refactor(batch): remove unused mem code #16641

Merged
merged 1 commit on May 8, 2024
39 changes: 0 additions & 39 deletions src/batch/src/task/context.rs
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use prometheus::core::Atomic;
@@ -63,10 +62,6 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {

fn source_metrics(&self) -> Arc<SourceMetrics>;

fn store_mem_usage(&self, val: usize);

fn mem_usage(&self) -> usize;

fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext;

fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef>;
@@ -80,12 +75,6 @@ pub struct ComputeNodeContext {
batch_metrics: Option<BatchMetricsWithTaskLabels>,

mem_context: MemoryContext,

// The previously reported memory usage. Initialized to 0; always holds the prior value of `cur_mem_val`.
last_mem_val: Arc<AtomicUsize>,
// Memory bytes used by this task as of the latest report. Moved into `last_mem_val`
// when a new value is reported.
cur_mem_val: Arc<AtomicUsize>,
}

impl BatchTaskContext for ComputeNodeContext {
@@ -127,22 +116,6 @@ impl BatchTaskContext for ComputeNodeContext {
self.env.source_metrics()
}

fn store_mem_usage(&self, val: usize) {
// Record the previous value, then apply the difference between the old and new
// values to the total memory usage.
let old_value = self.cur_mem_val.load(Ordering::Relaxed);
self.last_mem_val.store(old_value, Ordering::Relaxed);
let diff = val as i64 - old_value as i64;
self.env.task_manager().apply_mem_diff(diff);

self.cur_mem_val.store(val, Ordering::Relaxed);
}

fn mem_usage(&self) -> usize {
self.cur_mem_val.load(Ordering::Relaxed)
}

fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext {
if let Some(metrics) = &self.batch_metrics {
let executor_mem_usage = metrics
@@ -167,8 +140,6 @@ impl ComputeNodeContext {
Self {
env: BatchEnvironment::for_test(),
batch_metrics: None,
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
mem_context: MemoryContext::none(),
}
}
@@ -189,17 +160,13 @@ impl ComputeNodeContext {
Self {
env,
batch_metrics: Some(batch_metrics),
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
mem_context,
}
} else {
let batch_mem_context = env.task_manager().memory_context_ref();
Self {
env,
batch_metrics: None,
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
mem_context: batch_mem_context,
}
}
@@ -210,13 +177,7 @@
Self {
env,
batch_metrics: None,
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
mem_context: batch_mem_context,
}
}

pub fn mem_usage(&self) -> usize {
self.cur_mem_val.load(Ordering::Relaxed)
}
}
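
For context, the deleted `store_mem_usage`/`mem_usage` pair implemented delta-based reporting: each task caches its latest usage and pushes only the signed difference to a manager-side total. Below is a minimal, self-contained sketch of that pattern; `TaskTotal` and `TaskMemTracker` are illustrative stand-ins rather than types from this codebase, and a single `swap` condenses the separate load/store the original performed.

```rust
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for the manager-side total (played by `BatchManager::apply_mem_diff` in the removed code).
struct TaskTotal(AtomicI64);

impl TaskTotal {
    fn apply_mem_diff(&self, diff: i64) {
        self.0.fetch_add(diff, Ordering::Relaxed);
    }
}

/// Stand-in for the per-task side (played by `ComputeNodeContext` in the removed code).
struct TaskMemTracker {
    cur_mem_val: Arc<AtomicUsize>,
    total: Arc<TaskTotal>,
}

impl TaskMemTracker {
    /// Report a new usage value: only the signed delta is pushed to the shared total,
    /// so the total stays consistent without re-summing every task on each report.
    fn store_mem_usage(&self, val: usize) {
        let old = self.cur_mem_val.swap(val, Ordering::Relaxed);
        self.total.apply_mem_diff(val as i64 - old as i64);
    }

    /// Latest reported usage for this task.
    fn mem_usage(&self) -> usize {
        self.cur_mem_val.load(Ordering::Relaxed)
    }
}
```

Reporting deltas keeps the aggregate update O(1) per report, at the cost of every task having to remember its previous value.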
4 changes: 0 additions & 4 deletions src/batch/src/task/task_execution.rs
@@ -691,10 +691,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
}
}

pub fn mem_usage(&self) -> usize {
self.context.mem_usage()
}

/// Check the task status: whether has ended.
pub fn is_end(&self) -> bool {
let guard = self.state.lock();
46 changes: 0 additions & 46 deletions src/batch/src/task/task_manager.rs
@@ -17,7 +17,6 @@ use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::Context;
use hytra::TrAdder;
use parking_lot::Mutex;
use risingwave_common::config::BatchConfig;
use risingwave_common::memory::MemoryContext;
@@ -50,11 +49,6 @@ pub struct BatchManager {
/// Batch configuration
config: BatchConfig,

/// Total batch memory usage on this compute node.
/// When each task context reports its own usage, the diff is applied to this total
/// value covering all tasks.
total_mem_val: Arc<TrAdder<i64>>,

/// Memory context used for batch tasks in cn.
mem_context: MemoryContext,

@@ -81,7 +75,6 @@ impl BatchManager {
tasks: Arc::new(Mutex::new(HashMap::new())),
runtime: Arc::new(runtime.into()),
config,
total_mem_val: TrAdder::new().into(),
metrics,
mem_context,
}
@@ -288,45 +281,6 @@ impl BatchManager {
pub fn config(&self) -> &BatchConfig {
&self.config
}

/// Kill the batch query whose task has the largest memory consumption. Requires task-level
/// memory usage to be maintained in this struct. Called by the global memory manager.
pub fn kill_queries(&self, reason: String) {
let mut max_mem_task_id = None;
let mut max_mem = usize::MIN;
let guard = self.tasks.lock();
for (t_id, t) in &*guard {
// If the task has been stopped, we should not count this.
if t.is_end() {
continue;
}
// Alternatively, a bool flag could indicate the end of execution.
// For now, the context simply stores 0 bytes after execution ends.
let mem_usage = t.mem_usage();
if mem_usage > max_mem {
max_mem = mem_usage;
max_mem_task_id = Some(t_id.clone());
}
}
if let Some(id) = max_mem_task_id {
let t = guard.get(&id).unwrap();
// FIXME: `Abort` does not report an error but returns truncated results to the user.
// We should consider throwing an error instead.
t.abort(reason);
}
}

/// Called by the global memory manager to get the total memory usage of batch tasks.
/// This op is designed to be lightweight.
pub fn total_mem_usage(&self) -> usize {
self.total_mem_val.get() as usize
}

/// Calculate the diff between this memory usage report and the previous one, and apply it to
/// the global counter. Due to hytra's limitations, an i64 is used here.
pub fn apply_mem_diff(&self, diff: i64) {
self.total_mem_val.inc(diff)
}
}
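
The manager-side counterpart of the removed code (the `total_mem_val` counter, `apply_mem_diff`, `total_mem_usage`, and `kill_queries`) can be sketched as follows. This is an illustrative stand-in, assuming a plain `AtomicI64` in place of `TrAdder<i64>` and simplified `Task`/`Manager` types; it is not the real `BatchManager` API.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Mutex;

/// Illustrative stand-in for a running task (the real type is `BatchTaskExecution`).
struct Task {
    ended: bool,
    mem_usage: usize,
}

impl Task {
    fn abort(&self, reason: &str) {
        eprintln!("aborting task: {reason}");
    }
}

/// Illustrative stand-in for the removed parts of `BatchManager`.
struct Manager {
    tasks: Mutex<HashMap<u64, Task>>,
    total_mem: AtomicI64,
}

impl Manager {
    /// Apply a per-task delta to the shared total.
    fn apply_mem_diff(&self, diff: i64) {
        self.total_mem.fetch_add(diff, Ordering::Relaxed);
    }

    /// Lightweight read of the current total.
    fn total_mem_usage(&self) -> usize {
        self.total_mem.load(Ordering::Relaxed).max(0) as usize
    }

    /// Abort the live task with the largest reported usage, as `kill_queries` did.
    fn kill_largest(&self, reason: &str) {
        let guard = self.tasks.lock().unwrap();
        if let Some((_, task)) = guard
            .iter()
            .filter(|(_, t)| !t.ended)
            .max_by_key(|(_, t)| t.mem_usage)
        {
            task.abort(reason);
        }
    }
}
```

With the switch to `MemoryContext`-based accounting, none of this per-task bookkeeping is needed on the manager side, which is why the code could be removed.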

#[cfg(test)]
8 changes: 0 additions & 8 deletions src/frontend/src/scheduler/task_context.rs
@@ -94,14 +94,6 @@ impl BatchTaskContext for FrontendBatchTaskContext {
self.session.env().source_metrics()
}

fn store_mem_usage(&self, _val: usize) {
todo!()
}

fn mem_usage(&self) -> usize {
todo!()
}

fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0))
}
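
The frontend context never implemented the removed methods (both were `todo!()`); only `create_executor_mem_context` remains, chaining a child context off `self.mem_context`. As a rough, hypothetical sketch of how such hierarchical accounting makes manual diff reporting unnecessary, consider the simplified context type below; it is not the actual `risingwave_common::memory::MemoryContext` API.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical simplification of a hierarchical memory context; the real
/// `MemoryContext` API may differ.
struct MemCtx {
    parent: Option<Arc<MemCtx>>,
    bytes: AtomicI64,
}

impl MemCtx {
    fn root() -> Arc<Self> {
        Arc::new(Self { parent: None, bytes: AtomicI64::new(0) })
    }

    /// Chain a child off a parent, in the spirit of
    /// `MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0))`.
    fn new_child(self: &Arc<Self>) -> Arc<Self> {
        Arc::new(Self { parent: Some(self.clone()), bytes: AtomicI64::new(0) })
    }

    /// Charge bytes to this context and every ancestor, so the root always
    /// holds the global total without explicit per-task diff reporting.
    fn add(&self, diff: i64) {
        self.bytes.fetch_add(diff, Ordering::Relaxed);
        if let Some(p) = &self.parent {
            p.add(diff);
        }
    }

    fn get_bytes(&self) -> i64 {
        self.bytes.load(Ordering::Relaxed)
    }
}

fn main() {
    let root = MemCtx::root();          // manager-level context
    let executor = root.new_child();    // executor-level child
    executor.add(4096);                 // an executor charges an allocation
    assert_eq!(root.get_bytes(), 4096); // the root sees the aggregated total
}
```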