Skip to content

Commit

Permalink
Merge branch 'main' into chore/debug-lookup-join
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Jun 11, 2024
2 parents 3655233 + daa1c42 commit 49b9d93
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
16 changes: 16 additions & 0 deletions docs/backfill.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,22 @@ and arrangement backfill will consume this historical data snapshot:
| 1 | 'Jack' | 29 |
| 2 | 'Jill' | 30 |

#### Initialization

Note that for the first snapshot,
upstream may not have finished committing the data of that epoch to S3.

Additionally, we have not replicated any upstream records
during that epoch, only in the subsequent ones.

As such, we must wait for that first checkpoint to be committed,
before reading, or we risk missing the uncommitted data in our backfill.

This is handled internally by `init_epoch` of the replicated state table.
```rust
upstream_table.init_epoch(first_epoch).await?;
```

### Recovery

TODO
Expand Down
52 changes: 46 additions & 6 deletions src/compute/src/memory/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// overhead, network buffer, etc.) in megabytes.
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;

const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.3;
/// Upper bounds (in bytes) of the memory tiers used by `gradient_reserve_memory_bytes`:
/// the first tier ends at 16 GiB and the last tier is unbounded (`usize::MAX`).
const RESERVED_MEMORY_LEVELS: [usize; 2] = [16 << 30, usize::MAX];

/// Fraction of each tier that is reserved. Entry `i` applies to the tier that ends at
/// `RESERVED_MEMORY_LEVELS[i]`: 30% of the first 16 GiB and 20% of everything above it.
const RESERVED_MEMORY_PROPORTIONS: [f64; 2] = [0.3, 0.2];

const STORAGE_MEMORY_PROPORTION: f64 = 0.3;

Expand All @@ -44,7 +46,7 @@ const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
const COMPUTE_BATCH_MEMORY_PROPORTION: f64 = 0.3;

/// Each compute node reserves some memory for stack and code segment of processes, allocation
/// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory
/// overhead, network buffer, etc. based on gradient reserve memory proportion. The reserve memory
/// size must be larger than `MIN_SYSTEM_RESERVED_MEMORY_MB`
pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
if opts.total_memory_bytes < MIN_COMPUTE_MEMORY_MB << 20 {
Expand All @@ -55,17 +57,42 @@ pub fn reserve_memory_bytes(opts: &ComputeNodeOpts) -> (usize, usize) {
);
}

// If `reserved_memory_bytes` is not set, use `SYSTEM_RESERVED_MEMORY_PROPORTION` * `total_memory_bytes`.
let reserved = opts.reserved_memory_bytes.unwrap_or_else(|| {
(opts.total_memory_bytes as f64 * SYSTEM_RESERVED_MEMORY_PROPORTION).ceil() as usize
});
// If `reserved_memory_bytes` is not set, derive it from `total_memory_bytes` using the gradient reserve memory proportions.
let reserved = opts
.reserved_memory_bytes
.unwrap_or_else(|| gradient_reserve_memory_bytes(opts.total_memory_bytes));

// Should have at least `MIN_SYSTEM_RESERVED_MEMORY_MB` for reserved memory.
let reserved = std::cmp::max(reserved, MIN_SYSTEM_RESERVED_MEMORY_MB << 20);

(reserved, opts.total_memory_bytes - reserved)
}

/// Compute how many bytes to reserve out of `total_memory_bytes` using a tiered
/// ("gradient") scheme driven by `RESERVED_MEMORY_LEVELS` and
/// `RESERVED_MEMORY_PROPORTIONS`:
/// - 30% of the first 16GB
/// - 20% of the rest
///
/// Each tier's contribution is computed in `f64` and truncated to `usize` before it is
/// added to the running sum, so the result may be a few bytes below the exact product.
fn gradient_reserve_memory_bytes(total_memory_bytes: usize) -> usize {
    // Memory that has not yet been attributed to a tier.
    let mut remaining = total_memory_bytes;
    // Upper bound of the previous tier; the first tier starts at zero.
    let mut prev_level = 0;
    let mut reserved = 0;

    let tiers = RESERVED_MEMORY_LEVELS
        .iter()
        .zip(RESERVED_MEMORY_PROPORTIONS.iter());
    for (&level, &proportion) in tiers {
        // Width of the current tier in bytes.
        let span = level - prev_level;
        prev_level = level;

        // Only the part of the remaining memory that fits in this tier is charged here.
        let charged = remaining.min(span);
        reserved += (charged as f64 * proportion) as usize;

        if remaining <= span {
            // Everything has been accounted for; higher tiers contribute nothing.
            break;
        }
        remaining -= span;
    }

    reserved
}

/// Decide the memory limit for each storage cache. If not specified in `StorageConfig`, memory
/// limits are calculated based on the proportions to total `non_reserved_memory_bytes`.
pub fn storage_memory_config(
Expand Down Expand Up @@ -346,4 +373,17 @@ mod tests {
assert_eq!(memory_config.shared_buffer_capacity_mb, 1024);
assert_eq!(memory_config.compactor_memory_limit_mb, 512);
}

#[test]
fn test_gradient_reserve_memory_bytes() {
    // Each entry is (total memory in GiB, expected reserved bytes).
    // Up to 16 GiB only the 30% tier applies; above that the excess is charged at 20%.
    let cases: [(usize, usize); 9] = [
        (4, 1288490188),
        (8, 2576980377),
        (16, 5153960755),
        (24, 6871947673),
        (32, 8589934591),
        (54, 13314398617),
        (64, 15461882265),
        (100, 23192823398),
        (128, 29205777612),
    ];

    for &(total_gib, expected) in cases.iter() {
        assert_eq!(
            super::gradient_reserve_memory_bytes(total_gib << 30),
            expected
        );
    }
}
}

0 comments on commit 49b9d93

Please sign in to comment.