Skip to content

Commit

Permalink
implement streaming iter and adapt tests
Browse files Browse the repository at this point in the history
disables lib tests for now
also adds result to some tests
  • Loading branch information
Quba1 committed Feb 3, 2024
1 parent f5978dd commit ae1d428
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 236 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tokio = { version = "1.35", features = ["macros", "rt"] }
criterion = "0.5"
testing_logger = "0.1"
rand = "0.8"
anyhow = "1.0"

[features]
default = ["ec_index"]
Expand All @@ -47,3 +48,6 @@ features = ["docs", "ec_index"]
[[bench]]
name = "main"
harness = false

[lib]
doctest = false
2 changes: 1 addition & 1 deletion benches/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use eccodes::FallibleIterator;
use eccodes::FallibleStreamingIterator;
use std::path::Path;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
Expand Down
171 changes: 79 additions & 92 deletions src/codes_handle/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
use std::ptr;

use eccodes_sys::codes_handle;
use fallible_iterator::FallibleIterator;
use fallible_streaming_iterator::FallibleStreamingIterator;

use crate::{
codes_handle::{CodesHandle, KeyedMessage},
errors::CodesError,
intermediate_bindings::{
codes_get_message_copy, codes_handle_delete, codes_handle_new_from_file,
codes_handle_new_from_message_copy,
},
intermediate_bindings::{codes_handle_delete, codes_handle_new_from_file},
};
#[cfg(feature = "ec_index")]
use crate::{intermediate_bindings::codes_index::codes_handle_new_from_index, CodesIndex};
Expand Down Expand Up @@ -84,64 +79,46 @@ use super::GribFile;
///## Errors
///The `next()` method will return [`CodesInternal`](crate::errors::CodesInternal)
///when internal ecCodes function returns non-zero code.
impl FallibleIterator for CodesHandle<GribFile> {
impl FallibleStreamingIterator for CodesHandle<GribFile> {
type Item = KeyedMessage;

type Error = CodesError;

fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
let new_eccodes_handle;
fn advance(&mut self) -> Result<(), Self::Error> {
unsafe {
codes_handle_delete(self.eccodes_handle)?;
new_eccodes_handle = codes_handle_new_from_file(self.source.pointer, self.product_kind);
}

match new_eccodes_handle {
Ok(h) => {
self.eccodes_handle = h;

if self.eccodes_handle.is_null() {
Ok(None)
} else {
let message = get_message_from_handle(h);
Ok(Some(message))
}
}
Err(e) => Err(e),
codes_handle_delete(self.unsafe_message.message_handle)?;
}
}
}

#[cfg(feature = "ec_index")]
impl FallibleIterator for CodesHandle<CodesIndex> {
type Item = KeyedMessage;
// nullify message handle so that destructor is harmless
// it might be excessive but it follows the correct pattern
self.unsafe_message.message_handle = ptr::null_mut();

type Error = CodesError;
let new_eccodes_handle =
unsafe { codes_handle_new_from_file(self.source.pointer, self.product_kind)? };

fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
let new_eccodes_handle;
unsafe {
codes_handle_delete(self.eccodes_handle)?;
new_eccodes_handle = codes_handle_new_from_index(self.source.pointer);
}
self.unsafe_message = KeyedMessage {
message_handle: new_eccodes_handle,
iterator_flags: None,
iterator_namespace: None,
keys_iterator: None,
keys_iterator_next_item_exists: false,
nearest_handle: None,
};

match new_eccodes_handle {
Ok(h) => {
self.eccodes_handle = h;
Ok(())
}

if self.eccodes_handle.is_null() {
Ok(None)
} else {
let message = get_message_from_handle(h);
Ok(Some(message))
}
}
Err(e) => Err(e),
fn get(&self) -> Option<&Self::Item> {
if self.unsafe_message.message_handle.is_null() {
None
} else {
Some(&self.unsafe_message)
}
}
}

impl FallibleStreamingIterator for CodesHandle<GribFile> {
#[cfg(feature = "ec_index")]
impl FallibleStreamingIterator for CodesHandle<CodesIndex> {
type Item = KeyedMessage;

type Error = CodesError;
Expand All @@ -155,8 +132,7 @@ impl FallibleStreamingIterator for CodesHandle<GribFile> {
// it might be excessive but it follows the correct pattern
self.unsafe_message.message_handle = ptr::null_mut();

let new_eccodes_handle =
unsafe { codes_handle_new_from_file(self.source.pointer, self.product_kind)? };
let new_eccodes_handle = unsafe { codes_handle_new_from_index(self.source.pointer)? };

self.unsafe_message = KeyedMessage {
message_handle: new_eccodes_handle,
Expand All @@ -179,57 +155,65 @@ impl FallibleStreamingIterator for CodesHandle<GribFile> {
}
}

fn get_message_from_handle(handle: *mut codes_handle) -> KeyedMessage {
let new_handle;
let new_buffer;

unsafe {
new_buffer = codes_get_message_copy(handle).expect(
"Getting message clone failed.
Please report this panic on Github",
);
new_handle = codes_handle_new_from_message_copy(&new_buffer);
}

KeyedMessage {
message_handle: new_handle,
iterator_flags: None,
iterator_namespace: None,
keys_iterator: None,
keys_iterator_next_item_exists: false,
nearest_handle: None,
}
}

#[cfg(test)]
mod tests {
use crate::codes_handle::{CodesHandle, KeyType, KeyedMessage, ProductKind};
use crate::codes_handle::{CodesHandle, KeyType, ProductKind};
use anyhow::Result;
use fallible_streaming_iterator::FallibleStreamingIterator;
use std::path::Path;

#[test]
fn iterator_fn() {
fn iterator_lifetimes() -> Result<()> {
let file_path = Path::new("./data/iceland-levels.grib");
let product_kind = ProductKind::GRIB;
let mut handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();

let msg1 = handle.next()?.unwrap();
let key1 = msg1.read_key("typeOfLevel")?;

let msg2 = handle.next()?.unwrap();
let key2 = msg2.read_key("typeOfLevel")?;

let msg3 = handle.next()?.unwrap();
let key3 = msg3.read_key("typeOfLevel")?;

assert_eq!(key1.value, KeyType::Str("isobaricInhPa".to_string()));
assert_eq!(key2.value, KeyType::Str("isobaricInhPa".to_string()));
assert_eq!(key3.value, KeyType::Str("isobaricInhPa".to_string()));

Ok(())
}

#[test]
fn iterator_fn() -> Result<()> {
let file_path = Path::new("./data/iceland-surface.grib");
let product_kind = ProductKind::GRIB;

let mut handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();

while let Some(msg) = handle.next().unwrap() {
let key = msg.read_key("shortName").unwrap();
while let Some(msg) = handle.next()? {
let key = msg.read_key("shortName")?;

match key.value {
KeyType::Str(_) => {}
_ => panic!("Incorrect variant of string key"),
}
}

Ok(())
}

fn iterator_collected() {
#[test]
fn iterator_collected() -> Result<()> {
let file_path = Path::new("./data/iceland-surface.grib");
let product_kind = ProductKind::GRIB;
let handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();
let mut handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();

let handle_collected: Vec<KeyedMessage> = handle.collect().unwrap();
let mut handle_collected = vec![];

while let Some(msg) = handle.next()? {
handle_collected.push(msg.clone());
}

for msg in handle_collected {
let key = msg.read_key("name").unwrap();
Expand All @@ -238,6 +222,8 @@ mod tests {
_ => panic!("Incorrect variant of string key"),
}
}

Ok(())
}

#[test]
Expand All @@ -256,24 +242,23 @@ mod tests {
}

#[test]
fn iterator_collect() {
fn iterator_filter() -> Result<()> {
let file_path = Path::new("./data/iceland.grib");
let product_kind = ProductKind::GRIB;

let handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();
let mut handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();

// Use iterator to get a Keyed message with shortName "msl" and typeOfLevel "surface"
// First, filter and collect the messages to get those that we want
let mut level: Vec<KeyedMessage> = handle
.filter(|msg| {
Ok(
msg.read_key("shortName")?.value == KeyType::Str("msl".to_string())
&& msg.read_key("typeOfLevel")?.value
== KeyType::Str("surface".to_string()),
)
})
.collect()
.unwrap();
let mut level = vec![];

while let Some(msg) = handle.next()? {
if msg.read_key("shortName")?.value == KeyType::Str("msl".to_string())
&& msg.read_key("typeOfLevel")?.value == KeyType::Str("surface".to_string())
{
level.push(msg.clone());
}
}

// Now unwrap and access the first and only element of resulting vector
// Find nearest modifies internal KeyedMessage fields so we need mutable reference
Expand All @@ -289,5 +274,7 @@ mod tests {
"value: {}, distance: {}",
nearest_gridpoints[3].value, nearest_gridpoints[3].distance
);

Ok(())
}
}
16 changes: 11 additions & 5 deletions src/codes_handle/keyed_message/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,19 @@ impl KeyedMessage {

#[cfg(test)]
mod tests {
use anyhow::Result;

use crate::codes_handle::{CodesHandle, KeysIteratorFlags, ProductKind};
use crate::FallibleIterator;
use crate::{FallibleIterator, FallibleStreamingIterator};
use std::path::Path;

#[test]
fn keys_iterator_parameters() {
fn keys_iterator_parameters() -> Result<()> {
let file_path = Path::new("./data/iceland.grib");
let product_kind = ProductKind::GRIB;

let mut handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();
let mut current_message = handle.next().unwrap().unwrap();
let mut current_message = handle.next()?.unwrap().clone();

assert!(current_message.iterator_flags.is_none());
assert!(current_message.iterator_namespace.is_none());
Expand All @@ -193,15 +195,17 @@ mod tests {
while let Some(key) = current_message.next().unwrap() {
assert!(!key.name.is_empty());
}

Ok(())
}

#[test]
fn invalid_namespace() {
fn invalid_namespace() -> Result<()> {
let file_path = Path::new("./data/iceland.grib");
let product_kind = ProductKind::GRIB;

let mut handle = CodesHandle::new_from_file(file_path, product_kind).unwrap();
let mut current_message = handle.next().unwrap().unwrap();
let mut current_message = handle.next()?.unwrap().clone();

let flags = vec![
KeysIteratorFlags::AllKeys, //0
Expand All @@ -214,5 +218,7 @@ mod tests {
while let Some(key) = current_message.next().unwrap() {
assert!(!key.name.is_empty());
}

Ok(())
}
}
Loading

0 comments on commit ae1d428

Please sign in to comment.