Skip to content

Commit

Permalink
comments & remove commented-out code
Browse files Browse the repository at this point in the history
  • Loading branch information
ric-evans committed Dec 1, 2023
1 parent cec74eb commit 892b3b6
Showing 1 changed file with 19 additions and 60 deletions.
79 changes: 19 additions & 60 deletions mqclient/broker_clients/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ def __init__(
self._next_channel_number = 1 # must start at 1 (not 0)

def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel:
"""Add a channel for the connection and configure."""
LOGGER.info(f"Adding channel to connection for '{self.queue=}'")
"""Open a channel for the connection and configure."""
LOGGER.info(f"Opening channel to connection for '{self.queue=}'")
if not self.connection:
raise ClosingFailedException("No connection to add channel.")
raise ClosingFailedException("No connection to open channel.")

# give unique channel_number b/c pika has a delay on re-connections in which it will recycle a closed channel
channel = self.connection.channel(self._next_channel_number)
Expand All @@ -132,7 +132,6 @@ def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel:
queue=self.queue, durable=True, arguments={"x-queue-type": "quorum"}
)

# self.channels.append(channel)
LOGGER.info(f"Opened channel '{channel.channel_number}'")
return channel

Expand All @@ -151,17 +150,14 @@ async def close(self) -> None:
"""Close connection."""
await super().close()

# if not self.channels:
# raise ClosingFailedException("No channel to close.")
if not self.connection:
raise ClosingFailedException("No connection to close.")
if self.connection.is_closed:
LOGGER.warning("Attempted to close a connection that is already closed")
return

try:
# self.channel.cancel() -- done by self.connection.close()
self.connection.close()
self.connection.close() # also closes each channel
except Exception as e:
raise ClosingFailedException() from e

Expand All @@ -185,7 +181,7 @@ def __init__(
self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None

def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel:
"""Add a channel for the connection and configure."""
"""Open a channel for the connection and configure."""
if self.channel:
raise MQClientException("RabbitMQPub instance can only have one channel")

Expand Down Expand Up @@ -285,7 +281,7 @@ def __init__(
] = None

def open_channel(self) -> pika.adapters.blocking_connection.BlockingChannel:
"""Add a channel for the connection and configure.
"""Open a channel for the connection and configure.
Turn on prefetching.
"""
Expand Down Expand Up @@ -334,7 +330,7 @@ async def close(self) -> None:
LOGGER.debug(log_msgs.CLOSED_SUB)

@staticmethod
def _to_message( # type: ignore[override] # noqa: F821 # pylint: disable=W0221
def _to_message( # noqa: F821 # pylint: disable=W0221
method_frame: Optional[pika.spec.Basic.GetOk],
body: Optional[Union[str, bytes]],
channel_number: int,
Expand Down Expand Up @@ -378,20 +374,8 @@ def _get_msg():
except StopIteration:
return (None, None, None)

# def infinite_loop_over_channels() -> (
# Iterator[pika.adapters.blocking_connection.BlockingChannel]
# ):
# # this allows self.channels to be updated,
# # updates are reflected on outer-loop
# # itertools.cycle() does not allow updates
# while True:
# yield from self.channels

# inf_channels_gen = infinite_loop_over_channels()
# channel = next(inf_channels_gen) # always called manually
# n_nonempty_channels_remaining = len(self.channels) # assume all are non-empty
remaining_channels = copy.copy(self.active_channels) # start with all
channel = remaining_channels[0] # TODO - use by priority?
remaining_active_channels = copy.copy(self.active_channels) # start with all
channel = remaining_active_channels[0]

while True:
try:
Expand All @@ -413,64 +397,41 @@ def _get_msg():
except pika.exceptions.StreamLostError as e:
raise MQClientException(HEARTBEAT_STREAMLOSTERROR_MSG) from e

#
# YIELD (got a message)
if msg := RabbitMQSub._to_message(
pika_msg[0],
pika_msg[2],
channel.channel_number,
):
LOGGER.debug(f"{log_msgs.GETMSG_RECEIVED_MESSAGE} ({msg.msg_id!r}).")
# n_nonempty_channels_remaining = len(self.channels) # reset!
if channel == self.reserve_channel:
# if this was the reserve channel, move it to active channels
self.active_channels.append(self.reserve_channel)
self.reserve_channel = None # no need to open a new one now
remaining_channels = copy.copy(self.active_channels) # reset!
remaining_active_channels = copy.copy(self.active_channels) # reset!
yield msg
#
# DEAL WITH EMPTY CHANNEL (didn't get a message)
else:
# n_nonempty_channels_remaining -= 1
LOGGER.debug("No message received -- switching channels...")
if channel == self.reserve_channel:
LOGGER.debug("No message received -- tried all channels")
# this means our reserve channel came up empty,
# so there's REALLY nothing in the queue
LOGGER.debug(log_msgs.GETMSG_NO_MESSAGE)
remaining_channels = copy.copy(self.active_channels) # reset!
remaining_active_channels = copy.copy(self.active_channels) # reset
yield None
else:
remaining_channels.remove(channel)
# if n_nonempty_channels_remaining == 0:
if not remaining_channels:
# don't reset n_nonempty_channels_remaining so we can see if this one is empty
# channel = self.open_channel() # try it now
# this new channel will be yielded by inf_channels_gen eventually

# FIXME - this leads to MANY empty channels if the user
# keeps cycling despite coming up empty
# TODO - figure way to close or limit channels, but
# consider that some ack-pending messages need to use
# the channel to ack. Keep one local mapping?
# Currently uses msg._connection_id.
# Maybe just closing a newly-opened channel will suffice?

LOGGER.debug("No message received -- switching channels...")
remaining_active_channels.remove(channel)
if not remaining_active_channels:
# try reserve channel
if not self.reserve_channel:
self.reserve_channel = self.open_channel()
channel = self.reserve_channel

# continue
# elif n_nonempty_channels_remaining < 0: # -1
# # this means our newly produced channel is empty,
# # so there's REALLY nothing in the queue
# LOGGER.debug(log_msgs.GETMSG_NO_MESSAGE)
# self.reserve_channel
# channel = next(inf_channels_gen) # try next, next time
# # FIXME - this design still allows a lot of new channels, one per final iter...
# yield None
else:
# channel = next(inf_channels_gen) # try next, now
channel = remaining_channels[0] # TODO - use by priority?
# continue
# try next active channel
channel = remaining_active_channels[0]

async def get_message(
self,
Expand All @@ -488,8 +449,6 @@ async def get_message(
# None -> timeout
break # get just one message

# self.channel.cancel() # this is done by `open_sub_one()` *after* ack/nack via `close()`

return msg

def _get_channel_by_msg(
Expand Down

0 comments on commit 892b3b6

Please sign in to comment.