Skip to content

Commit

Permalink
feat(faucet_server): download and upload gutenberger book part by part
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi authored and joshuef committed Jun 4, 2024
1 parent 30c2d2c commit 66fa8a8
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 70 deletions.
165 changes: 96 additions & 69 deletions sn_faucet/src/faucet_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ use warp::{
Filter, Reply,
};

#[cfg(feature = "initial-data")]
use crate::gutenberger::{download_book, State};
#[cfg(feature = "initial-data")]
use autonomi::FilesUploader;
#[cfg(feature = "initial-data")]
use color_eyre::eyre;
use reqwest::Client as ReqwestClient;
#[cfg(feature = "initial-data")]
use sn_client::{UploadCfg, BATCH_SIZE};
#[cfg(feature = "initial-data")]
use sn_protocol::storage::RetryStrategy;
use sn_protocol::storage::{ChunkAddress, RetryStrategy};
#[cfg(feature = "initial-data")]
use std::{fs::File, path::PathBuf};
#[cfg(feature = "initial-data")]
use tokio::{fs, io::AsyncWriteExt};

Expand Down Expand Up @@ -70,7 +74,7 @@ pub async fn run_faucet_server(client: &Client) -> Result<()> {

#[cfg(feature = "initial-data")]
{
upload_initial_data(client, &root_dir).await?;
let _ = upload_initial_data(client, &root_dir).await;
}

startup_server(client.clone()).await
Expand All @@ -79,91 +83,114 @@ pub async fn run_faucet_server(client: &Client) -> Result<()> {
#[cfg(feature = "initial-data")]
/// Trigger one by one uploading of intitial data packets to the entwork.
async fn upload_initial_data(client: &Client, root_dir: &Path) -> Result<()> {
let urls = vec![
"https://releases.ubuntu.com/23.04/ubuntu-23.04-desktop-amd64.iso",
"https://releases.ubuntu.com/23.04/ubuntu-23.04-live-server-amd64.iso",
"https://releases.ubuntu.com/22.04.3/ubuntu-22.04.3-desktop-amd64.iso",
"https://releases.ubuntu.com/22.04.3/ubuntu-22.04.3-live-server-amd64.iso",
"https://releases.ubuntu.com/20.04.6/ubuntu-20.04.6-desktop-amd64.iso",
];

let temp_dir = std::env::temp_dir();
let mut download_tasks = Vec::new();

for url in urls {
let temp_dir = temp_dir.clone();
let task = tokio::spawn(async move {
info!("Starting download for URL: {}", url);
info!("Downloading to {temp_dir:?}");
let response = reqwest::get(url).await?;
let (mut dest, fname) = {
let fname = response
.url()
.path_segments()
.and_then(std::iter::Iterator::last)
.unwrap_or("tempfile");
let fname = temp_dir.join(fname);
(fs::File::create(fname.clone()).await?, fname)
};
let content = response.bytes().await?;
dest.write_all(&content).await?;
info!("Download completed and saved to {:?}", dest);
Ok::<_, eyre::Report>(fname)
});
download_tasks.push(task);
}
let state_file = temp_dir.join("state.json");
let uploaded_books_file = temp_dir.join("uploaded_books.json");
let mut state = State::load_from_file(&state_file)?;

let reqwest_client = ReqwestClient::new();

let results = futures::future::join_all(download_tasks).await;
let mut download_files = vec![];
results.into_iter().for_each(|res| match res {
Ok(Ok(fname)) => {
info!("Download completed successfully, file written to {fname:?}");
println!("Download completed successfully, file written to {fname:?}");
download_files.push(fname);
let mut uploaded_books = vec![];

for book_id in 1..u16::MAX as u32 {
if state.has_seen(book_id) {
println!("Already seen book ID: {book_id}");
info!("Already seen book ID: {book_id}");
continue;
}
Ok(Err(e)) => error!("Error downloading file: {}", e),
Err(e) => error!("Task panicked: {}", e),
});

match download_book(&reqwest_client, book_id).await {
Ok(data) => {
println!("Downloaded book ID: {book_id}");
info!("Downloaded book ID: {book_id}");

let fname = format!("{book_id}.book");
let fpath = temp_dir.join(fname.clone());
let mut dest = fs::File::create(fpath.clone()).await?;
dest.write_all(&data).await?;

state.mark_seen(book_id);
state.save_to_file(&state_file)?;

match upload_downloaded_book(client, root_dir, fpath).await {
Ok(head_addresses) => {
println!("Uploaded book ID: {book_id}");
info!("Uploaded book ID: {book_id}");

// There shall be just one
for head_address in head_addresses {
uploaded_books.push((fname.clone(), head_address.to_hex()));
let file = File::create(uploaded_books_file.clone())?;
serde_json::to_writer(file, &uploaded_books)?;
}
}
Err(err) => {
println!("Failed to upload book ID: {book_id} with error {err:?}");
info!("Failed to upload book ID: {book_id} with error {err:?}");
}
}

println!("Sleeping for 1 minutes...");
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
Err(e) => {
eprintln!("Failed to download book ID {book_id}: {e:?}");
}
}
}

Ok(())
}

#[cfg(feature = "initial-data")]
async fn upload_downloaded_book(
client: &Client,
root_dir: &Path,
file_path: PathBuf,
) -> Result<Vec<ChunkAddress>> {
let upload_cfg = UploadCfg {
batch_size: BATCH_SIZE,
verify_store: true,
retry_strategy: RetryStrategy::Quick,
..Default::default()
};

for file_path in download_files {
let files_uploader = FilesUploader::new(client.clone(), root_dir.to_path_buf())
.set_make_data_public(true)
.set_upload_cfg(upload_cfg)
.insert_path(&file_path);
let files_uploader = FilesUploader::new(client.clone(), root_dir.to_path_buf())
.set_make_data_public(true)
.set_upload_cfg(upload_cfg)
.insert_path(&file_path);

let summary = files_uploader.start_upload().await?;
let summary = match files_uploader.start_upload().await {
Ok(summary) => summary,
Err(err) => {
println!("Failed to upload {file_path:?} with error {err:?}");
return Ok(vec![]);
}
};

info!(
"File {file_path:?} uploaded completed with summary {:?}",
summary.upload_summary
);
println!(
"File {file_path:?} uploaded completed with summary {:?}",
summary.upload_summary
);

let mut head_addresses = vec![];
for (_, file_name, head_address) in summary.completed_files.iter() {
info!(
"File {file_path:?} uploaded completed with summary {:?}",
summary.upload_summary
"Head address of {file_name:?} is {:?}",
head_address.to_hex()
);
println!(
"File {file_path:?} uploaded completed with summary {:?}",
summary.upload_summary
"Head address of {file_name:?} is {:?}",
head_address.to_hex()
);

for (_, file_name, head_address) in summary.completed_files.iter() {
info!(
"Head address of {file_name:?} is {:?}",
head_address.to_hex()
);
println!(
"Head address of {file_name:?} is {:?}",
head_address.to_hex()
);
}

std::thread::sleep(std::time::Duration::from_millis(600));
head_addresses.push(*head_address);
}

Ok(())
Ok(head_addresses)
}

pub async fn restart_faucet_server(client: &Client) -> Result<()> {
Expand Down
57 changes: 57 additions & 0 deletions sn_faucet/src/gutenberger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use color_eyre::eyre::Result;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fs::File;
use std::path::Path;

#[derive(Serialize, Deserialize)]
pub(crate) struct State {
seen_books: HashSet<u32>,
}

impl State {
pub(crate) fn new() -> Self {
State {
seen_books: HashSet::new(),
}
}

pub(crate) fn load_from_file(path: &Path) -> Result<Self> {
if path.exists() {
let file = File::open(path)?;
let state: State = serde_json::from_reader(file)?;
Ok(state)
} else {
Ok(Self::new())
}
}

pub(crate) fn save_to_file(&self, path: &Path) -> Result<()> {
let file = File::create(path)?;
serde_json::to_writer(file, self)?;
Ok(())
}

pub(crate) fn mark_seen(&mut self, book_id: u32) {
self.seen_books.insert(book_id);
}

pub(crate) fn has_seen(&self, book_id: u32) -> bool {
self.seen_books.contains(&book_id)
}
}

pub(crate) async fn download_book(client: &Client, book_id: u32) -> Result<Vec<u8>> {
let url = format!("http://www.gutenberg.org/ebooks/{book_id}.txt.utf-8");
let response = client.get(&url).send().await?.bytes().await?;
Ok(response.to_vec())
}
2 changes: 2 additions & 0 deletions sn_faucet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// permissions and limitations relating to use of the SAFE Network Software.

mod faucet_server;
#[cfg(feature = "initial-data")]
pub(crate) mod gutenberger;
#[cfg(feature = "distribution")]
mod token_distribution;

Expand Down
2 changes: 1 addition & 1 deletion sn_node_manager/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const MAX_DOWNLOAD_RETRIES: u8 = 3;

#[cfg(windows)]
pub async fn configure_winsw(dest_path: &Path, verbosity: VerbosityLevel) -> Result<()> {
if let Ok(_) = which::which("winsw.exe") {
if which::which("winsw.exe").is_ok() {
return Ok(());
}

Expand Down

0 comments on commit 66fa8a8

Please sign in to comment.