From d5984891bca460927a9e7c90e30dd33a348a8cd3 Mon Sep 17 00:00:00 2001 From: Ostrzyciel Date: Tue, 17 Sep 2024 14:38:40 +0200 Subject: [PATCH] Update deps, stabilize flat format's order Issue: https://github.com/RiverBench/RiverBench/issues/120 --- build.sbt | 11 ++++--- src/main/scala/commands/PackageCommand.scala | 32 +++++++++++-------- .../scala/commands/PackageSchemaCommand.scala | 8 ++--- src/main/scala/util/RdfOrdering.scala | 19 +++++++++++ src/main/scala/util/StatCounterSuite.scala | 12 +++---- 5 files changed, 54 insertions(+), 28 deletions(-) create mode 100644 src/main/scala/util/RdfOrdering.scala diff --git a/build.sbt b/build.sbt index 44e81a0..a4781b7 100644 --- a/build.sbt +++ b/build.sbt @@ -5,12 +5,12 @@ resolvers += "Sonatype OSS Snapshots" at "https://s01.oss.sonatype.org/content/repositories/snapshots" lazy val circeV = "0.14.7" -lazy val jellyV = "0.14.1+12-0d137338-SNAPSHOT" -lazy val jenaV = "4.10.0" -lazy val pekkoV = "1.0.2" +lazy val jellyV = "2.0.0" +lazy val jenaV = "5.1.0" +lazy val pekkoV = "1.1.0" lazy val pekkoHttpV = "1.0.1" lazy val pekkoConnV = "1.0.2" -lazy val rdf4jV = "4.3.11" +lazy val rdf4jV = "5.0.2" lazy val icu4jV = "74.2" lazy val root = (project in file(".")) @@ -20,7 +20,7 @@ lazy val root = (project in file(".")) // Scala 3 or not Scala at all libraryDependencies ++= Seq( - "com.google.guava" % "guava" % "33.2.0-jre", + "com.google.guava" % "guava" % "33.2.1-jre", "com.ibm.icu" % "icu4j" % icu4jV, "eu.ostrzyciel.jelly" %% "jelly-stream" % jellyV, "eu.ostrzyciel.jelly" %% "jelly-jena" % jellyV, @@ -62,5 +62,6 @@ lazy val root = (project in file(".")) // emit deprecated warnings scalacOptions ++= Seq( "-deprecation", + "-Werror", ), ) diff --git a/src/main/scala/commands/PackageCommand.scala b/src/main/scala/commands/PackageCommand.scala index 9a8c145..7f85169 100644 --- a/src/main/scala/commands/PackageCommand.scala +++ b/src/main/scala/commands/PackageCommand.scala @@ -7,10 +7,11 @@ import util.io.* import eu.ostrzyciel.jelly.core.proto.v1.PhysicalStreamType import eu.ostrzyciel.jelly.core.{JellyOptions, LogicalStreamTypeFactory} import eu.ostrzyciel.jelly.stream.{EncoderFlow, JellyIo} +import org.apache.jena.graph.GraphMemFactory import org.apache.jena.rdf.model.{ModelFactory, Resource} import org.apache.jena.riot.{Lang, RDFParser, RDFWriter} import org.apache.jena.riot.lang.LabelToNode -import org.apache.jena.riot.system.{ErrorHandlerFactory, FactoryRDFStd} +import org.apache.jena.riot.system.{ErrorHandlerFactory, FactoryRDFStd, StreamRDFWriter} import org.apache.jena.sparql.core.{DatasetGraph, DatasetGraphFactory} import org.apache.pekko.stream.* import org.apache.pekko.stream.connectors.file.TarArchiveMetadata @@ -20,7 +21,7 @@ import org.apache.pekko.{Done, NotUsed} import org.eclipse.rdf4j.model.vocabulary.XSD import org.eclipse.rdf4j.rio -import java.io.{ByteArrayInputStream, FileOutputStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream} import java.nio.file.{FileSystems, Path} import java.util.UUID import scala.concurrent.Future @@ -28,6 +29,7 @@ import scala.jdk.CollectionConverters.* object PackageCommand extends Command: import eu.ostrzyciel.jelly.convert.jena.{*, given} + import RdfOrdering.given sealed trait DistType(val weight: Int) object DistType: @@ -48,6 +50,8 @@ object PackageCommand extends Command: val repoDir = FileSystems.getDefault.getPath(args(1)) val sourceArchiveFile = FileSystems.getDefault.getPath(args(2)) val outDir = FileSystems.getDefault.getPath(args(3)) + + GraphMemFactory.setDftGraphSameTerm(false) val metadata = MetadataReader.read(repoDir) val stats = new StatCounterSuite(metadata.elementCount) @@ -206,11 +210,12 @@ object PackageCommand extends Command: Flow[(DatasetGraph, Long), (Long, StatCounterSuite.Result), NotUsed] = Flow[(DatasetGraph, Long)] .wireTap((_, num) => if (num + 1) % 100_000 == 0 then println(s"Stats stream at: ${num + 1}")) - .splitAfter(SubstreamCancelStrategy.propagate)((_, num) => + .splitAfter((_, num) => val shouldSplit = Constants.packageSizes.contains(num + 1) if shouldSplit then println(s"Splitting stats stream at ${num + 1}") shouldSplit ) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) .map((ds, num) => { stats.add(ds) num + 1 @@ -334,19 +339,20 @@ object PackageCommand extends Command: } Flow[(DatasetGraph, Long)] - .map((ds, _) => { + .map((ds, _) => ds.find().asScala.toSeq.sorted) + .map(quads => { + val os = ByteArrayOutputStream() if metadata.streamTypes.exists(_.elementType == ElementType.Triple) then - RDFWriter.create() - .lang(Lang.NTRIPLES) - .source(ds.getDefaultGraph) - .asString() + val writer = StreamRDFWriter.getWriterStream(os, Lang.NTRIPLES) + quads.foreach(q => writer.triple(q.asTriple)) + writer.finish() else - RDFWriter.create() - .lang(Lang.NQUADS) - .source(ds) - .asString() + val writer = StreamRDFWriter.getWriterStream(os, Lang.NQUADS) + quads.foreach(q => writer.quad(q)) + writer.finish() + os.toByteArray }) - .map(ByteString.fromString) + .map(ByteString.fromArrayUnsafe) .toMat(StreamUtil.broadcastSink(sinks))(Keep.right) /** diff --git a/src/main/scala/commands/PackageSchemaCommand.scala b/src/main/scala/commands/PackageSchemaCommand.scala index ce4645b..4e47cf2 100644 --- a/src/main/scala/commands/PackageSchemaCommand.scala +++ b/src/main/scala/commands/PackageSchemaCommand.scala @@ -41,14 +41,14 @@ object PackageSchemaCommand extends Command: for name <- toProcessNames do val inPath = repoDir.resolve(s"src/$name.ttl") val model = RDFDataMgr.loadModel(inPath.toString) - model.listSubjectsWithProperty(RDF.`type`, OWL.Ontology).asScala.toList match + model.listSubjectsWithProperty(RDF.`type`, OWL2.Ontology).asScala.toList match case List(ontology) => // Update version IRI model.removeAll(ontology, OWL2.versionIRI, null) model.add(ontology, OWL2.versionIRI, model.createResource(s"${ontology.getURI}/$version")) // Update imports - ontology.listProperties(OWL.imports).asScala + ontology.listProperties(OWL2.imports).asScala .filter(_.getObject.isURIResource) .map(_.getObject.asResource) .filter(_.getURI.startsWith(schemaBase)) @@ -57,8 +57,8 @@ object PackageSchemaCommand extends Command: .toSeq .foreach { (oldRes, nameParts) => val newRes = model.createResource(s"$schemaBase${nameParts(0)}/$version") - model.remove(ontology, OWL.imports, oldRes) - model.add(ontology, OWL.imports, newRes) + model.remove(ontology, OWL2.imports, oldRes) + model.add(ontology, OWL2.imports, newRes) } case _ => () diff --git a/src/main/scala/util/RdfOrdering.scala b/src/main/scala/util/RdfOrdering.scala new file mode 100644 index 0000000..54b0f13 --- /dev/null +++ b/src/main/scala/util/RdfOrdering.scala @@ -0,0 +1,19 @@ +package io.github.riverbench.ci_worker +package util + +import org.apache.jena.sparql.core.Quad + +object RdfOrdering: + + given Ordering[Quad] with + def compare(x: Quad, y: Quad): Int = + val cmpGraph = x.getGraph.toString.compareTo(y.getGraph.toString) + if cmpGraph != 0 then cmpGraph + else + val cmpSubject = x.getSubject.toString.compareTo(y.getSubject.toString) + if cmpSubject != 0 then cmpSubject + else + val cmpPredicate = x.getPredicate.toString.compareTo(y.getPredicate.toString) + if cmpPredicate != 0 then cmpPredicate + else + x.getObject.toString.compareTo(y.getObject.toString) diff --git a/src/main/scala/util/StatCounterSuite.scala b/src/main/scala/util/StatCounterSuite.scala index dbfb9ee..56c2ab6 100644 --- a/src/main/scala/util/StatCounterSuite.scala +++ b/src/main/scala/util/StatCounterSuite.scala @@ -89,9 +89,9 @@ class StatCounterSuite(val size: Long): def add(ds: DatasetGraph): Unit = if ds.getDefaultGraph.isEmpty then - cGraphs.add(ds.listGraphNodes().asScala.map(_.toString(true)).toSeq) + cGraphs.add(ds.listGraphNodes().asScala.map(_.toString()).toSeq) else - cGraphs.add((ds.listGraphNodes().asScala.toSeq :+ DEFAULT_GRAPH).map(_.toString(true))) + cGraphs.add((ds.listGraphNodes().asScala.toSeq :+ DEFAULT_GRAPH).map(_.toString())) val subjects = mutable.Set[Node]() val predicates = mutable.Set[Node]() @@ -131,7 +131,7 @@ class StatCounterSuite(val size: Long): else if n.isBlank then blankNodes += n.getBlankNodeLabel else if n.isLiteral then - val lit = n.toString(false) + val lit = n.toString() controlCharCount += countAsciiControlChars(lit) literals += lit if n.getLiteralLanguage != "" then @@ -159,9 +159,9 @@ class StatCounterSuite(val size: Long): cQuotedTriples.lightAdd(quotedTripleCount) - cSubjects.addUnique(subjects.map(_.toString(true))) - cPredicates.addUnique(predicates.map(_.toString(true))) - cObjects.addUnique(objects.map(_.toString(true))) + cSubjects.addUnique(subjects.map(_.toString())) + cPredicates.addUnique(predicates.map(_.toString())) + cObjects.addUnique(objects.map(_.toString())) cStatements.lightAdd(stCount) private def countAsciiControlChars(s: String): Int =