From c7d36885ca6de88ac9a2c766c080afe732c0906f Mon Sep 17 00:00:00 2001 From: WooYeon Date: Mon, 27 Mar 2017 16:29:03 +0900 Subject: [PATCH 01/11] commit --- .../snu/cay/dolphin/async/AsyncDolphinDriver.java | 10 ++++++++++ .../cay/services/ps/driver/impl/ClockManager.java | 15 +++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index 1c55ce445..02f870e2e 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -49,6 +49,7 @@ import edu.snu.cay.utils.trace.HTraceParameters; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.ProgressProvider; +import org.apache.reef.driver.client.JobMessageObserver; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.context.ClosedContext; import org.apache.reef.driver.context.ContextConfiguration; @@ -283,6 +284,8 @@ public final class AsyncDolphinDriver { */ private final ExecutorService optimizationTriggerExecutor = Executors.newSingleThreadExecutor(); + private final JobMessageObserver jobMessageObserver; + /** * Injectable constructor. 
* @@ -293,6 +296,7 @@ public final class AsyncDolphinDriver { */ @Inject private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, + final JobMessageObserver jobMessageObserver, final DataLoadingService dataLoadingService, final SynchronizationManager synchronizationManager, final ClockManager clockManager, @@ -318,6 +322,7 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, final HTrace hTrace) throws IOException { hTrace.initialize(); this.evaluatorManager = evaluatorManager; + this.jobMessageObserver = jobMessageObserver; this.dataLoadingService = dataLoadingService; this.synchronizationManager = synchronizationManager; this.clockManager = clockManager; @@ -340,6 +345,11 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, this.optimizationIntervalMs = optimizationIntervalMs; this.maxNumEpochs = maxNumEpochs; + this.clockManager.addClockListener(epochIdx -> { + final byte[] bytes = null; // encode epochIdx + jobMessageObserver.sendMessageToClient(bytes); + }); + try { final Injector workerInjector = injector.forkInjector(); workerInjector.bindVolatileInstance(EMDeleteExecutor.class, new WorkerRemover()); diff --git a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java index 9081aa4c2..58a14b7cc 100644 --- a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java +++ b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java @@ -72,6 +72,11 @@ public final class ClockManager { */ private int globalMinimumClock; + /** + * + */ + private final List> callbacks = new ArrayList<>(); + @Inject private ClockManager(final AggregationMaster aggregationMaster, final ClockMsgCodec codec, @@ -84,6 +89,14 @@ private ClockManager(final AggregationMaster aggregationMaster, minimumClockWorkers = new ArrayList<>(); } + /** + * + * @param callback + */ + public void 
addClockListener(final EventHandler callback) { + callbacks.add(callback); + } + /** * Helper function to create broadcast global minimum clock message. */ @@ -199,6 +212,8 @@ private synchronized void tickClock(final String workerId) { if (minimumClockWorkers.remove(workerId)) { if (minimumClockWorkers.size() == 0) { globalMinimumClock++; + + callbacks.forEach(callback -> callback.onNext(globalMinimumClock)); broadcastGlobalMinimumClock(); } } From ca8427cbb9c9f2e89766e95a125b325985c687a9 Mon Sep 17 00:00:00 2001 From: junhoe Date: Tue, 28 Mar 2017 15:41:14 +0900 Subject: [PATCH 02/11] DolphinDriverLauncher in AsyncDolphinDriver and AsyncDolphinLauncher --- .../cay/dolphin/async/AsyncDolphinDriver.java | 16 +++++++++++++--- .../cay/dolphin/async/AsyncDolphinLauncher.java | 11 +++++------ .../services/ps/driver/impl/ClockManager.java | 13 ++++++------- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index 02f870e2e..f5b6e1e5b 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -80,6 +80,7 @@ import javax.inject.Inject; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -284,6 +285,9 @@ public final class AsyncDolphinDriver { */ private final ExecutorService optimizationTriggerExecutor = Executors.newSingleThreadExecutor(); + /** + * Send progress information to Client. 
+ */ private final JobMessageObserver jobMessageObserver; /** @@ -345,9 +349,15 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, this.optimizationIntervalMs = optimizationIntervalMs; this.maxNumEpochs = maxNumEpochs; - this.clockManager.addClockListener(epochIdx -> { - final byte[] bytes = null; // encode epochIdx - jobMessageObserver.sendMessageToClient(bytes); + this.clockManager.addProgressUpdateListener(epochIdx -> { + // encode progress information : epochIdx, maxNumEpochs + final int[] progressInfo = {epochIdx, maxNumEpochs}; + final byte[] encodedProgressInfo = ByteBuffer.allocate(4 * progressInfo.length) + .putInt(epochIdx) + .putInt(maxNumEpochs) + .array(); + jobMessageObserver.sendMessageToClient(encodedProgressInfo); + LOG.log(Level.INFO, "Epoch event is triggered! epoch index : {0}", epochIdx); }); try { diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index d9325a2d7..a432018a1 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -56,9 +56,7 @@ import edu.snu.cay.utils.trace.parameters.ReceiverPort; import edu.snu.cay.utils.trace.parameters.ReceiverType; import org.apache.reef.annotations.audience.ClientSide; -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.client.DriverLauncher; -import org.apache.reef.client.LauncherStatus; +import org.apache.reef.client.*; import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; import org.apache.reef.io.network.naming.NameServerConfiguration; import org.apache.reef.io.network.util.StringIdentifierFactory; @@ -143,7 +141,6 @@ public static LauncherStatus launch(final String jobName, basicParameterInjector.getNamedInstance(LocalRuntimeMaxNumEvaluators.class), 
basicParameterInjector.getNamedInstance(JVMHeapSlack.class)) : getYarnRuntimeConfiguration(basicParameterInjector.getNamedInstance(JVMHeapSlack.class)); - // configuration for the parameter server final boolean dynamic = basicParameterInjector.getNamedInstance(Dynamic.class); final Class managerClass = dynamic ? @@ -210,12 +207,12 @@ public static LauncherStatus launch(final String jobName, // driver-side configurations final Configuration driverConf = getDriverConfiguration(jobName, basicParameterInjector); final int timeout = basicParameterInjector.getNamedInstance(Timeout.class); - - final LauncherStatus status = DriverLauncher.getLauncher(runTimeConf).run( + final LauncherStatus status = DolphinDriverLauncher.getLauncher(runTimeConf).run( Configurations.merge(basicParameterConf, parameterServerConf, serializedServerConf, serializedWorkerConf, driverConf, customDriverConfiguration, serializedEMClientConf, dashboardConf), timeout); + LOG.log(Level.INFO, "REEF job completed: {0}", status); return status; @@ -448,4 +445,6 @@ private static String processInputDir(final String inputDir, final Injector inje final File inputFile = new File(inputDir); return "file:///" + inputFile.getAbsolutePath(); } + + } diff --git a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java index 58a14b7cc..d35fbcf18 100644 --- a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java +++ b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java @@ -75,7 +75,7 @@ public final class ClockManager { /** * */ - private final List> callbacks = new ArrayList<>(); + private final List> progressUpdateCallbacks = new ArrayList<>(); @Inject private ClockManager(final AggregationMaster aggregationMaster, @@ -90,11 +90,11 @@ private ClockManager(final AggregationMaster aggregationMaster, } /** - * - * @param callback + * Add listener to progress 
update list. + * @param callback when #globalMinimumClock increases, callback functions in list are called. */ - public void addClockListener(final EventHandler callback) { - callbacks.add(callback); + public void addProgressUpdateListener(final EventHandler callback) { + progressUpdateCallbacks.add(callback); } /** @@ -212,9 +212,8 @@ private synchronized void tickClock(final String workerId) { if (minimumClockWorkers.remove(workerId)) { if (minimumClockWorkers.size() == 0) { globalMinimumClock++; - - callbacks.forEach(callback -> callback.onNext(globalMinimumClock)); broadcastGlobalMinimumClock(); + progressUpdateCallbacks.forEach(callback -> callback.onNext(globalMinimumClock)); } } } From 8ea7311ad3a6d8b7574db6e11660a5530623ff56 Mon Sep 17 00:00:00 2001 From: junhoe Date: Tue, 28 Mar 2017 15:50:48 +0900 Subject: [PATCH 03/11] [CAY-909] Update AsyncDolphinDriver, AsyncDolphinLauncher, ClockManager, implement DolphinDriverLauncher --- .../edu/snu/cay/dolphin/async/AsyncDolphinDriver.java | 8 ++------ .../edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java | 4 +--- .../edu/snu/cay/services/ps/driver/impl/ClockManager.java | 2 +- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index f5b6e1e5b..3f9bdf2a8 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -285,11 +285,6 @@ public final class AsyncDolphinDriver { */ private final ExecutorService optimizationTriggerExecutor = Executors.newSingleThreadExecutor(); - /** - * Send progress information to Client. - */ - private final JobMessageObserver jobMessageObserver; - /** * Injectable constructor. 
* @@ -326,7 +321,6 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, final HTrace hTrace) throws IOException { hTrace.initialize(); this.evaluatorManager = evaluatorManager; - this.jobMessageObserver = jobMessageObserver; this.dataLoadingService = dataLoadingService; this.synchronizationManager = synchronizationManager; this.clockManager = clockManager; @@ -356,6 +350,8 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, .putInt(epochIdx) .putInt(maxNumEpochs) .array(); + + // send JobMessage to client jobMessageObserver.sendMessageToClient(encodedProgressInfo); LOG.log(Level.INFO, "Epoch event is triggered! epoch index : {0}", epochIdx); }); diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index a432018a1..07cb14863 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -207,12 +207,12 @@ public static LauncherStatus launch(final String jobName, // driver-side configurations final Configuration driverConf = getDriverConfiguration(jobName, basicParameterInjector); final int timeout = basicParameterInjector.getNamedInstance(Timeout.class); + final LauncherStatus status = DolphinDriverLauncher.getLauncher(runTimeConf).run( Configurations.merge(basicParameterConf, parameterServerConf, serializedServerConf, serializedWorkerConf, driverConf, customDriverConfiguration, serializedEMClientConf, dashboardConf), timeout); - LOG.log(Level.INFO, "REEF job completed: {0}", status); return status; @@ -445,6 +445,4 @@ private static String processInputDir(final String inputDir, final Injector inje final File inputFile = new File(inputDir); return "file:///" + inputFile.getAbsolutePath(); } - - } diff --git a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java 
b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java index d35fbcf18..f586b7caf 100644 --- a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java +++ b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java @@ -73,7 +73,7 @@ public final class ClockManager { private int globalMinimumClock; /** - * + * The list of EventHandler about progress update. */ private final List> progressUpdateCallbacks = new ArrayList<>(); From 59e7b0a28ee8dc1bc16a71ed313e3ce7c0e550b6 Mon Sep 17 00:00:00 2001 From: junhoe Date: Tue, 28 Mar 2017 17:05:44 +0900 Subject: [PATCH 04/11] divide ProgressMessageHandler from Dolphin specific driver launcher --- .../cay/dolphin/async/AsyncDolphinDriver.java | 16 +++++----------- .../cay/dolphin/async/AsyncDolphinLauncher.java | 13 ++++++++++++- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index 3f9bdf2a8..a8409e6d4 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -80,7 +80,7 @@ import javax.inject.Inject; import java.io.IOException; -import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -343,17 +343,11 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, this.optimizationIntervalMs = optimizationIntervalMs; this.maxNumEpochs = maxNumEpochs; + // send JobMessage to client this.clockManager.addProgressUpdateListener(epochIdx -> { - // encode progress information : epochIdx, maxNumEpochs - final int[] progressInfo = {epochIdx, maxNumEpochs}; - final byte[] encodedProgressInfo = ByteBuffer.allocate(4 * progressInfo.length) - 
.putInt(epochIdx) - .putInt(maxNumEpochs) - .array(); - - // send JobMessage to client - jobMessageObserver.sendMessageToClient(encodedProgressInfo); - LOG.log(Level.INFO, "Epoch event is triggered! epoch index : {0}", epochIdx); + final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs); + final byte[] encodedProgressMessage = progressMessage.getBytes(StandardCharsets.UTF_8); + jobMessageObserver.sendMessageToClient(encodedProgressMessage); }); try { diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index 07cb14863..f559c867c 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -16,6 +16,7 @@ package edu.snu.cay.dolphin.async; import edu.snu.cay.common.dataloader.TextInputFormat; +import edu.snu.cay.dolphin.async.client.ProgressMessageHandler; import edu.snu.cay.dolphin.async.metric.*; import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider; import edu.snu.cay.dolphin.async.dashboard.DashboardLauncher; @@ -208,7 +209,17 @@ public static LauncherStatus launch(final String jobName, final Configuration driverConf = getDriverConfiguration(jobName, basicParameterInjector); final int timeout = basicParameterInjector.getNamedInstance(Timeout.class); - final LauncherStatus status = DolphinDriverLauncher.getLauncher(runTimeConf).run( + // client-side configurations + final Configuration clientConf = ClientConfiguration.CONF + .set(ClientConfiguration.ON_JOB_SUBMITTED, DolphinDriverLauncher.SubmittedJobHandler.class) + .set(ClientConfiguration.ON_JOB_RUNNING, DolphinDriverLauncher.RunningJobHandler.class) + .set(ClientConfiguration.ON_JOB_COMPLETED, DolphinDriverLauncher.CompletedJobHandler.class) + .set(ClientConfiguration.ON_JOB_FAILED, 
DolphinDriverLauncher.FailedJobHandler.class) + .set(ClientConfiguration.ON_RUNTIME_ERROR, DolphinDriverLauncher.RuntimeErrorHandler.class) + .set(ClientConfiguration.ON_JOB_MESSAGE, ProgressMessageHandler.class) + .build(); + + final LauncherStatus status = DolphinDriverLauncher.getLauncher(runTimeConf, clientConf).run( Configurations.merge(basicParameterConf, parameterServerConf, serializedServerConf, serializedWorkerConf, driverConf, customDriverConfiguration, serializedEMClientConf, dashboardConf), From 6251a9f95a30a424868250ce0d98424ea1ed29c9 Mon Sep 17 00:00:00 2001 From: junhoe Date: Tue, 28 Mar 2017 18:38:02 +0900 Subject: [PATCH 05/11] Add DolphinDriverLauncher class --- .../dolphin/async/DolphinDriverLauncher.java | 284 ++++++++++++++++++ .../async/client/ProgressMessageHandler.java | 44 +++ .../dolphin/async/client/package-info.java | 19 ++ 3 files changed, 347 insertions(+) create mode 100644 dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java create mode 100644 dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java create mode 100644 dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java new file mode 100644 index 000000000..b5aecc948 --- /dev/null +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java @@ -0,0 +1,284 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.dolphin.async; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.client.*; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.IntBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A launcher for Dolphin driver. 
+ */ +@Public +@ClientSide +@Unit +public final class DolphinDriverLauncher implements AutoCloseable { + + private static final Logger LOG = Logger.getLogger(DolphinDriverLauncher.class.getName()); + + private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF + .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class) + .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class) + .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class) + .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class) + .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class) + .build(); + + private final REEF reef; + + private LauncherStatus status = LauncherStatus.INIT; + + private String jobId; + private RunningJob theJob; + + @Inject + private DolphinDriverLauncher(final REEF reef) { + this.reef = reef; + } + + /** + * Instantiate a launcher for the given Configuration. + * + * @param runtimeConfiguration the resourcemanager configuration to be used + * @return a DolphinDriverLauncher based on the given resourcemanager configuration + * @throws InjectionException on configuration errors + */ + public static DolphinDriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException { + return Tang.Factory.getTang() + .newInjector(runtimeConfiguration, CLIENT_CONFIG) + .getInstance(DolphinDriverLauncher.class); + } + + /** + * Intantiate a launcher for the given Configuration. 
+ * @param runtimeConfiguration the resourcemanager configuration to be used + * @param clientConfiguration the client configuration to be used + * @return a DolphinDriverLauncher based on the given configuration + * @throws InjectionException on configuration errors + */ + public static DolphinDriverLauncher getLauncher(final Configuration runtimeConfiguration, + final Configuration clientConfiguration) throws InjectionException { + return Tang.Factory.getTang() + .newInjector(runtimeConfiguration, clientConfiguration) + .getInstance(DolphinDriverLauncher.class); + } + + /** + * Kills the running job. + */ + @Override + public void close() { + synchronized (this) { + LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status}); + if (this.status.isRunning()) { + this.status = LauncherStatus.FORCE_CLOSED; + } + if (null != this.theJob) { + this.theJob.close(); + } + this.notify(); + } + LOG.log(Level.FINEST, "Close launcher: shutdown REEF"); + this.reef.close(); + LOG.log(Level.FINEST, "Close launcher: done"); + } + + /** + * Submit REEF job asynchronously and do not wait for its completion. + * + * @param driverConfig configuration of hte driver to submit to the RM. + * @return ID of the new application. + */ + public String submit(final Configuration driverConfig, final long waitTime) { + this.reef.submit(driverConfig); + this.waitForStatus(waitTime, LauncherStatus.SUBMITTED); + return this.jobId; + } + + /** + * Wait for one of the specified statuses of the REEF job. + * This method is called after the job is submitted to the RM via submit(). + * @param waitTime wait time in milliseconds. + * @param statuses array of statuses to wait for. + * @return the state of the job after the wait. + */ + public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... 
statuses) { + + final long endTime = System.currentTimeMillis() + waitTime; + + final HashSet statSet = new HashSet<>(statuses.length * 2); + Collections.addAll(statSet, statuses); + Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED); + + LOG.log(Level.FINEST, "Wait for status: {0}", statSet); + final LauncherStatus finalStatus; + + synchronized (this) { + while (!statSet.contains(this.status)) { + try { + final long delay = endTime - System.currentTimeMillis(); + if (delay <= 0) { + break; + } + LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay); + this.wait(delay); + } catch (final InterruptedException ex) { + LOG.log(Level.FINE, "Interrupted: {0}", ex); + } + } + + finalStatus = this.status; + } + + LOG.log(Level.FINEST, "Final status: {0}", finalStatus); + return finalStatus; + } + + /** + * Run a job with a waiting timeout after which it will be killed, if it did not complete yet. + * + * @param driverConfig the configuration for the driver. See DriverConfiguration for details. + * @param timeOut timeout on the job. + * @return the state of the job after execution. + */ + public LauncherStatus run(final Configuration driverConfig, final long timeOut) { + + final long startTime = System.currentTimeMillis(); + + this.reef.submit(driverConfig); + this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED); + + if (System.currentTimeMillis() - startTime >= timeOut) { + LOG.log(Level.WARNING, "The Job timed out."); + synchronized (this) { + this.status = LauncherStatus.FORCE_CLOSED; + } + } + + this.reef.close(); + return this.getStatus(); + } + + /** + * @return the current status of the job. + */ + public synchronized LauncherStatus getStatus() { + return this.status; + } + + /** Update job status and notify the waiting thread. 
*/ + public synchronized void setStatusAndNotify(final LauncherStatus newStatus) { + LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus}); + this.status = newStatus; + this.notify(); + } + + @Override + public String toString() { + return String.format("DolphinDriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status); + } + + /** + * Job driver notifies us that the job has been submitted to the Resource Manager. + */ + public final class SubmittedJobHandler implements EventHandler { + @Override + public void onNext(final SubmittedJob job) { + LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId()); + jobId = job.getId(); + setStatusAndNotify(LauncherStatus.SUBMITTED); + } + } + + /** + * Job driver notifies us that the job is running. + */ + public final class RunningJobHandler implements EventHandler { + @Override + public void onNext(final RunningJob job) { + LOG.log(Level.INFO, "The Job {0} is running.", job.getId()); + theJob = job; + setStatusAndNotify(LauncherStatus.RUNNING); + } + } + + /** + * Job driver notifies us that the job had failed. + */ + public final class FailedJobHandler implements EventHandler { + @Override + public void onNext(final FailedJob job) { + final Optional ex = job.getReason(); + LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex); + theJob = null; + setStatusAndNotify(LauncherStatus.failed(ex)); + } + } + + /** + * Job driver notifies us that the job had completed successfully. + */ + public final class CompletedJobHandler implements EventHandler { + @Override + public void onNext(final CompletedJob job) { + LOG.log(Level.INFO, "The Job {0} is done.", job.getId()); + theJob = null; + setStatusAndNotify(LauncherStatus.COMPLETED); + } + } + + /** + * Handler an error in the job driver. 
+ */ + public final class RuntimeErrorHandler implements EventHandler { + @Override + public void onNext(final FailedRuntime error) { + LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason()); + theJob = null; + setStatusAndNotify(LauncherStatus.failed(error.getReason())); + } + } + + /** + * Log progress report to client. + */ + final class JobMessageHandler implements EventHandler { + @Override + public void onNext(final JobMessage message) { + final IntBuffer ib = ByteBuffer.wrap(message.get()).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); + final int[] decodedProgressInfo = new int[ib.capacity()]; + ib.get(decodedProgressInfo); + final int epochIdx = decodedProgressInfo[0]; + final int maxNumEpochs = decodedProgressInfo[1]; + LOG.log(Level.INFO, "Progressed epoch count is : [{0} / {1}]", new Object[] {epochIdx, maxNumEpochs}); + } + } +} diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java new file mode 100644 index 000000000..db61f09e7 --- /dev/null +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.cay.dolphin.async.client; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.client.JobMessage; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.nio.charset.StandardCharsets; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Log progress report to client. + */ +@ClientSide +public final class ProgressMessageHandler implements EventHandler { + + private static final Logger LOG = Logger.getLogger(ProgressMessageHandler.class.getName()); + + @Inject + private ProgressMessageHandler() { + } + + @Override + public void onNext(final JobMessage message) { + final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8); + LOG.log(Level.INFO, decodedMessage); + } +} diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java new file mode 100644 index 000000000..72907e300 --- /dev/null +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Classes that are related to dolphin client. 
+ */ +package edu.snu.cay.dolphin.async.client; From 3d8ef07448cbd0d2eed103ca7e04b93852466d57 Mon Sep 17 00:00:00 2001 From: junhoe Date: Tue, 28 Mar 2017 19:41:01 +0900 Subject: [PATCH 06/11] Change redundant class in DolphinDriverLauncher --- .../dolphin/async/DolphinDriverLauncher.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java index b5aecc948..3fa3df893 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java @@ -26,9 +26,6 @@ import org.apache.reef.wake.EventHandler; import javax.inject.Inject; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.IntBuffer; import java.util.Collections; import java.util.HashSet; import java.util.logging.Level; @@ -266,19 +263,4 @@ public void onNext(final FailedRuntime error) { setStatusAndNotify(LauncherStatus.failed(error.getReason())); } } - - /** - * Log progress report to client. 
- */ - final class JobMessageHandler implements EventHandler { - @Override - public void onNext(final JobMessage message) { - final IntBuffer ib = ByteBuffer.wrap(message.get()).order(ByteOrder.BIG_ENDIAN).asIntBuffer(); - final int[] decodedProgressInfo = new int[ib.capacity()]; - ib.get(decodedProgressInfo); - final int epochIdx = decodedProgressInfo[0]; - final int maxNumEpochs = decodedProgressInfo[1]; - LOG.log(Level.INFO, "Progressed epoch count is : [{0} / {1}]", new Object[] {epochIdx, maxNumEpochs}); - } - } } From 72550256b77b53c40937d71abddc5e01e63a41ef Mon Sep 17 00:00:00 2001 From: WooYeon Date: Wed, 29 Mar 2017 14:58:49 +0900 Subject: [PATCH 07/11] in progress --- .../edu/snu/cay/common/DriverLauncher.java | 24 +++++++++---------- .../dolphin/async/AsyncDolphinLauncher.java | 14 +++++------ .../{client => }/ProgressMessageHandler.java | 7 +++--- .../dolphin/async/client/package-info.java | 19 --------------- 4 files changed, 23 insertions(+), 41 deletions(-) rename dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java => common/src/main/java/edu/snu/cay/common/DriverLauncher.java (89%) rename dolphin/async/src/main/java/edu/snu/cay/dolphin/async/{client => }/ProgressMessageHandler.java (87%) delete mode 100644 dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java b/common/src/main/java/edu/snu/cay/common/DriverLauncher.java similarity index 89% rename from dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java rename to common/src/main/java/edu/snu/cay/common/DriverLauncher.java index 3fa3df893..a75a08d6b 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/DolphinDriverLauncher.java +++ b/common/src/main/java/edu/snu/cay/common/DriverLauncher.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.cay.dolphin.async; +package edu.snu.cay.common; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; @@ -37,9 +37,9 @@ @Public @ClientSide @Unit -public final class DolphinDriverLauncher implements AutoCloseable { +public final class DriverLauncher implements AutoCloseable { - private static final Logger LOG = Logger.getLogger(DolphinDriverLauncher.class.getName()); + private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName()); private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class) @@ -57,7 +57,7 @@ public final class DolphinDriverLauncher implements AutoCloseable { private RunningJob theJob; @Inject - private DolphinDriverLauncher(final REEF reef) { + private DriverLauncher(final REEF reef) { this.reef = reef; } @@ -65,27 +65,27 @@ private DolphinDriverLauncher(final REEF reef) { * Instantiate a launcher for the given Configuration. * * @param runtimeConfiguration the resourcemanager configuration to be used - * @return a DolphinDriverLauncher based on the given resourcemanager configuration + * @return a DriverLauncher based on the given resourcemanager configuration * @throws InjectionException on configuration errors */ - public static DolphinDriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException { + public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException { return Tang.Factory.getTang() .newInjector(runtimeConfiguration, CLIENT_CONFIG) - .getInstance(DolphinDriverLauncher.class); + .getInstance(DriverLauncher.class); } /** * Intantiate a launcher for the given Configuration. 
* @param runtimeConfiguration the resourcemanager configuration to be used * @param clientConfiguration the client configuration to be used - * @return a DolphinDriverLauncher based on the given configuration + * @return a DriverLauncher based on the given configuration * @throws InjectionException on configuration errors */ - public static DolphinDriverLauncher getLauncher(final Configuration runtimeConfiguration, - final Configuration clientConfiguration) throws InjectionException { + public static DriverLauncher getLauncher(final Configuration runtimeConfiguration, + final Configuration clientConfiguration) throws InjectionException { return Tang.Factory.getTang() .newInjector(runtimeConfiguration, clientConfiguration) - .getInstance(DolphinDriverLauncher.class); + .getInstance(DriverLauncher.class); } /** @@ -200,7 +200,7 @@ public synchronized void setStatusAndNotify(final LauncherStatus newStatus) { @Override public String toString() { - return String.format("DolphinDriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status); + return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status); } /** diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index f559c867c..75d3eaa84 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -15,8 +15,8 @@ */ package edu.snu.cay.dolphin.async; +import edu.snu.cay.common.DriverLauncher; import edu.snu.cay.common.dataloader.TextInputFormat; -import edu.snu.cay.dolphin.async.client.ProgressMessageHandler; import edu.snu.cay.dolphin.async.metric.*; import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider; import edu.snu.cay.dolphin.async.dashboard.DashboardLauncher; @@ -211,15 +211,15 @@ public static LauncherStatus launch(final String 
jobName, // client-side configurations final Configuration clientConf = ClientConfiguration.CONF - .set(ClientConfiguration.ON_JOB_SUBMITTED, DolphinDriverLauncher.SubmittedJobHandler.class) - .set(ClientConfiguration.ON_JOB_RUNNING, DolphinDriverLauncher.RunningJobHandler.class) - .set(ClientConfiguration.ON_JOB_COMPLETED, DolphinDriverLauncher.CompletedJobHandler.class) - .set(ClientConfiguration.ON_JOB_FAILED, DolphinDriverLauncher.FailedJobHandler.class) - .set(ClientConfiguration.ON_RUNTIME_ERROR, DolphinDriverLauncher.RuntimeErrorHandler.class) + .set(ClientConfiguration.ON_JOB_SUBMITTED, DriverLauncher.SubmittedJobHandler.class) + .set(ClientConfiguration.ON_JOB_RUNNING, DriverLauncher.RunningJobHandler.class) + .set(ClientConfiguration.ON_JOB_COMPLETED, DriverLauncher.CompletedJobHandler.class) + .set(ClientConfiguration.ON_JOB_FAILED, DriverLauncher.FailedJobHandler.class) + .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverLauncher.RuntimeErrorHandler.class) .set(ClientConfiguration.ON_JOB_MESSAGE, ProgressMessageHandler.class) .build(); - final LauncherStatus status = DolphinDriverLauncher.getLauncher(runTimeConf, clientConf).run( + final LauncherStatus status = DriverLauncher.getLauncher(runTimeConf, clientConf).run( Configurations.merge(basicParameterConf, parameterServerConf, serializedServerConf, serializedWorkerConf, driverConf, customDriverConfiguration, serializedEMClientConf, dashboardConf), diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java similarity index 87% rename from dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java rename to dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java index db61f09e7..bc85c73c8 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/ProgressMessageHandler.java +++ 
b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.cay.dolphin.async.client; +package edu.snu.cay.dolphin.async; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.client.JobMessage; @@ -25,7 +25,8 @@ import java.util.logging.Logger; /** - * Log progress report to client. + * Handler of {@link JobMessage} from driver. + * It logs progress report to console. */ @ClientSide public final class ProgressMessageHandler implements EventHandler { @@ -39,6 +40,6 @@ private ProgressMessageHandler() { @Override public void onNext(final JobMessage message) { final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8); - LOG.log(Level.INFO, decodedMessage); + LOG.log(Level.INFO, "Job message: {0}", decodedMessage); } } diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java deleted file mode 100644 index 72907e300..000000000 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/client/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (C) 2017 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Classes that are related to dolphin client. 
- */ -package edu.snu.cay.dolphin.async.client; From 150a8562bd7f633105fc142b6e8ab151e72bff88 Mon Sep 17 00:00:00 2001 From: junhoe Date: Wed, 29 Mar 2017 15:35:18 +0900 Subject: [PATCH 08/11] set StringJobMessageHandler in DriverLauncher --- .../edu/snu/cay/common/DriverLauncher.java | 29 ++++++------ .../java/edu/snu/cay/common/package-info.java | 19 ++++++++ .../dolphin/async/AsyncDolphinLauncher.java | 12 +---- .../dolphin/async/ProgressMessageHandler.java | 45 ------------------- 4 files changed, 35 insertions(+), 70 deletions(-) create mode 100644 common/src/main/java/edu/snu/cay/common/package-info.java delete mode 100644 dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java diff --git a/common/src/main/java/edu/snu/cay/common/DriverLauncher.java b/common/src/main/java/edu/snu/cay/common/DriverLauncher.java index a75a08d6b..e2d8b1a4d 100644 --- a/common/src/main/java/edu/snu/cay/common/DriverLauncher.java +++ b/common/src/main/java/edu/snu/cay/common/DriverLauncher.java @@ -26,6 +26,7 @@ import org.apache.reef.wake.EventHandler; import javax.inject.Inject; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashSet; import java.util.logging.Level; @@ -47,6 +48,7 @@ public final class DriverLauncher implements AutoCloseable { .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class) .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class) .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class) + .set(ClientConfiguration.ON_JOB_MESSAGE, StringJobMessageHandler.class) .build(); private final REEF reef; @@ -74,20 +76,6 @@ public static DriverLauncher getLauncher(final Configuration runtimeConfiguratio .getInstance(DriverLauncher.class); } - /** - * Intantiate a launcher for the given Configuration. 
- * @param runtimeConfiguration the resourcemanager configuration to be used - * @param clientConfiguration the client configuration to be used - * @return a DriverLauncher based on the given configuration - * @throws InjectionException on configuration errors - */ - public static DriverLauncher getLauncher(final Configuration runtimeConfiguration, - final Configuration clientConfiguration) throws InjectionException { - return Tang.Factory.getTang() - .newInjector(runtimeConfiguration, clientConfiguration) - .getInstance(DriverLauncher.class); - } - /** * Kills the running job. */ @@ -263,4 +251,17 @@ public void onNext(final FailedRuntime error) { setStatusAndNotify(LauncherStatus.failed(error.getReason())); } } + + /** + * Handler of {@link JobMessage} from driver. + * It logs progress report to console. + */ + public final class StringJobMessageHandler implements EventHandler { + + @Override + public void onNext(final JobMessage message) { + final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8); + LOG.log(Level.INFO, "Job message: {0}", decodedMessage); + } + } } diff --git a/common/src/main/java/edu/snu/cay/common/package-info.java b/common/src/main/java/edu/snu/cay/common/package-info.java new file mode 100644 index 000000000..d37dbfe72 --- /dev/null +++ b/common/src/main/java/edu/snu/cay/common/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Common classes. + */ +package edu.snu.cay.common; diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index 75d3eaa84..75de97414 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -209,17 +209,7 @@ public static LauncherStatus launch(final String jobName, final Configuration driverConf = getDriverConfiguration(jobName, basicParameterInjector); final int timeout = basicParameterInjector.getNamedInstance(Timeout.class); - // client-side configurations - final Configuration clientConf = ClientConfiguration.CONF - .set(ClientConfiguration.ON_JOB_SUBMITTED, DriverLauncher.SubmittedJobHandler.class) - .set(ClientConfiguration.ON_JOB_RUNNING, DriverLauncher.RunningJobHandler.class) - .set(ClientConfiguration.ON_JOB_COMPLETED, DriverLauncher.CompletedJobHandler.class) - .set(ClientConfiguration.ON_JOB_FAILED, DriverLauncher.FailedJobHandler.class) - .set(ClientConfiguration.ON_RUNTIME_ERROR, DriverLauncher.RuntimeErrorHandler.class) - .set(ClientConfiguration.ON_JOB_MESSAGE, ProgressMessageHandler.class) - .build(); - - final LauncherStatus status = DriverLauncher.getLauncher(runTimeConf, clientConf).run( + final LauncherStatus status = DriverLauncher.getLauncher(runTimeConf).run( Configurations.merge(basicParameterConf, parameterServerConf, serializedServerConf, serializedWorkerConf, driverConf, customDriverConfiguration, serializedEMClientConf, dashboardConf), diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java deleted file mode 100644 index bc85c73c8..000000000 --- 
a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/ProgressMessageHandler.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (C) 2017 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.cay.dolphin.async; - -import org.apache.reef.annotations.audience.ClientSide; -import org.apache.reef.client.JobMessage; -import org.apache.reef.wake.EventHandler; - -import javax.inject.Inject; -import java.nio.charset.StandardCharsets; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Handler of {@link JobMessage} from driver. - * It logs progress report to console. 
- */ -@ClientSide -public final class ProgressMessageHandler implements EventHandler { - - private static final Logger LOG = Logger.getLogger(ProgressMessageHandler.class.getName()); - - @Inject - private ProgressMessageHandler() { - } - - @Override - public void onNext(final JobMessage message) { - final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8); - LOG.log(Level.INFO, "Job message: {0}", decodedMessage); - } -} From e3359f7cebdea64d91ed8fca7747fb749c39cfa2 Mon Sep 17 00:00:00 2001 From: junhoe Date: Mon, 3 Apr 2017 17:41:54 +0900 Subject: [PATCH 09/11] formats, name of listener in ClockManager modified --- .../cay/common/{ => client}/DriverLauncher.java | 15 +++++++++------ .../snu/cay/common/{ => client}/package-info.java | 4 ++-- .../snu/cay/dolphin/async/AsyncDolphinDriver.java | 2 +- .../cay/dolphin/async/AsyncDolphinLauncher.java | 2 +- .../cay/services/ps/driver/impl/ClockManager.java | 10 +++++----- 5 files changed, 18 insertions(+), 15 deletions(-) rename common/src/main/java/edu/snu/cay/common/{ => client}/DriverLauncher.java (96%) rename common/src/main/java/edu/snu/cay/common/{ => client}/package-info.java (91%) diff --git a/common/src/main/java/edu/snu/cay/common/DriverLauncher.java b/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java similarity index 96% rename from common/src/main/java/edu/snu/cay/common/DriverLauncher.java rename to common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java index e2d8b1a4d..0db9fdd6c 100644 --- a/common/src/main/java/edu/snu/cay/common/DriverLauncher.java +++ b/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.cay.common; +package edu.snu.cay.common.client; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Public; @@ -33,7 +33,8 @@ import java.util.logging.Logger; /** - * A launcher for Dolphin driver. + * Improve version of DriverLauncher in REEF. + * It has JobMessageHandler which handles string messages from Driver. */ @Public @ClientSide @@ -179,7 +180,9 @@ public synchronized LauncherStatus getStatus() { return this.status; } - /** Update job status and notify the waiting thread. */ + /** + * Update job status and notify the waiting thread. + */ public synchronized void setStatusAndNotify(final LauncherStatus newStatus) { LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus}); this.status = newStatus; @@ -253,9 +256,9 @@ public void onNext(final FailedRuntime error) { } /** - * Handler of {@link JobMessage} from driver. - * It logs progress report to console. - */ + * Handler of {@link JobMessage} from driver. + * It logs progress report to console. + */ public final class StringJobMessageHandler implements EventHandler { @Override diff --git a/common/src/main/java/edu/snu/cay/common/package-info.java b/common/src/main/java/edu/snu/cay/common/client/package-info.java similarity index 91% rename from common/src/main/java/edu/snu/cay/common/package-info.java rename to common/src/main/java/edu/snu/cay/common/client/package-info.java index d37dbfe72..a7ef2c7b2 100644 --- a/common/src/main/java/edu/snu/cay/common/package-info.java +++ b/common/src/main/java/edu/snu/cay/common/client/package-info.java @@ -14,6 +14,6 @@ * limitations under the License. */ /** - * Common classes. + * Common client classes. 
*/ -package edu.snu.cay.common; +package edu.snu.cay.common.client; diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index a8409e6d4..1ee482eaa 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -344,7 +344,7 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, this.maxNumEpochs = maxNumEpochs; // send JobMessage to client - this.clockManager.addProgressUpdateListener(epochIdx -> { + this.clockManager.addClockUpdateListener(epochIdx -> { final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs); final byte[] encodedProgressMessage = progressMessage.getBytes(StandardCharsets.UTF_8); jobMessageObserver.sendMessageToClient(encodedProgressMessage); diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index 75de97414..629eedf6d 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -15,7 +15,7 @@ */ package edu.snu.cay.dolphin.async; -import edu.snu.cay.common.DriverLauncher; +import edu.snu.cay.common.client.DriverLauncher; import edu.snu.cay.common.dataloader.TextInputFormat; import edu.snu.cay.dolphin.async.metric.*; import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider; diff --git a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java index f586b7caf..a6bb1a6f6 100644 --- a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java +++ 
b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java @@ -73,9 +73,9 @@ public final class ClockManager { private int globalMinimumClock; /** - * The list of EventHandler about progress update. + * The list of listeners for the update of {@link #globalMinimumClock}. */ - private final List> progressUpdateCallbacks = new ArrayList<>(); + private final List> clockUpdateListeners = new ArrayList<>(); @Inject private ClockManager(final AggregationMaster aggregationMaster, @@ -93,8 +93,8 @@ private ClockManager(final AggregationMaster aggregationMaster, * Add listener to progress update list. * @param callback when #globalMinimumClock increases, callback functions in list are called. */ - public void addProgressUpdateListener(final EventHandler callback) { - progressUpdateCallbacks.add(callback); + public void addClockUpdateListener(final EventHandler callback) { + clockUpdateListeners.add(callback); } /** @@ -213,7 +213,7 @@ private synchronized void tickClock(final String workerId) { if (minimumClockWorkers.size() == 0) { globalMinimumClock++; broadcastGlobalMinimumClock(); - progressUpdateCallbacks.forEach(callback -> callback.onNext(globalMinimumClock)); + clockUpdateListeners.forEach(callback -> callback.onNext(globalMinimumClock)); } } } From d4d4bf3dfc7f78df56a1ac3ad06cc22936a6496b Mon Sep 17 00:00:00 2001 From: junhoe Date: Tue, 4 Apr 2017 11:18:29 +0900 Subject: [PATCH 10/11] undo delete blank line --- .../java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index 629eedf6d..4fae47fec 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -142,6 +142,7 @@ public static LauncherStatus launch(final 
String jobName, basicParameterInjector.getNamedInstance(LocalRuntimeMaxNumEvaluators.class), basicParameterInjector.getNamedInstance(JVMHeapSlack.class)) : getYarnRuntimeConfiguration(basicParameterInjector.getNamedInstance(JVMHeapSlack.class)); + // configuration for the parameter server final boolean dynamic = basicParameterInjector.getNamedInstance(Dynamic.class); final Class managerClass = dynamic ? From 6e0e9aafd82219f4976d3c22a9c02d4b6595a41c Mon Sep 17 00:00:00 2001 From: WooYeon Date: Tue, 4 Apr 2017 18:13:18 +0900 Subject: [PATCH 11/11] Minor update --- .../java/edu/snu/cay/common/client/DriverLauncher.java | 7 ++++--- .../java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java | 3 ++- .../edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java | 2 +- .../edu/snu/cay/services/ps/driver/impl/ClockManager.java | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java b/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java index 0db9fdd6c..c06bcd533 100644 --- a/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java +++ b/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java @@ -33,8 +33,9 @@ import java.util.logging.Logger; /** - * Improve version of DriverLauncher in REEF. - * It has JobMessageHandler which handles string messages from Driver. + * Improved version of {@link org.apache.reef.client.DriverLauncher}. + * It supports {@link StringJobMessageHandler}, which logs {@link JobMessage} + * with the assumption that the message type is String. */ @Public @ClientSide @@ -257,7 +258,7 @@ public void onNext(final FailedRuntime error) { /** * Handler of {@link JobMessage} from driver. - * It logs progress report to console. + * It logs job message to console. 
*/ public final class StringJobMessageHandler implements EventHandler { diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index 1ee482eaa..52d79fcfa 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -343,7 +343,8 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, this.optimizationIntervalMs = optimizationIntervalMs; this.maxNumEpochs = maxNumEpochs; - // send JobMessage to client + // send JobMessage about epoch progress to client + // it utilizes the minimum clock maintained by ClockManager this.clockManager.addClockUpdateListener(epochIdx -> { final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs); final byte[] encodedProgressMessage = progressMessage.getBytes(StandardCharsets.UTF_8); diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index 4fae47fec..9d6d69481 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -142,7 +142,7 @@ public static LauncherStatus launch(final String jobName, basicParameterInjector.getNamedInstance(LocalRuntimeMaxNumEvaluators.class), basicParameterInjector.getNamedInstance(JVMHeapSlack.class)) : getYarnRuntimeConfiguration(basicParameterInjector.getNamedInstance(JVMHeapSlack.class)); - + // configuration for the parameter server final boolean dynamic = basicParameterInjector.getNamedInstance(Dynamic.class); final Class managerClass = dynamic ? 
diff --git a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java index a6bb1a6f6..c0f874530 100644 --- a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java +++ b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java @@ -90,8 +90,8 @@ private ClockManager(final AggregationMaster aggregationMaster, } /** - * Add listener to progress update list. - * @param callback when #globalMinimumClock increases, callback functions in list are called. + * Add a listener for the update of minimum clock. + * @param callback a callback to be invoked when {@link #globalMinimumClock} increases */ public void addClockUpdateListener(final EventHandler callback) { clockUpdateListeners.add(callback);