Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper redis-cluster MGET & MSET by being aware of hash tags #178

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions aredis/commands/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
dict_merge,
bool_ok,
string_keys_to_dict)
from collections import OrderedDict, defaultdict


class BitField(object):
Expand Down Expand Up @@ -329,34 +330,62 @@ async def mget(self, keys, *args):
Returns a list of values ordered identically to ``keys``

Cluster impl:
Itterate all keys and send GET for each key.
This will go alot slower than a normal mget call in StrictRedis.

Operation is no longer atomic.
"""
res = list()
for arg in list_or_args(keys, args):
res.append(await self.get(arg))
return res
Find groups of keys with the same hash tag
https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding
and execute an MGET for each group
execute individual GETs for all other keys passed in

Operation will be atomic only if keys belonging to a single
hash tag are passed in.
"""
res_mapping = OrderedDict()
hash_tag_slots = defaultdict(list)
for key in list_or_args(keys, args):
if key.count('{') == key.count('}') == 1 and \
-1 < key.index('{') < key.index('}') - 1:
hash_tag = key.split('{')[1].split('}')[0]
hash_tag_slots[hash_tag].append(key)
res_mapping[key] = None # establish correct ordering
else:
# a loose key without a hash tag, can't use MGET
res_mapping[key] = await self.get(key)
for mget_keys in hash_tag_slots.values():
mget_res = await self.execute_command('MGET', *mget_keys)
for key, res in zip(mget_keys, mget_res):
res_mapping[key] = res
return list(res_mapping.values())

async def mset(self, *args, **kwargs):
"""
Sets key/values based on a mapping. Mapping can be supplied as a single
dictionary argument or as kwargs.

Cluster impl:
Itterate over all items and do SET on each (k,v) pair
Find groups of keys with the same hash tag
https://redis.io/topics/cluster-tutorial#redis-cluster-data-sharding
and execute an MSET for each group
execute individual SETs for all other key/value pairs

Operation is no longer atomic.
Operation will be atomic only if keys belonging to a single
hash tag are passed in.
"""
if args:
if len(args) != 1 or not isinstance(args[0], dict):
raise RedisError('MSET requires **kwargs or a single dict arg')
kwargs.update(args[0])

hash_tag_slots = defaultdict(list)
for pair in iteritems(kwargs):
await self.set(pair[0], pair[1])

key, v = pair
if key.count('{') == key.count('}') == 1 and \
-1 < key.index('{') < key.index('}') - 1:
hash_tag = key.split('{')[1].split('}')[0]
hash_tag_slots[hash_tag].extend(pair)
else:
# a loose key without a hash tag, can't use MSET
await self.set(key, v)
for mset_items in hash_tag_slots.values():
await self.execute_command('MSET', *mset_items)
return True

async def msetnx(self, *args, **kwargs):
Expand Down