From 88d79fa76ce1e027c0bc4a01db6e7f934106157e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?misha=20=F0=9F=90=A6=E2=80=8D=F0=9F=94=A5?= <146671001+ovnanova@users.noreply.github.com> Date: Thu, 24 Oct 2024 15:40:29 -0700 Subject: [PATCH] it's a memory allocator --- .github/workflows/rust.yml | 22 ++++ Cargo.toml | 14 +++ src/atomalloc.rs | 211 +++++++++++++++++++++++++++++++++++++ src/benchmark.rs | 100 ++++++++++++++++++ src/block.rs | 52 +++++++++ src/cache.rs | 103 ++++++++++++++++++ src/error.rs | 33 ++++++ src/lib.rs | 5 + src/main.rs | 25 +++++ tests/allocator_tests.rs | 116 ++++++++++++++++++++ 10 files changed, 681 insertions(+) create mode 100644 .github/workflows/rust.yml create mode 100644 src/atomalloc.rs create mode 100644 src/benchmark.rs create mode 100644 src/block.rs create mode 100644 src/cache.rs create mode 100644 src/error.rs create mode 100644 src/lib.rs create mode 100644 src/main.rs create mode 100644 tests/allocator_tests.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..9fd45e0 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,22 @@ +name: Rust + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Build + run: cargo build --verbose + - name: Run tests + run: cargo test --verbose diff --git a/Cargo.toml b/Cargo.toml index 26196ac..c1b22a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,17 @@ license = "MPL-2.0" readme = "README.md" repository = "https://github.com/ovnanova/atomalloc" +[dependencies] +async-trait = "0.1.83" +dashmap = "6.1.0" +miette = "7.2.0" +tokio = { version = "1.41.0", features = ["full"] } +tracing = "0.1.40" +tracing-subscriber = "0.3.18" + +[profile.release] +lto = true +codegen-units = 1 +panic = "abort" +opt-level = 3 +strip = true diff --git a/src/atomalloc.rs b/src/atomalloc.rs new file mode 100644 index 0000000..53d1086 --- /dev/null +++ b/src/atomalloc.rs @@ -0,0 +1,211 @@ +//! AtomAlloc: Async-first atomic allocator for Alloy +//! +//! Core features: +//! - Async by default +//! - Lock-free operations +//! - Zero unsafe code +//! - Basic security guarantees +//! - Task-local caching +//! 
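+//! # Example
+//!
+//! A minimal usage sketch (an illustration, not a settled public API): it
+//! assumes a Tokio runtime and drives the `Alloc` trait defined below.
+//!
+//! ```no_run
+//! use atomalloc::atomalloc::{Alloc, AtomAlloc};
+//! use std::alloc::Layout;
+//!
+//! #[tokio::main]
+//! async fn main() {
+//!     let alloc = AtomAlloc::global();
+//!     let layout = Layout::from_size_align(64, 8).unwrap();
+//!
+//!     // 64 bytes maps onto the 64-byte size class
+//!     let block = alloc.alloc(layout).await.unwrap();
+//!     assert_eq!(block.get_size(), 64);
+//!
+//!     alloc.dealloc(block).await;
+//! }
+//! ```
+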
+use super::{block::AtomicBlock, cache::BlockCache, error::AllocError};
+use async_trait::async_trait;
+use dashmap::DashMap;
+use std::{
+    alloc::Layout,
+    sync::{
+        atomic::{AtomicU64, AtomicUsize, Ordering},
+        Arc, LazyLock,
+    },
+    time::{Duration, Instant},
+};
+
+/// Common block sizes for efficient allocation
+const BLOCK_SIZES: &[usize] = &[
+    16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768,
+];
+
+#[derive(Clone, Debug)]
+pub struct AtomAlloc {
+    // Cache management
+    caches: Arc<DashMap<u64, (Arc<BlockCache>, Instant)>>,
+    next_cache_id: Arc<AtomicU64>,
+    current_generation: Arc<AtomicU64>,
+
+    // Statistics
+    total_allocated: Arc<AtomicUsize>,
+    total_freed: Arc<AtomicUsize>,
+
+    // Configuration
+    cache_ttl: Duration,
+    cleanup_threshold: usize,
+}
+
+impl Default for AtomAlloc {
+    // A derived Default would zero `cache_ttl` and `cleanup_threshold`,
+    // expiring every block immediately; route through `new()` so
+    // `default()` and `new()` agree.
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[async_trait]
+pub trait Alloc: Send + Sync + 'static {
+    async fn alloc(&self, layout: Layout) -> Result<Arc<AtomicBlock>, AllocError>;
+    async fn dealloc(&self, block: Arc<AtomicBlock>);
+}
+
+impl AtomAlloc {
+    pub fn new() -> Self {
+        Self {
+            caches: Arc::new(DashMap::new()),
+            next_cache_id: Arc::new(AtomicU64::new(0)),
+            current_generation: Arc::new(AtomicU64::new(0)),
+            total_allocated: Arc::new(AtomicUsize::new(0)),
+            total_freed: Arc::new(AtomicUsize::new(0)),
+            cache_ttl: Duration::from_secs(300), // 5 minutes default TTL
+            cleanup_threshold: 1000, // Cleanup when we have >1000 caches
+        }
+    }
+
+    pub fn global() -> &'static Self {
+        static GLOBAL: LazyLock<AtomAlloc> = LazyLock::new(AtomAlloc::new);
+        &GLOBAL
+    }
+
+    pub fn with_config(cache_ttl: Duration, cleanup_threshold: usize) -> Self {
+        Self {
+            cache_ttl,
+            cleanup_threshold,
+            ..Self::new()
+        }
+    }
+
+    /// Round a requested size up to the nearest size class; sizes beyond
+    /// the largest class are passed through unchanged.
+    pub fn get_block_size(&self, size: usize) -> usize {
+        BLOCK_SIZES
+            .iter()
+            .find(|&&s| size <= s)
+            .copied()
+            .unwrap_or(size)
+    }
+
+    fn get_next_id(&self) -> u64 {
+        self.next_cache_id.fetch_add(1, Ordering::Relaxed)
+    }
+
+    pub fn current_generation(&self) -> u64 {
+        self.current_generation.load(Ordering::Relaxed)
+    }
+
+    pub fn new_generation(&self) -> u64 {
+        self.current_generation.fetch_add(1, Ordering::Relaxed)
+    }
+
+    pub fn get_cache(&self) -> Arc<BlockCache> {
+        let cache_id = self.get_next_id();
+        let generation = self.current_generation();
+
+        // Perform cleanup if we have too many caches
+        if self.caches.len() > self.cleanup_threshold {
+            self.cleanup_old_caches();
+        }
+
+        // `cache_id` is freshly minted, so this insert never races with an
+        // existing entry
+        let cache = Arc::new(BlockCache::new(generation));
+        self.caches.insert(cache_id, (cache.clone(), Instant::now()));
+
+        cache
+    }
+
+    fn cleanup_old_caches(&self) {
+        let current_gen = self.current_generation();
+
+        // First cleanup old blocks in each cache
+        for entry in self.caches.iter() {
+            let (_, (cache, _)) = entry.pair();
+            cache.cleanup_old_blocks(self.cache_ttl, current_gen);
+        }
+
+        // Then remove empty caches
+        self.caches.retain(|_, (cache, _)| !cache.is_empty());
+    }
+
+    // Statistics methods
+    pub fn get_stats(&self) -> (usize, usize) {
+        (
+            self.total_allocated.load(Ordering::Relaxed),
+            self.total_freed.load(Ordering::Relaxed),
+        )
+    }
+
+    async fn allocate_new_block(&self, size: usize) -> Arc<AtomicBlock> {
+        let block = Arc::new(AtomicBlock::new(size, self.current_generation()));
+        self.total_allocated.fetch_add(size, Ordering::Relaxed);
+        block
+    }
+
+    pub async fn alloc_with_cache(
+        &self,
+        layout: Layout,
+        cache: &Arc<BlockCache>,
+    ) -> Result<Arc<AtomicBlock>, AllocError> {
+        if layout.size() == 0 {
+            return Err(AllocError::InvalidLayout {
+                src: String::new(),
+                span: (0..1).into(),
+                message: "Layout size cannot be zero".to_string(),
+            });
+        }
+
+        let size = self.get_block_size(layout.size());
+
+        // Try to get a block from cache first
+        if let Some(block) = cache.get_block(size).await {
+            tracing::debug!("retrieved block from cache");
+            return Ok(block);
+        }
+
+        // Create new block if cache miss
+        tracing::debug!("creating new block");
+        let block = self.allocate_new_block(size).await;
+        Ok(block)
+    }
+
+    pub async fn dealloc_with_cache(&self, block: Arc<AtomicBlock>, cache: &Arc<BlockCache>) {
+        let size = block.get_size();
+        tracing::debug!("deallocating block of size {}", size);
+        cache.return_block(block).await;
+        self.total_freed.fetch_add(size, Ordering::Relaxed);
+    }
+}
+
+#[async_trait]
+impl Alloc for AtomAlloc {
+    async fn alloc(&self, layout: Layout) -> Result<Arc<AtomicBlock>, AllocError> {
+        if layout.size() == 0 {
+            return Err(AllocError::InvalidLayout {
+                src: String::new(),
+                span: (0..1).into(),
+                message: "Layout size cannot be zero".to_string(),
+            });
+        }
+
+        let size = self.get_block_size(layout.size());
+        // NOTE: each call mints a fresh cache, so this path never sees cache
+        // hits; hold a cache and use `alloc_with_cache` to benefit from reuse
+        let cache = self.get_cache();
+
+        // Try to get a block from cache first
+        if let Some(block) = cache.get_block(size).await {
+            tracing::debug!("retrieved block from cache");
+            return Ok(block);
+        }
+
+        // Create new block if cache miss
+        tracing::debug!("creating new block");
+        let block = self.allocate_new_block(size).await;
+        Ok(block)
+    }
+
+    async fn dealloc(&self, block: Arc<AtomicBlock>) {
+        let size = block.get_size();
+        tracing::debug!("deallocating block of size {}", size);
+
+        // Return to cache
+        self.get_cache().return_block(block).await;
+        self.total_freed.fetch_add(size, Ordering::Relaxed);
+    }
+}
diff --git a/src/benchmark.rs b/src/benchmark.rs
new file mode 100644
index 0000000..48fc622
--- /dev/null
+++ b/src/benchmark.rs
@@ -0,0 +1,100 @@
+use crate::atomalloc::AtomAlloc;
+use std::{alloc::Layout, time::Instant};
+use tracing::{info, instrument};
+
+#[derive(Debug)]
+pub struct BenchResult {
+    pub throughput: f64,
+    pub duration_ns: u128,
+    pub operations: usize,
+}
+
+#[derive(Debug)]
+pub struct BenchConfig {
+    pub name: &'static str,
+    pub size: usize,
+    pub iterations: usize,
+}
+
+pub struct Benchmark;
+
+impl Benchmark {
+    #[instrument(level = "info")]
+    pub async fn run_atomalloc(config: &BenchConfig) -> BenchResult {
+        info!(
+            name = config.name,
+            size = config.size,
+            iterations = config.iterations,
+            "starting AtomAlloc benchmark"
+        );
+
+        let atomalloc = AtomAlloc::new();
+        let cache = atomalloc.get_cache();
+        let start = Instant::now();
+        let layout = Layout::from_size_align(config.size, 8).unwrap();
+        // Pre-size the holding vector so Vec growth doesn't pollute timings
+        let mut blocks = Vec::with_capacity(config.iterations);
+
+        // Allocation phase
+        for _ in 0..config.iterations {
+            let block = atomalloc.alloc_with_cache(layout, &cache).await.unwrap();
+            blocks.push(block);
+        }
+
+        // Deallocation phase
+        for block in blocks {
+            atomalloc.dealloc_with_cache(block, &cache).await;
+        }
+
+        let duration = start.elapsed();
+        let duration_ns = duration.as_nanos();
+        let throughput = config.iterations as f64 / duration.as_secs_f64();
+
+        let result = BenchResult {
+            throughput,
+            duration_ns,
+            operations: config.iterations * 2, // alloc + dealloc
+        };
+
+        info!(?result, name = config.name, "completed AtomAlloc benchmark");
+        result
+    }
+
+    pub fn get_standard_configs() -> Vec<BenchConfig> {
+        vec![
+            BenchConfig {
+                name: "Small Allocations",
+                size: 64,
+                iterations: 1_000_000,
+            },
+            BenchConfig {
+                name: "Medium Allocations",
+                size: 1024,
+                iterations: 100_000,
+            },
+            BenchConfig {
+                name: "Large Allocations",
+                size: 32768,
+                iterations: 10_000,
+            },
+        ]
+    }
+
+    pub fn format_results(results: &[(&str, BenchResult)]) -> String {
+        let mut output = String::new();
+        output.push_str("\nBenchmark Results:\n");
+        output.push_str(&"-".repeat(80));
+        output.push('\n');
+
+        for (name, result) in results {
+            output.push_str(&format!("\n{} Results:\n", name));
+            output.push_str(&format!("Throughput: {:.2} ops/sec\n", result.throughput));
+            output.push_str(&format!(
+                "Average Latency: {:.3} ns\n",
+                // Use float division: integer division truncates, and the
+                // `{:.3}` precision has no effect on integers
+                result.duration_ns as f64 / result.operations as f64
+            ));
+            output.push_str(&format!("Total Operations: {}\n", result.operations));
+        }
+
+        output
+    }
+}
diff --git a/src/block.rs b/src/block.rs
new file mode 100644
index 0000000..1d3a743
--- /dev/null
+++ b/src/block.rs
@@ -0,0 +1,52 @@
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
+
+#[derive(Debug)]
+pub struct AtomicBlock {
+    data: Vec<u8>,
+    size: AtomicUsize,
+    generation: AtomicU64,
+    last_access: AtomicU64,
+}
+
+impl AtomicBlock {
+    pub fn new(size: usize, generation: u64) -> Self {
+        Self {
+            data: vec![0; size],
+            size: AtomicUsize::new(size),
+            generation: AtomicU64::new(generation),
+            last_access: AtomicU64::new(now_as_nanos()),
+        }
+    }
+
+    pub fn get_size(&self) -> usize {
+        self.size.load(Ordering::Acquire)
+    }
+
+    pub fn generation(&self) -> u64 {
+        self.generation.load(Ordering::Relaxed)
+    }
+
+    pub fn is_current(&self, current_gen: u64) -> bool {
+        self.generation() == current_gen
+    }
+
+    pub fn update_access(&self) {
+        self.last_access.store(now_as_nanos(), Ordering::Relaxed);
+    }
+
+    pub fn last_access_time(&self) -> u64 {
+        self.last_access.load(Ordering::Relaxed)
+    }
+
+    /// Zero the block's contents; takes `&mut self`, so it is only callable
+    /// while the block has not yet been shared behind an `Arc`
+    pub fn zero(&mut self) {
+        self.data.fill(0);
+    }
+}
+
+fn now_as_nanos() -> u64 {
+    use std::time::SystemTime;
+    SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos() as u64
+}
diff --git a/src/cache.rs b/src/cache.rs
new file mode 100644
index 0000000..4248390
--- /dev/null
+++ b/src/cache.rs
@@ -0,0 +1,103 @@
+use crate::block::AtomicBlock;
+use dashmap::DashMap;
+use std::{
+    sync::{
+        atomic::{AtomicU64, AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+
+#[derive(Debug)]
+pub struct BlockCache {
+    free_blocks: DashMap<usize, Vec<Arc<AtomicBlock>>>,
+    generation: AtomicU64,
+    hits: AtomicUsize,
+    misses: AtomicUsize,
+    block_count: AtomicUsize,
+}
+
+impl BlockCache {
+    pub fn new(generation: u64) -> Self {
+        Self {
+            free_blocks: DashMap::new(),
+            generation: AtomicU64::new(generation),
+            hits: AtomicUsize::new(0),
+            misses: AtomicUsize::new(0),
+            block_count: AtomicUsize::new(0),
+        }
+    }
+
+    pub fn generation(&self) -> u64 {
+        self.generation.load(Ordering::Relaxed)
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.block_count.load(Ordering::Relaxed) == 0
+    }
+
+    pub async fn get_block(&self, size: usize) -> Option<Arc<AtomicBlock>> {
+        if let Some(mut blocks) = self.free_blocks.get_mut(&size) {
+            if let Some(block) = blocks.pop() {
+                tracing::debug!("cache hit for size {}", size);
+                self.hits.fetch_add(1, Ordering::Relaxed);
+                self.block_count.fetch_sub(1, Ordering::Relaxed);
+                block.update_access();
+                return Some(block);
+            }
+        }
+        tracing::debug!("cache miss for size {}", size);
+        self.misses.fetch_add(1, Ordering::Relaxed);
+        None
+    }
+
+    pub async fn return_block(&self, block: Arc<AtomicBlock>) {
+        let size = block.get_size();
+        tracing::debug!("returning block of size {} to cache", size);
+        self.free_blocks.entry(size).or_default().push(block);
+        self.block_count.fetch_add(1, Ordering::Relaxed);
+    }
+
+    pub fn get_stats(&self) -> (usize, usize) {
+        (
+            self.hits.load(Ordering::Relaxed),
+            self.misses.load(Ordering::Relaxed),
+        )
+    }
+
+    pub fn cleanup_old_blocks(&self, ttl: Duration, current_gen: u64) {
+        self.free_blocks.retain(|_, blocks| {
+            // Remove old blocks from the vec
+            blocks.retain(|block| {
+                // Get last access time directly
+                let now = now_as_nanos();
+                let last = block.last_access_time();
+                let ttl_nanos = ttl.as_nanos() as u64;
+
+                // Check both age and generation
+                let is_recent = now.saturating_sub(last) <= ttl_nanos;
+                let is_current_gen = block.generation() == current_gen;
+
+                is_recent && is_current_gen
+            });
+            // Keep the size entry if there are still blocks
+            !blocks.is_empty()
+        });
+
+        // Update block count
+        let total = self
+            .free_blocks
+            .iter()
+            .map(|entry| entry.value().len())
+            .sum();
+        self.block_count.store(total, Ordering::Relaxed);
+    }
+}
+
+fn now_as_nanos() -> u64 {
+    use std::time::SystemTime;
+    SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .unwrap()
+        .as_nanos() as u64
+}
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..f7009bf
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,33 @@
+use miette::{Diagnostic, SourceSpan};
+
+#[derive(Debug, Diagnostic)]
+pub enum AllocError {
+    #[diagnostic(code(alloy::allocator::out_of_memory))]
+    #[diagnostic(help("Try freeing memory or reducing allocation size"))]
+    OutOfMemory,
+
+    #[diagnostic(code(alloy::allocator::invalid_layout))]
+    InvalidLayout {
+        #[source_code]
+        src: String,
+        #[label("Invalid layout parameters")]
+        span: SourceSpan,
+        message: String,
+    },
+
+    #[diagnostic(code(alloy::allocator::security_violation))]
+    #[diagnostic(help("This may indicate a memory corruption or security issue"))]
+    SecurityViolation {
+        message: String,
+        #[label("Security violation occurred here")]
+        span: SourceSpan,
+    },
+}
+
+impl std::fmt::Display for AllocError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl std::error::Error for AllocError {}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..9c8685c
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,5 @@
+pub mod atomalloc;
+pub mod benchmark;
+// `block` and `cache` appear in the public `Alloc` API (`Arc<AtomicBlock>`,
+// `Arc<BlockCache>`), so they must be public to avoid private-interface leaks
+pub mod block;
+pub mod cache;
+pub mod error;
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..25e5409
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,25 @@
+use atomalloc::benchmark::Benchmark;
+use tracing::info;
+
+#[tokio::main]
+async fn main() {
+    // Initialize tracing
+    tracing_subscriber::fmt::init();
+
+    info!("Starting AtomAlloc benchmarks");
+
+    // Get standard benchmark configurations
+    let configs = Benchmark::get_standard_configs();
+    let mut results = Vec::new();
+
+    // Run benchmarks
+    for config in configs {
+        info!(name = config.name, "Running benchmark configuration");
+
+        let result = Benchmark::run_atomalloc(&config).await;
+        results.push((config.name, result));
+    }
+
+    // Print formatted results
+    println!("{}", Benchmark::format_results(&results));
+}
diff --git a/tests/allocator_tests.rs b/tests/allocator_tests.rs
new file mode 100644
index 0000000..d6503e1
--- /dev/null
+++ b/tests/allocator_tests.rs
@@ -0,0 +1,116 @@
+#[cfg(test)]
+mod tests {
+    use atomalloc::atomalloc::{Alloc, AtomAlloc};
+    use std::alloc::Layout;
+    use tokio::runtime::Runtime;
+
+    #[test]
+    fn test_basic_alloc_dealloc() {
+        let rt = Runtime::new().unwrap();
+        rt.block_on(async {
+            let alloc = AtomAlloc::new();
+            let layout = Layout::from_size_align(64, 8).unwrap();
+
+            // Test allocation
+            let block = alloc.alloc(layout).await.unwrap();
+            assert_eq!(block.get_size(), alloc.get_block_size(64));
+
+            // Test deallocation
+            alloc.dealloc(block).await;
+        });
+    }
+
+    #[test]
+    fn test_multiple_sizes() {
+        let rt = Runtime::new().unwrap();
+        rt.block_on(async {
+            let alloc = AtomAlloc::new();
+            let sizes = [64, 1024, 32768];
+
+            for &size in &sizes {
+                let layout = Layout::from_size_align(size, 8).unwrap();
+                let block = alloc.alloc(layout).await.unwrap();
+                assert_eq!(block.get_size(), alloc.get_block_size(size));
+                alloc.dealloc(block).await;
+            }
+        });
+    }
+
+    #[test]
+    fn test_parallel_allocations() {
+        let rt = Runtime::new().unwrap();
+        rt.block_on(async {
+            let alloc = AtomAlloc::new();
+            let mut handles = vec![];
+
+            // Spawn multiple allocation tasks
+            for i in 0..10 {
+                let alloc = alloc.clone();
+                let handle = tokio::spawn(async move {
+                    tracing::info!("Starting allocation task {}", i);
+                    let layout = Layout::from_size_align(64, 8).unwrap();
+                    let block = alloc.alloc(layout).await.unwrap();
+                    alloc.dealloc(block).await;
+                    tracing::info!("Completed allocation task {}", i);
+                });
+                handles.push(handle);
+            }
+
+            // Wait for all tasks to complete
+            for (i, handle) in handles.into_iter().enumerate() {
+                if let Err(e) = handle.await {
+                    panic!("Task {} failed: {}", i, e);
+                }
+            }
+        });
+    }
+
+    #[test]
+    fn test_cache_behavior() {
+        let rt = Runtime::new().unwrap();
+        rt.block_on(async {
+            let alloc = AtomAlloc::new();
+            let layout = Layout::from_size_align(64, 8).unwrap();
+
+            // Get a single cache instance to use throughout
+            let cache = alloc.get_cache();
+
+            // First allocation - should miss
+            println!("First allocation");
+            let block1 = alloc.alloc_with_cache(layout, &cache).await.unwrap();
+            alloc.dealloc_with_cache(block1, &cache).await;
+
+            // Second allocation - should hit
+            println!("Second allocation");
+            let block2 = alloc.alloc_with_cache(layout, &cache).await.unwrap();
+            let (hits, misses) = cache.get_stats();
+
+            println!("Cache stats - hits: {}, misses: {}", hits, misses);
+            assert!(hits > 0, "Expected cache hits");
+            assert!(misses > 0, "Expected cache misses");
+
+            alloc.dealloc_with_cache(block2, &cache).await;
+        });
+    }
+
+    #[test]
+    fn test_large_allocation_series() {
+        let rt = Runtime::new().unwrap();
+        rt.block_on(async {
+            let alloc = AtomAlloc::new();
+            let mut blocks = Vec::new();
+
+            // Allocate series of blocks
+            for size in (64..=1024).step_by(64) {
+                let layout = Layout::from_size_align(size, 8).unwrap();
+                let block = alloc.alloc(layout).await.unwrap();
+                blocks.push(block);
+            }
+
+            // Deallocate in reverse order
+            while let Some(block) = blocks.pop() {
+                alloc.dealloc(block).await;
+            }
+        });
+    }
+}
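+
+// Sketch of a statistics check; assumes `get_stats()` reports cumulative
+// (total_allocated, total_freed) byte counts as implemented in
+// src/atomalloc.rs. Integration tests may also live at top level, outside
+// the `mod tests` wrapper above.
+#[test]
+fn test_stats_accounting() {
+    use atomalloc::atomalloc::{Alloc, AtomAlloc};
+    use std::alloc::Layout;
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    rt.block_on(async {
+        let alloc = AtomAlloc::new();
+        let layout = Layout::from_size_align(128, 8).unwrap();
+
+        // A fresh allocator has empty caches, so this allocates a new block
+        let block = alloc.alloc(layout).await.unwrap();
+        let (allocated, _) = alloc.get_stats();
+        assert!(allocated >= 128, "allocated bytes should be counted");
+
+        // Deallocation returns the block to a cache and counts the bytes
+        alloc.dealloc(block).await;
+        let (_, freed) = alloc.get_stats();
+        assert!(freed >= 128, "freed bytes should be counted");
+    });
+}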