Skip to content

Commit

Permalink
[FLINK-36526][state/forst] Optimize the overhead of writing with dire…
Browse files Browse the repository at this point in the history
…ct buffer
  • Loading branch information
Zakelly committed Oct 14, 2024
1 parent d6cb8b3 commit 04685fe
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {
* @param buffer {@link ByteBuffer} which wraps the native memory address to get
* @return native memory address wrapped by the given {@link ByteBuffer}
*/
static long getByteBufferAddress(ByteBuffer buffer) {
public static long getByteBufferAddress(ByteBuffer buffer) {
Preconditions.checkNotNull(buffer, "buffer is null");
Preconditions.checkArgument(
buffer.isDirect(), "Can't get address of a non-direct ByteBuffer.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.MemoryUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -34,6 +35,16 @@
@Experimental
public class ByteBufferWritableFSDataOutputStream extends FSDataOutputStream {

/** The unsafe handle for transparent memory copied (heap / off-heap). */
@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;

/** The beginning of the byte array contents, relative to the byte array object. */
@SuppressWarnings("restriction")
private static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);

private static final int SEGMENT_BUFFER_SIZE = 16 * 1024;

private final FSDataOutputStream originalOutputStream;

public ByteBufferWritableFSDataOutputStream(FSDataOutputStream originalOutputStream) {
Expand All @@ -58,6 +69,22 @@ public void write(ByteBuffer bb) throws IOException {
}
if (bb.hasArray()) {
write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
} else if (bb.isDirect()) {
int len = bb.remaining();
int segment = Math.min(len, SEGMENT_BUFFER_SIZE);
byte[] bytes = new byte[segment];
for (int i = 0; i < len; ) {
int copy = Math.min(segment, bb.remaining());
UNSAFE.copyMemory(
null,
MemoryUtils.getByteBufferAddress(bb) + bb.position(),
bytes,
BYTE_ARRAY_BASE_OFFSET,
copy);
originalOutputStream.write(bytes, 0, copy);
bb.position(bb.position() + copy);
i += copy;
}
} else {
int len = bb.remaining();
for (int i = 0; i < len; i++) {
Expand Down

0 comments on commit 04685fe

Please sign in to comment.