Skip to content

Commit

Permalink
Move configuration of blobs base path into client
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Sep 6, 2023
1 parent a404d0c commit 6de1052
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 59 deletions.
2 changes: 1 addition & 1 deletion aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ sqlx = { version = "0.6.1", features = [
"sqlite",
"runtime-tokio-rustls",
] }
tempfile = "3.7.0"
thiserror = "1.0.39"
tokio = { version = "1.28.2", features = [
"macros",
Expand Down Expand Up @@ -111,5 +110,6 @@ rstest = "0.15.0"
rstest_reuse = "0.3.0"
serde_bytes = "0.11.12"
serde_json = "1.0.85"
tempfile = "3.7.0"
tower = "0.4.13"
tower-service = "0.3.2"
11 changes: 4 additions & 7 deletions aquadoggo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ use p2panda_rs::schema::SchemaId;

use crate::network::NetworkConfiguration;

/// Blobs directory name.
pub const BLOBS_DIR_NAME: &str = "blobs";

/// Configuration object holding all important variables throughout the application.
#[derive(Debug, Clone)]
pub struct Configuration {
Expand All @@ -26,9 +23,6 @@ pub struct Configuration {
/// _not_ recommended for production settings.
pub allow_schema_ids: AllowList<SchemaId>,

/// Path to blobs directory.
pub blob_dir: Option<PathBuf>,

/// URL / connection string to PostgreSQL or SQLite database.
pub database_url: String,

Expand All @@ -44,6 +38,9 @@ pub struct Configuration {
/// 2020.
pub http_port: u16,

/// Path to folder where blobs (binary files) are kept and served from.
pub blobs_base_path: PathBuf,

/// Number of concurrent workers which defines the maximum of materialization tasks which can
/// be worked on simultaneously.
///
Expand All @@ -59,10 +56,10 @@ impl Default for Configuration {
fn default() -> Self {
Self {
allow_schema_ids: AllowList::Wildcard,
blob_dir: None,
database_url: "sqlite::memory:".into(),
database_max_connections: 32,
http_port: 2020,
blobs_base_path: PathBuf::new(),
worker_pool_size: 16,
network: NetworkConfiguration::default(),
}
Expand Down
8 changes: 4 additions & 4 deletions aquadoggo/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn handle_blob_document(
return Err(BlobHttpError::NotFound);
}

respond_with_blob(if_none_match, context.blob_dir_path, document).await
respond_with_blob(if_none_match, context.blobs_base_path, document).await
}

/// Handle requests for a blob document view served via HTTP.
Expand All @@ -87,15 +87,15 @@ pub async fn handle_blob_view(
return Err(BlobHttpError::NotFound);
}

respond_with_blob(if_none_match, context.blob_dir_path, document).await
respond_with_blob(if_none_match, context.blobs_base_path, document).await
}

/// Returns HTTP response with the contents, ETag and given MIME type of a blob.
///
/// Supports basic caching by handling "IfNoneMatch" headers matching the latest ETag.
async fn respond_with_blob(
if_none_match: IfNoneMatch,
blob_dir_path: PathBuf,
blobs_base_path: PathBuf,
document: impl AsDocument,
) -> Result<Response, BlobHttpError> {
let view_id = document.view_id();
Expand All @@ -120,7 +120,7 @@ async fn respond_with_blob(
}?;

// Get body from read-stream of stored file on file system
let mut file_path = blob_dir_path;
let mut file_path = blobs_base_path;
file_path.push(format!("{view_id}"));
match File::open(&file_path).await {
Ok(file) => {
Expand Down
8 changes: 4 additions & 4 deletions aquadoggo/src/http/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ pub struct HttpServiceContext {
/// Dynamic GraphQL schema manager.
pub schema: GraphQLSchemaManager,

/// Path of the directory where blobs should be served from
pub blob_dir_path: PathBuf,
/// Path of the directory where blobs should be served from.
pub blobs_base_path: PathBuf,
}

impl HttpServiceContext {
pub fn new(store: SqlStore, schema: GraphQLSchemaManager, blob_dir_path: PathBuf) -> Self {
pub fn new(store: SqlStore, schema: GraphQLSchemaManager, blobs_base_path: PathBuf) -> Self {
Self {
store,
schema,
blob_dir_path,
blobs_base_path,
}
}
}
7 changes: 3 additions & 4 deletions aquadoggo/src/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ pub async fn http_service(
let graphql_schema_manager =
GraphQLSchemaManager::new(context.store.clone(), tx, context.schema_provider.clone()).await;

let blob_dir_path = context.config.blob_dir.as_ref().expect("Base path not set");
let blobs_base_path = &context.config.blobs_base_path;

// Introduce a new context for all HTTP routes
let http_context = HttpServiceContext::new(
context.store.clone(),
graphql_schema_manager,
blob_dir_path.to_owned(),
blobs_base_path.to_owned(),
);

// Start HTTP server with given port and re-attempt with random port if it was taken already
Expand Down Expand Up @@ -105,7 +105,6 @@ mod tests {
use serde_json::json;
use tokio::sync::broadcast;

use crate::config::BLOBS_DIR_NAME;
use crate::graphql::GraphQLSchemaManager;
use crate::http::context::HttpServiceContext;
use crate::schema::SchemaProvider;
Expand All @@ -124,7 +123,7 @@ mod tests {
let context = HttpServiceContext::new(
node.context.store.clone(),
graphql_schema_manager,
BLOBS_DIR_NAME.into(),
node.context.config.blobs_base_path.into(),
);
let client = TestClient::new(build_server(context));

Expand Down
17 changes: 8 additions & 9 deletions aquadoggo/src/materializer/tasks/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult<TaskInp

// Materialize all updated blobs to the filesystem.
for blob_document in updated_blobs.iter() {
// Get the raw blob data.
// Get the raw blob data
let blob_data = context
.store
.get_blob_by_view_id(blob_document.view_id())
Expand All @@ -73,14 +73,13 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult<TaskInp
.map_err(|err| TaskError::Failure(err.to_string()))?
.expect("Blob data exists at this point");

// Compose, and when needed create, the path for the blob file.
let base_path = match &context.config.blob_dir {
Some(base_path) => base_path,
None => return Err(TaskError::Critical("No base path configured".to_string())),
};
let blob_view_path = base_path.join(blob_document.view_id().to_string());
// Compose, and when needed create, the path for the blob file
let blob_view_path = context
.config
.blobs_base_path
.join(blob_document.view_id().to_string());

// Write the blob to the filesystem.
// Write the blob to the filesystem
info!("Creating blob at path {}", blob_view_path.display());

let mut file = File::create(&blob_view_path).await.map_err(|err| {
Expand Down Expand Up @@ -175,7 +174,7 @@ mod tests {
assert!(result.unwrap().is_none());

// Construct the expected path to the blob view file
let base_path = node.context.config.blob_dir.as_ref().unwrap();
let base_path = node.context.config.blobs_base_path;
let blob_path = base_path.join(blob_view_id.to_string());

// Read from this file
Expand Down
15 changes: 2 additions & 13 deletions aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

use anyhow::Result;
use p2panda_rs::identity::KeyPair;
use tempfile::TempDir;
use tokio::fs;

use crate::bus::ServiceMessage;
use crate::config::{Configuration, BLOBS_DIR_NAME};
use crate::config::Configuration;
use crate::context::Context;
use crate::db::SqlStore;
use crate::db::{connection_pool, create_database, run_pending_migrations, Pool};
Expand Down Expand Up @@ -47,7 +45,7 @@ pub struct Node {
impl Node {
/// Start p2panda node with your configuration. This method can be used to run the node within
/// other applications.
pub async fn start(key_pair: KeyPair, mut config: Configuration) -> Self {
pub async fn start(key_pair: KeyPair, config: Configuration) -> Self {
// Initialize database and get connection pool
let pool = initialize_db(&config)
.await
Expand All @@ -64,15 +62,6 @@ impl Node {
let schema_provider =
SchemaProvider::new(application_schema, config.allow_schema_ids.clone());

// Create temporary dirs for blob storage.
//
// @TODO: Implement configuring this path for persistent storage, see related issue:
// https://github.com/p2panda/aquadoggo/issues/542
let tmp_dir = TempDir::new().unwrap();
let blob_dir_path = tmp_dir.path().join(BLOBS_DIR_NAME);
fs::create_dir_all(&blob_dir_path).await.unwrap();
config.blob_dir = Some(blob_dir_path);

// Create service manager with shared data between services
let context = Context::new(store, key_pair, config, schema_provider);
let mut manager =
Expand Down
11 changes: 5 additions & 6 deletions aquadoggo/src/test_utils/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use p2panda_rs::identity::KeyPair;
use tokio::runtime::Builder;
use tokio::sync::Mutex;

use crate::config::BLOBS_DIR_NAME;
use crate::context::Context;
use crate::db::Pool;
use crate::db::SqlStore;
Expand Down Expand Up @@ -104,19 +103,19 @@ pub fn test_runner<F: AsyncTestFn + Send + Sync + 'static>(test: F) {

// Construct temporary directory for the test runner
let tmp_dir = tempfile::TempDir::new().unwrap();
let blob_dir_path = tmp_dir.path().join(BLOBS_DIR_NAME);
fs::create_dir_all(&blob_dir_path).unwrap();
let blobs_base_path = tmp_dir.path().join("blobs");
fs::create_dir_all(&blobs_base_path).unwrap();

// Construct node config supporting any schema
let mut cfg = Configuration::default();
cfg.blob_dir = Some(blob_dir_path);
let mut config = Configuration::default();
config.blobs_base_path = blobs_base_path;

// Construct the actual test node
let node = TestNode {
context: Context::new(
store.clone(),
KeyPair::new(),
cfg,
config,
SchemaProvider::default(),
),
};
Expand Down
6 changes: 2 additions & 4 deletions aquadoggo_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ figment = { version = "0.10.10", features = ["toml", "env"] }
hex = "0.4.3"
libp2p = "0.52.0"
log = "0.4.20"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2" }
path-clean = "1.0.1"
serde = { version = "1.0.185", features = ["serde_derive"] }
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2" }
tempfile = "3.7.0"
tokio = { version = "1.28.2", features = ["full"] }
toml = "0.7.6"

[dependencies.aquadoggo]
version = "~0.5.0"
path = "../aquadoggo"

[dev-dependencies]
tempfile = "3.4.0"
14 changes: 13 additions & 1 deletion aquadoggo_cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ pub struct Configuration {
pub database_max_connections: u32,
pub http_port: u16,
pub quic_port: u16,
pub blobs_base_path: Option<PathBuf>,
pub private_key: Option<PathBuf>,
pub mdns: bool,
pub direct_node_addresses: Vec<SocketAddr>,
Expand All @@ -286,6 +287,7 @@ impl Default for Configuration {
database_max_connections: 32,
http_port: 2020,
quic_port: 2022,
blobs_base_path: None,
mdns: true,
private_key: None,
direct_node_addresses: vec![],
Expand Down Expand Up @@ -338,12 +340,22 @@ impl TryFrom<Configuration> for NodeConfiguration {
}
};

// Create a temporary blobs directory when none was given
let blobs_base_path = match value.blobs_base_path {
Some(path) => path,
None => {
let tmp_dir = tempfile::TempDir::new()
.map_err(|_| anyhow!("Could not create temporary directory to store blobs"))?;
tmp_dir.path().to_path_buf()
}
};

Ok(NodeConfiguration {
allow_schema_ids,
blob_dir: None,
database_url: value.database_url,
database_max_connections: value.database_max_connections,
http_port: value.http_port,
blobs_base_path,
worker_pool_size: value.worker_pool_size,
network: NetworkConfiguration {
quic_port: value.quic_port,
Expand Down
6 changes: 1 addition & 5 deletions aquadoggo_cli/src/key_pair.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::fs::{self, File};
use std::fs::File;
use std::io::{Read, Write};
#[cfg(target_os = "unix")]
use std::os::unix::fs::PermissionsExt;
Expand Down Expand Up @@ -39,8 +39,6 @@ pub fn generate_ephemeral_key_pair() -> KeyPair {
fn save_key_pair_to_file(key_pair: &KeyPair, path: PathBuf) -> Result<()> {
let private_key_hex = hex::encode(key_pair.private_key().as_bytes());

// Make sure that directories exist and write file into it
fs::create_dir_all(path.parent().unwrap())?;
let mut file = File::create(&path)?;
file.write_all(private_key_hex.as_bytes())?;
file.sync_all()?;
Expand All @@ -57,8 +55,6 @@ fn save_key_pair_to_file(key_pair: &KeyPair, path: PathBuf) -> Result<()> {
fn save_key_pair_to_file(key_pair: &KeyPair, path: PathBuf) -> Result<()> {
let private_key_hex = hex::encode(key_pair.private_key().as_bytes());

// Make sure that directories exist and write file into it
fs::create_dir_all(path.parent().unwrap())?;
let mut file = File::create(&path)?;
file.write_all(private_key_hex.as_bytes())?;
file.sync_all()?;
Expand Down
2 changes: 1 addition & 1 deletion aquadoggo_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
// Load configuration from command line arguments, environment variables and .toml file
let (config_file_path, config) = load_config().context("Could not load configuration")?;

// Set log verbosity based on config. By default scope it always to the "aquadoggo" module.
// Set log verbosity based on config. By default scope it always to the "aquadoggo" module
let mut builder = env_logger::Builder::new();
let builder = match LevelFilter::from_str(&config.log_level) {
Ok(log_level) => builder.filter(Some("aquadoggo"), log_level),
Expand Down

0 comments on commit 6de1052

Please sign in to comment.