Encoder: Maybe speed up stream row buffer #193

Merged · 1 commit · Oct 22, 2024
45 changes: 45 additions & 0 deletions core/src/main/java/eu/ostrzyciel/jelly/core/FastBuffer.java
@@ -0,0 +1,45 @@
package eu.ostrzyciel.jelly.core;

class FastBuffer<T> {
private static final int CAPACITY_INCREASE = 16;

private T[] buffer;
private int size;
private int capacity;

public FastBuffer(int capacity) {
this.capacity = capacity;
this.size = 0;
this.buffer = (T[]) new Object[capacity];
}

public FastBuffer<T> append(T element) {
if (size == capacity) {
capacity += CAPACITY_INCREASE;
T[] newBuffer = (T[]) new Object[capacity];
System.arraycopy(buffer, 0, newBuffer, 0, size);
buffer = newBuffer;
}
buffer[size++] = element;
return this;
}

public T get(int index) {
return buffer[index];
}

public FastBuffer<T> clear() {
size = 0;
return this;
}

public T[] getBufferCopy() {
T[] copy = (T[]) new Object[size];
System.arraycopy(buffer, 0, copy, 0, size);
return copy;
}

public int size() {
return size;
}
}
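Reviewer note (not part of the diff): FastBuffer is intentionally minimal: append-only, growing by a fixed 16-slot increment, with a positional get, a clear that only resets the size, and getBufferCopy for an exact-size snapshot. A rough Scala sketch of that lifecycle, assuming it lives in the eu.ostrzyciel.jelly.core package (the class is package-private) and using an illustrative String element type:

package eu.ostrzyciel.jelly.core

// Hypothetical sketch of the FastBuffer lifecycle; the element type and sizes are illustrative.
object FastBufferSketch:
  def main(args: Array[String]): Unit =
    val buffer = new FastBuffer[String](2)       // starts with capacity 2
    buffer.append("s").append("p").append("o")   // the third append grows the array by 16 slots
    println(buffer.size())                       // 3
    println(buffer.get(1))                       // p
    // getBufferCopy copies only the filled portion into a new, exact-size array
    buffer.getBufferCopy.foreach(println)        // s, p, o
    buffer.clear()                               // resets size to 0; the backing array is reused
    println(buffer.size())                       // 0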
5 changes: 2 additions & 3 deletions core/src/main/java/eu/ostrzyciel/jelly/core/NodeEncoder.java
@@ -1,7 +1,6 @@
package eu.ostrzyciel.jelly.core;

import eu.ostrzyciel.jelly.core.proto.v1.*;
-import scala.collection.mutable.ArrayBuffer;

import java.util.LinkedHashMap;
import java.util.function.Function;
@@ -95,7 +94,7 @@ public NodeEncoder(RdfStreamOptions opt, int nodeCacheSize, int dependentNodeCac
* @return The encoded literal
*/
public UniversalTerm encodeDtLiteral(
-TNode key, String lex, String datatypeName, ArrayBuffer<RdfStreamRow> rowsBuffer
+TNode key, String lex, String datatypeName, FastBuffer<RdfStreamRow> rowsBuffer
) {
var cachedNode = dependentNodeCache.computeIfAbsent(key, k -> new DependentNode());
// Check if the value is still valid
@@ -130,7 +129,7 @@ public UniversalTerm encodeDtLiteral(
* @param rowsBuffer The buffer to which the new name and prefix lookup entries should be appended
* @return The encoded IRI
*/
-public UniversalTerm encodeIri(String iri, ArrayBuffer<RdfStreamRow> rowsBuffer) {
+public UniversalTerm encodeIri(String iri, FastBuffer<RdfStreamRow> rowsBuffer) {
var cachedNode = dependentNodeCache.computeIfAbsent(iri, k -> new DependentNode());
// Check if the value is still valid
if (cachedNode.encoded != null &&
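Reviewer note: the calling convention is unchanged from the ArrayBuffer version; the caller owns one reusable row buffer and the encoder appends any new name or prefix lookup rows to it as a side effect. A hedged sketch of that flow (the encoder construction, the smallOptions helper and the Mrl node type are borrowed from the spec further down, and the IRI is illustrative):

val rowsBuffer = new FastBuffer[RdfStreamRow](32)
val encoder = NodeEncoder[Mrl.Node](smallOptions(8), 16, 16)      // mirrors the test setup below
val term: UniversalTerm = encoder.encodeIri("https://test.org/Cake", rowsBuffer)
// Any prefix/name table entries allocated for this IRI were appended to rowsBuffer:
rowsBuffer.getBufferCopy.foreach(row => println(row.row))
rowsBuffer.clear()                                                // reuse the same buffer for the next term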
12 changes: 5 additions & 7 deletions core/src/main/scala/eu/ostrzyciel/jelly/core/ProtoEncoder.scala
@@ -2,8 +2,6 @@ package eu.ostrzyciel.jelly.core

import eu.ostrzyciel.jelly.core.proto.v1.*

-import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object ProtoEncoder:
private val graphEnd = Seq(RdfStreamRow(RdfStreamRow.Row.GraphEnd(RdfGraphEnd.defaultInstance)))
private val defaultGraphStart = RdfStreamRow(RdfStreamRow.Row.GraphStart(
@@ -33,7 +31,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
val mainRow = RdfStreamRow(RdfStreamRow.Row.Triple(
tripleToProto(triple)
))
-extraRowsBuffer.append(mainRow).toSeq
+extraRowsBuffer.append(mainRow).getBufferCopy

/**
* Add an RDF quad statement to the stream.
@@ -45,7 +43,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
val mainRow = RdfStreamRow(RdfStreamRow.Row.Quad(
quadToProto(quad)
))
-extraRowsBuffer.append(mainRow).toSeq
+extraRowsBuffer.append(mainRow).getBufferCopy

/**
* Signal the start of a new (named) delimited graph in a GRAPHS stream.
@@ -62,15 +60,15 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
val mainRow = RdfStreamRow(RdfStreamRow.Row.GraphStart(
RdfGraphStart(graphNode)
))
-extraRowsBuffer.append(mainRow).toSeq
+extraRowsBuffer.append(mainRow).getBufferCopy

/**
* Signal the start of the default delimited graph in a GRAPHS stream.
* @return iterable of stream rows
*/
final def startDefaultGraph(): Iterable[RdfStreamRow] =
handleHeader()
-extraRowsBuffer.append(defaultGraphStart).toSeq
+extraRowsBuffer.append(defaultGraphStart).getBufferCopy

/**
* Signal the end of a delimited graph in a GRAPHS stream.
@@ -149,7 +147,7 @@ abstract class ProtoEncoder[TNode, -TTriple, -TQuad, -TQuoted](val options: RdfS
// *************************************
// We assume by default that 32 rows should be enough to encode one statement.
// If not, the buffer will grow.
-private val extraRowsBuffer = new ArrayBuffer[RdfStreamRow](32)
+private val extraRowsBuffer = new FastBuffer[RdfStreamRow](32)
// Make the node cache size between 256 and 1024, depending on the user's maxNameTableSize.
private val nodeCacheSize = Math.max(Math.min(options.maxNameTableSize, 1024), 256)
private val nodeEncoder = new NodeEncoder[TNode](options, nodeCacheSize, nodeCacheSize)
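The .toSeq to .getBufferCopy swap above is where the intended speed-up comes from: instead of building an immutable Seq from the ArrayBuffer for every statement, the encoder now returns one exact-size array copy. A copy is still needed because extraRowsBuffer is reused across statements; a hedged illustration of that ownership rule (the exact point where the buffer is cleared is assumed, it is not shown in this diff):

// Illustrative only: the returned snapshot must be independent of the shared buffer.
val extraRowsBuffer = new FastBuffer[RdfStreamRow](32)

def emit(mainRow: RdfStreamRow): Iterable[RdfStreamRow] =
  extraRowsBuffer.append(mainRow).getBufferCopy   // exact-size Array, handed back as an Iterable

val rows = emit(RdfStreamRow(RdfStreamRow.Row.GraphEnd(RdfGraphEnd.defaultInstance)))
extraRowsBuffer.clear()                           // assumed to happen before the next statement
println(rows.size)                                // still 1: the snapshot is unaffected by the clear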
29 changes: 14 additions & 15 deletions core/src/test/scala/eu/ostrzyciel/jelly/core/NodeEncoderSpec.scala
@@ -6,7 +6,6 @@ import org.scalatest.Inspectors
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

-import scala.collection.mutable.ArrayBuffer
import scala.util.Random

class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
@@ -16,8 +15,8 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
maxDatatypeTableSize = 8,
)

-private def getEncoder(prefixTableSize: Int = 8): (NodeEncoder[Mrl.Node], ArrayBuffer[RdfStreamRow]) =
-val buffer = new ArrayBuffer[RdfStreamRow]()
+private def getEncoder(prefixTableSize: Int = 8): (NodeEncoder[Mrl.Node], FastBuffer[RdfStreamRow]) =
+val buffer = new FastBuffer[RdfStreamRow](32)
(NodeEncoder[Mrl.Node](smallOptions(prefixTableSize), 16, 16), buffer)

"A NodeEncoder" when {
@@ -31,8 +30,8 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
node.literal.lex should be ("v1")
node.literal.literalKind.datatype should be (1)
buffer.size should be (1)
-buffer.head.row.isDatatype should be (true)
-val dtEntry = buffer.head.row.datatype
+buffer.get(0).row.isDatatype should be (true)
+val dtEntry = buffer.get(0).row.datatype
dtEntry.value should be ("dt1")
dtEntry.id should be (0)
}
@@ -64,7 +63,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
node2.literal.literalKind.datatype should be (2)

buffer.size should be (4)
-buffer.map(_.row.datatype) should contain only (
+buffer.getBufferCopy.map(_.row.datatype) should contain only (
RdfDatatypeEntry(0, "dt1"),
RdfDatatypeEntry(0, "dt2"),
RdfDatatypeEntry(0, "dt3"),
@@ -112,7 +111,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
val expectedIds = Array.from(
Iterable.fill(8)(0) ++ Seq(1) ++ Iterable.fill(3)(0) ++ Seq(1) ++ Iterable.fill(3)(0)
)
-for (r, i) <- buffer.zipWithIndex do
+for (r, i) <- buffer.getBufferCopy.zipWithIndex do
val dt = r.row.datatype
dt.id should be (expectedIds(i))
dt.value should be (s"dt${i + 1}")
@@ -192,10 +191,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
iri.prefixId should be (1)

buffer.size should be (2)
-buffer should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
+buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
RdfPrefixEntry(id = 0, value = "https://test.org/")
)))
-buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name(
+buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "Cake")
)))
}
@@ -208,10 +207,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:

// an empty name entry still has to be allocated
buffer.size should be (2)
-buffer should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
+buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Prefix(
RdfPrefixEntry(id = 0, value = "https://test.org/test/")
)))
-buffer should contain(RdfStreamRow(RdfStreamRow.Row.Name(
+buffer.getBufferCopy should contain(RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "")
)))
}
@@ -224,10 +223,10 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:

// in the mode with the prefix table enabled, an empty prefix entry still has to be allocated
buffer.size should be (2)
-buffer should contain(RdfStreamRow(RdfStreamRow.Row.Prefix(
+buffer.getBufferCopy should contain(RdfStreamRow(RdfStreamRow.Row.Prefix(
RdfPrefixEntry(id = 0, value = "")
)))
-buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name(
+buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "testTestTest")
)))
}
@@ -240,7 +239,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:

// in the no prefix mode, there must be no prefix entries
buffer.size should be (1)
-buffer should contain (RdfStreamRow(RdfStreamRow.Row.Name(
+buffer.getBufferCopy should contain (RdfStreamRow(RdfStreamRow.Row.Name(
RdfNameEntry(id = 0, value = "https://test.org/Cake")
)))
}
@@ -292,7 +291,7 @@ class NodeEncoderSpec extends AnyWordSpec, Inspectors, Matchers:
)

buffer.size should be (expectedBuffer.size)
-for ((isPrefix, eId, eVal), row) <- expectedBuffer.zip(buffer) do
+for ((isPrefix, eId, eVal), row) <- expectedBuffer.zip(buffer.getBufferCopy) do
if isPrefix then
row.row.isPrefix should be (true)
val prefix = row.row.prefix