Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Fix incidents preload (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwbh authored Oct 31, 2023
1 parent 1bdb41c commit de8f57d
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 111 deletions.
2 changes: 1 addition & 1 deletion nyx-storage/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Batch {
latest_segment_count: usize,
latest_segment_size: usize,
) -> Result<BatchState, String> {
if self.current_batch_size + buf.len() < self.buffer.len() {
if self.current_batch_size + buf.len() < MAX_BATCH_SIZE {
if self.current_batch_index == 0 {
self.current_batch_index = latest_segment_count;
self.current_segment_size = latest_segment_size;
Expand Down
17 changes: 16 additions & 1 deletion nyx-storage/src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use async_std::{
fs::{self, File, OpenOptions},
fs::{self, DirEntry, File, OpenOptions, ReadDir},
io,
};

Expand Down Expand Up @@ -75,13 +75,15 @@ impl Directory {

pub async fn open_read(&self, datatype: DataType, count: usize) -> io::Result<File> {
let path = self.get_file_path(datatype, count)?;

OpenOptions::new().read(true).open(path).await
}

pub async fn open_write(&self, datatype: DataType, count: usize) -> io::Result<File> {
let path = self
.get_file_path(datatype, count)
.map_err(|e| Error::new(ErrorKind::NotFound, e))?;

OpenOptions::new()
.append(true)
.create(true)
Expand All @@ -93,6 +95,19 @@ impl Directory {
let path = self
.get_file_path(datatype, count)
.map_err(|e| Error::new(ErrorKind::NotFound, e))?;

OpenOptions::new().read(true).append(true).open(path).await
}

pub async fn open_read_write_create(
&self,
datatype: DataType,
count: usize,
) -> io::Result<File> {
let path = self
.get_file_path(datatype, count)
.map_err(|e| Error::new(ErrorKind::NotFound, e))?;

OpenOptions::new()
.read(true)
.append(true)
Expand Down
96 changes: 43 additions & 53 deletions nyx-storage/src/indices.rs
Original file line number Diff line number Diff line change
@@ -1,65 +1,57 @@
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use async_std::io::{self, prelude::SeekExt, ReadExt, WriteExt};

use crate::{directory::Directory, offset::Offset};
use crate::{offset::Offset, segment::Segment};

#[derive(Debug)]
pub struct Indices {
pub data: HashMap<usize, Offset>,
pub total_bytes: usize,
}

const INDEX_SIZE: usize = std::mem::size_of::<Offset>();
const OFFSET_SIZE: usize = std::mem::size_of::<Offset>();

impl Indices {
pub async fn from(directory: &Directory) -> io::Result<Self> {
pub async fn from(segments: &[Arc<Segment>]) -> io::Result<Self> {
let mut indices = Self {
data: HashMap::new(),
total_bytes: 0,
};

let mut file = match directory
.open_read(crate::directory::DataType::Indices, 0)
.await
{
Ok(file) => file,
Err(e) => {
println!("Warning in for indexs file: {}", e);
return Ok(indices);
}
};

let mut buf = [0u8; INDEX_SIZE];
for segment in segments {
let mut buf = [0u8; OFFSET_SIZE];
let mut file = &(*segment).file;

loop {
let n = file.read(&mut buf).await?;
loop {
let n = file.read(&mut buf).await?;

if n == 0 {
break;
}
if n == 0 {
break;
}

file.seek(io::SeekFrom::Current(INDEX_SIZE as i64));
file.seek(io::SeekFrom::Current(OFFSET_SIZE as i64));

let index = usize::from_le_bytes([
buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
]);
let index = usize::from_le_bytes([
buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7],
]);

let start = usize::from_le_bytes([
buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
]);
let start = usize::from_le_bytes([
buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
]);

let data_size = usize::from_le_bytes([
buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23],
]);
let data_size = usize::from_le_bytes([
buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23],
]);

let segment_index = usize::from_le_bytes([
buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31],
]);
let segment_index = usize::from_le_bytes([
buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31],
]);

indices
.data
.insert(index, Offset::from(index, start, data_size, segment_index));
indices
.data
.insert(index, Offset::from(index, start, data_size, segment_index));
}
}

Ok(indices)
Expand All @@ -69,13 +61,11 @@ impl Indices {
#[cfg(test)]
mod tests {

use std::io::SeekFrom;

use crate::macros::function;
use crate::{directory::Directory, macros::function, MAX_SEGMENT_SIZE};

use super::*;

async fn create_test_data(directory: &Directory) {
async fn create_test_data(directory: &Directory) -> Vec<Arc<Segment>> {
let mut offsets = vec![];

for i in 0..50 {
Expand All @@ -88,28 +78,28 @@ mod tests {
.await
.unwrap();

for (index, offset) in offsets.iter().enumerate() {
let index_bytes = unsafe { *(&index as *const _ as *const [u8; 8]) };
for offset in offsets.iter() {
let offset_bytes = offset.as_bytes();

file.write_all(&index_bytes).await.unwrap();
file.seek(SeekFrom::End(0)).await.unwrap();

file.write_all(offset_bytes).await.unwrap();
file.seek(SeekFrom::End(0)).await.unwrap();
}

let segment = Segment::new(&directory, crate::directory::DataType::Indices, 0)
.await
.unwrap();

vec![Arc::new(segment)]
}

#[async_std::test]
async fn indices_from() {
let path = format!("./{}", function!());
let directory = Directory::new(&path).await.unwrap();
let segments = create_test_data(&directory).await;
let indices_result = Indices::from(&segments).await.unwrap();

create_test_data(&directory).await;

let indices_result = Indices::from(&directory).await;

assert!(indices_result.is_ok());
for (k, v) in indices_result.data {
assert_eq!(v, Offset::new(k, 15, 2500, 0).unwrap())
}

directory
.delete_file(crate::directory::DataType::Indices, 0)
Expand Down
33 changes: 12 additions & 21 deletions nyx-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use std::sync::Arc;

use async_std::{
channel::{bounded, Sender},
io::{self, prelude::SeekExt, ReadExt, SeekFrom, WriteExt},
};
use async_std::io::{self, prelude::SeekExt, ReadExt, SeekFrom, WriteExt};
use batch::{Batch, BatchState};
use compactor::Compactor;
use directory::{DataType, Directory};
use indices::Indices;
use offset::Offset;
use segment::Segment;
use segmentation_manager::SegmentationManager;

Expand Down Expand Up @@ -38,28 +33,25 @@ pub struct Storage {
segmentation_manager: SegmentationManager,
retrivable_buffer: [u8; MAX_MESSAGE_SIZE],
batch: Batch,
segment_sender: Sender<Segment>,
compaction: bool,
}

impl Storage {
pub async fn new(title: &str, max_queue: usize, compaction: bool) -> Result<Self, String> {
pub async fn new(title: &str, compaction: bool) -> Result<Self, String> {
let directory = Directory::new(title)
.await
.map_err(|e| format!("Storage (Directory::new): {}", e))?;

let indices = Indices::from(&directory)
let segmentation_manager = SegmentationManager::from(&directory)
.await
.map_err(|e| format!("Storage (Indices::from): {}", e))?;
.map_err(|e| format!("Storage (SegmentationManager::from): {}", e))?;

let segmentation_manager = SegmentationManager::new(&directory)
let indices = Indices::from(&segmentation_manager.indices_segments())
.await
.map_err(|e| format!("Storage (SegmentationManager::new): {}", e))?;

let (segment_sender, segment_receiver) = bounded(max_queue);
.map_err(|e| format!("Storage (Indices::from): {}", e))?;

if compaction {
async_std::task::spawn(Compactor::run(segment_receiver));
// async_std::task::spawn(Compactor::run(segment_receiver));
}

Ok(Self {
Expand All @@ -68,7 +60,6 @@ impl Storage {
segmentation_manager,
retrivable_buffer: [0; MAX_MESSAGE_SIZE],
batch: Batch::new(),
segment_sender,
compaction,
})
}
Expand Down Expand Up @@ -192,7 +183,7 @@ mod tests {
}

async fn setup_test_storage(title: &str, test_message: &[u8], count: usize) -> Storage {
let mut storage = Storage::new(title, 10_000, false).await.unwrap();
let mut storage = Storage::new(title, false).await.unwrap();

let messages = vec![test_message; count];

Expand All @@ -218,15 +209,15 @@ mod tests {
#[cfg_attr(miri, ignore)]
async fn new_creates_instances() {
// (l)eader/(r)eplica_topic-name_partition-count
let storage = Storage::new("TEST_l_reservations_1", 10_000, false).await;
let storage = Storage::new("TEST_l_reservations_1", false).await;

assert!(storage.is_ok());
}

#[async_std::test]
#[cfg_attr(miri, ignore)]
async fn get_returns_ok() {
let message_count = 100_000;
let message_count = 500;
let test_message = b"hello guys";

let mut storage = setup_test_storage(&function!(), test_message, message_count).await;
Expand All @@ -245,9 +236,9 @@ mod tests {

println!("Read {} messages in: {:.2?}", length, elapsed);

// assert_eq!(storage.len(), message_count);
assert_eq!(storage.len(), message_count);

// cleanup(&storage).await;
cleanup(&storage).await;
}

#[async_std::test]
Expand Down
2 changes: 1 addition & 1 deletion nyx-storage/src/offset.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(C)]
pub struct Offset {
index: usize,
Expand Down
27 changes: 15 additions & 12 deletions nyx-storage/src/segment.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
use async_std::{fs::File, io};

use crate::directory::{DataType, Directory};
use crate::{
directory::{DataType, Directory},
MAX_SEGMENT_SIZE,
};

#[derive(Debug)]
pub struct Segment {
clean: bool,
length: u64,
pub file: File,
pub location: String,
}

impl Segment {
pub async fn new(
directory: &Directory,
data_type: DataType,
length: u64,
count: usize,
) -> io::Result<Self> {
let file = directory.open_read_write(data_type, count).await?;
let location = directory.get_file_path(data_type, count)?;
pub async fn new(directory: &Directory, data_type: DataType, count: usize) -> io::Result<Self> {
let file = directory.open_read_write_create(data_type, count).await?;

Ok(Self {
length,
length: MAX_SEGMENT_SIZE,
clean: false,
file,
})
}

pub async fn from(data_type: DataType, file: File) -> io::Result<Self> {
Ok(Self {
length: MAX_SEGMENT_SIZE,
clean: false,
file,
location,
})
}
}
Expand Down
Loading

0 comments on commit de8f57d

Please sign in to comment.