From de8f57dcf03b9001116c5c143cff94251a606644 Mon Sep 17 00:00:00 2001 From: pwbh <127856937+pwbh@users.noreply.github.com> Date: Tue, 31 Oct 2023 18:01:28 +0200 Subject: [PATCH] Fix incidents preload (#11) --- nyx-storage/src/batch.rs | 2 +- nyx-storage/src/directory.rs | 17 ++++- nyx-storage/src/indices.rs | 96 +++++++++++-------------- nyx-storage/src/lib.rs | 33 ++++----- nyx-storage/src/offset.rs | 2 +- nyx-storage/src/segment.rs | 27 +++---- nyx-storage/src/segmentation_manager.rs | 82 +++++++++++++++------ 7 files changed, 148 insertions(+), 111 deletions(-) diff --git a/nyx-storage/src/batch.rs b/nyx-storage/src/batch.rs index 759e8d8..2b55899 100644 --- a/nyx-storage/src/batch.rs +++ b/nyx-storage/src/batch.rs @@ -49,7 +49,7 @@ impl Batch { latest_segment_count: usize, latest_segment_size: usize, ) -> Result { - if self.current_batch_size + buf.len() < self.buffer.len() { + if self.current_batch_size + buf.len() < MAX_BATCH_SIZE { if self.current_batch_index == 0 { self.current_batch_index = latest_segment_count; self.current_segment_size = latest_segment_size; diff --git a/nyx-storage/src/directory.rs b/nyx-storage/src/directory.rs index 5d547a6..47f570e 100644 --- a/nyx-storage/src/directory.rs +++ b/nyx-storage/src/directory.rs @@ -4,7 +4,7 @@ use std::{ }; use async_std::{ - fs::{self, File, OpenOptions}, + fs::{self, DirEntry, File, OpenOptions, ReadDir}, io, }; @@ -75,6 +75,7 @@ impl Directory { pub async fn open_read(&self, datatype: DataType, count: usize) -> io::Result { let path = self.get_file_path(datatype, count)?; + OpenOptions::new().read(true).open(path).await } @@ -82,6 +83,7 @@ impl Directory { let path = self .get_file_path(datatype, count) .map_err(|e| Error::new(ErrorKind::NotFound, e))?; + OpenOptions::new() .append(true) .create(true) @@ -93,6 +95,19 @@ impl Directory { let path = self .get_file_path(datatype, count) .map_err(|e| Error::new(ErrorKind::NotFound, e))?; + + OpenOptions::new().read(true).append(true).open(path).await + } + + pub async fn open_read_write_create( + &self, + datatype: DataType, + count: usize, + ) -> io::Result { + let path = self + .get_file_path(datatype, count) + .map_err(|e| Error::new(ErrorKind::NotFound, e))?; + OpenOptions::new() .read(true) .append(true) diff --git a/nyx-storage/src/indices.rs b/nyx-storage/src/indices.rs index 0d7ea32..dd21a22 100644 --- a/nyx-storage/src/indices.rs +++ b/nyx-storage/src/indices.rs @@ -1,8 +1,8 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use async_std::io::{self, prelude::SeekExt, ReadExt, WriteExt}; -use crate::{directory::Directory, offset::Offset}; +use crate::{offset::Offset, segment::Segment}; #[derive(Debug)] pub struct Indices { @@ -10,56 +10,48 @@ pub struct Indices { pub total_bytes: usize, } -const INDEX_SIZE: usize = std::mem::size_of::(); +const OFFSET_SIZE: usize = std::mem::size_of::(); impl Indices { - pub async fn from(directory: &Directory) -> io::Result { + pub async fn from(segments: &[Arc]) -> io::Result { let mut indices = Self { data: HashMap::new(), total_bytes: 0, }; - let mut file = match directory - .open_read(crate::directory::DataType::Indices, 0) - .await - { - Ok(file) => file, - Err(e) => { - println!("Warning in for indexs file: {}", e); - return Ok(indices); - } - }; - - let mut buf = [0u8; INDEX_SIZE]; + for segment in segments { + let mut buf = [0u8; OFFSET_SIZE]; + let mut file = &(*segment).file; - loop { - let n = file.read(&mut buf).await?; + loop { + let n = file.read(&mut buf).await?; - if n == 0 { - break; - } + if n == 0 { + break; + } - file.seek(io::SeekFrom::Current(INDEX_SIZE as i64)); + file.seek(io::SeekFrom::Current(OFFSET_SIZE as i64)); - let index = usize::from_le_bytes([ - buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], - ]); + let index = usize::from_le_bytes([ + buf[0], buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], + ]); - let start = usize::from_le_bytes([ - buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], - ]); + let start = usize::from_le_bytes([ + buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15], + ]); - let data_size = usize::from_le_bytes([ - buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], - ]); + let data_size = usize::from_le_bytes([ + buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[23], + ]); - let segment_index = usize::from_le_bytes([ - buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31], - ]); + let segment_index = usize::from_le_bytes([ + buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30], buf[31], + ]); - indices - .data - .insert(index, Offset::from(index, start, data_size, segment_index)); + indices + .data + .insert(index, Offset::from(index, start, data_size, segment_index)); + } } Ok(indices) @@ -69,13 +61,11 @@ impl Indices { #[cfg(test)] mod tests { - use std::io::SeekFrom; - - use crate::macros::function; + use crate::{directory::Directory, macros::function, MAX_SEGMENT_SIZE}; use super::*; - async fn create_test_data(directory: &Directory) { + async fn create_test_data(directory: &Directory) -> Vec> { let mut offsets = vec![]; for i in 0..50 { @@ -88,28 +78,28 @@ mod tests { .await .unwrap(); - for (index, offset) in offsets.iter().enumerate() { - let index_bytes = unsafe { *(&index as *const _ as *const [u8; 8]) }; + for offset in offsets.iter() { let offset_bytes = offset.as_bytes(); - - file.write_all(&index_bytes).await.unwrap(); - file.seek(SeekFrom::End(0)).await.unwrap(); - file.write_all(offset_bytes).await.unwrap(); - file.seek(SeekFrom::End(0)).await.unwrap(); } + + let segment = Segment::new(&directory, crate::directory::DataType::Indices, 0) + .await + .unwrap(); + + vec![Arc::new(segment)] } #[async_std::test] async fn indices_from() { let path = format!("./{}", function!()); let directory = Directory::new(&path).await.unwrap(); + let segments = create_test_data(&directory).await; + let indices_result = Indices::from(&segments).await.unwrap(); - create_test_data(&directory).await; - - let indices_result = Indices::from(&directory).await; - - assert!(indices_result.is_ok()); + for (k, v) in indices_result.data { + assert_eq!(v, Offset::new(k, 15, 2500, 0).unwrap()) + } directory .delete_file(crate::directory::DataType::Indices, 0) diff --git a/nyx-storage/src/lib.rs b/nyx-storage/src/lib.rs index fb53bd7..4d31f69 100644 --- a/nyx-storage/src/lib.rs +++ b/nyx-storage/src/lib.rs @@ -1,14 +1,9 @@ use std::sync::Arc; -use async_std::{ - channel::{bounded, Sender}, - io::{self, prelude::SeekExt, ReadExt, SeekFrom, WriteExt}, -}; +use async_std::io::{self, prelude::SeekExt, ReadExt, SeekFrom, WriteExt}; use batch::{Batch, BatchState}; -use compactor::Compactor; use directory::{DataType, Directory}; use indices::Indices; -use offset::Offset; use segment::Segment; use segmentation_manager::SegmentationManager; @@ -38,28 +33,25 @@ pub struct Storage { segmentation_manager: SegmentationManager, retrivable_buffer: [u8; MAX_MESSAGE_SIZE], batch: Batch, - segment_sender: Sender, compaction: bool, } impl Storage { - pub async fn new(title: &str, max_queue: usize, compaction: bool) -> Result { + pub async fn new(title: &str, compaction: bool) -> Result { let directory = Directory::new(title) .await .map_err(|e| format!("Storage (Directory::new): {}", e))?; - let indices = Indices::from(&directory) + let segmentation_manager = SegmentationManager::from(&directory) .await - .map_err(|e| format!("Storage (Indices::from): {}", e))?; + .map_err(|e| format!("Storage (SegmentationManager::from): {}", e))?; - let segmentation_manager = SegmentationManager::new(&directory) + let indices = Indices::from(&segmentation_manager.indices_segments()) .await - .map_err(|e| format!("Storage (SegmentationManager::new): {}", e))?; - - let (segment_sender, segment_receiver) = bounded(max_queue); + .map_err(|e| format!("Storage (Indices::from): {}", e))?; if compaction { - async_std::task::spawn(Compactor::run(segment_receiver)); + // async_std::task::spawn(Compactor::run(segment_receiver)); } Ok(Self { @@ -68,7 +60,6 @@ impl Storage { segmentation_manager, retrivable_buffer: [0; MAX_MESSAGE_SIZE], batch: Batch::new(), - segment_sender, compaction, }) } @@ -192,7 +183,7 @@ mod tests { } async fn setup_test_storage(title: &str, test_message: &[u8], count: usize) -> Storage { - let mut storage = Storage::new(title, 10_000, false).await.unwrap(); + let mut storage = Storage::new(title, false).await.unwrap(); let messages = vec![test_message; count]; @@ -218,7 +209,7 @@ mod tests { #[cfg_attr(miri, ignore)] async fn new_creates_instances() { // (l)eader/(r)eplica_topic-name_partition-count - let storage = Storage::new("TEST_l_reservations_1", 10_000, false).await; + let storage = Storage::new("TEST_l_reservations_1", false).await; assert!(storage.is_ok()); } @@ -226,7 +217,7 @@ mod tests { #[async_std::test] #[cfg_attr(miri, ignore)] async fn get_returns_ok() { - let message_count = 100_000; + let message_count = 500; let test_message = b"hello guys"; let mut storage = setup_test_storage(&function!(), test_message, message_count).await; @@ -245,9 +236,9 @@ mod tests { println!("Read {} messages in: {:.2?}", length, elapsed); - // assert_eq!(storage.len(), message_count); + assert_eq!(storage.len(), message_count); - // cleanup(&storage).await; + cleanup(&storage).await; } #[async_std::test] diff --git a/nyx-storage/src/offset.rs b/nyx-storage/src/offset.rs index 55e3571..c260962 100644 --- a/nyx-storage/src/offset.rs +++ b/nyx-storage/src/offset.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] #[repr(C)] pub struct Offset { index: usize, diff --git a/nyx-storage/src/segment.rs b/nyx-storage/src/segment.rs index e072e9f..7840d9d 100644 --- a/nyx-storage/src/segment.rs +++ b/nyx-storage/src/segment.rs @@ -1,30 +1,33 @@ use async_std::{fs::File, io}; -use crate::directory::{DataType, Directory}; +use crate::{ + directory::{DataType, Directory}, + MAX_SEGMENT_SIZE, +}; #[derive(Debug)] pub struct Segment { clean: bool, length: u64, pub file: File, - pub location: String, } impl Segment { - pub async fn new( - directory: &Directory, - data_type: DataType, - length: u64, - count: usize, - ) -> io::Result { - let file = directory.open_read_write(data_type, count).await?; - let location = directory.get_file_path(data_type, count)?; + pub async fn new(directory: &Directory, data_type: DataType, count: usize) -> io::Result { + let file = directory.open_read_write_create(data_type, count).await?; Ok(Self { - length, + length: MAX_SEGMENT_SIZE, + clean: false, + file, + }) + } + + pub async fn from(data_type: DataType, file: File) -> io::Result { + Ok(Self { + length: MAX_SEGMENT_SIZE, clean: false, file, - location, }) } } diff --git a/nyx-storage/src/segmentation_manager.rs b/nyx-storage/src/segmentation_manager.rs index 7edf65f..97e3342 100644 --- a/nyx-storage/src/segmentation_manager.rs +++ b/nyx-storage/src/segmentation_manager.rs @@ -24,21 +24,11 @@ pub struct SegmentationManager { impl SegmentationManager { pub async fn new(directory: &Directory) -> io::Result { - let latest_indices_segment = Segment::new( - &directory, - crate::directory::DataType::Indices, - MAX_SEGMENT_SIZE, - 0, - ) - .await?; - - let latest_partition_segment = Segment::new( - &directory, - crate::directory::DataType::Partition, - MAX_SEGMENT_SIZE, - 0, - ) - .await?; + let latest_indices_segment = + Segment::new(&directory, crate::directory::DataType::Indices, 0).await?; + + let latest_partition_segment = + Segment::new(&directory, crate::directory::DataType::Partition, 0).await?; Ok(Self { indices_segments: vec![Arc::new(latest_indices_segment)], @@ -48,6 +38,60 @@ impl SegmentationManager { }) } + pub async fn from(directory: &Directory) -> io::Result { + let mut indices_segments = vec![]; + let mut partition_segments = vec![]; + + let mut current_segment_candidate = 0; + + while let Some(file) = directory + .open_read_write(DataType::Partition, current_segment_candidate) + .await + .ok() + { + current_segment_candidate += 1; + partition_segments.push(Arc::new(Segment::from(DataType::Partition, file).await?)); + } + + current_segment_candidate = 0; + + while let Some(file) = directory + .open_read_write(DataType::Indices, current_segment_candidate) + .await + .ok() + { + current_segment_candidate += 1; + indices_segments.push(Arc::new(Segment::from(DataType::Indices, file).await?)); + } + + if indices_segments.len() == 0 { + let latest_indices_segment = + Segment::new(&directory, crate::directory::DataType::Indices, 0).await?; + indices_segments.push(Arc::new(latest_indices_segment)); + } + + if partition_segments.len() == 0 { + let latest_partition_segment = + Segment::new(&directory, crate::directory::DataType::Partition, 0).await?; + partition_segments.push(Arc::new(latest_partition_segment)); + } + + Ok(Self { + indices_segments, + partition_segments, + latest_index_segment: NonNull::dangling(), + directory: directory.clone(), + }) + } + + pub fn partition_segments(&self) -> &[Arc] { + &self.partition_segments + } + + pub fn indices_segments(&self) -> &[Arc] { + &self.indices_segments + } + pub async fn create_segment(&mut self, data_type: DataType) -> io::Result> { let new_segment_count = if data_type == DataType::Indices { self.indices_segments.len() @@ -55,13 +99,7 @@ impl SegmentationManager { self.partition_segments.len() }; - let new_segment = Segment::new( - &self.directory, - data_type, - MAX_SEGMENT_SIZE, - new_segment_count, - ) - .await?; + let new_segment = Segment::new(&self.directory, data_type, new_segment_count).await?; let new_segment = Arc::new(new_segment);