Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade to [email protected] #6309

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,922 changes: 971 additions & 951 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ fd-lock = "4"
futures-lite = { workspace = true }
futures = { workspace = true }
hex = "0.4.0"
hickory-resolver = "=0.25.0-alpha.2"
hickory-resolver = "=0.25.0-alpha.4"
http-body-util = "0.1.2"
humansize = "2"
hyper = "1"
hyper-util = "0.1.10"
image = { version = "0.25.5", default-features=false, features = ["gif", "jpeg", "ico", "png", "pnm", "webp", "bmp"] }
iroh-gossip = { version = "0.28.1", default-features = false, features = ["net"] }
iroh-net = { version = "0.28.1", default-features = false }
iroh-gossip = { version = "0.30", default-features = false, features = ["net"] }
iroh = { version = "0.30", default-features = false }
kamadak-exif = "0.6.1"
lettre_email = { git = "https://github.com/deltachat/lettre", branch = "master" }
libc = { workspace = true }
Expand Down Expand Up @@ -110,6 +110,7 @@ toml = "0.8"
url = "2"
uuid = { version = "1", features = ["serde", "v4"] }
webpki-roots = "0.26.7"
data-encoding = "2.6.0"

[dev-dependencies]
anyhow = { workspace = true, features = ["backtrace"] } # Enable `backtrace` feature in tests.
Expand Down
23 changes: 11 additions & 12 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,8 @@ ignore = [
# Unmaintained encoding
"RUSTSEC-2021-0153",

# Unmaintained proc-macro-error
# <https://rustsec.org/advisories/RUSTSEC-2024-0370>
"RUSTSEC-2024-0370",

# Unmaintained instant
"RUSTSEC-2024-0384",

# idna 0.5.0
"RUSTSEC-2024-0421",
]

[bans]
Expand All @@ -34,24 +27,32 @@ skip = [
{ name = "base64", version = "0.21.7" },
{ name = "bitflags", version = "1.3.2" },
{ name = "event-listener", version = "2.5.3" },
{ name = "event-listener", version = "4.0.3" },
{ name = "fastrand", version = "1.9.0" },
{ name = "fiat-crypto", version = "0.1.20" },
{ name = "futures-lite", version = "1.13.0" },
{ name = "getrandom", version = "<0.2" },
{ name = "hashbrown", version = "0.14.5" },
{ name = "hashbrown", version = "0.15.2" },
{ name = "hostname", version = "0.3.1" },
{ name = "http", version = "0.2.12" },
{ name = "idna", version = "0.5.0" },
{ name = "netlink-packet-route", version = "0.17.1" },
{ name = "netlink-packet-route", version = "0.21.0" },
{ name = "nix", version = "0.26.4" },
{ name = "nix", version = "0.27.1" },
{ name = "quick-error", version = "<2.0" },
{ name = "rand_chacha", version = "<0.3" },
{ name = "rand_core", version = "<0.6" },
{ name = "rand", version = "<0.8" },
{ name = "redox_syscall", version = "0.3.5" },
{ name = "regex-automata", version = "0.1.10" },
{ name = "regex-syntax", version = "0.6.29" },
{ name = "sync_wrapper", version = "0.1.2" },
{ name = "rtnetlink", version = "0.13.1" },
{ name = "syn", version = "1.0.109" },
{ name = "thiserror-impl", version = "1.0.69" },
{ name = "thiserror", version = "1.0.69" },
{ name = "time", version = "<0.3" },
{ name = "tokio-tungstenite", version = "0.21.0" },
{ name = "tungstenite", version = "0.21.0" },
{ name = "unicode-width", version = "0.1.11" },
{ name = "wasi", version = "<0.11" },
{ name = "windows_aarch64_gnullvm", version = "<0.52" },
Expand All @@ -65,7 +66,6 @@ skip = [
{ name = "windows_x86_64_gnullvm", version = "<0.52" },
{ name = "windows_x86_64_gnu", version = "<0.52" },
{ name = "windows_x86_64_msvc", version = "<0.52" },
{ name = "winreg", version = "0.50.0" },
]


Expand All @@ -82,7 +82,6 @@ allow = [
"MPL-2.0",
"OpenSSL",
"Unicode-3.0",
"Unicode-DFS-2016",
"Zlib",
]

Expand Down
15 changes: 7 additions & 8 deletions src/imex/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use std::task::Poll;

use anyhow::{bail, format_err, Context as _, Result};
use futures_lite::FutureExt;
use iroh_net::relay::RelayMode;
use iroh_net::Endpoint;
use iroh::{Endpoint, RelayMode};
use tokio::fs;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -65,11 +64,11 @@ const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
/// task use the [`Context::stop_ongoing`] mechanism.
#[derive(Debug)]
pub struct BackupProvider {
/// iroh-net endpoint.
/// iroh endpoint.
_endpoint: Endpoint,

/// iroh-net address.
node_addr: iroh_net::NodeAddr,
/// iroh address.
node_addr: iroh::NodeAddr,

/// Authentication token that should be submitted
/// to retrieve the backup.
Expand Down Expand Up @@ -162,7 +161,7 @@ impl BackupProvider {

async fn handle_connection(
context: Context,
conn: iroh_net::endpoint::Connecting,
conn: iroh::endpoint::Connecting,
auth_token: String,
dbfile: Arc<TempPathGuard>,
) -> Result<()> {
Expand Down Expand Up @@ -291,7 +290,7 @@ impl Future for BackupProvider {

pub async fn get_backup2(
context: &Context,
node_addr: iroh_net::NodeAddr,
node_addr: iroh::NodeAddr,
auth_token: String,
) -> Result<()> {
let relay_mode = RelayMode::Disabled;
Expand Down Expand Up @@ -337,7 +336,7 @@ pub async fn get_backup2(
/// This is a long running operation which will return only when completed.
///
/// Using [`Qr`] as argument is a bit odd as it only accepts specific variant of it. It
/// does avoid having [`iroh_net::NodeAddr`] in the primary API however, without
/// does avoid having [`iroh::NodeAddr`] in the primary API however, without
/// having to revert to untyped bytes.
pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> {
match qr {
Expand Down
105 changes: 29 additions & 76 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
//! and `joinRealtimeChannel().send()` just like the other peers.

use anyhow::{anyhow, bail, Context as _, Result};
use data_encoding::BASE32_NOPAD;
use email::Header;
use futures_lite::StreamExt;
use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, RelayMap, RelayMode, RelayUrl, SecretKey};
use iroh_gossip::net::{Event, Gossip, GossipEvent, JoinOptions, GOSSIP_ALPN};
use iroh_gossip::proto::TopicId;
use iroh_net::key::{PublicKey, SecretKey};
use iroh_net::relay::{RelayMap, RelayUrl};
use iroh_net::{relay::RelayMode, Endpoint};
use iroh_net::{NodeAddr, NodeId};
use parking_lot::Mutex;
use std::collections::{BTreeSet, HashMap};
use std::env;
Expand All @@ -54,8 +52,8 @@ const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();
/// Store iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
/// [Endpoint] needed for iroh peer channels.
pub(crate) endpoint: Endpoint,
/// iroh router needed for iroh peer channels.
pub(crate) router: iroh::protocol::Router,

/// [Gossip] needed for iroh peer channels.
pub(crate) gossip: Gossip,
Expand All @@ -75,15 +73,12 @@ pub struct Iroh {
impl Iroh {
/// Notify the endpoint that the network has changed.
pub(crate) async fn network_change(&self) {
self.endpoint.network_change().await
self.router.endpoint().network_change().await
}

/// Closes the QUIC endpoint.
pub(crate) async fn close(self) -> Result<()> {
self.endpoint
.close(0u32.into(), b"")
.await
.context("Closing iroh endpoint failed")
self.router.shutdown().await.context("Closing iroh failed")
}

/// Join a topic and create the subscriber loop for it.
Expand Down Expand Up @@ -120,16 +115,16 @@ impl Iroh {

// Inform iroh of potentially new node addresses
for node_addr in &peers {
if !node_addr.info.is_empty() {
self.endpoint.add_node_addr(node_addr.clone())?;
if !node_addr.is_empty() {
self.router.endpoint().add_node_addr(node_addr.clone())?;
}
}

let (join_tx, join_rx) = oneshot::channel();

let (gossip_sender, gossip_receiver) = self
.gossip
.join_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.subscribe_with_opts(topic, JoinOptions::with_bootstrap(node_ids))
.split();

let ctx = ctx.clone();
Expand All @@ -148,10 +143,10 @@ impl Iroh {
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.endpoint.add_node_addr(peer.clone())?;
self.router.endpoint().add_node_addr(peer.clone())?;
}

self.gossip.join_with_opts(
self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
Expand Down Expand Up @@ -198,8 +193,8 @@ impl Iroh {

/// Get the iroh [NodeAddr] without direct IP addresses.
pub(crate) async fn get_node_addr(&self) -> Result<NodeAddr> {
let mut addr = self.endpoint.node_addr().await?;
addr.info.direct_addresses = BTreeSet::new();
let mut addr = self.router.endpoint().node_addr().await?;
addr.direct_addresses = BTreeSet::new();
Ok(addr)
}

Expand Down Expand Up @@ -242,7 +237,7 @@ impl Context {
/// Create iroh endpoint and gossip.
async fn init_peer_channels(&self) -> Result<Iroh> {
info!(self, "Initializing peer channels.");
let secret_key = SecretKey::generate();
let secret_key = SecretKey::generate(rand::rngs::OsRng);
let public_key = secret_key.public();

let relay_mode = if let Some(relay_url) = self
Expand All @@ -267,24 +262,22 @@ impl Context {
.await?;

// create gossip
let my_addr = endpoint.node_addr().await?;
let gossip_config = iroh_gossip::proto::topic::Config {
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.
max_message_size: 128 * 1024,
..Default::default()
};
let gossip = Gossip::from_endpoint(endpoint.clone(), gossip_config, &my_addr.info);
// Allow messages up to 128 KB in size.
// We set the limit to 128 KiB to account for internal overhead,
// but only guarantee 128 KB of payload to WebXDC developers.

// spawn endpoint loop that forwards incoming connections to the gossiper
let context = self.clone();
let gossip = Gossip::builder()
.max_message_size(128 * 1024)
.spawn(endpoint.clone())
.await?;

// Shuts down on deltachat shutdown
tokio::spawn(endpoint_loop(context, endpoint.clone(), gossip.clone()));
let router = iroh::protocol::Router::builder(endpoint)
.accept(GOSSIP_ALPN, gossip.clone())
.spawn()
.await?;

Ok(Iroh {
endpoint,
router,
gossip,
sequence_numbers: Mutex::new(HashMap::new()),
iroh_channels: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -507,54 +500,13 @@ fn create_random_topic() -> TopicId {
pub(crate) async fn create_iroh_header(ctx: &Context, msg_id: MsgId) -> Result<Header> {
let topic = create_random_topic();
insert_topic_stub(ctx, msg_id, topic).await?;
let topic_string = BASE32_NOPAD.encode(topic.as_bytes()).to_ascii_lowercase();
Ok(Header::new(
HeaderDef::IrohGossipTopic.get_headername().to_string(),
topic.to_string(),
topic_string,
))
}

async fn endpoint_loop(context: Context, endpoint: Endpoint, gossip: Gossip) {
while let Some(conn) = endpoint.accept().await {
let conn = match conn.accept() {
Ok(conn) => conn,
Err(err) => {
warn!(context, "Failed to accept iroh connection: {err:#}.");
continue;
}
};
info!(context, "IROH_REALTIME: accepting iroh connection");
let gossip = gossip.clone();
let context = context.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(&context, conn, gossip).await {
warn!(context, "IROH_REALTIME: iroh connection error: {err}");
}
});
}
}

async fn handle_connection(
context: &Context,
mut conn: iroh_net::endpoint::Connecting,
gossip: Gossip,
) -> anyhow::Result<()> {
let alpn = conn.alpn().await?;
let conn = conn.await?;
let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?;

match alpn.as_slice() {
GOSSIP_ALPN => gossip
.handle_connection(conn)
.await
.context(format!("Gossip connection to {peer_id} failed"))?,
_ => warn!(
context,
"Ignoring connection from {peer_id}: unsupported ALPN protocol"
),
}
Ok(())
}

async fn subscribe_loop(
context: &Context,
mut stream: iroh_gossip::net::GossipReceiver,
Expand Down Expand Up @@ -971,6 +923,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_parallel_connect() {
eprintln!("START-----");
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
Expand Down
8 changes: 4 additions & 4 deletions src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub(crate) mod data;

use anyhow::Result;
use deltachat_contact_tools::EmailAddress;
use hickory_resolver::{config, AsyncResolver, TokioAsyncResolver};
use hickory_resolver::{config, Resolver, TokioResolver};

use crate::config::Config;
use crate::context::Context;
Expand Down Expand Up @@ -165,11 +165,11 @@ impl ProviderOptions {
/// We first try to read the system's resolver from `/etc/resolv.conf`.
/// This does not work at least on some Androids, therefore we fallback
/// to the default `ResolverConfig` which uses eg. to google's `8.8.8.8` or `8.8.4.4`.
fn get_resolver() -> Result<TokioAsyncResolver> {
if let Ok(resolver) = AsyncResolver::tokio_from_system_conf() {
fn get_resolver() -> Result<TokioResolver> {
if let Ok(resolver) = Resolver::tokio_from_system_conf() {
return Ok(resolver);
}
let resolver = AsyncResolver::tokio(
let resolver = Resolver::tokio(
config::ResolverConfig::default(),
config::ResolverOpts::default(),
);
Expand Down
4 changes: 2 additions & 2 deletions src/qr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub enum Qr {
/// Provides a backup that can be retrieved using iroh-net based backup transfer protocol.
Backup2 {
/// Iroh node address.
node_addr: iroh_net::NodeAddr,
node_addr: iroh::NodeAddr,

/// Authentication token.
auth_token: String,
Expand Down Expand Up @@ -644,7 +644,7 @@ fn decode_backup2(qr: &str) -> Result<Qr> {
.split_once('&')
.context("Backup QR code has no separator")?;
let auth_token = auth_token.to_string();
let node_addr = serde_json::from_str::<iroh_net::NodeAddr>(node_addr)
let node_addr = serde_json::from_str::<iroh::NodeAddr>(node_addr)
.context("Invalid node addr in backup QR code")?;

Ok(Qr::Backup2 {
Expand Down
Loading
Loading