diff --git a/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java b/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java new file mode 100644 index 000000000..c06bcd533 --- /dev/null +++ b/common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java @@ -0,0 +1,271 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.common.client; + +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.client.*; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Unit; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Improved version of {@link org.apache.reef.client.DriverLauncher}. + * It supports {@link StringJobMessageHandler}, which logs {@link JobMessage} + * with the assumption that the message payload is a UTF-8 encoded String. 
+ */ +@Public +@ClientSide +@Unit +public final class DriverLauncher implements AutoCloseable { + + private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName()); + + private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF + .set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class) + .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class) + .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class) + .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class) + .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class) + .set(ClientConfiguration.ON_JOB_MESSAGE, StringJobMessageHandler.class) + .build(); + + private final REEF reef; + + private LauncherStatus status = LauncherStatus.INIT; + + private String jobId; + private RunningJob theJob; + + @Inject + private DriverLauncher(final REEF reef) { + this.reef = reef; + } + + /** + * Instantiate a launcher for the given Configuration. + * + * @param runtimeConfiguration the resource manager configuration to be used + * @return a DriverLauncher based on the given resource manager configuration + * @throws InjectionException on configuration errors + */ + public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException { + return Tang.Factory.getTang() + .newInjector(runtimeConfiguration, CLIENT_CONFIG) + .getInstance(DriverLauncher.class); + } + + /** + * Kills the running job, if any, and shuts down REEF. 
+ */ + @Override + public void close() { + synchronized (this) { + LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status}); + if (this.status.isRunning()) { + this.status = LauncherStatus.FORCE_CLOSED; + } + if (null != this.theJob) { + this.theJob.close(); + } + this.notify(); + } + LOG.log(Level.FINEST, "Close launcher: shutdown REEF"); + this.reef.close(); + LOG.log(Level.FINEST, "Close launcher: done"); + } + + /** + * Submit REEF job and wait up to {@code waitTime} milliseconds for it to be submitted, without waiting for its completion. + * + * @param driverConfig configuration of the driver to submit to the RM. + * @return ID of the new application; {@code null} if no job submission has been reported to this launcher by the time the wait ends. + */ + public String submit(final Configuration driverConfig, final long waitTime) { + this.reef.submit(driverConfig); + this.waitForStatus(waitTime, LauncherStatus.SUBMITTED); + return this.jobId; + } + + /** + * Wait for one of the specified statuses of the REEF job; FAILED and FORCE_CLOSED are always added to the awaited set. + * This method is called after the job is submitted to the RM via submit(). + * @param waitTime wait time in milliseconds. + * @param statuses array of statuses to wait for. + * @return the state of the job after the wait, which may be none of the awaited statuses if the wait timed out. + */ + public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... 
statuses) { + + final long endTime = System.currentTimeMillis() + waitTime; + + final HashSet statSet = new HashSet<>(statuses.length * 2); + Collections.addAll(statSet, statuses); + Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED); + + LOG.log(Level.FINEST, "Wait for status: {0}", statSet); + final LauncherStatus finalStatus; + + synchronized (this) { + while (!statSet.contains(this.status)) { + try { + final long delay = endTime - System.currentTimeMillis(); + if (delay <= 0) { + break; + } + LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay); + this.wait(delay); + } catch (final InterruptedException ex) { + LOG.log(Level.FINE, "Interrupted: {0}", ex); + } + } + + finalStatus = this.status; + } + + LOG.log(Level.FINEST, "Final status: {0}", finalStatus); + return finalStatus; + } + + /** + * Run a job with a waiting timeout after which it will be killed, if it did not complete yet. + * + * @param driverConfig the configuration for the driver. See DriverConfiguration for details. + * @param timeOut timeout on the job, in milliseconds. + * @return the state of the job after execution. + */ + public LauncherStatus run(final Configuration driverConfig, final long timeOut) { + + final long startTime = System.currentTimeMillis(); + + this.reef.submit(driverConfig); + this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED); + + if (System.currentTimeMillis() - startTime >= timeOut) { + LOG.log(Level.WARNING, "The Job timed out."); + synchronized (this) { + this.status = LauncherStatus.FORCE_CLOSED; + } + } + + this.reef.close(); + return this.getStatus(); + } + + /** + * @return the current status of the job. + */ + public synchronized LauncherStatus getStatus() { + return this.status; + } + + /** + * Update job status and wake up one thread waiting on this launcher. 
+ */ + public synchronized void setStatusAndNotify(final LauncherStatus newStatus) { + LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus}); + this.status = newStatus; + this.notify(); + } + + @Override + public String toString() { + return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status); + } + + /** + * Job driver notifies us that the job has been submitted to the Resource Manager. + */ + public final class SubmittedJobHandler implements EventHandler { + @Override + public void onNext(final SubmittedJob job) { + LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId()); + jobId = job.getId(); + setStatusAndNotify(LauncherStatus.SUBMITTED); + } + } + + /** + * Job driver notifies us that the job is running. + */ + public final class RunningJobHandler implements EventHandler { + @Override + public void onNext(final RunningJob job) { + LOG.log(Level.INFO, "The Job {0} is running.", job.getId()); + theJob = job; + setStatusAndNotify(LauncherStatus.RUNNING); + } + } + + /** + * Job driver notifies us that the job has failed. NOTE(review): the reason is handed to the logger as an Optional parameter and the message has no {0} placeholder, so it may not show up in the log output - confirm. + */ + public final class FailedJobHandler implements EventHandler { + @Override + public void onNext(final FailedJob job) { + final Optional ex = job.getReason(); + LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex); + theJob = null; + setStatusAndNotify(LauncherStatus.failed(ex)); + } + } + + /** + * Job driver notifies us that the job has completed successfully. + */ + public final class CompletedJobHandler implements EventHandler { + @Override + public void onNext(final CompletedJob job) { + LOG.log(Level.INFO, "The Job {0} is done.", job.getId()); + theJob = null; + setStatusAndNotify(LauncherStatus.COMPLETED); + } + } + + /** + * Handles a {@link FailedRuntime} error by logging its reason and setting the launcher status to failed. 
+ */ + public final class RuntimeErrorHandler implements EventHandler { + @Override + public void onNext(final FailedRuntime error) { + LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason()); + theJob = null; + setStatusAndNotify(LauncherStatus.failed(error.getReason())); + } + } + + /** + * Handler of {@link JobMessage} from driver. + * It decodes the message as a UTF-8 String and logs it at INFO level. + */ + public final class StringJobMessageHandler implements EventHandler { + + @Override + public void onNext(final JobMessage message) { + final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8); + LOG.log(Level.INFO, "Job message: {0}", decodedMessage); + } + } +} diff --git a/common/src/main/java/edu/snu/cay/common/client/package-info.java b/common/src/main/java/edu/snu/cay/common/client/package-info.java new file mode 100644 index 000000000..a7ef2c7b2 --- /dev/null +++ b/common/src/main/java/edu/snu/cay/common/client/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Common client classes. 
+ */ +package edu.snu.cay.common.client; diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java index 1c55ce445..52d79fcfa 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinDriver.java @@ -49,6 +49,7 @@ import edu.snu.cay.utils.trace.HTraceParameters; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.ProgressProvider; +import org.apache.reef.driver.client.JobMessageObserver; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.context.ClosedContext; import org.apache.reef.driver.context.ContextConfiguration; @@ -79,6 +80,7 @@ import javax.inject.Inject; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -293,6 +295,7 @@ public final class AsyncDolphinDriver { */ @Inject private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, + final JobMessageObserver jobMessageObserver, final DataLoadingService dataLoadingService, final SynchronizationManager synchronizationManager, final ClockManager clockManager, @@ -340,6 +343,14 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager, this.optimizationIntervalMs = optimizationIntervalMs; this.maxNumEpochs = maxNumEpochs; + // send JobMessage about epoch progress to client + // it utilizes the minimum clock maintained by ClockManager + this.clockManager.addClockUpdateListener(epochIdx -> { + final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs); + final byte[] encodedProgressMessage = progressMessage.getBytes(StandardCharsets.UTF_8); + jobMessageObserver.sendMessageToClient(encodedProgressMessage); + }); + try { final Injector 
workerInjector = injector.forkInjector(); workerInjector.bindVolatileInstance(EMDeleteExecutor.class, new WorkerRemover()); diff --git a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java index d9325a2d7..9d6d69481 100644 --- a/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java +++ b/dolphin/async/src/main/java/edu/snu/cay/dolphin/async/AsyncDolphinLauncher.java @@ -15,6 +15,7 @@ */ package edu.snu.cay.dolphin.async; +import edu.snu.cay.common.client.DriverLauncher; import edu.snu.cay.common.dataloader.TextInputFormat; import edu.snu.cay.dolphin.async.metric.*; import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider; @@ -56,9 +57,7 @@ import edu.snu.cay.utils.trace.parameters.ReceiverPort; import edu.snu.cay.utils.trace.parameters.ReceiverType; import org.apache.reef.annotations.audience.ClientSide; -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.client.DriverLauncher; -import org.apache.reef.client.LauncherStatus; +import org.apache.reef.client.*; import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; import org.apache.reef.io.network.naming.NameServerConfiguration; import org.apache.reef.io.network.util.StringIdentifierFactory; diff --git a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java index 9081aa4c2..c0f874530 100644 --- a/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java +++ b/services/ps/src/main/java/edu/snu/cay/services/ps/driver/impl/ClockManager.java @@ -72,6 +72,11 @@ public final class ClockManager { */ private int globalMinimumClock; + /** + * The list of listeners for the update of {@link #globalMinimumClock}. 
+ */ + private final List> clockUpdateListeners = new ArrayList<>(); + @Inject private ClockManager(final AggregationMaster aggregationMaster, final ClockMsgCodec codec, @@ -84,6 +89,14 @@ private ClockManager(final AggregationMaster aggregationMaster, minimumClockWorkers = new ArrayList<>(); } + /** + * Add a listener for the update of minimum clock. NOTE(review): this method is not synchronized although the listeners appear to be invoked from the synchronized tickClock - presumably listeners are only added during setup; confirm. + * @param callback a callback to be invoked when {@link #globalMinimumClock} increases + */ + public void addClockUpdateListener(final EventHandler callback) { + clockUpdateListeners.add(callback); + } + /** * Helper function to create broadcast global minimum clock message. */ @@ -200,6 +213,7 @@ private synchronized void tickClock(final String workerId) { if (minimumClockWorkers.size() == 0) { globalMinimumClock++; broadcastGlobalMinimumClock(); + clockUpdateListeners.forEach(callback -> callback.onNext(globalMinimumClock)); } } }