Panic on concurrent queue declaration #145
Hi, unfortunately you cannot do concurrent messaging on a single channel; this is a limitation of the protocol.
Concurrent operations on a shared channel are explicitly prohibited by many RabbitMQ/AMQP 0-9-1 client libraries. This particularly applies to publishing but also to acknowledgement of messages (in particular N-at-a-time). Adding consumers and declaring queues can work fine if you add synchronization around these operations. But a much better idea would be to use N channels, assuming the concurrency rate is reasonably low (say, < 100).
All of which is perfectly possible with synchronization or when done sequentially. If you absolutely must declare N queues concurrently and do it all the time, you need to change your design. High connection, channel and queue churn are explicitly recommended against in the docs, although less so for queues. Or you can use a stream, with or without stream filtering, using a separate client. I don't know if the stream client for Rust supports stream filtering, and streams are even less meant to be transient/churned through, but it can be an alternative.
@gftea this is a great candidate to be moved to a discussion. I don't think this client should even try to support concurrent synchronous (that is, having a protocol response) operations. Some clients do that in some areas, and with 10+ years of experience with them I'd say that it was a mistake, perhaps besides the connection socket write synchronization.
Thank you all for the comments! This provides a lot of useful context I wasn't aware of. I'm wondering if we can improve how this error case is handled, then, if it's not something we generally want to support; perhaps we could consider a couple of options. These are just suggestions for discussion. I'm curious to hear your thoughts!
I was thinking about how I want to prevent people from making the same mistake again in our project, and I was considering making a wrapper around the channel that is `!Sync`.
We can return errors in certain cases, but it's generally very difficult to detect concurrent channel use. Even with Rust's sophisticated type system it is not easy to express a restriction like that, so I honestly don't know. @ShaddyDC, what kind of error do you have in mind?
@ShaddyDC I like the idea of prohibiting concurrent channel use. The only area where it is a safe thing to do comes down to having concurrent consumers on a shared channel, as long as the consumers receive deliveries in automatic acknowledgement mode or ack a single delivery at a time. In other clients this was not done, for many reasons: namely, it is not something that is easy to express in the type system or otherwise (for dynamically-typed languages). If you think that a wrapper type can express it, it may be worth exploring.
I've looked into it a bit more, and unfortunately multi-threading isn't really the problem. An async function like the following

```rust
async fn f() {
    a().await;
    b().await;
}
```

can be interrupted between `a().await` and `b().await`. I guess a mutex or running your own task with a channel are options, but those are probably beyond the scope of this library and left to users.
I wasn't thinking about any sophisticated way of detecting concurrent channel use. My suggestion was just to, where the panic currently happens, return some error instead.
The async runtime is a multi-threaded runtime; across an `await`, the task can be rescheduled onto a different thread.
That is true, but I've explicitly tested with a single worker thread and with the single-threaded runtime (`current_thread` flavor), and the panic still occurs. Testing code for reference:

```rust
use std::{cell::Cell, marker::PhantomData};

use amqprs::{
    callbacks::{ChannelCallback, DefaultChannelCallback, DefaultConnectionCallback},
    channel::{Channel, QueueDeclareArguments},
    connection::{Connection, OpenConnectionArguments},
    AmqpMessageCount,
};
use futures_util::{future::join_all, Future};
use tokio::time;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

fn assert_sync<T: Sync>(_: T) {}
fn assert_send<T: Send>(_: T) {}

pub struct UnsyncChannel {
    inner: Channel,
    // Needed to make the type `!Sync`
    _marker: PhantomData<Cell<()>>,
}

impl UnsyncChannel {
    pub fn new(channel: Channel) -> Self {
        UnsyncChannel {
            inner: channel,
            _marker: PhantomData,
        }
    }

    // `!Send` and `!Sync` future
    pub fn register_callback<F>(&self, callback: F) -> impl Future<Output = ()> + '_
    where
        F: ChannelCallback + Send + 'static,
    {
        async move { self.inner.register_callback(callback).await.unwrap() }
    }

    // `!Send` and `!Sync` future
    pub fn queue_declare(
        &self,
        args: QueueDeclareArguments,
    ) -> impl Future<Output = Option<(String, AmqpMessageCount, u32)>> + '_ {
        async move { self.inner.queue_declare(args).await.unwrap() }
    }

    pub fn close(self) -> impl Future<Output = ()> {
        async move { self.inner.close().await.unwrap() }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // construct a subscriber that prints formatted traces to stdout;
    // global subscriber with log level according to RUST_LOG
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();

    // open a connection to the RabbitMQ server
    let connection = Connection::open(&OpenConnectionArguments::new(
        "localhost",
        5672,
        "guest",
        "guest",
    ))
    .await
    .unwrap();
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .unwrap();

    // open a channel on the connection
    let channel = connection.open_channel(None).await.unwrap();
    let channel = UnsyncChannel::new(channel);
    channel.register_callback(DefaultChannelCallback).await;

    let futures = (0..2).map(|_| {
        channel.queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
    });
    let results = join_all(futures).await;
    for result in results {
        result.unwrap();
    }

    channel
        .queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
        .await;

    // Fail to compile:
    // assert_send(
    //     channel.queue_declare(QueueDeclareArguments::durable_client_named(
    //         "amqprs.examples.basic",
    //     )),
    // );
    // assert_sync(
    //     channel.queue_declare(QueueDeclareArguments::durable_client_named(
    //         "amqprs.examples.basic",
    //     )),
    // );

    // keep the `channel` and `connection` objects from dropping before pub/sub is done;
    // channel/connection will be closed on drop
    time::sleep(time::Duration::from_secs(1)).await;

    // explicitly close
    channel.close().await;
    connection.close().await.unwrap();
}
```
The problem is that you are using `join_all`, which still drives the futures concurrently; see the docs. Note that concurrency is not parallelism: a single thread with async is still concurrent.
Yes, exactly. I had originally not considered that when I suggested using a `!Sync` wrapper.
@ShaddyDC, the fundamental issue here is that the wire frames get out of order, so it is non-trivial to provide precise and correct error messages: to do that, the library would need to be able to distinguish which frame is incorrectly interleaved, and to tell whether that is due to misuse or to internal bugs in the implementation. The documentation already states this limitation.
No, I think the documentation is fine; I just didn't read that part very carefully for whatever reason. I don't think I'm the only person that will happen to, so to me it seems very useful to improve the diagnostics when things actually do go wrong. If that is very difficult to do correctly, then that's unfortunate, but I don't understand the library well enough to offer more suggestions there.

I've got one more suggestion for adjusting the type system to potentially prevent misuse like this. Currently, operations like `queue_declare` take `&self`; if they took `&mut self` instead, the borrow checker would reject overlapping calls on the same channel at compile time.

If that does not seem like a good approach for this problem either, then I don't have any better ideas. Feel free to close the issue in that case, and thank you for bearing with me. I appreciate the insight you gave me into the problem. :)
It seems good to me to make the receiver `&mut` on the channel API, though it would be a breaking change.
We recently ran into an issue where some of our tests using amqprs 1.7.0 would fail. I honestly don't know why they worked before, since we didn't change anything relevant, but since about 5 days ago we get a panic when declaring a queue repeatedly in a short time frame.
I managed to create this minimum sample from the pub-sub example:
The log looks like this:
I'm not sure if this is an unintended way to use the library, but even then I don't think you should get an "unreachable code" panic.
The reason we're declaring a bunch of queues at once in the first place is that we have a job system where jobs are put on arbitrary queues, and workers that listen to it are only created on demand once there are actually jobs waiting in the queue.
We don't usually re-declare the same queue multiple times in a small time frame in practice, but we can't rule it out either. I guess for now we can manually check whether we already declared a specific queue within a certain time frame. Notably, it works if, instead of using `join_all`, we await the futures individually in a loop.