Skip to content
This repository has been archived by the owner on Mar 20, 2023. It is now read-only.

Commit

Permalink
feat: restrict which ports a workload is allowed to use
Browse files Browse the repository at this point in the history
- Don't allow ports outside of a specific range.
- Don't allow ports already in use by another running workload.
- Prevent users listening on too many ports.
- Fetch Enarx.toml from drawbridge to determine which ports it will use.

Signed-off-by: Nicholas Farshidmehr <[email protected]>
  • Loading branch information
definitelynobody authored and rvolosatovs committed Aug 2, 2022
1 parent ae678ea commit 278b8a9
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,7 @@ in
features = builtins.concatLists [
[ "bytes" ]
[ "default" ]
[ "fs" ]
[ "io-std" ]
[ "io-util" ]
[ "libc" ]
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
axum = { version = "0.5.5", features = ["multipart", "headers"] }
askama = { version = "0.11.1", default-features = false }
tokio = { version = "1.19.2", features = ["macros", "process", "rt-multi-thread", "io-util", "sync"] }
tokio = { version = "1.19.2", features = ["macros", "process", "rt-multi-thread", "io-util", "fs", "sync"] }
tracing = { version = "0.1.35", default-features = false, features = ["std", "release_max_level_info"] }
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
tower-http = { version = "0.3.0", features = ["trace"] }
Expand Down
31 changes: 31 additions & 0 deletions src/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-FileCopyrightText: 2022 Profian Inc. <[email protected]>
// SPDX-License-Identifier: AGPL-3.0-only

use crate::jobs::Job;

#[derive(Debug, Default)]
pub struct Data {
job: Option<Job>,
}

impl Data {
pub fn new(job: Option<Job>) -> Self {
Self { job }
}

pub fn job(&self) -> &Option<Job> {
&self.job
}

pub fn job_mut(&mut self) -> Option<&mut Job> {
self.job.as_mut()
}

pub async fn kill_job(&mut self) {
if let Some(job) = &mut self.job {
job.kill().await;
}

self.job = None;
}
}
18 changes: 18 additions & 0 deletions src/jobs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// SPDX-FileCopyrightText: 2022 Profian Inc. <[email protected]>
// SPDX-License-Identifier: AGPL-3.0-only

use crate::ports;

use std::process::Stdio;
use std::str::FromStr;
use std::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -47,11 +52,16 @@ pub struct Job {
slug: Option<String>,
wasm: Option<NamedTempFile>,
toml: Option<NamedTempFile>,
reserved_ports: Vec<u16>,
}

impl Drop for Job {
fn drop(&mut self) {
COUNT.fetch_sub(1, Ordering::SeqCst);

if !self.reserved_ports.is_empty() {
error!("a job was not cleaned up correctly");
}
}
}

Expand All @@ -66,6 +76,7 @@ impl Job {
slug: Option<String>,
wasm: Option<NamedTempFile>,
toml: Option<NamedTempFile>,
reserved_ports: Vec<u16>,
) -> Result<Self, Response> {
let workload_type = WorkloadType::from_str(&workload_type).map_err(|e| {
debug!("Failed to parse workload type: {e}");
Expand Down Expand Up @@ -122,6 +133,7 @@ impl Job {
slug,
wasm,
toml,
reserved_ports,
})
}

Expand All @@ -131,4 +143,10 @@ impl Job {
Standard::Error => self.exec.stderr.as_mut().unwrap().read(buffer).await,
}
}

pub async fn kill(&mut self) {
let _ = self.exec.kill().await;
ports::free(&self.reserved_ports).await;
self.reserved_ports.clear();
}
}
148 changes: 125 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,22 @@
#![warn(clippy::all, rust_2018_idioms, unused_lifetimes)]

mod auth;
mod data;
mod jobs;
mod ports;
mod redirect;
mod reference;
mod secret;
mod templates;

use crate::data::Data;
use crate::reference::Ref;
use crate::templates::{HtmlTemplate, IdxTemplate, JobTemplate};

use std::fs::read;
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::ops::Range;
use std::time::Duration;

use axum::extract::{Multipart, Query};
Expand All @@ -32,9 +36,10 @@ use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use reqwest::{Client, ClientBuilder};
use serde::Deserialize;
use tokio::fs::read_to_string;
use tokio::time::{sleep, timeout};
use tower_http::trace::TraceLayer;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

static HTTP: Lazy<Client> = Lazy::new(|| {
Expand All @@ -53,11 +58,6 @@ lazy_static! {
.collect::<Vec<_>>();
}

#[derive(Debug, Default)]
struct Data {
job: Option<jobs::Job>,
}

/// Demo workload executor.
///
/// Any command-line options listed here may be specified by one or
Expand Down Expand Up @@ -97,6 +97,18 @@ struct Args {
#[clap(long, default_value_t = 15 * 60)]
timeout_starred: u64,

/// The lowest listen port allowed in an Enarx.toml.
#[clap(long, default_value_t = 0)]
port_min: u16,

/// The highest listen port allowed in an Enarx.toml.
#[clap(long, default_value_t = 0)]
port_max: u16,

/// The maximum number of listen ports a workload is allowed to have (0 to disable).
#[clap(long, default_value_t = 0)]
listen_max: u16,

/// Command to execute, normally path to `enarx` binary.
/// This command will be executed as: `<cmd> run --wasmcfgfile <path-to-config> <path-to-wasm>`
#[clap(long, default_value = "enarx")]
Expand Down Expand Up @@ -135,6 +147,16 @@ impl Args {
let other = Other {
addr: self.addr,
jobs: self.jobs,
port_range: match (self.port_min, self.port_max) {
(0, 0) => None,
(min, 0) => Some(min..u16::MAX),
(min, max) => Some(min..max),
},
listen_max: if self.listen_max == 0 {
None
} else {
Some(self.listen_max)
},
cmd: self.command,
};

Expand Down Expand Up @@ -185,6 +207,8 @@ impl Limits {
struct Other {
addr: SocketAddr,
jobs: usize,
port_range: Option<Range<u16>>,
listen_max: Option<u16>,
cmd: String,
}

Expand Down Expand Up @@ -245,7 +269,17 @@ async fn main() -> anyhow::Result<()> {
.route(
"/",
get(move |user| root_get(user, limits))
.post(move |user, mp| root_post(user, mp, other.cmd, limits, other.jobs))
.post(move |user, mp| {
root_post(
user,
mp,
other.cmd,
limits,
other.port_range,
other.listen_max,
other.jobs,
)
})
.delete(root_delete),
);

Expand All @@ -261,7 +295,7 @@ async fn root_get(user: Option<Ref<auth::User<Data>>>, limits: Limits) -> impl I
let (user, star) = match user {
None => (false, false),
Some(user) => {
if user.read().await.data.job.is_some() {
if user.read().await.data.job().is_some() {
return HtmlTemplate(JobTemplate).into_response();
}

Expand All @@ -288,13 +322,15 @@ async fn root_post(
mut multipart: Multipart,
command: String,
limits: Limits,
port_range: Option<Range<u16>>,
listen_max: Option<u16>,
jobs: usize,
) -> impl IntoResponse {
let star = user.read().await.is_starred("enarx/enarx").await;
let ttl = limits.time_to_live(star);
let size = limits.size(star);

if user.read().await.data.job.is_some() {
if user.read().await.data.job().is_some() {
return Err(Redirect::to("/").into_response());
}

Expand Down Expand Up @@ -413,21 +449,81 @@ async fn root_post(

let workload_type = workload_type.ok_or_else(|| StatusCode::BAD_REQUEST.into_response())?;

let enarx_config_string = match &toml {
Some(toml) => read_to_string(toml).await.map_err(|e| {
debug!("failed to read enarx config file: {e}");
StatusCode::INTERNAL_SERVER_ERROR.into_response()
})?,
None => {
let slug = slug
.as_ref()
.ok_or_else(|| StatusCode::BAD_REQUEST.into_response())?;
let (repo, tag) = slug
.split_once(':')
.ok_or_else(|| StatusCode::BAD_REQUEST.into_response())?;
get_enarx_config_from_drawbridge(repo, tag)
.await
.map_err(|e| {
debug!("failed to get toml from drawbridge with tag: {}: {e}", slug);
StatusCode::BAD_REQUEST.into_response()
})?
.text()
.await
.map_err(|e| {
debug!(
"failed to get toml body from drawbridge response: {}: {e}",
slug
);
StatusCode::BAD_REQUEST.into_response()
})?
}
};

let ports = ports::get_listen_ports(&enarx_config_string).map_err(|e| {
debug!("failed to get ports from enarx config: {e}");
StatusCode::BAD_REQUEST.into_response()
})?;

if let Some(listen_max) = listen_max {
// Check if the user is trying to listen on too many ports.
if ports.len() > listen_max as usize {
return Err(redirect::too_many_listeners(listen_max).into_response());
}
}

if let Some(port_range) = port_range {
// Check if the port is outside of the range of allowed ports
let illegal_ports = ports
.iter()
.filter(|port| !port_range.contains(port))
.cloned()
.collect::<Vec<_>>();

if !illegal_ports.is_empty() {
return Err(redirect::illegal_ports(&illegal_ports, port_range).into_response());
}
}

// Check if a port is already in use by another running workload
ports::try_reserve(&ports)
.await
.map_err(|port_conflicts| redirect::port_conflicts(&port_conflicts).into_response())?;

// Create the new job and get an identifier.
let uuid = {
let mut lock = user.write().await;

if lock.data.job.is_some() {
if lock.data.job().is_some() {
return Err(Redirect::to("/").into_response());
}

if jobs::Job::count() >= jobs {
return Err(redirect::too_many_workloads().into_response());
}

let job = jobs::Job::new(command, workload_type, slug, wasm, toml)?;
let job = jobs::Job::new(command, workload_type, slug, wasm, toml, ports)?;
let uuid = job.uuid;
lock.data = Data { job: Some(job) };
lock.data = Data::new(Some(job));
uuid
};

Expand All @@ -439,8 +535,8 @@ async fn root_post(
if let Some(user) = weak.upgrade() {
debug!("timeout for: {}", uuid);
let mut lock = user.write().await;
if lock.data.job.as_ref().map(|j| j.uuid) == Some(uuid) {
lock.data.job = None;
if lock.data.job().as_ref().map(|j| j.uuid) == Some(uuid) {
lock.data.kill_job().await;
}
}
});
Expand All @@ -459,17 +555,23 @@ struct EnarxTomlFallbackParams {
tag: String,
}

async fn get_enarx_config_from_drawbridge(
repo: &str,
tag: &str,
) -> Result<reqwest::Response, reqwest::Error> {
HTTP.get(&format!(
"https://store.profian.com/api/v0.2.0/{repo}/_tag/{tag}/tree/Enarx.toml"
))
.send()
.await
}

async fn enarx_toml_fallback(
_user: Ref<auth::User<Data>>,
Query(params): Query<EnarxTomlFallbackParams>,
) -> Result<String, (StatusCode, String)> {
let EnarxTomlFallbackParams { repo, tag } = params;
let response = HTTP
.get(&format!(
"https://store.profian.com/api/v0.2.0/{repo}/_tag/{tag}/tree/Enarx.toml"
))
.send()
.await;
let response = get_enarx_config_from_drawbridge(&repo, &tag).await;
let response = response.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -497,9 +599,9 @@ async fn enarx_toml_fallback(
async fn root_delete(user: Ref<auth::User<Data>>) -> StatusCode {
let mut lock = user.write().await;

if let Some(uuid) = lock.data.job.as_ref().map(|j| j.uuid) {
if let Some(uuid) = lock.data.job().as_ref().map(|j| j.uuid) {
debug!("killing: {}", uuid);
lock.data.job = None;
lock.data.kill_job().await;
}

StatusCode::OK
Expand All @@ -508,7 +610,7 @@ async fn root_delete(user: Ref<auth::User<Data>>) -> StatusCode {
async fn reader(user: Ref<auth::User<Data>>, kind: jobs::Standard) -> Result<Vec<u8>, StatusCode> {
let mut buf = [0; 4096];

match user.write().await.data.job.as_mut() {
match user.write().await.data.job_mut() {
None => Err(StatusCode::NOT_FOUND),
Some(job) => {
let future = job.read(kind, &mut buf);
Expand Down
Loading

0 comments on commit 278b8a9

Please sign in to comment.