diff --git a/jaybird-native/src/main/java/org/firebirdsql/gds/ng/jna/JnaBlob.java b/jaybird-native/src/main/java/org/firebirdsql/gds/ng/jna/JnaBlob.java index 229320b78..6feecdcbb 100644 --- a/jaybird-native/src/main/java/org/firebirdsql/gds/ng/jna/JnaBlob.java +++ b/jaybird-native/src/main/java/org/firebirdsql/gds/ng/jna/JnaBlob.java @@ -28,11 +28,14 @@ import org.firebirdsql.gds.ng.FbExceptionBuilder; import org.firebirdsql.gds.ng.LockCloseable; import org.firebirdsql.gds.ng.listeners.DatabaseListener; +import org.firebirdsql.jdbc.SQLStateConstants; import org.firebirdsql.jna.fbclient.FbClientLibrary; import org.firebirdsql.jna.fbclient.ISC_STATUS; import java.nio.ByteBuffer; import java.sql.SQLException; +import java.sql.SQLNonTransientException; +import java.util.Objects; import static org.firebirdsql.gds.JaybirdErrorCodes.jb_blobGetSegmentNegative; import static org.firebirdsql.gds.JaybirdErrorCodes.jb_blobPutSegmentEmpty; @@ -160,34 +163,16 @@ public final boolean isOutput() { @Override public byte[] getSegment(int sizeRequested) throws SQLException { - try { + try (LockCloseable ignored = withLock()) { if (sizeRequested <= 0) { throw FbExceptionBuilder.forException(jb_blobGetSegmentNegative) .messageParameter(sizeRequested) .toSQLException(); } - // TODO Honour request for larger sizes by looping? - sizeRequested = Math.min(sizeRequested, getMaximumSegmentSize()); - final ByteBuffer responseBuffer; - final ShortByReference actualLength = new ShortByReference(); - try (LockCloseable ignored = withLock()) { - checkDatabaseAttached(); - checkTransactionActive(); - checkBlobOpen(); - responseBuffer = getByteBuffer(sizeRequested); - - clientLibrary.isc_get_segment(statusVector, getJnaHandle(), actualLength, (short) sizeRequested, - responseBuffer); - final int status = statusVector[1].intValue(); - // status 0 means: more to come, isc_segment means: buffer was too small, rest will be returned on next call - if (status == ISCConstants.isc_segstr_eof) { - setEof(); - } else if (!(status == 0 || status == ISCConstants.isc_segment)) { - processStatusVector(); - } - } - final int actualLengthInt = ((int) actualLength.getValue()) & 0xFFFF; - final byte[] segment = new byte[actualLengthInt]; + checkDatabaseAttached(); + ShortByReference actualLength = new ShortByReference(); + ByteBuffer responseBuffer = getSegment0(sizeRequested, actualLength); + byte[] segment = new byte[actualLength.getValue() & 0xFFFF]; responseBuffer.get(segment); return segment; } catch (SQLException e) { @@ -196,6 +181,55 @@ public byte[] getSegment(int sizeRequested) throws SQLException { } } + private ByteBuffer getSegment0(int sizeRequested, ShortByReference actualLength) throws SQLException { + checkTransactionActive(); + checkBlobOpen(); + sizeRequested = Math.min(sizeRequested, getMaximumSegmentSize()); + ByteBuffer responseBuffer = getByteBuffer(sizeRequested); + clientLibrary.isc_get_segment(statusVector, getJnaHandle(), actualLength, (short) sizeRequested, + responseBuffer); + int status = statusVector[1].intValue(); + // status 0 means: more to come, isc_segment means: buffer was too small, rest will be returned on next call + if (status == ISCConstants.isc_segstr_eof) { + setEof(); + } else if (!(status == 0 || status == ISCConstants.isc_segment)) { + processStatusVector(); + } + return responseBuffer; + } + + @Override + public int get(final byte[] buf, final int pos, final int len) throws SQLException { + try (LockCloseable ignored = withLock()) { + try { + Objects.checkFromIndexSize(pos, len, Objects.requireNonNull(buf, "buf").length); + } catch (IndexOutOfBoundsException e) { + throw new SQLNonTransientException(e.toString(), SQLStateConstants.SQL_STATE_INVALID_STRING_LENGTH); + } + if (len == 0) return 0; + checkDatabaseAttached(); + ShortByReference actualLength = new ShortByReference(); + int count = 0; + while (count < len && !isEof()) { + // We honor the configured buffer size unless we somehow already allocated a bigger buffer earlier + ByteBuffer segmentBuffer = getSegment0( + Math.min(len - count, Math.max(getBlobBufferSize(), currentBufferCapacity())), + actualLength); + int dataLength = actualLength.getValue() & 0xFFFF; + segmentBuffer.get(buf, pos + count, dataLength); + count += dataLength; + } + return count; + } catch (SQLException e) { + exceptionListenerDispatcher.errorOccurred(e); + throw e; + } + } + + private int getBlobBufferSize() throws SQLException { + return getDatabase().getConnectionProperties().getBlobBufferSize(); + } + @Override public void putSegment(byte[] segment) throws SQLException { try { @@ -290,11 +324,18 @@ private void processStatusVector() throws SQLException { } private ByteBuffer getByteBuffer(int requiredSize) { + ByteBuffer byteBuffer = this.byteBuffer; if (byteBuffer == null || byteBuffer.capacity() < requiredSize) { - byteBuffer = ByteBuffer.allocateDirect(requiredSize); - } else { - byteBuffer.clear(); + // Allocate buffer in increments of 512 + return this.byteBuffer = ByteBuffer.allocateDirect((1 + (requiredSize - 1) / 512) * 512); } + byteBuffer.clear(); return byteBuffer; } + + private int currentBufferCapacity() { + ByteBuffer byteBuffer = this.byteBuffer; + return byteBuffer != null ? byteBuffer.capacity() : 0; + } + } diff --git a/src/docs/asciidoc/release_notes.adoc b/src/docs/asciidoc/release_notes.adoc index df9968fcb..baa3283e0 100644 --- a/src/docs/asciidoc/release_notes.adoc +++ b/src/docs/asciidoc/release_notes.adoc @@ -772,6 +772,29 @@ We have now addressed this inconsistency, by also introducing support for settin * `setObject(..., localTime)` sets a `LocalDateTime` derived as `LocalDate.EPOCH.atTime(localTime)` (i.e. on 1970-01-01) * `setObject(..., localDate)` sets a `LocalDateTime` derived as `localDate.atStartOfDay()` (i.e. at 00:00:00) +[#blob-performance] +=== Performance improvements blobs + +[#blob-performance-read] +==== Reading blobs + +Performance of reading blobs was improved, especially when using `ResultSet.getBytes`, `Blob.getBytes`, `ResultSet.getString` or reading from a blob input stream with `read(byte[], int, int)` and similar methods with a byte array and requested length greater than 50% of the configured `blobBufferSize`. + +Testing on a local network (Wi-Fi) shows an increase in throughput of roughly 50-100% for reading large blobs with the default `blobBufferSize` of 16384. + +These throughput improvements were only realised in the pure Java protocol, because there we had the opportunity to avoid all additional allocations by writing directly from the network stream into the destination byte array, and this allows us to ignore the configured `blobBufferSize` and use up to the maximum request size of 64KiB - 1 instead. + +This is not possible for the JNA-based protocols (native/embedded), as the implementation requires a direct byte buffer to bridge to the native API, and thus we can't ignore the `blobBufferSize`. +We were able to realise some other optimizations (in both pure Java and JNA), by avoiding allocation of a number of intermediate objects, but this has only marginal effects on the throughput. + +Similar improvements may follow for writes. + +[#blob-performance-min-buf] +==== Minimum `blobBufferSize` 512 bytes + +As part of the performance improvements, a minimum `blobBufferSize` of 512 bytes was introduced. +Configuring values less than 512 will be ignored and use 512 instead. + // TODO add major changes [#potentially-breaking-changes] diff --git a/src/jna-test/org/firebirdsql/gds/ng/jna/JnaBlobTest.java b/src/jna-test/org/firebirdsql/gds/ng/jna/JnaBlobTest.java index 3545be307..8fd304d65 100644 --- a/src/jna-test/org/firebirdsql/gds/ng/jna/JnaBlobTest.java +++ b/src/jna-test/org/firebirdsql/gds/ng/jna/JnaBlobTest.java @@ -29,6 +29,8 @@ import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; import java.sql.SQLException; @@ -63,13 +65,14 @@ class JnaBlobTest extends BaseTestBlob { /** * Tests retrieval of a blob (what goes in is what comes out). */ - @Test - void testInputBlobRetrieval() throws Exception { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void testInputBlobRetrieval(boolean useStreamBlobs) throws Exception { final int testId = 1; final byte[] baseContent = generateBaseContent(); // Use sufficiently large value so that multiple segments are used final int requiredSize = 4 * Short.MAX_VALUE; - populateBlob(testId, baseContent, requiredSize); + populateBlob(testId, baseContent, requiredSize, useStreamBlobs); try (FbDatabase db = createDatabaseConnection()) { try { @@ -84,8 +87,36 @@ void testInputBlobRetrieval() throws Exception { blob.close(); statement.close(); byte[] result = bos.toByteArray(); - assertEquals(requiredSize, result.length, "Unexpected length read from blob"); - assertTrue(validateBlobContent(result, baseContent, requiredSize), "Unexpected blob content"); + assertBlobContent(result, baseContent, requiredSize); + } finally { + if (transaction != null) transaction.commit(); + } + } + } + + /** + * Tests retrieval of a blob (what goes in is what comes out). + */ + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBlobGet(boolean useStreamBlobs) throws Exception { + final int testId = 1; + final byte[] baseContent = generateBaseContent(); + // Use sufficiently large value so that multiple roundtrips are used + final int requiredSize = 4 * Short.MAX_VALUE; + populateBlob(testId, baseContent, requiredSize, useStreamBlobs); + + try (FbDatabase db = createDatabaseConnection()) { + try { + long blobId = getBlobId(testId, db); + + FbBlob blob = db.createBlobForInput(transaction, blobId); + blob.open(); + byte[] result = new byte[requiredSize]; + blob.get(result, 0, requiredSize); + blob.close(); + statement.close(); + assertBlobContent(result, baseContent, requiredSize); } finally { if (transaction != null) transaction.commit(); } @@ -101,7 +132,7 @@ void testInputBlobSeek_segmented() throws Exception { final byte[] baseContent = generateBaseContent(); // Use sufficiently large value so that multiple segments are used final int requiredSize = 4 * Short.MAX_VALUE; - populateBlob(testId, baseContent, requiredSize); + populateSegmentedBlob(testId, baseContent, requiredSize); try (JnaDatabase db = createDatabaseConnection()) { try { @@ -165,7 +196,7 @@ void testInputBlobReopen() throws Exception { final int testId = 1; final byte[] baseContent = generateBaseContent(); final int requiredSize = 256; - populateBlob(testId, baseContent, requiredSize); + populateSegmentedBlob(testId, baseContent, requiredSize); try (JnaDatabase db = createDatabaseConnection()) { try { @@ -188,8 +219,7 @@ void testInputBlobReopen() throws Exception { statement.close(); byte[] result = bos.toByteArray(); - assertEquals(requiredSize, result.length, "Unexpected length read from blob"); - assertTrue(validateBlobContent(result, baseContent, requiredSize), "Unexpected blob content"); + assertBlobContent(result, baseContent, requiredSize); } finally { if (transaction != null) transaction.commit(); } @@ -204,7 +234,7 @@ void testInputBlobDoubleOpen() throws Exception { final int testId = 1; final byte[] baseContent = generateBaseContent(); final int requiredSize = 256; - populateBlob(testId, baseContent, requiredSize); + populateSegmentedBlob(testId, baseContent, requiredSize); try (JnaDatabase db = createDatabaseConnection()) { try { diff --git a/src/main/org/firebirdsql/gds/VaxEncoding.java b/src/main/org/firebirdsql/gds/VaxEncoding.java index 7b5beeba9..87ae57928 100644 --- a/src/main/org/firebirdsql/gds/VaxEncoding.java +++ b/src/main/org/firebirdsql/gds/VaxEncoding.java @@ -18,7 +18,9 @@ */ package org.firebirdsql.gds; +import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; /** @@ -243,4 +245,23 @@ public static void encodeVaxInteger2WithoutLength(OutputStream out, int val) thr out.write(val); out.write(val >> 8); } + + /** + * Decodes an integer using two byte Vax encoding from an input stream, without length prefix. + * + * @param in + * input stream to read + * @return decoded value + * @throws IOException + * for errors reading from the stream, or if end-of-stream was reached before the full integer + * @since 6 + */ + public static int decodeVaxInteger2WithoutLength(InputStream in) throws IOException { + int ch1 = in.read(); + int ch2 = in.read(); + if ((ch1 | ch2) < 0) + throw new EOFException(); + return ch1 | (ch2 << 8); + } + } diff --git a/src/main/org/firebirdsql/gds/ng/AbstractFbBlob.java b/src/main/org/firebirdsql/gds/ng/AbstractFbBlob.java index 4d587a667..44be9b224 100644 --- a/src/main/org/firebirdsql/gds/ng/AbstractFbBlob.java +++ b/src/main/org/firebirdsql/gds/ng/AbstractFbBlob.java @@ -41,6 +41,7 @@ public abstract class AbstractFbBlob implements FbBlob, TransactionListener, Dat protected final ExceptionListenerDispatcher exceptionListenerDispatcher = new ExceptionListenerDispatcher(this); private final BlobParameterBuffer blobParameterBuffer; + private final int maximumSegmentSize; private FbTransaction transaction; private FbDatabase database; private boolean open; @@ -50,6 +51,7 @@ protected AbstractFbBlob(FbDatabase database, FbTransaction transaction, BlobPar this.database = database; this.transaction = transaction; this.blobParameterBuffer = blobParameterBuffer; + maximumSegmentSize = maximumSegmentSize(database); transaction.addWeakTransactionListener(this); } @@ -401,7 +403,20 @@ protected BlobLengthProcessor createBlobLengthProcessor() { @Override public int getMaximumSegmentSize() { - // TODO Max size in FB 3 is 2^16, not 2^15 - 1, is that for all versions, or only for newer protocols? + return maximumSegmentSize; + } + + private static int maximumSegmentSize(FbDatabase db) { + // Max size in FB 2.1 and higher is 2^16 - 1, not 2^15 - 3 (IB 6 docs mention max is 32KiB) + if (db != null && (db.getOdsMajor() > 11 || db.getOdsMajor() == 11 && db.getOdsMinor() >= 1)) { + /* ODS 11.1 is Firebird 2.1 + NOTE: getSegment can retrieve at most 65533 bytes of blob data as the buffer to receive segments is + max 65535 bytes, but the contents of the buffer are one or more segments prefixed with 2-byte lengths; + putSegment can write max 65535 bytes, because the buffer *is* the segment */ + return 65535; + } + // NOTE: This should probably be Short.MAX_VALUE, but we can no longer run the relevant tests on Firebird 2.0 + // and older (which aren't supported any way), so we leave this as is return Short.MAX_VALUE - 2; } } diff --git a/src/main/org/firebirdsql/gds/ng/FbBlob.java b/src/main/org/firebirdsql/gds/ng/FbBlob.java index 7881aa972..a93aaa5f9 100644 --- a/src/main/org/firebirdsql/gds/ng/FbBlob.java +++ b/src/main/org/firebirdsql/gds/ng/FbBlob.java @@ -26,6 +26,7 @@ import org.firebirdsql.gds.ISCConstants; import org.firebirdsql.gds.ng.listeners.ExceptionListenable; +import org.firebirdsql.jaybird.props.DatabaseConnectionProperties; import java.sql.SQLException; @@ -127,10 +128,40 @@ public interface FbBlob extends ExceptionListenable, AutoCloseable { */ byte[] getSegment(int sizeRequested) throws SQLException; + /** + * Reads content from the blob into {@code buf} starting at {@code pos} for {@code len} bytes. + *
+ * Implementations must read the requested number of bytes ({@code len}), except if end-of-blob is reached before + * the requested number of bytes was reached. The return value of this method is the actual number of bytes read. + *
+ *+ * If the implementation cannot perform reads without additional allocation, it should use at most + * {@link DatabaseConnectionProperties#getBlobBufferSize()} as an internal buffer. If the implementation can + * perform reads without additional allocation, it is recommended it performs reads using (at most) + * {@link #getMaximumSegmentSize()}. + *
+ *+ * Contrary to similar methods like {@link java.io.InputStream#read(byte[], int, int)}, this method returns + * {@code 0} when no bytes were read if end-of-blob is reached without reading any bytes, not {@code -1}. + *
+ * + * @param buf + * target byte array + * @param pos + * position to start + * @param len + * number of bytes + * @return actual number of bytes read; this will only be less than {@code len} when end-of-blob was reached + * @throws SQLException + * for database access errors, if {@code pos < 0}, {@code len < 0}, or if {@code pos + len > buf.length} + * @since 6 + */ + int get(byte[] buf, int pos, int len) throws SQLException; + /** * Writes a segment of blob data. *- * Implementation must handle segment length exceeding {@link #getMaximumSegmentSize()} by batching. TODO: reconsider and let caller handle that? + * Implementation must handle segment length exceeding {@link #getMaximumSegmentSize()} by batching. *
** Passing a section that is length 0 will throw an {@code SQLException}. diff --git a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireBlob.java b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireBlob.java index bbbeabfb5..57c8b02df 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireBlob.java +++ b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireBlob.java @@ -20,6 +20,8 @@ import org.firebirdsql.gds.BlobParameterBuffer; import org.firebirdsql.gds.impl.wire.WireProtocolConstants; +import org.firebirdsql.gds.impl.wire.XdrInputStream; +import org.firebirdsql.gds.impl.wire.XdrOutputStream; import org.firebirdsql.gds.ng.AbstractFbBlob; import org.firebirdsql.gds.ng.LockCloseable; @@ -114,4 +116,32 @@ public byte[] getBlobInfo(final byte[] requestItems, final int bufferLength) thr throw e; } } + + /** + * Gets the XdrInputStream. + * + * @return instance of XdrInputStream + * @throws SQLException + * if no connection is opened or when exceptions occur retrieving the InputStream + * @since 6 + */ + protected final XdrInputStream getXdrIn() throws SQLException { + return getXdrStreamAccess().getXdrIn(); + } + + /** + * Gets the XdrOutputStream. + * + * @return instance of XdrOutputStream + * @throws SQLException + * if no connection is opened or when exceptions occur retrieving the OutputStream + * @since 6 + */ + protected final XdrOutputStream getXdrOut() throws SQLException { + return getXdrStreamAccess().getXdrOut(); + } + + private XdrStreamAccess getXdrStreamAccess() { + return getDatabase().getXdrStreamAccess(); + } } diff --git a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireDatabase.java b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireDatabase.java index 4bc5ffd4e..4159ab9c0 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireDatabase.java +++ b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireDatabase.java @@ -113,6 +113,11 @@ public final XdrStreamAccess getXdrStreamAccess() { return connection.getXdrStreamAccess(); } + @Override + public final FbWireOperations getWireOperations() { + return wireOperations; + } + @Override public final boolean isAttached() { return super.isAttached() && connection.isConnected(); diff --git a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireOutputBlob.java b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireOutputBlob.java index c7bd93935..1f53b6206 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireOutputBlob.java +++ b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireOutputBlob.java @@ -70,16 +70,25 @@ public final boolean isOutput() { @Override public final byte[] getSegment(int sizeRequested) throws SQLException { + readNotSupported(); + return null; + } + + private void readNotSupported() throws SQLException { SQLException e = FbExceptionBuilder.forNonTransientException(ISCConstants.isc_segstr_no_read).toSQLException(); exceptionListenerDispatcher.errorOccurred(e); throw e; } + @Override + public int get(byte[] buf, int pos, int len) throws SQLException { + readNotSupported(); + return -1; + } + @Override public final void seek(int offset, SeekMode seekMode) throws SQLException { // This assumes seeks are not (nor in the future) supported on output blobs - SQLException e = FbExceptionBuilder.forNonTransientException(ISCConstants.isc_segstr_no_read).toSQLException(); - exceptionListenerDispatcher.errorOccurred(e); - throw e; + readNotSupported(); } } diff --git a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireService.java b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireService.java index bda23a4c4..2cded4a6b 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireService.java +++ b/src/main/org/firebirdsql/gds/ng/wire/AbstractFbWireService.java @@ -170,6 +170,11 @@ public final XdrStreamAccess getXdrStreamAccess() { return connection.getXdrStreamAccess(); } + @Override + public final FbWireOperations getWireOperations() { + return wireOperations; + } + /** * Closes the WireConnection associated with this connection. * diff --git a/src/main/org/firebirdsql/gds/ng/wire/AbstractWireOperations.java b/src/main/org/firebirdsql/gds/ng/wire/AbstractWireOperations.java index a051dd429..0a26adb97 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/AbstractWireOperations.java +++ b/src/main/org/firebirdsql/gds/ng/wire/AbstractWireOperations.java @@ -156,12 +156,7 @@ public final Response readOperationResponse(int operationCode, WarningMessageCal return response; } - /** - * Reads the next operation. Forwards call to {@link WireConnection#readNextOperation()}. - * - * @return next operation - * @throws java.io.IOException For errors reading the operation from the connection - */ + @Override public final int readNextOperation() throws IOException { try (LockCloseable ignored = withLock()) { processDeferredActions(); diff --git a/src/main/org/firebirdsql/gds/ng/wire/FbWireAttachment.java b/src/main/org/firebirdsql/gds/ng/wire/FbWireAttachment.java index d3b17610a..177fa6bf1 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/FbWireAttachment.java +++ b/src/main/org/firebirdsql/gds/ng/wire/FbWireAttachment.java @@ -41,6 +41,11 @@ public interface FbWireAttachment extends FbAttachment { */ XdrStreamAccess getXdrStreamAccess(); + /** + * @return Instance of {@link FbWireOperations} for this attachment. + */ + FbWireOperations getWireOperations(); + /** * Convenience method to read a Response to a GenericResponse * diff --git a/src/main/org/firebirdsql/gds/ng/wire/FbWireOperations.java b/src/main/org/firebirdsql/gds/ng/wire/FbWireOperations.java index 808a46567..f537f73f9 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/FbWireOperations.java +++ b/src/main/org/firebirdsql/gds/ng/wire/FbWireOperations.java @@ -65,6 +65,22 @@ public interface FbWireOperations { */ Response readResponse(WarningMessageCallback callback) throws SQLException, IOException; + /** + * Reads the next operation code, after processing deferred packets. + *
+ * In general, calling {@link #readResponse(WarningMessageCallback)} or one of the specific {@code readXXXResponse} + * methods should be preferred to read the response code and the response body. Use this method only for reading + * custom responses, or if you need to process the response in a way that is not possible with + * {@link #readResponse(WarningMessageCallback)}. + *
+ * + * @return next operation + * @throws java.io.IOException + * for errors reading the operation from the connection + * @since 6 + */ + int readNextOperation() throws IOException; + /** * Reads the response from the server when the operation code has already been read. * diff --git a/src/main/org/firebirdsql/gds/ng/wire/version10/V10InputBlob.java b/src/main/org/firebirdsql/gds/ng/wire/version10/V10InputBlob.java index 6567e929f..3ade3a5a4 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/version10/V10InputBlob.java +++ b/src/main/org/firebirdsql/gds/ng/wire/version10/V10InputBlob.java @@ -19,15 +19,20 @@ package org.firebirdsql.gds.ng.wire.version10; import org.firebirdsql.gds.BlobParameterBuffer; +import org.firebirdsql.gds.VaxEncoding; +import org.firebirdsql.gds.impl.wire.XdrInputStream; import org.firebirdsql.gds.impl.wire.XdrOutputStream; import org.firebirdsql.gds.ng.FbExceptionBuilder; import org.firebirdsql.gds.ng.LockCloseable; import org.firebirdsql.gds.ng.listeners.DatabaseListener; import org.firebirdsql.gds.ng.wire.*; +import org.firebirdsql.jdbc.SQLStateConstants; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.sql.SQLException; +import java.sql.SQLNonTransientException; +import java.sql.SQLWarning; +import java.util.Objects; import static org.firebirdsql.gds.JaybirdErrorCodes.jb_blobGetSegmentNegative; import static org.firebirdsql.gds.VaxEncoding.iscVaxInteger2; @@ -38,6 +43,8 @@ * @since 3.0 */ public class V10InputBlob extends AbstractFbWireInputBlob implements FbWireBlob, DatabaseListener { + + private static final int STATE_END_OF_BLOB = 2; // TODO V10OutputBlob and V10InputBlob share some common behavior and information (eg in open() and getMaximumSegmentSize()), find a way to unify this @@ -57,7 +64,7 @@ public void open() throws SQLException { final FbWireDatabase database = getDatabase(); try { - final XdrOutputStream xdrOut = database.getXdrStreamAccess().getXdrOut(); + final XdrOutputStream xdrOut = getXdrOut(); final BlobParameterBuffer blobParameterBuffer = getBlobParameterBuffer(); if (blobParameterBuffer == null) { xdrOut.writeInt(op_open_blob); @@ -94,50 +101,157 @@ public byte[] getSegment(final int sizeRequested) throws SQLException { .messageParameter(sizeRequested) .toSQLException(); } + final GenericResponse response; try (LockCloseable ignored = withLock()) { checkDatabaseAttached(); checkTransactionActive(); checkBlobOpen(); - // TODO Is this actually a real limitation, or are larger sizes possible? - int actualSize = 2 + Math.min(sizeRequested, getMaximumSegmentSize()); - final GenericResponse response; - final FbWireDatabase database = getDatabase(); + int actualSize = segmentRequestSize(sizeRequested); try { - final XdrOutputStream xdrOut = database.getXdrStreamAccess().getXdrOut(); - xdrOut.writeInt(op_get_segment); - xdrOut.writeInt(getHandle()); - xdrOut.writeInt(actualSize); - xdrOut.writeInt(0); // length of segment send buffer (always 0 in get) - xdrOut.flush(); + sendGetSegment(actualSize); + getXdrOut().flush(); } catch (IOException e) { throw FbExceptionBuilder.ioWriteError(e); } try { - response = database.readGenericResponse(null); - // TODO Meaning of 2 - if (response.getObjectHandle() == 2) { + response = getDatabase().readGenericResponse(null); + if (response.getObjectHandle() == STATE_END_OF_BLOB) { // TODO what if I seek on a stream blob? setEof(); } } catch (IOException e) { throw FbExceptionBuilder.ioReadError(e); } + } - final byte[] responseBuffer = response.getData(); - if (responseBuffer.length == 0) { - return responseBuffer; - } + final byte[] responseBuffer = response.getData(); + if (responseBuffer.length == 0) { + return responseBuffer; + } + + final byte[] data = new byte[getTotalSegmentSize(responseBuffer)]; + int responsePos = 0; + int dataPos = 0; + while (responsePos < responseBuffer.length) { + int segmentLength = iscVaxInteger2(responseBuffer, responsePos); + responsePos += 2; + System.arraycopy(responseBuffer, responsePos, data, dataPos, segmentLength); + responsePos += segmentLength; + dataPos += segmentLength; + } + return data; + } catch (SQLException e) { + exceptionListenerDispatcher.errorOccurred(e); + throw e; + } + } + + /** + * Calculates the total size of all segments in {@code segmentBuffer}. + * + * @param segmentBuffer + * segment buffer (contains 1 or more segments of [2-byte length][length bytes]...). + * @return total length of segments + */ + private static int getTotalSegmentSize(byte[] segmentBuffer) { + int count = 0; + int pos = 0; + while (pos < segmentBuffer.length) { + int segmentLength = VaxEncoding.iscVaxInteger2(segmentBuffer, pos); + pos += 2 + segmentLength; + count += segmentLength; + } + return count; + } + + private int segmentRequestSize(int size) { + // The request size is the total buffer, but segments are prefixed with 2 bytes for the size. It is possible + // a single response contains multiple segments, but we don't take that into account for the size calculation. + return Math.min(Math.max(size, size + 2), getMaximumSegmentSize()); + } + + protected void sendGetSegment(int len) throws SQLException, IOException { + XdrOutputStream xdrOut = getXdrOut(); + xdrOut.writeInt(op_get_segment); + xdrOut.writeInt(getHandle()); + xdrOut.writeInt(len); + xdrOut.writeInt(0); // length of segment send buffer (always 0 in get) + } - final ByteArrayOutputStream bos = new ByteArrayOutputStream(actualSize); - int position = 0; - while (position < responseBuffer.length) { - final int segmentLength = iscVaxInteger2(responseBuffer, position); - position += 2; - bos.write(responseBuffer, position, segmentLength); - position += segmentLength; + @Override + public int get(final byte[] buf, final int pos, final int len) throws SQLException { + try { + try { + Objects.checkFromIndexSize(pos, len, Objects.requireNonNull(buf, "buf").length); + } catch (IndexOutOfBoundsException e) { + throw new SQLNonTransientException(e.toString(), SQLStateConstants.SQL_STATE_INVALID_STRING_LENGTH); + } + + try (LockCloseable ignored = withLock()) { + checkDatabaseAttached(); + checkTransactionActive(); + checkBlobOpen(); + + int count = 0; + while (count < len && !isEof()) { + try { + sendGetSegment(segmentRequestSize(len - count)); + getXdrOut().flush(); + } catch (IOException e) { + throw FbExceptionBuilder.ioWriteError(e); + } + try { + FbWireOperations wireOps = getDatabase().getWireOperations(); + final int opCode = wireOps.readNextOperation(); + if (opCode != op_response) { + wireOps.readOperationResponse(opCode, null); + throw new SQLException("Unexpected response to op_get_segment: " + opCode); + } + XdrInputStream xdrIn = getXdrIn(); + final int objHandle = xdrIn.readInt(); + xdrIn.skipNBytes(8); // blob-id (unused) + + final int bufferLength = xdrIn.readInt(); + if (bufferLength > 0) { + int bufferRemaining = bufferLength; + while (bufferRemaining > 2) { + int segmentLength = VaxEncoding.decodeVaxInteger2WithoutLength(xdrIn); + bufferRemaining -= 2; + if (segmentLength > bufferRemaining) { + throw new IOException( + "Inconsistent segment buffer: segment length %d, remaining buffer was %d" + .formatted(segmentLength, bufferRemaining)); + } else if (segmentLength > len - count) { + throw new IOException("Returned segment length %d exceeded remaining size %d" + .formatted(segmentLength, len - count)); + } + xdrIn.readFully(buf, pos + count, segmentLength); + bufferRemaining -= segmentLength; + count += segmentLength; + } + + // Safety measure: read remaining (shouldn't happen in practice) + xdrIn.skipNBytes(bufferRemaining); + // Skip buffer padding + xdrIn.skipPadding(bufferLength); + } + + SQLException exception = wireOps.readStatusVector(); + if (exception != null && !(exception instanceof SQLWarning)) { + // NOTE: SQLWarning is unlikely for this operation, so we don't do anything to report it + throw exception; + } + + if (objHandle == STATE_END_OF_BLOB) { + setEof(); + } + } catch (IOException e) { + throw FbExceptionBuilder.ioReadError(e); + } } - return bos.toByteArray(); + + return count; } } catch (SQLException e) { exceptionListenerDispatcher.errorOccurred(e); @@ -153,7 +267,7 @@ public void seek(int offset, SeekMode seekMode) throws SQLException { final FbWireDatabase database = getDatabase(); try { - final XdrOutputStream xdrOut = database.getXdrStreamAccess().getXdrOut(); + final XdrOutputStream xdrOut = getXdrOut(); xdrOut.writeInt(op_seek_blob); xdrOut.writeInt(getHandle()); xdrOut.writeInt(seekMode.getSeekModeId()); diff --git a/src/main/org/firebirdsql/gds/ng/wire/version10/V10OutputBlob.java b/src/main/org/firebirdsql/gds/ng/wire/version10/V10OutputBlob.java index 9bba5f6a7..705487699 100644 --- a/src/main/org/firebirdsql/gds/ng/wire/version10/V10OutputBlob.java +++ b/src/main/org/firebirdsql/gds/ng/wire/version10/V10OutputBlob.java @@ -40,7 +40,7 @@ */ public class V10OutputBlob extends AbstractFbWireOutputBlob implements FbWireBlob, DatabaseListener { - // TODO V10OutputBlob and V10InputBlob share some common behavior and information (eg in open() and getMaximumSegmentSize()), find a way to unify this + // TODO V10OutputBlob and V10InputBlob share some common behavior and information (eg in open()), find a way to unify this public V10OutputBlob(FbWireDatabase database, FbWireTransaction transaction, BlobParameterBuffer blobParameterBuffer) { @@ -62,7 +62,7 @@ public void open() throws SQLException { final FbWireDatabase database = getDatabase(); try { - final XdrOutputStream xdrOut = database.getXdrStreamAccess().getXdrOut(); + final XdrOutputStream xdrOut = getXdrOut(); final BlobParameterBuffer blobParameterBuffer = getBlobParameterBuffer(); if (blobParameterBuffer == null) { xdrOut.writeInt(op_create_blob); @@ -108,7 +108,7 @@ public void putSegment(byte[] segment) throws SQLException { final FbWireDatabase database = getDatabase(); try { - final XdrOutputStream xdrOut = database.getXdrStreamAccess().getXdrOut(); + final XdrOutputStream xdrOut = getXdrOut(); xdrOut.writeInt(op_put_segment); xdrOut.writeInt(getHandle()); xdrOut.writeInt(segment.length); diff --git a/src/main/org/firebirdsql/jaybird/props/DatabaseConnectionProperties.java b/src/main/org/firebirdsql/jaybird/props/DatabaseConnectionProperties.java index b2c4fd542..2928a318a 100644 --- a/src/main/org/firebirdsql/jaybird/props/DatabaseConnectionProperties.java +++ b/src/main/org/firebirdsql/jaybird/props/DatabaseConnectionProperties.java @@ -198,15 +198,17 @@ default void setSessionTimeZone(String sessionTimeZone) { } /** - * @return BLOB buffer size in bytes. + * @return BLOB buffer size in bytes; if the configured value is less than an implementation-specific minimum, that + * minimum is returned */ default int getBlobBufferSize() { - return getIntProperty(PropertyNames.blobBufferSize, PropertyConstants.DEFAULT_BLOB_BUFFER_SIZE); + return Math.max(PropertyConstants.MIN_BLOB_BUFFER_SIZE, + getIntProperty(PropertyNames.blobBufferSize, PropertyConstants.DEFAULT_BLOB_BUFFER_SIZE)); } /** * @param blobBufferSize - * size of the BLOB buffer in bytes. + * size of the BLOB buffer in bytes */ default void setBlobBufferSize(int blobBufferSize) { setIntProperty(PropertyNames.blobBufferSize, blobBufferSize); diff --git a/src/main/org/firebirdsql/jaybird/props/PropertyConstants.java b/src/main/org/firebirdsql/jaybird/props/PropertyConstants.java index 008f588df..379b21997 100644 --- a/src/main/org/firebirdsql/jaybird/props/PropertyConstants.java +++ b/src/main/org/firebirdsql/jaybird/props/PropertyConstants.java @@ -52,6 +52,7 @@ public final class PropertyConstants { static final boolean DEFAULT_IGNORE_PROCEDURE_TYPE = false; static final boolean DEFAULT_WIRE_COMPRESSION = false; public static final int DEFAULT_BLOB_BUFFER_SIZE = 16384; + static final int MIN_BLOB_BUFFER_SIZE = 512; public static final int DEFAULT_PAGE_CACHE_SIZE = 0; static final boolean DEFAULT_USE_SERVER_BATCH = true; public static final int DEFAULT_SERVER_BATCH_BUFFER_SIZE = 0; diff --git a/src/main/org/firebirdsql/jdbc/FBBlobInputStream.java b/src/main/org/firebirdsql/jdbc/FBBlobInputStream.java index d8c329286..b5224f523 100644 --- a/src/main/org/firebirdsql/jdbc/FBBlobInputStream.java +++ b/src/main/org/firebirdsql/jdbc/FBBlobInputStream.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.sql.SQLException; +import java.util.Objects; /** * An input stream for reading directly from a FBBlob instance. @@ -43,7 +44,7 @@ public final class FBBlobInputStream extends InputStream implements FirebirdBlob FBBlobInputStream(FBBlob owner) throws SQLException { if (owner.isNew()) { - throw new FBSQLException("You can't read a new blob"); + throw new SQLException("You can't read a new blob", SQLStateConstants.SQL_STATE_LOCATOR_EXCEPTION); } this.owner = owner; @@ -97,7 +98,8 @@ public int available() throws IOException { * Checks the available buffer size, retrieving a segment from the server if necessary. * * @return The number of bytes available in the buffer, or {@code -1} if the end of the stream is reached. - * @throws IOException if an I/O error occurs, or if the stream has been closed. + * @throws IOException + * if an I/O error occurs, or if the stream has been closed. */ private int checkBuffer() throws IOException { try (LockCloseable ignored = owner.withLock()) { @@ -126,45 +128,48 @@ public int read() throws IOException { } @Override - public int read(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return 0; - } + public int read(final byte[] b, int off, final int len) throws IOException { + Objects.checkFromIndexSize(off, len, b.length); + if (len == 0) return 0; - final int toCopy = Math.min(checkBuffer(), len); - if (toCopy == -1) { - return -1; + try (LockCloseable ignored = owner.withLock()) { + checkClosed(); + // Optimization: for small lengths use buffer, otherwise only check if we currently have data in buffer + final int smallBufferLimit = Math.min(owner.getBufferLength(), blobHandle.getMaximumSegmentSize()) / 2; + final int avail = len <= smallBufferLimit ? checkBuffer() : available(); + if (avail == -1) { + return -1; + } + int count = 0; + if (avail > 0) { + count = Math.min(avail, len); + System.arraycopy(buffer, this.pos, b, off, count); + this.pos += count; + if (len - count < smallBufferLimit) { + // Remaining bytes are small, better if this method is called again (which will use a buffer) + return count; + } + } + + if (count < len) { + count += blobHandle.get(b, off + count, len - count); + } + // When we haven't read anything, report end-of-blob + return count == 0 ? -1 : count; + } catch (SQLException ge) { + if (ge.getCause() instanceof IOException ioe) { + throw ioe; + } + throw new IOException("Blob read problem: " + ge, ge); } - System.arraycopy(buffer, pos, b, off, toCopy); - pos += toCopy; - return toCopy; } @Override public void readFully(byte[] b, int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return; - } - - int counter = 0; - int pos = off; - int toRead = len; - - while (toRead > 0 && (counter = read(b, pos, toRead)) != -1) { - pos += counter; - toRead -= counter; - } - - if (counter == -1) { - throw new EOFException(); + int read = readNBytes(b, off, len); + if (read != len) { + throw new EOFException( + "End-of-blob reached after reading %d bytes, required %d bytes".formatted(read, len)); } } diff --git a/src/main/org/firebirdsql/jdbc/FirebirdBlob.java b/src/main/org/firebirdsql/jdbc/FirebirdBlob.java index ec3f6f338..7e393211f 100644 --- a/src/main/org/firebirdsql/jdbc/FirebirdBlob.java +++ b/src/main/org/firebirdsql/jdbc/FirebirdBlob.java @@ -24,6 +24,8 @@ */ package org.firebirdsql.jdbc; +import org.firebirdsql.gds.ISCConstants; + import java.sql.Blob; import java.sql.SQLException; @@ -46,13 +48,13 @@ public interface FirebirdBlob extends Blob { interface BlobInputStream extends AutoCloseable { /** Seek based on the absolute beginning of the stream */ - int SEEK_MODE_ABSOLUTE = 0; + int SEEK_MODE_ABSOLUTE = ISCConstants.blb_seek_from_head; /** Seek relative to the current position in the stream */ - int SEEK_MODE_RELATIVE = 1; + int SEEK_MODE_RELATIVE = ISCConstants.blb_seek_relative; /** Seek relative to the tail end of the stream */ - int SEEK_MODE_FROM_TAIL = 2; + int SEEK_MODE_FROM_TAIL = ISCConstants.blb_seek_from_tail; /** * Get instance of {@link FirebirdBlob} to which this stream belongs to. @@ -87,93 +89,106 @@ interface BlobInputStream extends AutoCloseable { */ @Override void close() throws IOException; - + /** - * Get Blob length. This is shortcut method for the {@code inputStream.getBlob().length()} call, however is more - * resource friendly, because no new Blob handle is created. - * - * @return length of the blob. - * - * @throws IOException if I/O error occurs. + * Get Blob length. This is a shortcut for {@code inputStream.getBlob().length()} call, and is more resource + * friendly, because no new Blob handle is created. + * + * @return length of the blob + * @throws IOException + * if I/O error occurs */ long length() throws IOException; - + /** - * Read single byte from the stream. - * - * @return next byte read from the stream or -1 if end of stream was - * reached. - * - * @throws IOException if I/O error occurs. + * Read a single byte from the stream. + * + * @return next byte read from the stream or {@code -1} if end of stream was reached + * @throws IOException + * if I/O error occurs + * @see InputStream#read() */ int read() throws IOException; - + /** - * Read some bytes from the stream without blocking. - * - * @param buffer buffer into which data should be read. - * @param offset offset in the buffer where to start. - * @param length number of bytes to read. - * - * @return number of bytes that were read. - * - * @throws IOException if I/O error occurs. + * Read some bytes from the stream into {@code buffer}. + *+ * The implementation may read less bytes than requested. Implementations may perform multiple roundtrips to + * the server to fill {@code buffer} up to the requested length. + *
+ * + * @param buffer + * buffer into which data should be read + * @param offset + * offset in the buffer where to start + * @param length + * number of bytes to read + * @return number of bytes that were actually read, returns {@code 0} if {@code len == 0}, {@code -1} if + * end-of-blob was reached without reading any bytes + * @throws IOException + * if I/O error occurs + * @see InputStream#read(byte[], int, int) */ int read(byte[] buffer, int offset, int length) throws IOException; - + /** * Read {@code length} from the stream into the specified buffer. - * This method can block until desired number of bytes is read, it can - * throw an exception if end of stream was reached during read. - * - * @param buffer buffer where data should be read. - * @param offset offset in the buffer where to start. - * @param length number of bytes to read. - * - * @throws EOFException if stream end was reached when reading data. - * @throws IOException if I/O error occurs. + *+ * This method will throw an {@code EOFException} if end-of-blob was reached before reading {@code length} + * bytes. + *
+ * + * @param buffer + * buffer where data should be read + * @param offset + * offset in the buffer where to start + * @param length + * number of bytes to read + * @throws EOFException + * if stream end was reached when reading data. + * @throws IOException + * if I/O error occurs. */ void readFully(byte[] buffer, int offset, int length) throws IOException; - + /** * Read {@code buffer.length} bytes from the buffer. This is a shortcut method for * {@code readFully(buffer, 0, buffer.length)} call. - * - * @param buffer buffer where data should be read. - * - * @throws IOException if I/O error occurs. + * + * @param buffer + * buffer where data should be read + * @throws IOException + * if I/O error occurs */ void readFully(byte[] buffer) throws IOException; - + /** - * Move current position in the Blob stream. This is a shortcut method - * to {@link #seek(int, int)} passing {@link #SEEK_MODE_ABSOLUTE} as - * seek mode. - * - * @param position absolute position to seek, starting position is - * 0 (note, in {@link Blob#getBytes(long, int)} starting position is 1). - * - * @throws IOException if I/O error occurs. + * Move current position in the Blob stream. This is a shortcut method to {@link #seek(int, int)} passing + * {@link #SEEK_MODE_ABSOLUTE} as seek mode. + * + * @param position + * absolute position to seek, starting position is 0 (note, in {@link Blob#getBytes(long, int)} starting + * position is 1). + * @throws IOException + * if I/O error occurs. */ void seek(int position) throws IOException; - + /** - * Move current position in the Blob stream. Depending on the specified - * seek mode, position can be either positive or negative. - *- * Note, this method allows to move position in the Blob stream only - * forward. If you need to read data before the current position, new - * stream must be opened. - * - * @param position position in the stream, starting position is - * 0 (note, in {@link Blob#getBytes(long, int)} starting position is 1). - * - * @param seekMode mode of seek operation, one of {@link #SEEK_MODE_ABSOLUTE}, - * {@link #SEEK_MODE_RELATIVE} or {@link #SEEK_MODE_FROM_TAIL}. - * - * @throws IOException if I/O error occurs. + * Move current position in the Blob stream. Depending on the specified seek mode, position can be either + * positive or negative. + * + * @param position + * position in the stream, starting position is 0 (note, in {@link Blob#getBytes(long, int)} starting + * position is 1) + * @param seekMode + * mode of seek operation, one of {@link #SEEK_MODE_ABSOLUTE}, {@link #SEEK_MODE_RELATIVE} or + * {@link #SEEK_MODE_FROM_TAIL} + * @throws IOException + * if I/O error occurs */ void seek(int position, int seekMode) throws IOException; + } /** diff --git a/src/main/org/firebirdsql/jdbc/SQLStateConstants.java b/src/main/org/firebirdsql/jdbc/SQLStateConstants.java index f8a6e8374..d41e2bad7 100644 --- a/src/main/org/firebirdsql/jdbc/SQLStateConstants.java +++ b/src/main/org/firebirdsql/jdbc/SQLStateConstants.java @@ -138,6 +138,11 @@ public final class SQLStateConstants { // TODO Name suggests: invalid escape sequence => 22025 public static final String SQL_STATE_INVALID_ESCAPE_SEQ = SQL_STATE_SYNTAX_ERROR; + /** + * ISO-9075-2: locator exception + */ + public static final String SQL_STATE_LOCATOR_EXCEPTION = "0F000"; + private SQLStateConstants() { // no instances } diff --git a/src/test/org/firebirdsql/gds/VaxEncodingTest.java b/src/test/org/firebirdsql/gds/VaxEncodingTest.java index 3653f3b21..545b7c36d 100644 --- a/src/test/org/firebirdsql/gds/VaxEncodingTest.java +++ b/src/test/org/firebirdsql/gds/VaxEncodingTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -180,4 +181,12 @@ void iscVaxInteger2_usesOffset() { assertEquals(21251, VaxEncoding.iscVaxInteger2(encoding, 1)); } + + @Test + void decodeVaxInteger2WithoutLength() throws Exception { + var in = new ByteArrayInputStream(new byte[] { 5, 3 }); + + assertEquals(773, VaxEncoding.decodeVaxInteger2WithoutLength(in)); + } + } \ No newline at end of file diff --git a/src/test/org/firebirdsql/gds/ng/BaseTestBlob.java b/src/test/org/firebirdsql/gds/ng/BaseTestBlob.java index d514b1c18..e19ba1855 100644 --- a/src/test/org/firebirdsql/gds/ng/BaseTestBlob.java +++ b/src/test/org/firebirdsql/gds/ng/BaseTestBlob.java @@ -35,10 +35,14 @@ import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; -import static org.firebirdsql.common.FBTestProperties.*; +import static org.firebirdsql.common.FBTestProperties.getConnectionViaDriverManager; +import static org.firebirdsql.common.FBTestProperties.getDefaultTpb; +import static org.firebirdsql.jaybird.fb.constants.BpbItems.TypeValues.isc_bpb_type_segmented; import static org.firebirdsql.jaybird.fb.constants.BpbItems.TypeValues.isc_bpb_type_stream; import static org.firebirdsql.jaybird.fb.constants.BpbItems.isc_bpb_type; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -64,37 +68,6 @@ public abstract class BaseTestBlob { " blobvalue BLOB SUB_TYPE 0" + ")"; - protected static final String CREATE_PROC_FILL_BINARY_BLOB = - "CREATE PROCEDURE FILL_BINARY_BLOB \n" + - " (\n" + - " ID INTEGER,\n" + - " BASE_CONTENT VARCHAR(" + BASE_CONTENT_SIZE + ") CHARACTER SET OCTETS,\n" + - " REQUIRED_SIZE INTEGER\n" + - " ) \n" + - "AS \n" + - " DECLARE VARIABLE REMAINING INTEGER;\n" + - " DECLARE VARIABLE BASE_CONTENT_SIZE INTEGER;\n" + - " DECLARE VARIABLE TEMP_BLOB BLOB SUB_TYPE 0;\n" + - "BEGIN\n" + - " REMAINING = REQUIRED_SIZE;\n" + - " TEMP_BLOB = '';\n" + - " BASE_CONTENT_SIZE = OCTET_LENGTH(BASE_CONTENT);\n" + - " WHILE (REMAINING > 0) DO\n" + - " BEGIN\n" + - " TEMP_BLOB = TEMP_BLOB || \n" + - " CASE \n" + - " WHEN REMAINING > BASE_CONTENT_SIZE \n" + - " THEN BASE_CONTENT \n" + - " ELSE LEFT(BASE_CONTENT, REMAINING) \n" + - " END;\n" + - " REMAINING = REMAINING - BASE_CONTENT_SIZE;\n" + - " END\n" + - " INSERT INTO blob_table (id, blobvalue) VALUES (:ID, :TEMP_BLOB);\n" + - "END"; - - protected static final String EXECUTE_FILL_BINARY_BLOB = - "{call FILL_BINARY_BLOB(?, ?, ?)}"; - protected static final String CREATE_PROC_CHECK_BINARY_BLOB = "CREATE PROCEDURE CHECK_BINARY_BLOB \n" + " ( \n" + @@ -146,7 +119,6 @@ public abstract class BaseTestBlob { @RegisterExtension static final UsesDatabaseExtension.UsesDatabaseForAll usesDatabase = UsesDatabaseExtension.usesDatabaseForAll( CREATE_BLOB_TABLE, - CREATE_PROC_FILL_BINARY_BLOB, CREATE_PROC_CHECK_BINARY_BLOB); protected SimpleStatementListener listener; @@ -163,10 +135,14 @@ public final void setup() throws Exception { /** * Queries the blob table for the blob id of the record with the specified (row) id. - * @param testId Id of the row - * @param db database to use + * + * @param testId + * id of the row + * @param db + * database to use * @return Blob id - * @throws SQLException For errors executing the query + * @throws SQLException + * For errors executing the query */ @SuppressWarnings("SameParameterValue") protected long getBlobId(int testId, FbDatabase db) throws SQLException { @@ -214,45 +190,71 @@ protected byte[] generateBlobContent(byte[] baseContent, int requiredSize) { /** * Checks if the blob content is of the required size and matches the expected content based on baseContent. * - * @param blobContent Blob content - * @param baseContent Base content - * @param requiredSize Required size - * @return {@code true} content matches, {@code false} otherwise + * @param blobContent + * Blob content + * @param baseContent + * Base content + * @param requiredSize + * Required size */ - protected boolean validateBlobContent(byte[] blobContent, byte[] baseContent, int requiredSize) { - if (blobContent.length != requiredSize) return false; - for (int index = 0; index < blobContent.length; index++) { - if (blobContent[index] != baseContent[index % baseContent.length]) return false; + protected void assertBlobContent(byte[] blobContent, byte[] baseContent, int requiredSize) { + assertEquals(requiredSize, blobContent.length, "Unexpected length of blobContent"); + int pos = 0; + while (pos + baseContent.length <= blobContent.length) { + assertArrayEquals(baseContent, Arrays.copyOfRange(blobContent, pos, pos + baseContent.length), + "from pos = " + pos); + pos += baseContent.length; + } + if (pos < blobContent.length) { + assertArrayEquals(Arrays.copyOfRange(baseContent, 0, blobContent.length - pos), + Arrays.copyOfRange(blobContent, pos, blobContent.length), "from pos = " + pos); } - return true; } /** - * Populates a (segmented) blob using the FILL_BINARY_BLOB stored procedure + * Populates a segmented blob. * - * @param id ID of the record to be created in blob_table - * @param baseContent Base content - * @param requiredSize Required size + * @param id + * ID of the record to be created in blob_table + * @param baseContent + * Base content + * @param requiredSize + * Required size */ @SuppressWarnings("SameParameterValue") - protected void populateBlob(int id, byte[] baseContent, int requiredSize) throws SQLException { - try (Connection con = getConnectionViaDriverManager(); - CallableStatement cstmt = con.prepareCall(EXECUTE_FILL_BINARY_BLOB)) { - cstmt.setInt(1, id); - cstmt.setBytes(2, baseContent); - cstmt.setInt(3, requiredSize); - - cstmt.execute(); - } + protected void populateSegmentedBlob(int id, byte[] baseContent, int requiredSize) throws SQLException { + populateBlob(id, baseContent, requiredSize, false);; } /** - * Populates a stream blob for testing. + * Populates a stream blob. * - * @param testId Id of the record to be inserted. + * @param id + * id of the record to be created in blob_table + * @param baseContent + * Base content + * @param requiredSize + * Required size */ @SuppressWarnings("SameParameterValue") - protected void populateStreamBlob(int testId, byte[] baseContent, int requiredSize) throws SQLException { + protected void populateStreamBlob(int id, byte[] baseContent, int requiredSize) throws SQLException { + populateBlob(id, baseContent, requiredSize, true); + } + + /** + * Populates a blob. + * + * @param id + * id of the record to be created in blob_table + * @param baseContent + * Base content + * @param requiredSize + * Required size + * @param streamBlob + * {@code true} create as stream blob, {@code false} create as segmented blob + */ + protected void populateBlob(int id, byte[] baseContent, int requiredSize, boolean streamBlob) + throws SQLException { final byte[] testBytes = generateBlobContent(baseContent, requiredSize); try (FbDatabase db = createDatabaseConnection()) { @@ -262,9 +264,9 @@ protected void populateStreamBlob(int testId, byte[] baseContent, int requiredSi statement = db.createStatement(transaction); statement.addStatementListener(listener); - final BlobParameterBuffer blobParameterBuffer = db.createBlobParameterBuffer(); - blobParameterBuffer.addArgument(isc_bpb_type, isc_bpb_type_stream); - final FbBlob blob = db.createBlobForOutput(transaction, blobParameterBuffer); + BlobParameterBuffer blobParameterBuffer = db.createBlobParameterBuffer(); + blobParameterBuffer.addArgument(isc_bpb_type, streamBlob ? isc_bpb_type_stream : isc_bpb_type_segmented); + FbBlob blob = db.createBlobForOutput(transaction, blobParameterBuffer); blob.open(); int bytesWritten = 0; while (bytesWritten < testBytes.length) { @@ -276,9 +278,9 @@ protected void populateStreamBlob(int testId, byte[] baseContent, int requiredSi blob.close(); statement.prepare(INSERT_BLOB_TABLE); - final DatatypeCoder datatypeCoder = db.getDatatypeCoder(); + DatatypeCoder datatypeCoder = db.getDatatypeCoder(); RowValue rowValue = RowValue.of( - datatypeCoder.encodeInt(testId), + datatypeCoder.encodeInt(id), datatypeCoder.encodeLong(blob.getBlobId())); statement.execute(rowValue); statement.close(); diff --git a/src/test/org/firebirdsql/gds/ng/wire/version10/V10InputBlobTest.java b/src/test/org/firebirdsql/gds/ng/wire/version10/V10InputBlobTest.java index 325f9c6ff..d90577b1e 100644 --- a/src/test/org/firebirdsql/gds/ng/wire/version10/V10InputBlobTest.java +++ b/src/test/org/firebirdsql/gds/ng/wire/version10/V10InputBlobTest.java @@ -22,10 +22,13 @@ import org.firebirdsql.common.extension.RequireProtocolExtension; import org.firebirdsql.gds.ISCConstants; import org.firebirdsql.gds.ng.FbBlob; +import org.firebirdsql.gds.ng.FbDatabase; import org.firebirdsql.gds.ng.wire.FbWireDatabase; import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; import java.sql.SQLException; @@ -67,13 +70,14 @@ protected V10CommonConnectionInfo commonConnectionInfo() { /** * Tests retrieval of a blob (what goes in is what comes out). */ - @Test - public void testBlobRetrieval() throws Exception { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBlobRetrieval(boolean useStreamBlobs) throws Exception { final int testId = 1; final byte[] baseContent = generateBaseContent(); // Use sufficiently large value so that multiple segments are used final int requiredSize = 4 * Short.MAX_VALUE; - populateBlob(testId, baseContent, requiredSize); + populateBlob(testId, baseContent, requiredSize, useStreamBlobs); try (FbWireDatabase db = createDatabaseConnection()) { try { @@ -88,8 +92,36 @@ public void testBlobRetrieval() throws Exception { blob.close(); statement.close(); byte[] result = bos.toByteArray(); - assertEquals(requiredSize, result.length, "Unexpected length read from blob"); - assertTrue(validateBlobContent(result, baseContent, requiredSize), "Unexpected blob content"); + assertBlobContent(result, baseContent, requiredSize); + } finally { + if (transaction != null) transaction.commit(); + } + } + } + + /** + * Tests retrieval of a blob (what goes in is what comes out). + */ + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testBlobGet(boolean useStreamBlobs) throws Exception { + final int testId = 1; + final byte[] baseContent = generateBaseContent(); + // Use sufficiently large value so that multiple roundtrips are used + final int requiredSize = 4 * Short.MAX_VALUE; + populateBlob(testId, baseContent, requiredSize, useStreamBlobs); + + try (FbDatabase db = createDatabaseConnection()) { + try { + long blobId = getBlobId(testId, db); + + FbBlob blob = db.createBlobForInput(transaction, blobId); + blob.open(); + byte[] result = new byte[requiredSize]; + blob.get(result, 0, requiredSize); + blob.close(); + statement.close(); + assertBlobContent(result, baseContent, requiredSize); } finally { if (transaction != null) transaction.commit(); } @@ -105,7 +137,7 @@ public void testBlobSeek_segmented() throws Exception { final byte[] baseContent = generateBaseContent(); // Use sufficiently large value so that multiple segments are used final int requiredSize = 4 * Short.MAX_VALUE; - populateBlob(testId, baseContent, requiredSize); + populateSegmentedBlob(testId, baseContent, requiredSize); try (FbWireDatabase db = createDatabaseConnection()) { try { @@ -169,7 +201,7 @@ public void testReopen() throws Exception { final int testId = 1; final byte[] baseContent = generateBaseContent(); final int requiredSize = 256; - populateBlob(testId, baseContent, requiredSize); + populateSegmentedBlob(testId, baseContent, requiredSize); try (FbWireDatabase db = createDatabaseConnection()) { try { @@ -192,8 +224,7 @@ public void testReopen() throws Exception { statement.close(); byte[] result = bos.toByteArray(); - assertEquals(requiredSize, result.length, "Unexpected length read from blob"); - assertTrue(validateBlobContent(result, baseContent, requiredSize), "Unexpected blob content"); + assertBlobContent(result, baseContent, requiredSize); } finally { if (transaction != null) transaction.commit(); } @@ -208,7 +239,7 @@ public void testDoubleOpen() throws Exception { final int testId = 1; final byte[] baseContent = generateBaseContent(); final int requiredSize = 256; - populateBlob(testId, baseContent, requiredSize); + populateSegmentedBlob(testId, baseContent, requiredSize); try (FbWireDatabase db = createDatabaseConnection()) { try { diff --git a/src/test/org/firebirdsql/jdbc/FBBlobInputStreamTest.java b/src/test/org/firebirdsql/jdbc/FBBlobInputStreamTest.java index 1811a563a..ae201c3df 100644 --- a/src/test/org/firebirdsql/jdbc/FBBlobInputStreamTest.java +++ b/src/test/org/firebirdsql/jdbc/FBBlobInputStreamTest.java @@ -20,19 +20,27 @@ import org.firebirdsql.common.DataGenerator; import org.firebirdsql.common.extension.UsesDatabaseExtension; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.firebirdsql.jaybird.props.PropertyNames; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.EOFException; import java.sql.*; +import java.util.Arrays; +import java.util.Properties; -import static org.firebirdsql.common.FBTestProperties.getConnectionViaDriverManager; +import static org.firebirdsql.common.FBTestProperties.getDefaultPropertiesForConnection; +import static org.firebirdsql.common.FBTestProperties.getUrl; import static org.firebirdsql.common.matchers.SQLExceptionMatchers.message; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; import static org.junit.jupiter.api.Assertions.*; /** @@ -57,27 +65,20 @@ class FBBlobInputStreamTest { private static final String SELECT_BLOB = "SELECT bin_data FROM test_blob WHERE id = ?"; - private static Connection connection; - - @BeforeAll - static void setupAll() throws Exception{ - connection = getConnectionViaDriverManager(); - } + private Connection connection; @BeforeEach void setUp() throws Exception { - connection.setAutoCommit(true); + connection = getConnection(true); try (Statement stmt = connection.createStatement()) { stmt.execute("delete from test_blob"); } } - @AfterAll - static void tearDownAll() throws Exception { - try { + @AfterEach + void tearDown() throws Exception { + if (connection != null) { connection.close(); - } finally { - connection = null; } } @@ -183,9 +184,14 @@ void testAvailable_singleReadClosed_returns0() throws Exception { } } - @Test - void testRead_byteArr_moreThanAvailable_returnsAvailable() throws Exception { - final byte[] bytes = DataGenerator.createRandomBytes(128 * 1024); + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void testRead_byteArr_moreThanAvailable_returnsAvailable(boolean useStreamBlobs) throws Exception { + if (!useStreamBlobs) { + connection.close(); + connection = getConnection(false); + } + byte[] bytes = DataGenerator.createRandomBytes(128 * 1024); populateBlob(1, bytes); try (PreparedStatement pstmt = connection.prepareStatement(SELECT_BLOB)) { @@ -195,16 +201,71 @@ void testRead_byteArr_moreThanAvailable_returnsAvailable() throws Exception { Blob blob = rs.getBlob(1); FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); + assertEquals(bytes[0] & 0xFF, is.read(), "Unexpected first byte"); + int available = is.available(); + int blobBufferSize = getConnectionBlobBufferSize(); + assertThat("Value of available() should be greater than 0 but less than blobBufferSize", + available, allOf(greaterThan(0), lessThan(blobBufferSize))); + + // For small buffer size, we only read from the internal buffer, and don't request more from the server + int smallBufferSize = blobBufferSize / 2; + byte[] buffer = new byte[smallBufferSize]; + int bytesRead = is.read(buffer); + + int expectedSize = Math.min(smallBufferSize, available); + assertEquals(expectedSize, bytesRead, + "Expected to read number of bytes previously returned by available or smallBufferSize"); + assertArrayEquals(Arrays.copyOfRange(bytes, 1, expectedSize + 1), + expectedSize == buffer.length ? buffer : Arrays.copyOf(bytes, expectedSize), + "Unexpected read bytes"); + + int readOffset = expectedSize + 1; + if (is.available() == 0) { + assertEquals(bytes[++readOffset], is.read(), "Unexpected byte at offset " + (readOffset - 1)); + } + + buffer = new byte[is.available() + smallBufferSize]; + // If after reading available, we still have smallBufferSize remaining, we read the remaining bytes + // from server + bytesRead = is.read(buffer); + + assertEquals(buffer.length, bytesRead, "Expected to read number of bytes equal to the buffer size"); + assertArrayEquals(Arrays.copyOfRange(bytes, readOffset, readOffset + buffer.length), buffer, + "Unexpected read bytes"); + } + } + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void testRead_byteArr_moreThanAvailable_returnsAll(boolean useStreamBlobs) throws Exception { + if (!useStreamBlobs) { + connection.close(); + connection = getConnection(false); + } + final int testBlobSize = 128 * 1024; + final byte[] bytes = DataGenerator.createRandomBytes(testBlobSize); + populateBlob(1, bytes); + + try (PreparedStatement pstmt = connection.prepareStatement(SELECT_BLOB)) { + pstmt.setInt(1, 1); + try (ResultSet rs = pstmt.executeQuery()) { + assertTrue(rs.next(), "Expected a row"); + Blob blob = rs.getBlob(1); + FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); + int blobBufferSize = getConnectionBlobBufferSize(); + assertEquals(bytes[0] & 0xFF, is.read(), "Unexpected first byte"); final int available = is.available(); - assertTrue(available > 0, "Value of available() should be larger than 0"); - assertTrue(available < 128 * 1024 - 1, "Value of available() should be smaller than 128 * 1024 - 1"); + assertThat("Value of available() should be greater than 0 but less than blobBufferSize", + available, allOf(greaterThan(0), lessThan(blobBufferSize))); - byte[] buffer = new byte[128 * 1024]; - int bytesRead = is.read(buffer, 1, 128 * 1024 - 1); + byte[] buffer = new byte[testBlobSize]; + buffer[0] = bytes[0]; - assertEquals(available, bytesRead, - "Expected to read the number of bytes previously returned by available"); + assertEquals(testBlobSize - 1, is.read(buffer, 1, testBlobSize - 1), + "Expected remaining bytes to be read"); + assertArrayEquals(bytes, buffer, "Expected identical bytes to be returned"); } } } @@ -242,7 +303,7 @@ void testRead_byteArrNull_throwsNPE() throws Exception { Blob blob = rs.getBlob(1); FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); - //noinspection ResultOfMethodCallIgnored + //noinspection DataFlowIssue assertThrows(NullPointerException.class, () -> is.read(null, 0, 1)); } } @@ -261,7 +322,6 @@ void testRead_negativeOffset_throwsIOBE() throws Exception { FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); byte[] buffer = new byte[5]; - //noinspection ResultOfMethodCallIgnored assertThrows(IndexOutOfBoundsException.class, () -> is.read(buffer, -1, 1)); } } @@ -280,7 +340,6 @@ void testRead_negativeLength_throwsIOBE() throws Exception { FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); byte[] buffer = new byte[5]; - //noinspection ResultOfMethodCallIgnored assertThrows(IndexOutOfBoundsException.class, () -> is.read(buffer, 0, -1)); } } @@ -299,7 +358,6 @@ void testRead_offsetBeyondLength_throwsIOBE() throws Exception { FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); byte[] buffer = new byte[5]; - //noinspection ResultOfMethodCallIgnored assertThrows(IndexOutOfBoundsException.class, () -> is.read(buffer, 5, 1)); } } @@ -318,7 +376,6 @@ void testRead_offsetAndLengthBeyondLength_throwsIOBE() throws Exception { FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); byte[] buffer = new byte[5]; - //noinspection ResultOfMethodCallIgnored assertThrows(IndexOutOfBoundsException.class, () -> is.read(buffer, 0, 6)); } } @@ -326,7 +383,8 @@ void testRead_offsetAndLengthBeyondLength_throwsIOBE() throws Exception { @Test void testReadFully_byteArr_moreThanAvailable_returnsAllRead() throws Exception { - final byte[] bytes = DataGenerator.createRandomBytes(128 * 1024); + final int testBlobSize = 128 * 1024; + final byte[] bytes = DataGenerator.createRandomBytes(testBlobSize); populateBlob(1, bytes); try (PreparedStatement pstmt = connection.prepareStatement(SELECT_BLOB)) { @@ -336,15 +394,16 @@ void testReadFully_byteArr_moreThanAvailable_returnsAllRead() throws Exception { Blob blob = rs.getBlob(1); FBBlobInputStream is = (FBBlobInputStream) blob.getBinaryStream(); - byte[] buffer = new byte[128 * 1024]; + byte[] buffer = new byte[testBlobSize]; int firstValue = is.read(); assertEquals(bytes[0] & 0xFF, firstValue, "Unexpected first byte"); buffer[0] = (byte) firstValue; final int available = is.available(); - assertTrue(available < 128 * 1024 - 1, "Value of available() should be smaller than 128 * 1024 - 1"); + assertThat("Value of available() should be smaller than 128 * 1024 - 1", + available, lessThan(testBlobSize - 1)); - is.readFully(buffer, 1, 128 * 1024 - 1); + is.readFully(buffer, 1, testBlobSize - 1); assertArrayEquals(bytes, buffer, "Full blob should have been read"); } @@ -485,4 +544,17 @@ private void populateBlob(int id, byte[] bytes) throws SQLException { insert.execute(); } } + + private static Connection getConnection(boolean useStreamBlobs) throws SQLException { + Properties props = getDefaultPropertiesForConnection(); + props.setProperty(PropertyNames.useStreamBlobs, String.valueOf(useStreamBlobs)); + return DriverManager.getConnection(getUrl(), getDefaultPropertiesForConnection()); + } + + private int getConnectionBlobBufferSize() throws SQLException { + return connection.unwrap(FirebirdConnection.class) + .getFbDatabase() + .getConnectionProperties() + .getBlobBufferSize(); + } }