Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
perf(cluster): run scan operation in parallel across nodes (#7)
Browse files Browse the repository at this point in the history
SCAN: Iterate over cluster nodes in parallel rather than sequentially

* with a MATCH scan I measured this as iterating over 2.5x keys per
  second compared to previous version (with 8 cluster nodes)
  • Loading branch information
eoghanmurray authored Jun 17, 2021
1 parent 553d321 commit 4fbb46d
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions yaaredis/commands/iter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio


class IterCommandMixin:
"""
convenient function of scan iter, make it a class separately
Expand Down Expand Up @@ -81,18 +84,36 @@ class ClusterIterCommandMixin(IterCommandMixin):
async def scan_iter(self, match=None, count=None,
type=type): # pylint: disable=redefined-builtin
nodes = await self.cluster_nodes()

async def iterate_node(node, queue):
cursor = '0'
while cursor != 0:
pieces = [cursor]
if match is not None:
pieces.extend(['MATCH', match])
if count is not None:
pieces.extend(['COUNT', count])
if type is not None:
pieces.extend(['TYPE', type])
response = await self.execute_command_on_nodes(
[node], 'SCAN', *pieces)
cursor, data = list(response.values())[0]
for item in data:
await queue.put(item) # blocks if queue is full

# maxsize ensures we don't pull too much data into
# memory if we are not processing it yet
maxsize = 10 if count is None else count
# reducing maxsize by one: the idea here is that the SCAN for an individual
# node can never fill the queue in a single iteration, so we'll get at most
# one SCAN iteration for each node if the queue is never consumed
maxsize -= 1
queue = asyncio.Queue(maxsize=maxsize)
tasks = []
for node in nodes:
if 'master' in node['flags']:
cursor = '0'
while cursor != 0:
pieces = [cursor]
if match is not None:
pieces.extend(['MATCH', match])
if count is not None:
pieces.extend(['COUNT', count])
if type is not None:
pieces.extend(['TYPE', type])
response = await self.execute_command_on_nodes([node], 'SCAN', *pieces)
cursor, data = list(response.values())[0]
for item in data:
yield item
t = asyncio.create_task(iterate_node(node, queue))
tasks.append(t)

while not all(t.done() for t in tasks) or not queue.empty():
yield await queue.get()

0 comments on commit 4fbb46d

Please sign in to comment.