Skip to content

Commit

Permalink
[CAY-909] Notify client job progress (#1081)
Browse files Browse the repository at this point in the history
* DolphinDriverLauncher in AsyncDolphinDriver and AsyncDolphinLauncher
* divide ProgressMessageHandler from Dolphin specific driver launcher
* set StringJobMessageHandler in DriverLauncher
* formats, name of listener in ClockManager modified
  • Loading branch information
JunhoeKim authored and wynot12 committed Apr 5, 2017
1 parent 5c82692 commit 89eb3c2
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 3 deletions.
271 changes: 271 additions & 0 deletions common/src/main/java/edu/snu/cay/common/client/DriverLauncher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Copyright (C) 2017 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.cay.common.client;

import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.client.*;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Improved version of {@link org.apache.reef.client.DriverLauncher}.
* It supports {@link StringJobMessageHandler}, which logs {@link JobMessage}
* with the assumption that the message type is String.
*/
@Public
@ClientSide
@Unit
public final class DriverLauncher implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());

private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF
.set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class)
.set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
.set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
.set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
.set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
.set(ClientConfiguration.ON_JOB_MESSAGE, StringJobMessageHandler.class)
.build();

private final REEF reef;

private LauncherStatus status = LauncherStatus.INIT;

private String jobId;
private RunningJob theJob;

@Inject
private DriverLauncher(final REEF reef) {
this.reef = reef;
}

/**
* Instantiate a launcher for the given Configuration.
*
* @param runtimeConfiguration the resourcemanager configuration to be used
* @return a DriverLauncher based on the given resourcemanager configuration
* @throws InjectionException on configuration errors
*/
public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException {
return Tang.Factory.getTang()
.newInjector(runtimeConfiguration, CLIENT_CONFIG)
.getInstance(DriverLauncher.class);
}

/**
* Kills the running job.
*/
@Override
public void close() {
synchronized (this) {
LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status});
if (this.status.isRunning()) {
this.status = LauncherStatus.FORCE_CLOSED;
}
if (null != this.theJob) {
this.theJob.close();
}
this.notify();
}
LOG.log(Level.FINEST, "Close launcher: shutdown REEF");
this.reef.close();
LOG.log(Level.FINEST, "Close launcher: done");
}

/**
* Submit REEF job asynchronously and do not wait for its completion.
*
* @param driverConfig configuration of hte driver to submit to the RM.
* @return ID of the new application.
*/
public String submit(final Configuration driverConfig, final long waitTime) {
this.reef.submit(driverConfig);
this.waitForStatus(waitTime, LauncherStatus.SUBMITTED);
return this.jobId;
}

/**
* Wait for one of the specified statuses of the REEF job.
* This method is called after the job is submitted to the RM via submit().
* @param waitTime wait time in milliseconds.
* @param statuses array of statuses to wait for.
* @return the state of the job after the wait.
*/
public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) {

final long endTime = System.currentTimeMillis() + waitTime;

final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2);
Collections.addAll(statSet, statuses);
Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED);

LOG.log(Level.FINEST, "Wait for status: {0}", statSet);
final LauncherStatus finalStatus;

synchronized (this) {
while (!statSet.contains(this.status)) {
try {
final long delay = endTime - System.currentTimeMillis();
if (delay <= 0) {
break;
}
LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay);
this.wait(delay);
} catch (final InterruptedException ex) {
LOG.log(Level.FINE, "Interrupted: {0}", ex);
}
}

finalStatus = this.status;
}

LOG.log(Level.FINEST, "Final status: {0}", finalStatus);
return finalStatus;
}

/**
* Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
*
* @param driverConfig the configuration for the driver. See DriverConfiguration for details.
* @param timeOut timeout on the job.
* @return the state of the job after execution.
*/
public LauncherStatus run(final Configuration driverConfig, final long timeOut) {

final long startTime = System.currentTimeMillis();

this.reef.submit(driverConfig);
this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED);

if (System.currentTimeMillis() - startTime >= timeOut) {
LOG.log(Level.WARNING, "The Job timed out.");
synchronized (this) {
this.status = LauncherStatus.FORCE_CLOSED;
}
}

this.reef.close();
return this.getStatus();
}

/**
* @return the current status of the job.
*/
public synchronized LauncherStatus getStatus() {
return this.status;
}

/**
* Update job status and notify the waiting thread.
*/
public synchronized void setStatusAndNotify(final LauncherStatus newStatus) {
LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus});
this.status = newStatus;
this.notify();
}

@Override
public String toString() {
return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status);
}

/**
* Job driver notifies us that the job has been submitted to the Resource Manager.
*/
public final class SubmittedJobHandler implements EventHandler<SubmittedJob> {
@Override
public void onNext(final SubmittedJob job) {
LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId());
jobId = job.getId();
setStatusAndNotify(LauncherStatus.SUBMITTED);
}
}

/**
* Job driver notifies us that the job is running.
*/
public final class RunningJobHandler implements EventHandler<RunningJob> {
@Override
public void onNext(final RunningJob job) {
LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
theJob = job;
setStatusAndNotify(LauncherStatus.RUNNING);
}
}

/**
* Job driver notifies us that the job had failed.
*/
public final class FailedJobHandler implements EventHandler<FailedJob> {
@Override
public void onNext(final FailedJob job) {
final Optional<Throwable> ex = job.getReason();
LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex);
theJob = null;
setStatusAndNotify(LauncherStatus.failed(ex));
}
}

/**
* Job driver notifies us that the job had completed successfully.
*/
public final class CompletedJobHandler implements EventHandler<CompletedJob> {
@Override
public void onNext(final CompletedJob job) {
LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
theJob = null;
setStatusAndNotify(LauncherStatus.COMPLETED);
}
}

/**
* Handler an error in the job driver.
*/
public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
@Override
public void onNext(final FailedRuntime error) {
LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason());
theJob = null;
setStatusAndNotify(LauncherStatus.failed(error.getReason()));
}
}

/**
* Handler of {@link JobMessage} from driver.
* It logs job message to console.
*/
public final class StringJobMessageHandler implements EventHandler<JobMessage> {

@Override
public void onNext(final JobMessage message) {
final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8);
LOG.log(Level.INFO, "Job message: {0}", decodedMessage);
}
}
}
19 changes: 19 additions & 0 deletions common/src/main/java/edu/snu/cay/common/client/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (C) 2017 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Common client classes.
*/
package edu.snu.cay.common.client;
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import edu.snu.cay.utils.trace.HTraceParameters;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.ProgressProvider;
import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
Expand Down Expand Up @@ -79,6 +80,7 @@

import javax.inject.Inject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -293,6 +295,7 @@ public final class AsyncDolphinDriver {
*/
@Inject
private AsyncDolphinDriver(final EvaluatorManager evaluatorManager,
final JobMessageObserver jobMessageObserver,
final DataLoadingService dataLoadingService,
final SynchronizationManager synchronizationManager,
final ClockManager clockManager,
Expand Down Expand Up @@ -340,6 +343,14 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager,
this.optimizationIntervalMs = optimizationIntervalMs;
this.maxNumEpochs = maxNumEpochs;

// send JobMessage about epoch progress to client
// it utilizes the minimum clock maintained by ClockManager
this.clockManager.addClockUpdateListener(epochIdx -> {
final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs);
final byte[] encodedProgressMessage = progressMessage.getBytes(StandardCharsets.UTF_8);
jobMessageObserver.sendMessageToClient(encodedProgressMessage);
});

try {
final Injector workerInjector = injector.forkInjector();
workerInjector.bindVolatileInstance(EMDeleteExecutor.class, new WorkerRemover());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package edu.snu.cay.dolphin.async;

import edu.snu.cay.common.client.DriverLauncher;
import edu.snu.cay.common.dataloader.TextInputFormat;
import edu.snu.cay.dolphin.async.metric.*;
import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider;
Expand Down Expand Up @@ -56,9 +57,7 @@
import edu.snu.cay.utils.trace.parameters.ReceiverPort;
import edu.snu.cay.utils.trace.parameters.ReceiverType;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.client.*;
import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServerConfiguration;
import org.apache.reef.io.network.util.StringIdentifierFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public final class ClockManager {
*/
private int globalMinimumClock;

/**
* The list of listeners for the update of {@link #globalMinimumClock}.
*/
private final List<EventHandler<Integer>> clockUpdateListeners = new ArrayList<>();

@Inject
private ClockManager(final AggregationMaster aggregationMaster,
final ClockMsgCodec codec,
Expand All @@ -84,6 +89,14 @@ private ClockManager(final AggregationMaster aggregationMaster,
minimumClockWorkers = new ArrayList<>();
}

/**
* Add a listener for the update of minimum clock.
* @param callback a callback to be invoked when {@link #globalMinimumClock} increases
*/
public void addClockUpdateListener(final EventHandler<Integer> callback) {
clockUpdateListeners.add(callback);
}

/**
* Helper function to create broadcast global minimum clock message.
*/
Expand Down Expand Up @@ -200,6 +213,7 @@ private synchronized void tickClock(final String workerId) {
if (minimumClockWorkers.size() == 0) {
globalMinimumClock++;
broadcastGlobalMinimumClock();
clockUpdateListeners.forEach(callback -> callback.onNext(globalMinimumClock));
}
}
}
Expand Down

0 comments on commit 89eb3c2

Please sign in to comment.