From 7b2a7a4f46954e363c585f3e92df5edb10d16308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 17 Nov 2023 00:15:41 +0100 Subject: [PATCH 1/4] ws: improve socket worker connection code --- crates/ws/src/workers/socket/connection.rs | 439 ++++++++++----------- 1 file changed, 213 insertions(+), 226 deletions(-) diff --git a/crates/ws/src/workers/socket/connection.rs b/crates/ws/src/workers/socket/connection.rs index 12328080..b4fbc547 100644 --- a/crates/ws/src/workers/socket/connection.rs +++ b/crates/ws/src/workers/socket/connection.rs @@ -229,11 +229,14 @@ impl ConnectionReader { match &message { tungstenite::Message::Text(_) | tungstenite::Message::Binary(_) => { match InMessage::from_ws_message(message) { - Ok(in_message) => { - self.handle_in_message(in_message).await?; + Ok(InMessage::AnnounceRequest(request)) => { + self.handle_announce_request(request).await?; + } + Ok(InMessage::ScrapeRequest(request)) => { + self.handle_scrape_request(request).await?; } Err(err) => { - ::log::debug!("Couldn't parse in_message: {:?}", err); + ::log::debug!("Couldn't parse in_message: {:#}", err); self.send_error_response("Invalid request".into(), None, None) .await?; @@ -261,171 +264,167 @@ impl ConnectionReader { } } - async fn handle_in_message(&mut self, in_message: InMessage) -> anyhow::Result<()> { - match in_message { - InMessage::AnnounceRequest(announce_request) => { - #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "announce", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + async fn handle_announce_request(&mut self, request: AnnounceRequest) -> anyhow::Result<()> { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); - let info_hash = announce_request.info_hash; - - if self - .access_list_cache - .load() - .allows(self.config.access_list.mode, &info_hash.0) - { - let mut announced_info_hashes = - self.clean_up_data.announced_info_hashes.borrow_mut(); - - // Store peer id / check if stored peer id matches - match announced_info_hashes.entry(announce_request.info_hash) { - Entry::Occupied(entry) => { - if *entry.get() != announce_request.peer_id { - // Drop Rc borrow before awaiting - drop(announced_info_hashes); - - self.send_error_response( - "Only one peer id can be used per torrent".into(), - Some(ErrorResponseAction::Announce), - Some(info_hash), - ) - .await?; + let info_hash = request.info_hash; - return Err(anyhow::anyhow!( - "Peer used more than one PeerId for a single torrent" - )); - } - } - Entry::Vacant(entry) => { - entry.insert(announce_request.peer_id); - - // Set peer client info if not set - #[cfg(feature = "metrics")] - if self.config.metrics.run_prometheus_endpoint - && self.config.metrics.peer_clients - && self.clean_up_data.opt_peer_client.borrow().is_none() - { - let peer_id = aquatic_peer_id::PeerId(announce_request.peer_id.0); - let client = peer_id.client(); - let prefix = peer_id.first_8_bytes_hex().to_string(); - - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 1.0, - "client" => client.to_string(), - ); - - if self.config.metrics.peer_id_prefixes { - ::metrics::increment_gauge!( - "aquatic_peer_id_prefixes", - 1.0, - "prefix_hex" => prefix.to_string(), - ); - } - - *self.clean_up_data.opt_peer_client.borrow_mut() = - Some((client, prefix)); - }; - } - } + if self + .access_list_cache + .load() + .allows(self.config.access_list.mode, &info_hash.0) + { + let mut announced_info_hashes = self.clean_up_data.announced_info_hashes.borrow_mut(); + + // Store peer id / check if stored peer id matches + match announced_info_hashes.entry(request.info_hash) { + Entry::Occupied(entry) => { + if *entry.get() != request.peer_id { + // Drop Rc borrow before awaiting + drop(announced_info_hashes); + + self.send_error_response( + "Only one peer id can be used per torrent".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; - if let Some(AnnounceEvent::Stopped) = announce_request.event { - announced_info_hashes.remove(&announce_request.info_hash); + return Err(anyhow::anyhow!( + "Peer used more than one PeerId for a single torrent" + )); } + } + Entry::Vacant(entry) => { + entry.insert(request.peer_id); + + // Set peer client info if not set + #[cfg(feature = "metrics")] + if self.config.metrics.run_prometheus_endpoint + && self.config.metrics.peer_clients + && self.clean_up_data.opt_peer_client.borrow().is_none() + { + let peer_id = aquatic_peer_id::PeerId(request.peer_id.0); + let client = peer_id.client(); + let prefix = peer_id.first_8_bytes_hex().to_string(); - // Drop Rc borrow before awaiting - drop(announced_info_hashes); - - let in_message = InMessage::AnnounceRequest(announce_request); + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 1.0, + "client" => client.to_string(), + ); - let consumer_index = - calculate_in_message_consumer_index(&self.config, info_hash); + if self.config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 1.0, + "prefix_hex" => prefix.to_string(), + ); + } - // Only fails when receiver is closed - self.in_message_senders - .send_to( - consumer_index, - (self.make_connection_meta(None), in_message), - ) - .await - .unwrap(); - } else { - self.send_error_response( - "Info hash not allowed".into(), - Some(ErrorResponseAction::Announce), - Some(info_hash), - ) - .await?; + *self.clean_up_data.opt_peer_client.borrow_mut() = Some((client, prefix)); + }; } } - InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { - #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "scrape", - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - let info_hashes = if let Some(info_hashes) = info_hashes { - info_hashes - } else { - // If request.info_hashes is empty, don't return scrape for all - // torrents, even though reference server does it. It is too expensive. - self.send_error_response( - "Full scrapes are not allowed".into(), - Some(ErrorResponseAction::Scrape), - None, - ) - .await?; - - return Ok(()); - }; + if let Some(AnnounceEvent::Stopped) = request.event { + announced_info_hashes.remove(&request.info_hash); + } - let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); + // Drop Rc borrow before awaiting + drop(announced_info_hashes); - for info_hash in info_hashes.as_vec() { - let info_hashes = info_hashes_by_worker - .entry(calculate_in_message_consumer_index(&self.config, info_hash)) - .or_default(); + let in_message = InMessage::AnnounceRequest(request); - info_hashes.push(info_hash); - } + let consumer_index = calculate_in_message_consumer_index(&self.config, info_hash); - let pending_worker_out_messages = info_hashes_by_worker.len(); + // Only fails when receiver is closed + self.in_message_senders + .send_to( + consumer_index, + (self.make_connection_meta(None), in_message), + ) + .await + .unwrap(); + } else { + self.send_error_response( + "Info hash not allowed".into(), + Some(ErrorResponseAction::Announce), + Some(info_hash), + ) + .await?; + } - let pending_scrape_response = PendingScrapeResponse { - pending_worker_out_messages, - stats: Default::default(), - }; + Ok(()) + } - let pending_scrape_id: u8 = self - .pending_scrape_slab - .borrow_mut() - .insert(pending_scrape_response) - .try_into() - .with_context(|| "Reached 256 pending scrape responses")?; + async fn handle_scrape_request(&mut self, request: ScrapeRequest) -> anyhow::Result<()> { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); - let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id))); + let info_hashes = if let Some(info_hashes) = request.info_hashes { + info_hashes + } else { + // If request.info_hashes is empty, don't return scrape for all + // torrents, even though reference server does it. It is too expensive. + self.send_error_response( + "Full scrapes are not allowed".into(), + Some(ErrorResponseAction::Scrape), + None, + ) + .await?; + + return Ok(()); + }; - for (consumer_index, info_hashes) in info_hashes_by_worker { - let in_message = InMessage::ScrapeRequest(ScrapeRequest { - action: ScrapeAction::Scrape, - info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)), - }); + let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); - // Only fails when receiver is closed - self.in_message_senders - .send_to(consumer_index, (meta, in_message)) - .await - .unwrap(); - } - } + for info_hash in info_hashes.as_vec() { + let info_hashes = info_hashes_by_worker + .entry(calculate_in_message_consumer_index(&self.config, info_hash)) + .or_default(); + + info_hashes.push(info_hash); + } + + let pending_worker_out_messages = info_hashes_by_worker.len(); + + let pending_scrape_response = PendingScrapeResponse { + pending_worker_out_messages, + stats: Default::default(), + }; + + let pending_scrape_id: u8 = self + .pending_scrape_slab + .borrow_mut() + .insert(pending_scrape_response) + .try_into() + .with_context(|| "Reached 256 pending scrape responses")?; + + let meta = self.make_connection_meta(Some(PendingScrapeId(pending_scrape_id))); + + for (consumer_index, info_hashes) in info_hashes_by_worker { + let in_message = InMessage::ScrapeRequest(ScrapeRequest { + action: ScrapeAction::Scrape, + info_hashes: Some(ScrapeRequestInfoHashes::Multiple(info_hashes)), + }); + + // Only fails when receiver is closed + self.in_message_senders + .send_to(consumer_index, (meta, in_message)) + .await + .unwrap(); } Ok(()) @@ -485,31 +484,28 @@ impl ConnectionWriter { .pending_scrape_id .expect("meta.pending_scrape_id not set"); - let finished = if let Some(pending) = Slab::get_mut( - &mut RefCell::borrow_mut(&self.pending_scrape_slab), - pending_scrape_id.0 as usize, - ) { - pending.stats.extend(out_message.files); - pending.pending_worker_out_messages -= 1; + let mut pending_responses = self.pending_scrape_slab.borrow_mut(); - pending.pending_worker_out_messages == 0 - } else { - return Err(anyhow::anyhow!("pending scrape not found in slab")); - }; + let pending_response = pending_responses + .get_mut(pending_scrape_id.0 as usize) + .ok_or(anyhow::anyhow!("pending scrape not found in slab"))?; + + pending_response.stats.extend(out_message.files); + pending_response.pending_worker_out_messages -= 1; - if finished { - let out_message = { - let mut slab = RefCell::borrow_mut(&self.pending_scrape_slab); + if pending_response.pending_worker_out_messages == 0 { + let pending_response = + pending_responses.remove(pending_scrape_id.0 as usize); - let pending = slab.remove(pending_scrape_id.0 as usize); + pending_responses.shrink_to_fit(); - slab.shrink_to_fit(); + let out_message = OutMessage::ScrapeResponse(ScrapeResponse { + action: ScrapeAction::Scrape, + files: pending_response.stats, + }); - OutMessage::ScrapeResponse(ScrapeResponse { - action: ScrapeAction::Scrape, - files: pending.stats, - }) - }; + // Drop Rc borrow before awaiting + drop(pending_responses); self.send_out_message(&out_message).await?; } @@ -522,72 +518,63 @@ impl ConnectionWriter { } async fn send_out_message(&mut self, out_message: &OutMessage) -> anyhow::Result<()> { - let result = timeout(Duration::from_secs(10), async { - let result = - futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await; - - Ok(result) + timeout(Duration::from_secs(10), async { + Ok(futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await) }) - .await; - - match result { - Ok(Ok(())) => { - if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message - { - *self.connection_valid_until.borrow_mut() = ValidUntil::new( - self.server_start_instant, - self.config.cleaning.max_connection_idle, - ); - } + .await + .map_err(|err| { + anyhow::anyhow!("send_out_message: sending to peer took too long: {:#}", err) + })? + .with_context(|| "send_out_message")?; + + if let OutMessage::AnnounceResponse(_) | OutMessage::ScrapeResponse(_) = out_message { + *self.connection_valid_until.borrow_mut() = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); + } - #[cfg(feature = "metrics")] - { - let out_message_type = match &out_message { - OutMessage::OfferOutMessage(_) => "offer", - OutMessage::AnswerOutMessage(_) => "offer_answer", - OutMessage::AnnounceResponse(_) => "announce", - OutMessage::ScrapeResponse(_) => "scrape", - OutMessage::ErrorResponse(_) => "error", - }; + #[cfg(feature = "metrics")] + { + let out_message_type = match &out_message { + OutMessage::OfferOutMessage(_) => "offer", + OutMessage::AnswerOutMessage(_) => "offer_answer", + OutMessage::AnnounceResponse(_) => "announce", + OutMessage::ScrapeResponse(_) => "scrape", + OutMessage::ErrorResponse(_) => "error", + }; - ::metrics::increment_counter!( - "aquatic_responses_total", - "type" => out_message_type, - "ip_version" => ip_version_to_metrics_str(self.ip_version), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => out_message_type, + "ip_version" => ip_version_to_metrics_str(self.ip_version), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); - if let Some((peer_client, prefix)) = - self.clean_up_data.opt_peer_client.borrow().as_ref() - { - // As long as connection is still alive, increment peer client - // gauges by zero to prevent them from being removed due to - // idleness + if let Some((peer_client, prefix)) = + self.clean_up_data.opt_peer_client.borrow().as_ref() + { + // As long as connection is still alive, increment peer client + // gauges by zero to prevent them from being removed due to + // idleness - ::metrics::increment_gauge!( - "aquatic_peer_clients", - 0.0, - "client" => peer_client.to_string(), - ); + ::metrics::increment_gauge!( + "aquatic_peer_clients", + 0.0, + "client" => peer_client.to_string(), + ); - if self.config.metrics.peer_id_prefixes { - ::metrics::increment_gauge!( - "aquatic_peer_id_prefixes", - 0.0, - "prefix_hex" => prefix.to_string(), - ); - } - } + if self.config.metrics.peer_id_prefixes { + ::metrics::increment_gauge!( + "aquatic_peer_id_prefixes", + 0.0, + "prefix_hex" => prefix.to_string(), + ); } - - Ok(()) } - Ok(Err(err)) => Err(err.into()), - Err(err) => Err(anyhow::anyhow!( - "send_out_message: sending to peer took too long: {}", - err - )), } + + Ok(()) } } From 923b3637e872e9957855ff5340a64d85d98f11a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 17 Nov 2023 18:16:29 +0100 Subject: [PATCH 2/4] http: allow disabling TLS, allow reverse proxies, general fixes --- CHANGELOG.md | 9 + Cargo.lock | 5 +- README.md | 10 +- TODO.md | 3 + crates/http/Cargo.toml | 4 +- crates/http/README.md | 7 +- crates/http/src/common.rs | 6 +- crates/http/src/config.rs | 39 +- crates/http/src/lib.rs | 38 +- crates/http/src/workers/socket.rs | 576 ------------------- crates/http/src/workers/socket/connection.rs | 471 +++++++++++++++ crates/http/src/workers/socket/mod.rs | 212 +++++++ crates/http/src/workers/socket/request.rs | 147 +++++ crates/http_load_test/Cargo.toml | 1 + crates/http_load_test/src/config.rs | 2 + crates/http_load_test/src/main.rs | 12 +- crates/http_load_test/src/network.rs | 77 ++- crates/http_protocol/src/request.rs | 33 +- 18 files changed, 987 insertions(+), 665 deletions(-) delete mode 100644 crates/http/src/workers/socket.rs create mode 100644 crates/http/src/workers/socket/connection.rs create mode 100644 crates/http/src/workers/socket/mod.rs create mode 100644 crates/http/src/workers/socket/request.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index ddefb9d2..747bd56a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,15 @@ * Reload TLS certificate (and key) on SIGUSR1 +#### Changed + +* Allow running without TLS +* Allow running behind reverse proxy + +#### Fixed + +* Fix bug where clean up after closing connections wasn't always done + ### aquatic_ws #### Added diff --git a/Cargo.lock b/Cargo.lock index 2b2a24c1..a20e9cf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,6 +125,7 @@ dependencies = [ "futures-lite", "futures-rustls", "glommio", + "httparse", "itoa", "libc", "log", @@ -140,8 +141,9 @@ dependencies = [ "rustls-pemfile", "serde", "signal-hook", - "slab", + "slotmap", "socket2 0.5.4", + "thiserror", ] [[package]] @@ -152,6 +154,7 @@ dependencies = [ "aquatic_common", "aquatic_http_protocol", "aquatic_toml_config", + "futures", "futures-lite", "futures-rustls", "glommio", diff --git a/README.md b/README.md index 8cdcbb6b..e0c2ea0a 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,11 @@ of sub-implementations for different protocols: [aquatic_http]: ./crates/http [aquatic_ws]: ./crates/ws -| Name | Protocol | OS requirements | -|----------------|---------------------------------|-----------------| -| [aquatic_udp] | BitTorrent over UDP | Unix-like | -| [aquatic_http] | BitTorrent over HTTP over TLS | Linux 5.8+ | -| [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8+ | +| Name | Protocol | OS requirements | +|----------------|-------------------------------------------|-----------------| +| [aquatic_udp] | BitTorrent over UDP | Unix-like | +| [aquatic_http] | BitTorrent over HTTP, optionally over TLS | Linux 5.8+ | +| [aquatic_ws] | WebTorrent, optionally over TLS | Linux 5.8+ | Features at a glance: diff --git a/TODO.md b/TODO.md index da43bc66..abff8c28 100644 --- a/TODO.md +++ b/TODO.md @@ -5,6 +5,9 @@ * aquatic_ws * Validate SDP data +* http + * panic sentinel not working + ## Medium priority * stagger cleaning tasks? diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 8f52edd9..2660d466 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -35,6 +35,7 @@ futures = "0.3" futures-lite = "1" futures-rustls = "0.24" glommio = "0.8" +httparse = "1" itoa = "1" libc = "0.2" log = "0.4" @@ -48,8 +49,9 @@ rand = { version = "0.8", features = ["small_rng"] } rustls-pemfile = "1" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } -slab = "0.4" +slotmap = "1" socket2 = { version = "0.5", features = ["all"] } +thiserror = "1" [dev-dependencies] quickcheck = "1" diff --git a/crates/http/README.md b/crates/http/README.md index eecad8ed..fbc0372b 100644 --- a/crates/http/README.md +++ b/crates/http/README.md @@ -49,12 +49,9 @@ Generate the configuration file: Make necessary adjustments to the file. You will likely want to adjust `address` (listening address) under the `network` section. -`aquatic_http` __only__ runs over TLS, so configuring certificate and private -key files is required. +To run over TLS, configure certificate and private key files. -Running behind a reverse proxy is currently not supported due to the -[difficulties of determining the originating IP address](https://adam-p.ca/blog/2022/03/x-forwarded-for/) -without knowing the exact setup. +Running behind a reverse proxy is supported. ### Running diff --git a/crates/http/src/common.rs b/crates/http/src/common.rs index 7f2b3568..9c326d6f 100644 --- a/crates/http/src/common.rs +++ b/crates/http/src/common.rs @@ -10,12 +10,14 @@ use aquatic_http_protocol::{ response::{AnnounceResponse, ScrapeResponse}, }; use glommio::channels::shared_channel::SharedSender; +use slotmap::new_key_type; #[derive(Copy, Clone, Debug)] pub struct ConsumerId(pub usize); -#[derive(Clone, Copy, Debug)] -pub struct ConnectionId(pub usize); +new_key_type! { + pub struct ConnectionId; +} #[derive(Debug)] pub enum ChannelRequest { diff --git a/crates/http/src/config.rs b/crates/http/src/config.rs index 7696bf0c..f7a46663 100644 --- a/crates/http/src/config.rs +++ b/crates/http/src/config.rs @@ -5,13 +5,18 @@ use aquatic_common::{ privileges::PrivilegeConfig, }; use aquatic_toml_config::TomlConfig; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use aquatic_common::cli::LogLevel; +#[derive(Clone, Copy, Debug, PartialEq, Serialize, TomlConfig, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum ReverseProxyPeerIpHeaderFormat { + #[default] + LastAddress, +} + /// aquatic_http configuration -/// -/// Does not support running behind a reverse proxy. #[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] #[serde(default, deny_unknown_fields)] pub struct Config { @@ -76,29 +81,55 @@ pub struct NetworkConfig { pub only_ipv6: bool, /// Maximum number of pending TCP connections pub tcp_backlog: i32, - /// Path to TLS certificate (DER-encoded X.509) + /// Enable TLS /// /// The TLS files are read on start and when the program receives `SIGUSR1`. /// If initial parsing fails, the program exits. Later failures result in /// in emitting of an error-level log message, while successful updates /// result in emitting of an info-level log message. Updates only affect /// new connections. + pub enable_tls: bool, + /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) pub tls_private_key_path: PathBuf, /// Keep connections alive after sending a response pub keep_alive: bool, + /// Does tracker run behind reverse proxy? + /// + /// MUST be set to false if not running behind reverse proxy. + /// + /// If set to true, make sure that reverse_proxy_ip_header_name and + /// reverse_proxy_ip_header_format are set to match your reverse proxy + /// setup. + /// + /// More info on what can go wrong when running behind reverse proxies: + /// https://adam-p.ca/blog/2022/03/x-forwarded-for/ + pub runs_behind_reverse_proxy: bool, + /// Name of header set by reverse proxy to indicate peer ip + pub reverse_proxy_ip_header_name: String, + /// How to extract peer IP from header field + /// + /// Options: + /// - last_address: use the last address in the last instance of the + /// header. Works with typical multi-IP setups (e.g., "X-Forwarded-For") + /// as well as for single-IP setups (e.g., nginx "X-Real-IP") + pub reverse_proxy_ip_header_format: ReverseProxyPeerIpHeaderFormat, } impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), + enable_tls: false, tls_certificate_path: "".into(), tls_private_key_path: "".into(), only_ipv6: false, tcp_backlog: 1024, keep_alive: true, + runs_behind_reverse_proxy: false, + reverse_proxy_ip_header_name: "X-Forwarded-For".into(), + reverse_proxy_ip_header_format: Default::default(), } } } diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index d4e18640..3bb04840 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -24,7 +24,7 @@ mod common; pub mod config; mod workers; -pub const APP_NAME: &str = "aquatic_http: BitTorrent tracker (HTTP over TLS)"; +pub const APP_NAME: &str = "aquatic_http: HTTP BitTorrent tracker"; pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION"); const SHARED_CHANNEL_SIZE: usize = 1024; @@ -58,10 +58,14 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); - let tls_config = Arc::new(ArcSwap::from_pointee(create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - )?)); + let opt_tls_config = if config.network.enable_tls { + Some(Arc::new(ArcSwap::from_pointee(create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + )?))) + } else { + None + }; let server_start_instant = ServerStartInstant::new(); @@ -71,7 +75,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { let sentinel = sentinel.clone(); let config = config.clone(); let state = state.clone(); - let tls_config = tls_config.clone(); + let opt_tls_config = opt_tls_config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let priv_dropper = priv_dropper.clone(); @@ -89,7 +93,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { sentinel, config, state, - tls_config, + opt_tls_config, request_mesh_builder, priv_dropper, server_start_instant, @@ -146,16 +150,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { SIGUSR1 => { let _ = update_access_list(&config.access_list, &state.access_list); - match create_rustls_config( - &config.network.tls_certificate_path, - &config.network.tls_private_key_path, - ) { - Ok(config) => { - tls_config.store(Arc::new(config)); - - ::log::info!("successfully updated tls config"); + if let Some(tls_config) = opt_tls_config.as_ref() { + match create_rustls_config( + &config.network.tls_certificate_path, + &config.network.tls_private_key_path, + ) { + Ok(config) => { + tls_config.store(Arc::new(config)); + + ::log::info!("successfully updated tls config"); + } + Err(err) => ::log::error!("could not update tls config: {:#}", err), } - Err(err) => ::log::error!("could not update tls config: {:#}", err), } } SIGTERM => { diff --git a/crates/http/src/workers/socket.rs b/crates/http/src/workers/socket.rs deleted file mode 100644 index a3751ece..00000000 --- a/crates/http/src/workers/socket.rs +++ /dev/null @@ -1,576 +0,0 @@ -use std::cell::RefCell; -use std::collections::BTreeMap; -use std::os::unix::prelude::{FromRawFd, IntoRawFd}; -use std::rc::Rc; -use std::sync::Arc; -use std::time::Duration; - -use anyhow::Context; -use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::privileges::PrivilegeDropper; -use aquatic_common::rustls_config::RustlsConfig; -use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; -use aquatic_http_protocol::common::InfoHash; -use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest}; -use aquatic_http_protocol::response::{ - FailureResponse, Response, ScrapeResponse, ScrapeStatistics, -}; -use arc_swap::ArcSwap; -use either::Either; -use futures::stream::FuturesUnordered; -use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; -use futures_rustls::server::TlsStream; -use futures_rustls::TlsAcceptor; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::channels::shared_channel::{self, SharedReceiver}; -use glommio::net::{TcpListener, TcpStream}; -use glommio::task::JoinHandle; -use glommio::timer::TimerActionRepeat; -use glommio::{enclose, prelude::*}; -use once_cell::sync::Lazy; -use slab::Slab; - -use crate::common::*; -use crate::config::Config; - -const REQUEST_BUFFER_SIZE: usize = 2048; -const RESPONSE_BUFFER_SIZE: usize = 4096; - -const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: "; -const RESPONSE_HEADER_B: &[u8] = b" "; -const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n"; - -#[cfg(feature = "metrics")] -thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } - -static RESPONSE_HEADER: Lazy> = - Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat()); - -struct PendingScrapeResponse { - pending_worker_responses: usize, - stats: BTreeMap, -} - -struct ConnectionReference { - task_handle: Option>, - valid_until: ValidUntil, -} - -pub async fn run_socket_worker( - _sentinel: PanicSentinel, - config: Config, - state: State, - tls_config: Arc>, - request_mesh_builder: MeshBuilder, - priv_dropper: PrivilegeDropper, - server_start_instant: ServerStartInstant, - worker_index: usize, -) { - #[cfg(feature = "metrics")] - WORKER_INDEX.with(|index| index.set(worker_index)); - - let config = Rc::new(config); - let access_list = state.access_list; - - let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); - - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let request_senders = Rc::new(request_senders); - - let connection_slab = Rc::new(RefCell::new(Slab::new())); - - TimerActionRepeat::repeat(enclose!((config, connection_slab) move || { - clean_connections( - config.clone(), - connection_slab.clone(), - server_start_instant, - ) - })); - - let mut incoming = listener.incoming(); - - while let Some(stream) = incoming.next().await { - match stream { - Ok(stream) => { - let key = connection_slab.borrow_mut().insert(ConnectionReference { - task_handle: None, - valid_until: ValidUntil::new( - server_start_instant, - config.cleaning.max_connection_idle, - ), - }); - - let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { - let result = match stream.peer_addr() { - Ok(peer_addr) => { - let peer_addr = CanonicalSocketAddr::new(peer_addr); - - #[cfg(feature = "metrics")] - let ip_version_str = peer_addr_to_ip_version_str(&peer_addr); - - #[cfg(feature = "metrics")] - ::metrics::increment_gauge!( - "aquatic_active_connections", - 1.0, - "ip_version" => ip_version_str, - "worker_index" => worker_index.to_string(), - ); - - let result = Connection::run( - config, - access_list, - request_senders, - server_start_instant, - ConnectionId(key), - tls_config, - connection_slab.clone(), - stream, - peer_addr - ).await; - - #[cfg(feature = "metrics")] - ::metrics::decrement_gauge!( - "aquatic_active_connections", - 1.0, - "ip_version" => ip_version_str, - "worker_index" => worker_index.to_string(), - ); - - result - } - Err(err) => { - Err(anyhow::anyhow!("Couldn't get peer addr: {:?}", err)) - } - }; - - if let Err(err) = result { - ::log::debug!("Connection::run() error: {:?}", err); - } - - connection_slab.borrow_mut().try_remove(key); - })) - .detach(); - - if let Some(reference) = connection_slab.borrow_mut().get_mut(key) { - reference.task_handle = Some(task_handle); - } - } - Err(err) => { - ::log::error!("accept connection: {:?}", err); - } - } - } -} - -async fn clean_connections( - config: Rc, - connection_slab: Rc>>, - server_start_instant: ServerStartInstant, -) -> Option { - let now = server_start_instant.seconds_elapsed(); - - connection_slab.borrow_mut().retain(|_, reference| { - if reference.valid_until.valid(now) { - true - } else { - if let Some(ref handle) = reference.task_handle { - handle.cancel(); - } - - false - } - }); - - connection_slab.borrow_mut().shrink_to_fit(); - - Some(Duration::from_secs( - config.cleaning.connection_cleaning_interval, - )) -} - -struct Connection { - config: Rc, - access_list_cache: AccessListCache, - request_senders: Rc>, - connection_slab: Rc>>, - server_start_instant: ServerStartInstant, - stream: TlsStream, - peer_addr: CanonicalSocketAddr, - connection_id: ConnectionId, - request_buffer: [u8; REQUEST_BUFFER_SIZE], - request_buffer_position: usize, - response_buffer: [u8; RESPONSE_BUFFER_SIZE], -} - -impl Connection { - async fn run( - config: Rc, - access_list: Arc, - request_senders: Rc>, - server_start_instant: ServerStartInstant, - connection_id: ConnectionId, - tls_config: Arc>, - connection_slab: Rc>>, - stream: TcpStream, - peer_addr: CanonicalSocketAddr, - ) -> anyhow::Result<()> { - let tls_acceptor: TlsAcceptor = tls_config.load_full().into(); - let stream = tls_acceptor.accept(stream).await?; - - let mut response_buffer = [0; RESPONSE_BUFFER_SIZE]; - - response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER); - - let mut conn = Connection { - config: config.clone(), - access_list_cache: create_access_list_cache(&access_list), - request_senders: request_senders.clone(), - connection_slab, - server_start_instant, - stream, - peer_addr, - connection_id, - request_buffer: [0; REQUEST_BUFFER_SIZE], - request_buffer_position: 0, - response_buffer, - }; - - conn.run_request_response_loop().await?; - - Ok(()) - } - - async fn run_request_response_loop(&mut self) -> anyhow::Result<()> { - loop { - let response = match self.read_request().await? { - Either::Left(response) => Response::Failure(response), - Either::Right(request) => self.handle_request(request).await?, - }; - - self.write_response(&response).await?; - - if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive { - let _ = self - .stream - .get_ref() - .0 - .shutdown(std::net::Shutdown::Both) - .await; - - break; - } - } - - Ok(()) - } - - async fn read_request(&mut self) -> anyhow::Result> { - self.request_buffer_position = 0; - - loop { - if self.request_buffer_position == self.request_buffer.len() { - return Err(anyhow::anyhow!("request buffer is full")); - } - - let bytes_read = self - .stream - .read(&mut self.request_buffer[self.request_buffer_position..]) - .await?; - - if bytes_read == 0 { - return Err(anyhow::anyhow!("peer closed connection")); - } - - self.request_buffer_position += bytes_read; - - match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) { - Ok(request) => { - return Ok(Either::Right(request)); - } - Err(RequestParseError::Invalid(err)) => { - let response = FailureResponse { - failure_reason: "Invalid request".into(), - }; - - ::log::debug!("Invalid request: {:#}", err); - - return Ok(Either::Left(response)); - } - Err(RequestParseError::NeedMoreData) => { - ::log::debug!( - "need more request data. current data: {}", - &self.request_buffer[..self.request_buffer_position].escape_ascii() - ); - } - } - } - } - - /// Take a request and: - /// - Update connection ValidUntil - /// - Return error response if request is not allowed - /// - If it is an announce request, send it to swarm workers an await a - /// response - /// - If it is a scrape requests, split it up, pass on the parts to - /// relevant swarm workers and await a response - async fn handle_request(&mut self, request: Request) -> anyhow::Result { - if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { - if let Some(reference) = slab.get_mut(self.connection_id.0) { - reference.valid_until = ValidUntil::new( - self.server_start_instant, - self.config.cleaning.max_connection_idle, - ); - } - } - - match request { - Request::Announce(request) => { - #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "announce", - "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - - let info_hash = request.info_hash; - - if self - .access_list_cache - .load() - .allows(self.config.access_list.mode, &info_hash.0) - { - let (response_sender, response_receiver) = shared_channel::new_bounded(1); - - let request = ChannelRequest::Announce { - request, - peer_addr: self.peer_addr, - response_sender, - }; - - let consumer_index = calculate_request_consumer_index(&self.config, info_hash); - - // Only fails when receiver is closed - self.request_senders - .send_to(consumer_index, request) - .await - .unwrap(); - - response_receiver - .connect() - .await - .recv() - .await - .ok_or_else(|| anyhow::anyhow!("response sender closed")) - .map(Response::Announce) - } else { - let response = Response::Failure(FailureResponse { - failure_reason: "Info hash not allowed".into(), - }); - - Ok(response) - } - } - Request::Scrape(ScrapeRequest { info_hashes }) => { - #[cfg(feature = "metrics")] - ::metrics::increment_counter!( - "aquatic_requests_total", - "type" => "scrape", - "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - - let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); - - for info_hash in info_hashes.into_iter() { - let info_hashes = info_hashes_by_worker - .entry(calculate_request_consumer_index(&self.config, info_hash)) - .or_default(); - - info_hashes.push(info_hash); - } - - let pending_worker_responses = info_hashes_by_worker.len(); - let mut response_receivers = Vec::with_capacity(pending_worker_responses); - - for (consumer_index, info_hashes) in info_hashes_by_worker { - let (response_sender, response_receiver) = shared_channel::new_bounded(1); - - response_receivers.push(response_receiver); - - let request = ChannelRequest::Scrape { - request: ScrapeRequest { info_hashes }, - peer_addr: self.peer_addr, - response_sender, - }; - - // Only fails when receiver is closed - self.request_senders - .send_to(consumer_index, request) - .await - .unwrap(); - } - - let pending_scrape_response = PendingScrapeResponse { - pending_worker_responses, - stats: Default::default(), - }; - - self.wait_for_scrape_responses(response_receivers, pending_scrape_response) - .await - } - } - } - - /// Wait for partial scrape responses to arrive, - /// return full response - async fn wait_for_scrape_responses( - &self, - response_receivers: Vec>, - mut pending: PendingScrapeResponse, - ) -> anyhow::Result { - let mut responses = response_receivers - .into_iter() - .map(|receiver| async { receiver.connect().await.recv().await }) - .collect::>(); - - loop { - let response = responses - .next() - .await - .ok_or_else(|| { - anyhow::anyhow!("stream ended before all partial scrape responses received") - })? - .ok_or_else(|| { - anyhow::anyhow!( - "wait_for_scrape_response: can't receive response, sender is closed" - ) - })?; - - pending.stats.extend(response.files); - pending.pending_worker_responses -= 1; - - if pending.pending_worker_responses == 0 { - let response = Response::Scrape(ScrapeResponse { - files: pending.stats, - }); - - break Ok(response); - } - } - } - - async fn write_response(&mut self, response: &Response) -> anyhow::Result<()> { - // Write body and final newline to response buffer - - let mut position = RESPONSE_HEADER.len(); - - let body_len = response.write(&mut &mut self.response_buffer[position..])?; - - position += body_len; - - if position + 2 > self.response_buffer.len() { - ::log::error!("Response buffer is too short for response"); - - return Err(anyhow::anyhow!("Response buffer is too short for response")); - } - - (&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n"); - - position += 2; - - let content_len = body_len + 2; - - // Clear content-len header value - - { - let start = RESPONSE_HEADER_A.len(); - let end = start + RESPONSE_HEADER_B.len(); - - (&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B); - } - - // Set content-len header value - - { - let mut buf = ::itoa::Buffer::new(); - let content_len_bytes = buf.format(content_len).as_bytes(); - - let start = RESPONSE_HEADER_A.len(); - let end = start + content_len_bytes.len(); - - (&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes); - } - - // Write buffer to stream - - self.stream.write(&self.response_buffer[..position]).await?; - self.stream.flush().await?; - - #[cfg(feature = "metrics")] - { - let response_type = match response { - Response::Announce(_) => "announce", - Response::Scrape(_) => "scrape", - Response::Failure(_) => "error", - }; - - ::metrics::increment_counter!( - "aquatic_responses_total", - "type" => response_type, - "ip_version" => peer_addr_to_ip_version_str(&self.peer_addr), - "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), - ); - } - - Ok(()) - } -} - -fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { - (info_hash.0[0] as usize) % config.swarm_workers -} - -fn create_tcp_listener( - config: &Config, - priv_dropper: PrivilegeDropper, -) -> anyhow::Result { - let domain = if config.network.address.is_ipv4() { - socket2::Domain::IPV4 - } else { - socket2::Domain::IPV6 - }; - - let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; - - if config.network.only_ipv6 { - socket - .set_only_v6(true) - .with_context(|| "socket: set only ipv6")?; - } - - socket - .set_reuse_port(true) - .with_context(|| "socket: set reuse port")?; - - socket - .bind(&config.network.address.into()) - .with_context(|| format!("socket: bind to {}", config.network.address))?; - - socket - .listen(config.network.tcp_backlog) - .with_context(|| format!("socket: listen on {}", config.network.address))?; - - priv_dropper.after_socket_creation()?; - - Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) -} - -#[cfg(feature = "metrics")] -fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str { - if addr.is_ipv4() { - "4" - } else { - "6" - } -} diff --git a/crates/http/src/workers/socket/connection.rs b/crates/http/src/workers/socket/connection.rs new file mode 100644 index 00000000..7db51464 --- /dev/null +++ b/crates/http/src/workers/socket/connection.rs @@ -0,0 +1,471 @@ +use std::cell::RefCell; +use std::collections::BTreeMap; +use std::net::SocketAddr; +use std::rc::Rc; +use std::sync::Arc; + +use anyhow::Context; +use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; +use aquatic_common::rustls_config::RustlsConfig; +use aquatic_common::{CanonicalSocketAddr, ServerStartInstant}; +use aquatic_http_protocol::common::InfoHash; +use aquatic_http_protocol::request::{Request, ScrapeRequest}; +use aquatic_http_protocol::response::{ + FailureResponse, Response, ScrapeResponse, ScrapeStatistics, +}; +use arc_swap::ArcSwap; +use either::Either; +use futures::stream::FuturesUnordered; +use futures_lite::future::race; +use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use futures_rustls::TlsAcceptor; +use glommio::channels::channel_mesh::Senders; +use glommio::channels::local_channel::LocalReceiver; +use glommio::channels::shared_channel::{self, SharedReceiver}; +use glommio::net::TcpStream; +use once_cell::sync::Lazy; + +use crate::common::*; +use crate::config::Config; + +use super::request::{parse_request, RequestParseError}; +#[cfg(feature = "metrics")] +use super::{peer_addr_to_ip_version_str, WORKER_INDEX}; + +const REQUEST_BUFFER_SIZE: usize = 2048; +const RESPONSE_BUFFER_SIZE: usize = 4096; + +const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: "; +const RESPONSE_HEADER_B: &[u8] = b" "; +const RESPONSE_HEADER_C: &[u8] = b"\r\n\r\n"; + +static RESPONSE_HEADER: Lazy> = + Lazy::new(|| [RESPONSE_HEADER_A, RESPONSE_HEADER_B, RESPONSE_HEADER_C].concat()); + +struct PendingScrapeResponse { + pending_worker_responses: usize, + stats: BTreeMap, +} + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionError { + #[error("inactive")] + Inactive, + #[error("socket peer addr extraction failed")] + NoSocketPeerAddr(String), + #[error("request buffer full")] + RequestBufferFull, + #[error("response buffer full")] + ResponseBufferFull, + #[error("response buffer write error: {0}")] + ResponseBufferWrite(::std::io::Error), + #[error("peer closed")] + PeerClosed, + #[error("response sender closed")] + ResponseSenderClosed, + #[error("scrape channel error: {0}")] + ScrapeChannelError(&'static str), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +pub(super) async fn run_connection( + config: Rc, + access_list: Arc, + request_senders: Rc>, + server_start_instant: ServerStartInstant, + opt_tls_config: Option>>, + valid_until: Rc>, + close_conn_receiver: LocalReceiver<()>, + stream: TcpStream, +) -> Result<(), ConnectionError> { + let access_list_cache = create_access_list_cache(&access_list); + let request_buffer = Box::new([0u8; REQUEST_BUFFER_SIZE]); + + let mut response_buffer = Box::new([0; RESPONSE_BUFFER_SIZE]); + + response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER); + + let remote_addr = stream + .peer_addr() + .map_err(|err| ConnectionError::NoSocketPeerAddr(err.to_string()))?; + + let opt_peer_addr = if config.network.runs_behind_reverse_proxy { + None + } else { + Some(CanonicalSocketAddr::new(remote_addr)) + }; + + let peer_port = remote_addr.port(); + + if let Some(tls_config) = opt_tls_config { + let tls_acceptor: TlsAcceptor = tls_config.load_full().into(); + let stream = tls_acceptor + .accept(stream) + .await + .with_context(|| "tls accept")?; + + let mut conn = Connection { + config, + access_list_cache, + request_senders, + valid_until, + server_start_instant, + opt_peer_addr, + peer_port, + request_buffer, + request_buffer_position: 0, + response_buffer, + stream, + }; + + conn.run(close_conn_receiver).await?; + } else { + let mut conn = Connection { + config, + access_list_cache, + request_senders, + valid_until, + server_start_instant, + opt_peer_addr, + peer_port, + request_buffer, + request_buffer_position: 0, + response_buffer, + stream, + }; + + conn.run(close_conn_receiver).await?; + } + + Ok(()) +} + +struct Connection { + config: Rc, + access_list_cache: AccessListCache, + request_senders: Rc>, + valid_until: Rc>, + server_start_instant: ServerStartInstant, + opt_peer_addr: Option, + peer_port: u16, + request_buffer: Box<[u8; REQUEST_BUFFER_SIZE]>, + request_buffer_position: usize, + response_buffer: Box<[u8; RESPONSE_BUFFER_SIZE]>, + stream: S, +} + +impl Connection +where + S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, +{ + async fn run(&mut self, close_conn_receiver: LocalReceiver<()>) -> Result<(), ConnectionError> { + let f1 = async { self.run_request_response_loop().await }; + let f2 = async { + close_conn_receiver.recv().await; + + Err(ConnectionError::Inactive) + }; + + race(f1, f2).await + } + + async fn run_request_response_loop(&mut self) -> Result<(), ConnectionError> { + loop { + let response = match self.read_request().await? { + Either::Left(response) => Response::Failure(response), + Either::Right(request) => self.handle_request(request).await?, + }; + + self.write_response(&response).await?; + + if matches!(response, Response::Failure(_)) || !self.config.network.keep_alive { + break; + } + } + + Ok(()) + } + + async fn read_request(&mut self) -> Result, ConnectionError> { + self.request_buffer_position = 0; + + loop { + if self.request_buffer_position == self.request_buffer.len() { + return Err(ConnectionError::RequestBufferFull); + } + + let bytes_read = self + .stream + .read(&mut self.request_buffer[self.request_buffer_position..]) + .await + .with_context(|| "read")?; + + if bytes_read == 0 { + return Err(ConnectionError::PeerClosed); + } + + self.request_buffer_position += bytes_read; + + let buffer_slice = &self.request_buffer[..self.request_buffer_position]; + + match parse_request(&self.config, buffer_slice) { + Ok((request, opt_peer_ip)) => { + if self.config.network.runs_behind_reverse_proxy { + let peer_ip = opt_peer_ip + .expect("logic error: peer ip must have been extracted at this point"); + + self.opt_peer_addr = Some(CanonicalSocketAddr::new(SocketAddr::new( + peer_ip, + self.peer_port, + ))); + } + + return Ok(Either::Right(request)); + } + Err(RequestParseError::MoreDataNeeded) => continue, + Err(RequestParseError::RequiredPeerIpHeaderMissing(err)) => { + panic!("Tracker configured as running behind reverse proxy, but no corresponding IP header set in request. Please check your reverse proxy setup as well as your aquatic configuration. Error: {:#}", err); + } + Err(RequestParseError::Other(err)) => { + ::log::debug!("Failed parsing request: {:#}", err); + + let response = FailureResponse { + failure_reason: "Invalid request".into(), + }; + + return Ok(Either::Left(response)); + } + } + } + } + + /// Take a request and: + /// - Update connection ValidUntil + /// - Return error response if request is not allowed + /// - If it is an announce request, send it to swarm workers an await a + /// response + /// - If it is a scrape requests, split it up, pass on the parts to + /// relevant swarm workers and await a response + async fn handle_request(&mut self, request: Request) -> Result { + let peer_addr = self + .opt_peer_addr + .expect("peer addr should already have been extracted by now"); + + *self.valid_until.borrow_mut() = ValidUntil::new( + self.server_start_instant, + self.config.cleaning.max_connection_idle, + ); + + match request { + Request::Announce(request) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "announce", + "ip_version" => peer_addr_to_ip_version_str(&peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + + let info_hash = request.info_hash; + + if self + .access_list_cache + .load() + .allows(self.config.access_list.mode, &info_hash.0) + { + let (response_sender, response_receiver) = shared_channel::new_bounded(1); + + let request = ChannelRequest::Announce { + request, + peer_addr, + response_sender, + }; + + let consumer_index = calculate_request_consumer_index(&self.config, info_hash); + + // Only fails when receiver is closed + self.request_senders + .send_to(consumer_index, request) + .await + .unwrap(); + + response_receiver + .connect() + .await + .recv() + .await + .ok_or(ConnectionError::ResponseSenderClosed) + .map(Response::Announce) + } else { + let response = Response::Failure(FailureResponse { + failure_reason: "Info hash not allowed".into(), + }); + + Ok(response) + } + } + Request::Scrape(ScrapeRequest { info_hashes }) => { + #[cfg(feature = "metrics")] + ::metrics::increment_counter!( + "aquatic_requests_total", + "type" => "scrape", + "ip_version" => peer_addr_to_ip_version_str(&peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + + let mut info_hashes_by_worker: BTreeMap> = BTreeMap::new(); + + for info_hash in info_hashes.into_iter() { + let info_hashes = info_hashes_by_worker + .entry(calculate_request_consumer_index(&self.config, info_hash)) + .or_default(); + + info_hashes.push(info_hash); + } + + let pending_worker_responses = info_hashes_by_worker.len(); + let mut response_receivers = Vec::with_capacity(pending_worker_responses); + + for (consumer_index, info_hashes) in info_hashes_by_worker { + let (response_sender, response_receiver) = shared_channel::new_bounded(1); + + response_receivers.push(response_receiver); + + let request = ChannelRequest::Scrape { + request: ScrapeRequest { info_hashes }, + peer_addr, + response_sender, + }; + + // Only fails when receiver is closed + self.request_senders + .send_to(consumer_index, request) + .await + .unwrap(); + } + + let pending_scrape_response = PendingScrapeResponse { + pending_worker_responses, + stats: Default::default(), + }; + + self.wait_for_scrape_responses(response_receivers, pending_scrape_response) + .await + } + } + } + + /// Wait for partial scrape responses to arrive, + /// return full response + async fn wait_for_scrape_responses( + &self, + response_receivers: Vec>, + mut pending: PendingScrapeResponse, + ) -> Result { + let mut responses = response_receivers + .into_iter() + .map(|receiver| async { receiver.connect().await.recv().await }) + .collect::>(); + + loop { + let response = responses + .next() + .await + .ok_or_else(|| { + ConnectionError::ScrapeChannelError( + "stream ended before all partial scrape responses received", + ) + })? + .ok_or_else(|| ConnectionError::ScrapeChannelError("sender is closed"))?; + + pending.stats.extend(response.files); + pending.pending_worker_responses -= 1; + + if pending.pending_worker_responses == 0 { + let response = Response::Scrape(ScrapeResponse { + files: pending.stats, + }); + + break Ok(response); + } + } + } + + async fn write_response(&mut self, response: &Response) -> Result<(), ConnectionError> { + // Write body and final newline to response buffer + + let mut position = RESPONSE_HEADER.len(); + + let body_len = response + .write(&mut &mut self.response_buffer[position..]) + .map_err(|err| ConnectionError::ResponseBufferWrite(err))?; + + position += body_len; + + if position + 2 > self.response_buffer.len() { + return Err(ConnectionError::ResponseBufferFull); + } + + (&mut self.response_buffer[position..position + 2]).copy_from_slice(b"\r\n"); + + position += 2; + + let content_len = body_len + 2; + + // Clear content-len header value + + { + let start = RESPONSE_HEADER_A.len(); + let end = start + RESPONSE_HEADER_B.len(); + + (&mut self.response_buffer[start..end]).copy_from_slice(RESPONSE_HEADER_B); + } + + // Set content-len header value + + { + let mut buf = ::itoa::Buffer::new(); + let content_len_bytes = buf.format(content_len).as_bytes(); + + let start = RESPONSE_HEADER_A.len(); + let end = start + content_len_bytes.len(); + + (&mut self.response_buffer[start..end]).copy_from_slice(content_len_bytes); + } + + // Write buffer to stream + + self.stream + .write(&self.response_buffer[..position]) + .await + .with_context(|| "write")?; + self.stream.flush().await.with_context(|| "flush")?; + + #[cfg(feature = "metrics")] + { + let response_type = match response { + Response::Announce(_) => "announce", + Response::Scrape(_) => "scrape", + Response::Failure(_) => "error", + }; + + let peer_addr = self + .opt_peer_addr + .expect("peer addr should already have been extracted by now"); + + ::metrics::increment_counter!( + "aquatic_responses_total", + "type" => response_type, + "ip_version" => peer_addr_to_ip_version_str(&peer_addr), + "worker_index" => WORKER_INDEX.with(|index| index.get()).to_string(), + ); + } + + Ok(()) + } +} + +fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { + (info_hash.0[0] as usize) % config.swarm_workers +} diff --git a/crates/http/src/workers/socket/mod.rs b/crates/http/src/workers/socket/mod.rs new file mode 100644 index 00000000..0baa68b7 --- /dev/null +++ b/crates/http/src/workers/socket/mod.rs @@ -0,0 +1,212 @@ +mod connection; +mod request; + +use std::cell::RefCell; +use std::os::unix::prelude::{FromRawFd, IntoRawFd}; +use std::rc::Rc; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use aquatic_common::privileges::PrivilegeDropper; +use aquatic_common::rustls_config::RustlsConfig; +use aquatic_common::{CanonicalSocketAddr, PanicSentinel, ServerStartInstant}; +use arc_swap::ArcSwap; +use futures_lite::StreamExt; +use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role}; +use glommio::channels::local_channel::{new_bounded, LocalSender}; +use glommio::net::TcpListener; +use glommio::timer::TimerActionRepeat; +use glommio::{enclose, prelude::*}; +use slotmap::HopSlotMap; + +use crate::common::*; +use crate::config::Config; +use crate::workers::socket::connection::{run_connection, ConnectionError}; + +#[cfg(feature = "metrics")] +thread_local! { static WORKER_INDEX: ::std::cell::Cell = Default::default() } + +struct ConnectionHandle { + close_conn_sender: LocalSender<()>, + valid_until: Rc>, +} + +pub async fn run_socket_worker( + _sentinel: PanicSentinel, + config: Config, + state: State, + opt_tls_config: Option>>, + request_mesh_builder: MeshBuilder, + priv_dropper: PrivilegeDropper, + server_start_instant: ServerStartInstant, + worker_index: usize, +) { + #[cfg(feature = "metrics")] + WORKER_INDEX.with(|index| index.set(worker_index)); + + let config = Rc::new(config); + let access_list = state.access_list; + + let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); + + let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); + let request_senders = Rc::new(request_senders); + + let connection_handles = Rc::new(RefCell::new(HopSlotMap::with_key())); + + TimerActionRepeat::repeat(enclose!((config, connection_handles) move || { + clean_connections( + config.clone(), + connection_handles.clone(), + server_start_instant, + ) + })); + + let mut incoming = listener.incoming(); + + while let Some(stream) = incoming.next().await { + match stream { + Ok(stream) => { + let (close_conn_sender, close_conn_receiver) = new_bounded(1); + + let valid_until = Rc::new(RefCell::new(ValidUntil::new( + server_start_instant, + config.cleaning.max_connection_idle, + ))); + + let connection_id = connection_handles.borrow_mut().insert(ConnectionHandle { + close_conn_sender, + valid_until: valid_until.clone(), + }); + + spawn_local(enclose!( + ( + config, + access_list, + request_senders, + opt_tls_config, + connection_handles, + valid_until, + ) + async move { + #[cfg(feature = "metrics")] + ::metrics::increment_gauge!( + "aquatic_active_connections", + 1.0, + "worker_index" => worker_index.to_string(), + ); + + let result = run_connection( + config, + access_list, + request_senders, + server_start_instant, + opt_tls_config, + valid_until.clone(), + close_conn_receiver, + stream, + ).await; + + #[cfg(feature = "metrics")] + ::metrics::decrement_gauge!( + "aquatic_active_connections", + 1.0, + "worker_index" => worker_index.to_string(), + ); + + match result { + Ok(()) => (), + Err(err@( + ConnectionError::ResponseBufferWrite(_) | + ConnectionError::ResponseBufferFull | + ConnectionError::ScrapeChannelError(_) | + ConnectionError::ResponseSenderClosed + )) => { + ::log::error!("connection closed: {:#}", err); + } + Err(err@ConnectionError::RequestBufferFull) => { + ::log::info!("connection closed: {:#}", err); + } + Err(err) => { + ::log::debug!("connection closed: {:#}", err); + } + } + + connection_handles.borrow_mut().remove(connection_id); + } + )) + .detach(); + } + Err(err) => { + ::log::error!("accept connection: {:?}", err); + } + } + } +} + +async fn clean_connections( + config: Rc, + connection_slab: Rc>>, + server_start_instant: ServerStartInstant, +) -> Option { + let now = server_start_instant.seconds_elapsed(); + + connection_slab.borrow_mut().retain(|_, handle| { + if handle.valid_until.borrow().valid(now) { + true + } else { + let _ = handle.close_conn_sender.try_send(()); + + false + } + }); + + Some(Duration::from_secs( + config.cleaning.connection_cleaning_interval, + )) +} + +fn create_tcp_listener( + config: &Config, + priv_dropper: PrivilegeDropper, +) -> anyhow::Result { + let domain = if config.network.address.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))?; + + if config.network.only_ipv6 { + socket + .set_only_v6(true) + .with_context(|| "socket: set only ipv6")?; + } + + socket + .set_reuse_port(true) + .with_context(|| "socket: set reuse port")?; + + socket + .bind(&config.network.address.into()) + .with_context(|| format!("socket: bind to {}", config.network.address))?; + + socket + .listen(config.network.tcp_backlog) + .with_context(|| format!("socket: listen on {}", config.network.address))?; + + priv_dropper.after_socket_creation()?; + + Ok(unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }) +} + +#[cfg(feature = "metrics")] +fn peer_addr_to_ip_version_str(addr: &CanonicalSocketAddr) -> &'static str { + if addr.is_ipv4() { + "4" + } else { + "6" + } +} diff --git a/crates/http/src/workers/socket/request.rs b/crates/http/src/workers/socket/request.rs new file mode 100644 index 00000000..44123829 --- /dev/null +++ b/crates/http/src/workers/socket/request.rs @@ -0,0 +1,147 @@ +use std::net::IpAddr; + +use anyhow::Context; +use aquatic_http_protocol::request::Request; + +use crate::config::{Config, ReverseProxyPeerIpHeaderFormat}; + +#[derive(Debug, thiserror::Error)] +pub enum RequestParseError { + #[error("required peer ip header missing or invalid")] + RequiredPeerIpHeaderMissing(anyhow::Error), + #[error("more data needed")] + MoreDataNeeded, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +pub fn parse_request( + config: &Config, + buffer: &[u8], +) -> Result<(Request, Option), RequestParseError> { + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut http_request = httparse::Request::new(&mut headers); + + match http_request.parse(buffer).with_context(|| "httparse")? { + httparse::Status::Complete(_) => { + let path = http_request.path.ok_or(anyhow::anyhow!("no http path"))?; + let request = Request::from_http_get_path(path)?; + + let opt_peer_ip = if config.network.runs_behind_reverse_proxy { + let header_name = &config.network.reverse_proxy_ip_header_name; + let header_format = config.network.reverse_proxy_ip_header_format; + + match parse_forwarded_header(header_name, header_format, http_request.headers) { + Ok(peer_ip) => Some(peer_ip), + Err(err) => { + return Err(RequestParseError::RequiredPeerIpHeaderMissing(err)); + } + } + } else { + None + }; + + Ok((request, opt_peer_ip)) + } + httparse::Status::Partial => Err(RequestParseError::MoreDataNeeded), + } +} + +fn parse_forwarded_header( + header_name: &str, + header_format: ReverseProxyPeerIpHeaderFormat, + headers: &[httparse::Header<'_>], +) -> anyhow::Result { + for header in headers.into_iter().rev() { + if header.name == header_name { + match header_format { + ReverseProxyPeerIpHeaderFormat::LastAddress => { + return ::std::str::from_utf8(header.value)? + .split(',') + .last() + .ok_or(anyhow::anyhow!("no header value"))? + .trim() + .parse::() + .with_context(|| "parse ip"); + } + } + } + } + + Err(anyhow::anyhow!("header not present")) +} + +#[cfg(test)] +mod tests { + use super::*; + + const REQUEST_START: &str = "GET /announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=1&downloaded=2&left=3&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started HTTP/1.1\r\nHost: example.com\r\n"; + + #[test] + fn test_parse_peer_ip_header_multiple() { + let mut config = Config::default(); + + config.network.runs_behind_reverse_proxy = true; + config.network.reverse_proxy_ip_header_name = "X-Forwarded-For".into(); + config.network.reverse_proxy_ip_header_format = ReverseProxyPeerIpHeaderFormat::LastAddress; + + let mut request = REQUEST_START.to_string(); + + request.push_str("X-Forwarded-For: 200.0.0.1\r\n"); + request.push_str("X-Forwarded-For: 1.2.3.4, 5.6.7.8,9.10.11.12\r\n"); + request.push_str("\r\n"); + + let expected_ip = IpAddr::from([9, 10, 11, 12]); + + assert_eq!( + parse_request(&config, request.as_bytes()) + .unwrap() + .1 + .unwrap(), + expected_ip + ) + } + + #[test] + fn test_parse_peer_ip_header_single() { + let mut config = Config::default(); + + config.network.runs_behind_reverse_proxy = true; + config.network.reverse_proxy_ip_header_name = "X-Forwarded-For".into(); + config.network.reverse_proxy_ip_header_format = ReverseProxyPeerIpHeaderFormat::LastAddress; + + let mut request = REQUEST_START.to_string(); + + request.push_str("X-Forwarded-For: 1.2.3.4, 5.6.7.8,9.10.11.12\r\n"); + request.push_str("X-Forwarded-For: 200.0.0.1\r\n"); + request.push_str("\r\n"); + + let expected_ip = IpAddr::from([200, 0, 0, 1]); + + assert_eq!( + parse_request(&config, request.as_bytes()) + .unwrap() + .1 + .unwrap(), + expected_ip + ) + } + + #[test] + fn test_parse_peer_ip_header_no_header() { + let mut config = Config::default(); + + config.network.runs_behind_reverse_proxy = true; + + let mut request = REQUEST_START.to_string(); + + request.push_str("\r\n"); + + let res = parse_request(&config, request.as_bytes()); + + assert!(matches!( + res, + Err(RequestParseError::RequiredPeerIpHeaderMissing(_)) + )); + } +} diff --git a/crates/http_load_test/Cargo.toml b/crates/http_load_test/Cargo.toml index 291d0941..b4f44110 100644 --- a/crates/http_load_test/Cargo.toml +++ b/crates/http_load_test/Cargo.toml @@ -19,6 +19,7 @@ aquatic_http_protocol.workspace = true aquatic_toml_config.workspace = true anyhow = "1" +futures = "0.3" futures-lite = "1" futures-rustls = "0.24" hashbrown = "0.14" diff --git a/crates/http_load_test/src/config.rs b/crates/http_load_test/src/config.rs index 060b401b..b41d58a8 100644 --- a/crates/http_load_test/src/config.rs +++ b/crates/http_load_test/src/config.rs @@ -23,6 +23,7 @@ pub struct Config { pub url_suffix: String, pub duration: usize, pub keep_alive: bool, + pub enable_tls: bool, pub torrents: TorrentConfig, pub cpu_pinning: CpuPinningConfigDesc, } @@ -44,6 +45,7 @@ impl Default for Config { url_suffix: "".into(), duration: 0, keep_alive: true, + enable_tls: true, torrents: TorrentConfig::default(), cpu_pinning: Default::default(), } diff --git a/crates/http_load_test/src/main.rs b/crates/http_load_test/src/main.rs index 7e3c0179..36301249 100644 --- a/crates/http_load_test/src/main.rs +++ b/crates/http_load_test/src/main.rs @@ -59,11 +59,15 @@ fn run(config: Config) -> ::anyhow::Result<()> { gamma: Arc::new(gamma), }; - let tls_config = create_tls_config().unwrap(); + let opt_tls_config = if config.enable_tls { + Some(create_tls_config().unwrap()) + } else { + None + }; for i in 0..config.num_workers { let config = config.clone(); - let tls_config = tls_config.clone(); + let opt_tls_config = opt_tls_config.clone(); let state = state.clone(); let placement = get_worker_placement( @@ -76,7 +80,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { LocalExecutorBuilder::new(placement) .name("load-test") .spawn(move || async move { - run_socket_thread(config, tls_config, state).await.unwrap(); + run_socket_thread(config, opt_tls_config, state) + .await + .unwrap(); }) .unwrap(); } diff --git a/crates/http_load_test/src/network.rs b/crates/http_load_test/src/network.rs index f4d718d5..6c1ff126 100644 --- a/crates/http_load_test/src/network.rs +++ b/crates/http_load_test/src/network.rs @@ -9,7 +9,7 @@ use std::{ use aquatic_http_protocol::response::Response; use futures_lite::{AsyncReadExt, AsyncWriteExt}; -use futures_rustls::{client::TlsStream, TlsConnector}; +use futures_rustls::TlsConnector; use glommio::net::TcpStream; use glommio::{prelude::*, timer::TimerActionRepeat}; use rand::{prelude::SmallRng, SeedableRng}; @@ -18,7 +18,7 @@ use crate::{common::LoadTestState, config::Config, utils::create_random_request} pub async fn run_socket_thread( config: Config, - tls_config: Arc, + opt_tls_config: Option>, load_test_state: LoadTestState, ) -> anyhow::Result<()> { let config = Rc::new(config); @@ -30,9 +30,9 @@ pub async fn run_socket_thread( if interval == 0 { loop { if *num_active_connections.borrow() < config.num_connections { - if let Err(err) = Connection::run( + if let Err(err) = run_connection( config.clone(), - tls_config.clone(), + opt_tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), rng.clone(), @@ -50,7 +50,7 @@ pub async fn run_socket_thread( periodically_open_connections( config.clone(), interval, - tls_config.clone(), + opt_tls_config.clone(), load_test_state.clone(), num_active_connections.clone(), rng.clone(), @@ -66,16 +66,16 @@ pub async fn run_socket_thread( async fn periodically_open_connections( config: Rc, interval: Duration, - tls_config: Arc, + opt_tls_config: Option>, load_test_state: LoadTestState, num_active_connections: Rc>, rng: Rc>, ) -> Option { if *num_active_connections.borrow() < config.num_connections { spawn_local(async move { - if let Err(err) = Connection::run( + if let Err(err) = run_connection( config, - tls_config, + opt_tls_config, load_test_state, num_active_connections, rng.clone(), @@ -91,26 +91,18 @@ async fn periodically_open_connections( Some(interval) } -struct Connection { +async fn run_connection( config: Rc, + opt_tls_config: Option>, load_test_state: LoadTestState, + num_active_connections: Rc>, rng: Rc>, - stream: TlsStream, - buffer: [u8; 2048], -} - -impl Connection { - async fn run( - config: Rc, - tls_config: Arc, - load_test_state: LoadTestState, - num_active_connections: Rc>, - rng: Rc>, - ) -> anyhow::Result<()> { - let stream = TcpStream::connect(config.server_address) - .await - .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; +) -> anyhow::Result<()> { + let stream = TcpStream::connect(config.server_address) + .await + .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; + if let Some(tls_config) = opt_tls_config { let stream = TlsConnector::from(tls_config) .connect("example.com".try_into().unwrap(), stream) .await?; @@ -120,18 +112,49 @@ impl Connection { load_test_state, rng, stream, - buffer: [0; 2048], + buffer: Box::new([0; 2048]), + }; + + connection.run(num_active_connections).await?; + } else { + let mut connection = Connection { + config, + load_test_state, + rng, + stream, + buffer: Box::new([0; 2048]), }; + connection.run(num_active_connections).await?; + } + + Ok(()) +} + +struct Connection { + config: Rc, + load_test_state: LoadTestState, + rng: Rc>, + stream: S, + buffer: Box<[u8; 2048]>, +} + +impl Connection +where + S: futures::AsyncRead + futures::AsyncWrite + Unpin + 'static, +{ + async fn run(&mut self, num_active_connections: Rc>) -> anyhow::Result<()> { *num_active_connections.borrow_mut() += 1; - if let Err(err) = connection.run_connection_loop().await { + let result = self.run_connection_loop().await; + + if let Err(err) = &result { ::log::info!("connection error: {:?}", err); } *num_active_connections.borrow_mut() -= 1; - Ok(()) + result } async fn run_connection_loop(&mut self) -> anyhow::Result<()> { diff --git a/crates/http_protocol/src/request.rs b/crates/http_protocol/src/request.rs index db4c7e2e..865a4486 100644 --- a/crates/http_protocol/src/request.rs +++ b/crates/http_protocol/src/request.rs @@ -244,23 +244,6 @@ impl ScrapeRequest { } } -#[derive(Debug)] -pub enum RequestParseError { - NeedMoreData, - Invalid(anyhow::Error), -} - -impl ::std::fmt::Display for RequestParseError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::NeedMoreData => write!(f, "Incomplete request, more data needed"), - Self::Invalid(err) => write!(f, "Invalid request: {:#}", err), - } - } -} - -impl ::std::error::Error for RequestParseError {} - #[derive(Debug, Clone, PartialEq, Eq)] pub enum Request { Announce(AnnounceRequest), @@ -269,20 +252,20 @@ pub enum Request { impl Request { /// Parse Request from HTTP request bytes - pub fn from_bytes(bytes: &[u8]) -> Result { + pub fn from_bytes(bytes: &[u8]) -> anyhow::Result> { let mut headers = [httparse::EMPTY_HEADER; 16]; let mut http_request = httparse::Request::new(&mut headers); match http_request.parse(bytes) { Ok(httparse::Status::Complete(_)) => { if let Some(path) = http_request.path { - Self::from_http_get_path(path).map_err(RequestParseError::Invalid) + Self::from_http_get_path(path).map(Some) } else { - Err(RequestParseError::Invalid(anyhow::anyhow!("no http path"))) + Err(anyhow::anyhow!("no http path")) } } - Ok(httparse::Status::Partial) => Err(RequestParseError::NeedMoreData), - Err(err) => Err(RequestParseError::Invalid(anyhow::Error::from(err))), + Ok(httparse::Status::Partial) => Ok(None), + Err(err) => Err(anyhow::Error::from(err)), } } @@ -368,7 +351,7 @@ mod tests { bytes.extend_from_slice(&ANNOUNCE_REQUEST_PATH.as_bytes()); bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); - let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); + let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap(); let reference_request = get_reference_announce_request(); assert_eq!(parsed_request, reference_request); @@ -382,7 +365,7 @@ mod tests { bytes.extend_from_slice(&SCRAPE_REQUEST_PATH.as_bytes()); bytes.extend_from_slice(b" HTTP/1.1\r\n\r\n"); - let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); + let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap(); let reference_request = Request::Scrape(ScrapeRequest { info_hashes: vec![InfoHash(REFERENCE_INFO_HASH)], }); @@ -449,7 +432,7 @@ mod tests { request.write(&mut bytes, &[]).unwrap(); - let parsed_request = Request::from_bytes(&bytes[..]).unwrap(); + let parsed_request = Request::from_bytes(&bytes[..]).unwrap().unwrap(); let success = request == parsed_request; From d7e9b688d9a66a3c1ba74877c9577a03028ca413 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 17 Nov 2023 18:22:00 +0100 Subject: [PATCH 3/4] Run cargo update Removing ahash v0.7.6 Removing ahash v0.8.3 Adding ahash v0.7.7 Adding ahash v0.8.6 Updating base64 v0.21.4 -> v0.21.5 Updating clap v4.4.6 -> v4.4.8 Updating clap_builder v4.4.6 -> v4.4.8 Updating clap_lex v0.5.1 -> v0.6.0 Updating cpufeatures v0.2.9 -> v0.2.11 Updating errno v0.3.5 -> v0.3.7 Updating futures v0.3.28 -> v0.3.29 Updating futures-channel v0.3.28 -> v0.3.29 Updating futures-core v0.3.28 -> v0.3.29 Updating futures-executor v0.3.28 -> v0.3.29 Updating futures-io v0.3.28 -> v0.3.29 Updating futures-macro v0.3.28 -> v0.3.29 Updating futures-sink v0.3.28 -> v0.3.29 Updating futures-task v0.3.28 -> v0.3.29 Updating futures-util v0.3.28 -> v0.3.29 Updating getrandom v0.2.10 -> v0.2.11 Updating hashbrown v0.14.1 -> v0.14.2 Updating hdrhistogram v7.5.2 -> v7.5.3 Updating http v0.2.9 -> v0.2.11 Updating indexmap v2.0.2 -> v2.1.0 Updating ipnet v2.8.0 -> v2.9.0 Updating js-sys v0.3.64 -> v0.3.65 Updating libc v0.2.149 -> v0.2.150 Updating linux-raw-sys v0.4.10 -> v0.4.11 Updating mio v0.8.8 -> v0.8.9 Updating portable-atomic v1.4.3 -> v1.5.1 Updating redox_syscall v0.3.5 -> v0.4.1 Updating ring v0.16.20 -> v0.17.5 Updating rustix v0.38.19 -> v0.38.24 Updating rustls v0.21.7 -> v0.21.9 Updating rustls-pemfile v1.0.3 -> v1.0.4 Updating rustls-webpki v0.101.6 -> v0.101.7 Updating sct v0.7.0 -> v0.7.1 Updating serde v1.0.189 -> v1.0.192 Updating serde_derive v1.0.189 -> v1.0.192 Updating serde_json v1.0.107 -> v1.0.108 Updating smallvec v1.11.1 -> v1.11.2 Removing socket2 v0.4.9 Removing socket2 v0.5.4 Adding socket2 v0.4.10 Adding socket2 v0.5.5 Removing spin v0.5.2 Updating syn v2.0.38 -> v2.0.39 Updating tempfile v3.8.0 -> v3.8.1 Updating thiserror v1.0.49 -> v1.0.50 Updating thiserror-impl v1.0.49 -> v1.0.50 Updating tokio v1.33.0 -> v1.34.0 Updating tracing v0.1.39 -> v0.1.40 Updating untrusted v0.7.1 -> v0.9.0 Updating wasm-bindgen v0.2.87 -> v0.2.88 Updating wasm-bindgen-backend v0.2.87 -> v0.2.88 Updating wasm-bindgen-macro v0.2.87 -> v0.2.88 Updating wasm-bindgen-macro-support v0.2.87 -> v0.2.88 Updating wasm-bindgen-shared v0.2.87 -> v0.2.88 Updating web-sys v0.3.64 -> v0.3.65 Adding zerocopy v0.7.26 Adding zerocopy-derive v0.7.26 --- Cargo.lock | 322 ++++++++++++++++++++++++++++------------------------- 1 file changed, 168 insertions(+), 154 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a20e9cf1..63fbd468 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "ahash" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +checksum = "5a824f2aa7e75a0c98c5a504fceb80649e9c35265d44525b5f94de4771a395cd" dependencies = [ "getrandom", "once_cell", @@ -30,14 +30,15 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" dependencies = [ "cfg-if", "getrandom", "once_cell", "version_check", + "zerocopy", ] [[package]] @@ -88,17 +89,17 @@ dependencies = [ name = "aquatic_common" version = "0.8.0" dependencies = [ - "ahash 0.8.3", + "ahash 0.8.6", "anyhow", "aquatic_toml_config", "arc-swap", "duplicate", "git-testament", "glommio", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "hex", "hwloc", - "indexmap 2.0.2", + "indexmap 2.1.0", "libc", "log", "privdrop", @@ -142,7 +143,7 @@ dependencies = [ "serde", "signal-hook", "slotmap", - "socket2 0.5.4", + "socket2 0.5.5", "thiserror", ] @@ -158,7 +159,7 @@ dependencies = [ "futures-lite", "futures-rustls", "glommio", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "log", "mimalloc", "quickcheck", @@ -234,7 +235,7 @@ dependencies = [ "constant_time_eq", "crossbeam-channel", "getrandom", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "hdrhistogram", "hex", "io-uring", @@ -252,7 +253,7 @@ dependencies = [ "serde", "signal-hook", "slab", - "socket2 0.5.4", + "socket2 0.5.5", "tempfile", "time", "tinytemplate", @@ -284,7 +285,7 @@ dependencies = [ "aquatic_common", "aquatic_toml_config", "aquatic_udp_protocol", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "mimalloc", "mio", "quickcheck", @@ -292,7 +293,7 @@ dependencies = [ "rand", "rand_distr", "serde", - "socket2 0.5.4", + "socket2 0.5.5", ] [[package]] @@ -322,9 +323,9 @@ dependencies = [ "futures-lite", "futures-rustls", "glommio", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "httparse", - "indexmap 2.0.2", + "indexmap 2.1.0", "log", "metrics", "metrics-exporter-prometheus", @@ -340,7 +341,7 @@ dependencies = [ "signal-hook", "slab", "slotmap", - "socket2 0.5.4", + "socket2 0.5.5", "tungstenite", ] @@ -374,7 +375,7 @@ version = "0.8.0" dependencies = [ "anyhow", "criterion 0.5.1", - "hashbrown 0.14.1", + "hashbrown 0.14.2", "quickcheck", "quickcheck_macros", "serde", @@ -454,9 +455,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "bendy" @@ -617,21 +618,21 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.6" +version = "4.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" +checksum = "2275f18819641850fa26c89acc84d465c1bf91ce57bc2748b28c420473352f64" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.4.6" +version = "4.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" +checksum = "07cdf1b148b25c1e1f7a42225e30a0d99a615cd4637eae7365548dd4529b95bc" dependencies = [ "anstyle", - "clap_lex 0.5.1", + "clap_lex 0.6.0", ] [[package]] @@ -645,9 +646,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.5.1" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd7cc57abe963c6d3b9d8be5b06ba7c8957a930305ca90304f24ef040aa6f961" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" [[package]] name = "colored" @@ -704,9 +705,9 @@ checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" [[package]] name = "cpufeatures" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" dependencies = [ "libc", ] @@ -755,7 +756,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.4.6", + "clap 4.4.8", "criterion-plot", "is-terminal", "itertools", @@ -953,9 +954,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.5" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" +checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" dependencies = [ "libc", "windows-sys 0.48.0", @@ -1015,7 +1016,7 @@ dependencies = [ "futures-sink", "nanorand", "pin-project", - "spin 0.9.8", + "spin", ] [[package]] @@ -1035,9 +1036,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -1050,9 +1051,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -1060,15 +1061,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -1077,9 +1078,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-lite" @@ -1098,13 +1099,13 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -1119,21 +1120,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -1159,9 +1160,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", "js-sys", @@ -1194,7 +1195,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", "time", ] @@ -1204,7 +1205,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac1f09bf53139d5680da6325b4e79c6bc1518e94a65ab74df14b7e3693a8c78b" dependencies = [ - "ahash 0.7.6", + "ahash 0.7.7", "backtrace", "bitflags 1.3.2", "bitmaps", @@ -1228,7 +1229,7 @@ dependencies = [ "signal-hook", "sketches-ddsketch 0.1.3", "smallvec", - "socket2 0.4.9", + "socket2 0.4.10", "tracing", "typenum", ] @@ -1261,25 +1262,25 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ - "ahash 0.8.3", + "ahash 0.8.6", ] [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" dependencies = [ - "ahash 0.8.3", + "ahash 0.8.6", "allocator-api2", "serde", ] [[package]] name = "hdrhistogram" -version = "7.5.2" +version = "7.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" +checksum = "a5b38e5c02b7c7be48c8dc5217c4f1634af2ea221caae2e024bffc7a7651c691" dependencies = [ "base64 0.13.1", "byteorder", @@ -1318,9 +1319,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "http" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -1381,7 +1382,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -1410,12 +1411,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.2", ] [[package]] @@ -1461,9 +1462,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "is-terminal" @@ -1493,9 +1494,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "54c0c35952f67de54bb584e9fd912b3023117cbafc0a77d8f3dee1fb5f572fe8" dependencies = [ "wasm-bindgen", ] @@ -1582,9 +1583,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.149" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "libm" @@ -1604,9 +1605,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" +checksum = "969488b55f8ac402214f3f5fd243ebb7206cf82de60d3172994707a4bcc2b829" [[package]] name = "lock_api" @@ -1681,7 +1682,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" dependencies = [ - "ahash 0.8.3", + "ahash 0.8.6", "metrics-macros", "portable-atomic", ] @@ -1692,7 +1693,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", "hyper", "indexmap 1.9.3", "ipnet", @@ -1711,7 +1712,7 @@ checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -1759,9 +1760,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "log", @@ -1971,7 +1972,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -2022,9 +2023,9 @@ dependencies = [ [[package]] name = "portable-atomic" -version = "1.4.3" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b" +checksum = "3bccab0e7fd7cc19f820a1c8c91720af652d0c88dc9664dd72aef2614f04af3b" [[package]] name = "powerfmt" @@ -2209,9 +2210,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.3.5" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ "bitflags 1.3.2", ] @@ -2247,17 +2248,16 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "ring" -version = "0.16.20" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b" dependencies = [ "cc", + "getrandom", "libc", - "once_cell", - "spin 0.5.2", + "spin", "untrusted", - "web-sys", - "winapi 0.3.9", + "windows-sys 0.48.0", ] [[package]] @@ -2277,12 +2277,12 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustix" -version = "0.38.19" +version = "0.38.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "745ecfa778e66b2b63c88a61cb36e0eea109e803b0b86bf9879fbc77c70e86ed" +checksum = "9ad981d6c340a49cdc40a1028d9c6084ec7e9fa33fcb839cab656a267071e234" dependencies = [ "bitflags 2.4.1", - "errno 0.3.5", + "errno 0.3.7", "libc", "linux-raw-sys", "windows-sys 0.48.0", @@ -2290,9 +2290,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.7" +version = "0.21.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" +checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", "ring", @@ -2302,18 +2302,18 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64 0.21.4", + "base64 0.21.5", ] [[package]] name = "rustls-webpki" -version = "0.101.6" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ "ring", "untrusted", @@ -2354,9 +2354,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sct" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ "ring", "untrusted", @@ -2364,9 +2364,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.189" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" +checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" dependencies = [ "serde_derive", ] @@ -2392,20 +2392,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.189" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" +checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -2507,9 +2507,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" [[package]] name = "snafu" @@ -2535,9 +2535,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi 0.3.9", @@ -2545,20 +2545,14 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys 0.48.0", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "spin" version = "0.9.8" @@ -2587,9 +2581,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.38" +version = "2.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" dependencies = [ "proc-macro2", "quote", @@ -2598,9 +2592,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.8.0" +version = "3.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", "fastrand 2.0.1", @@ -2617,22 +2611,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -2693,15 +2687,15 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" dependencies = [ "backtrace", "libc", "mio", "pin-project-lite", - "socket2 0.5.4", + "socket2 0.5.5", "windows-sys 0.48.0", ] @@ -2722,9 +2716,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.39" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -2739,7 +2733,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", ] [[package]] @@ -2811,9 +2805,9 @@ checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] name = "untrusted" -version = "0.7.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" @@ -2889,9 +2883,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -2899,24 +2893,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2924,28 +2918,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907" dependencies = [ "proc-macro2", "quote", - "syn 2.0.38", + "syn 2.0.39", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b" [[package]] name = "web-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" dependencies = [ "js-sys", "wasm-bindgen", @@ -3140,3 +3134,23 @@ name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "zerocopy" +version = "0.7.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] From 2520b2cbbdc63064b4917b54d27291bd6919f75b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 17 Nov 2023 18:29:34 +0100 Subject: [PATCH 4/4] Fix GitHub CI build --- .github/actions/test-file-transfers/entrypoint.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/actions/test-file-transfers/entrypoint.sh b/.github/actions/test-file-transfers/entrypoint.sh index 87bae510..9672b860 100755 --- a/.github/actions/test-file-transfers/entrypoint.sh +++ b/.github/actions/test-file-transfers/entrypoint.sh @@ -62,6 +62,7 @@ echo "log_level = 'debug' [network] address = '127.0.0.1:3001' +enable_tls = true tls_certificate_path = './cert.crt' tls_private_key_path = './key.pk8' " > tls.toml