Skip to content

Commit

Permalink
it's a memory allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
mekaem committed Oct 24, 2024
1 parent 42cda02 commit 88d79fa
Show file tree
Hide file tree
Showing 10 changed files with 681 additions and 0 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# CI: build and run the test suite on every push / pull request targeting main.
# (Indentation reconstructed — the original paste had flattened the YAML.)
name: Rust

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

env:
  # Keep cargo's colored output in the Actions log.
  CARGO_TERM_COLOR: always

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build
        run: cargo build --verbose
      - name: Run tests
        run: cargo test --verbose
14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,17 @@ license = "MPL-2.0"
readme = "README.md"
repository = "https://github.com/ovnanova/atomalloc"

[dependencies]
async-trait = "0.1.83"   # async fns in traits (the `Alloc` trait)
dashmap = "6.1.0"        # concurrent hash map backing the per-task cache registry
miette = "7.2.0"         # diagnostic errors (src/span fields on AllocError suggest miette reports)
tokio = { version = "1.41.0", features = ["full"] }  # async runtime
tracing = "0.1.40"            # structured logging used by the benchmarks
tracing-subscriber = "0.3.18" # log collection/output

# Release profile tuned for small, fast binaries.
[profile.release]
lto = true          # whole-program link-time optimization
codegen-units = 1   # single codegen unit: slower compile, better optimization
panic = "abort"     # drop unwinding machinery
opt-level = 3
strip = true        # strip symbols from the final binary
211 changes: 211 additions & 0 deletions src/atomalloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
//! AtomAlloc: Async-first atomic allocator for Alloy
//!
//! Core features:
//! - Async by default
//! - Lock-free operations
//! - Zero unsafe code
//! - Basic security guarantees
//! - Task-local caching
//!
use super::{block::AtomicBlock, cache::BlockCache, error::AllocError};
use async_trait::async_trait;
use dashmap::DashMap;
use std::{
alloc::Layout,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, LazyLock,
},
time::{Duration, Instant},
};

/// Common block sizes for efficient allocation
/// Size classes (in bytes) that allocations are rounded up to:
/// powers of two from 16 B through 32 KiB. Requests larger than the
/// biggest class fall through and are allocated at their exact size.
const BLOCK_SIZES: &[usize] = &[
    16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768,
];

#[derive(Clone, Debug, Default)]
pub struct AtomAlloc {
// Cache management
caches: Arc<DashMap<u64, (Arc<BlockCache>, Instant)>>,
next_cache_id: Arc<AtomicU64>,
current_generation: Arc<AtomicU64>,

// Statistics
total_allocated: Arc<AtomicUsize>,
total_freed: Arc<AtomicUsize>,

// Configuration
cache_ttl: Duration,
cleanup_threshold: usize,
}

/// Async allocation interface; implemented by `AtomAlloc`.
#[async_trait]
pub trait Alloc: Send + Sync + 'static {
    /// Allocate a block satisfying `layout`; zero-size layouts are rejected.
    async fn alloc(&self, layout: Layout) -> Result<Arc<AtomicBlock>, AllocError>;
    /// Return a block to the allocator for potential reuse.
    async fn dealloc(&self, block: Arc<AtomicBlock>);
}

impl AtomAlloc {
    /// Create an allocator with default configuration:
    /// 5-minute cache TTL, cleanup once more than 1000 caches exist.
    pub fn new() -> Self {
        Self {
            caches: Arc::new(DashMap::new()),
            next_cache_id: Arc::new(AtomicU64::new(0)),
            current_generation: Arc::new(AtomicU64::new(0)),
            total_allocated: Arc::new(AtomicUsize::new(0)),
            total_freed: Arc::new(AtomicUsize::new(0)),
            cache_ttl: Duration::from_secs(300), // 5 minutes default TTL
            cleanup_threshold: 1000,             // cleanup when >1000 caches
        }
    }

    /// Process-wide shared allocator, lazily initialized on first use.
    pub fn global() -> &'static Self {
        static GLOBAL: LazyLock<AtomAlloc> = LazyLock::new(AtomAlloc::new);
        &GLOBAL
    }

    /// Create an allocator with a custom cache TTL and cleanup threshold.
    pub fn with_config(cache_ttl: Duration, cleanup_threshold: usize) -> Self {
        Self {
            cache_ttl,
            cleanup_threshold,
            ..Default::default()
        }
    }

    /// Round `size` up to the smallest standard block size that fits.
    /// Sizes beyond the largest class (32 KiB) are used verbatim.
    pub fn get_block_size(&self, size: usize) -> usize {
        BLOCK_SIZES
            .iter()
            .copied()
            .find(|&class| size <= class)
            .unwrap_or(size)
    }

    /// Hand out the next unique cache id.
    fn get_next_id(&self) -> u64 {
        self.next_cache_id.fetch_add(1, Ordering::Relaxed)
    }

    /// The current allocation generation.
    pub fn current_generation(&self) -> u64 {
        self.current_generation.load(Ordering::Relaxed)
    }

    /// Advance to a new generation, returning the *previous* generation
    /// (the value before the increment, per `fetch_add` semantics).
    pub fn new_generation(&self) -> u64 {
        self.current_generation.fetch_add(1, Ordering::Relaxed)
    }

    /// Create and register a fresh cache for the current generation.
    ///
    /// Ids from `get_next_id` are unique, so the map entry is always
    /// vacant; a direct `insert` replaces the original `entry().or_insert_with`
    /// dance, which could never hit the occupied path.
    pub fn get_cache(&self) -> Arc<BlockCache> {
        // Opportunistic cleanup when the registry has grown too large.
        if self.caches.len() > self.cleanup_threshold {
            self.cleanup_old_caches();
        }

        let cache = Arc::new(BlockCache::new(self.current_generation()));
        self.caches
            .insert(self.get_next_id(), (Arc::clone(&cache), Instant::now()));
        cache
    }

    /// Expire stale blocks in every cache, then drop caches left empty.
    fn cleanup_old_caches(&self) {
        let current_gen = self.current_generation();

        // First cleanup old blocks in each cache.
        for entry in self.caches.iter() {
            let (_, (cache, _)) = entry.pair();
            cache.cleanup_old_blocks(self.cache_ttl, current_gen);
        }

        // Then remove empty caches.
        self.caches.retain(|_, (cache, _)| !cache.is_empty());
    }

    /// Returns `(total_bytes_allocated, total_bytes_freed)`.
    pub fn get_stats(&self) -> (usize, usize) {
        (
            self.total_allocated.load(Ordering::Relaxed),
            self.total_freed.load(Ordering::Relaxed),
        )
    }

    /// Allocate a brand-new block and account for it in the stats.
    async fn allocate_new_block(&self, size: usize) -> Arc<AtomicBlock> {
        let block = Arc::new(AtomicBlock::new(size, self.current_generation()));
        self.total_allocated.fetch_add(size, Ordering::Relaxed);
        block
    }

    /// Allocate using an explicit cache: reuse a cached block when one is
    /// available, otherwise create a fresh one.
    ///
    /// # Errors
    /// Returns `AllocError::InvalidLayout` for zero-size layouts.
    pub async fn alloc_with_cache(
        &self,
        layout: Layout,
        cache: &Arc<BlockCache>,
    ) -> Result<Arc<AtomicBlock>, AllocError> {
        if layout.size() == 0 {
            return Err(AllocError::InvalidLayout {
                src: String::new(),
                span: (0..1).into(),
                message: "Layout size cannot be zero".to_string(),
            });
        }

        let size = self.get_block_size(layout.size());

        // Cache hit: reuse. (Leftover `println!` debug logging removed.)
        if let Some(block) = cache.get_block(size).await {
            return Ok(block);
        }

        // Cache miss: delegate to the shared allocation path instead of
        // duplicating its body, so the stats accounting stays in one place.
        Ok(self.allocate_new_block(size).await)
    }

    /// Return `block` to `cache` and account for the freed bytes.
    pub async fn dealloc_with_cache(&self, block: Arc<AtomicBlock>, cache: &Arc<BlockCache>) {
        let size = block.get_size();
        cache.return_block(block).await;
        self.total_freed.fetch_add(size, Ordering::Relaxed);
    }
}

#[async_trait]
impl Alloc for AtomAlloc {
    /// Allocate through a freshly registered cache.
    ///
    /// Delegates to `alloc_with_cache`, which performs the same zero-size
    /// validation, size rounding, cache probe, and miss path this impl
    /// previously duplicated inline (the `println!` debug leftovers are gone).
    ///
    /// # Errors
    /// Returns `AllocError::InvalidLayout` for zero-size layouts.
    async fn alloc(&self, layout: Layout) -> Result<Arc<AtomicBlock>, AllocError> {
        let cache = self.get_cache();
        self.alloc_with_cache(layout, &cache).await
    }

    /// Return a block for reuse and record the freed bytes.
    ///
    /// NOTE(review): `get_cache()` registers a *brand-new* cache on every
    /// call, so blocks returned here land in a cache no subsequent `alloc`
    /// through this trait will probe — confirm whether dealloc should reuse
    /// an existing cache (as `dealloc_with_cache` callers do).
    async fn dealloc(&self, block: Arc<AtomicBlock>) {
        let cache = self.get_cache();
        self.dealloc_with_cache(block, &cache).await;
    }
}
100 changes: 100 additions & 0 deletions src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use crate::atomalloc::AtomAlloc;
use std::{alloc::Layout, time::Instant};
use tracing::{info, instrument};

/// Outcome of a single benchmark run.
#[derive(Debug)]
pub struct BenchResult {
    /// Iterations per second over the whole run.
    pub throughput: f64,
    /// Wall-clock duration of the run, in nanoseconds.
    pub duration_ns: u128,
    /// Total operations performed (allocations + deallocations).
    pub operations: usize,
}

/// Parameters for one benchmark scenario.
#[derive(Debug)]
pub struct BenchConfig {
    /// Human-readable scenario name used in logs and reports.
    pub name: &'static str,
    /// Allocation size in bytes for each iteration.
    pub size: usize,
    /// Number of alloc/dealloc round trips to perform.
    pub iterations: usize,
}

/// Stateless namespace for the allocator benchmark routines.
pub struct Benchmark;

impl Benchmark {
    /// Run an alloc-then-dealloc round-trip benchmark against a fresh
    /// `AtomAlloc`, timing both phases together.
    ///
    /// # Panics
    /// Panics if `config.size` cannot form a valid 8-byte-aligned `Layout`
    /// or if an allocation fails.
    #[instrument(level = "info")]
    pub async fn run_atomalloc(config: &BenchConfig) -> BenchResult {
        info!(
            name = config.name,
            size = config.size,
            iterations = config.iterations,
            "starting AtomAlloc benchmark"
        );

        let atomalloc = AtomAlloc::new();
        let cache = atomalloc.get_cache();
        let start = Instant::now();
        let layout = Layout::from_size_align(config.size, 8).unwrap();
        // Hold every block until the dealloc phase so allocations can't be
        // satisfied by blocks freed earlier in the same phase.
        let mut blocks = Vec::with_capacity(config.iterations);

        // Allocation phase
        for _ in 0..config.iterations {
            let block = atomalloc.alloc_with_cache(layout, &cache).await.unwrap();
            blocks.push(block);
        }

        // Deallocation phase
        for block in blocks {
            atomalloc.dealloc_with_cache(block, &cache).await;
        }

        let duration = start.elapsed();
        let duration_ns = duration.as_nanos();
        let throughput = config.iterations as f64 / duration.as_secs_f64();

        let result = BenchResult {
            throughput,
            duration_ns,
            operations: config.iterations * 2, // alloc + dealloc
        };

        info!(?result, name = config.name, "completed AtomAlloc benchmark");
        result
    }

    /// Standard small/medium/large scenarios, scaled so each runs in
    /// roughly comparable wall time.
    pub fn get_standard_configs() -> Vec<BenchConfig> {
        vec![
            BenchConfig {
                name: "Small Allocations",
                size: 64,
                iterations: 1_000_000,
            },
            BenchConfig {
                name: "Medium Allocations",
                size: 1024,
                iterations: 100_000,
            },
            BenchConfig {
                name: "Large Allocations",
                size: 32768,
                iterations: 10_000,
            },
        ]
    }

    /// Render benchmark results as a human-readable report.
    pub fn format_results(results: &[(&str, BenchResult)]) -> String {
        let mut output = String::new();
        output.push_str("\nBenchmark Results:\n");
        output.push_str(&"-".repeat(80));
        output.push('\n');

        for (name, result) in results {
            output.push_str(&format!("\n{} Results:\n", name));
            output.push_str(&format!("Throughput: {:.2} ops/sec\n", result.throughput));
            // Fixed: the original used integer division (`u128 / u128`),
            // which truncated the average, and `{:.3}` is ignored for
            // integer types — so the 3-decimal latency was never printed.
            output.push_str(&format!(
                "Average Latency: {:.3} ns\n",
                result.duration_ns as f64 / result.operations as f64
            ));
            output.push_str(&format!("Total Operations: {}\n", result.operations));
        }

        output
    }
}
Loading

0 comments on commit 88d79fa

Please sign in to comment.