From 2f38b442fe44685fe1693cee42a8f28b897fdb04 Mon Sep 17 00:00:00 2001 From: lyogev Date: Tue, 5 May 2020 10:46:15 +0300 Subject: [PATCH] fix(scheduler): add fair scheduler and a new parallel mode to metorikku --- .../scala/com/yotpo/metorikku/Metorikku.scala | 61 +++++++++---------- .../job/ConfigurationParser.scala | 26 ++++---- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/src/main/scala/com/yotpo/metorikku/Metorikku.scala b/src/main/scala/com/yotpo/metorikku/Metorikku.scala index a12943610..8d1662467 100644 --- a/src/main/scala/com/yotpo/metorikku/Metorikku.scala +++ b/src/main/scala/com/yotpo/metorikku/Metorikku.scala @@ -9,20 +9,38 @@ import org.apache.log4j.LogManager object Metorikku extends App { val log = LogManager.getLogger(this.getClass) log.info("Starting Metorikku - Parsing configuration") - val session = Job(ConfigurationParser.parse(args)) - session.config.periodic match { - case Some(periodic) => { - executePeriodicTask(periodic) + val configurations = ConfigurationParser.parse(args) + + val jobs = configurations.map(config => + new Runnable { + def run(): Unit = { + val job = Job(config) + + job.config.periodic match { + case Some(periodic) => { + executePeriodicTask(job, periodic) + } + case _ => runMetrics(job) + } + } } - case _ => runMetrics(session) + ) + + jobs match { + case s if s.length > 1 => { + val threads = jobs.map(r => new Thread(r)) + threads.foreach(t => t.start()) + threads.foreach(t => t.join()) + } + case _ => jobs.foreach(r => r.run()) } - private def executePeriodicTask(periodic: Periodic) = { + private def executePeriodicTask(job: Job, periodic: Periodic) = { val task = new Runnable { def run() = { - session.sparkSession.catalog.clearCache() - runMetrics(session) + job.sparkSession.catalog.clearCache() + runMetrics(job) } } val ex = new ScheduledThreadPoolExecutor(1) @@ -30,31 +48,12 @@ object Metorikku extends App { ex.scheduleAtFixedRate(task, initialDelay, periodic.getTriggerDurationInSeconds(), TimeUnit.SECONDS) } - def runMetricsInParallel(job: Job, metrics: Seq[String]): Unit = { - val threads = metrics.map(metricSetPath => new Thread(new Runnable { - def run() { - val metricSet = new MetricSet(metricSetPath) - metricSet.run(job) - } - })).toList - - threads.foreach(t => t.start()) - threads.foreach(t => t.join()) - } - def runMetrics(job: Job): Unit = { job.config.metrics match { - case Some(metrics) => { - session.config.parallel match { - case Some(true) => runMetricsInParallel(job, metrics) - case _ => { - metrics.foreach(metricSetPath => { - val metricSet = new MetricSet(metricSetPath) - metricSet.run(job) - }) - } - } - } + case Some(metrics) => metrics.foreach(metricSetPath => { + val metricSet = new MetricSet(metricSetPath) + metricSet.run(job) + }) case None => log.warn("No mertics were defined, exiting") } } diff --git a/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala b/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala index 0c3d390e1..a6da59e94 100644 --- a/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala +++ b/src/main/scala/com/yotpo/metorikku/configuration/job/ConfigurationParser.scala @@ -12,36 +12,40 @@ import scopt.OptionParser object ConfigurationParser { val log: Logger = LogManager.getLogger(this.getClass) - case class ConfigFileName(job: Option[String] = None, filename: Option[String] = None) + case class ConfigFileName(job: Option[String] = None, filename: Option[Seq[String]] = None) val CLIparser: OptionParser[ConfigFileName] = new scopt.OptionParser[ConfigFileName]("Metorikku") { head("Metorikku", "1.0") opt[String]('j', "job") .action((x, c) => c.copy(job = Option(x))) .text("Job configuration JSON") - opt[String]('c', "config") - .text("Path to the job config file (YAML/JSON)") + opt[Seq[String]]('c', "config") + .text("Path to the job config file (YAML/JSON), you can add multiple files by concatenating the file names with ,") .action((x, c) => c.copy(filename = Option(x))) .validate(x => { - if (Files.exists(Paths.get(x))) { - success - } - else { - failure("Supplied file not found") + for (p <- x) { + if (!Files.exists(Paths.get(p))) { + failure("Supplied file not found") + } } + success }) help("help") text "use command line arguments to specify the configuration file path or content" } - def parse(args: Array[String]): Configuration = { + def parse(args: Array[String]): Seq[Configuration] = { log.info("Starting Metorikku - Parsing configuration") CLIparser.parse(args, ConfigFileName()) match { case Some(arguments) => arguments.job match { - case Some(job) => parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json")) + case Some(job) => Seq(parseConfigurationFile(job, FileUtils.getObjectMapperByExtension("json"))) case None => arguments.filename match { - case Some(filename) => parseConfigurationFile(FileUtils.readConfigurationFile(filename), FileUtils.getObjectMapperByFileName(filename)) + case Some(filenames) => + filenames. + map(filename => + parseConfigurationFile(FileUtils.readConfigurationFile(filename), + FileUtils.getObjectMapperByFileName(filename))).toList case None => throw new MetorikkuException("Failed to parse config file") } }