Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: import_verifiable_stream and write_verifiable_stream in Store #10

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions src/store/fs/tests.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::io::Cursor;

use bao_tree::ChunkRanges;
use bytes::BytesMut;
use iroh_io::AsyncSliceReaderExt;

use crate::{
store::{
bao_file::test_support::{
decode_response_into_batch, make_wire_data, random_test_data, simulate_remote, validate,
},
Map as _, MapEntryMut, MapMut, ReadableStore, Store as _,
Map as _, MapEntry, MapEntryMut, MapMut, ReadableStore, Store as _, ValidateProgress,
},
util::raw_outboard,
util::{progress::AsyncChannelProgressSender, raw_outboard},
IROH_BLOCK_SIZE,
};

Expand Down Expand Up @@ -809,3 +810,34 @@ async fn actor_store_smoke() {
db.sync().await.unwrap();
db.dump().await.unwrap();
}

#[tokio::test]
async fn verifiable_stream_smoke() -> testresult::TestResult {
    const SIZE: usize = 16 * 1024 * 1024;

    // A source store holding the original blob, and an empty target store
    // that will receive it via the verifiable stream round-trip.
    let source = crate::store::mem::Store::new();
    let target = crate::store::mem::Store::new();

    let payload = random_test_data(SIZE);
    let tag = source
        .import_bytes(Bytes::from(payload), BlobFormat::Raw)
        .await?;
    let entry = source
        .get(tag.hash())
        .await?
        .expect("We just wrote this hash");

    // Encode the whole blob (data + outboard interleaved) into a buffer,
    // leaving headroom for the outboard on top of the raw data size.
    let mut encoded = BytesMut::with_capacity(SIZE + 1024 * 1024);
    entry.write_verifiable_stream(0, &mut encoded).await?;

    // Replay the encoded stream into the target store.
    target
        .import_verifiable_stream(*tag.hash(), SIZE as u64, 0, encoded.freeze())
        .await?;

    // Validate the target store; any Abort progress event fails the test.
    let (progress_tx, progress_rx) = async_channel::bounded(128);
    let watcher = tokio::spawn(async move {
        while let Ok(event) = progress_rx.recv().await {
            if let ValidateProgress::Abort(err) = event {
                panic!("Got an error: {err}");
            }
        }
    });
    target
        .validate(false, AsyncChannelProgressSender::new(progress_tx).boxed())
        .await?;
    watcher.await?;

    Ok(())
}
107 changes: 105 additions & 2 deletions src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Durati

pub use bao_tree;
use bao_tree::{
io::fsm::{BaoContentItem, Outboard},
io::{
fsm::{
encode_ranges_validated, BaoContentItem, Outboard, ResponseDecoder, ResponseDecoderNext,
},
DecodeError,
},
BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
use iroh_io::{AsyncSliceReader, AsyncStreamReader, AsyncStreamWriter};
pub use range_collections;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
Expand Down Expand Up @@ -90,6 +95,31 @@ pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
/// A future that resolves to a reader that can be used to read the data
fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;

/// Encodes data and outboard into a [`AsyncStreamWriter`].
///
/// Data and outboard parts will be interleaved.
///
/// `offset` is the byte offset in the blob to start the stream from. It will be rounded down to
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
/// the next chunk group.
///
/// Returns immediately without error if `start` is equal or larger than the entry's size.
fn write_verifiable_stream<'a>(
&'a self,
offset: u64,
writer: impl AsyncStreamWriter + 'a,
) -> impl Future<Output = io::Result<()>> + 'a {
async move {
let size = self.size().value();
if offset >= size {
return Ok(());
}
let ranges = range_from_offset_and_length(offset, size - offset);
let (outboard, data) = tokio::try_join!(self.outboard(), self.data_reader())?;
encode_ranges_validated(data, outboard, &ranges, writer).await?;
Ok(())
}
}
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
Expand Down Expand Up @@ -341,6 +371,74 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
self.import_stream(stream, format, progress)
}

/// Import a blob from a verified stream, as emitted by [`MapEntry::write_verifiable_stream`];
///
/// `total_size` is the total size of the blob as reported by the remote.
/// `offset` is the byte offset in the blob where the stream starts. It will be rounded
/// to the next chunk group.
///
/// NOTE(review): `total_size` is taken on trust from the remote here; the decoded
/// content is still verified against `hash`, but the size itself is not — confirm
/// callers obtain it from a verified source.
fn import_verifiable_stream<'a>(
    &'a self,
    hash: Hash,
    total_size: u64,
    offset: u64,
    reader: impl AsyncStreamReader + 'a,
) -> impl Future<Output = io::Result<()>> + 'a {
    async move {
        // An offset at or past the end would yield an empty range, so reject it
        // up front. (Message matches the `>=` condition: equality is rejected too.)
        if offset >= total_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "offset must be smaller than total_size",
            ));
        }
        let entry = self.get_or_create(hash, total_size).await?;
        let mut bw = entry.batch_writer().await?;

        let ranges = range_from_offset_and_length(offset, total_size - offset);
        let mut decoder = ResponseDecoder::new(
            hash.into(),
            ranges,
            BaoTree::new(total_size, IROH_BLOCK_SIZE),
            reader,
        );
        let size = decoder.tree().size();
        // Parent nodes are buffered until the leaf they authenticate arrives;
        // parents plus leaf are then flushed to the writer as one batch.
        let mut buf = Vec::new();
        let is_complete = loop {
            decoder = match decoder.next().await {
                ResponseDecoderNext::More((decoder, item)) => {
                    let item = match item {
                        // The stream ended before covering the requested ranges —
                        // presumably the remote sent only part of the blob. Keep
                        // what we decoded so far as a partial entry.
                        Err(DecodeError::LeafNotFound(_) | DecodeError::ParentNotFound(_)) => {
                            break false
                        }
                        Err(err) => return Err(err.into()),
                        Ok(item) => item,
                    };
                    match &item {
                        BaoContentItem::Parent(_) => {
                            buf.push(item);
                        }
                        BaoContentItem::Leaf(_) => {
                            buf.push(item);
                            let batch = std::mem::take(&mut buf);
                            bw.write_batch(size, batch).await?;
                        }
                    }
                    decoder
                }
                ResponseDecoderNext::Done(_reader) => {
                    debug_assert!(buf.is_empty(), "last node of bao tree must be leaf node");
                    break true;
                }
            };
        };
        bw.sync().await?;
        drop(bw);
        // Only a fully decoded stream may be promoted to a complete entry.
        if is_complete {
            self.insert_complete(entry).await?;
        }
        Ok(())
    }
}

/// Set a tag
fn set_tag(
&self,
Expand Down Expand Up @@ -386,6 +484,11 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
}
}

/// Converts a byte range given as (`offset`, `length`) into the chunk ranges
/// covering it, rounded outward to whole chunks.
fn range_from_offset_and_length(offset: u64, length: u64) -> bao_tree::ChunkRanges {
    let byte_range = bao_tree::ByteRanges::from(offset..(offset + length));
    bao_tree::io::round_up_to_chunks(&byte_range)
}

async fn validate_impl(
store: &impl Store,
repair: bool,
Expand Down
Loading