diff --git a/binex/src/decoder.rs b/binex/src/decoder.rs index 61aa83b0..1b0828ba 100644 --- a/binex/src/decoder.rs +++ b/binex/src/decoder.rs @@ -4,8 +4,9 @@ use std::io::{Error as IoError, Read}; #[cfg(feature = "flate2")] use flate2::read::GzDecoder; -use crate::{message::Message, Error}; -use log::warn; +// use log::warn; + +use crate::prelude::{ClosedSourceElement, Error, Message, StreamElement}; /// Abstraction for Plain or Compressed [R] enum Reader { @@ -38,45 +39,25 @@ impl Read for Reader { } } -/// Decoder FSM -#[derive(Debug, Copy, Clone, Default, PartialEq)] -enum State { - /// Everything is OK we're consuming data - #[default] - Parsing, - /// Partial frame is found in internal Buffer. - /// We need a secondary read to complete this message - IncompleteMessage, - /// Partial frame was found in internal Buffer. - /// But the total expected payload exceeds our internal buffer capacity. - /// [Decoder] is currently limited to parsing [Message] that fits - /// in the buffer entirely. This may not apply to very length (> 1 MB) messages - /// which is the case of signal observations for example - that we do not support at the moment. - /// In this case, we proceed to trash (consume the Input interface), complete the message - /// we do not know how to interprate & move on to next message. - IncompleteTrashing, -} - -/// [BINEX] Stream Decoder. Use this structure to decode all messages streamed -/// on a [Read]able interface. M represents the internal buffer depth: -/// * the larger M the less constrain on the I/O interface (less frequent access) -/// * but the larger the (initial) memory allocation -pub struct Decoder { - /// Internal state - state: State, +/// BINEX Stream Decoder. Use this structure to decode a serie +/// of [StreamElement]s streamed over any [Read]able interface. +pub struct Decoder<'a, R: Read> { /// Write pointer wr_ptr: usize, /// Read pointer rd_ptr: usize, /// Reached EOS eos: bool, - /// Internal buffer - buf: [u8; M], + /// Internal buffer. Buffer is sized to fully contain + /// the "worst case" open source [Message]. + buf: [u8; 4096], /// [R] reader: Reader, + /// Reference to past [ClosedSourceElement] (if any) + past_element: Option>, } -impl Decoder { +impl<'a, R: Read> Decoder<'a, R> { /// Creates a new BINEX [Decoder] from [R] readable interface, /// ready to parse incoming bytes. /// ``` @@ -90,7 +71,7 @@ impl Decoder { /// .unwrap(); /// /// // Two generics: with M the internal buffer depth - /// let mut decoder = Decoder::<1024, File>::new(fd); + /// let mut decoder = Decoder::new(fd); /// /// // Consume data stream /// loop { @@ -126,9 +107,9 @@ impl Decoder { eos: false, rd_ptr: 0, wr_ptr: 0, - buf: [0; M], + buf: [0; 4096], + past_element: None, reader: reader.into(), - state: State::default(), } } @@ -146,8 +127,7 @@ impl Decoder { /// let mut fd = File::open("../test_resources/BIN/mfle20200105.bnx.gz") /// .unwrap(); /// - /// // two generics: with M the internal buffer depth - /// let mut decoder = Decoder::<1024, File>::new(fd); + /// let mut decoder = Decoder::new(fd); /// /// // Consume data stream /// loop { @@ -183,18 +163,19 @@ impl Decoder { eos: false, rd_ptr: 0, wr_ptr: 0, - buf: [0; M], - state: State::default(), + buf: [0; 4096], + past_element: None, reader: GzDecoder::new(reader).into(), } } } -impl Iterator for Decoder { - type Item = Result; - /// Parse next message contained in stream +impl<'a, R: Read> Iterator for Decoder<'a, R> { + type Item = Result, Error>; + + /// Parse next [StreamElement] contained in this BINEX stream. fn next(&mut self) -> Option { - // always try to fill in buffer + // always try to fill internal buffer let size = self.reader.read(&mut self.buf[self.wr_ptr..]).ok()?; self.wr_ptr += size; //println!("wr_ptr={}", self.wr_ptr); @@ -210,7 +191,11 @@ impl Iterator for Decoder { // - increment pointer // - expose to user self.rd_ptr += msg.encoding_size(); - Some(Ok(msg)) + + // terminates possible [ClosedSourceElement] serie + self.past_element = None; + + Some(Ok(msg.into())) }, Err(e) => { match e { @@ -224,24 +209,43 @@ impl Iterator for Decoder { return None; } }, + Error::NonSupportedMesssage(mlen) => { + self.rd_ptr += mlen; + + if self.rd_ptr > 4096 { + self.rd_ptr = 0; + self.wr_ptr = 0; + } + + if self.eos == true { + // consumed everything and EOS has been reached + return None; + } + }, Error::IncompleteMessage(mlen) => { - // buffer does not contain the entire message - // preserve content and shift: to permit refilling the buffer - // two cases: - if mlen + 2 < M { - // - if that message would fit in buffer, shift and prepare to refill for completion - self.wr_ptr -= self.rd_ptr; - self.buf.copy_within(self.rd_ptr.., 0); - return Some(Err(Error::IncompleteMessage(mlen))); - } else { - // - or, we don't support messages that do not fit in the local buffer (yet) - self.buf = [0; M]; + // decoded partial valid frame + if self.rd_ptr + mlen > 4096 { + // frame would not fit in buffer: + // abort: we do not support that scenario. + // This should never happen anyway: internal buffer should be sized correctly. + self.buf = [0; 4096]; self.wr_ptr = 0; self.rd_ptr = 0; - return Some(Err(Error::NonSupportedMesssage)); + return Some(Err(Error::TooLargeInternalLimitation)); + } else { + // preserved content (shift left) + // and permit the refill that will conclude this message + self.buf.copy_within(self.rd_ptr.., 0); + + self.wr_ptr -= self.rd_ptr; + self.rd_ptr = 0; + return Some(Err(Error::IncompleteMessage(mlen))); } }, _ => { + // bad content that does not look like valid BINEX. + // This is very inefficient. If returned error would increment + // the internal pointer, we could directly move on to next interesting bytes. self.rd_ptr += 1; }, } diff --git a/binex/src/encoder.rs b/binex/src/encoder.rs deleted file mode 100644 index e3cc2c8d..00000000 --- a/binex/src/encoder.rs +++ /dev/null @@ -1,66 +0,0 @@ -// use log::{debug, error}; -use std::io::{ - Result as IoResult, - //Error as IoError, - Write, -}; - -#[cfg(feature = "flate2")] -use flate2::{write::GzEncoder, Compression as GzCompression}; - -/// Abstraction for Plain or Compressed [R] -enum Writer { - Plain(W), - #[cfg(feature = "flate2")] - Compressed(GzEncoder), -} - -impl From for Writer { - fn from(w: W) -> Writer { - Self::Plain(w) - } -} - -#[cfg(feature = "flate2")] -impl From> for Writer { - fn from(w: GzEncoder) -> Writer { - Self::Compressed(w) - } -} - -impl Write for Writer { - fn write(&mut self, buf: &[u8]) -> IoResult { - match self { - Self::Plain(w) => w.write(buf), - #[cfg(feature = "flate2")] - Self::Compressed(w) => w.write(buf), - } - } - fn flush(&mut self) -> IoResult<()> { - match self { - Self::Plain(w) => w.flush(), - #[cfg(feature = "flate2")] - Self::Compressed(w) => w.flush(), - } - } -} - -/// [BINEX] Stream Encoder. -pub struct Encoder { - /// [W] - writer: Writer, -} - -impl Encoder { - pub fn new(writer: W) -> Self { - Self { - writer: writer.into(), - } - } - #[cfg(feature = "flate2")] - pub fn new_gzip(writer: W, compression_level: u32) -> Self { - Self { - writer: GzEncoder::new(writer, GzCompression::new(compression_level)).into(), - } - } -} diff --git a/binex/src/lib.rs b/binex/src/lib.rs index f28ea8a0..a725134a 100644 --- a/binex/src/lib.rs +++ b/binex/src/lib.rs @@ -5,8 +5,8 @@ use thiserror::Error; mod decoder; -mod encoder; mod message; +mod stream; pub(crate) mod constants; pub(crate) mod utils; @@ -14,11 +14,11 @@ pub(crate) mod utils; pub mod prelude { pub use crate::{ decoder::Decoder, - encoder::Encoder, message::{ - EphemerisFrame, GPSEphemeris, GPSRaw, Message, MonumentGeoMetadata, MonumentGeoRecord, - Record, TimeResolution, + EphemerisFrame, GALEphemeris, GLOEphemeris, GPSEphemeris, GPSRaw, Message, + MonumentGeoMetadata, MonumentGeoRecord, Record, SBASEphemeris, TimeResolution, }, + stream::{ClosedSourceElement, Provider, StreamElement}, Error, }; // re-export @@ -33,7 +33,7 @@ pub enum Error { IoError(#[from] std::io::Error), #[error("invalid start of stream")] InvalidStartofStream, - #[error("no SYNC byte found")] + #[error("no sync byte")] NoSyncByte, #[error("reversed streams are not supported yet")] ReversedStream, @@ -43,8 +43,8 @@ pub enum Error { EnhancedCrc, #[error("non supported timescale")] NonSupportedTimescale, - #[error("unknown message")] - UnknownMessage, + // #[error("unknown message")] + // UnknownMessage, #[error("unknown record field id")] UnknownRecordFieldId, #[error("utf8 error")] @@ -53,8 +53,12 @@ pub enum Error { MissingCRC, #[error("received invalid crc")] BadCRC, - #[error("incomplete message")] + #[error("incomplete: need more data")] IncompleteMessage(usize), - #[error("non supported message")] - NonSupportedMesssage, + #[error("non supported message: library limitation")] + NonSupportedMesssage(usize), + #[error("message too large: library limitation")] + // This message should never happen: library is to be designed + // to support largest open source (fully disclosed) message frame + TooLargeInternalLimitation, } diff --git a/binex/src/message/mod.rs b/binex/src/message/mod.rs index db292221..e55043de 100644 --- a/binex/src/message/mod.rs +++ b/binex/src/message/mod.rs @@ -14,31 +14,13 @@ pub(crate) use mid::MessageID; use checksum::Checksum; -use crate::{constants::Constants, utils::Utils, Error}; - -/// [Message] [Provider] -#[derive(Debug, Clone, PartialEq, Default)] -pub enum Provider { - /// Used for any general public data. - /// Only general public [Message]s are garanteed - /// to be decyphered by this parser. - #[default] - PublicDomain, - /// JPL for internal needs or prototyping. - JPL, - /// CU Boulder for internal needs or prototyping. - ColoradoUnivBoulder, - /// NRCan for internal needs or prototyping. - NRCan, -} +use crate::{constants::Constants, Error}; #[derive(Debug, Clone, PartialEq, Default)] pub struct Message { /// Endianness used when encoding current message, /// defined by SYNC byte pub big_endian: bool, - /// Provider - pub provider: Provider, /// MID byte stored as [MessageID] mid: MessageID, /// True when using enhanced CRC @@ -52,8 +34,7 @@ pub struct Message { } impl Message { - /// Creates a new [Message] that will follow [Provider::PublicDomain] - /// specifications, ready to encode. + /// Creates a new [Message] ready to be encoded. pub fn new( big_endian: bool, time_res: TimeResolution, @@ -69,70 +50,6 @@ impl Message { reversed, big_endian, enhanced_crc, - provider: Provider::PublicDomain, - } - } - - /// Builds new [Provider::JPL] [Message] prototype, - /// that most likely only this organization can fully interprate. - pub fn new_jpl_prototype( - big_endian: bool, - time_res: TimeResolution, - enhanced_crc: bool, - reversed: bool, - record: Record, - ) -> Self { - let mid = record.to_message_id(); - Self { - mid, - record, - time_res, - reversed, - big_endian, - enhanced_crc, - provider: Provider::JPL, - } - } - - /// Builds new [Provider::ColoradoUnivBoulder] [Message] prototype, - /// that most likely only this organization can fully interprate. - pub fn new_cu_boulder_prototype( - big_endian: bool, - time_res: TimeResolution, - enhanced_crc: bool, - reversed: bool, - record: Record, - ) -> Self { - let mid = record.to_message_id(); - Self { - mid, - record, - time_res, - reversed, - big_endian, - enhanced_crc, - provider: Provider::ColoradoUnivBoulder, - } - } - - /// Builds new [Provider::NRCan] [Message] prototype, - /// that most likely only this organization can fully interprate. - pub fn new_nrcan_prototype( - big_endian: bool, - time_res: TimeResolution, - enhanced_crc: bool, - reversed: bool, - record: Record, - ) -> Self { - let mid = record.to_message_id(); - Self { - mid, - record, - time_res, - reversed, - big_endian, - enhanced_crc, - provider: Provider::NRCan, } } @@ -252,9 +169,9 @@ impl Message { let fr = EphemerisFrame::decode(big_endian, &buf[ptr..])?; Record::new_ephemeris_frame(fr) }, - _ => { - //println!("found unsupported msg id={:?}", id); - return Err(Error::UnknownMessage); + id => { + println!("found unsupported msg id={:?}", id); + return Err(Error::NonSupportedMesssage(mlen)); }, }; @@ -282,7 +199,6 @@ impl Message { time_res, big_endian, enhanced_crc, - provider: Provider::PublicDomain, }) // } } @@ -407,7 +323,7 @@ impl Message { // multi byte case let (val, size) = if buf[1] & Constants::BNXI_KEEP_GOING_MASK == 0 { - let mut val = 0_u32; + let mut val; let (byte0, byte1) = if big_endian { (buf[0], buf[1]) @@ -421,7 +337,7 @@ impl Message { (val, 2) } else if buf[2] & Constants::BNXI_KEEP_GOING_MASK == 0 { - let mut val = 0_u32; + let mut val; let (byte0, byte1, byte2) = if big_endian { (buf[0], buf[1], buf[2]) @@ -438,7 +354,7 @@ impl Message { val |= byte2 as u32; (val, 3) } else { - let mut val = 0_u32; + let mut val; let (byte0, byte1, byte2, byte3) = if big_endian { (buf[0], buf[1], buf[2], buf[3]) diff --git a/binex/src/message/record/monument/frame.rs b/binex/src/message/record/monument/frame.rs index be7bfc1a..d39df86d 100644 --- a/binex/src/message/record/monument/frame.rs +++ b/binex/src/message/record/monument/frame.rs @@ -249,9 +249,9 @@ impl MonumentGeoFrame { | FieldID::Geocode | FieldID::AntennaOffset3D | FieldID::AntennaGeo3D - | FieldID::Unknown => Err(Error::UnknownMessage), + | FieldID::Unknown => Err(Error::NonSupportedMesssage(24)), }, - Err(e) => { + Err(_) => { // println!("bnx00-str: utf8 error {}", e); Err(Error::Utf8Error) }, diff --git a/binex/src/stream.rs b/binex/src/stream.rs new file mode 100644 index 00000000..9c9d45ba --- /dev/null +++ b/binex/src/stream.rs @@ -0,0 +1,142 @@ +//! BINEX Stream representation +use crate::prelude::Message; + +/// [Message] [Provider] +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum Provider { + /// JPL for internal needs or prototyping. + JPL, + /// IGS + IGS, + /// CU Boulder for internal needs or prototyping. + ColoradoUnivBoulder, + /// NRCan for internal needs or prototyping. + NRCan, + /// UCAR COSMIC [https://www.cosmic.ucar.edu] + UCAR, + /// GPS Solutions Inc. + GPSSolutions, + /// Astech Precision Products + Ashtech, + /// Topcon Positioning Systems + Topcon, +} + +/// Closed source frame that we can encode but not interprate. +/// This particular [StreamElement] can be either a part of a continuous serie or self sustainable. +pub struct ClosedSourceElement<'a> { + /// Provider of this frame. + /// Only this organization may have capabilities to interprate this frame. + pub provider: Provider, + /// Size of this element. Use this to determine the packet index + /// in a continuous stream of undisclosed [StreamElement]s. + pub size: usize, + /// Total size of this undisclosed message. Use this to determine the packet index + /// in a continuous stream of undisclosed [StreamElement]s. + pub total: usize, + /// Raw data content that we can encode, decode but not interprate. + raw: &'a [u8], +} + +impl<'a> ClosedSourceElement<'a> { + /// Interprate this [ClosedSourceElement] using custom undisclosed method. + /// ``` + /// + /// ``` + pub fn interprate(&self, f: &dyn Fn(&[u8])) { + f(&self.raw[..self.size]) + } + + /// Returns reference to raw data "as is", since interpration is not possible + pub fn raw(&self) -> &'a [u8] { + &self.raw[..self.size] + } +} + +/// [StreamElement] represents one element of a continuous BINEX stream. +pub enum StreamElement<'a> { + /// Open Source [Message] we can fully decode & interprate + OpenSource(Message), + /// One non disclosed [ClosedSourceElement] that may be part of a continuous serie of elements. + /// Each chunk of the serie is internally limited to 4096 bytes. + /// While we can encode and decode this serie, we cannot interprate it. + ClosedSource(ClosedSourceElement<'a>), +} + +impl<'a> From for StreamElement<'a> { + fn from(msg: Message) -> Self { + Self::OpenSource(msg) + } +} + +impl<'a> StreamElement<'a> { + /// Creates a new open source [Message] ready to be encoded + pub fn new_open_source(msg: Message) -> Self { + Self::OpenSource(msg) + } + + /// Creates a new self sustained closed source [StreamElement] provided by desired [Provider]. + /// ## Inputs + /// - provider: specific [Provider] + /// - raw: content we can encode, decode but not interprate + /// - size: size of this [StreamElement] + /// - total: total size of the [StreamElement] serie + pub fn new_prototype(provider: Provider, raw: &'a [u8], size: usize, total: usize) -> Self { + Self::ClosedSource(ClosedSourceElement { + raw, + total, + size, + provider, + }) + } + + /// Add one closed source [StreamElement]s provided by desired [Provider::JPL]. + /// While we can encode this into a BINEX stream, only this organization can fully interprate the resulting stream. + /// ## Inputs + /// - raw: content we can encode, decode but not interprate + /// - size: size of the provided buffer (bytewise) + /// - total: total size of the closed source Message (bytewise) + pub fn jpl_prototype(raw: &'a [u8], size: usize, total: usize) -> Self { + Self::new_prototype(Provider::JPL, raw, size, total) + } + + /// Add one closed source [StreamElement]s provided by desired [Provider::JPL]. + /// While we can encode this into a BINEX stream, only this organization can fully interprate the resulting stream. + /// ## Inputs + /// - raw: content we can encode, decode but not interprate + /// - size: size of the provided buffer (bytewise) + /// - total: total size of the closed source Message (bytewise) + pub fn igs_prototype(raw: &'a [u8], size: usize, total: usize) -> Self { + Self::new_prototype(Provider::IGS, raw, size, total) + } + + /// Add one closed source [StreamElement]s provided by desired [Provider::ColoradoUnivBoulder]. + /// While we can encode this into a BINEX stream, only this organization can fully interprate the resulting stream. + /// ## Inputs + /// - raw: content we can encode, decode but not interprate + /// - size: size of the provided buffer (bytewise) + /// - total: total size of the closed source Message (bytewise) + pub fn cuboulder_prototype(raw: &'a [u8], size: usize, total: usize) -> Self { + Self::new_prototype(Provider::ColoradoUnivBoulder, raw, size, total) + } + + /// Add one closed source [StreamElement]s provided by desired [Provider::NRCan]. + /// While we can encode this into a BINEX stream, only this organization can fully interprate the resulting stream. + /// ## Inputs + /// - raw: content we can encode, decode but not interprate + /// - size: size of the provided buffer (bytewise) + /// - total: total size of the closed source Message (bytewise) + pub fn nrcan_prototype(raw: &'a [u8], size: usize, total: usize) -> Self { + Self::new_prototype(Provider::NRCan, raw, size, total) + } + + /// Add one closed source [StreamElement]s provided by desired [Provider::UCAR]. + /// While we can encode this into a BINEX stream, only this organization can fully interprate the resulting stream. + /// ## Inputs + /// - raw: content we can encode, decode but not interprate + /// - size: size of the provided buffer (bytewise) + /// - total: total size of the closed source Message (bytewise) + pub fn ucar_prototype(raw: &'a [u8], size: usize, total: usize) -> Self { + Self::new_prototype(Provider::UCAR, raw, size, total) + } +} diff --git a/binex/tests/decoder.rs b/binex/tests/decoder.rs index 2836c799..d3404e98 100644 --- a/binex/tests/decoder.rs +++ b/binex/tests/decoder.rs @@ -1,4 +1,4 @@ -use binex::prelude::{Decoder, Error}; +use binex::prelude::{Decoder, Error, StreamElement}; use std::fs::File; #[test] @@ -6,18 +6,19 @@ fn mfle20190130() { let mut found = 0; let fd = File::open("../test_resources/BIN/mfle20190130.bnx").unwrap(); - let mut decoder = Decoder::<1024, File>::new(fd); + let mut decoder = Decoder::new(fd); loop { match decoder.next() { - Some(Ok(msg)) => { + Some(Ok(StreamElement::OpenSource(msg))) => { found += 1; println!("parsed: {:?}", msg); }, + Some(Ok(StreamElement::ClosedSource(element))) => {}, Some(Err(e)) => match e { Error::IoError(e) => panic!("i/o error: {}", e), e => { - // println!("err={}", e); + println!("err={}", e); }, }, None => { @@ -35,14 +36,15 @@ fn gziped_files() { for fp in ["mfle20200105.bnx.gz", "mfle20200113.bnx.gz"] { let fp = format!("../test_resources/BIN/{}", fp); let fd = File::open(fp).unwrap(); - let mut decoder = Decoder::<1024, File>::new_gzip(fd); + let mut decoder = Decoder::new_gzip(fd); loop { match decoder.next() { - Some(Ok(msg)) => { + Some(Ok(StreamElement::OpenSource(msg))) => { found += 1; println!("parsed: {:?}", msg); }, + Some(Ok(StreamElement::ClosedSource(element))) => {}, Some(Err(e)) => match e { Error::IoError(e) => panic!("i/o error: {}", e), e => {