Skip to content

Commit

Permalink
running plugin index added
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 26, 2023
1 parent 84498a4 commit ef7bfaa
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 54 deletions.
72 changes: 57 additions & 15 deletions plugins/zenoh-plugin-trait/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,29 @@ pub struct PluginsManager<StartArgs: CompatibilityVersion, RunningPlugin: Compat
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PluginIndex(usize);

/// Unique identifier of running plugin in plugin manager.
/// The ``RunningPligin`` insterface can be accessed through this index without
/// additional checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RunningPluginIndex(PluginIndex);

/// Trait allowing to use ``RunningPluginIndex`` as ``PluginIndex``
pub trait PluginIndexTrait {
fn index(&self) -> PluginIndex;
}

impl PluginIndexTrait for PluginIndex {
fn index(&self) -> PluginIndex {
*self
}
}

impl PluginIndexTrait for RunningPluginIndex {
fn index(&self) -> PluginIndex {
self.0
}
}

impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + CompatibilityVersion>
PluginsManager<StartArgs, RunningPlugin>
{
Expand Down Expand Up @@ -117,8 +140,8 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
}

/// Returns `true` if the plugin with the given index exists in the manager.
pub fn check_plugin_index(&self, index: PluginIndex) -> bool {
index.0 < self.plugins.len()
pub fn check_plugin_index<T: PluginIndexTrait>(&self, index: T) -> bool {
index.index().0 < self.plugins.len()
}

/// Returns plugin index by name
Expand All @@ -135,20 +158,39 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
}

/// Returns running plugin index by name
pub fn get_running_plugin_index(&self, name: &str) -> Option<RunningPluginIndex> {
self.get_plugin_index(name).and_then(|i| {
self.plugins[i.0]
.get_running_plugin()
.map(|_| RunningPluginIndex(i))
})
}

/// Returns running plugin index by name or error if plugin not found
pub fn get_running_plugin_index_err(&self, name: &str) -> ZResult<RunningPluginIndex> {
self.get_running_plugin_index(name)
.ok_or_else(|| format!("Plugin `{}` not running", name).into())
}

/// Starts plugin.
/// Returns
/// Ok(true) => plugin was successfully started
/// Ok(false) => plugin was already running
/// Ok(true, index) => plugin was successfully started
/// Ok(false, index) => plugin was already running
/// Err(e) => starting the plugin failed due to `e`
pub fn start(&mut self, index: PluginIndex, args: &StartArgs) -> ZResult<bool> {
let instance = &mut self.plugins[index.0];
pub fn start<T: PluginIndexTrait>(
&mut self,
index: T,
args: &StartArgs,
) -> ZResult<(bool, RunningPluginIndex)> {
let instance = &mut self.plugins[index.index().0];
let already_started = instance.start(args)?;
Ok(already_started)
Ok((already_started, RunningPluginIndex(index.index())))
}

/// Stops `plugin`, returning `true` if it was indeed running.
pub fn stop(&mut self, index: PluginIndex) -> ZResult<bool> {
let instance = &mut self.plugins[index.0];
pub fn stop(&mut self, index: RunningPluginIndex) -> ZResult<bool> {
let instance = &mut self.plugins[index.index().0];
let was_running = instance.stop();
Ok(was_running)
}
Expand All @@ -157,22 +199,22 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
self.plugins.iter().enumerate().map(|(i, _)| PluginIndex(i))
}
/// Lists the loaded plugins
pub fn running_plugins(&self) -> impl Iterator<Item = PluginIndex> + '_ {
pub fn running_plugins(&self) -> impl Iterator<Item = RunningPluginIndex> + '_ {
self.plugins.iter().enumerate().filter_map(|(i, p)| {
if p.get_running_plugin().is_some() {
Some(PluginIndex(i))
Some(RunningPluginIndex(PluginIndex(i)))
} else {
None
}
})
}
/// Returns running plugin interface of plugin or none if plugin is stopped
pub fn running_plugin(&self, index: PluginIndex) -> Option<&RunningPlugin> {
self.plugins[index.0].get_running_plugin()
pub fn running_plugin(&self, index: RunningPluginIndex) -> &RunningPlugin {
self.plugins[index.index().0].get_running_plugin().unwrap()
}
/// Returns plugin information
pub fn plugin(&self, index: PluginIndex) -> &dyn PluginInfo {
self.plugins[index.0].as_plugin_info()
pub fn plugin<T: PluginIndexTrait>(&self, index: T) -> &dyn PluginInfo {
self.plugins[index.index().0].as_plugin_info()
}

fn load_plugin(
Expand Down
79 changes: 40 additions & 39 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,10 @@ impl ConfigValidator for AdminSpace {
new: &serde_json::Map<String, serde_json::Value>,
) -> ZResult<Option<serde_json::Map<String, serde_json::Value>>> {
let plugins_mgr = zlock!(self.context.plugins_mgr);
if let Some(plugin) = plugins_mgr.get_plugin_index(name) {
if let Some(plugin) = plugins_mgr.running_plugin(plugin) {
plugin.config_checker(path, current, new)
} else {
Err(format!("Plugin {name} not running").into())
}
} else {
Err(format!("Plugin {name} not found").into())
}
let index = plugins_mgr.get_running_plugin_index_err(name)?;
plugins_mgr
.running_plugin(index)
.config_checker(path, current, new)
}
}

Expand Down Expand Up @@ -198,41 +193,47 @@ impl AdminSpace {
match diff {
PluginDiff::Delete(name) => {
active_plugins.remove(name.as_str());
if let Some(index) = plugins_mgr.get_plugin_index(&name) {
if let Some(index) = plugins_mgr.get_running_plugin_index(&name) {
let _ = plugins_mgr.stop(index);
}
}
PluginDiff::Start(plugin) => {
let name = &plugin.name;
if let Ok(index) = match &plugin.paths {
Some(paths) => plugins_mgr.load_plugin_by_paths(name, paths),
None => {
plugins_mgr.load_plugin_by_backend_name(name, &plugin.name)
}
}
.map_err(|e| {
if plugin.required {
panic!("Failed to load plugin `{}`: {}", name, e)
} else {
log::error!("Failed to load plugin `{}`: {}", name, e)
}
}) {
let path = plugins_mgr.plugin(index).path().to_string();
log::info!("Loaded plugin `{}` from {}", name, path);
match plugins_mgr.start(index, &admin.context.runtime) {
Ok(true) => {
active_plugins.insert(name.into(), path.clone());
log::info!(
"Successfully started plugin `{}` from {}",
name,
path
);
let index = if let Some(paths) = &plugin.paths {
plugins_mgr.load_plugin_by_paths(name, paths)
} else {
plugins_mgr.load_plugin_by_backend_name(name, &plugin.name)
};
match index {
Ok(index) => {
let path = plugins_mgr.plugin(index).path().to_string();
log::info!("Loaded plugin `{}` from {}", name, path);
match plugins_mgr.start(index, &admin.context.runtime) {
Ok((true, _)) => {
active_plugins.insert(name.into(), path.clone());
log::info!(
"Successfully started plugin `{}` from {}",
name,
path
);
}
Ok((false, _)) => {
log::warn!("Plugin `{}` was already running", name)
}
Err(e) => {
log::error!(
"Failed to start plugin `{}`: {}",
name,
e
)
}
}
Ok(false) => {
log::warn!("Plugin `{}` was already running", name)
}
Err(e) => {
log::error!("Failed to start plugin `{}`: {}", name, e)
}
Err(e) => {
if plugin.required {
panic!("Failed to load plugin `{}`: {}", name, e)
} else {
log::error!("Failed to load plugin `{}`: {}", name, e)
}
}
}
Expand Down Expand Up @@ -645,7 +646,7 @@ fn plugins_status(context: &AdminContext, query: Query) {
let info = guard.plugin(index);
let name = info.name();
let path = info.path();
let plugin = guard.running_plugin(index).unwrap();
let plugin = guard.running_plugin(index);
with_extended_string(&mut root_key, &[name], |plugin_key| {
with_extended_string(plugin_key, &["/__path__"], |plugin_path_key| {
if let Ok(key_expr) = KeyExpr::try_from(plugin_path_key.clone()) {
Expand Down

0 comments on commit ef7bfaa

Please sign in to comment.