From 564779189bd52043f938c2afd8813bcec9cf3cf2 Mon Sep 17 00:00:00 2001
From: Ostrzyciel
Date: Wed, 11 Sep 2024 14:06:54 +0200
Subject: [PATCH] Use sketches for unique count statistics

Issue: https://github.com/RiverBench/RiverBench/issues/91
---
 .../commands/CategoryDocGenCommand.scala      |  2 +-
 .../scala/commands/DatasetDocGenCommand.scala | 23 ++++++++--
 src/main/scala/util/RdfUtil.scala             |  2 +
 src/main/scala/util/StatCounter.scala         | 44 +++++++++----------
 src/main/scala/util/StatCounterSuite.scala    | 28 ++++++------
 src/main/scala/util/doc/DocBuilder.scala      | 17 +++----
 6 files changed, 67 insertions(+), 49 deletions(-)

diff --git a/src/main/scala/commands/CategoryDocGenCommand.scala b/src/main/scala/commands/CategoryDocGenCommand.scala
index d7bd314..67782a5 100644
--- a/src/main/scala/commands/CategoryDocGenCommand.scala
+++ b/src/main/scala/commands/CategoryDocGenCommand.scala
@@ -48,7 +48,7 @@ object CategoryDocGenCommand extends Command:
       ),
       startHeadingLevel = 3,
       customValueFormatters = {
-        case res: Resource if {
+        case (_, res: Resource) if {
           val props = res.listProperties().asScala.map(_.getPredicate).toSeq
           res.hasProperty(RDF.`type`, RdfUtil.SystemUnderTest) &&
             props.contains(RDFS.label) &&
diff --git a/src/main/scala/commands/DatasetDocGenCommand.scala b/src/main/scala/commands/DatasetDocGenCommand.scala
index 5b022e6..164271a 100644
--- a/src/main/scala/commands/DatasetDocGenCommand.scala
+++ b/src/main/scala/commands/DatasetDocGenCommand.scala
@@ -1,15 +1,14 @@
 package io.github.riverbench.ci_worker
 package commands
 
-import util.doc.{DocBuilder, DocFileUtil, MarkdownUtil}
 import util.*
+import util.doc.{DocBuilder, DocFileUtil, MarkdownUtil}
 
 import org.apache.jena.rdf.model.{Property, RDFNode}
-import org.apache.jena.riot.RDFDataMgr
 import org.apache.jena.sparql.vocabulary.FOAF
 import org.apache.jena.vocabulary.{RDF, RDFS}
 
-import java.nio.file.{FileSystems, Files, Path}
+import java.nio.file.{FileSystems, Files}
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters.*
 
@@ -78,7 +77,23 @@ object DatasetDocGenCommand extends Command:
       val statsRes = m.createResource()
       for s <- m.listObjectsOfProperty(RdfUtil.hasStatisticsSet).asScala.distinct do
         statsRes.addProperty(RdfUtil.hasStatisticsSet, s)
-      // val statSection = docIndex.addSubsection()
+      // Another cheat: transform unique count statistics to include upper/lower confidence bounds
+      val statsToProcess = m.listResourcesWithProperty(RdfUtil.uniqueCountLowerBound).asScala.toSeq
+      for stat <- statsToProcess do
+        val uniqueCount = stat.getProperty(RdfUtil.uniqueCount).getLong
+        val lower = stat.getProperty(RdfUtil.uniqueCountLowerBound).getLong
+        val upper = stat.getProperty(RdfUtil.uniqueCountUpperBound).getLong
+        stat.removeAll(RdfUtil.uniqueCount)
+        stat.removeAll(RdfUtil.uniqueCountLowerBound)
+        stat.removeAll(RdfUtil.uniqueCountUpperBound)
+        stat.addProperty(
+          RdfUtil.uniqueCount,
+          MarkdownUtil.prettyLabel(
+            "~ " + MarkdownUtil.formatInt(uniqueCount.toString),
+            Some(s"Estimated value. Lower bound: ${MarkdownUtil.formatInt(lower.toString)}, " +
+              s"upper bound: ${MarkdownUtil.formatInt(upper.toString)} (95% confidence).")
+          )
+        )
       docBuilderIndex.buildSection(statsRes, docIndex)
 
       // Save the index.md document
diff --git a/src/main/scala/util/RdfUtil.scala b/src/main/scala/util/RdfUtil.scala
index 8af1ad0..553ac1c 100644
--- a/src/main/scala/util/RdfUtil.scala
+++ b/src/main/scala/util/RdfUtil.scala
@@ -47,6 +47,8 @@ object RdfUtil:
   val stDev = m.createProperty(pRb, "standardDeviation")
   val sum = m.createProperty(pRb, "sum")
   val uniqueCount = m.createProperty(pRb, "uniqueCount")
+  val uniqueCountLowerBound = m.createProperty(pRb, "uniqueCountLowerBound")
+  val uniqueCountUpperBound = m.createProperty(pRb, "uniqueCountUpperBound")
   val hasStatistics = m.createProperty(pRb, "hasStatistics")
   val hasStatisticsSet = m.createProperty(pRb, "hasStatisticsSet")
 
diff --git a/src/main/scala/util/StatCounter.scala b/src/main/scala/util/StatCounter.scala
index 6a8bccb..fca56c3 100644
--- a/src/main/scala/util/StatCounter.scala
+++ b/src/main/scala/util/StatCounter.scala
@@ -1,15 +1,14 @@
 package io.github.riverbench.ci_worker
 package util
 
-import com.google.common.hash.{BloomFilter, Funnel, PrimitiveSink}
+import org.apache.datasketches.hll.HllSketch
 import org.apache.jena.datatypes.xsd.XSDDatatype.*
-import org.apache.jena.graph.Node
 import org.apache.jena.rdf.model.Resource
 
 //noinspection UnstableApiUsage
 object StatCounter:
   case class Result(sum: Long, mean: Double, stDev: Double, min: Long, max: Long,
-                    uniqueCount: Option[Long]):
+                    uniqueCount: Option[Long], uniqueLowerBound: Option[Long], uniqueUpperBound: Option[Long]):
     def addToRdf(statRes: Resource): Unit =
       statRes.addProperty(RdfUtil.sum, sum.toString, XSDinteger)
       statRes.addProperty(RdfUtil.mean, mean.toString, XSDdecimal)
@@ -17,13 +16,8 @@ object StatCounter:
       statRes.addProperty(RdfUtil.minimum, min.toString, XSDinteger)
      statRes.addProperty(RdfUtil.maximum, max.toString, XSDinteger)
       uniqueCount.foreach(c => statRes.addProperty(RdfUtil.uniqueCount, c.toString, XSDinteger))
-
-  implicit val stringFunnel: Funnel[String] =
-    (from: String, into: PrimitiveSink) => into.putBytes(from.getBytes)
-
-  // Add to the node's funnel both its hashcode and string repr. bytes to avoid collisions... just in case
-  implicit val nodeFunnel: Funnel[Node] =
-    (from: Node, into: PrimitiveSink) => into.putInt(from.hashCode()).putBytes(from.toString.getBytes)
+      uniqueLowerBound.foreach(c => statRes.addProperty(RdfUtil.uniqueCountLowerBound, c.toString, XSDinteger))
+      uniqueUpperBound.foreach(c => statRes.addProperty(RdfUtil.uniqueCountUpperBound, c.toString, XSDinteger))
 
 class LightStatCounter[T]:
   import StatCounter.*
@@ -51,27 +45,33 @@ class LightStatCounter[T]:
   def result: Result = this.synchronized {
     val mean = sum.toDouble / count
     val stDev = Math.sqrt(sumSq.toDouble / count - mean * mean)
-    Result(sum, mean, stDev, min, max, None)
+    Result(sum, mean, stDev, min, max, None, None, None)
   }
 
-//noinspection UnstableApiUsage
-class StatCounter[T : Funnel](size: Long) extends LightStatCounter[T]:
+class StatCounter extends LightStatCounter[String]:
   import StatCounter.*
 
-  private val bloomFilter = BloomFilter.create[T](implicitly[Funnel[T]], size, 0.01)
+  private val sketch = HllSketch(16)
 
-  override def add(values: Seq[T]): Unit =
-    // the bloom filter is thread-safe
-    values.foreach(bloomFilter.put)
-    // but the counter is not
+  override def add(values: Seq[String]): Unit =
+    sketch.synchronized {
+      values.foreach(sketch.update)
+    }
     lightAdd(values.distinct.size)
 
-  override def addUnique(values: Iterable[T]): Unit =
-    values.foreach(bloomFilter.put)
+  override def addUnique(values: Iterable[String]): Unit =
+    sketch.synchronized {
+      values.foreach(sketch.update)
+    }
     lightAdd(values.size)
 
-  override def result: Result =
-    super.result.copy(uniqueCount = Some(bloomFilter.approximateElementCount))
+  override def result: Result = sketch.synchronized {
+    super.result.copy(
+      uniqueCount = Some(sketch.getEstimate.toLong),
+      uniqueLowerBound = Some(sketch.getLowerBound(2).toLong),
+      uniqueUpperBound = Some(sketch.getUpperBound(2).toLong)
+    )
+  }
 
 // uses sets instead of bloom filters
 class PreciseStatCounter[T] extends LightStatCounter[T]:
diff --git a/src/main/scala/util/StatCounterSuite.scala b/src/main/scala/util/StatCounterSuite.scala
index fa33e94..dbfb9ee 100644
--- a/src/main/scala/util/StatCounterSuite.scala
+++ b/src/main/scala/util/StatCounterSuite.scala
@@ -69,29 +69,29 @@ class StatCounterSuite(val size: Long):
   private val DEFAULT_GRAPH = NodeFactory.createBlankNode("DEFAULT GRAPH")
 
   // A bad heuristic: 10x the size of the stream is assumed to be the number of elements in the bloom filters
-  private val cIris = new StatCounter[String](10 * size)
-  private val cLiterals = new StatCounter[String](10 * size)
-  private val cPlainLiterals = new StatCounter[String](10 * size)
-  private val cDtLiterals = new StatCounter[String](10 * size)
-  private val cLangLiterals = new StatCounter[String](10 * size)
+  private val cIris = new StatCounter()
+  private val cLiterals = new StatCounter()
+  private val cPlainLiterals = new StatCounter()
+  private val cDtLiterals = new StatCounter()
+  private val cLangLiterals = new StatCounter()
   private val cDatatypes = new PreciseStatCounter[String]
   private val cAsciiControlChars = LightStatCounter[Char]()
   private val cBlankNodes = new LightStatCounter[String]()
   private val cQuotedTriples = new LightStatCounter[String]()
-  private val cSubjects = new StatCounter[Node](10 * size)
-  private val cPredicates = new StatCounter[Node](10 * size)
-  private val cObjects = new StatCounter[Node](10 * size)
-  private val cGraphs = new StatCounter[Node](10 * size)
+  private val cSubjects = new StatCounter()
+  private val cPredicates = new StatCounter()
+  private val cObjects = new StatCounter()
+  private val cGraphs = new StatCounter()
   private val cStatements = new LightStatCounter[String]()
 
   def add(ds: DatasetGraph): Unit =
     if ds.getDefaultGraph.isEmpty then
-      cGraphs.add(ds.listGraphNodes().asScala.toSeq)
+      cGraphs.add(ds.listGraphNodes().asScala.map(_.toString(true)).toSeq)
     else
-      cGraphs.add(ds.listGraphNodes().asScala.toSeq :+ DEFAULT_GRAPH)
+      cGraphs.add((ds.listGraphNodes().asScala.toSeq :+ DEFAULT_GRAPH).map(_.toString(true)))
 
     val subjects = mutable.Set[Node]()
     val predicates = mutable.Set[Node]()
@@ -159,9 +159,9 @@ class StatCounterSuite(val size: Long):
 
     cQuotedTriples.lightAdd(quotedTripleCount)
 
-    cSubjects.addUnique(subjects)
-    cPredicates.addUnique(predicates)
-    cObjects.addUnique(objects)
+    cSubjects.addUnique(subjects.map(_.toString(true)))
+    cPredicates.addUnique(predicates.map(_.toString(true)))
+    cObjects.addUnique(objects.map(_.toString(true)))
     cStatements.lightAdd(stCount)
 
   private def countAsciiControlChars(s: String): Int =
diff --git a/src/main/scala/util/doc/DocBuilder.scala b/src/main/scala/util/doc/DocBuilder.scala
index a8c6ff9..d6afb54 100644
--- a/src/main/scala/util/doc/DocBuilder.scala
+++ b/src/main/scala/util/doc/DocBuilder.scala
@@ -17,7 +17,8 @@ object DocBuilder:
     defaultPropGroup: Option[String] = None,
     tabularProps: Seq[Property] = Seq(),
     startHeadingLevel: Int = 1,
-    customValueFormatters: PartialFunction[RDFNode, DocValue] = PartialFunction.empty,
+    // Arguments to the partial function: predicate, object.
+    customValueFormatters: PartialFunction[(Resource, RDFNode), DocValue] = PartialFunction.empty,
     customSectionContentGen: Map[Resource, Seq[RDFNode] => String] = Map(),
   )
 
@@ -105,19 +106,19 @@ class DocBuilder(ontologies: Model, opt: DocBuilder.Options):
 
   private def makeValue(docProp: DocProp, objects: Iterable[RDFNode]): DocValue =
     def makeValueInternal(o: RDFNode): DocValue =
-      o match
+      (docProp.prop, o) match
        case opt.customValueFormatters(value) => value
-        case lit: Literal if docProp.prop.getURI == RdfUtil.dcatByteSize.getURI =>
+        case (RdfUtil.dcatByteSize, lit: Literal) =>
          DocValue.SizeLiteral(lit.getLexicalForm)
-        case lit: Literal => DocValue.Literal(lit)
-        case res: Resource if res.isAnon =>
+        case (_, lit: Literal) => DocValue.Literal(lit)
+        case (_, res: Resource) if res.isAnon =>
          val nestedValues = getDocValuesForRes(res)
          DocValue.BlankNode(nestedValues, getTitleForProps(nestedValues))
-        case res: Resource if docProp.prop.getURI == RDF.`type`.getURI ||
+        case (prop, res: Resource) if prop == RDF.`type` ||
          RdfUtil.isNamedThing(res, Some(ontologies)) =>
          DocValue.RdfNamedThing(res, ontologies)
-        case res: Resource => DocValue.Link(res)
-        case node => DocValue.Text(node.toString)
+        case (_, res: Resource) => DocValue.Link(res)
+        case (_, node) => DocValue.Text(node.toString)
 
     val values = objects.map { o =>
      val list = GraphList.members(GNode.create(o.getModel.getGraph, o.asNode)).asScala