From 04685fe69806bd3314d571667502138f982838b2 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Mon, 14 Oct 2024 15:56:02 +0800 Subject: [PATCH] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer --- .../apache/flink/core/memory/MemoryUtils.java | 2 +- .../ByteBufferWritableFSDataOutputStream.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java index bb8904a39f294..2155dc12159bc 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java @@ -154,7 +154,7 @@ static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) { * @param buffer {@link ByteBuffer} which wraps the native memory address to get * @return native memory address wrapped by the given {@link ByteBuffer} */ - static long getByteBufferAddress(ByteBuffer buffer) { + public static long getByteBufferAddress(ByteBuffer buffer) { Preconditions.checkNotNull(buffer, "buffer is null"); Preconditions.checkArgument( buffer.isDirect(), "Can't get address of a non-direct ByteBuffer."); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java index 5d8fc2cd86d00..aa7b975a02b8e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.MemoryUtils; import java.io.IOException; import java.nio.ByteBuffer; @@ -34,6 +35,16 @@ @Experimental public class ByteBufferWritableFSDataOutputStream extends FSDataOutputStream { + /** The unsafe handle for transparent memory copied (heap / off-heap). */ + @SuppressWarnings("restriction") + private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + + /** The beginning of the byte array contents, relative to the byte array object. */ + @SuppressWarnings("restriction") + private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + private static final int SEGMENT_BUFFER_SIZE = 16 * 1024; + private final FSDataOutputStream originalOutputStream; public ByteBufferWritableFSDataOutputStream(FSDataOutputStream originalOutputStream) { @@ -58,6 +69,22 @@ public void write(ByteBuffer bb) throws IOException { } if (bb.hasArray()) { write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); + } else if (bb.isDirect()) { + int len = bb.remaining(); + int segment = Math.min(len, SEGMENT_BUFFER_SIZE); + byte[] bytes = new byte[segment]; + for (int i = 0; i < len; ) { + int copy = Math.min(segment, bb.remaining()); + UNSAFE.copyMemory( + null, + MemoryUtils.getByteBufferAddress(bb) + bb.position(), + bytes, + BYTE_ARRAY_BASE_OFFSET, + copy); + originalOutputStream.write(bytes, 0, copy); + bb.position(bb.position() + copy); + i += copy; + } } else { int len = bb.remaining(); for (int i = 0; i < len; i++) {