
Panic on concurrent queue declaration #145

Open
ShaddyDC opened this issue Sep 4, 2024 · 17 comments
Labels
documentation Improvements or additions to documentation

Comments

ShaddyDC commented Sep 4, 2024

We recently ran into an issue where some of our tests using amqprs 1.7.0 would fail. I honestly don't know why they worked before, since we didn't change anything relevant, but since about 5 days ago we have been getting a panic when declaring a queue repeatedly in a short time frame.
I managed to create this minimal sample from the pub-sub example:

use amqprs::{
    callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
    channel::{
        BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
    },
    connection::{Connection, OpenConnectionArguments},
    consumer::DefaultConsumer,
    BasicProperties,
};
use futures_util::future::join_all;
use tokio::time;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // construct a subscriber that prints formatted traces to stdout
    // global subscriber with log level according to RUST_LOG
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();

    // open a connection to RabbitMQ server
    let connection = Connection::open(&OpenConnectionArguments::new(
        "localhost", 5672, "guest", "guest",
    ))
    .await
    .unwrap();
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .unwrap();

    // open a channel on the connection
    let channel = connection.open_channel(None).await.unwrap();
    channel
        .register_callback(DefaultChannelCallback)
        .await
        .unwrap();

    // declare a durable queue
    let futures = (0..2).map(|_| {
        channel.queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
    });

    let results = join_all(futures).await;

    for result in results {
        result.unwrap().unwrap();
    }
    let (queue_name, _, _) = channel
        .queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
        .await
        .unwrap()
        .unwrap();

    // bind the queue to exchange
    let routing_key = "amqprs.example";
    let exchange_name = "amq.topic";
    channel
        .queue_bind(QueueBindArguments::new(
            &queue_name,
            exchange_name,
            routing_key,
        ))
        .await
        .unwrap();

    //////////////////////////////////////////////////////////////////////////////
    // start consumer with given name
    let args = BasicConsumeArguments::new(&queue_name, "example_basic_pub_sub");

    channel
        .basic_consume(DefaultConsumer::new(args.no_ack), args)
        .await
        .unwrap();

    //////////////////////////////////////////////////////////////////////////////
    // publish message
    let content = String::from(
        r#"
            {
                "publisher": "example"
                "data": "Hello, amqprs!"
            }
        "#,
    )
    .into_bytes();

    // create arguments for basic_publish
    let args = BasicPublishArguments::new(exchange_name, routing_key);

    channel
        .basic_publish(BasicProperties::default(), content, args)
        .await
        .unwrap();

    // keep the `channel` and `connection` object from dropping before pub/sub is done.
    // channel/connection will be closed when drop.
    time::sleep(time::Duration::from_secs(1)).await;
    // explicitly close

    channel.close().await.unwrap();
    connection.close().await.unwrap();
}

The log looks like this:

2024-09-04T10:21:14.226083Z TRACE amqprs::net::split_connection: 521 bytes read from network
2024-09-04T10:21:14.226184Z TRACE amqprs::net::split_connection: RECV on channel 0: Start(MethodHeader { class_id: 10, method_id: 10 }, Start { version_major: 0, version_minor: 9, server_properties: FieldTable(476, {ShortStr(12, "capabilities"): F(FieldTable(199, {ShortStr(18, "publisher_confirms"): t(true), ShortStr(28, "authentication_failure_close"): t(true), ShortStr(18, "connection.blocked"): t(true), ShortStr(22, "consumer_cancel_notify"): t(true), ShortStr(15, "direct_reply_to"): t(true), ShortStr(26, "exchange_exchange_bindings"): t(true), ShortStr(16, "per_consumer_qos"): t(true), ShortStr(19, "consumer_priorities"): t(true), ShortStr(10, "basic.nack"): t(true)})), ShortStr(12, "cluster_name"): S(LongStr(19, "rabbit@f65d2c47302b")), ShortStr(9, "copyright"): S(LongStr(60, "Copyright (c) 2007-2024 Broadcom Inc and/or its subsidiaries")), ShortStr(11, "information"): S(LongStr(57, "Licensed under the MPL 2.0. Website: https://rabbitmq.com")), ShortStr(8, "platform"): S(LongStr(19, "Erlang/OTP 26.2.5.2")), ShortStr(7, "product"): S(LongStr(8, "RabbitMQ")), ShortStr(7, "version"): S(LongStr(6, "3.13.7"))}), mechanisms: LongStr(14, "PLAIN AMQPLAIN"), locales: LongStr(5, "en_US") })
2024-09-04T10:21:14.226245Z TRACE amqprs::net::split_connection: SENT on channel 0: StartOk(MethodHeader { class_id: 10, method_id: 11 }, StartOk { client_properties: FieldTable(142, {ShortStr(7, "product"): S(LongStr(6, "AMQPRS")), ShortStr(8, "platform"): S(LongStr(4, "Rust")), ShortStr(15, "connection_name"): S(LongStr(25, "AMQPRS000@localhost:5672/")), ShortStr(7, "version"): S(LongStr(3, "0.1")), ShortStr(12, "capabilities"): F(FieldTable(25, {ShortStr(22, "consumer_cancel_notify"): t(true)}))}), machanisms: ShortStr(5, "PLAIN"), response: LongStr(12, "\0guest\0guest"), locale: ShortStr(5, "en_US") })
2024-09-04T10:21:14.226615Z TRACE amqprs::net::split_connection: 20 bytes read from network
2024-09-04T10:21:14.226627Z TRACE amqprs::net::split_connection: RECV on channel 0: Tune(MethodHeader { class_id: 10, method_id: 30 }, Tune { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2024-09-04T10:21:14.226637Z TRACE amqprs::net::split_connection: SENT on channel 0: TuneOk(MethodHeader { class_id: 10, method_id: 31 }, TuneOk { channel_max: 2047, frame_max: 131072, heartbeat: 60 })
2024-09-04T10:21:14.226665Z TRACE amqprs::net::split_connection: SENT on channel 0: Open(MethodHeader { class_id: 10, method_id: 40 }, Open { virtual_host: ShortStr(1, "/"), capabilities: ShortStr(0, ""), insist: 0 })
2024-09-04T10:21:14.269047Z TRACE amqprs::net::split_connection: 13 bytes read from network
2024-09-04T10:21:14.269127Z TRACE amqprs::net::split_connection: RECV on channel 0: OpenOk(MethodHeader { class_id: 10, method_id: 41 }, OpenOk { know_hosts: ShortStr(0, "") })
2024-09-04T10:21:14.269574Z  INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2024-09-04T10:21:14.269593Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.269671Z DEBUG amqprs::net::reader_handler: callback registered on connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.269713Z DEBUG amqprs::net::reader_handler: register channel resource on connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.269829Z TRACE amqprs::net::split_connection: SENT on channel 1: OpenChannel(MethodHeader { class_id: 20, method_id: 10 }, OpenChannel { out_of_band: ShortStr(0, "") })
2024-09-04T10:21:14.269957Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 12201, tv_nsec: 692594584 }
2024-09-04T10:21:14.270771Z TRACE amqprs::net::split_connection: 16 bytes read from network
2024-09-04T10:21:14.270860Z TRACE amqprs::net::split_connection: RECV on channel 1: OpenChannelOk(MethodHeader { class_id: 20, method_id: 11 }, OpenChannelOk { channel_id: LongStr(0, "") })
2024-09-04T10:21:14.270909Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 12231, tv_nsec: 693544892 }
2024-09-04T10:21:14.271004Z  INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.271038Z TRACE amqprs::api::channel::dispatcher: starts up dispatcher task of channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.272320Z DEBUG amqprs::api::channel::dispatcher: callback registered on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-04T10:21:14.272470Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), bits: 2, arguments: FieldTable(0, {}) })
2024-09-04T10:21:14.272579Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 12201, tv_nsec: 695221079 }
2024-09-04T10:21:14.272616Z TRACE amqprs::net::split_connection: SENT on channel 1: DeclareQueue(MethodHeader { class_id: 50, method_id: 10 }, DeclareQueue { ticket: 0, queue: ShortStr(21, "amqprs.examples.basic"), bits: 2, arguments: FieldTable(0, {}) })
2024-09-04T10:21:14.272665Z TRACE amqprs::net::writer_handler: connection 'AMQPRS000@localhost:5672/ [open]' heartbeat deadline is updated to Instant { tv_sec: 12201, tv_nsec: 695308609 }
2024-09-04T10:21:14.273137Z TRACE amqprs::net::split_connection: 84 bytes read from network
2024-09-04T10:21:14.273196Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 0 })
2024-09-04T10:21:14.273251Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 12231, tv_nsec: 695894645 }
2024-09-04T10:21:14.273303Z TRACE amqprs::net::split_connection: RECV on channel 1: DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 0 })
2024-09-04T10:21:14.273332Z TRACE amqprs::net::reader_handler: server heartbeat deadline is updated to Instant { tv_sec: 12231, tv_nsec: 695976560 }
thread 'thread 'tokio-runtime-workermain' panicked at ' panicked at /home/space/.cargo/registry/src/index.crates.io-6f17d22bba15001f/amqprs-1.7.0/src/api/channel/dispatcher.rssrc/main.rs::49053::4516:
:
internal error: entered unreachable code: responder must be registered for DeclareQueueOk(MethodHeader { class_id: 50, method_id: 11 }, DeclareQueueOk { queue: ShortStr(21, "amqprs.examples.basic"), message_count: 0, consumer_count: 0 }) on channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'called `Result::unwrap()` on an `Err` value: InternalChannelError("channel closed")

note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2024-09-04T10:21:14.273603Z TRACE amqprs::api::channel: drop channel 1
2024-09-04T10:21:14.273669Z  INFO amqprs::api::channel: try to close channel 1 at drop
2024-09-04T10:21:14.

I'm not sure if this is an unintended way to use the library, but even then I don't think you should get an "unreachable code" panic.

The reason we're declaring a bunch of queues at once in the first place is that we have a job system where jobs are put on arbitrary queues, and workers that listen to them are only created on demand once there are actually jobs waiting in the queue.
In practice we don't usually re-declare the same queue multiple times in a small time frame, but we can't rule it out either. I guess for now we can manually check whether we already declared a specific queue within a certain time frame, since it works if, instead of using join_all, we await the declares individually in a loop:

    for _ in 0..5 {
        let (_queue_name, _, _) = channel
            .queue_declare(QueueDeclareArguments::durable_client_named(
                "amqprs.examples.basic",
            ))
            .await
            .unwrap()
            .unwrap();
    }
gftea (Owner) commented Sep 4, 2024

Hi, unfortunately you cannot do concurrent messaging on a single channel; this is a limitation of the protocol.
You can only do concurrent queue declares on different channels.

gftea (Owner) commented Sep 4, 2024

michaelklishin (Contributor) commented Sep 4, 2024

Concurrent operations on a shared channel are explicitly prohibited by many RabbitMQ/AMQP 0-9-1 client libraries.

This particularly applies to publishing but also acknowledgement of messages (in particular N-at-a-time).

Adding consumers and declaring queues can work fine if you add synchronization around these operations. But a much better idea would be to use N channels, assuming the concurrency rate is reasonably low (say, < 100).

michaelklishin (Contributor) commented Sep 4, 2024

> The reason we're declaring a bunch of queues at once in the first place is that we have a job system where jobs are put on arbitrary queues, and workers that listen to it are only created on demand once there are actually jobs waiting in the queue.

All of which is perfectly possible with synchronization or when done sequentially.

If you absolutely must declare N queues concurrently and do it all the time, you need to change your design. High connection, channel and queue churn are explicitly recommended against in the docs, although less so for queues.

Or you can use a stream, with or without stream filtering, using a separate client. I don't know if the stream client for Rust supports stream filtering, and streams are even less meant to be transient or churned through, but it can be an alternative.

michaelklishin (Contributor) commented

@gftea this is a great candidate to be moved to a discussion; I don't think this client should even try to support concurrent synchronous (that is, having a protocol response) operations.

Some clients do that in some areas and with 10+ years experience with them I'd say that it was a mistake, perhaps besides the connection socket write synchronization.

ShaddyDC (Author) commented Sep 5, 2024

Thank you all for the comments! This provides a lot of useful context I wasn't aware of.
I'll have to think about whether I want to go with multiple connections then or have a single task that gets messages from channels, but either should work fine for my use case.

I'm wondering if we can improve how this error case is handled then, if it's not something we generally want to support.
A panic with an "unreachable code" message seems surprising to me when you can provoke it while only using safe code, though admittedly while violating the library contract.

Perhaps we could consider either of

  1. Returning an error instead of panicking
  2. Enhancing the panic message with more context, possibly warning against concurrent usage (unless there are other ways to provoke this?)

These are just suggestions for discussion. I'm curious to hear your thoughts!

ShaddyDC (Author) commented Sep 5, 2024

I was thinking about how I want to prevent people from making the same mistake again in our project, and I was considering making a wrapper around the channel that is !Sync.
Maybe I'm overlooking something, but is there a reason to allow using the channel from multiple threads at all?

michaelklishin (Contributor) commented

We can return errors in certain cases but it's generally very difficult to detect concurrent channel use. Even with Rust's sophisticated type system it is not easy to express a restriction like that. I honestly don't know if !Sync would really do this limitation justice, since concurrency comes in many different shapes and forms.

@ShaddyDC what kind of error do you have in mind?

michaelklishin (Contributor) commented

@ShaddyDC I like the idea of prohibiting concurrent channel use. The only area where it is a safe thing to do comes down to having concurrent consumers on a shared channel, as long as the consumers receive deliveries in automatic acknowledgement mode or ack a single delivery at a time.

In other clients this was not done for many reasons, namely it is not something easy to express in the type system or otherwise (for dynamically-typed languages).

If you think that a !Sync marker would be enough, I'd say we should seriously consider this idea.

ShaddyDC (Author) commented Sep 6, 2024

I've looked into it a bit more, and unfortunately multi-threading isn't really the problem, so !Sync is not a solution. You can easily provoke the panic even with a single thread, and !Sync only restricts usage across multiple threads.

An async function like the following

async fn f() {
    a().await;
    b().await;
}

can be interrupted between a and b for another scheduled call of f to run, which will then be interrupted at the same point. In other words, if two calls are scheduled together, they will kind of advance through the state in lockstep, and most race conditions will always be triggered, even on a single thread.
So it doesn't seem like you can prevent this on a type system level, after all, unfortunately. :(

I guess a mutex or running your own task with channel are options, but those are probably beyond the scope of this library and left to users.

> We can return errors in certain cases but it's generally very difficult to detect concurrent channel use. Even with Rust's sophisticated type system it is not easy to express a restriction like that. I honestly don't know if !Sync would really do this limitation justice, since concurrency comes in many different shapes and forms.
>
> @ShaddyDC what kind of error do you have in mind?

I wasn't thinking of any sophisticated way of detecting concurrent channel use. My suggestion was just that, where the panic currently happens, we instead return some error like NoHandlerError with a display message along the lines of "No handler for x; if you're doing concurrent operations on this channel, that's probably why — see the docs for why it's unsupported and how to do it right".
Of course, that's only a good idea if there are no other weird ways to fall into the same code path. If a panic is the better way to handle it after all, a similarly expanded panic message might also make it easier for others to debug.

gftea (Owner) commented Sep 6, 2024

The async runtime is a multi-threaded runtime; at an .await point, the future can be rescheduled onto a different thread.

ShaddyDC (Author) commented Sep 6, 2024

That is true, but I've explicitly tested with a single worker thread and with the single-threaded runtime (current_thread). I didn't get any compile-time errors when making both the channel and the futures it returns !Send and !Sync, and the panic still occurred. It's of course possible I made a mistake, but I'm reasonably confident that this issue also exists on a single thread, when the async state machine intermixes multiple calls to the same queue-declaration command, and that the type system does not help prevent this kind of misuse, at least not like this.

Testing code for reference:

use std::{cell::Cell, marker::PhantomData};

use amqprs::{
    callbacks::{ChannelCallback, DefaultChannelCallback, DefaultConnectionCallback},
    channel::{Channel, QueueDeclareArguments},
    connection::{Connection, OpenConnectionArguments},
    AmqpMessageCount,
};
use futures_util::{future::join_all, Future};
use tokio::time;

use tracing_subscriber::{fmt, prelude::*, EnvFilter};

fn assert_sync<T: Sync>(_: T) {}
fn assert_send<T: Send>(_: T) {}

pub struct UnsyncChannel {
    inner: Channel,

    // Needed to make type `!Sync`
    _marker: PhantomData<Cell<()>>,
}

impl UnsyncChannel {
    pub fn new(channel: Channel) -> Self {
        UnsyncChannel {
            inner: channel,
            _marker: PhantomData,
        }
    }

    // `!Send` and `!Sync` future
    pub fn register_callback<F>(&self, callback: F) -> impl Future<Output = ()> + '_
    where
        F: ChannelCallback + Send + 'static,
    {
        async move { self.inner.register_callback(callback).await.unwrap() }
    }

    // `!Send` and `!Sync` future
    pub fn queue_declare(
        &self,
        args: QueueDeclareArguments,
    ) -> impl Future<Output = Option<(String, AmqpMessageCount, u32)>> + '_ {
        async move { self.inner.queue_declare(args).await.unwrap() }
    }

    pub fn close(self) -> impl Future<Output = ()> {
        async move { self.inner.close().await.unwrap() }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // construct a subscriber that prints formatted traces to stdout
    // global subscriber with log level according to RUST_LOG
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();

    // open a connection to RabbitMQ server
    let connection = Connection::open(&OpenConnectionArguments::new(
        "localhost",
        5672,
        "guest",
        "guest",
    ))
    .await
    .unwrap();
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .unwrap();

    // open a channel on the connection
    let channel = connection.open_channel(None).await.unwrap();
    let channel = UnsyncChannel::new(channel);
    channel.register_callback(DefaultChannelCallback).await;

    let futures = (0..2).map(|_| {
        channel.queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
    });

    let results = join_all(futures).await;

    for result in results {
        result.unwrap();
    }

    channel
        .queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
        .await;

    // Fails to compile
    // assert_send(
    //     channel.queue_declare(QueueDeclareArguments::durable_client_named(
    //         "amqprs.examples.basic",
    //     )),
    // );
    // assert_sync(
    //     channel.queue_declare(QueueDeclareArguments::durable_client_named(
    //         "amqprs.examples.basic",
    //     )),
    // );

    // keep the `channel` and `connection` object from dropping before pub/sub is done.
    // channel/connection will be closed when drop.
    time::sleep(time::Duration::from_secs(1)).await;
    // explicitly close

    channel.close().await;
    connection.close().await.unwrap();
}

gftea (Owner) commented Sep 6, 2024

The problem is that you are using let results = join_all(futures).await;
This interleaves operations on the channel. The underlying reason is that the raw frames in the protocol need to be in sequential order; otherwise you can get e.g. frame1, frame2, ack1, ack2, and then it will fail.

See the docs. Note that concurrency is not parallelism; a single thread with async is also concurrent.

> It is not recommended to share the same Channel object between tasks/threads, because it allows interleaving the AMQP protocol messages in the same channel in a concurrent setup. Applications should be aware of this limitation in the AMQP protocol itself.

ShaddyDC (Author) commented Sep 6, 2024

Yes, exactly. I had originally not considered that when I suggested using !Sync to prevent the issue at the type-system level, but I realised it while playing around.
Sorry if I was unclear in my last two messages; I agree with you. Unless you want to put a mutex in at the library level, this is not something you can fix or prevent, afaict. That's why I was focusing on the suggestion of improving the error message, to make the root cause clearer when people misuse the library the way I did.

gftea (Owner) commented Sep 10, 2024

@ShaddyDC, the fundamental issue here is that the wire frames get out of order, so it is non-trivial to provide precise and correct error messages: to do that, the library would need to be able to distinguish which frame is incorrectly interleaved and whether that is due to misuse or an internal bug in the implementation.

The documentation already states "tasks/threads and interleaving the AMQP protocol messages in same channel in concurrent setup." Do you think this wording is unclear, or do you see any improvement for it?

gftea added the documentation (Improvements or additions to documentation) label Sep 10, 2024
ShaddyDC (Author) commented

No, I think the documentation is fine; I just didn't read that part very carefully for whatever reason. I don't think I'm the only person that will happen to, so it seems very useful to me to improve the diagnostics when things actually do go wrong. If that is very difficult to do correctly, then that's unfortunate, but I don't understand the library well enough to offer more suggestions there.

I've got one more suggestion for adjusting the type system to potentially prevent misuse like this. Currently, operations like queue_declare take a shared reference (&self). Is there a reason they cannot take a unique reference, i.e. &mut self?
It is slightly more cumbersome to use and potentially more restrictive than it strictly has to be, but I don't think that would be a huge problem, and it would nicely enforce that under no circumstances can those functions be called concurrently, because you cannot share a mutable reference.

If that does not seem like a good approach for this problem either, then I don't have any better ideas. Feel free to close the issue in that case, and thank you for bearing with me. I appreciate the insight you gave me into the problem. :)

gftea (Owner) commented Sep 11, 2024

It seems good to me to make the receiver &mut in the channel API, but it would be a breaking change.
I think it can be considered for version 2 when I have time.
