From be79ce76c64abf87d9f7d360398ba07aae772426 Mon Sep 17 00:00:00 2001 From: rdifalco Date: Tue, 21 Oct 2014 18:45:50 -0700 Subject: [PATCH] Addresses issues #779 and #775. Conflicts: src/main/java/redis/clients/jedis/Connection.java src/main/java/redis/clients/jedis/Protocol.java --- .../java/redis/clients/jedis/Connection.java | 17 +- .../java/redis/clients/jedis/Protocol.java | 70 +++--- .../redis/clients/util/RedisInputStream.java | 204 +++++++++++++----- .../tests/benchmark/ProtocolBenchmark.java | 105 +++++++++ 4 files changed, 299 insertions(+), 97 deletions(-) create mode 100644 src/test/java/redis/clients/jedis/tests/benchmark/ProtocolBenchmark.java diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index 95be5d94cb..35a01130d2 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -1,5 +1,12 @@ package redis.clients.jedis; +import redis.clients.jedis.Protocol.Command; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisDataException; +import redis.clients.util.RedisInputStream; +import redis.clients.util.RedisOutputStream; +import redis.clients.util.SafeEncoder; + import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -8,13 +15,6 @@ import java.util.ArrayList; import java.util.List; -import redis.clients.jedis.Protocol.Command; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisDataException; -import redis.clients.util.RedisInputStream; -import redis.clients.util.RedisOutputStream; -import redis.clients.util.SafeEncoder; - public class Connection implements Closeable { private String host; private int port = Protocol.DEFAULT_PORT; @@ -176,7 +176,7 @@ public boolean isConnected() { && !socket.isOutputShutdown(); } - protected String getStatusCodeReply() { + public String getStatusCodeReply() { flush(); pipelinedCommands--; final byte[] resp = (byte[]) readProtocolWithCheckingBroken(); @@ -286,4 +286,5 @@ protected Object readProtocolWithCheckingBroken() { throw exc; } } + } diff --git a/src/main/java/redis/clients/jedis/Protocol.java b/src/main/java/redis/clients/jedis/Protocol.java index ad52d56b98..59b5adef58 100644 --- a/src/main/java/redis/clients/jedis/Protocol.java +++ b/src/main/java/redis/clients/jedis/Protocol.java @@ -127,67 +127,61 @@ private static String[] parseTargetHostAndSlot( } private static Object process(final RedisInputStream is) { - try { - byte b = is.readByte(); - if (b == MINUS_BYTE) { - processError(is); - } else if (b == ASTERISK_BYTE) { - return processMultiBulkReply(is); - } else if (b == COLON_BYTE) { - return processInteger(is); - } else if (b == DOLLAR_BYTE) { - return processBulkReply(is); - } else if (b == PLUS_BYTE) { - return processStatusCodeReply(is); - } else { - throw new JedisConnectionException("Unknown reply: " + (char) b); - } - } catch (IOException e) { - throw new JedisConnectionException(e); + + final byte b = is.readByte(); + if (b == PLUS_BYTE) { + return processStatusCodeReply(is); + } else if (b == DOLLAR_BYTE) { + return processBulkReply(is); + } else if (b == ASTERISK_BYTE) { + return processMultiBulkReply(is); + } else if (b == COLON_BYTE) { + return processInteger(is); + } else if (b == MINUS_BYTE) { + processError(is); + return null; + } else { + throw new JedisConnectionException("Unknown reply: " + (char) b); } - return null; } private static byte[] processStatusCodeReply(final RedisInputStream is) { - return SafeEncoder.encode(is.readLine()); + return is.readLineBytes(); } private static byte[] processBulkReply(final RedisInputStream is) { - int len = Integer.parseInt(is.readLine()); + final int len = is.readIntCrLf(); if (len == -1) { return null; } - byte[] read = new byte[len]; + + final byte[] read = new byte[len]; int offset = 0; - try { - while (offset < len) { - int size = is.read(read, offset, (len - offset)); - if (size == -1) - throw new JedisConnectionException( - "It seems like server has closed the connection."); - offset += size; - } - // read 2 more bytes for the command delimiter - is.readByte(); - is.readByte(); - } catch (IOException e) { - throw new JedisConnectionException(e); + while (offset < len) { + final int size = is.read(read, offset, (len - offset)); + if (size == -1) + throw new JedisConnectionException( + "It seems like server has closed the connection."); + offset += size; } + // read 2 more bytes for the command delimiter + is.readByte(); + is.readByte(); + return read; } private static Long processInteger(final RedisInputStream is) { - String num = is.readLine(); - return Long.valueOf(num); + return is.readLongCrLf(); } private static List processMultiBulkReply(final RedisInputStream is) { - int num = Integer.parseInt(is.readLine()); + final int num = is.readIntCrLf(); if (num == -1) { return null; } - List ret = new ArrayList(num); + final List ret = new ArrayList(num); for (int i = 0; i < num; i++) { try { ret.add(process(is)); diff --git a/src/main/java/redis/clients/util/RedisInputStream.java b/src/main/java/redis/clients/util/RedisInputStream.java index 5ac8c94202..beae73b883 100644 --- a/src/main/java/redis/clients/util/RedisInputStream.java +++ b/src/main/java/redis/clients/util/RedisInputStream.java @@ -16,12 +16,15 @@ package redis.clients.util; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import redis.clients.jedis.exceptions.JedisConnectionException; +/** + * This class assumes (to some degree) that we are reading a RESP stream. As such it assumes + * certain conventions regarding CRLF line termination. It also assumes that if the Protocol + * layer requires a byte that if that byte is not there it is a stream error. + */ public class RedisInputStream extends FilterInputStream { protected final byte buf[]; @@ -40,73 +43,172 @@ public RedisInputStream(InputStream in) { this(in, 8192); } - public byte readByte() throws IOException { - if (count == limit) { - fill(); - } - + public byte readByte() throws JedisConnectionException { + ensureFill(); return buf[count++]; } public String readLine() { - int b; - byte c; - StringBuilder sb = new StringBuilder(); - - try { - while (true) { - if (count == limit) { - fill(); - } - if (limit == -1) - break; + final StringBuilder sb = new StringBuilder(); + while (true) { + ensureFill(); + + byte b = buf[count++]; + if (b == '\r') { + ensureFill(); // Must be one more byte - b = buf[count++]; - if (b == '\r') { - if (count == limit) { - fill(); - } - - if (limit == -1) { - sb.append((char) b); - break; - } - - c = buf[count++]; - if (c == '\n') { - break; - } - sb.append((char) b); - sb.append((char) c); - } else { - sb.append((char) b); + byte c = buf[count++]; + if (c == '\n') { + break; } + sb.append((char) b); + sb.append((char) c); + } else { + sb.append((char) b); } - } catch (IOException e) { - throw new JedisConnectionException(e); } - String reply = sb.toString(); + + final String reply = sb.toString(); if (reply.length() == 0) { - throw new JedisConnectionException( - "It seems like server has closed the connection."); + throw new JedisConnectionException("It seems like server has closed the connection."); } + return reply; } - public int read(byte[] b, int off, int len) throws IOException { - if (count == limit) { - fill(); - if (limit == -1) - return -1; + public byte[] readLineBytes() { + + /* This operation should only require one fill. In that typical + case we optimize allocation and copy of the byte array. In the + edge case where more than one fill is required then we take a + slower path and expand a byte array output stream as is + necessary. */ + + ensureFill(); + + int pos = count; + final byte[] buf = this.buf; + while (true) { + if (pos == limit) { + return readLineBytesSlowly(); + } + + if (buf[pos++] == '\r') { + if (pos == limit) { + return readLineBytesSlowly(); + } + + if (buf[pos++] == '\n') { + break; + } + } + } + + final int N = (pos - count) - 2; + final byte[] line = new byte[N]; + System.arraycopy(buf, count, line, 0, N); + count = pos; + return line; + } + + /** + * Slow path in case a line of bytes cannot be read in one #fill() operation. This is still faster + * than creating the StrinbBuilder, String, then encoding as byte[] in Protocol, then decoding back + * into a String. + */ + private byte[] readLineBytesSlowly() { + ByteArrayOutputStream bout = null; + while (true) { + ensureFill(); + + byte b = buf[count++]; + if (b == '\r') { + ensureFill(); // Must be one more byte + + byte c = buf[count++]; + if (c == '\n') { + break; + } + + if (bout == null) { + bout = new ByteArrayOutputStream(16); + } + + bout.write(b); + bout.write(c); + } else { + if (bout == null) { + bout = new ByteArrayOutputStream(16); + } + + bout.write(b); + } + } + + return bout == null ? new byte[0] : bout.toByteArray(); + } + + public int readIntCrLf() { + return (int)readLongCrLf(); + } + + public long readLongCrLf() { + final byte[] buf = this.buf; + + ensureFill(); + + final boolean isNeg = buf[count] == '-'; + if (isNeg) { + ++count; } + + long value = 0; + while (true) { + ensureFill(); + + final int b = buf[count++]; + if (b == '\r') { + ensureFill(); + + if (buf[count++] != '\n') { + throw new JedisConnectionException("Unexpected character!"); + } + + break; + } + else { + value = value * 10 + b - '0'; + } + } + + return (isNeg ? -value : value); + } + + public int read(byte[] b, int off, int len) throws JedisConnectionException { + ensureFill(); + final int length = Math.min(limit - count, len); System.arraycopy(buf, count, b, off, length); count += length; return length; } - private void fill() throws IOException { - limit = in.read(buf); - count = 0; + /** + * This methods assumes there are required bytes to be read. If we cannot read + * anymore bytes an exception is thrown to quickly ascertain that the stream + * was smaller than expected. + */ + private void ensureFill() throws JedisConnectionException { + if (count >= limit) { + try { + limit = in.read(buf); + count = 0; + if (limit == -1) { + throw new JedisConnectionException("Unexpected end of stream."); + } + } catch (IOException e) { + throw new JedisConnectionException(e); + } + } } } diff --git a/src/test/java/redis/clients/jedis/tests/benchmark/ProtocolBenchmark.java b/src/test/java/redis/clients/jedis/tests/benchmark/ProtocolBenchmark.java new file mode 100644 index 0000000000..eacdfa9bc7 --- /dev/null +++ b/src/test/java/redis/clients/jedis/tests/benchmark/ProtocolBenchmark.java @@ -0,0 +1,105 @@ +package redis.clients.jedis.tests.benchmark; + +import redis.clients.jedis.Protocol; +import redis.clients.util.RedisInputStream; +import redis.clients.util.RedisOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +/** + * Copyright (c) 2014 + */ +public class ProtocolBenchmark { + private static final int TOTAL_OPERATIONS = 500000; + + public static void main(String[] args) throws Exception, + IOException { + long total = 0; + for (int at = 0; at != 10; ++at) { + long elapsed = measureInputMulti(); + long ops = ((1000 * 2 * TOTAL_OPERATIONS) / TimeUnit.NANOSECONDS.toMillis(elapsed)); + if (at >= 5) { + total += ops; + } + } + System.out.println((total / 5) + " avg"); + + total = 0; + for (int at = 0; at != 10; ++at) { + long elapsed = measureInputStatus(); + long ops = ((1000 * 2 * TOTAL_OPERATIONS) / TimeUnit.NANOSECONDS.toMillis(elapsed)); + if (at >= 5) { + total += ops; + } + } + + System.out.println((total / 5) + " avg"); + + total = 0; + for (int at = 0; at != 10; ++at) { + long elapsed = measureCommand(); + long ops = ((1000 * 2 * TOTAL_OPERATIONS) / TimeUnit.NANOSECONDS.toMillis(elapsed)); + if (at >= 5) { + total += ops; + } + } + + System.out.println((total / 5) + " avg"); + } + + private static long measureInputMulti() throws Exception { + long duration = 0; + + InputStream is = new ByteArrayInputStream( + "*4\r\n$3\r\nfoo\r\n$13\r\nbarbarbarfooz\r\n$5\r\nHello\r\n$5\r\nWorld\r\n" + .getBytes()); + + RedisInputStream in = new RedisInputStream(is); + for (int n = 0; n <= TOTAL_OPERATIONS; n++) { + long start = System.nanoTime(); + Protocol.read(in); + duration += (System.nanoTime() - start); + in.reset(); + } + + return duration; + } + + private static long measureInputStatus() throws Exception { + long duration = 0; + + InputStream is = new ByteArrayInputStream( + "+OK\r\n" + .getBytes()); + + RedisInputStream in = new RedisInputStream(is); + for (int n = 0; n <= TOTAL_OPERATIONS; n++) { + long start = System.nanoTime(); + Protocol.read(in); + duration += (System.nanoTime() - start); + in.reset(); + } + + return duration; + } + + private static long measureCommand() throws Exception { + long duration = 0; + + byte[] KEY = "123456789".getBytes(); + byte[] VAL = "FooBar".getBytes(); + + for (int n = 0; n <= TOTAL_OPERATIONS; n++) { + RedisOutputStream out = new RedisOutputStream(new ByteArrayOutputStream(8192)); + long start = System.nanoTime(); + Protocol.sendCommand(out, Protocol.Command.SET, KEY, VAL); + duration += (System.nanoTime() - start); + } + + return duration; + } +}