Skip to content

Commit

Permalink
feat: 🌈 Work in progress
Browse files Browse the repository at this point in the history
Introduction of more inference, broken type guessing, stupid exotic crypto,
and bugs.
  • Loading branch information
fungiboletus committed Jan 20, 2024
1 parent dff68a5 commit 8aa49fc
Show file tree
Hide file tree
Showing 20 changed files with 1,926 additions and 75 deletions.
667 changes: 659 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
anyhow = "1.0"
#async-stream = "0.3"
async-trait = "0.1"
axum = { version = "0.7" }
axum = { version = "0.7", features = ["multipart"] }
#axum-streams = { version = "0.12", features = ["json", "csv", "text"] }
#bytes = "1.5"
futures = "0.3"
Expand Down Expand Up @@ -37,3 +37,12 @@ serde_json = "1.0"
num-traits = "0.2.17"
hifitime = "3.9.0"
iso8601 = "0.6.1"
duckdb = "0.9.2"
config = "0.13.4"
serde = "1.0.195"
confique = "0.2.5"
once_cell = "1.19.0"
byte-unit = "5.1.3"
hex = "0.4.3"
blake3 = "1.5.0"
regex = "1.10.2"
60 changes: 60 additions & 0 deletions src/bus/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl WaitForAll {
let mut nb_started = self.nb_started.lock().await;
*nb_started += 1;
}
let nb_started_clone = self.nb_started.clone();
let nb_finished_clone = self.nb_finished.clone();
let finished_sender_clone = self.finished_sender.clone();

Expand All @@ -38,6 +39,13 @@ impl WaitForAll {
*nb_finished += 1;
}
if !finished_sender_clone.is_closed() && finished_sender_clone.receiver_count() > 0 {
{
let nb_started = nb_started_clone.lock().await;
let nb_finished = nb_finished_clone.lock().await;
if *nb_started != *nb_finished {
return;
}
}
let _ = finished_sender_clone.broadcast(()).await;
}
});
Expand All @@ -55,3 +63,55 @@ impl WaitForAll {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Registers two receivers, fires their senders from background tasks at
    /// different delays, and checks that `wait()` completes (twice in a row).
    #[tokio::test]
    async fn test_wait_for_all() {
        let (first_sender, first_receiver) = async_broadcast::broadcast(1);
        let (second_sender, second_receiver) = async_broadcast::broadcast(1);
        // Extra handle on the second channel, used for the final assertion.
        let second_sender_probe = second_sender.clone();

        let mut waiter = WaitForAll::new();
        waiter.add(first_receiver).await;
        waiter.add(second_receiver).await;

        // First channel is notified after a short delay.
        tokio::spawn(async move {
            println!("Waiting for s1");
            sleep(Duration::from_millis(50)).await;
            println!("Broadcasting s1");
            first_sender.broadcast(()).await.unwrap();
        });

        // Second channel is notified later than the first one.
        tokio::spawn(async move {
            println!("Waiting for s2");
            sleep(Duration::from_millis(100)).await;
            println!("Broadcasting s2");
            second_sender.broadcast(()).await.unwrap();
        });

        println!("Waiting for all");
        waiter.wait().await.unwrap();
        println!("done");

        // Everything is already finished, so a second wait must succeed too.
        waiter.wait().await.unwrap();

        // Broadcasting again on the second channel is expected to fail.
        let late_broadcast = second_sender_probe.broadcast(()).await;
        assert!(late_broadcast.is_err());
    }

    /// Adds a receiver and broadcasts on it without ever calling `wait()`;
    /// the test passes as long as nothing panics during the grace period.
    #[tokio::test]
    async fn test_without_waiting() {
        let (sender, receiver) = async_broadcast::broadcast(1);

        let mut waiter = WaitForAll::new();
        waiter.add(receiver).await;

        sender.broadcast(()).await.unwrap();
        sleep(Duration::from_millis(100)).await;
    }
}
120 changes: 120 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use anyhow::Error;
use confique::Config;
use once_cell::sync::OnceCell;
use std::{net::IpAddr, sync::Arc};

/// Runtime settings of the application.
///
/// Every field can be supplied through the environment variable named in its
/// `#[config(env = ...)]` attribute and falls back to the listed default.
#[derive(Debug, Config)]
pub struct SensAppConfig {
/// Port number (`SENSAPP_PORT`, default 3000).
/// NOTE(review): presumably the HTTP listening port — confirm against the caller.
#[config(env = "SENSAPP_PORT", default = 3000)]
pub port: u16,
/// IP address (`SENSAPP_ENDPOINT`, default 127.0.0.1).
/// NOTE(review): presumably the address the HTTP server binds to — confirm against the caller.
#[config(env = "SENSAPP_ENDPOINT", default = "127.0.0.1")]
pub endpoint: IpAddr,

/// Human-readable size string such as "10mb" (`SENSAPP_HTTP_BODY_LIMIT`).
/// It is converted to a byte count by `parse_http_body_limit`.
#[config(env = "SENSAPP_HTTP_BODY_LIMIT", default = "10mb")]
pub http_body_limit: String,

/// Row count, default 128.
/// NOTE(review): the variable is spelled `SENSAPP_MAX_INFERENCES_ROWS` (plural
/// "INFERENCES") while the field is `max_inference_rows` — confirm which is intended.
#[config(env = "SENSAPP_MAX_INFERENCES_ROWS", default = 128)]
pub max_inference_rows: usize,

/// Batch size (`SENSAPP_BATCH_SIZE`, default 8192).
/// NOTE(review): presumably the number of samples per published batch — confirm against the importers.
#[config(env = "SENSAPP_BATCH_SIZE", default = 8192)]
pub batch_size: usize,
}

impl SensAppConfig {
    /// Builds the configuration from the environment and from `settings.toml`.
    ///
    /// The environment source is registered before the file source.
    /// NOTE(review): with confique the earlier source is expected to take
    /// precedence, i.e. environment variables override the file — confirm.
    ///
    /// # Errors
    ///
    /// Returns an error when a source cannot be read or a value cannot be
    /// parsed into its field type.
    pub fn load() -> Result<SensAppConfig, Error> {
        let loaded = SensAppConfig::builder()
            .env()
            .file("settings.toml")
            .load()?;

        Ok(loaded)
    }

    /// Converts `http_body_limit` (for example "10mb" or "10MiB") into a
    /// number of bytes.
    ///
    /// Units are parsed case-insensitively.
    ///
    /// # Errors
    ///
    /// Returns an error when the string is not a valid size, when the size
    /// exceeds 128 * 1024^3 bytes, or when it does not fit in `usize` on the
    /// current platform.
    pub fn parse_http_body_limit(&self) -> Result<usize, Error> {
        // Upper bound accepted for a request body.
        const MAX_BODY_BYTES: u64 = 128 * 1024 * 1024 * 1024;

        // Borrow the string: parsing does not need an owned copy.
        let size = byte_unit::Byte::parse_str(self.http_body_limit.as_str(), true)?.as_u64();
        if size > MAX_BODY_BYTES {
            anyhow::bail!("Body size is too big: > 128GB");
        }

        // A plain `as usize` would silently wrap on 32-bit targets, because the
        // guard above admits values larger than `u32::MAX`.
        usize::try_from(size).map_err(|_| {
            anyhow::anyhow!(
                "Body size does not fit in usize on this platform: {} bytes",
                size
            )
        })
    }
}

static SENSAPP_CONFIG: OnceCell<Arc<SensAppConfig>> = OnceCell::new();

pub fn set(config: Arc<SensAppConfig>) -> Result<(), Error> {
match SENSAPP_CONFIG.set(config) {
Ok(_) => Ok(()),
Err(e) => Err(Error::msg(format!("Failed to set configuration: {:?}", e))),
}
}

pub fn get() -> Result<Arc<SensAppConfig>, Error> {
SENSAPP_CONFIG.get().cloned().ok_or_else(|| {
Error::msg(
"Configuration not loaded. Please call load_configuration() before using the configuration",
)
})
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    // Environment variables are process-wide while the test harness runs tests
    // on several threads. Every test that loads the configuration takes this
    // lock so that one test never observes a variable set by another.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires the environment lock, ignoring poisoning so that one failed
    /// test does not cascade into the others.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Loads the configuration with `name` temporarily set to `value`.
    ///
    /// The variable is removed before the result is unwrapped, so it is
    /// cleaned up even when loading fails. The caller must hold the
    /// environment lock.
    fn load_with_env(name: &str, value: &str) -> SensAppConfig {
        std::env::set_var(name, value);
        let loaded = SensAppConfig::load();
        std::env::remove_var(name);
        loaded.unwrap()
    }

    /// Defaults are applied, and `SENSAPP_PORT` overrides the default port.
    #[test]
    fn test_load_config() {
        let _env = lock_env();

        let config = SensAppConfig::load().unwrap();
        assert_eq!(config.port, 3000);
        assert_eq!(config.endpoint, IpAddr::from([127, 0, 0, 1]));

        let config = load_with_env("SENSAPP_PORT", "8080");
        assert_eq!(config.port, 8080);
    }

    /// Size strings are converted to byte counts; oversized and negative
    /// values are rejected.
    #[test]
    fn test_parse_http_body_limit() {
        let _env = lock_env();

        // Default value ("10mb").
        let config = SensAppConfig::load().unwrap();
        assert_eq!(config.parse_http_body_limit().unwrap(), 10000000);

        let accepted = [
            ("12345", 12345),
            ("10m", 10000000),
            ("10mb", 10000000),
            ("10MiB", 10485760),
            ("1.5gb", 1500000000),
        ];
        for (raw, expected) in accepted {
            let config = load_with_env("SENSAPP_HTTP_BODY_LIMIT", raw);
            assert_eq!(
                config.parse_http_body_limit().unwrap(),
                expected,
                "unexpected size for {raw:?}"
            );
        }

        for raw in ["1tb", "-5mb"] {
            let config = load_with_env("SENSAPP_HTTP_BODY_LIMIT", raw);
            assert!(
                config.parse_http_body_limit().is_err(),
                "{raw:?} should be rejected"
            );
        }
    }

    /// The global configuration is empty until `set` is called, after which
    /// `get` returns the stored value.
    #[test]
    fn test_set_get() {
        let _env = lock_env();

        assert!(SENSAPP_CONFIG.get().is_none());
        let config = SensAppConfig::load().unwrap();
        set(Arc::new(config)).unwrap();
        assert!(SENSAPP_CONFIG.get().is_some());

        let config = get().unwrap();
        assert_eq!(config.port, 3000);
    }
}
56 changes: 51 additions & 5 deletions src/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use super::state::HttpServerState;
use crate::config;
use crate::importers::csv::publish_csv_async;
use crate::name_to_uuid::name_to_uuid;
use anyhow::Result;
use axum::extract::DefaultBodyLimit;
use axum::extract::Multipart;
use axum::extract::Path;
use axum::extract::State;
use axum::http::header;
use axum::http::StatusCode;
Expand Down Expand Up @@ -28,9 +34,6 @@ use tower_http::trace;
use tower_http::{timeout::TimeoutLayer, trace::TraceLayer, ServiceBuilderExt};
use tracing::Level;

use super::state::HttpServerState;
use crate::importers::csv::publish_csv_async;

// Anyhow error handling with axum
// https://github.com/tokio-rs/axum/blob/d3112a40d55f123bc5e65f995e2068e245f12055/examples/anyhow-error-response/src/main.rs
struct AppError(anyhow::Error);
Expand All @@ -53,6 +56,8 @@ where
}

pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Result<()> {
let max_body_layer = DefaultBodyLimit::max(config::get()?.parse_http_body_limit()?);

// Initialize tracing
tracing_subscriber::fmt()
.with_target(false)
Expand Down Expand Up @@ -80,10 +85,17 @@ pub async fn run_http_server(state: HttpServerState, address: SocketAddr) -> Res
.route("/", get(handler))
.route(
"/publish",
post(publish_handler).layer(DefaultBodyLimit::max(1024 * 1024 * 1024)),
post(publish_handler).layer(max_body_layer.clone()),
)
//.route("/publish_stream", post(publish_stream_handler))
.route("/publish_csv", post(publish_csv))
.route(
"/sensors/:sensor_name_or_uuid/publish_csv",
post(publish_csv),
)
.route(
"/sensors/:sensor_name_or_uuid/publish_multipart",
post(publish_multipart).layer(max_body_layer.clone()),
)
.route("/fail", get(test_fail))
.layer(middleware)
.with_state(state);
Expand All @@ -106,8 +118,11 @@ async fn handler(State(state): State<HttpServerState>) -> Result<Json<String>, A

async fn publish_csv(
State(state): State<HttpServerState>,
Path(sensor_name_or_uuid): Path<String>,
body: axum::body::Body,
) -> Result<String, AppError> {
// let uuid = name_to_uuid(sensor_name_or_uuid.as_str())?;
// Convert the body in a stream
let stream = body.into_data_stream();
let stream = stream.map_err(|err| io::Error::new(io::ErrorKind::Other, err));
let reader = stream.into_async_read();
Expand Down Expand Up @@ -155,3 +170,34 @@ async fn publish_handler(bytes: Bytes) -> Result<Json<String>, (StatusCode, Stri
)),
}
}

/// Placeholder handler for multipart uploads.
///
/// The multipart body is accepted but not read yet; the handler always
/// answers with the JSON string "ok".
// TODO: consume the multipart fields once the import pipeline supports them.
// The extractor is bound to `_multipart` (without `mut`) so the unused stub
// does not raise unused-variable / unused-mut warnings.
async fn publish_multipart(_multipart: Multipart) -> Result<Json<String>, (StatusCode, String)> {
    Ok(Json("ok".to_string()))
}

#[cfg(test)]
mod tests {
    use axum::body::{to_bytes, Body};
    use axum::http::Request;
    use tower::ServiceExt;

    use super::*;
    use crate::bus::EventBus;

    /// A GET on `/` answers 200 with the state's name serialized as JSON.
    #[tokio::test]
    async fn test_handler() {
        // Minimal router exposing only the root handler.
        let event_bus = Arc::new(EventBus::init("test".to_string()));
        let state = HttpServerState {
            name: "hello world".to_string(),
            event_bus,
        };
        let app = Router::new().route("/", get(handler)).with_state(state);

        let request = Request::builder().uri("/").body(Body::empty()).unwrap();
        let response = app.oneshot(request).await.unwrap();
        assert_eq!(response.status(), StatusCode::OK);

        // The body is read with a 128-byte cap, plenty for the expected payload.
        let raw_body = to_bytes(response.into_body(), 128).await.unwrap();
        let body_str = String::from_utf8(raw_body.to_vec()).unwrap();
        assert_eq!(body_str, "\"hello world\"");
    }
}
8 changes: 4 additions & 4 deletions src/importers/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub async fn publish_csv_async<R: io::AsyncRead + Unpin + Send>(

let mut current_samples: Vec<Sample<i64>> = vec![];

let mut toto = WaitForAll::new();
let mut all_batches_waiter = WaitForAll::new();

let mut i = 0;

Expand All @@ -43,7 +43,7 @@ pub async fn publish_csv_async<R: io::AsyncRead + Unpin + Send>(
let sync_receiver = event_bus.publish(batch).await?;
//sync_receiver.activate().recv().await?;
current_samples = vec![];
toto.add(sync_receiver.activate()).await;
all_batches_waiter.add(sync_receiver.activate()).await;
}
}

Expand All @@ -54,11 +54,11 @@ pub async fn publish_csv_async<R: io::AsyncRead + Unpin + Send>(
samples: Arc::new(TypedSamples::Integer(current_samples)),
};
let sync_receiver = event_bus.publish(batch).await?;
toto.add(sync_receiver.activate()).await;
all_batches_waiter.add(sync_receiver.activate()).await;
}

// Wololo ??
toto.wait().await?;
all_batches_waiter.wait().await?;

println!("Done reading CSV");
Ok(())
Expand Down
Loading

0 comments on commit 8aa49fc

Please sign in to comment.