Skip to content

Commit

Permalink
System Tagging
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryforphil committed Apr 8, 2024
1 parent 5990f54 commit 40d7ec8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 8 deletions.
10 changes: 8 additions & 2 deletions lil-broker/src/buckets/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};

use tracing::debug;
use crate::{DataPoint, Primatives, Tag, Timestamp};

mod querying;
Expand All @@ -24,7 +24,13 @@ impl Bucket{
let data_point = DataPoint::new(timestamp, value);
self.values.insert(timestamp, data_point);
self
}
}

pub fn add_tag(&mut self, tag: Tag) -> &mut Self{
debug!("Adding tag: {:#?} to bucket: {}", tag, self.name);
self.bucket_tags.insert(tag);
self
}

pub fn add_data_point(&mut self, data_point: DataPoint) -> &mut Self{
self.values.insert(data_point.timestamp, data_point);
Expand Down
3 changes: 2 additions & 1 deletion lil-broker/src/buckets/querying.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{Bucket, DataPoint, Timestamp};


use tracing::debug;
impl Bucket{

///Get the latest DataPoint in the bucket
Expand All @@ -26,6 +26,7 @@ impl Bucket{

let global_tags = self.bucket_tags.clone();
for tag in global_tags.iter(){
debug!("Adding tag: {:#?} to data point", tag);
data_point.add_tag(tag.clone());
}
Some(data_point)
Expand Down
19 changes: 17 additions & 2 deletions lil-broker/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
mod query;
use std::collections::BTreeMap;

use tracing::{info, error, debug};
pub use query::*;

use crate::Bucket;
use crate::{Bucket, Tag};


pub struct Database{
Expand All @@ -29,6 +29,21 @@ impl Database{

}

pub fn add_tag_to_bucket(&mut self, bucket_name: &str, tag: Tag){

//Create a new bucket if it doesn't exist
if !self.buckets.contains_key(bucket_name){
info!("Bucket: {} not found, creating new bucket", bucket_name);
self.new_bucket(bucket_name);
}

if let Some(bucket) = self.buckets.get_mut(bucket_name){
bucket.add_tag(tag);
}else{
error!("Bucket: {} not found", bucket_name);
}
}

pub fn query_batch(&mut self, queries: Vec<QueryCommand>) -> Result<Vec<QueryResponse>, String>{
let mut responses = Vec::new();
for query in queries{
Expand Down
29 changes: 26 additions & 3 deletions lil-broker/src/database/query/get_latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ mod tests{
use super::*;

#[test]
fn test_write_query_basic(){
fn test_get_latest_basic(){
let mut db = Database::new();
let query1 = WriteQuery::new("test".into(), 7.0.into(), Timestamp::from_seconds(10.0));
let query2 = WriteQuery::new("test".into(), 10.0.into(), Timestamp::from_seconds(5.0));
Expand All @@ -109,7 +109,7 @@ mod tests{
}

#[test]
fn test_write_query_wildcard(){
fn test_get_latest_wildcard(){
let mut db = Database::new();
let query1 = WriteQuery::new("test/a/1".into(), 1.0.into(), Timestamp::from_seconds(1.0));
let query2 = WriteQuery::new("test/a/2".into(), 2.0.into(), Timestamp::from_seconds(1.0));
Expand All @@ -133,7 +133,7 @@ mod tests{
}

#[test]
fn test_write_query_ack(){
fn test_get_latest_query_ack(){
env_logger::init();
let mut db = Database::new();
let query1 = WriteQuery::new("test/a/1".into(), 1.0.into(), Timestamp::from_seconds(1.0));
Expand Down Expand Up @@ -162,4 +162,27 @@ mod tests{
debug!("Read Res after ack: {:#?}", read_res);
assert_eq!(read_res.metadata.n_results, 0);
}

#[test]
fn test_get_latest_query_bucket_tags(){
env_logger::init();
let mut db = Database::new();
let query1 = WriteQuery::new("test/a/1".into(), 1.0.into(), Timestamp::from_seconds(1.0));
db.add_tag_to_bucket("test/a/1", "user/test_tag".into());
let _write_res = db.query_batch(vec![query1.into() ]).unwrap();

let read_query = GetLatestQuery{
topics: vec!["test/a/".into()],
ack_topics: vec!["test/a/1".into()],
tag_filters: Vec::new(),
};
let read_res = db.query(read_query.into()).unwrap();
debug!("Read Response 1: {:#?}", read_res);
assert_eq!(read_res.metadata.n_results, 1);
assert_eq!(read_res.data.len(), 1);

let tags = read_res.data.get("test/a/1").unwrap().tags.clone();
assert_eq!(tags.contains(&"user/test_tag".into()), true);

}
}

0 comments on commit 40d7ec8

Please sign in to comment.