Commit

Merge branch 'main' into tab/key-encode
tabVersion authored May 8, 2024
2 parents fed0cc2 + 2929fb8 commit 0128d55
Showing 56 changed files with 1,112 additions and 719 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -622,12 +622,7 @@ def section_object_storage(outer_panels):
"",
[
panels.target(
f"sum(irate({metric('aws_sdk_retry_counts')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum(irate({metric('s3_read_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
f"sum(rate({metric('object_store_request_retry_count')}[$__rate_interval])) by ({NODE_LABEL}, {COMPONENT_LABEL}, type)",
"{{type}} - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/batch/src/error.rs
@@ -136,6 +136,9 @@ pub enum BatchError {

#[error("Streaming vnode mapping not found for fragment {0}")]
StreamingVnodeMappingNotFound(FragmentId),

#[error("Not enough memory to run this query, batch memory limit is {0} bytes")]
OutOfMemory(u64),
}

// Serialize/deserialize error.
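Every executor change below follows the same contract: `MemoryContext::add` now reports whether the tracked total still fits under the configured limit, and a `false` return is converted into the new `BatchError::OutOfMemory(mem_limit)` error. A minimal, self-contained sketch of that contract — `MemoryContextSketch` is illustrative only, not RisingWave's actual `MemoryContext`, which (as the `LabelGuardedIntGauge` in the updated tests shows) is backed by a metrics gauge:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for a limit-aware memory context.
#[derive(Clone)]
pub struct MemoryContextSketch {
    bytes_used: Arc<AtomicI64>,
    limit: u64,
}

impl MemoryContextSketch {
    /// `u64::MAX` effectively disables the limit, as the updated tests do.
    pub fn root(limit: u64) -> Self {
        Self {
            bytes_used: Arc::new(AtomicI64::new(0)),
            limit,
        }
    }

    /// Record `diff` bytes and report whether the new total still fits;
    /// callers map `false` to `BatchError::OutOfMemory(self.mem_limit())`.
    pub fn add(&self, diff: i64) -> bool {
        let total = self.bytes_used.fetch_add(diff, Ordering::Relaxed) + diff;
        total <= self.limit as i64
    }

    /// Aggregate check for code paths where growth happens inside a
    /// collection rather than through a single `add` call.
    pub fn check_memory_usage(&self) -> bool {
        self.bytes_used.load(Ordering::Relaxed) <= self.limit as i64
    }

    pub fn mem_limit(&self) -> u64 {
        self.limit
    }
}
```

The guard repeated in the hunks below is then just `if !ctx.add(diff) { Err(BatchError::OutOfMemory(ctx.mem_limit()))?; }`, turning an over-budget query into a query-level error instead of an OOM-killed process.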
2 changes: 1 addition & 1 deletion src/batch/src/executor/group_top_n.rs
@@ -254,7 +254,7 @@ mod tests {

#[tokio::test]
async fn test_group_top_n_executor() {
-let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
+let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
{
let schema = Schema {
fields: vec![
6 changes: 4 additions & 2 deletions src/batch/src/executor/hash_agg.rs
@@ -250,7 +250,9 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {
}
}
// update memory usage
-self.mem_context.add(memory_usage_diff);
+if !self.mem_context.add(memory_usage_diff) {
+    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
+}
}

// Don't use `into_iter` here, it may cause memory leak.
@@ -323,7 +325,7 @@ mod tests {

#[tokio::test]
async fn execute_int32_grouped() {
-let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
+let parent_mem = MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);
{
let src_exec = Box::new(MockExecutor::with_chunk(
DataChunk::from_pretty(
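For a feel of the failure mode, a tiny hypothetical demo against the sketch above: once an addition would push the tracked total past the budget, `add` returns `false` and the executor raises `OutOfMemory` instead of growing further.

```rust
fn main() {
    // Hypothetical numbers: a 1 KiB budget, then two 600-byte additions.
    let ctx = MemoryContextSketch::root(1024);
    assert!(ctx.add(600));  // 600 bytes tracked, still within budget
    assert!(!ctx.add(600)); // 1200 bytes would exceed 1024: rejected
    // A real executor would now return BatchError::OutOfMemory(1024).
    println!("limit = {} bytes", ctx.mem_limit());
}
```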
10 changes: 7 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
@@ -234,7 +234,9 @@ impl<K: HashKey> HashJoinExecutor<K> {
let build_chunk = build_chunk?;
if build_chunk.cardinality() > 0 {
build_row_count += build_chunk.cardinality();
-self.mem_ctx.add(build_chunk.estimated_heap_size() as i64);
+if !self.mem_ctx.add(build_chunk.estimated_heap_size() as i64) {
+    Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
+}
build_side.push(build_chunk);
}
}
@@ -264,7 +266,9 @@ impl<K: HashKey> HashJoinExecutor<K> {
// Only insert key to hash map if it is consistent with the null safe restriction.
if build_key.null_bitmap().is_subset(&null_matched) {
let row_id = RowId::new(build_chunk_id, build_row_id);
-self.mem_ctx.add(build_key.estimated_heap_size() as i64);
+if !self.mem_ctx.add(build_key.estimated_heap_size() as i64) {
+    Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
+}
next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
}
}
@@ -2204,7 +2208,7 @@ mod tests {
right_executor: BoxedExecutor,
) {
let parent_mem_context =
-MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge());
+MemoryContext::root(LabelGuardedIntGauge::<4>::test_int_gauge(), u64::MAX);

{
let join_executor = self.create_join_executor_with_chunk_size_and_executors(
8 changes: 6 additions & 2 deletions src/batch/src/executor/join/lookup_join_base.rs
@@ -132,7 +132,9 @@ impl<K: HashKey> LookupJoinBase<K> {
let build_chunk = build_chunk?;
if build_chunk.cardinality() > 0 {
build_row_count += build_chunk.cardinality();
-self.mem_ctx.add(build_chunk.estimated_heap_size() as i64);
+if !self.mem_ctx.add(build_chunk.estimated_heap_size() as i64) {
+    Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
+}
build_side.push(build_chunk);
}
}
@@ -160,7 +162,9 @@ impl<K: HashKey> LookupJoinBase<K> {
// restriction.
if build_key.null_bitmap().is_subset(&null_matched) {
let row_id = RowId::new(build_chunk_id, build_row_id);
-self.mem_ctx.add(build_key.estimated_heap_size() as i64);
+if !self.mem_ctx.add(build_key.estimated_heap_size() as i64) {
+    Err(BatchError::OutOfMemory(self.mem_ctx.mem_limit()))?;
+}
hash_key_heap_size += build_key.estimated_heap_size() as i64;
next_build_row_with_same_key[row_id] = hash_map.insert(build_key, row_id);
}
4 changes: 3 additions & 1 deletion src/batch/src/executor/join/nested_loop_join.rs
@@ -98,7 +98,9 @@ impl NestedLoopJoinExecutor {
for chunk in self.left_child.execute() {
let c = chunk?;
trace!("Estimated chunk size is {:?}", c.estimated_heap_size());
-self.mem_context.add(c.estimated_heap_size() as i64);
+if !self.mem_context.add(c.estimated_heap_size() as i64) {
+    Err(BatchError::OutOfMemory(self.mem_context.mem_limit()))?;
+}
ret.push(c);
}
ret
16 changes: 12 additions & 4 deletions src/batch/src/executor/merge_sort_exchange.rs
@@ -121,7 +121,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx

// Check whether there is indeed a chunk and there is a visible row sitting at `row_idx`
// in the chunk before calling this function.
-fn push_row_into_heap(&mut self, source_idx: usize, row_idx: usize) {
+fn push_row_into_heap(&mut self, source_idx: usize, row_idx: usize) -> Result<()> {
assert!(source_idx < self.source_inputs.len());
let chunk_ref = self.source_inputs[source_idx].as_ref().unwrap();
self.min_heap.push(HeapElem::new(
@@ -131,6 +131,14 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
row_idx,
None,
));

+if self.min_heap.mem_context().check_memory_usage() {
+    Ok(())
+} else {
+    Err(BatchError::OutOfMemory(
+        self.min_heap.mem_context().mem_limit(),
+    ))
+}
}
}

@@ -166,7 +174,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
// exchange, therefore we are sure that there is at least
// one visible row.
let next_row_idx = chunk.next_visible_row_idx(0);
-self.push_row_into_heap(source_idx, next_row_idx.unwrap());
+self.push_row_into_heap(source_idx, next_row_idx.unwrap())?;
}
}

@@ -201,13 +209,13 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
let possible_next_row_idx = cur_chunk.next_visible_row_idx(row_idx + 1);
match possible_next_row_idx {
Some(next_row_idx) => {
-self.push_row_into_heap(child_idx, next_row_idx);
+self.push_row_into_heap(child_idx, next_row_idx)?;
}
None => {
self.get_source_chunk(child_idx).await?;
if let Some(chunk) = &self.source_inputs[child_idx] {
let next_row_idx = chunk.next_visible_row_idx(0);
-self.push_row_into_heap(child_idx, next_row_idx.unwrap());
+self.push_row_into_heap(child_idx, next_row_idx.unwrap())?;
}
}
}
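The merge-sort exchange takes a slightly different route: a row enters the heap first, and the context's aggregate usage is validated afterwards via `check_memory_usage`, which is why `push_row_into_heap` now returns a `Result` that every caller propagates with `?`. A simplified sketch of that shape, reusing the illustrative context from above (the real `min_heap` is a memory-monitored heap whose allocations are tracked through its `mem_context`; the `HeapElem` details are elided here):

```rust
use std::collections::BinaryHeap;

/// Simplified stand-in for a memory-monitored heap.
struct MonitoredHeapSketch {
    ctx: MemoryContextSketch,
    rows: BinaryHeap<i64>,
}

impl MonitoredHeapSketch {
    /// Push first, then validate the aggregate total, so an over-budget
    /// heap surfaces an error the caller can bubble up with `?`.
    fn push_row_into_heap(&mut self, row: i64) -> Result<(), String> {
        self.rows.push(row);
        let _ = self.ctx.add(std::mem::size_of::<i64>() as i64);
        if self.ctx.check_memory_usage() {
            Ok(())
        } else {
            Err(format!(
                "Not enough memory to run this query, batch memory limit is {} bytes",
                self.ctx.mem_limit()
            ))
        }
    }
}
```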
39 changes: 0 additions & 39 deletions src/batch/src/task/context.rs
@@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use prometheus::core::Atomic;
@@ -63,10 +62,6 @@ pub trait BatchTaskContext: Clone + Send + Sync + 'static {

fn source_metrics(&self) -> Arc<SourceMetrics>;

-fn store_mem_usage(&self, val: usize);
-
-fn mem_usage(&self) -> usize;
-
fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext;

fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef>;
@@ -80,12 +75,6 @@ pub struct ComputeNodeContext {
batch_metrics: Option<BatchMetricsWithTaskLabels>,

mem_context: MemoryContext,

-// Last mem usage value. Init to be 0. Should be the last value of `cur_mem_val`.
-last_mem_val: Arc<AtomicUsize>,
-// How many memory bytes have been used in this task for the latest report value. Will be moved
-// to `last_mem_val` if new value comes in.
-cur_mem_val: Arc<AtomicUsize>,
}

impl BatchTaskContext for ComputeNodeContext {
@@ -127,22 +116,6 @@ impl BatchTaskContext for ComputeNodeContext {
self.env.source_metrics()
}

-fn store_mem_usage(&self, val: usize) {
-    // Record the last mem val.
-    // Calculate the difference between old val and new value, and apply the diff to total
-    // memory usage value.
-    let old_value = self.cur_mem_val.load(Ordering::Relaxed);
-    self.last_mem_val.store(old_value, Ordering::Relaxed);
-    let diff = val as i64 - old_value as i64;
-    self.env.task_manager().apply_mem_diff(diff);
-
-    self.cur_mem_val.store(val, Ordering::Relaxed);
-}
-
-fn mem_usage(&self) -> usize {
-    self.cur_mem_val.load(Ordering::Relaxed)
-}

fn create_executor_mem_context(&self, executor_id: &str) -> MemoryContext {
if let Some(metrics) = &self.batch_metrics {
let executor_mem_usage = metrics
@@ -167,8 +140,6 @@ impl ComputeNodeContext {
Self {
env: BatchEnvironment::for_test(),
batch_metrics: None,
-cur_mem_val: Arc::new(0.into()),
-last_mem_val: Arc::new(0.into()),
mem_context: MemoryContext::none(),
}
}
@@ -189,17 +160,13 @@
Self {
env,
batch_metrics: Some(batch_metrics),
-cur_mem_val: Arc::new(0.into()),
-last_mem_val: Arc::new(0.into()),
mem_context,
}
} else {
let batch_mem_context = env.task_manager().memory_context_ref();
Self {
env,
batch_metrics: None,
-cur_mem_val: Arc::new(0.into()),
-last_mem_val: Arc::new(0.into()),
mem_context: batch_mem_context,
}
}
@@ -210,13 +177,7 @@
Self {
env,
batch_metrics: None,
-cur_mem_val: Arc::new(0.into()),
-last_mem_val: Arc::new(0.into()),
mem_context: batch_mem_context,
}
}

-pub fn mem_usage(&self) -> usize {
-    self.cur_mem_val.load(Ordering::Relaxed)
-}
}
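With the task-level `MemoryContext` now threaded through `create_executor_mem_context`, the hand-rolled `cur_mem_val`/`last_mem_val` atomics and the `store_mem_usage`/`mem_usage` accessors become dead weight, which is what this file's hunks delete. A toy reconstruction of the removed bookkeeping, for contrast only (not RisingWave code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Toy version of the deleted reporting path: every report loaded the
/// previous value, archived it, stored the new one, and forwarded the
/// diff to the task manager by hand.
fn store_mem_usage_old(cur: &AtomicUsize, last: &AtomicUsize, val: usize) -> i64 {
    let old_value = cur.load(Ordering::Relaxed);
    last.store(old_value, Ordering::Relaxed);
    cur.store(val, Ordering::Relaxed);
    val as i64 - old_value as i64 // diff previously fed to apply_mem_diff
}
```

Recording deltas directly on the shared context replaces all of this with a single `add` call that both enforces the limit and updates the metrics gauge.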
1 change: 1 addition & 0 deletions src/batch/src/task/env.rs
@@ -103,6 +103,7 @@ impl BatchEnvironment {
task_manager: Arc::new(BatchManager::new(
BatchConfig::default(),
BatchManagerMetrics::for_test(),
+u64::MAX,
)),
server_addr: "127.0.0.1:5688".parse().unwrap(),
config: Arc::new(BatchConfig::default()),
4 changes: 0 additions & 4 deletions src/batch/src/task/task_execution.rs
@@ -691,10 +691,6 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
}
}

-pub fn mem_usage(&self) -> usize {
-    self.context.mem_usage()
-}

/// Check the task status: whether has ended.
pub fn is_end(&self) -> bool {
let guard = self.state.lock();