diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ItemizedFileDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ItemizedFileDocument.scala index d5975c945f7..089b5a3cb5d 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ItemizedFileDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/ItemizedFileDocument.scala @@ -20,7 +20,7 @@ object ItemizedFileDocument { * * @param uri the identifier of the file. * If the file doesn't physically exist, ItemizedFileDocument will create the file(possibly also the parent folder) - * during its initialization. + * during its initialization. * The lifecycle of the file is bundled with JVM, i.e. when JVM exits, the file gets deleted. */ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI) @@ -75,9 +75,9 @@ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI) } /** - * Utility function to get an iterator of data items of type T. - * Each returned item will be deserialized using Kryo. - */ + * Utility function to get an iterator of data items of type T. + * Each returned item will be deserialized using Kryo. + */ private def getIterator: Iterator[T] = { lazy val input = new com.twitter.chill.Input(file.getContent.getInputStream) new Iterator[T] { @@ -180,10 +180,11 @@ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI) * @param i index starting from 0 * @return data item of type T */ - override def getItem(i: Int): T = withReadLock { - val iterator = getIterator - iterator.drop(i).next() - } + override def getItem(i: Int): T = + withReadLock { + val iterator = getIterator + iterator.drop(i).next() + } override def getRange(from: Int, until: Int): Iterator[T] = withReadLock { @@ -198,11 +199,12 @@ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI) override def getCount: Long = withReadLock { getIterator.size - } + } - override def get(): Iterator[T] = withReadLock { - getIterator - } + override def get(): Iterator[T] = + withReadLock { + getIterator + } /** * Physically remove the file specified by the URI. This method is THREAD-SAFE. diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/PartitionedItemizedFileDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/PartitionedItemizedFileDocument.scala index 0a80e8d45f5..ab0ac3dfcfd 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/PartitionedItemizedFileDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/PartitionedItemizedFileDocument.scala @@ -1,111 +1,118 @@ - package edu.uci.ics.amber.core.storage.result +package edu.uci.ics.amber.core.storage.result - import edu.uci.ics.amber.core.storage.model.VirtualDocument - import edu.uci.ics.amber.core.storage.result.PartitionedItemizedFileDocument.getPartitionURI - import edu.uci.ics.amber.util.PathUtils.workflowResultsRootPath - import org.apache.commons.vfs2.VFS +import edu.uci.ics.amber.core.storage.model.VirtualDocument +import edu.uci.ics.amber.core.storage.result.PartitionedItemizedFileDocument.getPartitionURI +import edu.uci.ics.amber.util.PathUtils.workflowResultsRootPath +import org.apache.commons.vfs2.VFS - import java.net.URI - import java.util.concurrent.locks.ReentrantLock +import java.net.URI +import java.util.concurrent.locks.ReentrantLock +object PartitionedItemizedFileDocument { - object PartitionedItemizedFileDocument { - - /** - * Utility function to generate the partition URI by index. - */ - def getPartitionURI(id: String, i: Int): URI = { - workflowResultsRootPath.resolve(id).resolve(s"partition$i").toUri + /** + * Utility function to generate the partition URI by index. + */ + def getPartitionURI(id: String, i: Int): URI = { + workflowResultsRootPath.resolve(id).resolve(s"partition$i").toUri + } +} + +class PartitionedItemizedFileDocument[R <: VirtualDocument[T], T >: Null <: AnyRef]( + val id: String, + val numOfPartition: Int, + createPartition: URI => R +) extends VirtualDocument[T] { + + // The vector of partitions, each being an instance of R (a subclass of VirtualDocument[T]) + private val partitions = + Vector.tabulate(numOfPartition)(i => createPartition(getPartitionURI(id, i))) + + // Cursor for each partition to track read position + private val cursors = Array.fill(numOfPartition)(0) + + private val mutex = new ReentrantLock() + + // Use round-robin to decide which partition to go to when reading the i-th item + private def getPartitionIndex(i: Int): Int = i % numOfPartition + + override def getURI: URI = workflowResultsRootPath.resolve(id).toUri + + override def getItem(i: Int): T = { + mutex.lock() + try { + val partitionIndex = getPartitionIndex(i) + val document = partitions(partitionIndex) + val item = document.getItem(cursors(partitionIndex)) + cursors(partitionIndex) += 1 + item + } finally { + mutex.unlock() } } - class PartitionedItemizedFileDocument[T >: Null <: AnyRef](val id: String, val numOfPartition: Int) - extends VirtualDocument[T] { - - // The array of itemized file documents, each stands for a partition - private val partitions = - Array.tabulate(numOfPartition)(i => new ItemizedFileDocument[T](getPartitionURI(id, i))) - - // Cursor for each partition to track read position - private val cursors = Array.fill(numOfPartition)(0) - - private val mutex = new ReentrantLock() + override def get(): Iterator[T] = + new Iterator[T] { + private var partitionIndex = 0 + private val iterators = partitions.map(_.get()) - // use round-robin to decide which partition to go to when reading i-th item - private def getPartitionIndex(i: Int): Int = i % numOfPartition + override def hasNext: Boolean = iterators.exists(_.hasNext) - override def getURI: URI = - workflowResultsRootPath.resolve(id).toUri - - override def getItem(i: Int): T = { - mutex.lock() - try { - val partitionIndex = getPartitionIndex(i) - val document = partitions(partitionIndex) - val item = document.getItem(cursors(partitionIndex)) - cursors(partitionIndex) += 1 - item - } finally { - mutex.unlock() - } - } - - override def get(): Iterator[T] = - new Iterator[T] { - private var partitionIndex = 0 - private val iterators = partitions.map(_.get()) - - override def hasNext: Boolean = iterators.exists(_.hasNext) - - override def next(): T = { - mutex.lock() - try { - while (!iterators(partitionIndex).hasNext) { - partitionIndex = getPartitionIndex(partitionIndex + 1) - } - iterators(partitionIndex).next() - } finally { - mutex.unlock() + override def next(): T = { + mutex.lock() + try { + while (!iterators(partitionIndex).hasNext) { + partitionIndex = getPartitionIndex(partitionIndex + 1) } + iterators(partitionIndex).next() + } finally { + mutex.unlock() } } + } - override def getRange(from: Int, until: Int): Iterator[T] = { - mutex.lock() - try { - get().slice(from, until) - } finally { - mutex.unlock() - } + override def getRange(from: Int, until: Int): Iterator[T] = { + mutex.lock() + try { + get().slice(from, until) + } finally { + mutex.unlock() } + } - override def getAfter(offset: Int): Iterator[T] = { - mutex.lock() - try { - get().drop(offset + 1) - } finally { - mutex.unlock() - } + override def getAfter(offset: Int): Iterator[T] = { + mutex.lock() + try { + get().drop(offset + 1) + } finally { + mutex.unlock() } + } - override def getCount: Long = { - mutex.lock() - try { - partitions.map(_.getCount).sum - } finally { - mutex.unlock() - } + override def getCount: Long = { + mutex.lock() + try { + partitions.map(_.getCount).sum + } finally { + mutex.unlock() } + } - override def clear(): Unit = { - mutex.lock() - try { - for (partition <- partitions) { - partition.clear() - } - VFS.getManager.resolveFile(getURI).delete() - } finally { - mutex.unlock() + override def clear(): Unit = { + mutex.lock() + try { + // Clear each partition first + for (partition <- partitions) { + partition.clear() + } + + // Delete the directory containing all partitions + val directory = VFS.getManager.resolveFile(getURI) + if (directory.exists()) { + directory.delete() // Deletes the directory and its contents } + } finally { + mutex.unlock() } } +} diff --git a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedItemizedFileDocumentSpec.scala b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedItemizedFileDocumentSpec.scala index 7f113420609..1c204341e73 100644 --- a/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedItemizedFileDocumentSpec.scala +++ b/core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/PartitionedItemizedFileDocumentSpec.scala @@ -9,13 +9,17 @@ import org.scalatest.matchers.should.Matchers class PartitionedItemizedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeAndAfter { - var partitionDocument: PartitionedItemizedFileDocument[String] = _ + var partitionDocument: PartitionedItemizedFileDocument[ItemizedFileDocument[String], String] = _ val numOfPartitions = 3 val partitionId: String = "partition_doc_test" before { // Initialize the PartitionDocument with a base ID and number of partitions - partitionDocument = new PartitionedItemizedFileDocument[String](partitionId, numOfPartitions) + partitionDocument = new PartitionedItemizedFileDocument[ItemizedFileDocument[String], String]( + partitionId, + numOfPartitions, + uri => new ItemizedFileDocument[String](uri) + ) } after {