Skip to content

Commit

Permalink
feat: 🌈 Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
fungiboletus committed Feb 24, 2024
1 parent f7a650b commit f7666ba
Show file tree
Hide file tree
Showing 36 changed files with 2,252 additions and 408 deletions.
53 changes: 53 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ futures = "0.3"
#http-body = "1.0"
#http-body-util = "0.1"
polars = { version = "0.37" }
sqlx = { version = "0.7", features = ["runtime-tokio", "sqlite"] }
sqlx = { version = "0.7", features = [
"runtime-tokio",
"sqlite",
"postgres",
"uuid",
"json",
"time",
] }
tokio = { version = "1.35", features = ["full"] }
tokio-stream = { version = "0.1", features = ["io-util"] }
tokio-util = "0.7"
Expand Down
4 changes: 2 additions & 2 deletions docs/DATAMODEL.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ erDiagram
Boolean value
}
%% Localisations are common enough to be part of the core data model
%% Locations are common enough to be part of the core data model
LOCATION_VALUES {
UUID sensor
DateTime datetime
Expand Down Expand Up @@ -177,7 +177,7 @@ Using a dictionary improves the performances in most cases. However if many stri
In practice, we expect sensors to not generate unique distinct strings all the time, so using a dictionary should be a good idea for the majority of use cases.

## Geolocalisation and Coordinates Systems
## Geolocation and Coordinates Systems

In the current version, the geolocalised data doesn't really mind the coordinate system used. The data is likely going to use WGS84, but it could be ETRS89 or something else. It's up to the publisher and the consumer to agree on the coordinate system used, for now.

Expand Down
159 changes: 159 additions & 0 deletions docs/INFLUX_DB.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
## SensApp and InfluxDB

SensApp can be used instead of InfluxDB or aside it. InfluxDB is a time-series database that is widely used in the IoT and monitoring space. It is the most popular time-series database according to the [DB-Engines Ranking](https://db-engines.com/en/ranking/time+series+dbms).

Data sources can write data to SensApp as if it was InfluxDB. SensApp does **not** support writing data to InfluxDB, and does **not** support the InfluxDB query language or any other advanced InfluxDB features.

## How is the compatibility achieved?

SensApp is compatible with the [InfluxDB line protocol](https://docs.influxdata.com/influxdb/v2/reference/syntax/line-protocol/). It actually uses the [InfluxDB line protocol parser](https://crates.io/crates/influxdb-line-protocol) that InfluxDB v3 conveniently provides as a standalone Rust™️ crate/library.

SensApp exposes the same [InfluxDB v2 Writing API](https://docs.influxdata.com/influxdb/v2/api/#operation/PostWrite) as InfluxDB, so if your application is already writing data to InfluxDB, you can easily switch to SensApp by updating the URL and credentials.

The Writing API is the **only** compatible API.

## Using SensApp instead of InfluxDB

For writing data to SensApp, you can use the same API as InfluxDB v2. The only difference is the URL and the credentials.

If you use [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) for example, you can specify the SensAPP URL and credentials in the `[[outputs.influxdb_v2]]` section of the configuration file.

```toml
[[outputs.influxdb_v2]]
urls = ["http://sensapp:3000"]
token = "your-sensapp-token-if-needed"
organization = "your-sensapp-org"
bucket = "your-sensapp-bucket"
content_encoding = "gzip"
influx_uint_support = true
```

## Using SensApp and InfluxDB

You may prefer to keep using InfluxDB aside SensApp. InfluxDB performs pretty well for data with no long-term retention for example. You may also have applications or data pipelines relying on InfluxDB that you don't want to change.

In this case, you have a few options:

- Update your data sources to write to both SensApp and InfluxDB.
- Use an intermediate HTTP(s) server that provides an InfluxDB-compatible API and then writes to both SensApp and InfluxDB. [Telegraf](https://www.influxdata.com/time-series-platform/telegraf/) with its `inputs.influxdb_v2_listener` can be used for this purpose.
- Use the [InfluxDB replication feature](https://docs.influxdata.com/influxdb/v2/write-data/replication/replicate-data/) to replicate data to SensApp.

### Using Telegraf as a proxy/replicator

```toml
[[inputs.influxdb_v2_listener]]
service_address = ":8086"

[[outputs.influxdb_v2]]
urls = ["http://influxdb:8086"]
token = "your-influxdb-token"
organization = "your-influxdb-org"
bucket = "your-influxdb-bucket"
content_encoding = "gzip"
influx_uint_support = true

[[outputs.influxdb_v2]]
urls = ["http://sensapp:3000"]
token = "your-sensapp-token-if-needed"
organization = "your-sensapp-org"
bucket = "your-sensapp-bucket"
content_encoding = "gzip"
influx_uint_support = true
```

```bash
telegraf --config telegraf.conf
```

### How to Setup InfluxDB Replication to SensApp

Make sure that the remoteURL is reachable from the InfluxDB instance. If you run InfluxDB in Docker for example, remember that the container network is not the same as the host network by default.

Create the SensApp remote in InfluxDB:
```bash
curl --request POST \
--url http://influxdb:8086/api/v2/remotes \
--header 'Authorization: Bearer $YOUR_SECRET_TOKEN' \
--header 'Content-Type: application/json' \
--data '{
"remoteURL": "http://sensapp:3000",
"name": "sensapp-remote",
"description": "Example of SensApp Remote",
"orgID": "your-influxdb-org-id",
"remoteOrgID": "remote-sensapp-org-id",
"remoteAPIToken": "if-you-need-it"
}'
```
```json
{
"id": "created-sensapp-remote-id",
"orgID": "your-influxdb-org-id",
"name": "sensapp-remote",
"description": "Example of SensApp Remote",
"remoteURL": "http://influxdb:3000",
"remoteOrgID": "remote-sensapp-org-id",
"allowInsecureTLS": false
}
```

Create a bucket replication to SensApp:
```bash
curl --request POST \
--url http://influxdb:8086/api/v2/replications \
--header 'Authorization: Bearer $YOUR_SECRET_TOKEN' \
--header 'Content-Type: application/json' \
--data '{
"name": "sensapp-replication",
"description": "Example of SensApp Replication",
"localBucketID": "influxdb-bucket-id",
"orgID": "your-influxdb-org-id",
"remoteBucketName": "sensapp-bucket-name",
"remoteBucketID": "sensapp-bucket-id",
"remoteID": "created-sensapp-remote-id",
"maxAgeSeconds": 604800
}'
```
```json
{
"id": "created-sensapp-replication-id",
"orgID": "your-influxdb-org-id",
"name": "sensapp-replication",
"description": "Example of SensApp Replication",
"remoteID":"created-sensapp-remote-id",
"localBucketID":"influxdb-bucket-id",
"remoteBucketID":"sensapp-bucket-id",
"RemoteBucketName": "",
"maxQueueSizeBytes": 67108860,
"currentQueueSizeBytes": 0,
"remainingBytesToBeSynced": 0,
"dropNonRetryableData": false,
"maxAgeSeconds": 604800
}
```

Fetch the replication status:
```bash
curl --request GET \
--url http://localhost:8086/api/v2/replications/created-sensapp-replication-id \
--header 'Authorization: Bearer $YOUR_SECRET_TOKEN'
```
```json
{
"id": "created-sensapp-replication-id",
"orgID": "your-influxdb-org-id",
"name": "sensapp-replication",
"description": "Example of SensApp Replication",
"remoteID":"created-sensapp-remote-id",
"localBucketID":"influxdb-bucket-id",
"remoteBucketID":"sensapp-bucket-id",
"RemoteBucketName": "",
"maxQueueSizeBytes": 67108860,
"dropNonRetryableData": false,
"maxAgeSeconds": 604800,

// Important status fields:
"currentQueueSizeBytes": 18894,
"remainingBytesToBeSynced": 0,
"latestResponseCode": 204,
"latestErrorMessage": ""
}
19 changes: 0 additions & 19 deletions src/bus/event_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ use std::sync::Arc;
#[derive(Debug)]
pub struct EventBus {
pub name: String,
//main_bus_sender: mpsc::Sender<u8>,
//main_bus_receiver: mpsc::Receiver<u8>,
//pub main_bus_sender: async_channel::Sender<u8>,
//pub main_bus_receiver: async_channel::Receiver<u8>,
//pub main_bus_sender: tokio::sync::broadcast::Sender<u8>,
//pub main_bus_receiver: tokio::sync::broadcast::Receiver<u8>,
pub main_bus_sender: async_broadcast::Sender<Message>,
pub main_bus_receiver: async_broadcast::InactiveReceiver<Message>,
}
Expand All @@ -21,9 +15,6 @@ impl EventBus {
// Please note that the receiver is inactive by default as it may be cloned many times.
// Consider using .activate() or .activate_cloned() to activate it.
pub fn init(name: String) -> Self {
// let (tx, rx) = mpsc::channel(10);
//let (s, _) = tokio::sync::broadcast::channel::<u8>(1000);
//let (s, r) = async_broadcast::broadcast(128);
let (s, r) = async_broadcast::broadcast(128);
let r = r.deactivate();
Self {
Expand All @@ -34,8 +25,6 @@ impl EventBus {
}

async fn broadcast(&self, message: Message) -> Result<()> {
//self.main_bus_sender.send(event).await?;
//self.main_bus_sender.send(event)?;
self.main_bus_sender.broadcast(message).await?;
Ok(())
}
Expand All @@ -58,14 +47,6 @@ impl EventBus {

Ok(sync_receiver)
}

// receive
/*pub async fn receive_one(&mut self) -> Result<u8> {
self.main_bus_receiver
.recv()
.await
.map_err(|e| anyhow::anyhow!("Failed to receive event: {}", e))
}*/
}

pub fn init_event_bus() -> Arc<EventBus> {
Expand Down
14 changes: 0 additions & 14 deletions src/bus/wait_for_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ impl WaitForAll {

Ok(())
}

pub fn get_finished_receiver(&self) -> InactiveReceiver<()> {
self.finished_receiver.clone()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -118,14 +114,4 @@ mod tests {
s1.broadcast(()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

#[tokio::test]
async fn test_get_finished_receiver() {
let mut wfa = WaitForAll::new();
let mut finished_receiver = wfa.get_finished_receiver().activate();
let (s1, r1) = async_broadcast::broadcast(1);
wfa.add(r1).await;
s1.broadcast(()).await.unwrap();
finished_receiver.recv().await.unwrap();
}
}
Loading

0 comments on commit f7666ba

Please sign in to comment.