-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
How to reconnect inside the Client context manager #334
Comments
Hi there, thanks for opening this issue! 🙂 You already said it, reconnection is trickier than it looks. The connection can break at any time and it's not always possible to reconnect. We have multiple options of how a reconnection interface could look like:
There's some additional discussion in #287, I would be very interested to hear what you think is best! As to your issue, an async context manager provides two magic methods, client.__aexit__(None, None, None)
client.__aenter__() Let me know if that helps! |
Thanks for your reply!
We as in "user of aiomqtt" or "developer how to design the interface"? In my world I would be able to call a connect()/disconnect() (which as far as I understand existed but got purged) and/or being able to check on whether the MQTT conn is actually alive and working and wait and give it all some time if it isn't.
I'm not sure I totally understand the approach looking at my example - I guess this is /not/ what it would/should look like?
Taking aside that this looks very clumsy (to me), this, once there was a conn issue - according to my understanding - wouldn't (a)wait the connection being re-established, but infinitely(?) try to publish, fail, aexit and aenter. If anyhow possible, I would really appreciate if you could provide modifications to the initial example dealing with connection issues in main(). Thanks a lot! |
To clarify, there's currently no official way to reconnect other than exiting the context manager and entering it again. That approach is lacking and I'd like to implement something better, but I don't yet know how I want it to look like. So, anything else is going to be a workaround. Here's a self-contained example to reconnect inside the context manager: import asyncio
import aiomqtt
import contextlib
async def main():
async with aiomqtt.Client("localhost") as client:
while True:
await client.subscribe("test/+")
try:
async for message in client.messages:
print(message.payload)
except aiomqtt.MqttError:
print("Lost connection to the MQTT broker")
while True:
print("Attempting to reconnect to the MQTT broker")
with contextlib.suppress(aiomqtt.MqttError):
await client.__aexit__(None, None, None)
with contextlib.suppress(aiomqtt.MqttError):
await client.__aenter__()
print("Successfully reconnected to the MQTT broker")
break
await asyncio.sleep(1)
asyncio.run(main()) The internal (again, workaround, might break without notice) |
Thank you very much! I wonder, though, how the example would look like, if I had the iteration over messages in its own task and the publish calls in the main function - can I aexit and aenter in both functions "concurrently"? I know, asyncio is technically not concurrent, but I wonder if the publish calls in I tried adding my thoughts into #287 - not sure my input helps, though, as it's coming from a user's perspective (currently) only thinking about his very issue. |
I just noticed the subscription / iteration over messages isn't even guarded by a try block - so my question doesn't really make sense, at least not with reference to your example. |
That's a good point, I can see how that can lead to race conditions. A "correct" implementation of a Thanks much for your comment on #287! It's super helpful to hear different opinions on this and you raise some very good points 👍 At the moment, I think failing on publish/subscribe/etc. when the client is disconnected and doing reconnection internally in a background task (Your option
If you're not only publishing messages, it should be enough to put the reconnection logic only on the message iteration: import asyncio
import aiomqtt
import contextlib
async def publish(client):
while True:
with contextlib.suppress(aiomqtt.MqttError):
await client.publish("test/example", 28.4)
await asyncio.sleep(1)
async def main():
async with aiomqtt.Client("localhost") as client, asyncio.TaskGroup() as group:
group.create_task(publish(client))
while True:
await client.subscribe("test/+")
try:
async for message in client.messages:
print(message.payload)
except aiomqtt.MqttError:
print("Lost connection to the MQTT broker")
while True:
print(f"Attempting to reconnect to the MQTT broker in 2 seconds")
await asyncio.sleep(2)
with contextlib.suppress(aiomqtt.MqttError):
await client.__aexit__(None, None, None)
with contextlib.suppress(aiomqtt.MqttError):
await client.__aenter__()
print("Successfully reconnected to the MQTT broker")
break
asyncio.run(main()) |
I tried to adapt your example the way I'd like to use it (iterating over messages in a task):
Once a connection issue is detected ("simulated" by
However even after restoring the routing, it does not reconnect. |
@empicano Sorry to bother you about that once again, but if you could provide an idea on how to solve re-connection, where iteration over messages is happening in its own task, that would be tremendously appreciated - as I'm still losing my hair over this. |
Thanks for the detailed test! I tried around with this yesterday, but somehow can't reproduce the getting stuck part on MacOS. This is the async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
"""Disconnect from the broker."""
if self._disconnected.done():
# Return early if the client is already disconnected
if self._lock.locked():
self._lock.release()
if (exc := self._disconnected.exception()) is not None:
# If the disconnect wasn't intentional, raise the error that caused it
raise exc
return
# Try to gracefully disconnect from the broker
rc = self._client.disconnect()
if rc == mqtt.MQTT_ERR_SUCCESS:
# Wait for acknowledgement
await self._wait_for(self._disconnected, timeout=None)
# Reset `_connected` if it's still in completed state after disconnecting
if self._connected.done():
self._connected = asyncio.Future()
else:
self._logger.warning(
"Could not gracefully disconnect: %d. Forcing disconnection.", rc
)
# Force disconnection if we cannot gracefully disconnect
if not self._disconnected.done():
self._disconnected.set_result(None)
# Release the reusability lock
if self._lock.locked():
self._lock.release() The only part where I can imagine it getting stuck is the call to paho's |
I read the docs and examples and tickets about reconnection.
My problem is: I don't want to wrap everything in a while-loop and re-execute everything just because of a connection issue, but potentially re-exec a failed call - while the messages are being iterated over in its own asyncio task.
Let's start - this is what I gathered from the docs, and essentially works:
The problem is: whenever there's a connection issue during a publish call, the whole code clock gets executed again. This is not what I want. I want to e.g. decide, whether to try again, to skip, or do whatever.
As far as I understand, I do have to do all that within the same context, so I cant just wrap every publish call around a(nother)
async with mqtt_client
.I was also trying to put the (re)connect routine into its own task, which again doesn't work, because then the client context is not available in main() anymore. I'm losing hair over this seemingly simple issue and would very much appreciate some pointer on how to achieve that, as I can't see the wood for the trees anymore.
Thanks in advance!
The text was updated successfully, but these errors were encountered: