Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse fixes #12

Merged
merged 8 commits into from
Jan 3, 2025
Merged
10 changes: 4 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "arkavo-rs"
version = "0.9.1"
edition = "2021"
rust-version = "1.80.0"
rust-version = "1.83.0"

[lib]
name = "nanotdf"
Expand All @@ -15,7 +15,7 @@ path = "src/bin/main.rs"
[profile.release]
opt-level = 3
lto = true
codegen-units = 1
codegen-units = 16

[profile.bench]
lto = true
Expand All @@ -32,7 +32,6 @@ aes-gcm = "0.10.3"
p256 = { version = "=0.13.2", features = ["ecdh"] }
once_cell = "1.19.0"
rand_core = "0.6.4"
zeroize = "1.8.1"
sha2 = "0.10.8"
hkdf = "0.12.4"
tokio-native-tls = "0.3.1"
Expand All @@ -41,10 +40,9 @@ env_logger = "0.11.5"
log = "0.4.22"
ink = "5.0.0"
jsonwebtoken = "9.3.0"
async-nats = "0.36.0"
serde_json = "1.0.128"
async-nats = "0.38.0"
redis = { version = "0.27.2", features = ["tokio-comp"] }
flatbuffers = "24.3.25"
flatbuffers = "24.12.23"
scale = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["derive"] }
scale-info = { version = "2.11.3", default-features = false, features = ["derive"], optional = true }
bs58 = "0.5.1"
Expand Down
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,7 @@ flatc --binary --rust idl/metadata.fbs

### Installation

1. Clone the repository:

```shell
git clone https://github.com/arkavo-org/backend-rust.git
cd backend-rust
```

2. Build the project to download and compile the dependencies:
1. Build the project to download and compile the dependencies:

```shell
cargo build
Expand All @@ -64,7 +57,18 @@ flatc --binary --rust idl/metadata.fbs
openssl ec -in recipient_private_key.pem -text -noout
```

2. Generating Self-Signed Certificate
2. Ensure you have a valid EC private key in PEM format named `recipient_private_key.pem`.

```shell
openssl ecparam -genkey -name prime256v1 -noout -out recipient_private_key.pem
```

Validate
```shell
openssl ec -in recipient_private_key.pem -text -noout
```

3. Generating Self-Signed Certificate

For development purposes, you can generate a self-signed certificate using OpenSSL. Run the following command in your
terminal:
Expand Down
50 changes: 41 additions & 9 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ static KAS_KEYS: OnceCell<Arc<KasKeys>> = OnceCell::new();
trait AsyncStream: AsyncRead + AsyncWrite + Unpin + Send {}
impl<T> AsyncStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {}

#[tokio::main(flavor = "multi_thread")]
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::init();
Expand Down Expand Up @@ -523,8 +523,10 @@ async fn handle_rewrap(
locator = policy.get_locator().clone();
}
PolicyType::Embedded => {
// println!("embedded policy");
println!("embedded policy");
if let Some(body) = &policy.body {
println!("Metadata buffer size: {}", body.len());
println!("Metadata buffer contents: {:?}", body);
metadata = match root_as_metadata(body) {
Ok(metadata) => Some(metadata),
Err(e) => {
Expand All @@ -533,7 +535,7 @@ async fn handle_rewrap(
}
};
// TODO add contracts
// println!("metadata: {:#?}", metadata);
println!("metadata: {:#?}", metadata);
}
// add content rating contract
let rl = ResourceLocator {
Expand Down Expand Up @@ -840,7 +842,7 @@ async fn handle_nats_subscription(
Ok(mut subscription) => {
info!("Subscribed to NATS subject: {}", subject);
while let Some(msg) = subscription.next().await {
if let Err(e) = handle_nats_event(msg, connection_state.clone()).await {
if let Err(e) = handle_nats(msg, connection_state.clone()).await {
error!("Error handling NATS message: {}", e);
}
}
Expand All @@ -856,25 +858,52 @@ async fn handle_nats_subscription(
tokio::time::sleep(NATS_RETRY_INTERVAL).await;
}
}
async fn handle_nats_event(

async fn handle_nats(
msg: NatsMessage,
connection_state: Arc<ConnectionState>,
) -> Result<(), Box<dyn std::error::Error>> {
// it nanotdf, then do a message, otherwise it is a Flatbuffers event
let message_type = if msg.payload[0..3].iter().eq(&[0x4C, 0x31, 0x4C]) {
MessageType::Nats
} else {
MessageType::Event
};

let ws_message = Message::Binary(
vec![MessageType::Event as u8]
vec![message_type as u8]
.into_iter()
.chain(msg.payload)
.collect(),
);
connection_state.outgoing_tx.send(ws_message)?;
Ok(())
}

async fn handle_event(
server_state: &Arc<ServerState>,
payload: &[u8],
nats_connection: Arc<NatsConnection>,
) -> Option<Message> {
let start_time = Instant::now();
println!(
"Payload (first 20 bytes in hex, space-delimited): {}",
payload
.iter()
.take(20)
.map(|byte| format!("{:02x}", byte))
.collect::<Vec<String>>()
.join(" ")
);
// Size validation for type 0x06
const MAX_EVENT_SIZE: usize = 2000; // Adjust this value as needed
if payload.len() > MAX_EVENT_SIZE {
error!(
"Event payload exceeds maximum allowed size of {} bytes",
MAX_EVENT_SIZE
);
return None;
}
let mut event_data: Option<Vec<u8>> = None;
if let Ok(event) = root::<Event>(payload) {
println!("Event Action: {:?}", event.action());
Expand Down Expand Up @@ -920,6 +949,9 @@ async fn handle_event(
}
};
// TODO if cache miss then route to device
} else {
error!("Failed to parse user event from payload");
return None;
}
}
EventData::CacheEvent => {
Expand Down Expand Up @@ -1162,21 +1194,21 @@ fn load_config() -> Result<ServerSettings, Box<dyn std::error::Error>> {
tls_enabled: env::var("TLS_CERT_PATH").is_ok(),
tls_cert_path: env::var("TLS_CERT_PATH").unwrap_or_else(|_| {
current_dir
.join("../../fullchain.pem")
.join("fullchain.pem")
.to_str()
.unwrap()
.to_string()
}),
tls_key_path: env::var("TLS_KEY_PATH").unwrap_or_else(|_| {
current_dir
.join("../../privkey.pem")
.join("privkey.pem")
.to_str()
.unwrap()
.to_string()
}),
kas_key_path: env::var("KAS_KEY_PATH").unwrap_or_else(|_| {
current_dir
.join("../../recipient_private_key.pem")
.join("recipient_private_key.pem")
.to_str()
.unwrap()
.to_string()
Expand Down
2 changes: 1 addition & 1 deletion src/bin/schemas/entity_generated.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify
#![allow(clippy::extra_unused_lifetimes)]

// @generated

Expand Down
13 changes: 11 additions & 2 deletions src/bin/schemas/event_generated.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify
#![allow(clippy::extra_unused_lifetimes)]

// @generated

extern crate flatbuffers;

#[allow(unused_imports, dead_code)]
pub mod arkavo {

use core::cmp::Ordering;
use core::mem;

extern crate flatbuffers;
use self::flatbuffers::Follow;
use self::flatbuffers::{EndianScalar, Follow};

#[deprecated(
since = "2.0.0",
Expand Down Expand Up @@ -677,6 +682,7 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<EntityType>("source_type", Self::VT_SOURCE_TYPE, false)?
.visit_field::<EntityType>("target_type", Self::VT_TARGET_TYPE, false)?
Expand Down Expand Up @@ -895,6 +901,7 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>(
"target_id",
Expand Down Expand Up @@ -1140,6 +1147,7 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<EntityType>("target_type", Self::VT_TARGET_TYPE, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>(
Expand Down Expand Up @@ -1433,6 +1441,7 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<Action>("action", Self::VT_ACTION, false)?
.visit_field::<u64>("timestamp", Self::VT_TIMESTAMP, false)?
Expand Down
14 changes: 2 additions & 12 deletions src/bin/schemas/metadata_generated.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
#![allow(clippy::all)]
// automatically generated by the FlatBuffers compiler, do not modify
#![allow(clippy::extra_unused_lifetimes)]
// @generated

extern crate flatbuffers;

#[allow(unused_imports, dead_code)]
pub mod arkavo {

use crate::schemas::entity_generated::arkavo::MediaType;
use core::cmp::Ordering;
use core::mem;

extern crate flatbuffers;
use self::flatbuffers::{EndianScalar, Follow};
use self::flatbuffers::Follow;

#[deprecated(
since = "2.0.0",
Expand Down Expand Up @@ -627,7 +623,6 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<RatingLevel>("violent", Self::VT_VIOLENT, false)?
.visit_field::<RatingLevel>("sexual", Self::VT_SEXUAL, false)?
Expand Down Expand Up @@ -904,7 +899,6 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<f32>("educational", Self::VT_EDUCATIONAL, false)?
.visit_field::<f32>("entertainment", Self::VT_ENTERTAINMENT, false)?
Expand Down Expand Up @@ -1114,7 +1108,6 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<FormatType>("type_", Self::VT_TYPE_, false)?
.visit_field::<flatbuffers::ForwardsUOffset<&str>>(
Expand Down Expand Up @@ -1279,7 +1272,6 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<ArchiveType>("type_", Self::VT_TYPE_, false)?
.visit_field::<flatbuffers::ForwardsUOffset<&str>>(
Expand Down Expand Up @@ -1446,7 +1438,6 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<MediaType>("media_type", Self::VT_MEDIA_TYPE, false)?
.visit_field::<DataEncoding>("data_encoding", Self::VT_DATA_ENCODING, false)?
Expand Down Expand Up @@ -1691,7 +1682,6 @@ pub mod arkavo {
v: &mut flatbuffers::Verifier,
pos: usize,
) -> Result<(), flatbuffers::InvalidFlatbuffer> {
use self::flatbuffers::Verifiable;
v.visit_table(pos)?
.visit_field::<i64>("created", Self::VT_CREATED, false)?
.visit_field::<flatbuffers::ForwardsUOffset<flatbuffers::Vector<'_, u8>>>(
Expand Down
Loading