Skip to content

Commit

Permalink
safer plugin load api, unfinished
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Oct 27, 2023
1 parent ef7bfaa commit 7d4856b
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 148 deletions.
249 changes: 123 additions & 126 deletions plugins/zenoh-plugin-trait/src/loading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,94 +18,96 @@ use vtable::{Compatibility, PluginLoaderVersion, PluginVTable, PLUGIN_LOADER_VER
use zenoh_result::{bail, ZResult};
use zenoh_util::LibLoader;

struct PluginInstance<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
pub struct PluginRecord<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
starter: Box<dyn PluginStarter<StartArgs, RunningPlugin> + Send + Sync>,
running_plugin: Option<RunningPlugin>,
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> PluginInfo
for PluginInstance<StartArgs, RunningPlugin>
{
fn name(&self) -> &str {
self.starter.name()
}
fn path(&self) -> &str {
self.starter.path()
}
fn deletable(&self) -> bool {
self.starter.deletable()
}
}

impl<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
PluginInstance<StartArgs, RunningPlugin>
PluginRecord<StartArgs, RunningPlugin>
{
fn new<T: PluginStarter<StartArgs, RunningPlugin> + Send + Sync + 'static>(starter: T) -> Self {
Self {
starter: Box::new(starter),
running_plugin: None,
}
}
fn get_running_plugin(&self) -> Option<&RunningPlugin> {
self.running_plugin.as_ref()
}
fn stop(&mut self) -> bool {
pub fn running(&self) -> Option<&dyn RunningPluginRecord<StartArgs, RunningPlugin>> {
if self.running_plugin.is_some() {
self.running_plugin = None;
true
Some(self)
} else {
false
None
}
}
fn start(&mut self, args: &StartArgs) -> ZResult<bool> {
pub fn running_mut(
&mut self,
) -> Option<&mut dyn RunningPluginRecord<StartArgs, RunningPlugin>> {
if self.running_plugin.is_some() {
return Ok(false);
Some(self)
} else {
None
}
let plugin = self.starter.start(args)?;
self.running_plugin = Some(plugin);
Ok(true)
}
fn as_plugin_info(&self) -> &dyn PluginInfo {
self
pub fn start(
&mut self,
args: &StartArgs,
) -> ZResult<(bool, &mut dyn RunningPluginRecord<StartArgs, RunningPlugin>)> {
let already_running = self.running_plugin.is_some();
if !already_running {
self.running_plugin = Some(self.starter.start(args)?);
}
Ok((already_running, self))
}
pub fn name(&self) -> &str {
self.starter.name()
}
pub fn path(&self) -> &str {
self.starter.path()
}
pub fn deletable(&self) -> bool {
self.starter.deletable()
}
}

/// A plugins manager that handles starting and stopping plugins.
/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`].
pub struct PluginsManager<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
default_lib_prefix: String,
loader: Option<LibLoader>,
plugins: Vec<PluginInstance<StartArgs, RunningPlugin>>,
}

/// Unique identifier of plugin in plugin manager. Actually is just an index in the plugins list.
/// It's guaranteed that if intex is obtained from plugin manager, it's valid for this plugin manager.
/// (at least at this moment, when there is no pluing unload support).
/// Using it instead of plugin name allows to avoid checking for ``Option`` every time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PluginIndex(usize);

/// Unique identifier of running plugin in plugin manager.
/// The ``RunningPligin`` insterface can be accessed through this index without
/// additional checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RunningPluginIndex(PluginIndex);

/// Trait allowing to use ``RunningPluginIndex`` as ``PluginIndex``
pub trait PluginIndexTrait {
fn index(&self) -> PluginIndex;
pub trait RunningPluginRecord<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion>
{
fn name(&self) -> &str;
fn path(&self) -> &str;
fn deletable(&self) -> bool;
fn stop(&mut self);
fn running(&self) -> &RunningPlugin;
fn running_mut(&mut self) -> &mut RunningPlugin;
}

impl PluginIndexTrait for PluginIndex {
fn index(&self) -> PluginIndex {
*self
impl<StartArgs: CompatibilityVersion, RunningPligin: CompatibilityVersion>
RunningPluginRecord<StartArgs, RunningPligin> for PluginRecord<StartArgs, RunningPligin>
{
fn name(&self) -> &str {
self.name()
}
fn path(&self) -> &str {
self.path()
}
fn deletable(&self) -> bool {
self.deletable()
}
fn stop(&mut self) {
self.running_plugin = None;
}
fn running(&self) -> &RunningPligin {
self.running_plugin.as_ref().unwrap()
}
fn running_mut(&mut self) -> &mut RunningPligin {
self.running_plugin.as_mut().unwrap()
}
}

impl PluginIndexTrait for RunningPluginIndex {
fn index(&self) -> PluginIndex {
self.0
}
/// A plugins manager that handles starting and stopping plugins.
/// Plugins can be loaded from shared libraries using [`Self::load_plugin_by_name`] or [`Self::load_plugin_by_paths`], or added directly from the binary if available using [`Self::add_static`].
pub struct PluginsManager<StartArgs: CompatibilityVersion, RunningPlugin: CompatibilityVersion> {
default_lib_prefix: String,
loader: Option<LibLoader>,
plugins: Vec<PluginRecord<StartArgs, RunningPlugin>>,
}

impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + CompatibilityVersion>
Expand Down Expand Up @@ -135,86 +137,81 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
mut self,
) -> Self {
let plugin_starter: StaticPlugin<P> = StaticPlugin::new();
self.plugins.push(PluginInstance::new(plugin_starter));
self.plugins.push(PluginRecord::new(plugin_starter));
self
}

/// Returns `true` if the plugin with the given index exists in the manager.
pub fn check_plugin_index<T: PluginIndexTrait>(&self, index: T) -> bool {
index.index().0 < self.plugins.len()
}

/// Returns plugin index by name
pub fn get_plugin_index(&self, name: &str) -> Option<PluginIndex> {
self.plugins
.iter()
.position(|p| p.name() == name)
.map(PluginIndex)
fn get_plugin_index(&self, name: &str) -> Option<usize> {
self.plugins.iter().position(|p| p.name() == name)
}

/// Returns plugin index by name or error if plugin not found
pub fn get_plugin_index_err(&self, name: &str) -> ZResult<PluginIndex> {
fn get_plugin_index_err(&self, name: &str) -> ZResult<usize> {
self.get_plugin_index(name)
.ok_or_else(|| format!("Plugin `{}` not found", name).into())
}

/// Returns running plugin index by name
pub fn get_running_plugin_index(&self, name: &str) -> Option<RunningPluginIndex> {
self.get_plugin_index(name).and_then(|i| {
self.plugins[i.0]
.get_running_plugin()
.map(|_| RunningPluginIndex(i))
})
/// Lists the loaded plugins
pub fn plugins(&self) -> impl Iterator<Item = &PluginRecord<StartArgs, RunningPlugin>> + '_ {
self.plugins.iter()
}

/// Returns running plugin index by name or error if plugin not found
pub fn get_running_plugin_index_err(&self, name: &str) -> ZResult<RunningPluginIndex> {
self.get_running_plugin_index(name)
.ok_or_else(|| format!("Plugin `{}` not running", name).into())
/// Lists the loaded plugins mutable
pub fn plugins_mut(
&mut self,
) -> impl Iterator<Item = &mut PluginRecord<StartArgs, RunningPlugin>> + '_ {
self.plugins.iter_mut()
}

/// Starts plugin.
/// Returns
/// Ok(true, index) => plugin was successfully started
/// Ok(false, index) => plugin was already running
/// Err(e) => starting the plugin failed due to `e`
pub fn start<T: PluginIndexTrait>(
&mut self,
index: T,
args: &StartArgs,
) -> ZResult<(bool, RunningPluginIndex)> {
let instance = &mut self.plugins[index.index().0];
let already_started = instance.start(args)?;
Ok((already_started, RunningPluginIndex(index.index())))
/// Lists the running plugins
pub fn running_plugins(
&self,
) -> impl Iterator<Item = &dyn RunningPluginRecord<StartArgs, RunningPlugin>> + '_ {
self.plugins().filter_map(|p| p.running())
}

/// Stops `plugin`, returning `true` if it was indeed running.
pub fn stop(&mut self, index: RunningPluginIndex) -> ZResult<bool> {
let instance = &mut self.plugins[index.index().0];
let was_running = instance.stop();
Ok(was_running)
/// Lists the running plugins mutable
pub fn running_plugins_mut(
&mut self,
) -> impl Iterator<Item = &mut dyn RunningPluginRecord<StartArgs, RunningPlugin>> + '_ {
self.plugins_mut().filter_map(|p| p.running_mut())
}
/// Lists the loaded plugins
pub fn plugins(&self) -> impl Iterator<Item = PluginIndex> + '_ {
self.plugins.iter().enumerate().map(|(i, _)| PluginIndex(i))

/// Returns plugin record
pub fn plugin(&self, name: &str) -> ZResult<&PluginRecord<StartArgs, RunningPlugin>> {
Ok(&self.plugins[self.get_plugin_index_err(name)?])
}
/// Lists the loaded plugins
pub fn running_plugins(&self) -> impl Iterator<Item = RunningPluginIndex> + '_ {
self.plugins.iter().enumerate().filter_map(|(i, p)| {
if p.get_running_plugin().is_some() {
Some(RunningPluginIndex(PluginIndex(i)))
} else {
None
}
})

/// Returns mutable plugin record
pub fn plugin_mut(
&mut self,
name: &str,
) -> ZResult<&mut PluginRecord<StartArgs, RunningPlugin>> {
let index = self.get_plugin_index_err(name)?;
Ok(&mut self.plugins[index])
}
/// Returns running plugin interface of plugin or none if plugin is stopped
pub fn running_plugin(&self, index: RunningPluginIndex) -> &RunningPlugin {
self.plugins[index.index().0].get_running_plugin().unwrap()

/// Returns running plugin record
pub fn running_plugin(
&self,
name: &str,
) -> ZResult<&dyn RunningPluginRecord<StartArgs, RunningPlugin>> {
Ok(self
.plugin(name)?
.running()
.ok_or_else(|| format!("Plugin `{}` is not running", name))?)
}
/// Returns plugin information
pub fn plugin<T: PluginIndexTrait>(&self, index: T) -> &dyn PluginInfo {
self.plugins[index.index().0].as_plugin_info()

/// Returns mutable running plugin record
pub fn running_plugin_mut(
&mut self,
name: &str,
) -> ZResult<&mut dyn RunningPluginRecord<StartArgs, RunningPlugin>> {
Ok(self
.plugin_mut(name)?
.running_mut()
.ok_or_else(|| format!("Plugin `{}` is not running", name))?)
}

fn load_plugin(
Expand All @@ -231,7 +228,7 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
&mut self,
name: T,
backend_name: T1,
) -> ZResult<PluginIndex> {
) -> ZResult<&mut PluginRecord<StartArgs, RunningPlugin>> {
let name = name.as_ref();
if self.get_plugin_index(name).is_some() {
bail!("Plugin `{}` already loaded", name);
Expand All @@ -245,15 +242,15 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
Ok(p) => p,
Err(e) => bail!("After loading `{:?}`: {}", &p, e),
};
self.plugins.push(PluginInstance::new(plugin));
Ok(PluginIndex(self.plugins.len() - 1))
self.plugins.push(PluginRecord::new(plugin));
Ok(self.plugins.last_mut().unwrap())
}
/// Tries to load a plugin from the list of path to plugin (absolute or relative to the current working directory)
pub fn load_plugin_by_paths<T: AsRef<str>, P: AsRef<str> + std::fmt::Debug>(
&mut self,
name: T,
paths: &[P],
) -> ZResult<PluginIndex> {
) -> ZResult<&mut PluginRecord<StartArgs, RunningPlugin>> {
let name = name.as_ref();
if self.get_plugin_index(name).is_some() {
bail!("Plugin `{}` already loaded", name);
Expand All @@ -263,8 +260,8 @@ impl<StartArgs: 'static + CompatibilityVersion, RunningPlugin: 'static + Compati
match unsafe { LibLoader::load_file(path) } {
Ok((lib, p)) => {
let plugin = Self::load_plugin(name, lib, p)?;
self.plugins.push(PluginInstance::new(plugin));
return Ok(PluginIndex(self.plugins.len() - 1));
self.plugins.push(PluginRecord::new(plugin));
return Ok(self.plugins.last_mut().unwrap());
}
Err(e) => log::warn!("Plugin '{}' load fail at {}: {}", &name, path, e),
}
Expand Down
Loading

0 comments on commit 7d4856b

Please sign in to comment.