HTTP routes to serve files with correct content type and etag headers (#544)

* Remove symlink directory

* Add handlers for blob http routes

* Fix typo in test utils

* Add routes for blob responses

* Remove redundant return statements

* Add entry to CHANGELOG.md

* Update test since we now store files directly in the blobs folder

* Remove unused dependencies and features

* Clean up a little bit, add comments

* Make add_blob test helper accept bytes

* Correct etag format and precondition check

* WIP ETag test

* Add more tests to check against content type and etag

* Use tokio for fs io
adzialocha authored Sep 5, 2023
1 parent 6c5d477 commit a404d0c
Showing 23 changed files with 662 additions and 296 deletions.
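The diffs below only show part of the change, but the commit messages above outline the behaviour: blob routes now respond with a `Content-Type` header, a strong `ETag`, and a precondition check against `If-None-Match`. The following is a minimal sketch of that pattern with axum 0.6 and its `headers` feature (enabled in `aquadoggo/Cargo.toml` further down); the `load_blob` helper, the route shape and the choice of the view id as ETag value are assumptions for illustration, not the handler shipped in this commit.

```rust
// Hypothetical sketch of a blob-serving route (not the code from this commit).
// Assumes axum 0.6 with the "headers" feature, as enabled in aquadoggo/Cargo.toml.
use axum::extract::Path;
use axum::headers::{ETag, IfNoneMatch};
use axum::http::{header, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::TypedHeader;

// Stand-in for however the node looks up blob bytes, MIME type and current view id.
async fn load_blob(document_id: &str) -> Option<(Vec<u8>, String, String)> {
    // Assumed helper, not a real aquadoggo API.
    unimplemented!()
}

async fn serve_blob(
    Path(document_id): Path<String>,
    if_none_match: Option<TypedHeader<IfNoneMatch>>,
) -> Response {
    let Some((bytes, mime_type, view_id)) = load_blob(&document_id).await else {
        return StatusCode::NOT_FOUND.into_response();
    };

    // Derive a strong ETag from the blob's current view id (an assumption): any
    // update to the blob produces a new view id and therefore a new ETag.
    let etag_value = format!("\"{}\"", view_id);
    let etag: ETag = etag_value.parse().expect("quoted etag is valid");

    // Precondition check: if the client already has this version, answer 304.
    if let Some(TypedHeader(if_none_match)) = if_none_match {
        if !if_none_match.precondition_passes(&etag) {
            return StatusCode::NOT_MODIFIED.into_response();
        }
    }

    // Otherwise respond with the blob bytes plus content type and etag headers.
    (
        [
            (header::CONTENT_TYPE, mime_type),
            (header::ETAG, etag_value),
        ],
        bytes,
    )
        .into_response()
}
```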
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -7,11 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Serve static files from `blobs` directory [#480](https://github.com/p2panda/aquadoggo/pull/480)
- Add method to store for pruning document views [#491](https://github.com/p2panda/aquadoggo/pull/491)
- Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484)
- Task for automatic garbage collection of unused documents and views [#500](https://github.com/p2panda/aquadoggo/pull/500)

### Changed

- HTTP routes to serve files with correct content type headers [#544](https://github.com/p2panda/aquadoggo/pull/544)

## [0.5.0]

### Added
27 changes: 1 addition & 26 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions aquadoggo/Cargo.toml
@@ -24,7 +24,7 @@ async-graphql = { version = "5.0.6", features = ["dynamic-schema"] }
async-graphql-axum = "5.0.6"
async-trait = "0.1.64"
asynchronous-codec = { version = "0.6.2", features = ["cbor"] }
axum = "0.6.10"
axum = { version = "0.6.10", features = ["headers"] }
bamboo-rs-core-ed25519-yasmf = "0.1.1"
bs58 = "0.4.0"
deadqueue = { version = "0.2.3", default-features = false, features = [
@@ -76,11 +76,12 @@ tokio = { version = "1.28.2", features = [
"rt-multi-thread",
"sync",
"time",
"fs",
] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-util = { version = "0.7.8", features = ["io"] }
tower-http = { version = "0.4.0", default-features = false, features = [
"cors",
"fs",
] }
triggered = "0.1.2"
void = "1.0.2"
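The newly enabled `fs` features on `tokio` and `tower-http` (the latter gates `ServeDir`/`ServeFile`), together with the existing `tokio-util` `io` feature, point at reading blobs from disk asynchronously and streaming them out. A minimal sketch of that pattern under those assumptions; `stream_file` is a hypothetical helper, not a function from this PR.

```rust
// Illustrative only: stream a file from the blobs directory as an axum response.
// Relies on tokio's "fs" feature and tokio-util's "io" feature, both in Cargo.toml.
use std::path::PathBuf;

use axum::body::StreamBody;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use tokio_util::io::ReaderStream;

async fn stream_file(blob_dir: PathBuf, file_name: &str) -> Response {
    let path = blob_dir.join(file_name);

    // Open the file asynchronously; a missing file maps to 404.
    let file = match tokio::fs::File::open(&path).await {
        Ok(file) => file,
        Err(_) => return StatusCode::NOT_FOUND.into_response(),
    };

    // Wrap the async reader in a stream of byte chunks and hand it to axum,
    // so large blobs are never buffered completely in memory.
    let stream = ReaderStream::new(file);
    StreamBody::new(stream).into_response()
}
```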
4 changes: 1 addition & 3 deletions aquadoggo/src/config.rs
@@ -6,11 +6,9 @@ use p2panda_rs::schema::SchemaId;

use crate::network::NetworkConfiguration;

/// Blobs directory
/// Blobs directory name.
pub const BLOBS_DIR_NAME: &str = "blobs";

pub const BLOBS_SYMLINK_DIR_NAME: &str = "documents";

/// Configuration object holding all important variables throughout the application.
#[derive(Debug, Clone)]
pub struct Configuration {
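With the symlinked `documents` directory removed, `BLOBS_DIR_NAME` is the only directory name the node needs for blob storage. A hedged sketch of deriving and creating that directory from a base path, using the `tokio` `fs` feature added in this commit; `ensure_blob_dir`, `base_path` and the call site are assumptions, not code from this PR.

```rust
// Hypothetical startup step: derive the blob directory from the node's base path.
use std::path::{Path, PathBuf};

const BLOBS_DIR_NAME: &str = "blobs";

async fn ensure_blob_dir(base_path: &Path) -> std::io::Result<PathBuf> {
    let blob_dir = base_path.join(BLOBS_DIR_NAME);
    // Create the directory (and any missing parents) if it does not exist yet.
    tokio::fs::create_dir_all(&blob_dir).await?;
    Ok(blob_dir)
}
```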
54 changes: 27 additions & 27 deletions aquadoggo/src/db/stores/blob.rs
@@ -19,12 +19,12 @@ use crate::db::SqlStore;
/// p2panda-rs blob validation too.
const MAX_BLOB_PIECES: u64 = 10000;

pub type BlobData = String;
pub type BlobData = Vec<u8>;

impl SqlStore {
/// Get the data for one blob from the store, identified by its document id.
pub async fn get_blob(&self, id: &DocumentId) -> Result<Option<BlobData>, BlobStoreError> {
// Get the root blob document.
// Get the root blob document
let blob_document = match self.get_document(id).await? {
Some(document) => {
if document.schema_id != SchemaId::Blob(1) {
@@ -42,7 +42,7 @@ impl SqlStore {
&self,
view_id: &DocumentViewId,
) -> Result<Option<BlobData>, BlobStoreError> {
// Get the root blob document.
// Get the root blob document
let blob_document = match self.get_document_by_view_id(view_id).await? {
Some(document) => {
if document.schema_id != SchemaId::Blob(1) {
@@ -71,9 +71,9 @@ impl SqlStore {
operation_fields_v1.value
FROM
operation_fields_v1
LEFT JOIN
LEFT JOIN
operations_v1
ON
ON
operations_v1.operation_id = operation_fields_v1.operation_id
WHERE
operations_v1.document_id = $1
@@ -126,28 +126,28 @@ async fn reverse_relations(

query_scalar(&format!(
"
SELECT
document_view_fields.document_view_id
FROM
SELECT
document_view_fields.document_view_id
FROM
document_view_fields
LEFT JOIN
operation_fields_v1
ON
document_view_fields.operation_id = operation_fields_v1.operation_id
AND
document_view_fields.name = operation_fields_v1.name
LEFT JOIN
LEFT JOIN
document_views
ON
document_view_fields.document_view_id = document_views.document_view_id
WHERE
operation_fields_v1.field_type
IN
IN
('pinned_relation', 'pinned_relation_list', 'relation', 'relation_list')
{schema_id_condition}
AND
AND
operation_fields_v1.value IN (
SELECT document_views.document_view_id
SELECT document_views.document_view_id
FROM document_views
WHERE document_views.document_id = $1
) OR operation_fields_v1.value = $1
@@ -178,8 +178,8 @@ async fn document_to_blob_data(

// Now collect all existing pieces for the blob.
//
// We do this using the stores' query method, targeting pieces which are in the relation
// list of the blob.
// We do this using the stores' query method, targeting pieces which are in the relation list
// of the blob.
let schema = Schema::get_system(SchemaId::BlobPiece(1)).unwrap();
let list = RelationList::new_pinned(blob.view_id(), "pieces");
let pagination = Pagination {
@@ -224,7 +224,7 @@ async fn document_to_blob_data(
return Err(BlobStoreError::IncorrectLength);
};

Ok(Some(blob_data))
Ok(Some(blob_data.into_bytes()))
}

#[cfg(test)]
@@ -245,18 +245,18 @@ mod tests {
#[rstest]
fn get_blob(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
let blob_data = "Hello, World!".to_string();
let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
let blob_data = "Hello, World!".as_bytes();
let blob_view_id = add_blob(&mut node, &blob_data, 6, "text/plain", &key_pair).await;

let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();

// Get blob by document id.
// Get blob by document id
let blob = node.context.store.get_blob(&document_id).await.unwrap();

assert!(blob.is_some());
assert_eq!(blob.unwrap(), blob_data);

// Get blob by view id.
// Get blob by view id
let blob = node
.context
.store
@@ -376,8 +376,8 @@ mod tests {
#[rstest]
fn purge_blob(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
let blob_data = "Hello, World!".to_string();
let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
let blob_data = "Hello, World!".as_bytes();
let blob_view_id = add_blob(&mut node, &blob_data, 7, "text/plain", &key_pair).await;

// There is one blob and two blob pieces in database.
//
@@ -419,8 +419,8 @@
test_runner(|mut node: TestNode| async move {
let _ = populate_and_materialize(&mut node, &config).await;

let blob_data = "Hello, World!".to_string();
let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
let blob_data = "Hello, World!".as_bytes();
let blob_view_id = add_blob(&mut node, &blob_data, 7, "text/plain", &key_pair).await;

// There is one blob and two blob pieces in database.
//
@@ -453,8 +453,8 @@ mod tests {
#[rstest]
fn does_not_purge_blob_if_still_pinned(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
let blob_data = "Hello, World!".to_string();
let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
let blob_data = "Hello, World!".as_bytes();
let blob_view_id = add_blob(&mut node, &blob_data, 7, "text/plain", &key_pair).await;

let _ = add_schema_and_documents(
&mut node,
@@ -497,8 +497,8 @@ mod tests {
#[rstest]
fn purge_all_pieces_of_updated_blob(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
let blob_data = "Hello, World!".to_string();
let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
let blob_data = "Hello, World!".as_bytes();
let blob_view_id = add_blob(&mut node, &blob_data, 7, "text/plain", &key_pair).await;

// Create a new blob piece.
let new_blob_pieces = add_document(
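The tests above now call the `add_blob` helper with raw bytes, a maximum piece size and a MIME type, and their comments expect "one blob and two blob pieces" for a 13-byte payload split at 7 bytes. A sketch of the chunking step such a helper presumably performs; the real helper would additionally publish one `blob_piece_v1` document per chunk and a `blob_v1` document whose `pieces` relation list pins them. `split_into_pieces` is a hypothetical name, not the helper's actual implementation.

```rust
/// Split raw blob bytes into pieces of at most `max_piece_size` bytes.
/// Illustrative only, not the actual `add_blob` test helper.
fn split_into_pieces(blob_data: &[u8], max_piece_size: usize) -> Vec<Vec<u8>> {
    blob_data
        .chunks(max_piece_size)
        .map(|piece| piece.to_vec())
        .collect()
}

#[test]
fn splits_hello_world_into_two_pieces() {
    // 13 bytes with a maximum piece size of 7 bytes yields two pieces, matching
    // the "one blob and two blob pieces" comments in the tests above.
    assert_eq!(split_into_pieces(b"Hello, World!", 7).len(), 2);
}
```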
34 changes: 23 additions & 11 deletions aquadoggo/src/graphql/mutations/publish.rs
@@ -125,9 +125,9 @@ mod tests {

use crate::bus::ServiceMessage;
use crate::graphql::GraphQLSchemaManager;
use crate::http::{HttpServiceContext, BLOBS_ROUTE};
use crate::http::HttpServiceContext;
use crate::test_utils::{
add_schema, doggo_fields, doggo_schema, graphql_test_client, populate_and_materialize,
add_schema, doggo_fields, doggo_schema, http_test_client, populate_and_materialize,
populate_store_config, test_runner, TestNode,
};

@@ -237,7 +237,11 @@ mod tests {
node.context.schema_provider.clone(),
)
.await;
let context = HttpServiceContext::new(manager, BLOBS_ROUTE.into());
let context = HttpServiceContext::new(
node.context.store.clone(),
manager,
node.context.config.blob_dir.as_ref().unwrap().to_path_buf(),
);

let response = context.schema.execute(publish_request).await;

@@ -298,7 +302,11 @@ mod tests {
node.context.schema_provider.clone(),
)
.await;
let context = HttpServiceContext::new(manager, BLOBS_ROUTE.into());
let context = HttpServiceContext::new(
node.context.store.clone(),
manager,
node.context.config.blob_dir.as_ref().unwrap().to_path_buf(),
);

let response = context
.schema
@@ -326,7 +334,11 @@ mod tests {
node.context.schema_provider.clone(),
)
.await;
let context = HttpServiceContext::new(manager, BLOBS_ROUTE.into());
let context = HttpServiceContext::new(
node.context.store.clone(),
manager,
node.context.config.blob_dir.as_ref().unwrap().to_path_buf(),
);

context.schema.execute(publish_request).await;

@@ -354,7 +366,7 @@ mod tests {
populate_and_materialize(&mut node, &config).await;

// Init the test client.
let client = graphql_test_client(&node).await;
let client = http_test_client(&node).await;

let response = client
.post("/graphql")
@@ -573,7 +585,7 @@ mod tests {
populate_and_materialize(&mut node, &config).await;

// Init the test client
let client = graphql_test_client(&node).await;
let client = http_test_client(&node).await;

// Prepare the GQL publish request
let publish_request = publish_request(&entry_encoded, &encoded_operation);
@@ -701,7 +713,7 @@ mod tests {
populate_and_materialize(&mut node, &config).await;

// Init the test client.
let client = graphql_test_client(&node).await;
let client = http_test_client(&node).await;

let publish_request = publish_request(&entry_encoded, &encoded_operation);

@@ -736,7 +748,7 @@ mod tests {
populate_and_materialize(&mut node, &config).await;

// Init the test client.
let client = graphql_test_client(&node).await;
let client = http_test_client(&node).await;

// Two key pairs representing two different authors
let key_pairs = vec![KeyPair::new(), KeyPair::new()];
@@ -828,7 +840,7 @@ mod tests {
populate_and_materialize(&mut node, &config).await;

// Init the test client.
let client = graphql_test_client(&node).await;
let client = http_test_client(&node).await;

// Get the one entry from the store.
let entries = node
@@ -871,7 +883,7 @@
) {
test_runner(|node: TestNode| async move {
// Init the test client.
let client = graphql_test_client(&node).await;
let client = http_test_client(&node).await;

// Prepare a publish entry request for the entry.
let publish_entry = publish_request(
(Remaining changed files not rendered.)