-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft: Global process registry - resolves #127 #194
base: main
Are you sure you want to change the base?
Changes from all commits
e7b5da3
8b301a3
263f718
1d726d6
0e8b9f7
b8a3c61
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,13 +3,14 @@ use std::sync::Arc; | |
use axum::{ | ||
body::Bytes, | ||
extract::DefaultBodyLimit, | ||
routing::{get, post}, | ||
routing::{get, post, delete}, | ||
Extension, Json, Router, | ||
}; | ||
use lunatic_distributed::{ | ||
control::{api::*, cert::TEST_ROOT_CERT}, | ||
NodeInfo, | ||
}; | ||
use lunatic_process::{ProcessName, ProcessRecord}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm not sure we should require lunatic-process crate just to get a string and node/process id. I think it's ok for the API to simply use String and define it's own {node_id, process_id} struct. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, we have merged submilisecond impl of ctrl srv which has lunatic-control crate with only types. We could add those types there... |
||
use rcgen::CertificateSigningRequest; | ||
use tower_http::limit::RequestBodyLimitLayer; | ||
|
||
|
@@ -50,6 +51,9 @@ pub async fn register( | |
get_module: format!("http://{host}/module/{{id}}"), | ||
add_module: format!("http://{host}/module"), | ||
get_nodes: format!("http://{host}/nodes"), | ||
get_process: format!("http://{host}/process/{{name}}"), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By the way, we should check how path parms in axum work regarding URL encoding https://en.wikipedia.org/wiki/URL_encoding and also names cannot contain / etc. hmm There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that it can contain I'll try and write a test for that. |
||
add_process: format!("http://{host}/process/{{name}}"), | ||
remove_process: format!("http://{host}/process/{{name}}"), | ||
}, | ||
}) | ||
} | ||
|
@@ -141,6 +145,49 @@ pub async fn get_module( | |
ok(ModuleBytes { bytes }) | ||
} | ||
|
||
pub async fn get_process( | ||
node_auth: NodeAuth, | ||
PathExtractor(name): PathExtractor<ProcessName>, | ||
control: Extension<Arc<ControlServer>>, | ||
) -> ApiResponse<ProcessRecord> { | ||
log::info!("Node {} get_process {}", node_auth.node_name, name); | ||
|
||
let process = control | ||
.processes | ||
.get(&name) | ||
.ok_or(ApiError::ProcessNotFound)?; | ||
Yoric marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
ok(process.value().clone()) | ||
} | ||
|
||
pub async fn remove_process( | ||
node_auth: NodeAuth, | ||
PathExtractor(name): PathExtractor<ProcessName>, | ||
control: Extension<Arc<ControlServer>>, | ||
) -> ApiResponse<ProcessRecord> { | ||
log::info!("Node {} remove_process {}", node_auth.node_name, name); | ||
|
||
let process = control.processes | ||
.remove(&name) | ||
.ok_or(ApiError::ProcessNotFound)? | ||
.1; | ||
|
||
ok(process) | ||
} | ||
|
||
pub async fn add_process( | ||
node_auth: NodeAuth, | ||
control: Extension<Arc<ControlServer>>, | ||
PathExtractor(name): PathExtractor<ProcessName>, | ||
JsonExtractor(details): JsonExtractor<ProcessRecord>, | ||
) -> ApiResponse<Option<ProcessRecord>> { | ||
log::info!("Node {} add_process {}", node_auth.node_name, name); | ||
|
||
let previous = control.processes.insert(name, details); | ||
|
||
ok(previous) | ||
} | ||
|
||
pub fn init_routes() -> Router { | ||
Router::new() | ||
.route("/", post(register)) | ||
|
@@ -149,6 +196,9 @@ pub fn init_routes() -> Router { | |
.route("/nodes", get(list_nodes)) | ||
.route("/module", post(add_module)) | ||
.route("/module/:id", get(get_module)) | ||
.route("/process/:name", get(get_process)) | ||
.route("/process/:name", post(add_process)) | ||
.route("/process/:name", delete(remove_process)) | ||
.layer(DefaultBodyLimit::disable()) | ||
.layer(RequestBodyLimitLayer::new(50 * 1024 * 1024)) // 50 mb | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ license = "Apache-2.0/MIT" | |
|
||
[dependencies] | ||
lunatic-common-api = { workspace = true } | ||
lunatic-control-axum = { workspace = true } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, no, if you need some types that are used both in control axum and here, now you have lunatic-control which should keep types in-common. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason for this dependency is so that I can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you don't need them. ApiError is an internal thing, the external interface is HTTP. In the HTTP client you can just match code strings and HTTP statues which are meaningful to handle. Otherwise, you just report error. |
||
lunatic-distributed = { workspace = true } | ||
lunatic-error-api = { workspace = true } | ||
lunatic-process = { workspace = true } | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ use lunatic_distributed::{ | |
use lunatic_error_api::ErrorCtx; | ||
use lunatic_process::{ | ||
env::Environment, | ||
message::{DataMessage, Message}, | ||
message::{DataMessage, Message}, ProcessId, ProcessRecord, | ||
}; | ||
use lunatic_process_api::ProcessCtx; | ||
use tokio::time::timeout; | ||
|
@@ -18,7 +18,7 @@ use wasmtime::{Caller, Linker, ResourceLimiter}; | |
// Register the lunatic distributed APIs to the linker | ||
pub fn register<T, E>(linker: &mut Linker<T>) -> Result<()> | ||
where | ||
T: DistributedCtx<E> + ProcessCtx<T> + Send + ResourceLimiter + ErrorCtx + 'static, | ||
T: DistributedCtx<E> + ProcessCtx<T> + Send + Sync + ResourceLimiter + ErrorCtx + 'static, | ||
E: Environment + 'static, | ||
for<'a> &'a T: Send, | ||
{ | ||
|
@@ -43,6 +43,11 @@ where | |
"copy_lookup_nodes_results", | ||
copy_lookup_nodes_results, | ||
)?; | ||
|
||
linker.func_wrap4_async("lunatic::distributed", "get", get_process)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's put them then in namespace |
||
linker.func_wrap4_async("lunatic::distributed", "put", put_process)?; | ||
linker.func_wrap2_async("lunatic::distributed", "remove", remove_process)?; | ||
|
||
Ok(()) | ||
} | ||
|
||
|
@@ -498,3 +503,153 @@ where | |
{ | ||
caller.data().module_id() | ||
} | ||
|
||
// Looks up process under `name` and returns 0 if it was found or 1 if not found. | ||
// | ||
// Traps: | ||
// * If any memory outside the guest heap space is referenced. | ||
fn get_process<T, E>( | ||
mut caller: Caller<T>, | ||
name_str_ptr: u32, | ||
name_str_len: u32, | ||
node_id_ptr: u32, | ||
process_id_ptr: u32, | ||
) -> Box<dyn Future<Output = Result<u32>> + Send + '_> | ||
where T : DistributedCtx<E> + Send + Sync, | ||
E : Environment | ||
{ | ||
Box::new(async move { | ||
// Extract process name from memory. | ||
let memory = get_memory(&mut caller)?; | ||
let (memory_slice, state) = memory.data_and_store_mut(&mut caller); | ||
let name = memory_slice | ||
.get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize) | ||
.or_trap("lunatic::distributed::get")?; | ||
let name = std::str::from_utf8(name).or_trap("lunatic::distributed::get")?; | ||
|
||
// Sanity check | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ? |
||
if state.registry_atomic_put().is_some() { | ||
return Err(anyhow!( | ||
"calling `lunatic::distributed::get` after `get_or_put_later` will deadlock" | ||
)); | ||
} | ||
|
||
#[cfg(feature = "metrics")] | ||
metrics::increment_counter!("lunatic.registry.read"); | ||
|
||
// Lookup in distributed registry. | ||
let distributed = state.distributed() | ||
.or_trap("lunatic::distributed::get")?; | ||
let Ok(record) = distributed | ||
.control | ||
.get_process(name) | ||
.await | ||
else { | ||
return Ok(1); | ||
}; | ||
|
||
memory | ||
.write(&mut caller, node_id_ptr as usize, &record.node_id().to_le_bytes()) | ||
.or_trap("lunatic::distributed::get")?; | ||
|
||
memory | ||
.write( | ||
&mut caller, | ||
process_id_ptr as usize, | ||
&record.process_id().to_le_bytes(), | ||
) | ||
.or_trap("lunatic::distributed::get")?; | ||
Ok(0) | ||
}) | ||
} | ||
|
||
// Registers process with ID under `name`. | ||
// | ||
// Traps: | ||
// * If the process ID doesn't exist. | ||
// * If any memory outside the guest heap space is referenced. | ||
// * If the remote call fails. | ||
fn put_process<E, T>( | ||
mut caller: Caller<T>, | ||
name_str_ptr: u32, | ||
name_str_len: u32, | ||
node_id: u64, | ||
process_id: u64, | ||
) -> Box<dyn Future<Output = Result<()>> + Send + '_> | ||
where T : DistributedCtx<E> + Send + Sync, | ||
E : Environment | ||
{ | ||
Box::new(async move { | ||
let memory = get_memory(&mut caller)?; | ||
let (memory_slice, state) = memory.data_and_store_mut(&mut caller); | ||
let name = memory_slice | ||
.get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize) | ||
.or_trap("lunatic::distributed::put")?; | ||
let name = std::str::from_utf8(name).or_trap("lunatic::distributed::put")?; | ||
|
||
// Store in the distributed registry. | ||
let distributed = state.distributed().or_trap("lunatic::distributed::put")?; | ||
let record = ProcessRecord::new(node_id, ProcessId::new(process_id)); | ||
distributed.control.add_process(name, record).await | ||
.or_trap("lunatic::distributed::put")?; | ||
|
||
#[cfg(feature = "metrics")] | ||
metrics::increment_counter!("lunatic.distributed.write"); | ||
|
||
#[cfg(feature = "metrics")] | ||
metrics::increment_gauge!("lunatic.distributed.registered", 1.0); | ||
|
||
Ok(()) | ||
}) | ||
} | ||
|
||
// Removes process under `name` if it exists, returns 0 if it was found or 1 if not found. | ||
// | ||
// Traps: | ||
// * If any memory outside the guest heap space is referenced. | ||
// * If the remote call fails. | ||
fn remove_process<E, T>( | ||
mut caller: Caller<T>, | ||
name_str_ptr: u32, | ||
name_str_len: u32, | ||
) -> Box<dyn Future<Output = Result<u32>> + Send + '_> | ||
where T : DistributedCtx<E> + Send + Sync, | ||
E : Environment | ||
{ | ||
Box::new(async move { | ||
let memory = get_memory(&mut caller)?; | ||
let (memory_slice, state) = memory.data_and_store_mut(&mut caller); | ||
let name = memory_slice | ||
.get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize) | ||
.or_trap("lunatic::distributed::get")?; | ||
let name = std::str::from_utf8(name).or_trap("lunatic::distributed::get")?; | ||
|
||
// Sanity check | ||
if state.registry_atomic_put().is_some() { | ||
return Err(anyhow!( | ||
"calling `lunatic::distributed::remove` after `get_or_put_later` will deadlock" | ||
)); | ||
} | ||
|
||
// Remove globally. | ||
let distributed = state.distributed().or_trap("lunatic::distributed::remove_process")?; | ||
if let Err(err) = distributed.control.remove_process(name).await { | ||
// A double removal is not considered an error. | ||
for error in err.chain() { | ||
if let Some(&lunatic_control_axum::api::ApiError::ProcessNotFound) = error.downcast_ref() { | ||
return Ok(1); | ||
} | ||
} | ||
// Other errors are. | ||
return Err(err).or_trap("lunatic::distributed::remove")? | ||
} | ||
|
||
#[cfg(feature = "metrics")] | ||
metrics::increment_counter!("lunatic.distributed.deletion"); | ||
|
||
#[cfg(feature = "metrics")] | ||
metrics::decrement_gauge!("lunatic.distributed.registered", 1.0); | ||
|
||
Ok(0) | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are more HTTP errors, so we could simply add
NotFound(String)
and put info "Process not found" in it. It's already obvious from the context what HTTP 404 means. Maybe we don't even need the message, justNotFound
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to go with no-arg
NotFound
for the time being.