Skip to content

Commit

Permalink
Encoder: Maybe speed up stream row buffer
Browse files Browse the repository at this point in the history
I thought that ArrayBuffer in Scala is smart enough to NOT allocate when it still has spare capacity, but the profiler shows a lot of allocations there regardless. Let's try a custom implementation of the row buffer – as simple as possible.

If this works, I will finish the docstrings. If not, I will revert it.
  • Loading branch information
Ostrzyciel committed Oct 22, 2024
1 parent c632da4 commit 4421bb1
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 25 deletions.
45 changes: 45 additions & 0 deletions core/src/main/java/eu/ostrzyciel/jelly/core/FastBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package eu.ostrzyciel.jelly.core;

/**
 * A minimal, allocation-light growable buffer used as a per-statement row buffer.
 * Not thread-safe. Unlike ArrayBuffer/ArrayList, it exposes only the few
 * operations the encoder needs: append, indexed get, clear, and a sized copy.
 *
 * @param <T> element type (stored in an Object[] under the hood — see getBufferCopy)
 */
class FastBuffer<T> {
    /** Minimum capacity granted when growing from an empty (0-capacity) buffer. */
    private static final int CAPACITY_INCREASE = 16;

    private T[] buffer;   // backing storage; only the first `size` slots are live
    private int size;     // number of appended elements
    private int capacity; // buffer.length, cached to avoid a field read on the array

    /**
     * Creates a buffer with the given initial capacity.
     *
     * @param capacity initial capacity; may be 0 (the buffer grows on first append)
     */
    @SuppressWarnings("unchecked")
    public FastBuffer(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        // Safe: the array is fully encapsulated and only ever read back as T.
        this.buffer = (T[]) new Object[capacity];
    }

    /**
     * Appends an element, growing the backing array if needed.
     *
     * @param element element to append (null is stored as-is, not rejected)
     * @return this buffer, for call chaining
     */
    @SuppressWarnings("unchecked")
    public FastBuffer<T> append(T element) {
        if (size == capacity) {
            // Grow geometrically so that n appends cost amortized O(1) each.
            // (Additive growth by a fixed step makes n appends O(n^2) overall.)
            // Math.max guards the capacity == 0 case, where doubling would stall.
            capacity = Math.max(capacity * 2, CAPACITY_INCREASE);
            T[] newBuffer = (T[]) new Object[capacity];
            System.arraycopy(buffer, 0, newBuffer, 0, size);
            buffer = newBuffer;
        }
        buffer[size++] = element;
        return this;
    }

    /**
     * Returns the element at the given index.
     * No bounds check against {@code size}: an index in [size, capacity) silently
     * returns a stale or null slot. Callers are trusted to stay within size().
     *
     * @param index 0-based index, expected to be &lt; size()
     * @return the element at that index
     */
    public T get(int index) {
        return buffer[index];
    }

    /**
     * Logically empties the buffer without shrinking it.
     * Note: the slots are NOT nulled out, so previously appended elements stay
     * strongly reachable until overwritten — presumably acceptable because the
     * buffer is cleared and refilled on every statement; TODO confirm at call sites.
     *
     * @return this buffer, for call chaining
     */
    public FastBuffer<T> clear() {
        size = 0;
        return this;
    }

    /**
     * Returns a copy of the first size() elements.
     * NOTE(review): the runtime type of the returned array is Object[], not T[].
     * Assigning the result to a concrete array type (e.g. {@code String[]})
     * throws ClassCastException at the call site due to erasure — callers must
     * treat the result as Object[] / a generic sequence. Confirm against callers.
     *
     * @return a fresh array holding exactly the live elements
     */
    @SuppressWarnings("unchecked")
    public T[] getBufferCopy() {
        T[] copy = (T[]) new Object[size];
        System.arraycopy(buffer, 0, copy, 0, size);
        return copy;
    }

    /** @return the number of elements appended since the last clear() */
    public int size() {
        return size;
    }
}
5 changes: 2 additions & 3 deletions core/src/main/java/eu/ostrzyciel/jelly/core/NodeEncoder.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package eu.ostrzyciel.jelly.core;

import eu.ostrzyciel.jelly.core.proto.v1.*;
import scala.collection.mutable.ArrayBuffer;

import java.util.LinkedHashMap;
import java.util.function.Function;
Expand Down Expand Up @@ -95,7 +94,7 @@ public NodeEncoder(RdfStreamOptions opt, int nodeCacheSize, int dependentNodeCac
* @return The encoded literal
*/
public UniversalTerm encodeDtLiteral(
TNode key, String lex, String datatypeName, ArrayBuffer<RdfStreamRow> rowsBuffer
TNode key, String lex, String datatypeName, FastBuffer<RdfStreamRow> rowsBuffer
) {
var cachedNode = dependentNodeCache.computeIfAbsent(key, k -> new DependentNode());
// Check if the value is still valid
Expand Down Expand Up @@ -130,7 +129,7 @@ public UniversalTerm encodeDtLiteral(
* @param rowsBuffer The buffer to which the new name and prefix lookup entries should be appended
* @return The encoded IRI
*/
public UniversalTerm encodeIri(String iri, ArrayBuffer<RdfStreamRow> rowsBuffer) {
public UniversalTerm encodeIri(String iri, FastBuffer<RdfStreamRow> rowsBuffer) {
var cachedNode = dependentNodeCache.computeIfAbsent(iri, k -> new DependentNode());
// Check if the value is still valid
if (cachedNode.encoded != null &&
Expand Down
12 changes: 5 additions & 7 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.*

import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object ProtoEncoder:
private val graphEnd = Seq(RdfStreamRow(RdfStreamRow.Row.GraphEnd(RdfGraphEnd.defaultInstance)))
private val defaultGraphStart = RdfStreamRow(RdfStreamRow.Row.GraphStart(
Expand Down Expand Up @@ -33,7 +31,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
val mainRow = RdfStreamRow(RdfStreamRow.Row.Triple(
tripleToProto(triple)
))
extraRowsBuffer.append(mainRow).toSeq
extraRowsBuffer.append(mainRow).getBufferCopy

/**
* Add an RDF quad statement to the stream.
Expand All @@ -45,7 +43,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
val mainRow = RdfStreamRow(RdfStreamRow.Row.Quad(
quadToProto(quad)
))
extraRowsBuffer.append(mainRow).toSeq
extraRowsBuffer.append(mainRow).getBufferCopy

/**
* Signal the start of a new (named) delimited graph in a GRAPHS stream.
Expand All @@ -62,15 +60,15 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
val mainRow = RdfStreamRow(RdfStreamRow.Row.GraphStart(
RdfGraphStart(graphNode)
))
extraRowsBuffer.append(mainRow).toSeq
extraRowsBuffer.append(mainRow).getBufferCopy

/**
* Signal the start of the default delimited graph in a GRAPHS stream.
* @return iterable of stream rows
*/
final def startDefaultGraph(): Iterable[RdfStreamRow] =
handleHeader()
extraRowsBuffer.append(defaultGraphStart).toSeq
extraRowsBuffer.append(defaultGraphStart).getBufferCopy

/**
* Signal the end of a delimited graph in a GRAPHS stream.
Expand Down Expand Up @@ -149,7 +147,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
// *************************************
// We assume by default that 32 rows should be enough to encode one statement.
// If not, the buffer will grow.
private val extraRowsBuffer = new ArrayBuffer[RdfStreamRow](32)
private val extraRowsBuffer = new FastBuffer[RdfStreamRow](32)
// Make the node cache size between 256 and 1024, depending on the user's maxNameTableSize.
private val nodeCacheSize = Math.max(Math.min(options.maxNameTableSize, 1024), 256)
private val nodeEncoder = new NodeEncoder[TNode](options, nodeCacheSize, nodeCacheSize)
Expand Down
29 changes: 14 additions & 15 deletions core/src/test/scala/eu/ostrzyciel/jelly/core/NodeEncoderSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import org.scalatest.Inspectors
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
Expand All @@ -16,8 +15,8 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
maxDatatypeTableSize = 8,
)

private def getEncoder(prefixTableSize: Int = 8): (NodeEncoder[Mrl.Node], ArrayBuffer[RdfStreamRow]) =
val buffer = new ArrayBuffer[RdfStreamRow]()
private def getEncoder(prefixTableSize: Int = 8): (NodeEncoder[Mrl.Node], FastBuffer[RdfStreamRow]) =
val buffer = new FastBuffer[RdfStreamRow](32)
(NodeEncoder[Mrl.Node](smallOptions(prefixTableSize), 16, 16), buffer)

"A NodeEncoder" when {
Expand All @@ -31,8 +30,8 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
node.literal.lex should be ("v1")
node.literal.literalKind.datatype should be (1)
buffer.size should be (1)
buffer.head.row.isDatatype should be (true)
val dtEntry = buffer.head.row.datatype
buffer.get(0).row.isDatatype should be (true)
val dtEntry = buffer.get(0).row.datatype
dtEntry.value should be ("dt1")
dtEntry.id should be (0)
}
Expand Down Expand Up @@ -64,7 +63,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
node2.literal.literalKind.datatype should be (2)

buffer.size should be (4)
buffer.map(_.row.datatype) should contain only (
buffer.getBufferCopy.map(_.row.datatype) should contain only (
RdfDatatypeEntry(0, "dt1"),
RdfDatatypeEntry(0, "dt2"),
RdfDatatypeEntry(0, "dt3"),
Expand Down Expand Up @@ -112,7 +111,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
val expectedIds = Array.from(
Iterable.fill(8)(0) ++ Seq(1) ++ Iterable.fill(3)(0) ++ Seq(1) ++ Iterable.fill(3)(0)
)
for (r, i) <- buffer.zipWithIndex do
for (r, i) <- buffer.getBufferCopy.zipWithIndex do
val dt = r.row.datatype
dt.id should be (expectedIds(i))
dt.value should be (s"dt${i + 1}")
Expand Down Expand Up @@ -192,10 +191,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
iri.prefixId should be (1)

buffer.size should be (2)
buffer should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
RdfPrefixEntry(id = 0, value = "https://test.org/")
)))
buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name(
buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "Cake")
)))
}
Expand All @@ -208,10 +207,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:

// an empty name entry still has to be allocated
buffer.size should be (2)
buffer should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
RdfPrefixEntry(id = 0, value = "https://test.org/test/")
)))
buffer should contain(RdfStreamRow(RdfStreamRow.Row.Name(
buffer.getBufferCopy should contain(RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "")
)))
}
Expand All @@ -224,10 +223,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:

// in the mode with the prefix table enabled, an empty prefix entry still has to be allocated
buffer.size should be (2)
buffer should contain(RdfStreamRow(RdfStreamRow.Row.Prefix(
buffer.getBufferCopy should contain(RdfStreamRow(RdfStreamRow.Row.Prefix(
RdfPrefixEntry(id = 0, value = "")
)))
buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name(
buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "testTestTest")
)))
}
Expand All @@ -240,7 +239,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:

// in the no prefix mode, there must be no prefix entries
buffer.size should be (1)
buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name(
buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "https://test.org/Cake")
)))
}
Expand Down Expand Up @@ -292,7 +291,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
)

buffer.size should be (expectedBuffer.size)
for ((isPrefix, eId, eVal), row) <- expectedBuffer.zip(buffer) do
for ((isPrefix, eId, eVal), row) <- expectedBuffer.zip(buffer.getBufferCopy) do
if isPrefix then
row.row.isPrefix should be (true)
val prefix = row.row.prefix
Expand Down

0 comments on commit 4421bb1

Please sign in to comment.