Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Async test (ignore) #272

Closed
wants to merge 12 commits into from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ include = ["src/**/*", "Cargo.*", "LICENSE", "README.md", "CHANGELOG.md"]
default = ["rustls"]
openssl = ["actix-web/openssl", "awc/openssl"]
rustls = ["actix-web/rustls-0_21", "awc/rustls-0_21"]
shuttle = ["dep:shuttle-actix-web", "dep:shuttle-runtime", "dep:tokio"]
shuttle = ["dep:shuttle-actix-web", "dep:shuttle-runtime"]

[dependencies]
actix-web = { version = "4.5.1" }
Expand Down Expand Up @@ -44,7 +44,7 @@ humantime-serde = "1.1.1"
glob = "0.3.1"
ring = "0.17.8"
hotwatch = "0.5.0"
tokio = { version = "1.37.0", optional = true }
tokio = { version = "1.37.0", features = ["fs"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uts2ts = "0.4.1"
Expand All @@ -65,6 +65,7 @@ default-features = false

[dev-dependencies]
actix-rt = "2.9.0"
tempfile = "3.10.1"

[profile.dev]
opt-level = 0
Expand Down
6 changes: 3 additions & 3 deletions src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::http::Method;
use actix_web::middleware::ErrorHandlerResponse;
use actix_web::{error, web, Error};
use std::collections::HashSet;
use std::sync::RwLock;
use tokio::sync::RwLock;

/// Extracts the tokens from the authorization header by token type.
///
Expand All @@ -14,8 +14,8 @@ pub(crate) async fn extract_tokens(req: &ServiceRequest) -> Result<HashSet<Token
let config = req
.app_data::<web::Data<RwLock<Config>>>()
.map(|cfg| cfg.read())
.and_then(Result::ok)
.ok_or_else(|| error::ErrorInternalServerError("cannot acquire config"))?;
.ok_or_else(|| error::ErrorInternalServerError("cannot acquire config"))?
.await;

let mut user_tokens = HashSet::with_capacity(2);

Expand Down
9 changes: 8 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,20 @@ pub struct PasteConfig {
pub delete_expired_files: Option<CleanupConfig>,
}

/// Default interval for cleanup
pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);

const fn get_default_cleanup_interval() -> Duration {
DEFAULT_CLEANUP_INTERVAL
}

/// Cleanup configuration.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CleanupConfig {
/// Enable cleaning up.
pub enabled: bool,
/// Interval between clean-ups.
#[serde(default, with = "humantime_serde")]
#[serde(default = "get_default_cleanup_interval", with = "humantime_serde")]
pub interval: Duration,
}

Expand Down
17 changes: 10 additions & 7 deletions src/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::util;
use actix_web::{error, Error as ActixError};
use glob::glob;
use std::convert::TryFrom;
use std::fs::File as OsFile;
Expand All @@ -21,12 +20,16 @@ pub struct Directory {
}

impl<'a> TryFrom<&'a Path> for Directory {
type Error = ActixError;
type Error = String;
fn try_from(directory: &'a Path) -> Result<Self, Self::Error> {
let files = glob(directory.join("**").join("*").to_str().ok_or_else(|| {
error::ErrorInternalServerError("directory contains invalid characters")
})?)
.map_err(error::ErrorInternalServerError)?
let files = glob(
directory
.join("**")
.join("*")
.to_str()
.ok_or_else(|| String::from("directory contains invalid characters"))?,
)
.map_err(|e| e.msg)?
.filter_map(Result::ok)
.filter(|path| !path.is_dir())
.filter_map(|path| match OsFile::open(&path) {
Expand Down Expand Up @@ -58,7 +61,7 @@ mod tests {
use std::ffi::OsString;

#[test]
fn test_file_checksum() -> Result<(), ActixError> {
fn test_file_checksum() -> Result<(), String> {
assert_eq!(
Some(OsString::from("rustypaste_logo.png").as_ref()),
Directory::try_from(
Expand Down
99 changes: 45 additions & 54 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::{App, HttpServer};
use awc::ClientBuilder;
use hotwatch::notify::event::ModifyKind;
use hotwatch::{Event, EventKind, Hotwatch};
use rustypaste::config::{Config, ServerConfig};
use rustypaste::config::{Config, DEFAULT_CLEANUP_INTERVAL};
use rustypaste::middleware::ContentLengthLimiter;
use rustypaste::paste::PasteType;
use rustypaste::server;
Expand All @@ -15,9 +15,9 @@ use std::env;
use std::fs;
use std::io::Result as IoResult;
use std::path::{Path, PathBuf};
use std::sync::{mpsc, RwLock};
use std::thread;
use std::time::Duration;
use tokio::sync::RwLock;
#[cfg(not(feature = "shuttle"))]
use tracing_subscriber::{
filter::LevelFilter, layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter,
Expand All @@ -38,7 +38,7 @@ extern crate tracing;
/// * initializes the logger
/// * creates the necessary directories
/// * spawns the threads
fn setup(config_folder: &Path) -> IoResult<(Data<RwLock<Config>>, ServerConfig, Hotwatch)> {
async fn setup(config_folder: &Path) -> IoResult<(Data<RwLock<Config>>, Hotwatch)> {
// Load the .env file.
dotenvy::dotenv().ok();

Expand All @@ -61,24 +61,23 @@ fn setup(config_folder: &Path) -> IoResult<(Data<RwLock<Config>>, ServerConfig,
}
None => config_folder.join("config.toml"),
};

if !config_path.exists() {
error!(
"{} is not found, please provide a configuration file.",
config_path.display()
);
std::process::exit(1);
}

let config = Config::parse(&config_path).expect("failed to parse config");
trace!("{:#?}", config);
config.warn_deprecation();
let server_config = config.server.clone();
let paste_config = RwLock::new(config.paste.clone());
let (config_sender, config_receiver) = mpsc::channel::<Config>();

// Create necessary directories.
fs::create_dir_all(&server_config.upload_path)?;
fs::create_dir_all(&config.server.upload_path)?;
for paste_type in &[PasteType::Url, PasteType::Oneshot, PasteType::OneshotUrl] {
fs::create_dir_all(paste_type.get_path(&server_config.upload_path)?)?;
fs::create_dir_all(paste_type.get_path(&config.server.upload_path)?)?;
}

// Set up a watcher for the configuration file changes.
Expand All @@ -91,82 +90,71 @@ fn setup(config_folder: &Path) -> IoResult<(Data<RwLock<Config>>, ServerConfig,
)
.expect("failed to initialize configuration file watcher");

let config_lock = Data::new(RwLock::new(config));

// Hot-reload the configuration file.
let config = Data::new(RwLock::new(config));
let cloned_config = Data::clone(&config);
let config_watcher_config = config_lock.clone();
let config_watcher = move |event: Event| {
if let (EventKind::Modify(ModifyKind::Data(_)), Some(path)) =
(event.kind, event.paths.first())
{
info!("Reloading configuration");

match Config::parse(path) {
Ok(config) => match cloned_config.write() {
Ok(mut cloned_config) => {
*cloned_config = config.clone();
info!("Configuration has been updated.");
if let Err(e) = config_sender.send(config) {
error!("Failed to send config for the cleanup routine: {}", e)
}
cloned_config.warn_deprecation();
}
Err(e) => {
error!("Failed to acquire config: {}", e);
}
},
Ok(new_config) => {
let mut locked_config = config_watcher_config.blocking_write();
*locked_config = new_config;
info!("Configuration has been updated.");
locked_config.warn_deprecation();
}
Err(e) => {
error!("Failed to update config: {}", e);
}
}
}
};

hotwatch
.watch(&config_path, config_watcher)
.unwrap_or_else(|_| panic!("failed to watch {config_path:?}"));

// Create a thread for cleaning up expired files.
let upload_path = server_config.upload_path.clone();
let expired_files_config = config_lock.clone();
let mut cleanup_interval = DEFAULT_CLEANUP_INTERVAL;
thread::spawn(move || loop {
let mut enabled = false;
if let Some(ref cleanup_config) = paste_config
.read()
.ok()
.and_then(|v| v.delete_expired_files.clone())
// Additional context block to ensure the config lock is dropped
{
if cleanup_config.enabled {
debug!("Running cleanup...");
for file in util::get_expired_files(&upload_path) {
match fs::remove_file(&file) {
Ok(()) => info!("Removed expired file: {:?}", file),
Err(e) => error!("Cannot remove expired file: {}", e),
let locked_config = expired_files_config.blocking_read();
let upload_path = locked_config.server.upload_path.clone();

if let Some(ref cleanup_config) = locked_config.paste.delete_expired_files {
if cleanup_config.enabled {
debug!("Running cleanup...");
for file in util::get_expired_files(&upload_path) {
match fs::remove_file(&file) {
Ok(()) => info!("Removed expired file: {:?}", file),
Err(e) => error!("Cannot remove expired file: {}", e),
}
}
}
thread::sleep(cleanup_config.interval);
}
enabled = cleanup_config.enabled;
}
if let Some(new_config) = if enabled {
config_receiver.try_recv().ok()
} else {
config_receiver.recv().ok()
} {
match paste_config.write() {
Ok(mut paste_config) => {
*paste_config = new_config.paste;
}
Err(e) => {
error!("Failed to update config for the cleanup routine: {}", e);
cleanup_interval = cleanup_config.interval;
}
}
}

thread::sleep(cleanup_interval);
});

Ok((config, server_config, hotwatch))
Ok((config_lock, hotwatch))
}

#[cfg(not(feature = "shuttle"))]
#[actix_web::main]
async fn main() -> IoResult<()> {
// Set up the application.
let (config, server_config, _hotwatch) = setup(&PathBuf::new())?;
let (config, _hotwatch) = setup(&PathBuf::new()).await?;

// Extra context block ensures the lock is stopped
let server_config = { config.read().await.server.clone() };

// Create an HTTP server.
let mut http_server = HttpServer::new(move || {
Expand Down Expand Up @@ -203,7 +191,10 @@ async fn main() -> IoResult<()> {
#[shuttle_runtime::main]
async fn actix_web() -> ShuttleActixWeb<impl FnOnce(&mut ServiceConfig) + Send + Clone + 'static> {
// Set up the application.
let (config, server_config, _hotwatch) = setup(Path::new("shuttle"))?;
let (config, _hotwatch) = setup(Path::new("shuttle"))?;

// Extra context block ensures the lock is stopped
let server_config = { config.read().await.server.clone() };

// Create the service.
let service_config = move |cfg: &mut ServiceConfig| {
Expand Down
Loading