diff --git a/Cargo.lock b/Cargo.lock
index 23de815..fd25cbb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -85,6 +85,7 @@ version = "0.1.0"
 dependencies = [
  "divan",
  "include_dir",
+ "memchr",
  "memmap2",
  "rand",
  "rusqlite",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f47fb6b..b9e01ab 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -10,6 +10,7 @@ testing = []
 
 [dependencies]
 include_dir = "0.7.3"
+memchr = "2.7.4"
 memmap2 = "0.9.4"
 rand = "0.8.5"
 rusqlite = { version = "0.31.0", features = ["bundled", "limits"] }
diff --git a/core/benches/pyreport.rs b/core/benches/pyreport.rs
index 67bbaa9..99585b3 100644
--- a/core/benches/pyreport.rs
+++ b/core/benches/pyreport.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, hint::black_box};
 
 use codecov_rs::{
     parsers::pyreport::{chunks, chunks_serde, report_json},
@@ -125,21 +125,11 @@ fn complex_chunks_serde(bencher: Bencher) {
 }
 
 fn parse_chunks_file_serde(input: &[u8]) {
-    let mut parser = chunks_serde::Parser::new(input);
-    loop {
-        // TODO: these are just for debugging
-        let rest = parser.rest;
-        let expecting = parser.expecting;
-        let event = parser.next();
-        match event {
-            Ok(None) => break,
-            Ok(Some(_)) => {}
-            Err(err) => {
-                let rest = std::str::from_utf8(rest).unwrap();
-                let rest = rest.get(..32).unwrap_or(rest);
-                dbg!(rest, expecting);
-                panic!("{err}");
-            }
+    let chunks_file = chunks_serde::ChunksFile::new(input).unwrap();
+    let mut chunks = chunks_file.chunks();
+    while let Some(mut chunk) = chunks.next_chunk().unwrap() {
+        while let Some(line) = chunk.next_line().unwrap() {
+            black_box(line);
         }
     }
 }
diff --git a/core/src/parsers/pyreport/chunks_serde.rs b/core/src/parsers/pyreport/chunks_serde.rs
index 9335d8c..121057b 100644
--- a/core/src/parsers/pyreport/chunks_serde.rs
+++ b/core/src/parsers/pyreport/chunks_serde.rs
@@ -32,24 +32,146 @@
 //! stream and only returns a `ReportLine` for tests. The `report_line_or_empty`
 //! parser which wraps this and supports empty lines returns `Ok(())`.
-use std::{collections::HashMap, fmt};
+use std::{collections::HashMap, fmt, mem, sync::OnceLock};
 
+use memchr::{memchr, memmem};
 use serde::{de, de::IgnoredAny, Deserialize};
 
+use crate::report::pyreport::{CHUNKS_FILE_END_OF_CHUNK, CHUNKS_FILE_HEADER_TERMINATOR};
+
+#[derive(Debug, thiserror::Error)]
+pub enum ParserError {
+    #[error("unexpected EOF")]
+    UnexpectedEof,
+    #[error("unexpected input")]
+    UnexpectedInput,
+    #[error("invalid file header")]
+    InvalidFileHeader(#[source] serde_json::Error),
+    #[error("invalid chunk header")]
+    InvalidChunkHeader(#[source] serde_json::Error),
+    #[error("invalid line record")]
+    InvalidLineRecord(#[source] serde_json::Error),
+}
+
+impl PartialEq for ParserError {
+    fn eq(&self, other: &Self) -> bool {
+        core::mem::discriminant(self) == core::mem::discriminant(other)
+    }
+}
+impl Eq for ParserError {}
+
 #[derive(Debug)]
-pub struct Parser<'d> {
-    // TODO: these are pub just for debugging
-    pub rest: &'d [u8],
-    pub expecting: Expecting,
+pub struct ChunksFile<'d> {
+    file_header: FileHeader,
+    input: &'d [u8],
+}
+
+impl<'d> ChunksFile<'d> {
+    pub fn new(mut input: &'d [u8]) -> Result<Self, ParserError> {
+        static HEADER_FINDER: OnceLock<memmem::Finder<'static>> = OnceLock::new();
+        let header_finder =
+            HEADER_FINDER.get_or_init(|| memmem::Finder::new(CHUNKS_FILE_HEADER_TERMINATOR));
+
+        let file_header = if let Some(pos) = header_finder.find(input) {
+            let header_bytes = &input[..pos];
+            input = &input[pos + header_finder.needle().len()..];
+            let file_header: FileHeader =
+                serde_json::from_slice(header_bytes).map_err(ParserError::InvalidFileHeader)?;
+            file_header
+        } else {
+            FileHeader::default()
+        };
+
+        Ok(Self { file_header, input })
+    }
+
+    pub fn labels_index(&self) -> &HashMap<String, String> {
+        &self.file_header.labels_index
+    }
+
+    pub fn chunks(&self) -> Chunks {
+        Chunks { input: self.input }
+    }
 }
 
-#[derive(Debug, PartialEq, Eq)]
-pub enum ParserEvent {
-    EmptyLineRecord,
-    LineRecord(LineRecord),
-    EmptyChunk,
-    FileHeader(FileHeader),
-    ChunkHeader(ChunkHeader),
+pub struct Chunks<'d> {
+    input: &'d [u8],
+}
+
+impl<'d> Chunks<'d> {
+    pub fn next_chunk(&mut self) -> Result<Option<Chunk<'d>>, ParserError> {
+        if self.input.is_empty() {
+            return Ok(None);
+        }
+
+        static CHUNK_FINDER: OnceLock<memmem::Finder<'static>> = OnceLock::new();
+        let chunk_finder =
+            CHUNK_FINDER.get_or_init(|| memmem::Finder::new(CHUNKS_FILE_END_OF_CHUNK));
+
+        let mut chunk_bytes = if let Some(pos) = chunk_finder.find(self.input) {
+            let chunk_bytes = &self.input[..pos];
+            self.input = &self.input[pos + chunk_finder.needle().len()..];
+            chunk_bytes
+        } else {
+            mem::take(&mut self.input)
+        };
+
+        if chunk_bytes == b"null" {
+            return Ok(Some(Chunk {
+                chunk_header: ChunkHeader::default(),
+                input: &[],
+            }));
+        }
+
+        let header_bytes = next_line(&mut chunk_bytes).ok_or(ParserError::UnexpectedInput)?;
+        let chunk_header: ChunkHeader =
+            serde_json::from_slice(header_bytes).map_err(ParserError::InvalidChunkHeader)?;
+
+        Ok(Some(Chunk {
+            chunk_header,
+            input: chunk_bytes,
+        }))
+    }
+}
+
+pub struct Chunk<'d> {
+    chunk_header: ChunkHeader,
+    input: &'d [u8],
+}
+
+impl<'d> Chunk<'d> {
+    pub fn present_sessions(&self) -> &[u32] {
+        &self.chunk_header.present_sessions
+    }
+
+    pub fn next_line(&mut self) -> Result<Option<Option<LineRecord>>, ParserError> {
+        let Some(line) = next_line(&mut self.input) else {
+            return Ok(None);
+        };
+
+        if line.is_empty() {
+            return Ok(Some(None));
+        }
+
+        let line_record: LineRecord =
+            serde_json::from_slice(line).map_err(ParserError::InvalidLineRecord)?;
+        Ok(Some(Some(line_record)))
+    }
+}
+
+fn next_line<'d>(input: &mut &'d [u8]) -> Option<&'d [u8]> {
+    if input.is_empty() {
+        return None;
+    }
+
+    let line_bytes = if let Some(pos) = memchr(b'\n', input) {
+        let line_bytes = &input[..pos];
+        *input = &input[pos + 1..];
+        line_bytes
+    } else {
+        mem::take(input)
+    };
+    Some(line_bytes)
 }
 
 #[derive(Debug, PartialEq, Eq, Default, Deserialize)]
@@ -186,125 +308,10 @@ impl<'de> Deserialize<'de> for Coverage {
     }
 }
 
-#[derive(Debug, thiserror::Error)]
-pub enum ParserError {
-    #[error("unexpected EOF")]
-    UnexpectedEof,
-    #[error("unexpected input")]
-    UnexpectedInput,
-    #[error("invalid file header")]
-    InvalidFileHeader(#[source] serde_json::Error),
-    #[error("invalid chunk header")]
-    InvalidChunkHeader(#[source] serde_json::Error),
-    #[error("invalid line record")]
-    InvalidLineRecord(#[source] serde_json::Error),
-}
-
-impl PartialEq for ParserError {
-    fn eq(&self, other: &Self) -> bool {
-        core::mem::discriminant(self) == core::mem::discriminant(other)
-    }
-}
-impl Eq for ParserError {}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Expecting {
-    FileHeader,
-    ChunkHeader,
-    LineRecord,
-    EndOfChunk,
-}
-
-const END_OF_CHUNK: &[u8] = b"<<<<< end_of_chunk >>>>>";
-const END_OF_HEADER: &[u8] = b"<<<<< end_of_header >>>>>";
-
-// `slice::split_once` is still unstable:
-// <https://github.com/rust-lang/rust/issues/112811>
-fn slice_split_once(slice: &[u8], pred: u8) -> Option<(&[u8], &[u8])> {
-    let index = slice.iter().position(|b| *b == pred)?;
-    Some((&slice[..index], &slice[index + 1..]))
-}
-
-impl<'d> Parser<'d> {
-    pub fn new(input: &'d [u8]) -> Self {
-        Self {
-            rest: input,
-            expecting: Expecting::FileHeader,
-        }
-    }
-
-    pub fn next(&mut self) -> Result<Option<ParserEvent>, ParserError> {
-        loop {
-            let Some((line, rest)) = slice_split_once(self.rest, b'\n') else {
-                return Ok(None);
-            };
-            self.rest = rest;
-
-            if self.expecting == Expecting::LineRecord {
-                if line.is_empty() {
-                    return Ok(Some(ParserEvent::EmptyLineRecord));
-                }
-                if line == END_OF_CHUNK {
-                    self.expecting = Expecting::ChunkHeader;
-                    continue;
-                }
-
-                let line_record: LineRecord =
-                    serde_json::from_slice(line).map_err(ParserError::InvalidLineRecord)?;
-                return Ok(Some(ParserEvent::LineRecord(line_record)));
-            }
-
-            if self.expecting == Expecting::EndOfChunk {
-                if line != END_OF_CHUNK {
-                    return Err(ParserError::UnexpectedInput);
-                }
-
-                self.expecting = Expecting::ChunkHeader;
-                continue;
-            }
-
-            // else: expecting a file or chunk header
-
-            // this is an empty chunk (header)
-            if line == b"null" {
-                self.expecting = Expecting::EndOfChunk;
-
-                return Ok(Some(ParserEvent::EmptyChunk));
-            }
-
-            // otherwise, the header has to be a JSON object
-            if !line.starts_with(b"{") {
-                return Err(ParserError::UnexpectedInput);
-            }
-            if self.expecting == Expecting::FileHeader {
-                if let Some((next_line, rest)) = slice_split_once(self.rest, b'\n') {
-                    if next_line == END_OF_HEADER {
-                        self.rest = rest;
-                        self.expecting = Expecting::ChunkHeader;
-
-                        let file_header: FileHeader =
-                            serde_json::from_slice(line).map_err(ParserError::InvalidFileHeader)?;
-                        return Ok(Some(ParserEvent::FileHeader(file_header)));
-                    }
-                }
-            }
-            // else: chunk header
-
-            self.expecting = Expecting::LineRecord;
-
-            let chunk_header: ChunkHeader =
-                serde_json::from_slice(line).map_err(ParserError::InvalidChunkHeader)?;
-            return Ok(Some(ParserEvent::ChunkHeader(chunk_header)));
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    type ParserItem = Result<Option<ParserEvent>, ParserError>;
-
     #[test]
     fn test_parsing_events() {
         let simple_line_record = LineRecord(
@@ -316,60 +323,65 @@ mod tests {
             None,
         );
 
-        let cases: &[(&[u8], &[ParserItem])] = &[
+        let cases: &[(
+            &[u8],                              // input
+            HashMap<String, String>,            // labels index
+            &[(&[u32], &[Option<LineRecord>])], // chunks: session ids, line records
+        )] = &[
             (
                 // Header and one chunk with an empty line
                 b"{}\n<<<<< end_of_header >>>>>\n{}\n",
-                &[
-                    Ok(Some(ParserEvent::FileHeader(FileHeader::default()))),
-                    Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))),
-                    Ok(None),
-                ],
+                HashMap::default(),
+                &[(&[], &[])],
             ),
             (
                 // No header, one chunk with a populated line and an empty line
                 b"{}\n[1, null, [[0, 1]]]\n",
-                &[
-                    Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))),
-                    Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))),
-                    Ok(None),
-                ],
+                HashMap::default(),
+                &[(&[], &[Some(simple_line_record.clone())])],
            ),
            (
                // No header, two chunks, the second having just one empty line
                b"{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n",
-                &[
-                    Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))),
-                    Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))),
-                    Ok(Some(ParserEvent::EmptyLineRecord)),
-                    Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))),
-                    Ok(None),
-                ],
+                HashMap::default(),
+                &[(&[], &[Some(simple_line_record.clone())]), (&[], &[])],
            ),
            (
                // Header, two chunks, the second having multiple data lines and an empty line
                b"{}\n<<<<< end_of_header >>>>>\n{}\n[1, null, [[0, 1]]]\n\n<<<<< end_of_chunk >>>>>\n{}\n[1, null, [[0, 1]]]\n[1, null, [[0, 1]]]\n",
+                HashMap::default(),
                &[
-                    Ok(Some(ParserEvent::FileHeader(FileHeader::default()))),
-                    Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))),
-                    Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))),
-                    Ok(Some(ParserEvent::EmptyLineRecord)),
-                    Ok(Some(ParserEvent::ChunkHeader(ChunkHeader::default()))),
-                    Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))),
-                    Ok(Some(ParserEvent::LineRecord(simple_line_record.clone()))),
-                    Ok(None),
+                    (&[], &[Some(simple_line_record.clone())]),
+                    (
+                        &[],
+                        &[
+                            Some(simple_line_record.clone()),
+                            Some(simple_line_record.clone()),
+                        ],
+                    ),
                ],
            ),
        ];
 
-        for (input, expected_events) in cases {
-            let mut parser = Parser::new(input);
+        for (input, expected_labels_index, expected_chunks) in cases {
+            let chunks_file = ChunksFile::new(input).unwrap();
+            let mut chunks = chunks_file.chunks();
+
+            assert_eq!(chunks_file.labels_index(), expected_labels_index);
+
+            for (expected_sessions, expected_line_records) in *expected_chunks {
+                let mut chunk = chunks.next_chunk().unwrap().unwrap();
+
+                assert_eq!(chunk.present_sessions(), *expected_sessions);
+
+                let mut lines = vec![];
+                while let Some(line) = chunk.next_line().unwrap() {
+                    lines.push(line);
+                }
 
-            for expected_event in *expected_events {
-                dbg!(std::str::from_utf8(parser.rest).unwrap(), parser.expecting);
-                let event = parser.next();
-                assert_eq!(dbg!(event), *expected_event);
+                assert_eq!(lines, *expected_line_records);
            }
+            assert!(chunks.next_chunk().unwrap().is_none());
        }
    }
 }
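For reference, here is a minimal sketch of how the new `ChunksFile` API introduced in this diff can be driven outside of the benchmark and tests. The input literal is a hypothetical chunks file borrowed from the test cases above, and the `main`/`Box<dyn Error>` scaffolding is illustrative rather than part of the change:

    use codecov_rs::parsers::pyreport::chunks_serde::ChunksFile;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Hypothetical input: a file header plus one chunk with a single line record.
        let input: &[u8] = b"{}\n<<<<< end_of_header >>>>>\n{}\n[1, null, [[0, 1]]]\n";

        let chunks_file = ChunksFile::new(input)?;
        println!("labels in index: {}", chunks_file.labels_index().len());

        let mut chunks = chunks_file.chunks();
        while let Some(mut chunk) = chunks.next_chunk()? {
            println!("present sessions: {:?}", chunk.present_sessions());
            // `next_line` returns Ok(None) at the end of the chunk, Ok(Some(None)) for a
            // blank line, and Ok(Some(Some(record))) for a parsed line record.
            while let Some(line) = chunk.next_line()? {
                if let Some(record) = line {
                    println!("{record:?}");
                }
            }
        }
        Ok(())
    }

Unlike the removed pull-based `Parser`, the chunk and line iteration here borrows the input slice directly, so the caller decides how much of each chunk to materialize; the benchmark simply feeds every parsed line to `black_box` to keep the work from being optimized away.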