Skip to content

Commit

Permalink
Merge pull request #110 from pipeless-ai/clean_kvs
Browse files Browse the repository at this point in the history
fix: cleanup KV store when pipeline ends
  • Loading branch information
miguelaeh authored Jan 5, 2024
2 parents b05635f + 8568089 commit 2782a5e
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.5.0"
version = "1.5.1"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down
15 changes: 8 additions & 7 deletions pipeless/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ pub fn start(
info!("Stream config entry removed. Stopping associated pipeline");
manager.stop().await;
manager_to_remove = Some(pipeline_id.clone());

// Cleanup KV store keys of that pipeline
pipeless::kvs::store::KV_STORE.clean(&pipeline_id.to_string());
}
}
}
Expand Down Expand Up @@ -225,18 +228,16 @@ pub fn start(
}
},
}

// Create new event since we have modified the streams config table
if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) {
warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string());
}
} else {
warn!("
Unable to unassign pipeline for stream. Stream entry not found.
Pipeline id: {}
", pipeline_id);

return;
}

// Create new event since we have modified the streams config table
if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) {
warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string());
}
}
}
Expand Down
19 changes: 17 additions & 2 deletions pipeless/src/kvs/store.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use log::error;
use log::{error, warn};
use sled;
use lazy_static::lazy_static;

// We assume the type implementing StoreInterface can be send thread safely
pub trait StoreInterface: Sync {
fn get(&self, key: &str) -> String;
fn set(&self, key: &str, value: &str);
fn clean(&self, prefix: &str); // clean all the keys that start with prefix
}

struct LocalStore {
Expand Down Expand Up @@ -43,6 +44,20 @@ impl StoreInterface for LocalStore {
}
}
}

fn clean(&self, prefix: &str) {
let keys_to_remove: Vec<sled::IVec> = self.backend
.scan_prefix(prefix)
.keys()
.filter_map(Result::ok)
.collect();

for key in keys_to_remove {
if let Err(err) = self.backend.remove(&key) {
warn!("Failed to remove key from KV store. {}", err);
}
}
}
}

// TODO: setup Redis or any other distributed solution.
Expand All @@ -59,6 +74,6 @@ impl StoreInterface for DistributedStore {
*/

lazy_static! {
// TODO: Add support for distributed store do not hardcode the local one
// TODO: Add support for distributed store. Do not hardcode the local one
pub static ref KV_STORE: Box<dyn StoreInterface> = Box::new(LocalStore::new());
}
7 changes: 4 additions & 3 deletions pipeless/src/stages/languages/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,17 @@ impl PythonHook {
let module_file_name = format!("{}.py", module_name);
let wrapper_module_name = format!("{}_wrapper", module_name);
let wrapper_module_file_name = format!("{}.py", wrapper_module_name);
// NOTE: We prepend the keys with the pipeline id and the stage name. The stage avoids key collision when you import a stage from the hub. We set the pipeline_id first to make it easier to cleanup when a stream ends
// TODO: create a KVS module for python using PYo3 and expose it via pyo3::append_to_inittab!(make_person_module); so users can import it on their hooks
let wrapper_py_code = format!("
import {0}
def hook_wrapper(frame, context):
pipeline_id = frame['pipeline_id']
def pipeless_kvs_set(key, value):
_pipeless_kvs_set(f'{1}:{{pipeline_id}}:{{key}}', str(value))
_pipeless_kvs_set(f'{{pipeline_id}}:{1}:{{key}}', str(value))
def pipeless_kvs_get(key):
return _pipeless_kvs_get(f'{1}:{{pipeline_id}}:{{key}}')
return _pipeless_kvs_get(f'{{pipeline_id}}:{1}:{{key}}')
{0}.pipeless_kvs_set = pipeless_kvs_set
{0}.pipeless_kvs_get = pipeless_kvs_get
{0}.hook(frame, context)
Expand Down Expand Up @@ -247,4 +248,4 @@ impl HookTrait for PythonHook {

out_frame
}
}
}

0 comments on commit 2782a5e

Please sign in to comment.