diff --git a/Cargo.lock b/Cargo.lock index c6de3ce..370f90e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1267,7 +1267,6 @@ dependencies = [ "anyhow", "smol", "spuz_folder", - "spuz_get", "spuz_piston", "spuz_spawner", "spuz_wrench", @@ -1305,22 +1304,6 @@ dependencies = [ "typed-builder", ] -[[package]] -name = "spuz_get_old" -version = "0.1.0" -dependencies = [ - "async-compression", - "futures", - "pin-project", - "reqwest", - "serde", - "spuz_folder", - "thiserror", - "tokio", - "tokio-util", - "url", -] - [[package]] name = "spuz_piston" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 50fd145..d945923 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ spuz_piston = { path = "crates/spuz_piston" } spuz_folder = { path = "crates/spuz_folder" } spuz_spawner = { path = "crates/spuz_spawner" } spuz_wrench = { path = "crates/spuz_wrench" } -spuz_get = { path = "crates/spuz_get" } [workspace.lints.rust] [workspace.lints.clippy] diff --git a/crates/spuz_cli/Cargo.toml b/crates/spuz_cli/Cargo.toml index 3d897c9..68bcb2b 100644 --- a/crates/spuz_cli/Cargo.toml +++ b/crates/spuz_cli/Cargo.toml @@ -11,7 +11,6 @@ readme.workspace = true spuz_folder = { workspace = true } spuz_wrench = { workspace = true } spuz_piston = { workspace = true } -spuz_get = { workspace = true } spuz_spawner = { workspace = true, features = ["useful-layers", "process-handle"] } tracing = { workspace = true } diff --git a/crates/spuz_get/Cargo.toml b/crates/spuz_get/Cargo.toml deleted file mode 100644 index a267ff3..0000000 --- a/crates/spuz_get/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "spuz_get" -version = "0.1.0" -edition.workspace = true -authors.workspace = true -license.workspace = true -repository.workspace = true -readme.workspace = true - -[dependencies] -spuz_folder = { workspace = true } - -thiserror = { workspace = true } -serde = { workspace = true } - -url = { version = "2" } -reqwest = { version = "0.12", features = ["stream", "json"] } -tokio = { version = "1", features = ["fs", "rt-multi-thread", "macros"] } -tokio-util = { version = "0.7" } -async-compression = { version = "0.4", features = ["futures-io", "lzma"] } -futures = { version = "0.3" } -pin-project = { version = "1" } - -[lints] -workspace = true diff --git a/crates/spuz_get/src/err.rs b/crates/spuz_get/src/err.rs deleted file mode 100644 index 89d78f0..0000000 --- a/crates/spuz_get/src/err.rs +++ /dev/null @@ -1,29 +0,0 @@ -use std::io::Error as IoError; - -use reqwest::Error as ReqwestError; -use thiserror::Error; - -pub type Result = std::result::Result; - -#[derive(Debug, Error)] -pub enum Error { - #[error(transparent)] - Io(#[from] IoError), - - #[error(transparent)] - Reqwest(#[from] ReqwestError), -} - -impl Error { - pub fn is_critical(&self) -> bool { - match self { - Error::Io(_) => true, - Error::Reqwest(err) - if err.is_status() || err.is_decode() || err.is_body() || err.is_redirect() || err.is_request() => - { - true - } - Error::Reqwest(_) => false, - } - } -} diff --git a/crates/spuz_get/src/event.rs b/crates/spuz_get/src/event.rs deleted file mode 100644 index c60b46b..0000000 --- a/crates/spuz_get/src/event.rs +++ /dev/null @@ -1,11 +0,0 @@ -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum Event { - JobStarted { tasks: usize, bytes: u64 }, - JobFinished, - JobFailed, - - TaskStarted, - TaskChunk { total: u64, size: usize }, - TaskFinished, - TaskFailed, -} diff --git a/crates/spuz_get/src/lib.rs b/crates/spuz_get/src/lib.rs deleted file mode 100644 index bea0317..0000000 --- a/crates/spuz_get/src/lib.rs +++ /dev/null @@ -1,13 +0,0 @@ -mod err; -mod event; -mod job; -mod shared; -mod task; -mod worker; - -pub use err::{Error, Result}; -pub use event::Event; -pub use job::{Job, JobBuilder, JobHandle}; -pub(crate) use shared::{loop_select, result_async, spawn}; -pub use task::Task; -pub use worker::Worker; diff --git a/crates/spuz_get/src/task.rs b/crates/spuz_get/src/task.rs deleted file mode 100644 index 3b29275..0000000 --- a/crates/spuz_get/src/task.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::{path::Path, sync::Arc}; - -use url::Url; - -#[derive(Debug, Clone)] -pub struct Task { - pub url: Url, - pub local: Arc, - pub size: u64, - - pub lzma: bool, - - pub(crate) retries: u8, -} - -impl Task { - pub fn new(url: Url, local: Arc, size: u64) -> Self { - Self { url, local, size, retries: 0, lzma: false } - } - - #[must_use] - pub fn lzma(mut self, enable: bool) -> Self { - self.lzma = enable; - self - } - - // TODO - #[allow(unused)] - pub(crate) fn retry_add(&mut self) { - self.retries += 1; - } -} diff --git a/crates/spuz_get/src/worker.rs b/crates/spuz_get/src/worker.rs deleted file mode 100644 index 41457af..0000000 --- a/crates/spuz_get/src/worker.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::{ - io, - io::ErrorKind, - pin::Pin, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, -}; - -use async_compression::futures::bufread::LzmaDecoder; -use futures::{io::BufReader, AsyncBufRead, AsyncRead, AsyncReadExt, TryStreamExt}; -use pin_project::pin_project; -use reqwest::Client; -use tokio::{ - fs::File, - io::AsyncWriteExt, - sync::{mpsc, Semaphore}, -}; - -use crate::{job::JobHandle, loop_select, result_async, spawn, Event, Job}; - -#[derive(Debug)] -pub struct Worker { - client: Client, - semaphore: Arc, -} - -impl Worker { - pub fn new(client: Client, concurrency: usize) -> Self { - let semaphore = Arc::new(Semaphore::new(concurrency)); - - Self { client, semaphore } - } - - pub fn push(&self, job: Job) -> Arc { - let (tx, rx) = mpsc::unbounded_channel(); - let handle = Arc::new(JobHandle::new(rx)); - let counter = Arc::new(AtomicUsize::new(job.total)); - - tx.send(Event::JobStarted { bytes: job.size, tasks: job.total }).unwrap(); - - for task in job.tasks { - let counter = counter.clone(); - let tx = tx.clone(); - let tx2 = tx.clone(); - let handle = handle.clone(); - let client = self.client.clone(); - let semaphore = self.semaphore.clone(); - - spawn! { - let permit = semaphore.acquire().await.expect("Semaphore unexpectedly closed"); - - tx.send(Event::TaskStarted).unwrap(); - - let result = result_async! { - let request = client.get(task.url); - let response = request.send().await?; - - let stream = response.bytes_stream().map_err(|err| io::Error::new(ErrorKind::Other, err)); - let reader = stream.into_async_read(); - let reader = TrackingReader::new(reader, &tx, task.size); - - let mut file = File::create(task.local).await?; - - let piped = task.lzma; - - let mut decoder: Box = if task.lzma { - Box::new(LzmaDecoder::new(reader)) - } else { - Box::new(reader) - }; - - let mut buf = vec![0; 16384]; - - loop_select! { - () = handle.ct.cancelled() => { - break; - } - read = decoder.read(&mut buf) => { - let read = read?; - - if read == 0 { - tx.send(Event::TaskFinished).unwrap(); - if counter.fetch_sub(1, Ordering::Relaxed) == 1 { - tx.send(Event::JobFinished).unwrap(); - }; - return Ok(()); - } - - if !piped { - tx.send(Event::TaskChunk { - total: task.size, - size: read, - }).unwrap(); - } - - file.write_all(&buf[..read]).await?; - } - } - - Ok(()) - }; - - if let Err(_err) = result.await { - tx2.send(Event::TaskFailed).unwrap(); - } - - drop(permit) - }; - } - - handle - } -} - -#[pin_project] -pub struct TrackingReader<'a, T> { - #[pin] - inner: BufReader, - total: u64, - tx: &'a mpsc::UnboundedSender, -} - -impl<'a, T> TrackingReader<'a, T> -where - T: AsyncRead, -{ - pub fn new(reader: T, tx: &'a mpsc::UnboundedSender, total: u64) -> Self { - Self { inner: BufReader::new(reader), total, tx } - } -} - -impl AsyncRead for TrackingReader<'_, T> -where - T: Unpin + AsyncRead, -{ - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let this = self.project(); - this.inner.poll_read(cx, buf) - } -} - -impl AsyncBufRead for TrackingReader<'_, T> -where - T: Unpin + AsyncRead, -{ - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let result = this.inner.poll_fill_buf(cx); - if let Poll::Ready(Ok(result)) = &result { - let event = Event::TaskChunk { total: *this.total, size: result.len() }; - this.tx.send(event).unwrap(); - } - result - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - let this = self.project(); - this.inner.consume(amt); - } -}