-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CAY-909] Notify client job progress #1081
Changes from 12 commits
c7d3688
ca8427c
8ea7311
59e7b0a
f73d6d9
6251a9f
5f8a779
233af49
3d8ef07
7255025
150a856
5401742
e3359f7
6815a45
76e02c6
2df8d37
d4d4bf3
c4b2a06
6e0e9aa
d4e5e28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,267 @@ | ||
/* | ||
* Copyright (C) 2017 Seoul National University | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package edu.snu.cay.common; | ||
|
||
import org.apache.reef.annotations.audience.ClientSide; | ||
import org.apache.reef.annotations.audience.Public; | ||
import org.apache.reef.client.*; | ||
import org.apache.reef.tang.Configuration; | ||
import org.apache.reef.tang.Tang; | ||
import org.apache.reef.tang.annotations.Unit; | ||
import org.apache.reef.tang.exceptions.InjectionException; | ||
import org.apache.reef.util.Optional; | ||
import org.apache.reef.wake.EventHandler; | ||
|
||
import javax.inject.Inject; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.logging.Level; | ||
import java.util.logging.Logger; | ||
|
||
/** | ||
* A launcher for Dolphin driver. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class is not specific for Dolphin. You may use the comment of REEF's |
||
*/ | ||
@Public | ||
@ClientSide | ||
@Unit | ||
public final class DriverLauncher implements AutoCloseable { | ||
|
||
private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName()); | ||
|
||
private static final Configuration CLIENT_CONFIG = ClientConfiguration.CONF | ||
.set(ClientConfiguration.ON_JOB_SUBMITTED, SubmittedJobHandler.class) | ||
.set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class) | ||
.set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class) | ||
.set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class) | ||
.set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class) | ||
.set(ClientConfiguration.ON_JOB_MESSAGE, StringJobMessageHandler.class) | ||
.build(); | ||
|
||
private final REEF reef; | ||
|
||
private LauncherStatus status = LauncherStatus.INIT; | ||
|
||
private String jobId; | ||
private RunningJob theJob; | ||
|
||
@Inject | ||
private DriverLauncher(final REEF reef) { | ||
this.reef = reef; | ||
} | ||
|
||
/** | ||
* Instantiate a launcher for the given Configuration. | ||
* | ||
* @param runtimeConfiguration the resourcemanager configuration to be used | ||
* @return a DriverLauncher based on the given resourcemanager configuration | ||
* @throws InjectionException on configuration errors | ||
*/ | ||
public static DriverLauncher getLauncher(final Configuration runtimeConfiguration) throws InjectionException { | ||
return Tang.Factory.getTang() | ||
.newInjector(runtimeConfiguration, CLIENT_CONFIG) | ||
.getInstance(DriverLauncher.class); | ||
} | ||
|
||
/** | ||
* Kills the running job. | ||
*/ | ||
@Override | ||
public void close() { | ||
synchronized (this) { | ||
LOG.log(Level.FINER, "Close launcher: job {0} with status {1}", new Object[] {this.theJob, this.status}); | ||
if (this.status.isRunning()) { | ||
this.status = LauncherStatus.FORCE_CLOSED; | ||
} | ||
if (null != this.theJob) { | ||
this.theJob.close(); | ||
} | ||
this.notify(); | ||
} | ||
LOG.log(Level.FINEST, "Close launcher: shutdown REEF"); | ||
this.reef.close(); | ||
LOG.log(Level.FINEST, "Close launcher: done"); | ||
} | ||
|
||
/** | ||
* Submit REEF job asynchronously and do not wait for its completion. | ||
* | ||
* @param driverConfig configuration of hte driver to submit to the RM. | ||
* @return ID of the new application. | ||
*/ | ||
public String submit(final Configuration driverConfig, final long waitTime) { | ||
this.reef.submit(driverConfig); | ||
this.waitForStatus(waitTime, LauncherStatus.SUBMITTED); | ||
return this.jobId; | ||
} | ||
|
||
/** | ||
* Wait for one of the specified statuses of the REEF job. | ||
* This method is called after the job is submitted to the RM via submit(). | ||
* @param waitTime wait time in milliseconds. | ||
* @param statuses array of statuses to wait for. | ||
* @return the state of the job after the wait. | ||
*/ | ||
public LauncherStatus waitForStatus(final long waitTime, final LauncherStatus... statuses) { | ||
|
||
final long endTime = System.currentTimeMillis() + waitTime; | ||
|
||
final HashSet<LauncherStatus> statSet = new HashSet<>(statuses.length * 2); | ||
Collections.addAll(statSet, statuses); | ||
Collections.addAll(statSet, LauncherStatus.FAILED, LauncherStatus.FORCE_CLOSED); | ||
|
||
LOG.log(Level.FINEST, "Wait for status: {0}", statSet); | ||
final LauncherStatus finalStatus; | ||
|
||
synchronized (this) { | ||
while (!statSet.contains(this.status)) { | ||
try { | ||
final long delay = endTime - System.currentTimeMillis(); | ||
if (delay <= 0) { | ||
break; | ||
} | ||
LOG.log(Level.FINE, "Wait for {0} milliSeconds", delay); | ||
this.wait(delay); | ||
} catch (final InterruptedException ex) { | ||
LOG.log(Level.FINE, "Interrupted: {0}", ex); | ||
} | ||
} | ||
|
||
finalStatus = this.status; | ||
} | ||
|
||
LOG.log(Level.FINEST, "Final status: {0}", finalStatus); | ||
return finalStatus; | ||
} | ||
|
||
/** | ||
* Run a job with a waiting timeout after which it will be killed, if it did not complete yet. | ||
* | ||
* @param driverConfig the configuration for the driver. See DriverConfiguration for details. | ||
* @param timeOut timeout on the job. | ||
* @return the state of the job after execution. | ||
*/ | ||
public LauncherStatus run(final Configuration driverConfig, final long timeOut) { | ||
|
||
final long startTime = System.currentTimeMillis(); | ||
|
||
this.reef.submit(driverConfig); | ||
this.waitForStatus(timeOut - System.currentTimeMillis() + startTime, LauncherStatus.COMPLETED); | ||
|
||
if (System.currentTimeMillis() - startTime >= timeOut) { | ||
LOG.log(Level.WARNING, "The Job timed out."); | ||
synchronized (this) { | ||
this.status = LauncherStatus.FORCE_CLOSED; | ||
} | ||
} | ||
|
||
this.reef.close(); | ||
return this.getStatus(); | ||
} | ||
|
||
/** | ||
* @return the current status of the job. | ||
*/ | ||
public synchronized LauncherStatus getStatus() { | ||
return this.status; | ||
} | ||
|
||
/** Update job status and notify the waiting thread. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though it's copied from REEF's code, let's make it in the right form by breaking lines. |
||
public synchronized void setStatusAndNotify(final LauncherStatus newStatus) { | ||
LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[] {this.status, newStatus}); | ||
this.status = newStatus; | ||
this.notify(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return String.format("DriverLauncher: { jobId: %s, status: %s }", this.jobId, this.status); | ||
} | ||
|
||
/** | ||
* Job driver notifies us that the job has been submitted to the Resource Manager. | ||
*/ | ||
public final class SubmittedJobHandler implements EventHandler<SubmittedJob> { | ||
@Override | ||
public void onNext(final SubmittedJob job) { | ||
LOG.log(Level.INFO, "REEF job submitted: {0}.", job.getId()); | ||
jobId = job.getId(); | ||
setStatusAndNotify(LauncherStatus.SUBMITTED); | ||
} | ||
} | ||
|
||
/** | ||
* Job driver notifies us that the job is running. | ||
*/ | ||
public final class RunningJobHandler implements EventHandler<RunningJob> { | ||
@Override | ||
public void onNext(final RunningJob job) { | ||
LOG.log(Level.INFO, "The Job {0} is running.", job.getId()); | ||
theJob = job; | ||
setStatusAndNotify(LauncherStatus.RUNNING); | ||
} | ||
} | ||
|
||
/** | ||
* Job driver notifies us that the job had failed. | ||
*/ | ||
public final class FailedJobHandler implements EventHandler<FailedJob> { | ||
@Override | ||
public void onNext(final FailedJob job) { | ||
final Optional<Throwable> ex = job.getReason(); | ||
LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex); | ||
theJob = null; | ||
setStatusAndNotify(LauncherStatus.failed(ex)); | ||
} | ||
} | ||
|
||
/** | ||
* Job driver notifies us that the job had completed successfully. | ||
*/ | ||
public final class CompletedJobHandler implements EventHandler<CompletedJob> { | ||
@Override | ||
public void onNext(final CompletedJob job) { | ||
LOG.log(Level.INFO, "The Job {0} is done.", job.getId()); | ||
theJob = null; | ||
setStatusAndNotify(LauncherStatus.COMPLETED); | ||
} | ||
} | ||
|
||
/** | ||
* Handler an error in the job driver. | ||
*/ | ||
public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> { | ||
@Override | ||
public void onNext(final FailedRuntime error) { | ||
LOG.log(Level.SEVERE, "Received a resource manager error", error.getReason()); | ||
theJob = null; | ||
setStatusAndNotify(LauncherStatus.failed(error.getReason())); | ||
} | ||
} | ||
|
||
/** | ||
* Handler of {@link JobMessage} from driver. | ||
* It logs progress report to console. | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please fix the indentation. |
||
public final class StringJobMessageHandler implements EventHandler<JobMessage> { | ||
|
||
@Override | ||
public void onNext(final JobMessage message) { | ||
final String decodedMessage = new String(message.get(), StandardCharsets.UTF_8); | ||
LOG.log(Level.INFO, "Job message: {0}", decodedMessage); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright (C) 2017 Seoul National University | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
/** | ||
* Common classes. | ||
*/ | ||
package edu.snu.cay.common; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
*/ | ||
package edu.snu.cay.dolphin.async; | ||
|
||
import edu.snu.cay.common.DriverLauncher; | ||
import edu.snu.cay.common.dataloader.TextInputFormat; | ||
import edu.snu.cay.dolphin.async.metric.*; | ||
import edu.snu.cay.dolphin.async.dashboard.DashboardConfProvider; | ||
|
@@ -56,9 +57,7 @@ | |
import edu.snu.cay.utils.trace.parameters.ReceiverPort; | ||
import edu.snu.cay.utils.trace.parameters.ReceiverType; | ||
import org.apache.reef.annotations.audience.ClientSide; | ||
import org.apache.reef.client.DriverConfiguration; | ||
import org.apache.reef.client.DriverLauncher; | ||
import org.apache.reef.client.LauncherStatus; | ||
import org.apache.reef.client.*; | ||
import org.apache.reef.io.network.naming.LocalNameResolverConfiguration; | ||
import org.apache.reef.io.network.naming.NameServerConfiguration; | ||
import org.apache.reef.io.network.util.StringIdentifierFactory; | ||
|
@@ -143,7 +142,6 @@ public static LauncherStatus launch(final String jobName, | |
basicParameterInjector.getNamedInstance(LocalRuntimeMaxNumEvaluators.class), | ||
basicParameterInjector.getNamedInstance(JVMHeapSlack.class)) : | ||
getYarnRuntimeConfiguration(basicParameterInjector.getNamedInstance(JVMHeapSlack.class)); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you undo this change, which is irrelevant from the PR? |
||
// configuration for the parameter server | ||
final boolean dynamic = basicParameterInjector.getNamedInstance(Dynamic.class); | ||
final Class<? extends PSManager> managerClass = dynamic ? | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,6 +72,11 @@ public final class ClockManager { | |
*/ | ||
private int globalMinimumClock; | ||
|
||
/** | ||
* The list of EventHandler about progress update. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The list of listeners for the update of {@link #globalMinimumClock}. |
||
*/ | ||
private final List<EventHandler<Integer>> progressUpdateCallbacks = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to rename it as |
||
|
||
@Inject | ||
private ClockManager(final AggregationMaster aggregationMaster, | ||
final ClockMsgCodec codec, | ||
|
@@ -84,6 +89,14 @@ private ClockManager(final AggregationMaster aggregationMaster, | |
minimumClockWorkers = new ArrayList<>(); | ||
} | ||
|
||
/** | ||
* Add listener to progress update list. | ||
* @param callback when #globalMinimumClock increases, callback functions in list are called. | ||
*/ | ||
public void addProgressUpdateListener(final EventHandler<Integer> callback) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's rename it as |
||
progressUpdateCallbacks.add(callback); | ||
} | ||
|
||
/** | ||
* Helper function to create broadcast global minimum clock message. | ||
*/ | ||
|
@@ -200,6 +213,7 @@ private synchronized void tickClock(final String workerId) { | |
if (minimumClockWorkers.size() == 0) { | ||
globalMinimumClock++; | ||
broadcastGlobalMinimumClock(); | ||
progressUpdateCallbacks.forEach(callback -> callback.onNext(globalMinimumClock)); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's put it into
edu.snu.cay.common.client
package.