Skip to content

Commit

Permalink
Merge pull request #17 from OpenArchive/create-backup-server
Browse files Browse the repository at this point in the history
Create backup server
  • Loading branch information
tripledoublev authored Dec 10, 2024
2 parents 094b4ed + 697014d commit 0cc46fc
Show file tree
Hide file tree
Showing 8 changed files with 1,011 additions and 71 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ xdg = "2.4"
tmpdir = "1"
serde = "1.0.204"
serde_cbor = "0.11.2"
clap = "4.5.9"
clap = { version = "4.5.9", features = ["derive"] }
anyhow = "1.0.86"
tokio = {version ="1.39.3", features=["full"] }
tokio-stream = "0.1.16"
async-stream = "0.3.5"
futures-core = "0.3.30"
futures = "0.3.31"
futures-core = "0.3.31"
futures-util = "0.3.31"
bytes = "1.6.1"
serial_test = "3.1.1"
url = "2.5.2"
hex = "0.4.3"
rand = "0.8.5"
base64 = "0.22.1"
39 changes: 36 additions & 3 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct BackendInner {
update_rx: Option<broadcast::Receiver<VeilidUpdate>>,
groups: HashMap<CryptoKey, Box<Group>>,
pub iroh_blobs: Option<VeilidIrohBlobs>,
on_new_route_callback: Option<OnNewRouteCallback>,
}

impl BackendInner {
Expand Down Expand Up @@ -85,6 +86,7 @@ impl BackendInner {
}
}

#[derive(Clone)]
pub struct Backend {
inner: Arc<Mutex<BackendInner>>,
}
Expand All @@ -97,6 +99,7 @@ impl Backend {
update_rx: None,
groups: HashMap::new(),
iroh_blobs: None,
on_new_route_callback: None,
};

let backend = Backend {
Expand All @@ -118,6 +121,7 @@ impl Backend {
update_rx: Some(update_rx),
groups: HashMap::new(),
iroh_blobs: None,
on_new_route_callback: None,
};

let backend = Backend {
Expand All @@ -126,12 +130,16 @@ impl Backend {

let inner_clone = backend.inner.clone();

let on_new_route_callback: OnNewRouteCallback = Arc::new(move |_, _| {
let on_new_route_callback: OnNewRouteCallback = Arc::new(move |route_id, route_id_blob| {
let inner = inner_clone.clone();
println!("Re-generating route");
tokio::spawn(async move {
let inner = inner.lock().await;

if let Some(on_new_route) = &inner.on_new_route_callback {
on_new_route(route_id, route_id_blob)
}

for group in inner.groups.clone().into_values() {
if let Some(repo) = group.get_own_repo().await {
if let Err(err) = repo.update_route_on_dht().await {
Expand Down Expand Up @@ -204,12 +212,16 @@ impl Backend {

let inner_clone = self.inner.clone();

let on_new_route_callback: OnNewRouteCallback = Arc::new(move |_, _| {
let on_new_route_callback: OnNewRouteCallback = Arc::new(move |route_id, route_id_blob| {
let inner = inner_clone.clone();
println!("Re-generating route");
tokio::spawn(async move {
let inner = inner.lock().await;

if let Some(on_new_route) = &inner.on_new_route_callback {
on_new_route(route_id, route_id_blob)
}

for group in inner.groups.clone().into_values() {
if let Some(repo) = group.get_own_repo().await {
if let Err(err) = repo.update_route_on_dht().await {
Expand Down Expand Up @@ -264,11 +276,27 @@ impl Backend {
Ok(())
}

/// Registers the callback that is invoked whenever a new Veilid route
/// is established, replacing any previously registered callback.
pub async fn set_on_new_route_callback(
    &self,
    on_new_route_connected_callback: OnNewRouteCallback,
) {
    let mut guard = self.inner.lock().await;
    guard.on_new_route_callback = Some(on_new_route_connected_callback);
}

/// Parses a group-invite URL and joins the group it describes.
///
/// Returns an error if the URL cannot be parsed or the join fails.
pub async fn join_from_url(&self, url_string: &str) -> Result<Box<Group>> {
    self.join_group(parse_url(url_string)?).await
}

/// Returns the current route-id blob from the iroh-blobs subsystem.
///
/// Errors with "Veilid not initialized" when the blobs subsystem has
/// not been set up yet.
pub async fn get_route_id_blob(&self) -> Result<Vec<u8>> {
    match self.get_iroh_blobs().await {
        Some(blobs) => Ok(blobs.route_id_blob().await),
        None => Err(anyhow!("Veilid not initialized")),
    }
}

pub async fn join_group(&self, keys: CommonKeypair) -> Result<Box<Group>> {
let mut inner = self.inner.lock().await;

Expand Down Expand Up @@ -487,6 +515,11 @@ impl Backend {
let mut inner = self.inner.lock().await;
inner.iroh_blobs.clone()
}

/// Builds a fresh routing context from the Veilid API handle, if one
/// is available; `None` when Veilid is not initialized or the context
/// cannot be created.
pub async fn get_routing_context(&self) -> Option<RoutingContext> {
    self.get_veilid_api().await?.routing_context().ok()
}
}

async fn wait_for_network(update_rx: &mut broadcast::Receiver<VeilidUpdate>) -> Result<()> {
Expand All @@ -511,7 +544,7 @@ fn find_query(url: &Url, key: &str) -> Result<String> {
Err(anyhow!("Unable to find parameter {} in URL {:?}", key, url))
}

fn crypto_key_from_query(url: &Url, key: &str) -> Result<CryptoKey> {
pub fn crypto_key_from_query(url: &Url, key: &str) -> Result<CryptoKey> {
let value = find_query(url, key)?;
let bytes = hex::decode(value)?;
let mut key_vec: [u8; CRYPTO_KEY_LENGTH] = [0; CRYPTO_KEY_LENGTH];
Expand Down
14 changes: 8 additions & 6 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
#![allow(async_fn_in_trait)]
#![allow(clippy::async_yields_async)]

use crate::constants::ROUTE_ID_DHT_KEY;
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use std::{path::Path, path::PathBuf, sync::Arc};
use tokio::sync::broadcast::{self, Receiver};
use url::Url;
use veilid_core::{
CryptoKey, CryptoSystem, CryptoSystemVLD0, CryptoTyped, DHTRecordDescriptor, KeyPair, Nonce,
ProtectedStore, RouteId, RoutingContext, Sequencing, SharedSecret, Stability, UpdateCallback,
VeilidAPI, VeilidConfigInner, VeilidUpdate, CRYPTO_KIND_VLD0, VALID_CRYPTO_KINDS,
};

use crate::constants::ROUTE_ID_DHT_KEY;

pub async fn make_route(veilid: &VeilidAPI) -> Result<(RouteId, Vec<u8>)> {
let mut retries = 6;
while retries > 0 {
retries -= 1;
let result = veilid
.new_custom_private_route(
&VALID_CRYPTO_KINDS,
Stability::Reliable,
Sequencing::EnsureOrdered,
Stability::LowLatency,
Sequencing::NoPreference,
)
.await;

Expand Down Expand Up @@ -73,7 +75,7 @@ pub async fn init_veilid(
}

pub fn config_for_dir(base_dir: PathBuf, namespace: String) -> VeilidConfigInner {
return VeilidConfigInner {
VeilidConfigInner {
program_name: "save-dweb-backend".to_string(),
namespace,
protected_store: veilid_core::VeilidConfigProtectedStore {
Expand All @@ -94,10 +96,10 @@ pub fn config_for_dir(base_dir: PathBuf, namespace: String) -> VeilidConfigInner
..Default::default()
},
..Default::default()
};
}
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct CommonKeypair {
pub id: CryptoKey,
pub public_key: CryptoKey,
Expand Down
78 changes: 71 additions & 7 deletions src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,17 @@ use rand::thread_rng;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::time::{SystemTime, UNIX_EPOCH};

use std::path::PathBuf;
use std::result;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use url::Url;
use veilid_core::{
CryptoKey, CryptoSystemVLD0, CryptoTyped, DHTRecordDescriptor, DHTReportScope, DHTSchema,
KeyPair, ProtectedStore, RoutingContext, SharedSecret, TypedKey, ValueSubkeyRangeSet,
KeyPair, ProtectedStore, RoutingContext, SharedSecret, TypedKey, ValueSubkeyRangeSet, VeilidUpdate,
VeilidAPI, CRYPTO_KEY_LENGTH, CRYPTO_KIND_VLD0,
};
use veilid_iroh_blobs::iroh::VeilidIrohBlobs;
Expand Down Expand Up @@ -80,8 +83,7 @@ impl Group {
.lock()
.await
.get(id)
.ok_or_else(|| anyhow!("Repo not loaded"))
.map(|repo| repo.clone())
.ok_or_else(|| anyhow!("Repo not loaded")).cloned()
}

pub async fn has_repo(&self, id: &CryptoKey) -> bool {
Expand Down Expand Up @@ -122,7 +124,7 @@ impl Group {
let mut repos = self.list_peer_repos().await;
repos.shuffle(&mut rng);

if repos.len() == 0 {
if repos.is_empty() {
return Err(anyhow!("Cannot download hash. No other peers found"));
}

Expand Down Expand Up @@ -310,6 +312,15 @@ impl Group {

let mut repo_id_buffer: [u8; CRYPTO_KEY_LENGTH] = [0; CRYPTO_KEY_LENGTH];

// Validate the length before copying
if repo_id_raw.data().len() != repo_id_buffer.len() {
return Err(anyhow!(
"Slice length mismatch: expected {}, got {}",
repo_id_buffer.len(),
repo_id_raw.data().len()
));
}

repo_id_buffer.copy_from_slice(repo_id_raw.data());

let repo_id = TypedKey::new(CRYPTO_KIND_VLD0, CryptoKey::from(repo_id_buffer));
Expand Down Expand Up @@ -338,9 +349,9 @@ impl Group {
pub async fn try_load_repo_from_disk(&mut self) -> bool {
if let Err(err) = self.load_repo_from_disk().await {
eprintln!("Unable to load own repo from disk {}", err);
return false;
false
} else {
return true;
true
}
}

Expand Down Expand Up @@ -429,7 +440,7 @@ impl Group {
let keypair = CommonKeypair {
id: repo.id(),
public_key: repo_dht_record.owner().clone(),
secret_key: repo_dht_record.owner_secret().map(|key| *key),
secret_key: repo_dht_record.owner_secret().copied(),
encryption_key: encryption_key.clone(),
};

Expand Down Expand Up @@ -463,6 +474,59 @@ impl Group {
.await
.map_err(|_| anyhow!("Failed to load keypair for repo_id: {:?}", repo_id))
}

/// Watches this group's DHT record for value changes and invokes
/// `on_change` whenever a change report is received.
///
/// Monitoring runs in a detached background task spawned on tokio; this
/// function returns `Ok(())` immediately after spawning it. Errors from
/// `on_change` and from setting the watch are only logged to stderr.
pub async fn watch_changes<F, Fut>(&self, on_change: F) -> Result<()>
where
    F: Fn() -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + 'static,
{
    // Watch subkeys 0..repo_count-1 when repos exist; otherwise fall back
    // to the full subkey range.
    let repo_count = self.dht_repo_count().await?;
    let range = if repo_count > 0 {
        ValueSubkeyRangeSet::single_range(0, repo_count as u32 - 1)
    } else {
        ValueSubkeyRangeSet::full()
    };

    // Absolute expiration ~600 seconds from now, in microseconds.
    // NOTE(review): computed once and reused by every re-issued watch in the
    // loop below, so it becomes stale after it passes — confirm against the
    // veilid `watch_dht_values` contract whether that is intended.
    let expiration_duration = 600_000_000;
    let expiration = SystemTime::now()
        .duration_since(UNIX_EPOCH)?
        .as_micros() as u64 + expiration_duration;
    // NOTE(review): `count = 0` — presumably the watch-change count
    // parameter; verify its meaning (0 may mean "unlimited" or "cancel")
    // in the veilid API docs.
    let count = 0;

    // Clone necessary data for the async block
    let routing_context = self.routing_context.clone();
    let dht_record_key = self.dht_record.key().clone();

    // Spawn a task that uses only owned data
    tokio::spawn(async move {
        // Initial call establishes the watch; on success we keep polling.
        match routing_context
            .watch_dht_values(dht_record_key.clone(), range.clone(), expiration.into(), count)
            .await
        {
            Ok(_) => {
                println!("DHT watch successfully set on record key {:?}", dht_record_key);

                // Busy loop: re-issues the watch call and treats a nonzero
                // return as "something changed". Errors from individual
                // calls are silently ignored and the loop continues.
                loop {
                    if let Ok(change) = routing_context
                        .watch_dht_values(dht_record_key.clone(), range.clone(), expiration.into(), count)
                        .await
                    {
                        if change > 0.into() {
                            if let Err(e) = on_change().await {
                                eprintln!("Failed to re-download files: {:?}", e);
                            }
                        }
                    }
                }
            }
            Err(e) => eprintln!("Failed to set DHT watch: {:?}", e),
        }
    });

    Ok(())
}


}

impl DHTEntity for Group {
Expand Down
Loading

0 comments on commit 0cc46fc

Please sign in to comment.