Skip to content

Commit

Permalink
FIx
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Oct 27, 2023
1 parent cb6feef commit b1f98d1
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 63 deletions.
6 changes: 3 additions & 3 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ impl<K: HashKey> GroupTopNExecutor<K> {
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use prometheus::IntGauge;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::OrderType;
Expand All @@ -254,7 +254,7 @@ mod tests {

#[tokio::test]
async fn test_group_top_n_executor() {
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
{
let schema = Schema {
fields: vec![
Expand Down Expand Up @@ -290,7 +290,7 @@ mod tests {
];
let mem_ctx = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
LabelGuardedIntGauge::<4>::test_int_gauge(),
);
let top_n_executor = (GroupTopNExecutorBuilder {
child: Box::new(mock_executor),
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ mod tests {
use std::sync::Arc;

use futures_async_stream::for_await;
use prometheus::IntGauge;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::PbDataType;
Expand All @@ -323,7 +323,7 @@ mod tests {

#[tokio::test]
async fn execute_int32_grouped() {
let parent_mem = MemoryContext::root(IntGauge::new("root_memory_usage", " ").unwrap());
let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
{
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
Expand Down Expand Up @@ -370,7 +370,7 @@ mod tests {

let mem_context = MemoryContext::new(
Some(parent_mem.clone()),
IntGauge::new("memory_usage", " ").unwrap(),
LabelGuardedIntGauge::<4>::test_int_gauge(),
);
let actual_exec = HashAggExecutorBuilder::deserialize(
&agg_prost,
Expand Down
6 changes: 3 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1933,12 +1933,12 @@ impl<K> HashJoinExecutor<K> {
mod tests {
use futures::StreamExt;
use futures_async_stream::for_await;
use prometheus::IntGauge;
use risingwave_common::array::{ArrayBuilderImpl, DataChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::hash::Key32;
use risingwave_common::memory::MemoryContext;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
Expand Down Expand Up @@ -2157,7 +2157,7 @@ mod tests {
};

let mem_ctx =
MemoryContext::new(parent_mem_ctx, IntGauge::new("memory_usage", " ").unwrap());
MemoryContext::new(parent_mem_ctx, LabelGuardedIntGauge::<4>::test_int_gauge());
Box::new(HashJoinExecutor::<Key32>::new(
join_type,
output_indices,
Expand Down Expand Up @@ -2198,7 +2198,7 @@ mod tests {
right_executor: BoxedExecutor,
) {
let parent_mem_context =
MemoryContext::root(IntGauge::new("total_memory_usage", " ").unwrap());
MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());

{
let join_executor = self.create_join_executor_with_chunk_size_and_executors(
Expand Down
15 changes: 4 additions & 11 deletions src/batch/src/task/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use prometheus::IntGauge;
use risingwave_common::catalog::SysCatalogReaderRef;
use risingwave_common::config::BatchConfig;
use risingwave_common::error::Result;
use risingwave_common::memory::MemoryContext;
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_common::util::addr::{is_local_address, HostAddr};
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_rpc_client::ComputeClientPoolRef;
Expand Down Expand Up @@ -147,10 +145,7 @@ impl BatchTaskContext for ComputeNodeContext {
.executor_metrics()
.mem_usage
.with_label_values(&metrics.executor_labels(executor_id));
MemoryContext::new(
Some(self.mem_context.clone()),
executor_mem_usage.deref().clone(),
)
MemoryContext::new(Some(self.mem_context.clone()), executor_mem_usage)
} else {
MemoryContext::none()
}
Expand Down Expand Up @@ -181,9 +176,7 @@ impl ComputeNodeContext {
batch_metrics
.get_task_metrics()
.task_mem_usage
.with_label_values(&batch_metrics.task_labels())
.deref()
.clone(),
.with_label_values(&batch_metrics.task_labels()),
);
Self {
env,
Expand All @@ -201,7 +194,7 @@ impl ComputeNodeContext {
cur_mem_val: Arc::new(0.into()),
last_mem_val: Arc::new(0.into()),
// Leave it for now, it should be None
mem_context: MemoryContext::root(IntGauge::new("test", "test").unwrap()),
mem_context: MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge()),
}
}

Expand Down
7 changes: 3 additions & 4 deletions src/common/src/estimate_size/collections/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ where

#[cfg(test)]
mod tests {
use prometheus::IntGauge;

use crate::estimate_size::collections::MemMonitoredHeap;
use crate::memory::MemoryContext;
use crate::metrics::LabelGuardedIntGauge;

#[test]
fn test_heap() {
let gauge = IntGauge::new("test", "test").unwrap();
let gauge = LabelGuardedIntGauge::<4>::test_int_gauge();
let mem_ctx = MemoryContext::root(gauge.clone());

let mut heap = MemMonitoredHeap::<u8>::new_with(mem_ctx);
Expand All @@ -130,7 +129,7 @@ mod tests {

#[test]
fn test_heap_drop() {
let gauge = IntGauge::new("test", "test").unwrap();
let gauge = LabelGuardedIntGauge::<4>::test_int_gauge();
let mem_ctx = MemoryContext::root(gauge.clone());

let vec = {
Expand Down
102 changes: 63 additions & 39 deletions src/common/src/memory/mem_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,64 +12,88 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::Arc;

use prometheus::IntGauge;

use super::MonitoredGlobalAlloc;
use crate::metrics::TrAdderGauge;
use crate::metrics::{LabelGuardedIntGauge, TrAdderGauge};

struct MemoryContextInner {
counter: MemCounter,
parent: Option<MemoryContext>,
pub trait MemCounter: Send + Sync + 'static {
fn add(&self, bytes: i64);
fn get_bytes_used(&self) -> i64;
}

#[derive(Clone)]
pub struct MemoryContext {
/// Add None op mem context, so that we don't need to return [`Option`] in
/// `BatchTaskContext`. This helps with later `Allocator` implementation.
inner: Option<Arc<MemoryContextInner>>,
}

#[derive(Debug)]
pub enum MemCounter {
/// Used when the add/sub operation don't have much conflicts.
Atomic(IntGauge),
/// Used when the add/sub operation may cause a lot of conflicts.
TrAdder(TrAdderGauge),
}
impl MemCounter for TrAdderGauge {
fn add(&self, bytes: i64) {
self.add(bytes)
}

impl From<IntGauge> for MemCounter {
fn from(value: IntGauge) -> Self {
MemCounter::Atomic(value)
fn get_bytes_used(&self) -> i64 {
self.get()
}
}

impl MemCounter {
impl<const N: usize> MemCounter for LabelGuardedIntGauge<N> {
fn add(&self, bytes: i64) {
match &self {
MemCounter::TrAdder(c) => c.add(bytes),
MemCounter::Atomic(c) => c.add(bytes),
}
self.deref().add(bytes)
}

fn get_bytes_used(&self) -> i64 {
match &self {
MemCounter::TrAdder(c) => c.get(),
MemCounter::Atomic(c) => c.get(),
}
self.get()
}
}

impl From<TrAdderGauge> for MemCounter {
fn from(value: TrAdderGauge) -> Self {
MemCounter::TrAdder(value)
}
struct MemoryContextInner {
counter: Box<dyn MemCounter>,
parent: Option<MemoryContext>,
}

#[derive(Clone)]
pub struct MemoryContext {
/// Add None op mem context, so that we don't need to return [`Option`] in
/// `BatchTaskContext`. This helps with later `Allocator` implementation.
inner: Option<Arc<MemoryContextInner>>,
}

// #[derive(Debug)]
// pub enum MemCounter<const N: usize> {
// /// Used when the add/sub operation don't have much conflicts.
// Atomic(LabelGuardedIntGauge<N>),
// /// Used when the add/sub operation may cause a lot of conflicts.
// TrAdder(TrAdderGauge),
// }

// impl<const N: usize> From<LabelGuardedIntGauge<N>> for MemCounter {
// fn from(value: LabelGuardedIntGauge<N>) -> Self {
// MemCounter::Atomic(value)
// }
// }
//
// impl<const N: usize> MemCounter<N> {
// fn add(&self, bytes: i64) {
// match &self {
// MemCounter::TrAdder(c) => c.add(bytes),
// MemCounter::Atomic(c) => c.add(bytes),
// }
// }
//
// fn get_bytes_used(&self) -> i64 {
// match &self {
// MemCounter::TrAdder(c) => c.get(),
// MemCounter::Atomic(c) => c.get(),
// }
// }
// }
//
// impl From<TrAdderGauge> for MemCounter {
// fn from(value: TrAdderGauge) -> Self {
// MemCounter::TrAdder(value)
// }
// }

impl MemoryContext {
pub fn new(parent: Option<MemoryContext>, counter: impl Into<MemCounter>) -> Self {
let c = counter.into();
pub fn new(parent: Option<MemoryContext>, counter: impl MemCounter) -> Self {
let c = Box::new(counter);
Self {
inner: Some(Arc::new(MemoryContextInner { counter: c, parent })),
}
Expand All @@ -80,7 +104,7 @@ impl MemoryContext {
Self { inner: None }
}

pub fn root(counter: impl Into<MemCounter>) -> Self {
pub fn root(counter: impl MemCounter) -> Self {
Self::new(None, counter)
}

Expand Down

0 comments on commit b1f98d1

Please sign in to comment.