Skip to content

Commit

Permalink
add type R to PartitionedItemizedFileDocument
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Dec 13, 2024
1 parent 6bf133b commit b5d6a20
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object ItemizedFileDocument {
*
* @param uri the identifier of the file.
* If the file doesn't physically exist, ItemizedFileDocument will create the file(possibly also the parent folder)
* during its initialization.
* during its initialization.
* The lifecycle of the file is bundled with JVM, i.e. when JVM exits, the file gets deleted.
*/
class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI)
Expand Down Expand Up @@ -75,9 +75,9 @@ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI)
}

/**
* Utility function to get an iterator of data items of type T.
* Each returned item will be deserialized using Kryo.
*/
* Utility function to get an iterator of data items of type T.
* Each returned item will be deserialized using Kryo.
*/
private def getIterator: Iterator[T] = {
lazy val input = new com.twitter.chill.Input(file.getContent.getInputStream)
new Iterator[T] {
Expand Down Expand Up @@ -180,10 +180,11 @@ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI)
* @param i index starting from 0
* @return data item of type T
*/
override def getItem(i: Int): T = withReadLock {
val iterator = getIterator
iterator.drop(i).next()
}
override def getItem(i: Int): T =
withReadLock {
val iterator = getIterator
iterator.drop(i).next()
}

override def getRange(from: Int, until: Int): Iterator[T] =
withReadLock {
Expand All @@ -198,11 +199,12 @@ class ItemizedFileDocument[T >: Null <: AnyRef](val uri: URI)
override def getCount: Long =
withReadLock {
getIterator.size
}
}

override def get(): Iterator[T] = withReadLock {
getIterator
}
override def get(): Iterator[T] =
withReadLock {
getIterator
}

/**
* Physically remove the file specified by the URI. This method is THREAD-SAFE.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,111 +1,118 @@
package edu.uci.ics.amber.core.storage.result
package edu.uci.ics.amber.core.storage.result

import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.PartitionedItemizedFileDocument.getPartitionURI
import edu.uci.ics.amber.util.PathUtils.workflowResultsRootPath
import org.apache.commons.vfs2.VFS
import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.storage.result.PartitionedItemizedFileDocument.getPartitionURI
import edu.uci.ics.amber.util.PathUtils.workflowResultsRootPath
import org.apache.commons.vfs2.VFS

import java.net.URI
import java.util.concurrent.locks.ReentrantLock
import java.net.URI
import java.util.concurrent.locks.ReentrantLock

object PartitionedItemizedFileDocument {

object PartitionedItemizedFileDocument {

/**
* Utility function to generate the partition URI by index.
*/
def getPartitionURI(id: String, i: Int): URI = {
workflowResultsRootPath.resolve(id).resolve(s"partition$i").toUri
/**
* Utility function to generate the partition URI by index.
*/
def getPartitionURI(id: String, i: Int): URI = {
workflowResultsRootPath.resolve(id).resolve(s"partition$i").toUri
}
}

class PartitionedItemizedFileDocument[R <: VirtualDocument[T], T >: Null <: AnyRef](
val id: String,
val numOfPartition: Int,
createPartition: URI => R
) extends VirtualDocument[T] {

// The vector of partitions, each being an instance of R (a subclass of VirtualDocument[T])
private val partitions =
Vector.tabulate(numOfPartition)(i => createPartition(getPartitionURI(id, i)))

// Cursor for each partition to track read position
private val cursors = Array.fill(numOfPartition)(0)

private val mutex = new ReentrantLock()

// Use round-robin to decide which partition to go to when reading the i-th item
private def getPartitionIndex(i: Int): Int = i % numOfPartition

override def getURI: URI = workflowResultsRootPath.resolve(id).toUri

override def getItem(i: Int): T = {
mutex.lock()
try {
val partitionIndex = getPartitionIndex(i)
val document = partitions(partitionIndex)
val item = document.getItem(cursors(partitionIndex))
cursors(partitionIndex) += 1
item
} finally {
mutex.unlock()
}
}

class PartitionedItemizedFileDocument[T >: Null <: AnyRef](val id: String, val numOfPartition: Int)
extends VirtualDocument[T] {

// The array of itemized file documents, each stands for a partition
private val partitions =
Array.tabulate(numOfPartition)(i => new ItemizedFileDocument[T](getPartitionURI(id, i)))

// Cursor for each partition to track read position
private val cursors = Array.fill(numOfPartition)(0)

private val mutex = new ReentrantLock()
override def get(): Iterator[T] =
new Iterator[T] {
private var partitionIndex = 0
private val iterators = partitions.map(_.get())

// use round-robin to decide which partition to go to when reading i-th item
private def getPartitionIndex(i: Int): Int = i % numOfPartition
override def hasNext: Boolean = iterators.exists(_.hasNext)

override def getURI: URI =
workflowResultsRootPath.resolve(id).toUri

override def getItem(i: Int): T = {
mutex.lock()
try {
val partitionIndex = getPartitionIndex(i)
val document = partitions(partitionIndex)
val item = document.getItem(cursors(partitionIndex))
cursors(partitionIndex) += 1
item
} finally {
mutex.unlock()
}
}

override def get(): Iterator[T] =
new Iterator[T] {
private var partitionIndex = 0
private val iterators = partitions.map(_.get())

override def hasNext: Boolean = iterators.exists(_.hasNext)

override def next(): T = {
mutex.lock()
try {
while (!iterators(partitionIndex).hasNext) {
partitionIndex = getPartitionIndex(partitionIndex + 1)
}
iterators(partitionIndex).next()
} finally {
mutex.unlock()
override def next(): T = {
mutex.lock()
try {
while (!iterators(partitionIndex).hasNext) {
partitionIndex = getPartitionIndex(partitionIndex + 1)
}
iterators(partitionIndex).next()
} finally {
mutex.unlock()
}
}
}

override def getRange(from: Int, until: Int): Iterator[T] = {
mutex.lock()
try {
get().slice(from, until)
} finally {
mutex.unlock()
}
override def getRange(from: Int, until: Int): Iterator[T] = {
mutex.lock()
try {
get().slice(from, until)
} finally {
mutex.unlock()
}
}

override def getAfter(offset: Int): Iterator[T] = {
mutex.lock()
try {
get().drop(offset + 1)
} finally {
mutex.unlock()
}
override def getAfter(offset: Int): Iterator[T] = {
mutex.lock()
try {
get().drop(offset + 1)
} finally {
mutex.unlock()
}
}

override def getCount: Long = {
mutex.lock()
try {
partitions.map(_.getCount).sum
} finally {
mutex.unlock()
}
override def getCount: Long = {
mutex.lock()
try {
partitions.map(_.getCount).sum
} finally {
mutex.unlock()
}
}

override def clear(): Unit = {
mutex.lock()
try {
for (partition <- partitions) {
partition.clear()
}
VFS.getManager.resolveFile(getURI).delete()
} finally {
mutex.unlock()
override def clear(): Unit = {
mutex.lock()
try {
// Clear each partition first
for (partition <- partitions) {
partition.clear()
}

// Delete the directory containing all partitions
val directory = VFS.getManager.resolveFile(getURI)
if (directory.exists()) {
directory.delete() // Deletes the directory and its contents
}
} finally {
mutex.unlock()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,17 @@ import org.scalatest.matchers.should.Matchers

class PartitionedItemizedFileDocumentSpec extends AnyFlatSpec with Matchers with BeforeAndAfter {

var partitionDocument: PartitionedItemizedFileDocument[String] = _
var partitionDocument: PartitionedItemizedFileDocument[ItemizedFileDocument[String], String] = _
val numOfPartitions = 3
val partitionId: String = "partition_doc_test"

before {
// Initialize the PartitionDocument with a base ID and number of partitions
partitionDocument = new PartitionedItemizedFileDocument[String](partitionId, numOfPartitions)
partitionDocument = new PartitionedItemizedFileDocument[ItemizedFileDocument[String], String](
partitionId,
numOfPartitions,
uri => new ItemizedFileDocument[String](uri)
)
}

after {
Expand Down

0 comments on commit b5d6a20

Please sign in to comment.