Skip to content

Commit

Permalink
Add blocking operations for SortedSets (#208)
Browse files Browse the repository at this point in the history
* Add blocking operations for SortedSets

* clean up code

---------

Co-authored-by: Ivan Ponomarev <[email protected]>
  • Loading branch information
Alex286756 and inponomarev authored Dec 2, 2023
1 parent 5a6e794 commit adb7c06
Show file tree
Hide file tree
Showing 28 changed files with 1,514 additions and 421 deletions.
672 changes: 623 additions & 49 deletions native_tests/linux/tests/support/util.tcl

Large diffs are not rendered by default.

527 changes: 258 additions & 269 deletions native_tests/linux/tests/unit/type/zset.tcl

Large diffs are not rendered by default.

26 changes: 14 additions & 12 deletions src/main/java/com/github/fppt/jedismock/Utils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.fppt.jedismock;

import com.github.fppt.jedismock.exception.WrongValueTypeException;

import java.io.Closeable;

/**
Expand All @@ -16,15 +17,15 @@ public static void closeQuietly(Closeable closeable) {
}
}

public static long convertToLong(String value){
public static long convertToLong(String value) {
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
throw new WrongValueTypeException("ERR value is not an integer or out of range");
}
}

public static byte convertToByte(String value){
public static byte convertToByte(String value) {
try {
byte bit = Byte.parseByte(value);
if (bit != 0 && bit != 1) {
Expand All @@ -36,40 +37,37 @@ public static byte convertToByte(String value){
}
}

public static int convertToNonNegativeInteger(String value){
public static int convertToNonNegativeInteger(String value) {
try {
int pos = Integer.parseInt(value);
if(pos < 0) throw new NumberFormatException("Int less than 0");
if (pos < 0) throw new NumberFormatException("Int less than 0");
return pos;
} catch (NumberFormatException e) {
throw new WrongValueTypeException("ERR bit offset is not an integer or out of range");
}
}

public static int convertToInteger(String value){
public static int convertToInteger(String value) {
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new WrongValueTypeException("ERR value is not an integer or value is out of range");
}
}

public static double convertToDouble(String value){
public static double convertToDouble(String value) {
try {
return Double.parseDouble(value);
} catch (NumberFormatException e) {
throw new WrongValueTypeException("ERR bit offset is not a double or out of range");
}
}

public static String createRegexFromGlob(String glob)
{
public static String createRegexFromGlob(String glob) {
StringBuilder out = new StringBuilder("^");
for(int i = 0; i < glob.length(); ++i)
{
for (int i = 0; i < glob.length(); ++i) {
final char c = glob.charAt(i);
switch(c)
{
switch (c) {
case '*':
out.append(".*");
break;
Expand All @@ -92,4 +90,8 @@ public static String createRegexFromGlob(String glob)
out.append('$');
return out.toString();
}

public static long toNanoTimeout(String value) {
return (long) (convertToDouble(value) * 1_000_000_000L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,35 @@
import java.util.Collections;
import java.util.List;

import static com.github.fppt.jedismock.Utils.convertToDouble;
import static com.github.fppt.jedismock.Utils.toNanoTimeout;

public abstract class AbstractBPop extends AbstractRedisOperation {

private final Object lock;
private final boolean isInTransaction;
protected long timeoutNanos;
protected List<Slice> keys;

protected AbstractBPop(OperationExecutorState state, List<Slice> params) {
super(state.base(), params);
this.lock = state.lock();
this.isInTransaction = state.isTransactionModeOn();
}

@Override
protected void doOptionalWork() {
if (params().size() < 2) {
throw new IndexOutOfBoundsException("require at least 2 params");
}
timeoutNanos = toNanoTimeout(params().get(params().size() - 1).toString());
keys = params().subList(0, params().size() - 1);
}

protected abstract Slice popper(List<Slice> params);

protected abstract AbstractRedisOperation getSize(RedisBase base, List<Slice> params);

protected Slice response() {
int size = params().size();
if (size < 2) {
throw new IndexOutOfBoundsException("require at least 2 params");
}
List<Slice> keys = params().subList(0, size - 1);
long timeoutNanos = (long) (convertToDouble(params().get(size - 1).toString()) * 1_000_000_000L);

if (timeoutNanos < 0) {
throw new IllegalArgumentException("ERR timeout is negative");
}
Expand All @@ -57,10 +61,10 @@ protected Slice response() {
Thread.currentThread().interrupt();
return Response.NULL;
}
if (source != null) {
return popper(Collections.singletonList(source));
if (source == null) {
return Response.NULL_ARRAY;
} else {
return Response.NULL;
return popper(Collections.singletonList(source));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.Arrays;
import java.util.List;

import static com.github.fppt.jedismock.Utils.convertToDouble;
import static com.github.fppt.jedismock.Utils.toNanoTimeout;

@RedisCommand("brpoplpush")
class BRPopLPush extends RPopLPush {
Expand All @@ -25,7 +25,7 @@ class BRPopLPush extends RPopLPush {

protected void doOptionalWork() {
Slice source = params().get(0);
long timeoutNanos = (long) (convertToDouble(params().get(2).toString()) * 1_000_000_000L);
long timeoutNanos = toNanoTimeout(params().get(2).toString());

if (timeoutNanos < 0) {
throw new IllegalArgumentException("ERR timeout is negative");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import com.github.fppt.jedismock.datastructures.RMZSet;
import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.datastructures.ZSetEntry;
import com.github.fppt.jedismock.storage.RedisBase;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.List;

abstract class AbstractZDiff extends ZStore {

AbstractZDiff(RedisBase base, List<Slice> params) {
super(base, params);
AbstractZDiff(OperationExecutorState state, List<Slice> params) {
super(state, params);
}

protected RMZSet getResult(RMZSet zset1, RMZSet zset2, double weight) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import com.github.fppt.jedismock.datastructures.RMZSet;
import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.datastructures.ZSetEntry;
import com.github.fppt.jedismock.storage.RedisBase;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.List;

abstract class AbstractZInter extends ZStore {

AbstractZInter(RedisBase base, List<Slice> params) {
super(base, params);
AbstractZInter(OperationExecutorState state, List<Slice> params) {
super(state, params);
}

protected RMZSet getResult(RMZSet zset1, RMZSet zset2, double weight) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import com.github.fppt.jedismock.datastructures.RMZSet;
import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.datastructures.ZSetEntry;
import com.github.fppt.jedismock.storage.RedisBase;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.List;

abstract class AbstractZUnion extends ZStore {

AbstractZUnion(RedisBase base, List<Slice> params) {
super(base, params);
AbstractZUnion(OperationExecutorState state, List<Slice> params) {
super(state, params);
}

protected RMZSet getResult(RMZSet zset1, RMZSet zset2, double weight) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.github.fppt.jedismock.operations.sortedsets;

import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.exception.ArgumentException;
import com.github.fppt.jedismock.operations.RedisCommand;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.ArrayList;
import java.util.List;

import static com.github.fppt.jedismock.Utils.toNanoTimeout;

@RedisCommand("bzmpop")
public class BZMPop extends BZPop {
private Slice numKeys;

BZMPop(OperationExecutorState state, List<Slice> params) {
super(state, params);

}

@Override
protected void doOptionalWork(){
if (params().size() < 4) {
throw new ArgumentException("ERR wrong number of arguments for 'bzmpop' command");
}
timeoutNanos = toNanoTimeout(params().get(0).toString());
params().remove(0);
ZMPop zmPop = new ZMPop(base(), new ArrayList<>(params()));
numKeys = params().remove(0);
keys = zmPop.parseArgs();
keys.remove(0);
}

@Override
protected Slice popper(List<Slice> params) {
List<Slice> newParams = new ArrayList<>(params());
newParams.add(0, numKeys);
ZMPop zmPop = new ZMPop(base(), newParams);
return zmPop.response();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.github.fppt.jedismock.operations.sortedsets;

import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.operations.AbstractBPop;
import com.github.fppt.jedismock.operations.AbstractRedisOperation;
import com.github.fppt.jedismock.storage.OperationExecutorState;
import com.github.fppt.jedismock.storage.RedisBase;

import java.util.List;

abstract class BZPop extends AbstractBPop {

BZPop(OperationExecutorState state, List<Slice> params) {
super(state, params);
}

@Override
protected AbstractRedisOperation getSize(RedisBase base, List<Slice> params) {
return new ZCard(base(), params);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.fppt.jedismock.operations.sortedsets;

import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.operations.RedisCommand;
import com.github.fppt.jedismock.server.Response;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.Arrays;
import java.util.List;

import static com.github.fppt.jedismock.Utils.toNanoTimeout;

@RedisCommand("bzpopmax")
public class BZPopMax extends BZPop {

BZPopMax(OperationExecutorState state, List<Slice> params) {
super(state, params);
timeoutNanos = toNanoTimeout(params().get(params().size() - 1).toString());
}

@Override
protected Slice popper(List<Slice> params) {
List<Slice> result = new ZPop(base(), params, true).pop();
return Response.array(Arrays.asList(Response.bulkString(params.get(0)), result.get(0), result.get(1)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.github.fppt.jedismock.operations.sortedsets;

import com.github.fppt.jedismock.datastructures.Slice;
import com.github.fppt.jedismock.operations.RedisCommand;
import com.github.fppt.jedismock.server.Response;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.Arrays;
import java.util.List;

import static com.github.fppt.jedismock.Utils.toNanoTimeout;

@RedisCommand("bzpopmin")
public class BZPopMin extends BZPop {

BZPopMin(OperationExecutorState state, List<Slice> params) {
super(state, params);
timeoutNanos = toNanoTimeout(params().get(params().size() - 1).toString());
}

@Override
protected Slice popper(List<Slice> params) {
List<Slice> result = new ZPop(base(), params, false).pop();
return Response.array(Arrays.asList(Response.bulkString(params.get(0)), result.get(0), result.get(1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.github.fppt.jedismock.exception.ArgumentException;
import com.github.fppt.jedismock.operations.RedisCommand;
import com.github.fppt.jedismock.server.Response;
import com.github.fppt.jedismock.storage.RedisBase;
import com.github.fppt.jedismock.storage.OperationExecutorState;

import java.util.EnumSet;
import java.util.Iterator;
Expand All @@ -24,14 +24,16 @@ class ZAdd extends AbstractByScoreOperation {
enum Options {
XX, NX, LT, GT, CH, INCR
}
private final Object lock;

private final EnumSet<Options> options = EnumSet.noneOf(Options.class);

private int countAdd = 0;
private int countChange = 0;

ZAdd(RedisBase base, List<Slice> params) {
super(base, params);
ZAdd(OperationExecutorState state, List<Slice> params) {
super(state.base(), params);
this.lock = state.lock();
}

@Override
Expand Down Expand Up @@ -69,7 +71,7 @@ private Slice incr() {
if (countChange + countAdd > 0) {
mapDBObj.put(member, newScore);
base().putValue(key, mapDBObj);

lock.notifyAll();
return Response.bulkString(Slice.create(String.valueOf(newScore)));
}
}
Expand All @@ -96,6 +98,7 @@ private Slice adding() {
}
if (countAdd + countChange > 0) {
base().putValue(key, mapDBObj);
lock.notifyAll();
}
return options.contains(CH) ? Response.integer(countAdd + countChange) :
Response.integer(countAdd);
Expand Down Expand Up @@ -129,12 +132,16 @@ private void updateValue(RMZSet mapDBObj, Slice value, double newScore) {

private void parseParams() {
Iterator<Slice> i = params().iterator();
while (i.hasNext()) {
i.next();
boolean quit = false;
while (i.hasNext() && !quit) {
String opt = i.next().toString();
quit = true;
for (Options value : Options.values()) {
if (value.toString().equalsIgnoreCase(opt)) {
options.add(value);
i.remove();
quit = false;
break;
}
}
Expand Down
Loading

0 comments on commit adb7c06

Please sign in to comment.