Skip to content

Commit

Permalink
Introduced StreamElement
Browse files Browse the repository at this point in the history
Signed-off-by: Guillaume W. Bres <[email protected]>
  • Loading branch information
gwbres committed Nov 1, 2024
1 parent cc5e1d5 commit 5bf8494
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 231 deletions.
114 changes: 59 additions & 55 deletions binex/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::io::{Error as IoError, Read};
#[cfg(feature = "flate2")]
use flate2::read::GzDecoder;

use crate::{message::Message, Error};
use log::warn;
// use log::warn;

use crate::prelude::{ClosedSourceElement, Error, Message, StreamElement};

/// Abstraction for Plain or Compressed [R]
enum Reader<R: Read> {
Expand Down Expand Up @@ -38,45 +39,25 @@ impl<R: Read> Read for Reader<R> {
}
}

/// Decoder FSM
#[derive(Debug, Copy, Clone, Default, PartialEq)]
enum State {
/// Everything is OK we're consuming data
#[default]
Parsing,
/// Partial frame is found in internal Buffer.
/// We need a secondary read to complete this message
IncompleteMessage,
/// Partial frame was found in internal Buffer.
/// But the total expected payload exceeds our internal buffer capacity.
/// [Decoder] is currently limited to parsing [Message] that fits
/// in the buffer entirely. This may not apply to very length (> 1 MB) messages
/// which is the case of signal observations for example - that we do not support at the moment.
/// In this case, we proceed to trash (consume the Input interface), complete the message
/// we do not know how to interprate & move on to next message.
IncompleteTrashing,
}

/// [BINEX] Stream Decoder. Use this structure to decode all messages streamed
/// on a [Read]able interface. M represents the internal buffer depth:
/// * the larger M the less constrain on the I/O interface (less frequent access)
/// * but the larger the (initial) memory allocation
pub struct Decoder<const M: usize, R: Read> {
/// Internal state
state: State,
/// BINEX Stream Decoder. Use this structure to decode a serie
/// of [StreamElement]s streamed over any [Read]able interface.
pub struct Decoder<'a, R: Read> {
/// Write pointer
wr_ptr: usize,
/// Read pointer
rd_ptr: usize,
/// Reached EOS
eos: bool,
/// Internal buffer
buf: [u8; M],
/// Internal buffer. Buffer is sized to fully contain
/// the "worst case" open source [Message].
buf: [u8; 4096],
/// [R]
reader: Reader<R>,
/// Reference to past [ClosedSourceElement] (if any)
past_element: Option<ClosedSourceElement<'a>>,
}

impl<const M: usize, R: Read> Decoder<M, R> {
impl<'a, R: Read> Decoder<'a, R> {
/// Creates a new BINEX [Decoder] from [R] readable interface,
/// ready to parse incoming bytes.
/// ```
Expand All @@ -90,7 +71,7 @@ impl<const M: usize, R: Read> Decoder<M, R> {
/// .unwrap();
///
/// // Two generics: with M the internal buffer depth
/// let mut decoder = Decoder::<1024, File>::new(fd);
/// let mut decoder = Decoder::new(fd);
///
/// // Consume data stream
/// loop {
Expand Down Expand Up @@ -126,9 +107,9 @@ impl<const M: usize, R: Read> Decoder<M, R> {
eos: false,
rd_ptr: 0,
wr_ptr: 0,
buf: [0; M],
buf: [0; 4096],
past_element: None,
reader: reader.into(),
state: State::default(),
}
}

Expand All @@ -146,8 +127,7 @@ impl<const M: usize, R: Read> Decoder<M, R> {
/// let mut fd = File::open("../test_resources/BIN/mfle20200105.bnx.gz")
/// .unwrap();
///
/// // two generics: with M the internal buffer depth
/// let mut decoder = Decoder::<1024, File>::new(fd);
/// let mut decoder = Decoder::new(fd);
///
/// // Consume data stream
/// loop {
Expand Down Expand Up @@ -183,18 +163,19 @@ impl<const M: usize, R: Read> Decoder<M, R> {
eos: false,
rd_ptr: 0,
wr_ptr: 0,
buf: [0; M],
state: State::default(),
buf: [0; 4096],
past_element: None,
reader: GzDecoder::new(reader).into(),
}
}
}

impl<const M: usize, R: Read> Iterator for Decoder<M, R> {
type Item = Result<Message, Error>;
/// Parse next message contained in stream
impl<'a, R: Read> Iterator for Decoder<'a, R> {
type Item = Result<StreamElement<'a>, Error>;

/// Parse next [StreamElement] contained in this BINEX stream.
fn next(&mut self) -> Option<Self::Item> {
// always try to fill in buffer
// always try to fill internal buffer
let size = self.reader.read(&mut self.buf[self.wr_ptr..]).ok()?;
self.wr_ptr += size;
//println!("wr_ptr={}", self.wr_ptr);
Expand All @@ -210,7 +191,11 @@ impl<const M: usize, R: Read> Iterator for Decoder<M, R> {
// - increment pointer
// - expose to user
self.rd_ptr += msg.encoding_size();
Some(Ok(msg))

// terminates possible [ClosedSourceElement] serie
self.past_element = None;

Some(Ok(msg.into()))
},
Err(e) => {
match e {
Expand All @@ -224,24 +209,43 @@ impl<const M: usize, R: Read> Iterator for Decoder<M, R> {
return None;
}
},
Error::NonSupportedMesssage(mlen) => {
self.rd_ptr += mlen;

if self.rd_ptr > 4096 {
self.rd_ptr = 0;
self.wr_ptr = 0;
}

if self.eos == true {
// consumed everything and EOS has been reached
return None;
}
},
Error::IncompleteMessage(mlen) => {
// buffer does not contain the entire message
// preserve content and shift: to permit refilling the buffer
// two cases:
if mlen + 2 < M {
// - if that message would fit in buffer, shift and prepare to refill for completion
self.wr_ptr -= self.rd_ptr;
self.buf.copy_within(self.rd_ptr.., 0);
return Some(Err(Error::IncompleteMessage(mlen)));
} else {
// - or, we don't support messages that do not fit in the local buffer (yet)
self.buf = [0; M];
// decoded partial valid frame
if self.rd_ptr + mlen > 4096 {
// frame would not fit in buffer:
// abort: we do not support that scenario.
// This should never happen anyway: internal buffer should be sized correctly.
self.buf = [0; 4096];
self.wr_ptr = 0;
self.rd_ptr = 0;
return Some(Err(Error::NonSupportedMesssage));
return Some(Err(Error::TooLargeInternalLimitation));
} else {
// preserved content (shift left)
// and permit the refill that will conclude this message
self.buf.copy_within(self.rd_ptr.., 0);

self.wr_ptr -= self.rd_ptr;
self.rd_ptr = 0;
return Some(Err(Error::IncompleteMessage(mlen)));
}
},
_ => {
// bad content that does not look like valid BINEX.
// This is very inefficient. If returned error would increment
// the internal pointer, we could directly move on to next interesting bytes.
self.rd_ptr += 1;
},
}
Expand Down
66 changes: 0 additions & 66 deletions binex/src/encoder.rs

This file was deleted.

24 changes: 14 additions & 10 deletions binex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@
use thiserror::Error;

mod decoder;
mod encoder;
mod message;
mod stream;

pub(crate) mod constants;
pub(crate) mod utils;

pub mod prelude {
pub use crate::{
decoder::Decoder,
encoder::Encoder,
message::{
EphemerisFrame, GPSEphemeris, GPSRaw, Message, MonumentGeoMetadata, MonumentGeoRecord,
Record, TimeResolution,
EphemerisFrame, GALEphemeris, GLOEphemeris, GPSEphemeris, GPSRaw, Message,
MonumentGeoMetadata, MonumentGeoRecord, Record, SBASEphemeris, TimeResolution,
},
stream::{ClosedSourceElement, Provider, StreamElement},
Error,
};
// re-export
Expand All @@ -33,7 +33,7 @@ pub enum Error {
IoError(#[from] std::io::Error),
#[error("invalid start of stream")]
InvalidStartofStream,
#[error("no SYNC byte found")]
#[error("no sync byte")]
NoSyncByte,
#[error("reversed streams are not supported yet")]
ReversedStream,
Expand All @@ -43,8 +43,8 @@ pub enum Error {
EnhancedCrc,
#[error("non supported timescale")]
NonSupportedTimescale,
#[error("unknown message")]
UnknownMessage,
// #[error("unknown message")]
// UnknownMessage,
#[error("unknown record field id")]
UnknownRecordFieldId,
#[error("utf8 error")]
Expand All @@ -53,8 +53,12 @@ pub enum Error {
MissingCRC,
#[error("received invalid crc")]
BadCRC,
#[error("incomplete message")]
#[error("incomplete: need more data")]
IncompleteMessage(usize),
#[error("non supported message")]
NonSupportedMesssage,
#[error("non supported message: library limitation")]
NonSupportedMesssage(usize),
#[error("message too large: library limitation")]
// This message should never happen: library is to be designed
// to support largest open source (fully disclosed) message frame
TooLargeInternalLimitation,
}
Loading

0 comments on commit 5bf8494

Please sign in to comment.