Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Global process registry - resolves #127 #194

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/lunatic-control-axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license = "Apache-2.0/MIT"

[dependencies]
lunatic-distributed = { workspace = true }
lunatic-process = { workspace = true }

anyhow = { workspace = true }
axum = { version = "0.6", features = ["json", "query", "macros" ] }
Expand Down
5 changes: 5 additions & 0 deletions crates/lunatic-control-axum/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub enum ApiError {
InvalidData(String),
InvalidPathArg(String),
InvalidQueryArg(String),
ProcessNotFound,
Custom {
code: &'static str,
message: Option<String>,
Expand All @@ -42,6 +43,7 @@ impl ApiError {
ApiError::InvalidData(_) => "invalid_data",
ApiError::InvalidPathArg(_) => "invalid_path_arg",
ApiError::InvalidQueryArg(_) => "invalid_query_arg",
ApiError::ProcessNotFound => "process_not_found",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are more HTTP errors, so we could simply add NotFound(String) and put info "Process not found" in it. It's already obvious from the context what HTTP 404 means. Maybe we don't even need the message, just NotFound

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to go with no-arg NotFound for the time being.

ApiError::Custom { code, .. } => code,
}
}
Expand All @@ -51,6 +53,7 @@ impl ApiError {
ApiError::Internal => "".into(),
ApiError::NotAuthenticated => "Not authenticated".into(),
ApiError::NotAuthorized => "Not authorized".into(),
ApiError::ProcessNotFound => "No such process".into(),
ApiError::InvalidData(msg) => msg.clone(),
ApiError::InvalidPathArg(msg) => msg.clone(),
ApiError::InvalidQueryArg(msg) => msg.clone(),
Expand Down Expand Up @@ -106,6 +109,7 @@ impl IntoResponse for ApiError {
Self::Internal => S::INTERNAL_SERVER_ERROR,
Self::NotAuthenticated => S::UNAUTHORIZED,
Self::NotAuthorized => S::FORBIDDEN,
Self::ProcessNotFound => S::NOT_FOUND,
InvalidData(_) | InvalidPathArg(_) | InvalidQueryArg(_) | Custom { .. } => {
S::BAD_REQUEST
}
Expand All @@ -115,6 +119,7 @@ impl IntoResponse for ApiError {
}
}

impl std::error::Error for ApiError { }
pub struct JsonExtractor<T>(pub T);

#[async_trait]
Expand Down
52 changes: 51 additions & 1 deletion crates/lunatic-control-axum/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ use std::sync::Arc;
use axum::{
body::Bytes,
extract::DefaultBodyLimit,
routing::{get, post},
routing::{get, post, delete},
Extension, Json, Router,
};
use lunatic_distributed::{
control::{api::*, cert::TEST_ROOT_CERT},
NodeInfo,
};
use lunatic_process::{ProcessName, ProcessRecord};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm not sure we should require lunatic-process crate just to get a string and node/process id. I think it's ok for the API to simply use String and define it's own {node_id, process_id} struct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we have merged submilisecond impl of ctrl srv which has lunatic-control crate with only types. We could add those types there...

use rcgen::CertificateSigningRequest;
use tower_http::limit::RequestBodyLimitLayer;

Expand Down Expand Up @@ -50,6 +51,9 @@ pub async fn register(
get_module: format!("http://{host}/module/{{id}}"),
add_module: format!("http://{host}/module"),
get_nodes: format!("http://{host}/nodes"),
get_process: format!("http://{host}/process/{{name}}"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, we should check how path parms in axum work regarding URL encoding https://en.wikipedia.org/wiki/URL_encoding and also names cannot contain / etc. hmm

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it can contain / as long as it's properly url-encoded.

I'll try and write a test for that.

add_process: format!("http://{host}/process/{{name}}"),
remove_process: format!("http://{host}/process/{{name}}"),
},
})
}
Expand Down Expand Up @@ -141,6 +145,49 @@ pub async fn get_module(
ok(ModuleBytes { bytes })
}

pub async fn get_process(
node_auth: NodeAuth,
PathExtractor(name): PathExtractor<ProcessName>,
control: Extension<Arc<ControlServer>>,
) -> ApiResponse<ProcessRecord> {
log::info!("Node {} get_process {}", node_auth.node_name, name);

let process = control
.processes
.get(&name)
.ok_or(ApiError::ProcessNotFound)?;
Yoric marked this conversation as resolved.
Show resolved Hide resolved

ok(process.value().clone())
}

pub async fn remove_process(
node_auth: NodeAuth,
PathExtractor(name): PathExtractor<ProcessName>,
control: Extension<Arc<ControlServer>>,
) -> ApiResponse<ProcessRecord> {
log::info!("Node {} remove_process {}", node_auth.node_name, name);

let process = control.processes
.remove(&name)
.ok_or(ApiError::ProcessNotFound)?
.1;

ok(process)
}

pub async fn add_process(
node_auth: NodeAuth,
control: Extension<Arc<ControlServer>>,
PathExtractor(name): PathExtractor<ProcessName>,
JsonExtractor(details): JsonExtractor<ProcessRecord>,
) -> ApiResponse<Option<ProcessRecord>> {
log::info!("Node {} add_process {}", node_auth.node_name, name);

let previous = control.processes.insert(name, details);

ok(previous)
}

pub fn init_routes() -> Router {
Router::new()
.route("/", post(register))
Expand All @@ -149,6 +196,9 @@ pub fn init_routes() -> Router {
.route("/nodes", get(list_nodes))
.route("/module", post(add_module))
.route("/module/:id", get(get_module))
.route("/process/:name", get(get_process))
.route("/process/:name", post(add_process))
.route("/process/:name", delete(remove_process))
.layer(DefaultBodyLimit::disable())
.layer(RequestBodyLimitLayer::new(50 * 1024 * 1024)) // 50 mb
}
4 changes: 4 additions & 0 deletions crates/lunatic-control-axum/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axum::{Extension, Router};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use lunatic_distributed::control::api::{NodeStart, Register};
use lunatic_process::{ProcessName, ProcessRecord};
use rcgen::Certificate;
use uuid::Uuid;

Expand All @@ -33,12 +34,14 @@ pub struct NodeDetails {
pub attributes: serde_json::Value,
}


pub struct ControlServer {
pub ca_cert: Certificate,
pub quic_client: lunatic_distributed::quic::Client,
pub registrations: DashMap<u64, Registered>,
pub nodes: DashMap<u64, NodeDetails>,
pub modules: DashMap<u64, Vec<u8>>,
pub(crate) processes: DashMap<ProcessName, ProcessRecord>,
next_registration_id: AtomicU64,
next_node_id: AtomicU64,
next_module_id: AtomicU64,
Expand All @@ -52,6 +55,7 @@ impl ControlServer {
registrations: DashMap::new(),
nodes: DashMap::new(),
modules: DashMap::new(),
processes: DashMap::new(),
next_registration_id: AtomicU64::new(1),
next_node_id: AtomicU64::new(1),
next_module_id: AtomicU64::new(1),
Expand Down
1 change: 1 addition & 0 deletions crates/lunatic-distributed-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license = "Apache-2.0/MIT"

[dependencies]
lunatic-common-api = { workspace = true }
lunatic-control-axum = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, no, if you need some types that are used both in control axum and here, now you have lunatic-control which should keep types in-common.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for this dependency is so that I can use ApiError. Should I move ApiError to lunatic-control? If so, I suspect that I will need to introduce from lunatic-control to axum, is that alright?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need them. ApiError is an internal thing, the external interface is HTTP. In the HTTP client you can just match code strings and HTTP statues which are meaningful to handle. Otherwise, you just report error.

lunatic-distributed = { workspace = true }
lunatic-error-api = { workspace = true }
lunatic-process = { workspace = true }
Expand Down
159 changes: 157 additions & 2 deletions crates/lunatic-distributed-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use lunatic_distributed::{
use lunatic_error_api::ErrorCtx;
use lunatic_process::{
env::Environment,
message::{DataMessage, Message},
message::{DataMessage, Message}, ProcessId, ProcessRecord,
};
use lunatic_process_api::ProcessCtx;
use tokio::time::timeout;
Expand All @@ -18,7 +18,7 @@ use wasmtime::{Caller, Linker, ResourceLimiter};
// Register the lunatic distributed APIs to the linker
pub fn register<T, E>(linker: &mut Linker<T>) -> Result<()>
where
T: DistributedCtx<E> + ProcessCtx<T> + Send + ResourceLimiter + ErrorCtx + 'static,
T: DistributedCtx<E> + ProcessCtx<T> + Send + Sync + ResourceLimiter + ErrorCtx + 'static,
E: Environment + 'static,
for<'a> &'a T: Send,
{
Expand All @@ -43,6 +43,11 @@ where
"copy_lookup_nodes_results",
copy_lookup_nodes_results,
)?;

linker.func_wrap4_async("lunatic::distributed", "get", get_process)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put them then in namespace lunatic::distributed::registry

linker.func_wrap4_async("lunatic::distributed", "put", put_process)?;
linker.func_wrap2_async("lunatic::distributed", "remove", remove_process)?;

Ok(())
}

Expand Down Expand Up @@ -498,3 +503,153 @@ where
{
caller.data().module_id()
}

// Looks up process under `name` and returns 0 if it was found or 1 if not found.
//
// Traps:
// * If any memory outside the guest heap space is referenced.
fn get_process<T, E>(
mut caller: Caller<T>,
name_str_ptr: u32,
name_str_len: u32,
node_id_ptr: u32,
process_id_ptr: u32,
) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
where T : DistributedCtx<E> + Send + Sync,
E : Environment
{
Box::new(async move {
// Extract process name from memory.
let memory = get_memory(&mut caller)?;
let (memory_slice, state) = memory.data_and_store_mut(&mut caller);
let name = memory_slice
.get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize)
.or_trap("lunatic::distributed::get")?;
let name = std::str::from_utf8(name).or_trap("lunatic::distributed::get")?;

// Sanity check
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

if state.registry_atomic_put().is_some() {
return Err(anyhow!(
"calling `lunatic::distributed::get` after `get_or_put_later` will deadlock"
));
}

#[cfg(feature = "metrics")]
metrics::increment_counter!("lunatic.registry.read");

// Lookup in distributed registry.
let distributed = state.distributed()
.or_trap("lunatic::distributed::get")?;
let Ok(record) = distributed
.control
.get_process(name)
.await
else {
return Ok(1);
};

memory
.write(&mut caller, node_id_ptr as usize, &record.node_id().to_le_bytes())
.or_trap("lunatic::distributed::get")?;

memory
.write(
&mut caller,
process_id_ptr as usize,
&record.process_id().to_le_bytes(),
)
.or_trap("lunatic::distributed::get")?;
Ok(0)
})
}

// Registers process with ID under `name`.
//
// Traps:
// * If the process ID doesn't exist.
// * If any memory outside the guest heap space is referenced.
// * If the remote call fails.
fn put_process<E, T>(
mut caller: Caller<T>,
name_str_ptr: u32,
name_str_len: u32,
node_id: u64,
process_id: u64,
) -> Box<dyn Future<Output = Result<()>> + Send + '_>
where T : DistributedCtx<E> + Send + Sync,
E : Environment
{
Box::new(async move {
let memory = get_memory(&mut caller)?;
let (memory_slice, state) = memory.data_and_store_mut(&mut caller);
let name = memory_slice
.get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize)
.or_trap("lunatic::distributed::put")?;
let name = std::str::from_utf8(name).or_trap("lunatic::distributed::put")?;

// Store in the distributed registry.
let distributed = state.distributed().or_trap("lunatic::distributed::put")?;
let record = ProcessRecord::new(node_id, ProcessId::new(process_id));
distributed.control.add_process(name, record).await
.or_trap("lunatic::distributed::put")?;

#[cfg(feature = "metrics")]
metrics::increment_counter!("lunatic.distributed.write");

#[cfg(feature = "metrics")]
metrics::increment_gauge!("lunatic.distributed.registered", 1.0);

Ok(())
})
}

// Removes process under `name` if it exists, returns 0 if it was found or 1 if not found.
//
// Traps:
// * If any memory outside the guest heap space is referenced.
// * If the remote call fails.
fn remove_process<E, T>(
mut caller: Caller<T>,
name_str_ptr: u32,
name_str_len: u32,
) -> Box<dyn Future<Output = Result<u32>> + Send + '_>
where T : DistributedCtx<E> + Send + Sync,
E : Environment
{
Box::new(async move {
let memory = get_memory(&mut caller)?;
let (memory_slice, state) = memory.data_and_store_mut(&mut caller);
let name = memory_slice
.get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize)
.or_trap("lunatic::distributed::get")?;
let name = std::str::from_utf8(name).or_trap("lunatic::distributed::get")?;

// Sanity check
if state.registry_atomic_put().is_some() {
return Err(anyhow!(
"calling `lunatic::distributed::remove` after `get_or_put_later` will deadlock"
));
}

// Remove globally.
let distributed = state.distributed().or_trap("lunatic::distributed::remove_process")?;
if let Err(err) = distributed.control.remove_process(name).await {
// A double removal is not considered an error.
for error in err.chain() {
if let Some(&lunatic_control_axum::api::ApiError::ProcessNotFound) = error.downcast_ref() {
return Ok(1);
}
}
// Other errors are.
return Err(err).or_trap("lunatic::distributed::remove")?
}

#[cfg(feature = "metrics")]
metrics::increment_counter!("lunatic.distributed.deletion");

#[cfg(feature = "metrics")]
metrics::decrement_gauge!("lunatic.distributed.registered", 1.0);

Ok(0)
})
}
6 changes: 6 additions & 0 deletions crates/lunatic-distributed/src/control/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ pub struct ControlUrls {
pub get_module: String,
pub add_module: String,
pub get_nodes: String,

/// Get a process
pub get_process: String,
pub add_process: String,
pub remove_process: String,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -47,6 +52,7 @@ pub struct NodesList {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
/// The binary for a wasm module.
pub struct ModuleBytes {
pub bytes: Vec<u8>,
}
Expand Down
Loading