From 53bb8a983114ba27f3316aa0415ba0849985cae9 Mon Sep 17 00:00:00 2001 From: link2xt Date: Thu, 26 Oct 2023 14:33:20 +0000 Subject: [PATCH] chore: update to async-channel 2 --- Cargo.lock | 49 ++++++++++++++++++++++++++++++------ Cargo.toml | 3 ++- deltachat-jsonrpc/Cargo.toml | 2 +- deltachat-jsonrpc/src/lib.rs | 20 +++++++-------- deny.toml | 2 ++ src/events.rs | 8 +++--- 6 files changed, 62 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79bb926bc2..2f31d91cde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,10 +192,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", - "event-listener", + "event-listener 2.5.3", "futures-core", ] +[[package]] +name = "async-channel" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "336d835910fab747186c56586562cb46f42809c2843ef3a84f47509009522838" +dependencies = [ + "concurrent-queue", + "event-listener 3.0.0", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.3.15" @@ -215,7 +228,7 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936c1b580be4373b48c9c687e0c79285441664398354df28d0860087cac0c069" dependencies = [ - "async-channel", + "async-channel 1.9.0", "base64 0.21.5", "bytes", "chrono", @@ -237,7 +250,7 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" dependencies = [ - "event-listener", + "event-listener 2.5.3", ] [[package]] @@ -1078,7 +1091,7 @@ version = "1.127.0" dependencies = [ "ansi_term", "anyhow", - "async-channel", + "async-channel 2.0.0", "async-imap", "async-native-tls", "async-smtp", @@ -1115,6 +1128,7 @@ dependencies = [ "parking_lot", "percent-encoding", "pgp", + "pin-project", "pretty_assertions", "pretty_env_logger", "proptest", @@ -1154,7 +1168,7 @@ name = "deltachat-jsonrpc" version = "1.127.0" dependencies = [ "anyhow", - "async-channel", + "async-channel 2.0.0", "axum", "base64 0.21.5", "deltachat", @@ -1776,6 +1790,27 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e56284f00d94c1bc7fd3c77027b4623c88c1f53d8d2394c6199f2921dea325" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96b852f1345da36d551b9473fa1e2b1eb5c5195585c6c018118bc92a8d91160" +dependencies = [ + "event-listener 3.0.0", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -4560,7 +4595,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af91f480ee899ab2d9f8435bfdfc14d08a5754bd9d3fef1f1a1c23336aad6c8b" dependencies = [ - "async-channel", + "async-channel 1.9.0", "cfg-if", "futures-core", "pin-project-lite", @@ -5625,7 +5660,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b5547af776328f66a5476ea3b7c0789e6fed164eb32d1a2122cfb39ffa505d" dependencies = [ "anyhow", - "async-channel", + "async-channel 1.9.0", "async-mutex", "async-trait", "axum", diff --git a/Cargo.toml b/Cargo.toml index 53fd694930..2a84864af9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ format-flowed = { path = "./format-flowed" } ratelimit = { path = "./deltachat-ratelimit" } anyhow = "1" -async-channel = "1.8.0" +async-channel = "2.0.0" async-imap = { version = "0.9.1", default-features = false, features = ["runtime-tokio"] } async-native-tls = { version = "0.5", default-features = false, features = ["runtime-tokio"] } async-smtp = { version = "0.9", default-features = false, features = ["runtime-tokio"] } @@ -65,6 +65,7 @@ once_cell = "1.18.0" percent-encoding = "2.3" parking_lot = "0.12" pgp = { version = "0.10", default-features = false } +pin-project = "1" pretty_env_logger = { version = "0.5", optional = true } qrcodegen = "1.7.0" quick-xml = "0.31" diff --git a/deltachat-jsonrpc/Cargo.toml b/deltachat-jsonrpc/Cargo.toml index c615973092..bc1ed00786 100644 --- a/deltachat-jsonrpc/Cargo.toml +++ b/deltachat-jsonrpc/Cargo.toml @@ -19,7 +19,7 @@ schemars = "0.8.13" serde = { version = "1.0", features = ["derive"] } tempfile = "3.8.0" log = "0.4" -async-channel = { version = "1.8.0" } +async-channel = { version = "2.0.0" } futures = { version = "0.3.28" } serde_json = "1.0.105" yerpc = { version = "0.5.2", features = ["anyhow_expose", "openrpc"] } diff --git a/deltachat-jsonrpc/src/lib.rs b/deltachat-jsonrpc/src/lib.rs index 10ec39ea48..cba1625cea 100644 --- a/deltachat-jsonrpc/src/lib.rs +++ b/deltachat-jsonrpc/src/lib.rs @@ -17,7 +17,7 @@ mod tests { let accounts = Accounts::new(tmp_dir, writable).await?; let api = CommandApi::new(accounts); - let (sender, mut receiver) = unbounded::(); + let (sender, receiver) = unbounded::(); let (client, mut rx) = RpcClient::new(); let session = RpcSession::new(client, api); @@ -36,17 +36,17 @@ mod tests { let request = r#"{"jsonrpc":"2.0","method":"add_account","params":[],"id":1}"#; let response = r#"{"jsonrpc":"2.0","id":1,"result":1}"#; session.handle_incoming(request).await; - let result = receiver.next().await; + let result = receiver.recv().await?; println!("{result:?}"); - assert_eq!(result, Some(response.to_owned())); + assert_eq!(result, response.to_owned()); } { let request = r#"{"jsonrpc":"2.0","method":"get_all_account_ids","params":[],"id":2}"#; let response = r#"{"jsonrpc":"2.0","id":2,"result":[1]}"#; session.handle_incoming(request).await; - let result = receiver.next().await; + let result = receiver.recv().await?; println!("{result:?}"); - assert_eq!(result, Some(response.to_owned())); + assert_eq!(result, response.to_owned()); } Ok(()) @@ -59,7 +59,7 @@ mod tests { let accounts = Accounts::new(tmp_dir, writable).await?; let api = CommandApi::new(accounts); - let (sender, mut receiver) = unbounded::(); + let (sender, receiver) = unbounded::(); let (client, mut rx) = RpcClient::new(); let session = RpcSession::new(client, api); @@ -78,15 +78,15 @@ mod tests { let request = r#"{"jsonrpc":"2.0","method":"add_account","params":[],"id":1}"#; let response = r#"{"jsonrpc":"2.0","id":1,"result":1}"#; session.handle_incoming(request).await; - let result = receiver.next().await; - assert_eq!(result, Some(response.to_owned())); + let result = receiver.recv().await?; + assert_eq!(result, response.to_owned()); } { let request = r#"{"jsonrpc":"2.0","method":"batch_set_config","id":2,"params":[1,{"addr":"","mail_user":"","mail_pw":"","mail_server":"","mail_port":"","mail_security":"","imap_certificate_checks":"","send_user":"","send_pw":"","send_server":"","send_port":"","send_security":"","smtp_certificate_checks":"","socks5_enabled":"0","socks5_host":"","socks5_port":"","socks5_user":"","socks5_password":""}]}"#; let response = r#"{"jsonrpc":"2.0","id":2,"result":null}"#; session.handle_incoming(request).await; - let result = receiver.next().await; - assert_eq!(result, Some(response.to_owned())); + let result = receiver.recv().await?; + assert_eq!(result, response.to_owned()); } Ok(()) diff --git a/deny.toml b/deny.toml index bb7961bd12..86ec81adf8 100644 --- a/deny.toml +++ b/deny.toml @@ -11,6 +11,7 @@ ignore = [ # when upgrading. # Please keep this list alphabetically sorted. skip = [ + { name = "async-channel", version = "1.9.0" }, { name = "base16ct", version = "0.1.1" }, { name = "base64", version = "<0.21" }, { name = "bitflags", version = "1.3.2" }, @@ -24,6 +25,7 @@ skip = [ { name = "digest", version = "<0.10" }, { name = "ed25519-dalek", version = "1.0.1" }, { name = "ed25519", version = "1.5.3" }, + { name = "event-listener", version = "2.5.3" }, { name = "getrandom", version = "<0.2" }, { name = "hashbrown", version = "<0.14.0" }, { name = "indexmap", version = "<2.0.0" }, diff --git a/src/events.rs b/src/events.rs index 977d3f62cd..e1a9e89575 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,6 +1,7 @@ //! # Events specification. use async_channel::{self as channel, Receiver, Sender, TrySendError}; +use pin_project::pin_project; mod payload; @@ -64,7 +65,8 @@ impl Events { /// [`Context::get_event_emitter`]: crate::context::Context::get_event_emitter /// [`Stream`]: futures::stream::Stream #[derive(Debug, Clone)] -pub struct EventEmitter(Receiver); +#[pin_project] +pub struct EventEmitter(#[pin] Receiver); impl EventEmitter { /// Async recv of an event. Return `None` if the `Sender` has been dropped. @@ -77,10 +79,10 @@ impl futures::stream::Stream for EventEmitter { type Item = Event; fn poll_next( - mut self: std::pin::Pin<&mut Self>, + self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - std::pin::Pin::new(&mut self.0).poll_next(cx) + self.project().0.poll_next(cx) } }