Skip to content

Commit

Permalink
Merge branch 'master' into add-public-dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
GspikeHalo authored Dec 16, 2024
2 parents 710d07e + 41c47bc commit e952e60
Show file tree
Hide file tree
Showing 25 changed files with 281 additions and 601 deletions.
64 changes: 2 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
<br>

<h4 align="center">
<a href="https://github.com/Texera/texera#videos">Demo Video</a>
<a href="https://texera.io">Official Site</a>
|
<a href="https://texera.github.io/blog/">Blogs</a>
<a href="https://texera.io/?cat=11">Blogs</a>
|
<a href="https://github.com/Texera/texera/wiki/Getting-Started">Getting Started</a>
<br>
Expand All @@ -29,13 +29,6 @@
<img alt="Static Badge" src="https://img.shields.io/badge/Largest_Deployment-100_nodes,_400_cores-green">
</p>

# Motivation

* Data science is labor-intensive and particularly challenging for non-IT users applying AI/ML.
* Many workflow-based data science platforms lack parallelism, limiting their ability to handle big datasets.
* Cloud services and technologies have advanced significantly over the past decade, enabling powerful browser-based interfaces supported by high-speed networks.
* Existing data science platforms offer limited interaction during long-running jobs, making them difficult to manage after execution begins.

# Goals

* Provide data science as cloud services;
Expand Down Expand Up @@ -148,59 +141,6 @@ The workflow in the use case shown below includes data cleaning, ML model traini
_In JAMIA 2021_ | [PDF](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7989302/pdf/ocab047.pdf)
</details>


# Education
<table>
<tr style="height: 500px;">
<td align="center">
<a href="https://ds4all.ics.uci.edu/">
<img src="https://ds4all.ics.uci.edu/wp-content/uploads/2023/07/banner-1024x576.png">
</a>
<p><b>Data Science for All</b></p>
An NSF-funded summer program to teach high-school students data science and AI/ML
</td>
<td align="center">
<a href="https://canvas.eee.uci.edu/courses/63639/pages/syllabus">
<img src="https://github.com/user-attachments/assets/a7569fd3-6857-48b4-80dc-d9f006ae2c8f">
</a>
<p><b>ICS 80: Data Science and AI/ML Using Workflows</b></p>
A Spring 2024 course at UCI, teaching 42 undergraduates, most of whom are not computer science majors, to learn data science and AI/ML
</td>
<td align="center">
<a href="https://sites.google.com/uci.edu/ds-workshop2024/home">
<img src="https://www.cerritos.edu/_resources/images/common/cerritos-college-logo.svg">
</a>
<p><b>Workshop of Data Science for Everyone at Cerritos College</b></p>
A two-day workshop designed for non-CS students to learn data science and ML without a single line of coding
</td>
</tr>
</table>


# Videos
<table>
<tr style="height: 500px;">
<td align="center">
<a href="https://www.youtube.com/watch?v=B81iMFS5fPc">
<img src="https://img.youtube.com/vi/B81iMFS5fPc/0.jpg" alt="Watch the video">
</a>
<p><b>dkNET Webinar 04/26/2024</b></p>
</td>
<td align="center">
<a href="https://www.youtube.com/watch?v=SP-XiDADbw0">
<img src="https://img.youtube.com/vi/SP-XiDADbw0/0.jpg" alt="Watch the video">
</a>
<p><b>Texera Demo @ VLDB'20</b></p>
</td>
<td align="center">
<a href="https://www.youtube.com/watch?v=T5ShFRfHmgI">
<img src="https://img.youtube.com/vi/T5ShFRfHmgI/0.jpg" alt="Watch the video">
</a>
<p><b>Amber Presentation @ VLDB'20</b></p>
</td>
</tr>
</table>

# Getting Started

* For users, visit [Guide to Use Texera](https://github.com/Texera/texera/wiki/Getting-Started).
Expand Down
4 changes: 2 additions & 2 deletions core/amber/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ fault-tolerance {
}
}

region-plan-generator {
enable-cost-based-region-plan-generator = false
schedule-generator {
enable-cost-based-schedule-generator = false
use-global-search = false
use-top-down-search = false
search-timeout-milliseconds = 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package edu.uci.ics.amber.engine.architecture.controller

import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.{
CostBasedRegionPlanGenerator,
ExpansionGreedyRegionPlanGenerator,
CostBasedScheduleGenerator,
ExpansionGreedyScheduleGenerator,
Region,
Schedule
}
Expand All @@ -21,23 +21,24 @@ class WorkflowScheduler(
* Update the schedule to be executed, based on the given physicalPlan.
*/
def updateSchedule(physicalPlan: PhysicalPlan): Unit = {
// generate an RegionPlan with regions using a region plan generator.
val (regionPlan, updatedPhysicalPlan) = if (AmberConfig.enableCostBasedRegionPlanGenerator) {
// CostBasedRegionPlanGenerator considers costs to try to find an optimal plan.
new CostBasedRegionPlanGenerator(
workflowContext,
physicalPlan,
actorId
).generate()
} else {
// ExpansionGreedyRegionPlanGenerator is the stable default plan generator.
new ExpansionGreedyRegionPlanGenerator(
workflowContext,
physicalPlan
).generate()
}
// generate a schedule using a region plan generator.
val (generatedSchedule, updatedPhysicalPlan) =
if (AmberConfig.enableCostBasedScheduleGenerator) {
// CostBasedRegionPlanGenerator considers costs to try to find an optimal plan.
new CostBasedScheduleGenerator(
workflowContext,
physicalPlan,
actorId
).generate()
} else {
// ExpansionGreedyRegionPlanGenerator is the stable default plan generator.
new ExpansionGreedyScheduleGenerator(
workflowContext,
physicalPlan
).generate()
}
this.schedule = generatedSchedule
this.physicalPlan = updatedPhysicalPlan
this.schedule = Schedule.apply(regionPlan)
}

def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import edu.uci.ics.amber.core.storage.result.OpResultStorage
import edu.uci.ics.amber.core.workflow.{PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.common.{AmberConfig, AmberLogging}
import edu.uci.ics.amber.virtualidentity.{ActorVirtualIdentity, PhysicalOpIdentity}
Expand All @@ -14,14 +13,14 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
import scala.util.control.Breaks.{break, breakable}
import scala.util.{Failure, Success, Try}

class CostBasedRegionPlanGenerator(
class CostBasedScheduleGenerator(
workflowContext: WorkflowContext,
initialPhysicalPlan: PhysicalPlan,
val actorId: ActorVirtualIdentity
) extends RegionPlanGenerator(
) extends ScheduleGenerator(
workflowContext,
initialPhysicalPlan
)
Expand All @@ -35,20 +34,21 @@ class CostBasedRegionPlanGenerator(
numStatesExplored: Int = 0
)

def generate(): (RegionPlan, PhysicalPlan) = {

def generate(): (Schedule, PhysicalPlan) = {
val startTime = System.nanoTime()
val regionDAG = createRegionDAG()
val totalRPGTime = System.nanoTime() - startTime
val regionPlan = RegionPlan(
regions = regionDAG.iterator().asScala.toSet,
regionLinks = regionDAG.edgeSet().asScala.toSet
)
val schedule = generateScheduleFromRegionPlan(regionPlan)
logger.info(
s"WID: ${workflowContext.workflowId.id}, EID: ${workflowContext.executionId.id}, total RPG time: " +
s"${totalRPGTime / 1e6} ms."
)
(
RegionPlan(
regions = regionDAG.iterator().asScala.toSet,
regionLinks = regionDAG.edgeSet().asScala.toSet
),
schedule,
physicalPlan
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ import scala.annotation.tailrec
import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala

class ExpansionGreedyRegionPlanGenerator(
class ExpansionGreedyScheduleGenerator(
workflowContext: WorkflowContext,
initialPhysicalPlan: PhysicalPlan
) extends RegionPlanGenerator(workflowContext, initialPhysicalPlan)
) extends ScheduleGenerator(workflowContext, initialPhysicalPlan)
with LazyLogging {
def generate(): (RegionPlan, PhysicalPlan) = {
def generate(): (Schedule, PhysicalPlan) = {

val regionDAG = createRegionDAG()
val regionPlan = RegionPlan(
regions = regionDAG.vertexSet().asScala.toSet,
regionLinks = regionDAG.edgeSet().asScala.toSet
)
val schedule = generateScheduleFromRegionPlan(regionPlan)

(
RegionPlan(
regions = regionDAG.vertexSet().asScala.toSet,
regionLinks = regionDAG.edgeSet().asScala.toSet
),
schedule,
physicalPlan
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,12 @@
package edu.uci.ics.amber.engine.architecture.scheduling

import scala.collection.mutable
import scala.jdk.CollectionConverters.CollectionHasAsScala

case class Schedule(private val regionPlan: RegionPlan) extends Iterator[Set[Region]] {
private val levels = mutable.Map.empty[RegionIdentity, Int]
private val levelSets = mutable.Map.empty[Int, mutable.Set[RegionIdentity]]
private var currentLevel = 0

regionPlan.topologicalIterator().foreach { currentVertex =>
val level = regionPlan.dag.incomingEdgesOf(currentVertex).asScala.foldLeft(0) {
(maxLevel, incomingEdge) =>
val sourceVertex = regionPlan.dag.getEdgeSource(incomingEdge)
math.max(maxLevel, levels.getOrElse(sourceVertex, 0) + 1)
}

levels(currentVertex) = level
levelSets.getOrElseUpdate(level, mutable.Set.empty).add(currentVertex)
}

currentLevel = levelSets.keys.minOption.getOrElse(0)
case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
private var currentLevel = levelSets.keys.minOption.getOrElse(0)

override def hasNext: Boolean = levelSets.isDefinedAt(currentLevel)

override def next(): Set[Region] = {
val regions = levelSets(currentLevel).map(regionId => regionPlan.getRegion(regionId)).toSet
val regions = levelSets(currentLevel)
currentLevel += 1
regions
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package edu.uci.ics.amber.engine.architecture.scheduling
import edu.uci.ics.amber.core.storage.result.{OpResultStorage, ResultStorage}
import edu.uci.ics.amber.core.tuple.Schema
import edu.uci.ics.amber.core.workflow.{PhysicalOp, PhysicalPlan, WorkflowContext}
import edu.uci.ics.amber.engine.architecture.scheduling.RegionPlanGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.ScheduleGenerator.replaceVertex
import edu.uci.ics.amber.engine.architecture.scheduling.resourcePolicies.{
DefaultResourceAllocator,
ExecutionClusterInfo
Expand All @@ -18,7 +18,7 @@ import org.jgrapht.traverse.TopologicalOrderIterator
import scala.collection.mutable
import scala.jdk.CollectionConverters.{CollectionHasAsScala, IteratorHasAsScala}

object RegionPlanGenerator {
object ScheduleGenerator {
def replaceVertex(
graph: DirectedAcyclicGraph[Region, RegionLink],
oldVertex: Region,
Expand Down Expand Up @@ -50,13 +50,27 @@ object RegionPlanGenerator {
}
}

abstract class RegionPlanGenerator(
abstract class ScheduleGenerator(
workflowContext: WorkflowContext,
var physicalPlan: PhysicalPlan
) {
private val executionClusterInfo = new ExecutionClusterInfo()

def generate(): (RegionPlan, PhysicalPlan)
def generate(): (Schedule, PhysicalPlan)

/**
* A schedule is a ranking on the regions of a region plan. Currently we use a total order of the regions.
*/
def generateScheduleFromRegionPlan(regionPlan: RegionPlan): Schedule = {
val levelSets = regionPlan
.topologicalIterator()
.zipWithIndex
.map(zippedRegionId => {
zippedRegionId._2 -> Set.apply(regionPlan.getRegion(zippedRegionId._1))
})
.toMap
Schedule.apply(levelSets)
}

def allocateResource(
regionDAG: DirectedAcyclicGraph[Region, RegionLink]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ object AmberConfig {
val isFaultToleranceEnabled: Boolean = faultToleranceLogRootFolder.nonEmpty

// Region plan generator
val enableCostBasedRegionPlanGenerator: Boolean =
getConfSource.getBoolean("region-plan-generator.enable-cost-based-region-plan-generator")
val useGlobalSearch: Boolean = getConfSource.getBoolean("region-plan-generator.use-global-search")
val enableCostBasedScheduleGenerator: Boolean =
getConfSource.getBoolean("schedule-generator.enable-cost-based-schedule-generator")
val useGlobalSearch: Boolean = getConfSource.getBoolean("schedule-generator.use-global-search")
val useTopDownSearch: Boolean =
getConfSource.getBoolean("region-plan-generator.use-top-down-search")
getConfSource.getBoolean("schedule-generator.use-top-down-search")
val searchTimeoutMilliseconds: Int =
getConfSource.getInt("region-plan-generator.search-timeout-milliseconds")
getConfSource.getInt("schedule-generator.search-timeout-milliseconds")

// Storage configuration
val sinkStorageTTLInSecs: Int = getConfSource.getInt("result-cleanup.ttl-in-seconds")
Expand Down
Loading

0 comments on commit e952e60

Please sign in to comment.