diff --git a/Cargo.lock b/Cargo.lock index f4a05346e..7aa23effc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1429,6 +1429,7 @@ dependencies = [ "http", "log", "lunatic-distributed", + "lunatic-process", "rcgen", "serde", "serde_json", @@ -1466,6 +1467,7 @@ dependencies = [ "anyhow", "log", "lunatic-common-api", + "lunatic-control-axum", "lunatic-distributed", "lunatic-error-api", "lunatic-process", diff --git a/crates/lunatic-control-axum/Cargo.toml b/crates/lunatic-control-axum/Cargo.toml index 7d82dab55..f7fc0d691 100644 --- a/crates/lunatic-control-axum/Cargo.toml +++ b/crates/lunatic-control-axum/Cargo.toml @@ -9,6 +9,7 @@ license = "Apache-2.0/MIT" [dependencies] lunatic-distributed = { workspace = true } +lunatic-process = { workspace = true } anyhow = { workspace = true } axum = { version = "0.6", features = ["json", "query", "macros" ] } diff --git a/crates/lunatic-control-axum/src/api.rs b/crates/lunatic-control-axum/src/api.rs index bf117907e..acf3c4a6c 100644 --- a/crates/lunatic-control-axum/src/api.rs +++ b/crates/lunatic-control-axum/src/api.rs @@ -27,6 +27,7 @@ pub enum ApiError { InvalidData(String), InvalidPathArg(String), InvalidQueryArg(String), + ProcessNotFound, Custom { code: &'static str, message: Option, @@ -42,6 +43,7 @@ impl ApiError { ApiError::InvalidData(_) => "invalid_data", ApiError::InvalidPathArg(_) => "invalid_path_arg", ApiError::InvalidQueryArg(_) => "invalid_query_arg", + ApiError::ProcessNotFound => "process_not_found", ApiError::Custom { code, .. } => code, } } @@ -51,6 +53,7 @@ impl ApiError { ApiError::Internal => "".into(), ApiError::NotAuthenticated => "Not authenticated".into(), ApiError::NotAuthorized => "Not authorized".into(), + ApiError::ProcessNotFound => "No such process".into(), ApiError::InvalidData(msg) => msg.clone(), ApiError::InvalidPathArg(msg) => msg.clone(), ApiError::InvalidQueryArg(msg) => msg.clone(), @@ -106,6 +109,7 @@ impl IntoResponse for ApiError { Self::Internal => S::INTERNAL_SERVER_ERROR, Self::NotAuthenticated => S::UNAUTHORIZED, Self::NotAuthorized => S::FORBIDDEN, + Self::ProcessNotFound => S::NOT_FOUND, InvalidData(_) | InvalidPathArg(_) | InvalidQueryArg(_) | Custom { .. } => { S::BAD_REQUEST } @@ -115,6 +119,7 @@ impl IntoResponse for ApiError { } } +impl std::error::Error for ApiError { } pub struct JsonExtractor(pub T); #[async_trait] diff --git a/crates/lunatic-control-axum/src/routes.rs b/crates/lunatic-control-axum/src/routes.rs index 943071aac..94cd836c6 100644 --- a/crates/lunatic-control-axum/src/routes.rs +++ b/crates/lunatic-control-axum/src/routes.rs @@ -3,13 +3,14 @@ use std::sync::Arc; use axum::{ body::Bytes, extract::DefaultBodyLimit, - routing::{get, post}, + routing::{get, post, delete}, Extension, Json, Router, }; use lunatic_distributed::{ control::{api::*, cert::TEST_ROOT_CERT}, NodeInfo, }; +use lunatic_process::{ProcessName, ProcessRecord}; use rcgen::CertificateSigningRequest; use tower_http::limit::RequestBodyLimitLayer; @@ -50,6 +51,9 @@ pub async fn register( get_module: format!("http://{host}/module/{{id}}"), add_module: format!("http://{host}/module"), get_nodes: format!("http://{host}/nodes"), + get_process: format!("http://{host}/process/{{name}}"), + add_process: format!("http://{host}/process/{{name}}"), + remove_process: format!("http://{host}/process/{{name}}"), }, }) } @@ -141,6 +145,49 @@ pub async fn get_module( ok(ModuleBytes { bytes }) } +pub async fn get_process( + node_auth: NodeAuth, + PathExtractor(name): PathExtractor, + control: Extension>, +) -> ApiResponse { + log::info!("Node {} get_process {}", node_auth.node_name, name); + + let process = control + .processes + .get(&name) + .ok_or(ApiError::ProcessNotFound)?; + + ok(process.value().clone()) +} + +pub async fn remove_process( + node_auth: NodeAuth, + PathExtractor(name): PathExtractor, + control: Extension>, +) -> ApiResponse { + log::info!("Node {} remove_process {}", node_auth.node_name, name); + + let process = control.processes + .remove(&name) + .ok_or(ApiError::ProcessNotFound)? + .1; + + ok(process) +} + +pub async fn add_process( + node_auth: NodeAuth, + control: Extension>, + PathExtractor(name): PathExtractor, + JsonExtractor(details): JsonExtractor, +) -> ApiResponse> { + log::info!("Node {} add_process {}", node_auth.node_name, name); + + let previous = control.processes.insert(name, details); + + ok(previous) +} + pub fn init_routes() -> Router { Router::new() .route("/", post(register)) @@ -149,6 +196,9 @@ pub fn init_routes() -> Router { .route("/nodes", get(list_nodes)) .route("/module", post(add_module)) .route("/module/:id", get(get_module)) + .route("/process/:name", get(get_process)) + .route("/process/:name", post(add_process)) + .route("/process/:name", delete(remove_process)) .layer(DefaultBodyLimit::disable()) .layer(RequestBodyLimitLayer::new(50 * 1024 * 1024)) // 50 mb } diff --git a/crates/lunatic-control-axum/src/server.rs b/crates/lunatic-control-axum/src/server.rs index 9ba342fb5..d5bc63d4a 100644 --- a/crates/lunatic-control-axum/src/server.rs +++ b/crates/lunatic-control-axum/src/server.rs @@ -11,6 +11,7 @@ use axum::{Extension, Router}; use chrono::{DateTime, Utc}; use dashmap::DashMap; use lunatic_distributed::control::api::{NodeStart, Register}; +use lunatic_process::{ProcessName, ProcessRecord}; use rcgen::Certificate; use uuid::Uuid; @@ -33,12 +34,14 @@ pub struct NodeDetails { pub attributes: serde_json::Value, } + pub struct ControlServer { pub ca_cert: Certificate, pub quic_client: lunatic_distributed::quic::Client, pub registrations: DashMap, pub nodes: DashMap, pub modules: DashMap>, + pub(crate) processes: DashMap, next_registration_id: AtomicU64, next_node_id: AtomicU64, next_module_id: AtomicU64, @@ -52,6 +55,7 @@ impl ControlServer { registrations: DashMap::new(), nodes: DashMap::new(), modules: DashMap::new(), + processes: DashMap::new(), next_registration_id: AtomicU64::new(1), next_node_id: AtomicU64::new(1), next_module_id: AtomicU64::new(1), diff --git a/crates/lunatic-distributed-api/Cargo.toml b/crates/lunatic-distributed-api/Cargo.toml index 2e630abda..cf2b015d1 100644 --- a/crates/lunatic-distributed-api/Cargo.toml +++ b/crates/lunatic-distributed-api/Cargo.toml @@ -9,6 +9,7 @@ license = "Apache-2.0/MIT" [dependencies] lunatic-common-api = { workspace = true } +lunatic-control-axum = { workspace = true } lunatic-distributed = { workspace = true } lunatic-error-api = { workspace = true } lunatic-process = { workspace = true } diff --git a/crates/lunatic-distributed-api/src/lib.rs b/crates/lunatic-distributed-api/src/lib.rs index 0e235b119..da2ee5264 100644 --- a/crates/lunatic-distributed-api/src/lib.rs +++ b/crates/lunatic-distributed-api/src/lib.rs @@ -9,7 +9,7 @@ use lunatic_distributed::{ use lunatic_error_api::ErrorCtx; use lunatic_process::{ env::Environment, - message::{DataMessage, Message}, + message::{DataMessage, Message}, ProcessId, ProcessRecord, }; use lunatic_process_api::ProcessCtx; use tokio::time::timeout; @@ -18,7 +18,7 @@ use wasmtime::{Caller, Linker, ResourceLimiter}; // Register the lunatic distributed APIs to the linker pub fn register(linker: &mut Linker) -> Result<()> where - T: DistributedCtx + ProcessCtx + Send + ResourceLimiter + ErrorCtx + 'static, + T: DistributedCtx + ProcessCtx + Send + Sync + ResourceLimiter + ErrorCtx + 'static, E: Environment + 'static, for<'a> &'a T: Send, { @@ -43,6 +43,11 @@ where "copy_lookup_nodes_results", copy_lookup_nodes_results, )?; + + linker.func_wrap4_async("lunatic::distributed", "get", get_process)?; + linker.func_wrap4_async("lunatic::distributed", "put", put_process)?; + linker.func_wrap2_async("lunatic::distributed", "remove", remove_process)?; + Ok(()) } @@ -498,3 +503,153 @@ where { caller.data().module_id() } + +// Looks up process under `name` and returns 0 if it was found or 1 if not found. +// +// Traps: +// * If any memory outside the guest heap space is referenced. +fn get_process( + mut caller: Caller, + name_str_ptr: u32, + name_str_len: u32, + node_id_ptr: u32, + process_id_ptr: u32, +) -> Box> + Send + '_> +where T : DistributedCtx + Send + Sync, + E : Environment +{ + Box::new(async move { + // Extract process name from memory. + let memory = get_memory(&mut caller)?; + let (memory_slice, state) = memory.data_and_store_mut(&mut caller); + let name = memory_slice + .get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize) + .or_trap("lunatic::distributed::get")?; + let name = std::str::from_utf8(name).or_trap("lunatic::distributed::get")?; + + // Sanity check + if state.registry_atomic_put().is_some() { + return Err(anyhow!( + "calling `lunatic::distributed::get` after `get_or_put_later` will deadlock" + )); + } + + #[cfg(feature = "metrics")] + metrics::increment_counter!("lunatic.registry.read"); + + // Lookup in distributed registry. + let distributed = state.distributed() + .or_trap("lunatic::distributed::get")?; + let Ok(record) = distributed + .control + .get_process(name) + .await + else { + return Ok(1); + }; + + memory + .write(&mut caller, node_id_ptr as usize, &record.node_id().to_le_bytes()) + .or_trap("lunatic::distributed::get")?; + + memory + .write( + &mut caller, + process_id_ptr as usize, + &record.process_id().to_le_bytes(), + ) + .or_trap("lunatic::distributed::get")?; + Ok(0) + }) +} + +// Registers process with ID under `name`. +// +// Traps: +// * If the process ID doesn't exist. +// * If any memory outside the guest heap space is referenced. +// * If the remote call fails. +fn put_process( + mut caller: Caller, + name_str_ptr: u32, + name_str_len: u32, + node_id: u64, + process_id: u64, +) -> Box> + Send + '_> +where T : DistributedCtx + Send + Sync, + E : Environment +{ + Box::new(async move { + let memory = get_memory(&mut caller)?; + let (memory_slice, state) = memory.data_and_store_mut(&mut caller); + let name = memory_slice + .get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize) + .or_trap("lunatic::distributed::put")?; + let name = std::str::from_utf8(name).or_trap("lunatic::distributed::put")?; + + // Store in the distributed registry. + let distributed = state.distributed().or_trap("lunatic::distributed::put")?; + let record = ProcessRecord::new(node_id, ProcessId::new(process_id)); + distributed.control.add_process(name, record).await + .or_trap("lunatic::distributed::put")?; + + #[cfg(feature = "metrics")] + metrics::increment_counter!("lunatic.distributed.write"); + + #[cfg(feature = "metrics")] + metrics::increment_gauge!("lunatic.distributed.registered", 1.0); + + Ok(()) + }) +} + +// Removes process under `name` if it exists, returns 0 if it was found or 1 if not found. +// +// Traps: +// * If any memory outside the guest heap space is referenced. +// * If the remote call fails. +fn remove_process( + mut caller: Caller, + name_str_ptr: u32, + name_str_len: u32, +) -> Box> + Send + '_> + where T : DistributedCtx + Send + Sync, + E : Environment +{ + Box::new(async move { + let memory = get_memory(&mut caller)?; + let (memory_slice, state) = memory.data_and_store_mut(&mut caller); + let name = memory_slice + .get(name_str_ptr as usize..(name_str_ptr + name_str_len) as usize) + .or_trap("lunatic::distributed::get")?; + let name = std::str::from_utf8(name).or_trap("lunatic::distributed::get")?; + + // Sanity check + if state.registry_atomic_put().is_some() { + return Err(anyhow!( + "calling `lunatic::distributed::remove` after `get_or_put_later` will deadlock" + )); + } + + // Remove globally. + let distributed = state.distributed().or_trap("lunatic::distributed::remove_process")?; + if let Err(err) = distributed.control.remove_process(name).await { + // A double removal is not considered an error. + for error in err.chain() { + if let Some(&lunatic_control_axum::api::ApiError::ProcessNotFound) = error.downcast_ref() { + return Ok(1); + } + } + // Other errors are. + return Err(err).or_trap("lunatic::distributed::remove")? + } + + #[cfg(feature = "metrics")] + metrics::increment_counter!("lunatic.distributed.deletion"); + + #[cfg(feature = "metrics")] + metrics::decrement_gauge!("lunatic.distributed.registered", 1.0); + + Ok(0) + }) +} diff --git a/crates/lunatic-distributed/src/control/api.rs b/crates/lunatic-distributed/src/control/api.rs index af290b4f4..fb2301835 100644 --- a/crates/lunatic-distributed/src/control/api.rs +++ b/crates/lunatic-distributed/src/control/api.rs @@ -27,6 +27,11 @@ pub struct ControlUrls { pub get_module: String, pub add_module: String, pub get_nodes: String, + + /// Get a process + pub get_process: String, + pub add_process: String, + pub remove_process: String, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -47,6 +52,7 @@ pub struct NodesList { } #[derive(Clone, Debug, Serialize, Deserialize)] +/// The binary for a wasm module. pub struct ModuleBytes { pub bytes: Vec, } diff --git a/crates/lunatic-distributed/src/control/client.rs b/crates/lunatic-distributed/src/control/client.rs index 5ac1fc3ae..b21719d38 100644 --- a/crates/lunatic-distributed/src/control/client.rs +++ b/crates/lunatic-distributed/src/control/client.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Result}; use dashmap::DashMap; -use lunatic_process::runtimes::RawWasm; +use lunatic_process::{runtimes::RawWasm, ProcessRecord}; use reqwest::{Client as HttpClient, Url}; use serde::{de::DeserializeOwned, Serialize}; use std::{ @@ -129,6 +129,7 @@ impl Client { Ok(resp.node_id as u64) } + /// Place a GET call to a url. pub async fn get(&self, url: &str, query: Option<&str>) -> Result { let mut url: Url = url.parse()?; url.set_query(query); @@ -154,6 +155,33 @@ impl Client { Ok(resp) } + /// Place a DELETE call to a url. + pub async fn delete(&self, url: &str, query: Option<&str>) -> Result { + let mut url: Url = url.parse()?; + url.set_query(query); + + let resp: T = self + .inner + .http_client + .delete(url.clone()) + .bearer_auth(&self.inner.reg.authentication_token) + .header( + "x-lunatic-node-name", + &self.inner.reg.node_name.hyphenated().to_string(), + ) + .send() + .await + .with_context(|| format!("Error sending HTTP DELETE request: {}.", &url))? + .error_for_status() + .with_context(|| format!("HTTP DELETE request returned an error response: {}", &url))? + .json() + .await + .with_context(|| format!("Error parsing the HTTP DELETE request JSON: {}", &url))?; + + Ok(resp) + } + + /// Place a POST call to a url. pub async fn post(&self, url: &str, data: T) -> Result { let url: Url = url.parse()?; @@ -271,6 +299,48 @@ impl Client { let resp: ModuleId = self.upload(url, module.clone()).await?; Ok(RawWasm::new(Some(resp.module_id), module)) } + + /// Register a process. + /// + /// If a process was previously registered with the same `name`, return the previous process. + pub async fn add_process(&self, name: &str, record: ProcessRecord) -> Result> { + let url = self + .inner + .reg + .urls + .add_process + .replace("{name}", name); + let result = self.post(&url, record).await?; + Ok(result) + } + + /// Remove a process. + /// + /// Returns `ApiError::ProcessNotFound` if the process does not exist. + pub async fn remove_process(&self, name: &str) -> Result { + let url = self + .inner + .reg + .urls + .remove_process + .replace("{name}", name); + let result = self.delete(&url, None).await?; + Ok(result) + } + + /// Get a process. + /// + /// Returns `ApiError::ProcessNotFound` if the process does not exist. + pub async fn get_process(&self, name: &str) -> Result { + let url = self + .inner + .reg + .urls + .add_process + .replace("{name}", name); + let result = self.get(&url, None).await?; + Ok(result) + } } async fn refresh_nodes_task(client: Client) -> Result<()> { diff --git a/crates/lunatic-process/src/lib.rs b/crates/lunatic-process/src/lib.rs index 719921238..b7d36d675 100644 --- a/crates/lunatic-process/src/lib.rs +++ b/crates/lunatic-process/src/lib.rs @@ -12,6 +12,7 @@ use anyhow::{anyhow, Result}; use env::Environment; use log::{debug, log_enabled, trace, warn, Level}; +use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use state::ProcessState; use tokio::{ @@ -388,7 +389,7 @@ where let registry = result.state().registry().read().await; let name = registry .iter() - .filter(|(_, (_, process_id))| process_id == &id) + .filter(|(_, (_, process_id))| Into::::into(*process_id) == id) .map(|(name, _)| name.splitn(4, '/').last().unwrap_or(name.as_str())) .collect::() .or_id(id); @@ -540,3 +541,75 @@ pub enum ResultValue { Failed(String), SpawnError(String), } + + +/// A name for a process. +/// +/// This name is provided by the process itself and is used for the purpose +/// of finding the process id. There is no guarantee that the name is unique. +#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Serialize)] +pub struct ProcessName(String); +impl std::fmt::Display for ProcessName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} +impl AsRef for ProcessName { + fn as_ref(&self) -> &str { + &self.0 + } +} +impl ProcessName { + pub fn from_utf8(bytes: Vec) -> Result { + Ok(ProcessName(String::from_utf8(bytes)?)) + } +} +/// The id of a process. +/// +/// FIXME: Is this a global id or a local id? +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Deserialize, Serialize)] +pub struct ProcessId(u64); +impl std::fmt::Display for ProcessId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} +impl From for u64 { + fn from(value: ProcessId) -> u64 { + value.0 + } +} +impl ProcessId { + pub fn new(id: u64) -> Self { + ProcessId(id) + } + pub fn to_le_bytes(&self) -> [u8;8] { + self.0.to_le_bytes() + } +} + +/// A record in the table of processes. +/// +/// Records are maintained by processes themselves and should not be considered +/// trusted by the VM. +#[derive(Deserialize, Serialize, Clone)] +pub struct ProcessRecord { + /// The id of the node hosting the process. + node_id: u64, + + /// The id of the process itself. + process_id: ProcessId +} +impl ProcessRecord { + pub fn new(node_id: u64, process_id: ProcessId) -> Self { + ProcessRecord { node_id, process_id } + } + + pub fn node_id(&self) -> u64 { + self.node_id + } + + pub fn process_id(&self) -> ProcessId { + self.process_id + } +} diff --git a/crates/lunatic-process/src/state.rs b/crates/lunatic-process/src/state.rs index b48281602..cfa743341 100644 --- a/crates/lunatic-process/src/state.rs +++ b/crates/lunatic-process/src/state.rs @@ -12,7 +12,7 @@ use crate::{ config::ProcessConfig, mailbox::MessageMailbox, runtimes::wasmtime::{WasmtimeCompiledModule, WasmtimeRuntime}, - Signal, + Signal, ProcessId, }; pub type ConfigResources = HashMapId; @@ -63,9 +63,12 @@ pub trait ProcessState: Sized { fn config_resources(&self) -> &ConfigResources; fn config_resources_mut(&mut self) -> &mut ConfigResources; - // Registry - fn registry(&self) -> &Arc>>; + // Process registry + // + // Key: process name. + // Value: node id, process id. + fn registry(&self) -> &Arc>>; fn registry_atomic_put( &mut self, - ) -> &mut Option>>; + ) -> &mut Option>>; } diff --git a/crates/lunatic-registry-api/src/lib.rs b/crates/lunatic-registry-api/src/lib.rs index 55993561f..fc0254919 100644 --- a/crates/lunatic-registry-api/src/lib.rs +++ b/crates/lunatic-registry-api/src/lib.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, future::Future, mem::transmute}; use anyhow::{anyhow, Result}; use lunatic_common_api::{get_memory, IntoTrap}; -use lunatic_process::state::ProcessState; +use lunatic_process::{state::ProcessState, ProcessId}; use lunatic_process_api::ProcessCtx; use tokio::sync::RwLockWriteGuard; use wasmtime::{Caller, Linker}; @@ -67,7 +67,7 @@ fn put + Send + Sync>( match state.registry_atomic_put().take() { // Use existing lock for writing. Some(mut registry_lock) => { - registry_lock.insert(name.to_owned(), (node_id, process_id)); + registry_lock.insert(name.to_owned(), (node_id, ProcessId::new(process_id))); } // If no lock exists, acquire it. None => { @@ -75,7 +75,7 @@ fn put + Send + Sync>( .registry() .write() .await - .insert(name.to_owned(), (node_id, process_id)); + .insert(name.to_owned(), (node_id, ProcessId::new(process_id))); } } @@ -178,7 +178,7 @@ fn get_or_put_later + Send + Sync>( // Safety: // The process state (containing the lock) can't outlive the `RwLock`, // which is global. This makes it safe to extend the lifetime. - let registry_lock: RwLockWriteGuard<'static, HashMap> = + let registry_lock: RwLockWriteGuard<'static, HashMap> = unsafe { transmute(registry_lock) }; if let Some(process) = registry_lock.get(name) { diff --git a/src/state.rs b/src/state.rs index 84fa22d5f..beb57250b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -8,6 +8,7 @@ use lunatic_distributed::{DistributedCtx, DistributedProcessState}; use lunatic_error_api::{ErrorCtx, ErrorResource}; use lunatic_networking_api::{DnsIterator, TlsConnection, TlsListener}; use lunatic_networking_api::{NetworkingCtx, TcpConnection}; +use lunatic_process::ProcessId; use lunatic_process::env::{Environment, LunaticEnvironment}; use lunatic_process::runtimes::wasmtime::{WasmtimeCompiledModule, WasmtimeRuntime}; use lunatic_process::state::{ConfigResources, ProcessState}; @@ -70,11 +71,11 @@ pub struct DefaultProcessState { initialized: bool, // database resources db_resources: DbResources, - registry: Arc>>, + registry: Arc>>, // Allows for atomic registry "lookup and insert" operations, by holding the write-lock of a // `RwLock` struct. The lifetime of the lock will need to be extended to `'static`, but this // is a safe operation, because it references a global registry that outlives all processes. - registry_atomic_put: Option>>, + registry_atomic_put: Option>>, } impl DefaultProcessState { @@ -84,7 +85,7 @@ impl DefaultProcessState { runtime: WasmtimeRuntime, module: Arc>, config: Arc, - registry: Arc>>, + registry: Arc>>, ) -> Result { let signal_mailbox = unbounded_channel(); let signal_mailbox = (signal_mailbox.0, Arc::new(Mutex::new(signal_mailbox.1))); @@ -243,13 +244,13 @@ impl ProcessState for DefaultProcessState { &mut self.resources.configs } - fn registry(&self) -> &Arc>> { + fn registry(&self) -> &Arc>> { &self.registry } fn registry_atomic_put( &mut self, - ) -> &mut Option>> { + ) -> &mut Option>> { &mut self.registry_atomic_put } }