We've got data flowing from collector(s) to the server. We aren't storing any data, and we aren't doing anything with it---but we're getting it there.
Let's start by addressing data storage.
The code for this is in code/05_server/server_v2. I'm breaking the development into chunks in the curriculum so you can see the progression.
I didn't want to ask everyone to install Influx (or another time-series database), which is the optimal way to handle this type of data. So instead, we're going to use SQLite.
If you don't already have it installed, add the SQLx command-line client:
cargo install sqlx-cli
Now we need to tell sqlx where our database is. Create a .env file in the server directory:
DATABASE_URL="sqlite:collection.db"
Create the database:
sqlx database create
And create an initial migration:
sqlx migrate add initial
Open the migration file in server/migrations/ and add the following SQL. Note that SQLite wants INTEGER PRIMARY KEY for an automatically assigned row ID; PostgreSQL-style SERIAL won't auto-populate here:
CREATE TABLE IF NOT EXISTS timeseries
(
    id INTEGER PRIMARY KEY,
    collector_id VARCHAR(255),
    received TIMESTAMP,
    total_memory UNSIGNED BIG INT,
    used_memory UNSIGNED BIG INT,
    average_cpu FLOAT
);
You can apply this (to verify your SQL) by running:
sqlx migrate run
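If you have the sqlite3 command-line shell installed, you can double-check that the table was created (an optional sanity check, not required for the rest of the chapter):
sqlite3 collection.db ".schema timeseries"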
The server will need SQLx with SQLite support added to its dependencies:
cargo add sqlx -F runtime-tokio-native-tls -F sqlite
We're going to want to validate our UUIDs and turn them into strings. Let's use the uuid crate for that:
cargo add uuid
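Collector IDs travel over the wire as raw u128 values, so the conversion we need looks like this (a quick sketch with a made-up ID):
// Convert a wire-format u128 into a UUID, then into its canonical string form
let id = uuid::Uuid::from_u128(0x0123456789abcdef0123456789abcdef);
assert_eq!(id.to_string(), "01234567-89ab-cdef-0123-456789abcdef");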
We're also going to need dotenv to read the .env file:
cargo add dotenv
Now your dependency list looks like this:
tokio = { version = "1.28.2", features = ["full"] }
shared_v2 = { path = "../shared_v2" }
anyhow = "1.0.71"
sqlx = { version = "0.6.3", features = ["runtime-tokio-native-tls", "sqlite"] }
uuid = "1.3.3"
dotenv = "0.15.0"
In main.rs, add the following:
// Read the .env file and obtain the database URL
dotenv::dotenv()?;
let db_url = std::env::var("DATABASE_URL")?;
// Get a database connection pool
let pool = sqlx::SqlitePool::connect(&db_url).await?;
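The data collector still runs as a spawned Tokio task. Here's a sketch of how it's launched with the pool, assuming your main keeps the same structure as the previous version; the handle is what we'll await at the end of main:
// Spawn the data collector as a background task, handing it a clone of the
// pool and keeping the JoinHandle so we can await the task's result later
let handle = tokio::spawn(data_collector(pool.clone()));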
Change your data_collector function to accept a connection pool as a parameter. It then clones the pool for each connection (sqlx pools are internally reference-counted, so cloning is cheap) and sends the clone to the connection handler:
pub async fn data_collector(cnn: Pool<Sqlite>) -> anyhow::Result<()> {
    // Listen for TCP connections on the data collector address
    let listener = TcpListener::bind(DATA_COLLECTOR_ADDRESS).await?;

    // Loop forever, accepting connections
    loop {
        // Wait for a new connection
        let cnn = cnn.clone();
        let (socket, address) = listener.accept().await?;
        tokio::spawn(new_connection(socket, address, cnn));
    }
}
The connection handler gains some additional code to store your data as it arrives:
async fn new_connection(mut socket: TcpStream, address: SocketAddr, cnn: Pool<Sqlite>) {
    let mut buf = vec![0u8; 1024];
    loop {
        // Read a frame from the socket
        let n = socket
            .read(&mut buf)
            .await
            .expect("failed to read data from socket");
        if n == 0 {
            println!("No data received - connection closed");
            return;
        }

        // Decode the received frame
        let received_data = decode_v1(&buf[0..n]);
        match received_data {
            (
                timestamp,
                CollectorCommandV1::SubmitData {
                    collector_id,
                    total_memory,
                    used_memory,
                    average_cpu_usage,
                },
            ) => {
                // Convert the wire-format u128 back into a UUID string for storage
                let collector_id = uuid::Uuid::from_u128(collector_id);
                let collector_id = collector_id.to_string();

                // Insert the data point with a parameterized query
                let result = sqlx::query("INSERT INTO timeseries (collector_id, received, total_memory, used_memory, average_cpu) VALUES ($1, $2, $3, $4, $5)")
                    .bind(collector_id)
                    .bind(timestamp)
                    .bind(total_memory as i64)
                    .bind(used_memory as i64)
                    .bind(average_cpu_usage)
                    .execute(&cnn)
                    .await;

                if result.is_err() {
                    println!("Error inserting data into the database: {result:?}");
                }
            }
        }
    }
}
We're still running one task per connection, and that's good. Tokio balances the load between tasks for us, and because each task reads one message at a time, we won't accept another item of data from a connection until we're ready for it. Meanwhile, the listener keeps accepting new connections.
If you run the program now (and the collector!), the database will grow---data is arriving.
We'll use the Tokio project's Axum web server once again. Add Axum to your server:
cargo add axum
Let's also add futures for its stream helpers:
cargo add futures
And in main.rs, we'll build a skeleton to start the web service.
use std::net::SocketAddr;
use axum::{Extension, Router, routing::get};
And to make a basic test work:
// Start the web server
let app = Router::new()
    .route("/", get(test))
    .layer(Extension(pool));
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
axum::Server::bind(&addr)
    .serve(app.into_make_service())
    .await
    .unwrap();

// Wait for the data collector to finish
handle.await??; // Two question marks: we're unwrapping the task result, and the result from running the collector.
Ok(())
}
async fn test() -> &'static str {
    "Hello, world!"
}
Run the server now, and you can go to http://localhost:3000 and see the message.
Create a new file, src/api.rs, and add mod api; to main.rs.
Let's build a basic "show everything" function. Initially, we'll just print to the console:
use axum::Extension;
use sqlx::FromRow;
use futures::TryStreamExt;

#[derive(FromRow, Debug)]
pub struct DataPoint {
    id: i32,
    collector_id: String,
    received: i64,
    total_memory: i64,
    used_memory: i64,
    average_cpu: f32,
}

pub async fn show_all(Extension(pool): Extension<sqlx::SqlitePool>) {
    // Stream rows one at a time rather than loading the whole table into memory
    let mut rows = sqlx::query_as::<_, DataPoint>("SELECT * FROM timeseries")
        .fetch(&pool);
    while let Some(row) = rows.try_next().await.unwrap() {
        println!("{:?}", row);
    }
}
In main.rs, add a route to it:
let app = Router::new()
    .route("/", get(test))
    .route("/api/all", get(api::show_all))
    .layer(Extension(pool));
Now run the server, and connect to http://localhost:3000/api/all. The browser receives an empty response, but you should see the data points printed to the server's console. Printing to the console isn't much of an API, though; let's return the data as JSON instead.
You'll need to add Serde with the derive feature:
cargo add serde -F derive
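DataPoint needs to be serializable before Axum's Json type can return it. In api.rs, bring Serialize into scope and add it to the derive list (fields elided; this assumes you kept the struct from earlier):
use axum::Json;
use serde::Serialize;

#[derive(FromRow, Debug, Serialize)]
pub struct DataPoint {
    // ...fields unchanged...
}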
Using serde actually shortens our API call:
pub async fn show_all(Extension(pool): Extension<sqlx::SqlitePool>) -> Json<Vec<DataPoint>> {
    let rows = sqlx::query_as::<_, DataPoint>("SELECT * FROM timeseries")
        .fetch_all(&pool)
        .await
        .unwrap();
    Json(rows)
}
And if you run the server now and connect to http://localhost:3000/api/all, you'll see a JSON response.
Let's add a few more commands to our API. First, we'll list just the collectors we know about:
#[derive(FromRow, Debug, Serialize)]
pub struct Collector {
    id: i32,
    collector_id: String,
    last_seen: i64,
}

pub async fn show_collectors(Extension(pool): Extension<sqlx::SqlitePool>) -> Json<Vec<Collector>> {
    // Group by collector so we get one row per collector, along with
    // the most recent timestamp we've received from it
    const SQL: &str = "SELECT
        MIN(id) AS id,
        collector_id,
        MAX(received) AS last_seen
        FROM timeseries
        GROUP BY collector_id";
    Json(
        sqlx::query_as::<_, Collector>(SQL)
            .fetch_all(&pool)
            .await
            .unwrap(),
    )
}
Don't forget to add the route!
.route("/api/collectors", get(api::show_collectors))
Let's list the data points for a single collector. You'll need use axum::extract::Path in api.rs for the path extractor:
pub async fn collector_data(
    Extension(pool): Extension<sqlx::SqlitePool>,
    uuid: Path<String>,
) -> Json<Vec<DataPoint>> {
    let rows = sqlx::query_as::<_, DataPoint>("SELECT * FROM timeseries WHERE collector_id = ? ORDER BY received")
        .bind(uuid.as_str())
        .fetch_all(&pool)
        .await
        .unwrap();
    Json(rows)
}
We've used Axum's Path extractor to get the collector ID from the URL, and a parameterized query to avoid SQL injection.
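Earlier we said we'd validate UUIDs, and right now the handler trusts whatever string arrives in the path. One way you might add that check is to parse the string with the uuid crate and return 400 Bad Request on failure; this is a sketch, not the only way to structure the error handling:
use axum::http::StatusCode;

pub async fn collector_data(
    Extension(pool): Extension<sqlx::SqlitePool>,
    uuid: Path<String>,
) -> Result<Json<Vec<DataPoint>>, StatusCode> {
    // Reject anything that isn't a well-formed UUID before it reaches the database
    uuid::Uuid::parse_str(&uuid).map_err(|_| StatusCode::BAD_REQUEST)?;
    let rows = sqlx::query_as::<_, DataPoint>("SELECT * FROM timeseries WHERE collector_id = ? ORDER BY received")
        .bind(uuid.as_str())
        .fetch_all(&pool)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    Ok(Json(rows))
}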
Add the route:
.route("/api/collector/:uuid", get(api::collector_data))
Run the server now, and you can go to a URL like this (copy the UUID from the collectors list):
http://localhost:3000/api/collector/a96331b8-5604-4f45-9217-8e797c5ce9ea
You'll see the data points for that collector.