[CAY-909] Notify client job progress #1081

Merged
merged 20 commits into from
Apr 5, 2017
Changes from 12 commits
c7d3688
commit
wynot12 Mar 27, 2017
ca8427c
DolphinDriverLauncher in AsyncDolphinDriver and AsyncDolphinLauncher
kimjunhoe Mar 28, 2017
8ea7311
[CAY-909] Update AsyncDolphinDriver, AsyncDolphinLauncher, ClockMana…
kimjunhoe Mar 28, 2017
59e7b0a
devide ProgressMessageHandler from Dolphin specific driver launcher
kimjunhoe Mar 28, 2017
f73d6d9
Merge branch 'master' into notify-client-job-progress
wynot12 Mar 28, 2017
6251a9f
Add DolphintDriverLauncher class
kimjunhoe Mar 28, 2017
5f8a779
Merge branch 'notify-client-job-progress' of http://github.com/cmssnu…
kimjunhoe Mar 28, 2017
233af49
Merge branch 'master' of http://github.com/cmssnu/cay into notify-cli…
kimjunhoe Mar 28, 2017
3d8ef07
Change redundant class in DolphinDriverLauncher
kimjunhoe Mar 28, 2017
7255025
in progress
wynot12 Mar 29, 2017
150a856
set StringJobMessageHandler in DriverLauncher
kimjunhoe Mar 29, 2017
5401742
Merge branch 'master' into notify-client-job-progress
yunseong Mar 30, 2017
e3359f7
formats, name of listener in ClockManager modified
kimjunhoe Apr 3, 2017
6815a45
Merge branch 'master' of http://github.com/cmssnu/cay into notify-cli…
kimjunhoe Apr 3, 2017
76e02c6
Merge branch 'notify-client-job-progress' of http://github.com/cmssnu…
kimjunhoe Apr 3, 2017
2df8d37
Merge branch 'master' into notify-client-job-progress
wynot12 Apr 3, 2017
d4d4bf3
undo delete blank line
kimjunhoe Apr 4, 2017
c4b2a06
XMerge branch 'notify-client-job-progress' of http://github.com/cmssn…
kimjunhoe Apr 4, 2017
6e0e9aa
Minor update
wynot12 Apr 4, 2017
d4e5e28
Merge branch 'master' into notify-client-job-progress
wynot12 Apr 5, 2017
267 changes: 267 additions & 0 deletions common/src/main/java/edu/snu/cay/common/DriverLauncher.java
@@ -0,0 +1,267 @@
/*
* Copyright (C) 2017 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.cay.common;
Review comment (Contributor):

Let's put it into edu.snu.cay.common.client package.


import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.client.*;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A launcher for Dolphin driver.
Review comment (Contributor, @wynot12, Mar 30, 2017):

This class is not specific for Dolphin.
Could you rewrite the comment?

You may use the comment of REEF's DriverLauncher and specify what you modified.

*/
@Public
@ClientSide
@Unit
public final class DriverLauncher implements AutoCloseable {

private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());

private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF
.set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class)
.set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
.set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
.set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
.set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
.set(ClientConfiguration.ON_JOB_MESSAGE, StringJobMessageHandler.class)
.build();

private final REEF reef;

private LauncherStatus status = LauncherStatus.INIT;

private String jobId;
private RunningJob theJob;

@Inject
private DriverLauncher(final REEF reef) {
this.reef = reef;
}

/**
* Instantiate a launcher for the given Configuration.
*
* @param runtimeConfiguration the resourcemanager configuration to be used
* @return a DriverLauncher based on the given resourcemanager configuration
* @throws InjectionException on configuration errors
*/
public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException {
return Tang.Factory.getTang()
.newInjector(runtimeConfiguration, CLIENT_CONFIG)
.getInstance(DriverLauncher.class);
}

/**
* Kills the running job.
*/
@Override
public void close() {
synchronized (this) {
LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status});
if (this.status.isRunning()) {
this.status = LauncherStatus.FORCE_CLOSED;
}
if (null != this.theJob) {
this.theJob.close();
}
this.notify();
}
LOG.log(Level.FINEST, "Close launcher: shutdown REEF");
this.reef.close();
LOG.log(Level.FINEST, "Close launcher: done");
}

/**
* Submit REEF job asynchronously and do not wait for its completion.
*
* @param driverConfig configuration of the driver to submit to the RM.
* @param waitTime time in milliseconds to wait for the job to reach the SUBMITTED status.
* @return ID of the new application.
*/
public String submit(final Configuration driverConfig, final long waitTime) {
this.reef.submit(driverConfig);
this.waitForStatus(waitTime, LauncherStatus.SUBMITTED);
return this.jobId;
}

/**
* Wait for one of the specified statuses of the REEF job.
* This method is called after the job is submitted to the RM via submit().
* @param waitTime wait time in milliseconds.
* @param statuses array of statuses to wait for.
* @return the state of the job after the wait.
*/
public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) {

final long endTime = System.currentTimeMillis() + waitTime;

final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2);
Collections.addAll(statSet, statuses);
Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED);

LOG.log(Level.FINEST, "Wait for status: {0}", statSet);
final LauncherStatus finalStatus;

synchronized (this) {
while (!statSet.contains(this.status)) {
try {
final long delay = endTime - System.currentTimeMillis();
if (delay <= 0) {
break;
}
LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay);
this.wait(delay);
} catch (final InterruptedException ex) {
LOG.log(Level.FINE, "Interrupted: {0}", ex);
}
}

finalStatus = this.status;
}

LOG.log(Level.FINEST, "Final status: {0}", finalStatus);
return finalStatus;
}

/**
* Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
*
* @param driverConfig the configuration for the driver. See DriverConfiguration for details.
* @param timeOut timeout on the job.
* @return the state of the job after execution.
*/
public LauncherStatus run(final Configuration driverConfig, final long timeOut) {

final long startTime = System.currentTimeMillis();

this.reef.submit(driverConfig);
this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED);

if (System.currentTimeMillis() - startTime >= timeOut) {
LOG.log(Level.WARNING, "The Job timed out.");
synchronized (this) {
this.status = LauncherStatus.FORCE_CLOSED;
}
}

this.reef.close();
return this.getStatus();
}

/**
* @return the current status of the job.
*/
public synchronized LauncherStatus getStatus() {
return this.status;
}

/** Update job status and notify the waiting thread. */
Review comment (Contributor):

Though it's copied from REEF's code, let's make it in the right form by breaking lines.

public synchronized void setStatusAndNotify(final LauncherStatus newStatus) {
LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus});
this.status = newStatus;
this.notify();
}

@Override
public String toString() {
return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status);
}

/**
* Job driver notifies us that the job has been submitted to the Resource Manager.
*/
public final class SubmittedJobHandler implements EventHandler<SubmittedJob> {
@Override
public void onNext(final SubmittedJob job) {
LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId());
jobId = job.getId();
setStatusAndNotify(LauncherStatus.SUBMITTED);
}
}

/**
* Job driver notifies us that the job is running.
*/
public final class RunningJobHandler implements EventHandler<RunningJob> {
@Override
public void onNext(final RunningJob job) {
LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
theJob = job;
setStatusAndNotify(LauncherStatus.RUNNING);
}
}

/**
* Job driver notifies us that the job has failed.
*/
public final class FailedJobHandler implements EventHandler<FailedJob> {
@Override
public void onNext(final FailedJob job) {
final Optional<Throwable> ex = job.getReason();
LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex);
theJob = null;
setStatusAndNotify(LauncherStatus.failed(ex));
}
}

/**
* Job driver notifies us that the job has completed successfully.
*/
public final class CompletedJobHandler implements EventHandler<CompletedJob> {
@Override
public void onNext(final CompletedJob job) {
LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
theJob = null;
setStatusAndNotify(LauncherStatus.COMPLETED);
}
}

/**
* Handler for a runtime (resource manager) error.
*/
public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
@Override
public void onNext(final FailedRuntime error) {
LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason());
theJob = null;
setStatusAndNotify(LauncherStatus.failed(error.getReason()));
}
}

/**
* Handler of {@link JobMessage} from driver.
* It logs progress report to console.
*/
Review comment (Contributor):

Please fix the indentation.

public final class StringJobMessageHandler implements EventHandler<JobMessage> {

@Override
public void onNext(final JobMessage message) {
final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8);
LOG.log(Level.INFO, "Job message: {0}", decodedMessage);
}
}
}
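
For readers who want to try the wait/notify status tracking in isolation, here is a minimal sketch that runs without REEF. It is not the code from this PR: `StatusWaiter` and its `Status` enum are hypothetical stand-ins for `DriverLauncher` and REEF's `LauncherStatus`, and, unlike the class above, it uses `notifyAll()` and stops waiting on interrupt after restoring the interrupt flag.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for the launcher's status tracking; not part of the PR or of REEF. */
public final class StatusWaiter {

  /** Simplified stand-in for REEF's LauncherStatus (an assumption for this sketch). */
  public enum Status { INIT, SUBMITTED, RUNNING, COMPLETED, FAILED, FORCE_CLOSED }

  private Status status = Status.INIT;

  /** Update the status and wake up any thread blocked in waitForStatus(). */
  public synchronized void setStatusAndNotify(final Status newStatus) {
    this.status = newStatus;
    this.notifyAll();
  }

  /**
   * Block until the status is one of the given statuses (or FAILED / FORCE_CLOSED),
   * or until waitTime milliseconds have passed, whichever comes first.
   */
  public synchronized Status waitForStatus(final long waitTime, final Status... statuses) {
    final long endTime = System.currentTimeMillis() + waitTime;
    final Set<Status> statSet = EnumSet.of(Status.FAILED, Status.FORCE_CLOSED);
    Collections.addAll(statSet, statuses);

    while (!statSet.contains(this.status)) {
      final long delay = endTime - System.currentTimeMillis();
      if (delay <= 0) {
        break; // timed out; return whatever the status is now
      }
      try {
        this.wait(delay); // releases the monitor while waiting
      } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt(); // keep the interrupt flag, then stop waiting
        break;
      }
    }
    return this.status;
  }

  public static void main(final String[] args) throws InterruptedException {
    final StatusWaiter waiter = new StatusWaiter();
    // Simulate an event handler thread reporting the lifecycle of a job.
    final Thread handler = new Thread(() -> {
      waiter.setStatusAndNotify(Status.SUBMITTED);
      waiter.setStatusAndNotify(Status.RUNNING);
      waiter.setStatusAndNotify(Status.COMPLETED);
    });
    handler.start();
    final Status result = waiter.waitForStatus(5000, Status.COMPLETED);
    handler.join();
    System.out.println("Final status: " + result);
  }
}
```

The loop re-checks the status after every wake-up, so a spurious wake-up or an intermediate status such as RUNNING does not end the wait early.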
19 changes: 19 additions & 0 deletions common/src/main/java/edu/snu/cay/common/package-info.java
@@ -0,0 +1,19 @@
/*
* Copyright (C) 2017 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Common classes.
*/
package edu.snu.cay.common;
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import edu.snu.cay.utils.trace.HTraceParameters;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.ProgressProvider;
import org.apache.reef.driver.client.JobMessageObserver;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ClosedContext;
import org.apache.reef.driver.context.ContextConfiguration;
Expand Down Expand Up @@ -79,6 +80,7 @@

import javax.inject.Inject;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -293,6 +295,7 @@ public final class AsyncDolphinDriver {
*/
@Inject
private AsyncDolphinDriver(final EvaluatorManager evaluatorManager,
final JobMessageObserver jobMessageObserver,
final DataLoadingService dataLoadingService,
final SynchronizationManager synchronizationManager,
final ClockManager clockManager,
Expand Down Expand Up @@ -340,6 +343,13 @@ private AsyncDolphinDriver(final EvaluatorManager evaluatorManager,
this.optimizationIntervalMs = optimizationIntervalMs;
this.maxNumEpochs = maxNumEpochs;

// send JobMessage to client
this.clockManager.addProgressUpdateListener(epochIdx -> {
final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs);
final byte[] encodedProgressMessage = progressMessage.getBytes(StandardCharsets.UTF_8);
jobMessageObserver.sendMessageToClient(encodedProgressMessage);
});

try {
final Injector workerInjector = injector.forkInjector();
workerInjector.bindVolatileInstance(EMDeleteExecutor.class, new WorkerRemover());
Expand Down
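
The driver encodes the progress text as UTF-8 bytes and the client's `StringJobMessageHandler` decodes it with the same charset. The round trip can be sketched without REEF as follows; `ProgressMessageCodec` is a hypothetical helper introduced only for illustration (the PR inlines both halves), and only the message format string is taken from the diff.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; the PR inlines this logic in the driver and in the client handler. */
public final class ProgressMessageCodec {

  private ProgressMessageCodec() {
  }

  /** Driver side: format the progress text and encode it as UTF-8 bytes. */
  public static byte[] encode(final int epochIdx, final int maxNumEpochs) {
    final String progressMessage = String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs);
    return progressMessage.getBytes(StandardCharsets.UTF_8);
  }

  /** Client side: decode the payload with the same charset that the driver used. */
  public static String decode(final byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(final String[] args) {
    // Round trip: what the driver sends is what the client logs.
    System.out.println(decode(encode(3, 10)));
  }
}
```

Naming the charset explicitly on both sides keeps the message independent of the platform default encoding of the driver and client JVMs.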
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package edu.snu.cay.dolphin.async;

import edu.snu.cay.common.DriverLauncher;
import edu.snu.cay.common.dataloader.TextInputFormat;
import edu.snu.cay.dolphin.async.metric.*;
import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider;
Expand Down Expand Up @@ -56,9 +57,7 @@
import edu.snu.cay.utils.trace.parameters.ReceiverPort;
import edu.snu.cay.utils.trace.parameters.ReceiverType;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.client.DriverConfiguration;
import org.apache.reef.client.DriverLauncher;
import org.apache.reef.client.LauncherStatus;
import org.apache.reef.client.*;
import org.apache.reef.io.network.naming.LocalNameResolverConfiguration;
import org.apache.reef.io.network.naming.NameServerConfiguration;
import org.apache.reef.io.network.util.StringIdentifierFactory;
Expand Down Expand Up @@ -143,7 +142,6 @@ public static LauncherStatus launch(final String jobName,
basicParameterInjector.getNamedInstance(LocalRuntimeMaxNumEvaluators.class),
basicParameterInjector.getNamedInstance(JVMHeapSlack.class)) :
getYarnRuntimeConfiguration(basicParameterInjector.getNamedInstance(JVMHeapSlack.class));

Review comment (Contributor, @wynot12, Mar 30, 2017):

Could you undo this change, which is unrelated to this PR?

// configuration for the parameter server
final boolean dynamic = basicParameterInjector.getNamedInstance(Dynamic.class);
final Class<? extends PSManager> managerClass = dynamic ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public final class ClockManager {
*/
private int globalMinimumClock;

/**
* The list of EventHandler about progress update.
Review comment (Contributor):

The list of listeners for the update of {@link #globalMinimumClock}.

*/
private final List<EventHandler<Integer>> progressUpdateCallbacks = new ArrayList<>();
Review comment (Contributor):

Better to rename it as clockUpdateListeners.


@Inject
private ClockManager(final AggregationMaster aggregationMaster,
final ClockMsgCodec codec,
Expand All @@ -84,6 +89,14 @@ private ClockManager(final AggregationMaster aggregationMaster,
minimumClockWorkers = new ArrayList<>();
}

/**
* Add listener to progress update list.
* @param callback a handler invoked with the new value whenever {@link #globalMinimumClock} increases.
*/
public void addProgressUpdateListener(final EventHandler<Integer> callback) {
Review comment (Contributor):

Let's rename it as addClockUpdateListener.

progressUpdateCallbacks.add(callback);
}

/**
* Helper function to create broadcast global minimum clock message.
*/
Expand Down Expand Up @@ -200,6 +213,7 @@ private synchronized void tickClock(final String workerId) {
if (minimumClockWorkers.size() == 0) {
globalMinimumClock++;
broadcastGlobalMinimumClock();
progressUpdateCallbacks.forEach(callback -> callback.onNext(globalMinimumClock));
}
}
}
Expand Down
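
The listener mechanism added to `ClockManager` can be tried on its own with the sketch below. It is a simplified model, not the class from this PR: `ClockTracker`, `addWorker`, and the per-worker clock map are assumptions made for illustration, and `java.util.function.IntConsumer` stands in for REEF's `EventHandler<Integer>`. The method name `addClockUpdateListener` follows the rename suggested in the review.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;

/** Hypothetical, simplified model of a clock manager with update listeners. */
public final class ClockTracker {

  private final Map<String, Integer> workerClocks = new HashMap<>();
  private final List<IntConsumer> clockUpdateListeners = new ArrayList<>();
  private int globalMinimumClock = 0;

  /** Register a worker; it starts at the current global minimum clock. */
  public synchronized void addWorker(final String workerId) {
    workerClocks.put(workerId, globalMinimumClock);
  }

  /** Register a listener that is called with the new value when the global minimum clock increases. */
  public synchronized void addClockUpdateListener(final IntConsumer listener) {
    clockUpdateListeners.add(listener);
  }

  /** Advance one worker's clock and notify listeners if the minimum over all workers has grown. */
  public synchronized void tickClock(final String workerId) {
    workerClocks.merge(workerId, 1, Integer::sum);
    final int newMinimum = Collections.min(workerClocks.values());
    if (newMinimum > globalMinimumClock) {
      globalMinimumClock = newMinimum;
      clockUpdateListeners.forEach(listener -> listener.accept(globalMinimumClock));
    }
  }

  public static void main(final String[] args) {
    final ClockTracker tracker = new ClockTracker();
    tracker.addWorker("worker-1");
    tracker.addWorker("worker-2");
    final int maxNumEpochs = 10; // assumed value for the demo
    tracker.addClockUpdateListener(epochIdx ->
        System.out.println(String.format("Progress epoch count is [%d / %d]", epochIdx, maxNumEpochs)));
    tracker.tickClock("worker-1"); // minimum is still 0, no notification
    tracker.tickClock("worker-2"); // minimum becomes 1, listeners are notified
  }
}
```

In this sketch the listeners run while the tracker's monitor is held, as in the `synchronized` `tickClock` of the diff, so a slow listener delays other workers' ticks.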