From 4fc7d945999ce92916eb2054adc62c5085e6a17c Mon Sep 17 00:00:00 2001
From: Jiadong Bai
Date: Thu, 12 Dec 2024 15:24:03 -0800
Subject: [PATCH] pass the compilation

---
 .../result/ArrowFileDocumentSpec.scala        | 11 +++++++
 .../result/PartitionedFileDocumentSpec.scala  | 31 ++++++++++++++++---
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/ArrowFileDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/ArrowFileDocumentSpec.scala
index e8185e1a380..23693063922 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/ArrowFileDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/ArrowFileDocumentSpec.scala
@@ -14,6 +14,17 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 
+object ArrowFileDocumentSpec {
+  def stringSerializer(item: String, index: Int, root: VectorSchemaRoot): Unit = {
+    val vector = root.getVector("data").asInstanceOf[VarCharVector]
+    vector.setSafe(index, item.getBytes("UTF-8"))
+  }
+
+  def stringDeserializer(index: Int, root: VectorSchemaRoot): String = {
+    new String(root.getVector("data").asInstanceOf[VarCharVector].get(index))
+  }
+}
+
 class ArrowFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {
 
   val stringArrowSchema = new Schema(List(
diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedFileDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedFileDocumentSpec.scala
index ce28a59018e..4a3840fbe76 100644
--- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedFileDocumentSpec.scala
+++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedFileDocumentSpec.scala
@@ -1,14 +1,35 @@
 package edu.uci.ics.amber.core.storage.result
 
+import edu.uci.ics.amber.core.storage.result.ArrowFileDocumentSpec.{stringDeserializer, stringSerializer}
 import edu.uci.ics.amber.core.storage.result.PartitionedFileDocument.getPartitionURI
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+import org.apache.arrow.vector.{VarCharVector, VectorSchemaRoot}
 import org.apache.commons.vfs2.{FileObject, VFS}
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
 import org.scalatest.matchers.should.Matchers
 
+import scala.jdk.CollectionConverters.IterableHasAsJava
+
+object ArrowFileDocumentSpec {
+  def stringSerializer(item: String, index: Int, root: VectorSchemaRoot): Unit = {
+    val vector = root.getVector("data").asInstanceOf[VarCharVector]
+    vector.setSafe(index, item.getBytes("UTF-8"))
+  }
+
+  def stringDeserializer(index: Int, root: VectorSchemaRoot): String = {
+    new String(root.getVector("data").asInstanceOf[VarCharVector].get(index))
+  }
+}
+
+
 class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {
 
+  val stringArrowSchema = new Schema(List(
+    Field.nullablePrimitive("data", ArrowType.Utf8.INSTANCE)
+  ).asJava)
+
   var partitionDocument: PartitionedFileDocument[ArrowFileDocument[String], String] = _
   val numOfPartitions = 3
   val partitionId: String = "partition_doc_test"
@@ -18,7 +39,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
     partitionDocument = new PartitionedFileDocument[ArrowFileDocument[String], String](
       partitionId,
       numOfPartitions,
-      uri => new ArrowFileDocument[String](uri)
+      uri => new ArrowFileDocument[String](uri, stringArrowSchema, stringSerializer, stringDeserializer)
     )
   }
 
@@ -30,7 +51,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
   "PartitionDocument" should "create and write to each partition directly" in {
     for (i <- 0 until numOfPartitions) {
       val partitionURI = getPartitionURI(partitionId, i)
-      val fileDoc = new ArrowFileDocument[String](partitionURI)
+      val fileDoc = new ArrowFileDocument[String](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer)
       fileDoc.open()
       fileDoc.putOne(s"Data for partition $i")
       fileDoc.close()
@@ -46,7 +67,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
     // Write some data directly to each partition
     for (i <- 0 until numOfPartitions) {
       val partitionURI = getPartitionURI(partitionId, i)
-      val fileDoc = new ArrowFileDocument[String](partitionURI)
+      val fileDoc = new ArrowFileDocument[String](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer)
       fileDoc.open()
       fileDoc.putOne(s"Content in partition $i")
       fileDoc.close()
@@ -63,7 +84,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
     // Write some data directly to each partition
     for (i <- 0 until numOfPartitions) {
      val partitionURI = getPartitionURI(partitionId, i)
-      val fileDoc = new ArrowFileDocument[String](partitionURI)
+      val fileDoc = new ArrowFileDocument[String](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer)
       fileDoc.open()
       fileDoc.putOne(s"Some data in partition $i")
       fileDoc.close()
@@ -86,7 +107,7 @@ class PartitionedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeA
     val futures = (0 until numOfPartitions).map { i =>
       Future {
         val partitionURI = getPartitionURI(partitionId, i)
-        val fileDoc = new ArrowFileDocument[String](partitionURI)
+        val fileDoc = new ArrowFileDocument[String](partitionURI, stringArrowSchema, stringSerializer, stringDeserializer)
         fileDoc.open()
         fileDoc.putOne(s"Concurrent write to partition $i")
         fileDoc.close()