From c560637fe84220738725328e63c5407c3a86c3ff Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Thu, 18 Jan 2024 15:05:44 +0000 Subject: [PATCH] tinkering with some iterator ideas. #24 #25 --- design.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/design.md b/design.md index 580db36..50fe6d1 100644 --- a/design.md +++ b/design.md @@ -453,3 +453,36 @@ impl Writer for IoUringLocal { } impl GetFilesize for IoUringLocal { } impl ReadThenWrite for IoUringLocal { } ``` + +### Trying out some iterator ideas again + +```rust +let errors = reader.read( + vec![ + // First group: All the source GRIB files which will be collated into + // a set of Zarr chunks. + HashMap::from([ + ("/foo/grib1.1", ByteRanges(vec![...])), + ("/foo/grib1.2", ByteRanges(vec![...])), + ]), + // Second group: + HashMap::from([ + ("/foo/grib2.1", ByteRanges(vec![...])), + ("/foo/grib2.1", ByteRanges(vec![...])), + ]), + ]) + // Behind the scenes, `reader.read()` launches a thread which owns its own + // io_uring, and continually keeps that io_uring topped up. All IO operations in group n + // are guaranteed to be completed before LSIO begins IO ops from group n+1. + .iter() + // `map` acts on each byte range. + // TODO: How to allow `map` to run concurrently across all tasks? Maybe: + // - Rayon's par_iter? + // - Return a Future? (Not sure that'll work?) + // - Use Tokio with Rayon?! See https://ryhl.io/blog/async-what-is-blocking/ + .map(|(buffer, path, byte_range, group_index)| (decompress(buffer), path, byte_range, group_index)) + // `reduce` collects all the buffers for a group, and outputs a vector of + // (output_buffer, output_path, output_byte_range) + .reduce(|buffers_with_paths_and_byte_range| collate(buffers_with_paths_and_byte_range)) + // Now compress each +```