From d137c18e1ebfb7bbb67e7b27fa3b732debd337a5 Mon Sep 17 00:00:00 2001 From: yanhe Date: Wed, 4 Dec 2024 22:47:37 +0800 Subject: [PATCH] Additional file-stream examples --- examples/stream-to-file/Cargo.toml | 2 + examples/stream-to-file/src/main.rs | 292 +++++++++++++++++++++++++++- 2 files changed, 285 insertions(+), 9 deletions(-) diff --git a/examples/stream-to-file/Cargo.toml b/examples/stream-to-file/Cargo.toml index b159c871ba..fee5e0ebbe 100644 --- a/examples/stream-to-file/Cargo.toml +++ b/examples/stream-to-file/Cargo.toml @@ -5,7 +5,9 @@ edition = "2021" publish = false [dependencies] +async-stream = "0.3" axum = { path = "../../axum", features = ["multipart"] } +axum-extra = { path = "../../axum-extra", features = ["file-stream"] } futures = "0.3" tokio = { version = "1.0", features = ["full"] } tokio-util = { version = "0.7", features = ["io"] } diff --git a/examples/stream-to-file/src/main.rs b/examples/stream-to-file/src/main.rs index 7c44286d87..ded7c0e68c 100644 --- a/examples/stream-to-file/src/main.rs +++ b/examples/stream-to-file/src/main.rs @@ -4,22 +4,26 @@ //! cargo run -p example-stream-to-file //! ``` +use async_stream::try_stream; use axum::{ body::Bytes, extract::{Multipart, Path, Request}, - http::StatusCode, - response::{Html, Redirect}, + http::{header, HeaderMap, StatusCode}, + response::{Html, IntoResponse, Redirect, Response}, routing::{get, post}, BoxError, Router, }; +use axum_extra::response::file_stream::FileStream; use futures::{Stream, TryStreamExt}; -use std::io; -use tokio::{fs::File, io::BufWriter}; -use tokio_util::io::StreamReader; +use std::{io, path::PathBuf}; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}, +}; +use tokio_util::io::{ReaderStream, StreamReader}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - const UPLOADS_DIRECTORY: &str = "uploads"; - +const DOWNLOAD_DIRECTORY: &str = "downloads"; #[tokio::main] async fn main() { tracing_subscriber::registry() @@ -35,9 +39,23 @@ async fn main() { .await .expect("failed to create `uploads` directory"); + tokio::fs::create_dir(DOWNLOAD_DIRECTORY) + .await + .expect("failed to create `downloads` directory"); + + //create a file to download + create_test_file(std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt")) + .await + .expect("failed to create test file"); + let app = Router::new() - .route("/", get(show_form).post(accept_form)) - .route("/file/{file_name}", post(save_request_body)); + .route("/upload", get(show_form).post(accept_form)) + .route("/", get(show_form2).post(accept_form)) + .route("/file/{file_name}", post(save_request_body)) + .route("/file_download", get(file_download_handler)) + .route("/simpler_file_download", get(simpler_file_download_handler)) + .route("/range_file", get(file_range_handler)) + .route("/range_file_stream", get(try_file_range_handler)); let listener = tokio::net::TcpListener::bind("127.0.0.1:3000") .await @@ -46,6 +64,19 @@ async fn main() { axum::serve(listener, app).await.unwrap(); } +async fn create_test_file(path: PathBuf) -> io::Result<()> { + let mut file = File::create(path).await?; + for i in 1..=30 { + let line = format!( + "Hello, this is the simulated file content! This is line {}\n", + i + ); + file.write_all(line.as_bytes()).await?; + } + file.flush().await?; + Ok(()) +} + // Handler that streams the request body to a file. // // POST'ing to `/file/foo.txt` will create a file called `foo.txt`. @@ -84,6 +115,249 @@ async fn show_form() -> Html<&'static str> { ) } +// Handler that returns HTML for a multipart form. +async fn show_form2() -> Html<&'static str> { + Html( + r#" + + + + Upload and Download! + + +

Upload and Download Files

+ + +
+
+ +
+ +
+ +
+
+ +
+ +
+
+ +
+
+ + +
+
+ + +
+
+ + + + "#, + ) +} + +/// A simpler file download handler that uses the `FileStream` response. +/// Returns the entire file as a stream. +async fn simpler_file_download_handler() -> Response { + //If you want to simply return a file as a stream + // you can use the from_path method directly, passing in the path of the file to construct a stream with a header and length. + FileStream::>::from_path( + &std::path::Path::new(DOWNLOAD_DIRECTORY).join("test.txt"), + ) + .await + .map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Failed to open file").into_response()) + .into_response() +} + +/// If you want to control the returned files in more detail you can implement a Stream +/// For example, use the try_stream! macro to construct a file stream and set which parts are needed. +async fn file_download_handler() -> Response { + let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); + let file_stream = match try_stream(&file_path, 5, 25, 10).await { + Ok(file_stream) => file_stream, + Err(e) => { + println!("{e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); + } + }; + + // Use FileStream to return and set some information. + // Will set application/octet-stream in the header. + let file_stream_resp = FileStream::new(Box::pin(file_stream)) + .file_name("test.txt") + .content_size(20_u64); + + file_stream_resp.into_response() +} + +/// More complex manipulation of files and conversion to a stream +async fn try_stream( + file_path: &str, + start: u64, + mut end: u64, + buffer_size: usize, +) -> Result, std::io::Error>>, String> { + let mut file = File::open(file_path) + .await + .map_err(|e| format!("open file:{file_path} err:{e}"))?; + + file.seek(std::io::SeekFrom::Start(start)) + .await + .map_err(|e| format!("file:{file_path} seek err:{e}"))?; + + if end == 0 { + let metadata = file + .metadata() + .await + .map_err(|e| format!("file:{file_path} get metadata err:{e}"))?; + end = metadata.len(); + } + + let mut buffer = vec![0; buffer_size]; + + let stream = try_stream! { + let mut total_read = 0; + + while total_read < end { + let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); + let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + if n == 0 { + break; // EOF + } + total_read += n as u64; + yield buffer[..n].to_vec(); + + } + }; + Ok(stream) +} + +async fn try_stream2( + mut file: File, + start: u64, + mut end: u64, + buffer_size: usize, +) -> Result, std::io::Error>>, String> { + file.seek(std::io::SeekFrom::Start(start)) + .await + .map_err(|e| format!("file seek err:{e}"))?; + + if end == 0 { + let metadata = file + .metadata() + .await + .map_err(|e| format!("file get metadata err:{e}"))?; + end = metadata.len(); + } + + let mut buffer = vec![0; buffer_size]; + + let stream = try_stream! { + let mut total_read = 0; + + while total_read < end { + let bytes_to_read = std::cmp::min(buffer_size as u64, end - total_read); + let n = file.read(&mut buffer[..bytes_to_read as usize]).await.map_err(|e| { + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; + if n == 0 { + break; // EOF + } + total_read += n as u64; + yield buffer[..n].to_vec(); + + } + }; + Ok(stream) +} + +/// A file download handler that accepts a range header and returns a partial file as a stream. +/// You can return directly from the path +/// But you can't download this stream directly from your browser, you need to use a tool like curl or Postman. +async fn try_file_range_handler(headers: HeaderMap) -> Response { + let range_header = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()); + + let (start, end) = if let Some(range) = range_header { + if let Some(range) = parse_range_header(range) { + range + } else { + return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); + } + } else { + (0, 0) // default range end = 0, if end = 0 end == file size - 1 + }; + + let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); + FileStream::>::try_range_response( + std::path::Path::new(&file_path), + start, + end, + ) + .await + .unwrap() +} + +/// If you want to control the stream yourself +async fn file_range_handler(headers: HeaderMap) -> Response { + // Parse the range header to get the start and end values. + let range_header = headers + .get(header::RANGE) + .and_then(|value| value.to_str().ok()); + + // If the range header is invalid, return a 416 Range Not Satisfiable response. + let (start, end) = if let Some(range) = range_header { + if let Some(range) = parse_range_header(range) { + range + } else { + return (StatusCode::RANGE_NOT_SATISFIABLE, "Invalid Range").into_response(); + } + } else { + (0, 0) // default range end = 0, if end = 0 end == file size - 1 + }; + + let file_path = format!("{DOWNLOAD_DIRECTORY}/test.txt"); + + let file = File::open(file_path).await.unwrap(); + + let file_size = file.metadata().await.unwrap().len(); + + let file_stream = match try_stream2(file, start, end, 256).await { + Ok(file_stream) => file_stream, + Err(e) => { + println!("{e}"); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed try stream!").into_response(); + } + }; + + FileStream::new(Box::pin(file_stream)).into_range_response(start, end, file_size) +} + +/// Parse the range header and return the start and end values. +fn parse_range_header(range: &str) -> Option<(u64, u64)> { + let range = range.strip_prefix("bytes=")?; + let mut parts = range.split('-'); + let start = parts.next()?.parse::().ok()?; + let end = parts + .next() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + if start > end { + return None; + } + Some((start, end)) +} + // Handler that accepts a multipart form upload and streams each field to a file. async fn accept_form(mut multipart: Multipart) -> Result { while let Ok(Some(field)) = multipart.next_field().await {