feat: 🌈 Work in progress
fungiboletus committed Jan 26, 2024
1 parent 8aa49fc commit ddef3a2
Showing 17 changed files with 431 additions and 83 deletions.
48 changes: 46 additions & 2 deletions Cargo.lock


3 changes: 3 additions & 0 deletions Cargo.toml
@@ -46,3 +46,6 @@ byte-unit = "5.1.3"
hex = "0.4.3"
blake3 = "1.5.0"
regex = "1.10.2"
influxdb-line-protocol = "2.0.0"
flate2 = "1.0.28"
smallvec = "1.13.1"
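The new dependencies point at gzip-compressed ingestion (`flate2`), InfluxDB line protocol parsing (`influxdb-line-protocol`), and small inline buffers (`smallvec`). As an illustration only — this is not code from the commit, and the input file name is hypothetical — here is how `flate2` and `smallvec` are commonly used together:

```rust
// Illustration only: decompress a gzip payload and collect its lines into a
// SmallVec that stays on the stack while it holds four items or fewer.
use flate2::read::GzDecoder;
use smallvec::SmallVec;
use std::io::Read;

fn decompress(gzipped: &[u8]) -> std::io::Result<String> {
    let mut decoder = GzDecoder::new(gzipped);
    let mut body = String::new();
    decoder.read_to_string(&mut body)?;
    Ok(body)
}

fn main() -> std::io::Result<()> {
    // `payload.gz` is a hypothetical input file used only for this example.
    let payload = include_bytes!("payload.gz");
    let mut lines: SmallVec<[String; 4]> = SmallVec::new();
    for line in decompress(payload)?.lines() {
        lines.push(line.to_string());
    }
    println!("{} lines", lines.len());
    Ok(())
}
```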
5 changes: 4 additions & 1 deletion README.md
@@ -6,7 +6,7 @@ It enables the handling of small time series data of the edge efficiently to lar

## Features

- **Flexible Time Series DataBase Storage**: Supports various time-series databases like SQLite, PostgreSQL (with optional TimeScaleDB plugin), and ClickHouse, with the potential to extend support to other databases in the future.
- **Flexible Time Series Database Storage**: Supports various time-series databases such as SQLite, PostgreSQL (with the optional TimescaleDB plugin), DuckDB, and ClickHouse, with the potential to extend support to other databases in the future.
- **Data Lake Storage**: Supports Parquet files over S3-compatible object stores for long-term time-series data storage.
- **Multiple Data Ingestion Protocols**: Easy data ingestion via HTTP REST API, MQTT, AMQP, Kafka, OPC UA, and NATS.
- **Compatibility with Existing Pipelines**: Offers Prometheus Remote Write and InfluxDB line format support for seamless integration into existing sensor data pipelines.
@@ -33,6 +33,7 @@ Not only the language, it's also the extensive high quality open-source ecosyste
* [Axum](https://github.com/tokio-rs/axum) web framework
* [SQLx](https://github.com/launchbadge/sqlx) database driver
* [Polars](https://pola.rs) data frame library
* [nom](https://github.com/rust-bakery/nom) parser combinator library
* *and many more…*

## Contributing
@@ -56,4 +57,6 @@ We thank [the historical authors of SensApp](https://github.com/SINTEF/sensapp/g
SensApp is developed by
[SINTEF](https://www.sintef.no) ([Digital division](https://www.sintef.no/en/digital/), [Sustainable Communication Technologies department](https://www.sintef.no/en/digital/departments-new/department-of-sustainable-communication-technologies/), [Smart Data research group](https://www.sintef.no/en/expertise/digital/sustainable-communication-technologies/smart-data/)).

It is made possible by the research and development of many research projects, funded notably by the [European Commission](https://ec.europa.eu/programmes/horizon2020/en) and the [Research Council of Norway](https://www.forskningsradet.no/en/).

We also thank the open-source community for all the tools they create and maintain that allow SensApp to exist.
10 changes: 7 additions & 3 deletions docs/ARCHITECTURE.md
@@ -98,23 +98,27 @@ We also support InfluxDB line protocol, and the Prometheus remote stores protoco
SensApp should support various storage backends. The ideal storage backend for time series does not exist yet.

* For small deployments, SQLite is used.
* It is possible to use Litestream to replicate and back up the SQLite database.
* For medium deployments, PostgreSQL is used.
* It is optional to use the TimescaleDB plugin or Citus Columnar.
* For larger deployments, ClickHouse is used.

* SensApp can also produce Parquet files stored in S3-compatible object stores.
* An experimental DuckDB backend is also available. The DuckDB database format isn't stable yet, and it may be wise to wait for the DuckDB 1.0 release before using it in production.

SensApp can use other storage backends in the future. Could it be Cassandra, Apache IoTDB, OpenTSDB, QuestDB, HoraeDB, or something new?
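Whatever the backend, it can be hidden behind a small abstraction that the ingestion pipeline calls. The sketch below is purely illustrative and not taken from the SensApp code base; the trait and the placeholder `Batch` type are hypothetical names.

```rust
// Hypothetical sketch: a minimal storage backend abstraction.
// `Batch` stands in for SensApp's crate::datamodel::batch::Batch.
use std::sync::Arc;

pub struct Batch; // placeholder for the real batch type

pub trait StorageBackend {
    /// Persist a batch; callers treat the data as durable once this returns Ok.
    fn publish(&self, batch: Arc<Batch>) -> anyhow::Result<()>;

    /// Flush any internal buffers to durable storage.
    fn sync(&self) -> anyhow::Result<()>;
}
```

A SQLite, PostgreSQL, ClickHouse, or DuckDB implementation could then be selected at startup from a connection string or a configuration flag.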

We base our storage choices on the findings of the paper [TSM-Bench: Benchmarking Time Series Database Systems for Monitoring Applications](https://dl.acm.org/doi/abs/10.14778/3611479.3611532), which shows that ClickHouse is currently a better choice than most databases for time series at scale. Unfortunately, the paper didn't include IoTDB, and the new author doesn't like the JVM much, so IoTDB support is not a priority. Other databases are relatively new, and we favour the most popular ones for now.

SensApp also supports SQLite for small deployments and local persistence. The SQLite storage feature cannot scale to large deployments, but many deployments are small, and SQLite is a good choice for these.
SensApp also supports SQLite for small deployments and local persistence. The SQLite storage feature cannot scale to large deployments, but many deployments are small, and SQLite is a good choice for these. When combined with [Litestream](https://litestream.io/), SQLite also gains replication and backup capabilities.

PostgreSQL is also supported as it is the most popular database according to the [Stack Overflow Developer Survey 2023](https://survey.stackoverflow.co/2023/) and should provide a good compromise between performance and convenience. The choice between vanilla PostgreSQL tables, TimescaleDB bucket-style hypertables, or Citus columnar tables is left to the user.
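For readers unfamiliar with TimescaleDB, a hypertable is created from a regular table with a single SQL call. The sketch below is illustrative only; the connection string, table name, and column name are placeholders, and SensApp's actual schema management may differ.

```rust
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://sensapp:secret@localhost/sensapp") // placeholder URL
        .await?;

    // Turn a plain table into a TimescaleDB hypertable partitioned on time.
    sqlx::query("SELECT create_hypertable('float_values', 'datetime', if_not_exists => TRUE)")
        .execute(&pool)
        .await?;

    Ok(())
}
```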

Columnar storage with compression fits time series data well, and a distributed ClickHouse cluster is the favoured choice for large deployments.

SensApp used to rely on MongoDB, as it was created during the NoSQL hype, but the performances were very poor for this use case.
DuckDB is a promising new database. It can be seen as a columnar SQLite for analytics. Once the DuckDB 1.0 release is out, it may be a good choice for small to medium deployments.

SensApp used to rely on MongoDB, as it was created during the NoSQL hype, but the performance was very poor for this use case. It was supposed to be web scale, but it didn't scale well enough for time series data. Storing a binary JSON (BSON) document per data point introduced too much overhead.

## Scalability

@@ -126,7 +130,7 @@ The publisher should have a mechanism to automatically retry when SensAPP return

SensApp should scale horizontally and not persist state on its own. It keeps relatively small buffers in memory to improve performance and relies on the storage backend to persist data. Publishers should consider the data as persisted once SensApp acknowledges it.

The storage layer should scale as well. SQLite on a network filesystem could work, but using a distributed storage backend is more advisable when one single database instance isn't enough.
The storage layer should scale as well. SQLite on a network filesystem or replicated with Litestream may work, but a distributed storage backend is advised when a single database instance isn't enough.

It is essential to mention that horizontal scalability comes with a higher complexity and energy cost. Prefer vertical scalability when possible. In 2024, single database servers can handle high loads, with hundreds of cores, petabytes of storage, and terabytes of RAM.

34 changes: 25 additions & 9 deletions docs/DATAMODEL.md
@@ -23,9 +23,9 @@ SensApp distinguishes between:
erDiagram
SENSORS {
String name UK "Name of the sensor"
UUID id PK "UUID (v7 by default) of the sensor"
TypeEnum type PK "The type of the sensor (integer, float, string, boolean), part of the primary key"
String name "Name of the sensor"
TypeEnum type "The type of the sensor (integer, float, string, boolean, etc…)"
Serial unit FK "The unit of the sensor, for documentation purposes, if provided"
}
@@ -90,20 +90,36 @@ erDiagram
Boolean value
}
SENSORS ||--o{ STRING_VALUES : ""
SENSORS ||--o{ INTEGER_VALUES : ""
SENSORS ||--o{ NUMERIC_VALUES : ""
SENSORS ||--o{ FLOAT_VALUES : ""
SENSORS ||--o{ LOCATION_VALUES : ""
SENSORS ||--o{ BOOLEAN_VALUES : ""
%% Localisations are common enough to be part of the core data model
LOCATION_VALUES {
UUID sensor
DateTime datetime
Float latitude
Float longitude
}
%% JSON values are not recommended, but they can be very convenient
JSON_VALUES {
UUID sensor
DateTime datetime
JSON value
}
BLOB_VALUES {
UUID sensor
DateTime datetime
Blob value
}
SENSORS ||--o{ STRING_VALUES : ""
SENSORS ||--o{ INTEGER_VALUES : ""
SENSORS ||--o{ NUMERIC_VALUES : ""
SENSORS ||--o{ FLOAT_VALUES : ""
SENSORS ||--o{ LOCATION_VALUES : ""
SENSORS ||--o{ BOOLEAN_VALUES : ""
SENSORS ||--o{ JSON_VALUES : ""
SENSORS ||--o{ BLOB_VALUES : ""
```
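To relate the diagram to the Rust side of this commit, here is a hedged sketch of how each `TypedSamples` variant (from `src/datamodel/batch.rs`, shown further down) could map to its per-type table. The lowercase table names are assumptions; the actual storage layer may name things differently.

```rust
use crate::datamodel::batch::TypedSamples;

/// Illustration only: one table per value type, mirroring the ER diagram above.
fn table_name(samples: &TypedSamples) -> &'static str {
    match samples {
        TypedSamples::Integer(_) => "integer_values",
        TypedSamples::Numeric(_) => "numeric_values",
        TypedSamples::Float(_) => "float_values",
        TypedSamples::String(_) => "string_values",
        TypedSamples::Boolean(_) => "boolean_values",
        TypedSamples::Location(_) => "location_values",
        TypedSamples::Blob(_) => "blob_values",
        TypedSamples::Json(_) => "json_values",
    }
}
```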

## Virtual Composite Sensors
3 changes: 2 additions & 1 deletion src/bus/message.rs
@@ -1,3 +1,4 @@
use crate::datamodel::batch::Batch;
use std::sync::Arc;

#[derive(Debug, Clone)]
@@ -7,7 +8,7 @@ pub enum Message {

#[derive(Debug, Clone)]
pub struct PublishMessage {
pub batch: Arc<crate::datamodel::batch::Batch>,
pub batch: Arc<Batch>,
// A request sync message is sent to ask the storage backends
// to sync. This is done to ensure that the data is persisted.
//
3 changes: 3 additions & 0 deletions src/config.rs
@@ -18,6 +18,9 @@ pub struct SensAppConfig {

#[config(env = "SENSAPP_BATCH_SIZE", default = 8192)]
pub batch_size: usize,

#[config(env = "SENSAPP_SQLITE_CONNECTION_STRING")]
pub sqlite_connection_string: Option<String>,
}

impl SensAppConfig {
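As a small, hypothetical usage note: the new setting is read from the environment, so pointing SensApp at a SQLite file could look like the snippet below. The `sqlite://sensapp.db` URL format is an assumption (sqlx-style), not something confirmed by this commit.

```rust
fn main() {
    // Illustration only: inspect the new environment variable.
    match std::env::var("SENSAPP_SQLITE_CONNECTION_STRING") {
        Ok(conn) => println!("SQLite storage configured at {conn}"), // e.g. "sqlite://sensapp.db"
        Err(_) => println!("SENSAPP_SQLITE_CONNECTION_STRING is not set"),
    }
}
```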
26 changes: 18 additions & 8 deletions src/datamodel/batch.rs
@@ -1,3 +1,4 @@
use smallvec::SmallVec;
use std::sync::Arc;

#[derive(Debug)]
@@ -6,20 +7,29 @@ pub struct Sample<V> {
pub value: V,
}

// Number of samples kept inline in a SmallVec before it spills to the heap
const SMALLVEC_BUFFER_SIZE: usize = 1;

#[derive(Debug)]
pub enum TypedSamples {
Integer(Vec<Sample<i64>>),
Numeric(Vec<Sample<rust_decimal::Decimal>>),
Float(Vec<Sample<f64>>),
String(Vec<Sample<String>>),
Boolean(Vec<Sample<bool>>),
Location(Vec<Sample<geo::Point>>),
Blob(Vec<Sample<Vec<u8>>>),
Integer(SmallVec<[Sample<i64>; SMALLVEC_BUFFER_SIZE]>),
Numeric(SmallVec<[Sample<rust_decimal::Decimal>; SMALLVEC_BUFFER_SIZE]>),
Float(SmallVec<[Sample<f64>; SMALLVEC_BUFFER_SIZE]>),
String(SmallVec<[Sample<String>; SMALLVEC_BUFFER_SIZE]>),
Boolean(SmallVec<[Sample<bool>; SMALLVEC_BUFFER_SIZE]>),
Location(SmallVec<[Sample<geo::Point>; SMALLVEC_BUFFER_SIZE]>),
Blob(SmallVec<[Sample<Vec<u8>>; SMALLVEC_BUFFER_SIZE]>),
Json(SmallVec<[Sample<serde_json::Value>; SMALLVEC_BUFFER_SIZE]>),
}

#[derive(Debug)]
pub struct Batch {
pub struct SingleSensorBatch {
pub sensor_uuid: uuid::Uuid,
pub sensor_name: String,
pub samples: Arc<TypedSamples>,
}

#[derive(Debug)]
pub struct Batch {
pub sensor_batches: Arc<SmallVec<[SingleSensorBatch; SMALLVEC_BUFFER_SIZE]>>,
}
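The switch from `Vec` to `SmallVec` with a buffer size of 1 means a batch carrying a single sample per sensor keeps its samples inline instead of allocating on the heap. A standalone demonstration of that property (not SensApp code):

```rust
use smallvec::{smallvec, SmallVec};

fn main() {
    // One element fits in the inline buffer: no heap allocation.
    let one: SmallVec<[i64; 1]> = smallvec![42];
    assert!(!one.spilled());

    // More elements than the buffer size spill to the heap, like a Vec.
    let many: SmallVec<[i64; 1]> = smallvec![1, 2, 3];
    assert!(many.spilled());
}
```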
38 changes: 38 additions & 0 deletions src/http/app_error.rs
@@ -0,0 +1,38 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::response::Response;
use axum::Json;
use serde_json::json;

// Anyhow error handling with axum
// https://github.com/tokio-rs/axum/blob/d3112a40d55f123bc5e65f995e2068e245f12055/examples/anyhow-error-response/src/main.rs
#[derive(Debug)]
pub enum AppError {
InternalServerError(anyhow::Error),
BadRequest(anyhow::Error),
}

impl IntoResponse for AppError {
fn into_response(self) -> Response {
let (status, message) = match self {
AppError::InternalServerError(error) => {
eprintln!("Internal Server Error: {}", error.backtrace());
(
StatusCode::INTERNAL_SERVER_ERROR,
"Internal Server Error".to_string(),
)
}
AppError::BadRequest(error) => (StatusCode::BAD_REQUEST, error.to_string()),
};
let body = Json(json!({ "error": message }));
(status, body).into_response()
}
}

impl<E> From<E> for AppError
where
E: Into<anyhow::Error>,
{
fn from(err: E) -> Self {
Self::InternalServerError(err.into())
}
}
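A hypothetical handler (not part of this commit) showing how `AppError` is meant to be used: explicit `BadRequest` for client mistakes, while any other error bubbling up through `?` becomes a 500 via the blanket `From` implementation above.

```rust
use axum::http::StatusCode;

async fn ingest(body: String) -> Result<StatusCode, AppError> {
    // Client mistakes become explicit 400 responses.
    let _value: serde_json::Value = serde_json::from_str(&body)
        .map_err(|e| AppError::BadRequest(e.into()))?;

    // Anything failing here becomes a 500 through From<E: Into<anyhow::Error>>.
    do_something_fallible()?;

    Ok(StatusCode::NO_CONTENT)
}

fn do_something_fallible() -> anyhow::Result<()> {
    Ok(())
}
```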