#766 Improve read performance of blob by reading directly into arrays
When possible, we ignore the blobBufferSize.
mrotteveel committed Sep 28, 2023
1 parent c454fa7 commit 7dba364
Showing 24 changed files with 773 additions and 291 deletions.
@@ -28,11 +28,14 @@
import org.firebirdsql.gds.ng.FbExceptionBuilder;
import org.firebirdsql.gds.ng.LockCloseable;
import org.firebirdsql.gds.ng.listeners.DatabaseListener;
import org.firebirdsql.jdbc.SQLStateConstants;
import org.firebirdsql.jna.fbclient.FbClientLibrary;
import org.firebirdsql.jna.fbclient.ISC_STATUS;

import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.Objects;

import static org.firebirdsql.gds.JaybirdErrorCodes.jb_blobGetSegmentNegative;
import static org.firebirdsql.gds.JaybirdErrorCodes.jb_blobPutSegmentEmpty;
@@ -160,34 +163,16 @@ public final boolean isOutput() {

@Override
public byte[] getSegment(int sizeRequested) throws SQLException {
try {
try (LockCloseable ignored = withLock()) {
if (sizeRequested <= 0) {
throw FbExceptionBuilder.forException(jb_blobGetSegmentNegative)
.messageParameter(sizeRequested)
.toSQLException();
}
// TODO Honour request for larger sizes by looping?
sizeRequested = Math.min(sizeRequested, getMaximumSegmentSize());
final ByteBuffer responseBuffer;
final ShortByReference actualLength = new ShortByReference();
try (LockCloseable ignored = withLock()) {
checkDatabaseAttached();
checkTransactionActive();
checkBlobOpen();
responseBuffer = getByteBuffer(sizeRequested);

clientLibrary.isc_get_segment(statusVector, getJnaHandle(), actualLength, (short) sizeRequested,
responseBuffer);
final int status = statusVector[1].intValue();
// status 0 means: more to come, isc_segment means: buffer was too small, rest will be returned on next call
if (status == ISCConstants.isc_segstr_eof) {
setEof();
} else if (!(status == 0 || status == ISCConstants.isc_segment)) {
processStatusVector();
}
}
final int actualLengthInt = ((int) actualLength.getValue()) & 0xFFFF;
final byte[] segment = new byte[actualLengthInt];
checkDatabaseAttached();
ShortByReference actualLength = new ShortByReference();
ByteBuffer responseBuffer = getSegment0(sizeRequested, actualLength);
byte[] segment = new byte[actualLength.getValue() & 0xFFFF];
responseBuffer.get(segment);
return segment;
} catch (SQLException e) {
@@ -196,6 +181,55 @@ public byte[] getSegment(int sizeRequested) throws SQLException {
}
}

private ByteBuffer getSegment0(int sizeRequested, ShortByReference actualLength) throws SQLException {
checkTransactionActive();
checkBlobOpen();
sizeRequested = Math.min(sizeRequested, getMaximumSegmentSize());
ByteBuffer responseBuffer = getByteBuffer(sizeRequested);
clientLibrary.isc_get_segment(statusVector, getJnaHandle(), actualLength, (short) sizeRequested,
responseBuffer);
int status = statusVector[1].intValue();
// status 0 means: more to come, isc_segment means: buffer was too small, rest will be returned on next call
if (status == ISCConstants.isc_segstr_eof) {
setEof();
} else if (!(status == 0 || status == ISCConstants.isc_segment)) {
processStatusVector();
}
return responseBuffer;
}

@Override
public int get(final byte[] buf, final int pos, final int len) throws SQLException {
try (LockCloseable ignored = withLock()) {
try {
Objects.checkFromIndexSize(pos, len, Objects.requireNonNull(buf, "buf").length);
} catch (IndexOutOfBoundsException e) {
throw new SQLNonTransientException(e.toString(), SQLStateConstants.SQL_STATE_INVALID_STRING_LENGTH);
}
if (len == 0) return 0;
checkDatabaseAttached();
ShortByReference actualLength = new ShortByReference();
int count = 0;
while (count < len && !isEof()) {
// We honor the configured buffer size unless we somehow already allocated a bigger buffer earlier
ByteBuffer segmentBuffer = getSegment0(
Math.min(len - count, Math.max(getBlobBufferSize(), currentBufferCapacity())),
actualLength);
int dataLength = actualLength.getValue() & 0xFFFF;
segmentBuffer.get(buf, pos + count, dataLength);
count += dataLength;
}
return count;
} catch (SQLException e) {
exceptionListenerDispatcher.errorOccurred(e);
throw e;
}
}

private int getBlobBufferSize() throws SQLException {
return getDatabase().getConnectionProperties().getBlobBufferSize();
}

@Override
public void putSegment(byte[] segment) throws SQLException {
try {
@@ -290,11 +324,18 @@ private void processStatusVector() throws SQLException {
}

private ByteBuffer getByteBuffer(int requiredSize) {
ByteBuffer byteBuffer = this.byteBuffer;
if (byteBuffer == null || byteBuffer.capacity() < requiredSize) {
byteBuffer = ByteBuffer.allocateDirect(requiredSize);
} else {
byteBuffer.clear();
// Allocate buffer in increments of 512
return this.byteBuffer = ByteBuffer.allocateDirect((1 + (requiredSize - 1) / 512) * 512);
}
byteBuffer.clear();
return byteBuffer;
}

private int currentBufferCapacity() {
ByteBuffer byteBuffer = this.byteBuffer;
return byteBuffer != null ? byteBuffer.capacity() : 0;
}

}
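
For reference, a minimal sketch of the 512-byte rounding used by `getByteBuffer` above (the helper name is hypothetical; only the arithmetic comes from the commit):

```java
// Rounds a requested size up to the next multiple of 512,
// mirroring the allocation in getByteBuffer above
static int roundUpTo512(int requiredSize) {
    return (1 + (requiredSize - 1) / 512) * 512;
}
// roundUpTo512(1) == 512, roundUpTo512(512) == 512, roundUpTo512(513) == 1024
```
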
23 changes: 23 additions & 0 deletions src/docs/asciidoc/release_notes.adoc
@@ -772,6 +772,29 @@ We have now addressed this inconsistency, by also introducing support for settin
* `setObject(..., localTime)` sets a `LocalDateTime` derived as `LocalDate.EPOCH.atTime(localTime)` (i.e. on 1970-01-01)
* `setObject(..., localDate)` sets a `LocalDateTime` derived as `localDate.atStartOfDay()` (i.e. at 00:00:00)
[#blob-performance]
=== Performance improvements for blobs

[#blob-performance-read]
==== Reading blobs

Performance of reading blobs was improved, especially when using `ResultSet.getBytes`, `Blob.getBytes`, `ResultSet.getString`, or reading from a blob input stream with `read(byte[], int, int)` and similar methods, with a byte array and a requested length greater than 50% of the configured `blobBufferSize`.

Testing on a local network (Wi-Fi) shows an increase in throughput of roughly 50-100% for reading large blobs with the default `blobBufferSize` of 16384.

These throughput improvements were only realised in the pure Java protocol, because there we had the opportunity to avoid all additional allocations by writing directly from the network stream into the destination byte array.
This also allows us to ignore the configured `blobBufferSize` and use up to the maximum request size of 64KiB - 1 instead.

This is not possible for the JNA-based protocols (native/embedded), as the implementation requires a direct byte buffer to bridge to the native API, and thus we can't ignore the `blobBufferSize`.
We were able to realise some other optimizations (in both pure Java and JNA) by avoiding allocation of a number of intermediate objects, but this has only a marginal effect on throughput.

Similar improvements may follow for writes.
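
As an illustration, the read paths that benefit are the bulk-array ones; a minimal sketch using plain JDBC (the table and column names are hypothetical):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch: a bulk blob read that exercises the improved direct-into-array path
static byte[] readFirstBlob(Connection connection) throws SQLException {
    try (Statement stmt = connection.createStatement();
         ResultSet rs = stmt.executeQuery("select blob_data from blob_table")) {
        // getBytes reads the whole blob; requests larger than 50% of the
        // configured blobBufferSize now avoid intermediate buffering where possible
        return rs.next() ? rs.getBytes("blob_data") : null;
    }
}
```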

[#blob-performance-min-buf]
==== Minimum `blobBufferSize` 512 bytes

As part of the performance improvements, a minimum `blobBufferSize` of 512 bytes was introduced.
Values less than 512 will be ignored, and 512 will be used instead.

// TODO add major changes

[#potentially-breaking-changes]
50 changes: 40 additions & 10 deletions src/jna-test/org/firebirdsql/gds/ng/jna/JnaBlobTest.java
@@ -29,6 +29,8 @@
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.ByteArrayOutputStream;
import java.sql.SQLException;
@@ -63,13 +65,14 @@ class JnaBlobTest extends BaseTestBlob {
/**
* Tests retrieval of a blob (what goes in is what comes out).
*/
@Test
void testInputBlobRetrieval() throws Exception {
@ParameterizedTest
@ValueSource(booleans = { true, false })
void testInputBlobRetrieval(boolean useStreamBlobs) throws Exception {
final int testId = 1;
final byte[] baseContent = generateBaseContent();
// Use sufficiently large value so that multiple segments are used
final int requiredSize = 4 * Short.MAX_VALUE;
populateBlob(testId, baseContent, requiredSize);
populateBlob(testId, baseContent, requiredSize, useStreamBlobs);

try (FbDatabase db = createDatabaseConnection()) {
try {
@@ -84,8 +87,36 @@ void testInputBlobRetrieval() throws Exception {
blob.close();
statement.close();
byte[] result = bos.toByteArray();
assertEquals(requiredSize, result.length, "Unexpected length read from blob");
assertTrue(validateBlobContent(result, baseContent, requiredSize), "Unexpected blob content");
assertBlobContent(result, baseContent, requiredSize);
} finally {
if (transaction != null) transaction.commit();
}
}
}

/**
* Tests retrieval of a blob (what goes in is what comes out).
*/
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testBlobGet(boolean useStreamBlobs) throws Exception {
final int testId = 1;
final byte[] baseContent = generateBaseContent();
// Use sufficiently large value so that multiple roundtrips are used
final int requiredSize = 4 * Short.MAX_VALUE;
populateBlob(testId, baseContent, requiredSize, useStreamBlobs);

try (FbDatabase db = createDatabaseConnection()) {
try {
long blobId = getBlobId(testId, db);

FbBlob blob = db.createBlobForInput(transaction, blobId);
blob.open();
byte[] result = new byte[requiredSize];
blob.get(result, 0, requiredSize);
blob.close();
statement.close();
assertBlobContent(result, baseContent, requiredSize);
} finally {
if (transaction != null) transaction.commit();
}
@@ -101,7 +132,7 @@ void testInputBlobSeek_segmented() throws Exception {
final byte[] baseContent = generateBaseContent();
// Use sufficiently large value so that multiple segments are used
final int requiredSize = 4 * Short.MAX_VALUE;
populateBlob(testId, baseContent, requiredSize);
populateSegmentedBlob(testId, baseContent, requiredSize);

try (JnaDatabase db = createDatabaseConnection()) {
try {
@@ -165,7 +196,7 @@ void testInputBlobReopen() throws Exception {
final int testId = 1;
final byte[] baseContent = generateBaseContent();
final int requiredSize = 256;
populateBlob(testId, baseContent, requiredSize);
populateSegmentedBlob(testId, baseContent, requiredSize);

try (JnaDatabase db = createDatabaseConnection()) {
try {
@@ -188,8 +219,7 @@

statement.close();
byte[] result = bos.toByteArray();
assertEquals(requiredSize, result.length, "Unexpected length read from blob");
assertTrue(validateBlobContent(result, baseContent, requiredSize), "Unexpected blob content");
assertBlobContent(result, baseContent, requiredSize);
} finally {
if (transaction != null) transaction.commit();
}
@@ -204,7 +234,7 @@ void testInputBlobDoubleOpen() throws Exception {
final int testId = 1;
final byte[] baseContent = generateBaseContent();
final int requiredSize = 256;
populateBlob(testId, baseContent, requiredSize);
populateSegmentedBlob(testId, baseContent, requiredSize);

try (JnaDatabase db = createDatabaseConnection()) {
try {
Expand Down
21 changes: 21 additions & 0 deletions src/main/org/firebirdsql/gds/VaxEncoding.java
@@ -18,7 +18,9 @@
*/
package org.firebirdsql.gds;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
@@ -243,4 +245,23 @@ public static void encodeVaxInteger2WithoutLength(OutputStream out, int val) thr
out.write(val);
out.write(val >> 8);
}

/**
* Decodes an integer using two byte Vax encoding from an input stream, without length prefix.
*
* @param in
* input stream to read
* @return decoded value
* @throws IOException
* for errors reading from the stream, or if end-of-stream was reached before the full integer
* @since 6
*/
public static int decodeVaxInteger2WithoutLength(InputStream in) throws IOException {
int ch1 = in.read();
int ch2 = in.read();
if ((ch1 | ch2) < 0)
throw new EOFException();
return ch1 | (ch2 << 8);
}

}
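
A round-trip sketch of the new decoder against the existing `encodeVaxInteger2WithoutLength` shown above:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.firebirdsql.gds.VaxEncoding;

class VaxRoundTrip {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        VaxEncoding.encodeVaxInteger2WithoutLength(out, 0xABCD); // writes 0xCD, 0xAB
        int value = VaxEncoding.decodeVaxInteger2WithoutLength(
                new ByteArrayInputStream(out.toByteArray()));    // little-endian read
        System.out.println(value == 0xABCD); // true
    }
}
```
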
17 changes: 16 additions & 1 deletion src/main/org/firebirdsql/gds/ng/AbstractFbBlob.java
@@ -41,6 +41,7 @@ public abstract class AbstractFbBlob implements FbBlob, TransactionListener, Dat

protected final ExceptionListenerDispatcher exceptionListenerDispatcher = new ExceptionListenerDispatcher(this);
private final BlobParameterBuffer blobParameterBuffer;
private final int maximumSegmentSize;
private FbTransaction transaction;
private FbDatabase database;
private boolean open;
@@ -50,6 +51,7 @@ protected AbstractFbBlob(FbDatabase database, FbTransaction transaction, BlobPar
this.database = database;
this.transaction = transaction;
this.blobParameterBuffer = blobParameterBuffer;
maximumSegmentSize = maximumSegmentSize(database);
transaction.addWeakTransactionListener(this);
}

@@ -401,7 +403,20 @@ protected BlobLengthProcessor createBlobLengthProcessor() {

@Override
public int getMaximumSegmentSize() {
// TODO Max size in FB 3 is 2^16, not 2^15 - 1, is that for all versions, or only for newer protocols?
return maximumSegmentSize;
}

private static int maximumSegmentSize(FbDatabase db) {
// Max size in FB 2.1 and higher is 2^16 - 1, not 2^15 - 3 (IB 6 docs mention max is 32KiB)
if (db != null && (db.getOdsMajor() > 11 || db.getOdsMajor() == 11 && db.getOdsMinor() >= 1)) {
/* ODS 11.1 is Firebird 2.1
NOTE: getSegment can retrieve at most 65533 bytes of blob data as the buffer to receive segments is
max 65535 bytes, but the contents of the buffer are one or more segments prefixed with 2-byte lengths;
putSegment can write max 65535 bytes, because the buffer *is* the segment */
return 65535;
}
// NOTE: This should probably be Short.MAX_VALUE, but we can no longer run the relevant tests on Firebird 2.0
// and older (which aren't supported any way), so we leave this as is
return Short.MAX_VALUE - 2;
}
}
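
To make the framing note above concrete, a sketch of the arithmetic (derived from the comment; not code from the commit):

```java
// A getSegment receive buffer holds one or more 2-byte-length-prefixed segments,
// while a putSegment buffer is the segment itself (so it can use the full 65535)
static int maxGetSegmentBytes() {
    int maxBuffer = 65535;  // largest buffer: 2^16 - 1
    int lengthPrefix = 2;   // each segment is preceded by a 2-byte length
    return maxBuffer - lengthPrefix; // 65533 bytes of data per getSegment call
}
```
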
33 changes: 32 additions & 1 deletion src/main/org/firebirdsql/gds/ng/FbBlob.java
@@ -26,6 +26,7 @@

import org.firebirdsql.gds.ISCConstants;
import org.firebirdsql.gds.ng.listeners.ExceptionListenable;
import org.firebirdsql.jaybird.props.DatabaseConnectionProperties;

import java.sql.SQLException;

@@ -127,10 +128,40 @@ public interface FbBlob extends ExceptionListenable, AutoCloseable {
*/
byte[] getSegment(int sizeRequested) throws SQLException;

/**
* Reads content from the blob into {@code buf} starting at {@code pos} for {@code len} bytes.
* <p>
* Implementations must read the requested number of bytes ({@code len}), except if end-of-blob is reached before
* the requested number of bytes was reached. The return value of this method is the actual number of bytes read.
* </p>
* <p>
* If the implementation cannot perform reads without additional allocation, it should use at most
* {@link DatabaseConnectionProperties#getBlobBufferSize()} as an internal buffer. If the implementation can
* perform reads without additional allocation, it is recommended it performs reads using (at most)
* {@link #getMaximumSegmentSize()}.
* </p>
* <p>
* Contrary to similar methods like {@link java.io.InputStream#read(byte[], int, int)}, this method returns
* {@code 0}, not {@code -1}, when end-of-blob is reached without reading any bytes.
* </p>
*
* @param buf
* target byte array
* @param pos
* position to start
* @param len
* number of bytes
* @return actual number of bytes read; this will only be less than {@code len} when end-of-blob was reached
* @throws SQLException
* for database access errors, if {@code pos < 0}, {@code len < 0}, or if {@code pos + len > buf.length}
* @since 6
*/
int get(byte[] buf, int pos, int len) throws SQLException;

/**
* Writes a segment of blob data.
* <p>
* Implementation must handle segment length exceeding {@link #getMaximumSegmentSize()} by batching. TODO: reconsider and let caller handle that?
* Implementation must handle segment length exceeding {@link #getMaximumSegmentSize()} by batching.
* </p>
* <p>
* Passing a segment that is length 0 will throw an {@code SQLException}.
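
A usage sketch of the `get` contract documented above (helper and variable names are hypothetical):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;

import org.firebirdsql.gds.ng.FbBlob;

// Drains an open blob via get(byte[], int, int); get returns less than the
// requested length only at end-of-blob, and 0 (not -1) once the end is reached
static int drain(FbBlob blob, OutputStream sink) throws SQLException, IOException {
    byte[] buf = new byte[8192];
    int total = 0;
    int read;
    while ((read = blob.get(buf, 0, buf.length)) > 0) {
        sink.write(buf, 0, read);
        total += read;
        if (read < buf.length) break; // short read: end-of-blob
    }
    return total;
}
```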
