Skip to content

Commit

Permalink
utilize async as much as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
magiclen committed Nov 11, 2023
1 parent aa49af1 commit d81e365
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 53 deletions.
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "nginx-cache-purge"
version = "0.3.1"
version = "0.3.2"
authors = ["Magic Len <[email protected]>"]
edition = "2021"
rust-version = "1.70"
Expand Down Expand Up @@ -28,8 +28,10 @@ anyhow = "1"
md-5 = "0.10"
scanner-rust = "2"

tokio = { version = "1", features = ["full"] }
async-recursion = "1"

serde = { version = "1", features = ["derive"], optional = true }
tokio = { version = "1", features = ["full"], optional = true }
hyper = { version = "0.14", optional = true }
axum = { version = "0.6", optional = true }

Expand All @@ -40,4 +42,4 @@ tower-http = { version = "0.4", features = ["trace", "set-header"], optional = t

[features]
default = ["service"]
service = ["serde", "tokio", "hyper", "axum", "tracing", "enable-ansi-support", "tracing-subscriber", "tower-http"]
service = ["serde", "hyper", "axum", "tracing", "enable-ansi-support", "tracing-subscriber", "tower-http"]
90 changes: 60 additions & 30 deletions src/functions.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,50 @@
use std::{io, path::Path};
use std::{
io,
path::{Path, PathBuf},
sync::Arc,
};

use anyhow::{anyhow, Context};
use async_recursion::async_recursion;
use md5::{Digest, Md5};
use scanner_rust::{generic_array::typenum::U384, ScannerAscii};

use crate::AppResult;

#[inline]
fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref();

if cfg!(debug_assertions) {
println!("Remove file: {path:?}");

Ok(())
} else {
std::fs::remove_file(path)
tokio::fs::remove_file(path).await
}
}

#[inline]
fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref();

if cfg!(debug_assertions) {
println!("Remove dir all: {path:?}");

Ok(())
} else {
std::fs::remove_dir_all(path)
tokio::fs::remove_dir_all(path).await
}
}

#[inline]
fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref();

if cfg!(debug_assertions) {
println!("Remove dir: {path:?}");
} else {
match std::fs::remove_dir(path) {
match tokio::fs::remove_dir(path).await {
Ok(_) => (),
Err(error) => {
// check if the error is caused by directory is not empty
Expand All @@ -56,10 +61,13 @@ fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
Ok(())
}

fn remove_empty_ancestors<P: AsRef<Path>>(path: P, relative_degree: usize) -> anyhow::Result<()> {
async fn remove_empty_ancestors<P: AsRef<Path>>(
path: P,
relative_degree: usize,
) -> anyhow::Result<()> {
if let Some(mut path) = path.as_ref().parent() {
for _ in 1..=relative_degree {
match remove_dir(path) {
match remove_dir(path).await {
Ok(_) => (),
Err(error)
if matches!(error.kind(), io::ErrorKind::NotFound | io::ErrorKind::Other) =>
Expand All @@ -82,7 +90,7 @@ fn remove_empty_ancestors<P: AsRef<Path>>(path: P, relative_degree: usize) -> an
}

/// Do something like `rm -rf /path/to/*`. The `/path/to` directory will not be deleted. This function may be dangerous.
pub fn remove_all_files_in_directory<P: AsRef<Path>>(path: P) -> anyhow::Result<bool> {
pub async fn remove_all_files_in_directory<P: AsRef<Path>>(path: P) -> anyhow::Result<bool> {
let mut result = false;

let path = path.as_ref();
Expand All @@ -99,7 +107,7 @@ pub fn remove_all_files_in_directory<P: AsRef<Path>>(path: P) -> anyhow::Result<
let path = dir_entry.path();

if file_type.is_dir() {
match remove_dir_all(&path) {
match remove_dir_all(&path).await {
Ok(_) => result = true,
Err(error) if error.kind() == io::ErrorKind::NotFound => {
result = true;
Expand All @@ -108,7 +116,7 @@ pub fn remove_all_files_in_directory<P: AsRef<Path>>(path: P) -> anyhow::Result<
Err(error) => return Err(error).with_context(|| anyhow!("{path:?}")),
}
} else {
match remove_file(&path) {
match remove_file(&path).await {
Ok(_) => result = true,
Err(error) if error.kind() == io::ErrorKind::NotFound => {
result = true;
Expand All @@ -124,7 +132,7 @@ pub fn remove_all_files_in_directory<P: AsRef<Path>>(path: P) -> anyhow::Result<
}

/// Purge a cache with a specific key.
pub fn remove_one_cache<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
pub async fn remove_one_cache<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
cache_path: P,
levels: L,
key: K,
Expand Down Expand Up @@ -163,9 +171,9 @@ pub fn remove_one_cache<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(

file_path.push(hashed_key);

match remove_file(&file_path) {
match remove_file(&file_path).await {
Ok(_) => {
remove_empty_ancestors(file_path, number_of_levels)?;
remove_empty_ancestors(file_path, number_of_levels).await?;

Ok(AppResult::Ok)
},
Expand All @@ -177,16 +185,24 @@ pub fn remove_one_cache<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
}

/// Purge multiple caches via wildcard.
pub fn remove_caches_via_wildcard<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
pub async fn remove_caches_via_wildcard<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
cache_path: P,
levels: L,
key: K,
) -> anyhow::Result<AppResult> {
fn iterate(levels: &[&str], keys: &[&[u8]], path: &Path, level: usize) -> anyhow::Result<bool> {
#[async_recursion]
async fn iterate(
levels: Arc<Vec<String>>,
keys: Arc<Vec<Vec<u8>>>,
path: PathBuf,
level: usize,
) -> anyhow::Result<bool> {
let mut result = false;

let number_of_levels = levels.len();

let mut tasks = Vec::new();

for dir_entry in path.read_dir().with_context(|| anyhow!("{path:?}"))? {
let dir_entry = dir_entry.with_context(|| anyhow!("{path:?}"))?;

Expand All @@ -198,15 +214,26 @@ pub fn remove_caches_via_wildcard<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(

if number_of_levels == level {
if file_type.is_file() {
result =
match_key_and_remove_one_cache(dir_entry.path(), keys, number_of_levels)?
|| result;
tasks.push(tokio::spawn(match_key_and_remove_one_cache(
dir_entry.path(),
keys.clone(),
number_of_levels,
)));
}
} else if file_type.is_dir() {
result = iterate(levels, keys, dir_entry.path().as_path(), level + 1)? || result;
tasks.push(tokio::spawn(iterate(
levels.clone(),
keys.clone(),
dir_entry.path(),
level + 1,
)));
}
}

for task in tasks {
result = task.await.unwrap()? || result;
}

Ok(result)
}

Expand Down Expand Up @@ -251,7 +278,7 @@ pub fn remove_caches_via_wildcard<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
};

if keys.len() == 1 && keys[0].is_empty() {
return remove_all_files_in_directory(cache_path).map(|modified| {
return remove_all_files_in_directory(cache_path).await.map(|modified| {
if modified {
AppResult::Ok
} else {
Expand All @@ -260,11 +287,14 @@ pub fn remove_caches_via_wildcard<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
});
}

let levels = parse_levels_stage_1(&levels)?;
let keys = keys.into_iter().map(|v| v.to_vec()).collect::<Vec<Vec<u8>>>();

let levels =
parse_levels_stage_1(&levels)?.into_iter().map(|s| s.to_string()).collect::<Vec<String>>();

let path = cache_path.as_ref();

iterate(&levels, &keys, path, 0).map(|modified| {
iterate(Arc::new(levels), Arc::new(keys), path.to_path_buf(), 0).await.map(|modified| {
if modified {
AppResult::Ok
} else {
Expand All @@ -273,9 +303,9 @@ pub fn remove_caches_via_wildcard<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
})
}

fn match_key_and_remove_one_cache<P: AsRef<Path>>(
async fn match_key_and_remove_one_cache<P: AsRef<Path>>(
file_path: P,
keys: &[&[u8]],
keys: Arc<Vec<Vec<u8>>>,
number_of_levels: usize,
) -> anyhow::Result<bool> {
let file_path = file_path.as_ref();
Expand All @@ -299,7 +329,7 @@ fn match_key_and_remove_one_cache<P: AsRef<Path>>(
let keys_len = keys.len();

let hit = loop {
let key = keys[i];
let key = &keys[i];
let key_len = key.len();

if key_len == 0 {
Expand All @@ -309,7 +339,7 @@ fn match_key_and_remove_one_cache<P: AsRef<Path>>(
break true;
}

let key = keys[i];
let key = &keys[i];
let key_len = key.len();
debug_assert!(!key.is_empty());

Expand Down Expand Up @@ -347,13 +377,13 @@ fn match_key_and_remove_one_cache<P: AsRef<Path>>(
};

if hit {
match remove_file(file_path) {
match remove_file(file_path).await {
Ok(_) => (),
Err(error) if error.kind() == io::ErrorKind::NotFound => (),
Err(error) => return Err(error).with_context(|| anyhow!("{file_path:?}")),
}

remove_empty_ancestors(file_path, number_of_levels)?;
remove_empty_ancestors(file_path, number_of_levels).await?;

Ok(true)
} else {
Expand Down
33 changes: 19 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
use cli::*;
#[cfg(feature = "service")]
use server::*;
use tokio::runtime;

#[derive(Debug)]
pub enum AppResult {
Expand Down Expand Up @@ -44,7 +45,7 @@ impl Termination for AppResult {
}

#[inline]
fn purge<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
async fn purge<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
cache_path: P,
levels: L,
key: K,
Expand All @@ -54,24 +55,28 @@ fn purge<P: AsRef<Path>, L: AsRef<str>, K: AsRef<str>>(
let key = key.as_ref();

if key.contains('*') {
functions::remove_caches_via_wildcard(cache_path, levels, key)
functions::remove_caches_via_wildcard(cache_path, levels, key).await
} else {
functions::remove_one_cache(cache_path, levels, key)
functions::remove_one_cache(cache_path, levels, key).await
}
}

fn main() -> anyhow::Result<AppResult> {
let args = get_args();

match &args.command {
CLICommands::Purge {
cache_path,
levels,
key,
} => purge(cache_path, levels, key),
#[cfg(feature = "service")]
CLICommands::Start {
socket_file_path,
} => server_main(socket_file_path.as_path()),
}
let runtime = runtime::Runtime::new()?;

runtime.block_on(async move {
match &args.command {
CLICommands::Purge {
cache_path,
levels,
key,
} => purge(cache_path, levels, key).await,
#[cfg(feature = "service")]
CLICommands::Start {
socket_file_path,
} => server_main(socket_file_path.as_path()).await,
}
})
}
9 changes: 3 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use serde::Deserialize;
use tokio::{
fs,
net::{UnixListener, UnixStream},
runtime,
};
use tower_http::{
set_header::SetResponseHeaderLayer,
Expand Down Expand Up @@ -71,7 +70,7 @@ async fn index_handler(args: Query<Args>) -> impl IntoResponse {
}
}

match tokio::task::spawn_blocking(|| purge(cache_path, levels, key)).await.unwrap() {
match purge(cache_path, levels, key).await {
Ok(result) => match result {
AppResult::Ok => (StatusCode::OK, "Ok.".to_string()),
AppResult::AlreadyPurged(_) | AppResult::AlreadyPurgedWildcard => {
Expand Down Expand Up @@ -142,7 +141,7 @@ async fn run(socket_file_path: &Path) -> anyhow::Result<AppResult> {
}

#[inline]
pub fn server_main(socket_file_path: &Path) -> anyhow::Result<AppResult> {
pub async fn server_main(socket_file_path: &Path) -> anyhow::Result<AppResult> {
let mut ansi_color = io::stdout().is_terminal();

if ansi_color && enable_ansi_support::enable_ansi_support().is_err() {
Expand All @@ -154,7 +153,5 @@ pub fn server_main(socket_file_path: &Path) -> anyhow::Result<AppResult> {
.with(EnvFilter::builder().with_default_directive(Level::INFO.into()).from_env_lossy())
.init();

let runtime = runtime::Runtime::new()?;

runtime.block_on(run(socket_file_path))
run(socket_file_path).await
}

0 comments on commit d81e365

Please sign in to comment.