diff --git a/crates/brush-dataset/src/brush_vfs.rs b/crates/brush-dataset/src/brush_vfs.rs index df39b406..628b9c0f 100644 --- a/crates/brush-dataset/src/brush_vfs.rs +++ b/crates/brush-dataset/src/brush_vfs.rs @@ -12,8 +12,8 @@ use std::{ }; use anyhow::Context; -use tokio::io::AsyncReadExt; -use tokio::{io::AsyncRead, sync::Mutex}; +use tokio::{io::AsyncRead, io::AsyncReadExt, sync::Mutex}; + use zip::{ result::{ZipError, ZipResult}, ZipArchive, @@ -66,6 +66,7 @@ impl PathReader { pub enum BrushVfs { Zip(ZipArchive>), Manual(PathReader), + #[cfg(not(target_family = "wasm"))] Directory(PathBuf, Vec), } @@ -87,6 +88,7 @@ impl BrushVfs { BrushVfs::Manual(paths) } + #[cfg(not(target_family = "wasm"))] pub async fn from_directory(dir: &Path) -> anyhow::Result { let mut read = ::tokio::fs::read_dir(dir).await?; let mut paths = vec![]; @@ -100,6 +102,7 @@ impl BrushVfs { let iterator: Box> = match self { BrushVfs::Zip(archive) => Box::new(archive.file_names().map(Path::new)), BrushVfs::Manual(map) => Box::new(map.paths().map(|p| p.as_path())), + #[cfg(not(target_family = "wasm"))] BrushVfs::Directory(_, paths) => Box::new(paths.iter().map(|p| p.as_path())), }; // stupic macOS. @@ -119,6 +122,7 @@ impl BrushVfs { Ok(Box::new(Cursor::new(buffer))) } BrushVfs::Manual(map) => map.open(path).await, + #[cfg(not(target_family = "wasm"))] BrushVfs::Directory(path_buf, _) => { let file = tokio::fs::File::open(path_buf).await?; Ok(Box::new(file)) diff --git a/crates/brush-viewer/src/data_source.rs b/crates/brush-viewer/src/data_source.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/brush-viewer/src/viewer.rs b/crates/brush-viewer/src/viewer.rs index 4ed3127b..faf25c05 100644 --- a/crates/brush-viewer/src/viewer.rs +++ b/crates/brush-viewer/src/viewer.rs @@ -16,6 +16,9 @@ use burn_wgpu::{Wgpu, WgpuDevice}; use eframe::egui; use egui_tiles::{Container, Tile, TileId, Tiles}; use glam::{Affine3A, Quat, Vec3, Vec3A}; +use tokio_stream::wrappers::ReceiverStream; +use tokio_util::bytes::Bytes; +use tokio_util::io::StreamReader; use tokio_with_wasm::alias as tokio; use ::tokio::io::AsyncReadExt; @@ -145,7 +148,7 @@ fn process_loop( // Small hack to peek some bytes: Read them // and add them at the start again. - let data = source.read().await?; + let data = source.into_reader()?; let mut data = BufReader::new(data); let mut peek = [0; 128]; data.read_exact(&mut peek).await?; @@ -228,31 +231,53 @@ pub enum DataSource { Url(String), } -#[cfg(target_family = "wasm")] -type DataRead = Pin>; - -#[cfg(not(target_family = "wasm"))] type DataRead = Pin>; impl DataSource { - async fn read(&self) -> anyhow::Result { - match self { - DataSource::PickFile => { - let picked = rrfd::pick_file().await?; - let data = picked.read().await; - Ok(Box::pin(std::io::Cursor::new(data))) - } - DataSource::Url(url) => { - let mut url = url.to_owned(); - if !url.starts_with("http://") && !url.starts_with("https://") { - url = format!("https://{}", url); + fn into_reader(self) -> anyhow::Result { + let (send, rec) = ::tokio::sync::mpsc::channel(16); + + // Spawn the data reading. + tokio::spawn(async move { + let stream = try_fn_stream(|emitter| async move { + match self { + DataSource::PickFile => { + let picked = rrfd::pick_file() + .await + .map_err(|_| std::io::ErrorKind::NotFound)?; + let data = picked.read().await; + emitter.emit(Bytes::from_owner(data)).await; + } + DataSource::Url(url) => { + let mut url = url.to_owned(); + if !url.starts_with("http://") && !url.starts_with("https://") { + url = format!("https://{}", url); + } + let mut response = reqwest::get(url) + .await + .map_err(|_| std::io::ErrorKind::InvalidInput)? + .bytes_stream(); + + while let Some(bytes) = response.next().await { + let bytes = bytes.map_err(|_| std::io::ErrorKind::ConnectionAborted)?; + emitter.emit(bytes).await; + } + } + }; + anyhow::Result::<(), std::io::Error>::Ok(()) + }); + + let mut stream = std::pin::pin!(stream); + + while let Some(data) = stream.next().await { + if send.send(data).await.is_err() { + break; } - let response = reqwest::get(url).await?.bytes_stream(); - let mapped = response - .map(|e| e.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); - Ok(Box::pin(tokio_util::io::StreamReader::new(mapped))) } - } + }); + + let reader = StreamReader::new(ReceiverStream::new(rec)); + Ok(reader) } }