diff --git a/mqclient/broker_clients/rabbitmq.py b/mqclient/broker_clients/rabbitmq.py index 869c5ebd..5cb4adc5 100644 --- a/mqclient/broker_clients/rabbitmq.py +++ b/mqclient/broker_clients/rabbitmq.py @@ -110,10 +110,10 @@ def __init__( self._next_channel_number = 1 # must start at 1 (not 0) def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel: - """Add a channel for the connection and configure.""" - LOGGER.info(f"Adding channel to connection for '{self.queue=}'") + """Open a channel for the connection and configure.""" + LOGGER.info(f"Opening channel to connection for '{self.queue=}'") if not self.connection: - raise ClosingFailedException("No connection to add channel.") + raise ClosingFailedException("No connection to open channel.") # give unique channel_number b/c pika has a delay on re-connections in which it will recycle a closed channel channel = self.connection.channel(self._next_channel_number) @@ -132,7 +132,6 @@ def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel: queue=self.queue, durable=True, arguments={"x-queue-type": "quorum"} ) - # self.channels.append(channel) LOGGER.info(f"Opened channel '{channel.channel_number}'") return channel @@ -151,8 +150,6 @@ async def close(self) -> None: """Close connection.""" await super().close() - # if not self.channels: - # raise ClosingFailedException("No channel to close.") if not self.connection: raise ClosingFailedException("No connection to close.") if self.connection.is_closed: @@ -160,8 +157,7 @@ async def close(self) -> None: return try: - # self.channel.cancel() -- done by self.connection.close() - self.connection.close() + self.connection.close() # also closes each channel except Exception as e: raise ClosingFailedException() from e @@ -185,7 +181,7 @@ def __init__( self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel: - """Add a channel for the connection and configure.""" + """Open a channel for the connection and configure.""" if self.channel: raise MQClientException("RabbitMQPub instance can only have one channel") @@ -285,7 +281,7 @@ def __init__( ] = None def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel: - """Add a channel for the connection and configure. + """Open a channel for the connection and configure. Turn on prefetching. """ @@ -334,7 +330,7 @@ async def close(self) -> None: LOGGER.debug(log_msgs.CLOSED_SUB) @staticmethod - def _to_message( # type: ignore[override] # noqa: F821 # pylint: disable=W0221 + def _to_message( # noqa: F821 # pylint: disable=W0221 method_frame: Optional[pika.spec.Basic.GetOk], body: Optional[Union[str, bytes]], channel_number: int, @@ -378,20 +374,8 @@ def _get_msg(): except StopIteration: return (None, None, None) - # def infinite_loop_over_channels() -> ( - # Iterator[pika.adapters.blocking_connection.BlockingChannel] - # ): - # # this allows self.channels to be updated, - # # updates are reflected on outer-loop - # # itertools.cycle() does not allow updates - # while True: - # yield from self.channels - - # inf_channels_gen = infinite_loop_over_channels() - # channel = next(inf_channels_gen) # always called manually - # n_nonempty_channels_remaining = len(self.channels) # assume all are non-empty - remaining_channels = copy.copy(self.active_channels) # start with all - channel = remaining_channels[0] # TODO - use by priority? + remaining_active_channels = copy.copy(self.active_channels) # start with all + channel = remaining_active_channels[0] while True: try: @@ -413,6 +397,7 @@ def _get_msg(): except pika.exceptions.StreamLostError as e: raise MQClientException(HEARTBEAT_STREAMLOSTERROR_MSG) from e + # # YIELD (got a message) if msg := RabbitMQSub._to_message( pika_msg[0], @@ -420,57 +405,33 @@ def _get_msg(): channel.channel_number, ): LOGGER.debug(f"{log_msgs.GETMSG_RECEIVED_MESSAGE} ({msg.msg_id!r}).") - # n_nonempty_channels_remaining = len(self.channels) # reset! if channel == self.reserve_channel: # if this was the reserve channel, move it to active channels self.active_channels.append(self.reserve_channel) self.reserve_channel = None # no need to open a new one now - remaining_channels = copy.copy(self.active_channels) # reset! + remaining_active_channels = copy.copy(self.active_channels) # reset! yield msg + # # DEAL WITH EMPTY CHANNEL (didn't get a message) else: - # n_nonempty_channels_remaining -= 1 - LOGGER.debug("No message received -- switching channels...") if channel == self.reserve_channel: + LOGGER.debug("No message received -- tried all channels") # this means our reserve channel came up empty, # so there's REALLY nothing in the queue LOGGER.debug(log_msgs.GETMSG_NO_MESSAGE) - remaining_channels = copy.copy(self.active_channels) # reset! + remaining_active_channels = copy.copy(self.active_channels) # reset yield None else: - remaining_channels.remove(channel) - # if n_nonempty_channels_remaining == 0: - if not remaining_channels: - # don't reset n_nonempty_channels_remaining so we can see if this one is empty - # channel = self.open_channel() # try it now - # this new channel will be yielded by inf_channels_gen eventually - - # FIXME - this leads to MANY empty channels if the user - # keeps cycling despite coming up empty - # TODO - figure way to close or limit channels, but - # consider that some ack-pending messages need to use - # the channel to ack. Keep one local mapping? - # Currently uses msg._connection_id. - # Maybe just closing a newly-opened channel will suffice? - + LOGGER.debug("No message received -- switching channels...") + remaining_active_channels.remove(channel) + if not remaining_active_channels: # try reserve channel if not self.reserve_channel: self.reserve_channel = self.open_channel() channel = self.reserve_channel - - # continue - # elif n_nonempty_channels_remaining < 0: # -1 - # # this means our newly produced channel is empty, - # # so there's REALLY nothing in the queue - # LOGGER.debug(log_msgs.GETMSG_NO_MESSAGE) - # self.reserve_channel - # channel = next(inf_channels_gen) # try next, next time - # # FIXME - this design still allows a lot of new channels, one per final iter... - # yield None else: - # channel = next(inf_channels_gen) # try next, now - channel = remaining_channels[0] # TODO - use by priority? - # continue + # try next active channel + channel = remaining_active_channels[0] async def get_message( self, @@ -488,8 +449,6 @@ async def get_message( # None -> timeout break # get just one message - # self.channel.cancel() # this is done by `open_sub_one()` *after* ack/nack via `close()` - return msg def _get_channel_by_msg(