From 4421bb12fc38969990d61de44ca3a7599be82687 Mon Sep 17 00:00:00 2001
From: Ostrzyciel
Date: Wed, 23 Oct 2024 00:09:59 +0200
Subject: [PATCH] Encoder: Maybe speed up stream row buffer
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

I thought that ArrayBuffer in Scala is smart enough to NOT allocate
things if it still has existing capacity, but somehow in the profiler
I'm reading a lot of allocations there.

Let's try a custom implementation of the row buffer – as simple as
possible.

If this works, I will finish the docstrings. If not, I will revert it.
---
 .../eu/ostrzyciel/jelly/core/FastBuffer.java  | 45 +++++++++++++++++++
 .../eu/ostrzyciel/jelly/core/NodeEncoder.java |  5 +--
 .../ostrzyciel/jelly/core/ProtoEncoder.scala  | 12 +++--
 .../jelly/core/NodeEncoderSpec.scala          | 29 ++++++------
 4 files changed, 66 insertions(+), 25 deletions(-)
 create mode 100644 core/src/main/java/eu/ostrzyciel/jelly/core/FastBuffer.java

diff --git a/core/src/main/java/eu/ostrzyciel/jelly/core/FastBuffer.java b/core/src/main/java/eu/ostrzyciel/jelly/core/FastBuffer.java
new file mode 100644
index 00000000..ec4065e5
--- /dev/null
+++ b/core/src/main/java/eu/ostrzyciel/jelly/core/FastBuffer.java
@@ -0,0 +1,45 @@
+package eu.ostrzyciel.jelly.core;
+
+class FastBuffer<T> {
+    private static final int CAPACITY_INCREASE = 16;
+
+    private T[] buffer;
+    private int size;
+    private int capacity;
+
+    public FastBuffer(int capacity) {
+        this.capacity = capacity;
+        this.size = 0;
+        this.buffer = (T[]) new Object[capacity];
+    }
+
+    public FastBuffer<T> append(T element) {
+        if (size == capacity) {
+            capacity += CAPACITY_INCREASE;
+            T[] newBuffer = (T[]) new Object[capacity];
+            System.arraycopy(buffer, 0, newBuffer, 0, size);
+            buffer = newBuffer;
+        }
+        buffer[size++] = element;
+        return this;
+    }
+
+    public T get(int index) {
+        return buffer[index];
+    }
+
+    public FastBuffer<T> clear() {
+        size = 0;
+        return this;
+    }
+
+    public T[] getBufferCopy() {
+        T[] copy = (T[]) new Object[size];
+        System.arraycopy(buffer, 0, copy, 0, size);
+        return copy;
+    }
+
+    public int size() {
+        return size;
+    }
+}
diff --git a/core/src/main/java/eu/ostrzyciel/jelly/core/NodeEncoder.java b/core/src/main/java/eu/ostrzyciel/jelly/core/NodeEncoder.java
index 6e54b801..a212500d 100644
--- a/core/src/main/java/eu/ostrzyciel/jelly/core/NodeEncoder.java
+++ b/core/src/main/java/eu/ostrzyciel/jelly/core/NodeEncoder.java
@@ -1,7 +1,6 @@
 package eu.ostrzyciel.jelly.core;
 
 import eu.ostrzyciel.jelly.core.proto.v1.*;
-import scala.collection.mutable.ArrayBuffer;
 
 import java.util.LinkedHashMap;
 import java.util.function.Function;
@@ -95,7 +94,7 @@ public NodeEncoder(RdfStreamOptions opt, int nodeCacheSize, int dependentNodeCac
      * @return The encoded literal
      */
     public UniversalTerm encodeDtLiteral(
-        TNode key, String lex, String datatypeName, ArrayBuffer<RdfStreamRow> rowsBuffer
+        TNode key, String lex, String datatypeName, FastBuffer<RdfStreamRow> rowsBuffer
     ) {
         var cachedNode = dependentNodeCache.computeIfAbsent(key, k -> new DependentNode());
         // Check if the value is still valid
@@ -130,7 +129,7 @@ public UniversalTerm encodeDtLiteral(
      * @param rowsBuffer The buffer to which the new name and prefix lookup entries should be appended
      * @return The encoded IRI
      */
-    public UniversalTerm encodeIri(String iri, ArrayBuffer<RdfStreamRow> rowsBuffer) {
+    public UniversalTerm encodeIri(String iri, FastBuffer<RdfStreamRow> rowsBuffer) {
         var cachedNode = dependentNodeCache.computeIfAbsent(iri, k -> new DependentNode());
         // Check if the value is still valid
         if (cachedNode.encoded != null &&
diff --git a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala
index 94a3e63c..ac156c97 100644
--- a/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala
+++ b/core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala
@@ -2,8 +2,6 @@ package eu.ostrzyciel.jelly.core
 
 import eu.ostrzyciel.jelly.core.proto.v1.*
 
-import
scala.collection.mutable.{ArrayBuffer, ListBuffer} - object ProtoEncoder: private val graphEnd = Seq(RdfStreamRow(RdfStreamRow.Row.GraphEnd(RdfGraphEnd.defaultInstance))) private val defaultGraphStart = RdfStreamRow(RdfStreamRow.Row.GraphStart( @@ -33,7 +31,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS val mainRow = RdfStreamRow(RdfStreamRow.Row.Triple( tripleToProto(triple) )) - extraRowsBuffer.append(mainRow).toSeq + extraRowsBuffer.append(mainRow).getBufferCopy /** * Add an RDF quad statement to the stream. @@ -45,7 +43,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS val mainRow = RdfStreamRow(RdfStreamRow.Row.Quad( quadToProto(quad) )) - extraRowsBuffer.append(mainRow).toSeq + extraRowsBuffer.append(mainRow).getBufferCopy /** * Signal the start of a new (named) delimited graph in a GRAPHS stream. @@ -62,7 +60,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS val mainRow = RdfStreamRow(RdfStreamRow.Row.GraphStart( RdfGraphStart(graphNode) )) - extraRowsBuffer.append(mainRow).toSeq + extraRowsBuffer.append(mainRow).getBufferCopy /** * Signal the start of the default delimited graph in a GRAPHS stream. @@ -70,7 +68,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS */ final def startDefaultGraph(): Iterable[RdfStreamRow] = handleHeader() - extraRowsBuffer.append(defaultGraphStart).toSeq + extraRowsBuffer.append(defaultGraphStart).getBufferCopy /** * Signal the end of a delimited graph in a GRAPHS stream. @@ -149,7 +147,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS // ************************************* // We assume by default that 32 rows should be enough to encode one statement. // If not, the buffer will grow. 
- private val extraRowsBuffer = new ArrayBuffer[RdfStreamRow](32) + private val extraRowsBuffer = new FastBuffer[RdfStreamRow](32) // Make the node cache size between 256 and 1024, depending on the user's maxNameTableSize. private val nodeCacheSize = Math.max(Math.min(options.maxNameTableSize, 1024), 256) private val nodeEncoder = new NodeEncoder[TNode](options, nodeCacheSize, nodeCacheSize) diff --git a/core/src/test/scala/eu/ostrzyciel/jelly/core/NodeEncoderSpec.scala b/core/src/test/scala/eu/ostrzyciel/jelly/core/NodeEncoderSpec.scala index 706f4e83..cfb6a5db 100644 --- a/core/src/test/scala/eu/ostrzyciel/jelly/core/NodeEncoderSpec.scala +++ b/core/src/test/scala/eu/ostrzyciel/jelly/core/NodeEncoderSpec.scala @@ -6,7 +6,6 @@ import org.scalatest.Inspectors import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import scala.collection.mutable.ArrayBuffer import scala.util.Random class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: @@ -16,8 +15,8 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: maxDatatypeTableSize = 8, ) - private def getEncoder(prefixTableSize: Int = 8): (NodeEncoder[Mrl.Node], ArrayBuffer[RdfStreamRow]) = - val buffer = new ArrayBuffer[RdfStreamRow]() + private def getEncoder(prefixTableSize: Int = 8): (NodeEncoder[Mrl.Node], FastBuffer[RdfStreamRow]) = + val buffer = new FastBuffer[RdfStreamRow](32) (NodeEncoder[Mrl.Node](smallOptions(prefixTableSize), 16, 16), buffer) "A NodeEncoder" when { @@ -31,8 +30,8 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: node.literal.lex should be ("v1") node.literal.literalKind.datatype should be (1) buffer.size should be (1) - buffer.head.row.isDatatype should be (true) - val dtEntry = buffer.head.row.datatype + buffer.get(0).row.isDatatype should be (true) + val dtEntry = buffer.get(0).row.datatype dtEntry.value should be ("dt1") dtEntry.id should be (0) } @@ -64,7 +63,7 @@ class NodeEncoderSpec extends AnyWordSpec, 
Inspectors, Matchers: node2.literal.literalKind.datatype should be (2) buffer.size should be (4) - buffer.map(_.row.datatype) should contain only ( + buffer.getBufferCopy.map(_.row.datatype) should contain only ( RdfDatatypeEntry(0, "dt1"), RdfDatatypeEntry(0, "dt2"), RdfDatatypeEntry(0, "dt3"), @@ -112,7 +111,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: val expectedIds = Array.from( Iterable.fill(8)(0) ++ Seq(1) ++ Iterable.fill(3)(0) ++ Seq(1) ++ Iterable.fill(3)(0) ) - for (r, i) <- buffer.zipWithIndex do + for (r, i) <- buffer.getBufferCopy.zipWithIndex do val dt = r.row.datatype dt.id should be (expectedIds(i)) dt.value should be (s"dt${i + 1}") @@ -192,10 +191,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: iri.prefixId should be (1) buffer.size should be (2) - buffer should contain (RdfStreamRow(RdfStreamRow.Row.Prefix( + buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Prefix( RdfPrefixEntry(id = 0, value = "https://test.org/") ))) - buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name( + buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name( RdfNameEntry(id = 0, value = "Cake") ))) } @@ -208,10 +207,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: // an empty name entry still has to be allocated buffer.size should be (2) - buffer should contain (RdfStreamRow(RdfStreamRow.Row.Prefix( + buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Prefix( RdfPrefixEntry(id = 0, value = "https://test.org/test/") ))) - buffer should contain(RdfStreamRow(RdfStreamRow.Row.Name( + buffer.getBufferCopy should contain(RdfStreamRow(RdfStreamRow.Row.Name( RdfNameEntry(id = 0, value = "") ))) } @@ -224,10 +223,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: // in the mode with the prefix table enabled, an empty prefix entry still has to be allocated buffer.size should be (2) - buffer should 
contain(RdfStreamRow(RdfStreamRow.Row.Prefix( + buffer.getBufferCopy should contain(RdfStreamRow(RdfStreamRow.Row.Prefix( RdfPrefixEntry(id = 0, value = "") ))) - buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name( + buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name( RdfNameEntry(id = 0, value = "testTestTest") ))) } @@ -240,7 +239,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: // in the no prefix mode, there must be no prefix entries buffer.size should be (1) - buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name( + buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name( RdfNameEntry(id = 0, value = "https://test.org/Cake") ))) } @@ -292,7 +291,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers: ) buffer.size should be (expectedBuffer.size) - for ((isPrefix, eId, eVal), row) <- expectedBuffer.zip(buffer) do + for ((isPrefix, eId, eVal), row) <- expectedBuffer.zip(buffer.getBufferCopy) do if isPrefix then row.row.isPrefix should be (true) val prefix = row.row.prefix