From 0a11da1c337886bf471864ec84b7f60707fdd24d Mon Sep 17 00:00:00 2001 From: Lemarais Date: Sun, 29 Aug 2021 06:14:54 +0900 Subject: [PATCH 1/7] add ClientRPC interface --- .../runtime/common/message/ClientRPC.java | 142 +------------- .../runtime/common/message/ClientRPCImpl.java | 181 ++++++++++++++++++ 2 files changed, 189 insertions(+), 134 deletions(-) create mode 100644 runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java index 4317bb1553..34c830bb26 100644 --- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPC.java @@ -18,57 +18,15 @@ */ package org.apache.nemo.runtime.common.message; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.nemo.conf.JobConf; import org.apache.nemo.runtime.common.comm.ControlMessage; -import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.annotations.DefaultImplementation; import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.SyncStage; -import org.apache.reef.wake.remote.Encoder; -import org.apache.reef.wake.remote.address.LocalAddressProvider; -import org.apache.reef.wake.remote.impl.TransportEvent; -import org.apache.reef.wake.remote.transport.Link; -import org.apache.reef.wake.remote.transport.LinkListener; -import org.apache.reef.wake.remote.transport.Transport; -import org.apache.reef.wake.remote.transport.TransportFactory; - -import javax.inject.Inject; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** - * Driver-side RPC implementation for communication from/to Nemo Client. + * This class handles communication from/to clients. */ -public final class ClientRPC { - private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder(); - private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener(); - private static final int RETRY_COUNT = 10; - private static final int RETRY_TIMEOUT = 100; - - private final Map> - handlers = new ConcurrentHashMap<>(); - private final Transport transport; - private final Link link; - private volatile boolean isClosed = false; - - @Inject - private ClientRPC(final TransportFactory transportFactory, - final LocalAddressProvider localAddressProvider, - @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost, - @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) { - transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(), - 0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT); - final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort); - try { - link = transport.open(clientAddress, ENCODER, LINK_LISTENER); - } catch (final IOException e) { - throw new IllegalStateException("Failed to setup an RPC connection to the Client. " - + "A failure at the client-side is suspected."); - } - } +@DefaultImplementation(ClientRPCImpl.class) +public interface ClientRPC { /** * Registers handler for the given type of message. @@ -77,102 +35,18 @@ private ClientRPC(final TransportFactory transportFactory, * @param handler handler implementation * @return {@code this} */ - public ClientRPC registerHandler(final ControlMessage.ClientToDriverMessageType type, - final EventHandler handler) { - if (handlers.putIfAbsent(type, handler) != null) { - throw new RuntimeException(String.format("A handler for %s already registered", type)); - } - return this; - } + ClientRPC registerHandler(final ControlMessage.ClientToDriverMessageType type, + final EventHandler handler); /** * Shuts down the transport. */ - public void shutdown() { - ensureRunning(); - try { - transport.close(); - } catch (final Exception e) { - throw new RuntimeException(e); - } finally { - isClosed = true; - } - } + void shutdown(); /** * Write message to client. * * @param message message to send. */ - public void send(final ControlMessage.DriverToClientMessage message) { - ensureRunning(); - link.write(message); - } - - /** - * Handles message from client. - * - * @param message message to process - */ - private void handleMessage(final ControlMessage.ClientToDriverMessage message) { - final ControlMessage.ClientToDriverMessageType type = message.getType(); - final EventHandler handler = handlers.get(type); - if (handler == null) { - throw new RuntimeException(String.format("Handler for message type %s not registered", type)); - } else { - handler.onNext(message); - } - } - - /** - * Provides event handler for messages from client. - */ - private final class RPCEventHandler implements EventHandler { - @Override - public void onNext(final TransportEvent transportEvent) { - try { - final byte[] data = transportEvent.getData(); - final ControlMessage.ClientToDriverMessage message = ControlMessage.ClientToDriverMessage.parseFrom(data); - handleMessage(message); - } catch (final InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - } - - /** - * Ensure the Transport is running. - */ - private void ensureRunning() { - if (isClosed) { - throw new RuntimeException("The ClientRPC is already closed"); - } - } - - /** - * Provides encoder for {@link org.apache.nemo.runtime.common.comm.ControlMessage.DriverToClientMessage}. - */ - private static final class DriverToClientMessageEncoder implements Encoder { - @Override - public byte[] encode(final ControlMessage.DriverToClientMessage driverToClientMessage) { - return driverToClientMessage.toByteArray(); - } - } - - /** - * Provides {@link LinkListener}. - */ - private static final class ClientRPCLinkListener implements LinkListener { - - @Override - public void onSuccess(final ControlMessage.DriverToClientMessage driverToClientMessage) { - } - - @Override - public void onException(final Throwable throwable, - final SocketAddress socketAddress, - final ControlMessage.DriverToClientMessage driverToClientMessage) { - throw new RuntimeException(throwable); - } - } + void send(final ControlMessage.DriverToClientMessage message); } diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java new file mode 100644 index 0000000000..cfce019844 --- /dev/null +++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ClientRPCImpl.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nemo.runtime.common.message; + +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.nemo.conf.JobConf; +import org.apache.nemo.runtime.common.comm.ControlMessage; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.SyncStage; +import org.apache.reef.wake.remote.Encoder; +import org.apache.reef.wake.remote.address.LocalAddressProvider; +import org.apache.reef.wake.remote.impl.TransportEvent; +import org.apache.reef.wake.remote.transport.Link; +import org.apache.reef.wake.remote.transport.LinkListener; +import org.apache.reef.wake.remote.transport.Transport; +import org.apache.reef.wake.remote.transport.TransportFactory; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Driver-side RPC implementation for communication from/to Nemo Client. + */ +public final class ClientRPCImpl implements ClientRPC { + private static final DriverToClientMessageEncoder ENCODER = new DriverToClientMessageEncoder(); + private static final ClientRPCLinkListener LINK_LISTENER = new ClientRPCLinkListener(); + private static final int RETRY_COUNT = 10; + private static final int RETRY_TIMEOUT = 100; + + private final Map> + handlers = new ConcurrentHashMap<>(); + private final Transport transport; + private final Link link; + private volatile boolean isClosed = false; + + @Inject + private ClientRPCImpl(final TransportFactory transportFactory, + final LocalAddressProvider localAddressProvider, + @Parameter(JobConf.ClientSideRPCServerHost.class) final String clientHost, + @Parameter(JobConf.ClientSideRPCServerPort.class) final int clientPort) { + transport = transportFactory.newInstance(localAddressProvider.getLocalAddress(), + 0, new SyncStage<>(new RPCEventHandler()), null, RETRY_COUNT, RETRY_TIMEOUT); + final SocketAddress clientAddress = new InetSocketAddress(clientHost, clientPort); + try { + link = transport.open(clientAddress, ENCODER, LINK_LISTENER); + } catch (final IOException e) { + throw new IllegalStateException("Failed to setup an RPC connection to the Client. " + + "A failure at the client-side is suspected."); + } + } + + /** + * Registers handler for the given type of message. + * + * @param type the type of message + * @param handler handler implementation + * @return {@code this} + */ + @Override + public ClientRPC registerHandler(final ControlMessage.ClientToDriverMessageType type, + final EventHandler handler) { + if (handlers.putIfAbsent(type, handler) != null) { + throw new RuntimeException(String.format("A handler for %s already registered", type)); + } + return this; + } + + /** + * Shuts down the transport. + */ + @Override + public void shutdown() { + ensureRunning(); + try { + transport.close(); + } catch (final Exception e) { + throw new RuntimeException(e); + } finally { + isClosed = true; + } + } + + /** + * Write message to client. + * + * @param message message to send. + */ + @Override + public void send(final ControlMessage.DriverToClientMessage message) { + ensureRunning(); + link.write(message); + } + + /** + * Handles message from client. + * + * @param message message to process + */ + private void handleMessage(final ControlMessage.ClientToDriverMessage message) { + final ControlMessage.ClientToDriverMessageType type = message.getType(); + final EventHandler handler = handlers.get(type); + if (handler == null) { + throw new RuntimeException(String.format("Handler for message type %s not registered", type)); + } else { + handler.onNext(message); + } + } + + /** + * Provides event handler for messages from client. + */ + private final class RPCEventHandler implements EventHandler { + @Override + public void onNext(final TransportEvent transportEvent) { + try { + final byte[] data = transportEvent.getData(); + final ControlMessage.ClientToDriverMessage message = ControlMessage.ClientToDriverMessage.parseFrom(data); + handleMessage(message); + } catch (final InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Ensure the Transport is running. + */ + private void ensureRunning() { + if (isClosed) { + throw new RuntimeException("The ClientRPC is already closed"); + } + } + + /** + * Provides encoder for {@link org.apache.nemo.runtime.common.comm.ControlMessage.DriverToClientMessage}. + */ + private static final class DriverToClientMessageEncoder implements Encoder { + @Override + public byte[] encode(final ControlMessage.DriverToClientMessage driverToClientMessage) { + return driverToClientMessage.toByteArray(); + } + } + + /** + * Provides {@link LinkListener}. + */ + private static final class ClientRPCLinkListener implements LinkListener { + + @Override + public void onSuccess(final ControlMessage.DriverToClientMessage driverToClientMessage) { + } + + @Override + public void onException(final Throwable throwable, + final SocketAddress socketAddress, + final ControlMessage.DriverToClientMessage driverToClientMessage) { + throw new RuntimeException(throwable); + } + } +} From f7e3b5e8f7814f94d87922a067a5ff91b8501df7 Mon Sep 17 00:00:00 2001 From: Lemarais Date: Mon, 30 Aug 2021 01:20:52 +0900 Subject: [PATCH 2/7] Modifying to allow selection of JobLauncher in NemoRunner --- .../apache/nemo/client/beam/NemoRunner.java | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java index bf3323dbb2..5b30b95a47 100644 --- a/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java +++ b/client/src/main/java/org/apache/nemo/client/beam/NemoRunner.java @@ -24,9 +24,13 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.nemo.client.JobLauncher; +import org.apache.nemo.common.ir.IRDAG; import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions; import org.apache.nemo.compiler.frontend.beam.PipelineVisitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; import java.util.concurrent.CompletableFuture; /** @@ -34,6 +38,7 @@ */ public final class NemoRunner extends PipelineRunner { private final NemoPipelineOptions nemoPipelineOptions; + private static Class jobLauncher = JobLauncher.class; /** * BEAM Pipeline Runner. @@ -86,9 +91,20 @@ public NemoPipelineResult run(final Pipeline pipeline) { final PipelineVisitor pipelineVisitor = new PipelineVisitor(pipeline, nemoPipelineOptions); pipeline.traverseTopologically(pipelineVisitor); final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult(); - CompletableFuture.runAsync(() -> - JobLauncher.launchDAG(pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName())) - .thenRun(nemoPipelineResult::setJobDone); + + CompletableFuture.runAsync(() -> { + try { + Class[] methodParamClass = new Class[] {IRDAG.class, String.class}; + Object[] methodParamObejct = new Object[] {pipelineVisitor.getConvertedPipeline(), nemoPipelineOptions.getJobName()}; + jobLauncher.getMethod("launchDAG", methodParamClass).invoke(null, methodParamObejct); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + e.printStackTrace(); + } + }).thenRun(nemoPipelineResult::setJobDone); return nemoPipelineResult; } + + public static void setJobLauncher(final Class jobLauncher) { + NemoRunner.jobLauncher = jobLauncher; + } } From 1f063929774d900cc4dac9cf1dfc1cb0b6cec9c7 Mon Sep 17 00:00:00 2001 From: Lemarais Date: Mon, 30 Aug 2021 02:03:20 +0900 Subject: [PATCH 3/7] add node spec and node json as a job config --- bin/run_beam_simulator.sh | 24 +++++++++++++++++++ checkstyle.license | 18 -------------- .../java/org/apache/nemo/conf/JobConf.java | 8 +++++++ 3 files changed, 32 insertions(+), 18 deletions(-) create mode 100755 bin/run_beam_simulator.sh delete mode 100644 checkstyle.license diff --git a/bin/run_beam_simulator.sh b/bin/run_beam_simulator.sh new file mode 100755 index 0000000000..80b1db7f0d --- /dev/null +++ b/bin/run_beam_simulator.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +VERSION=$(mvn -q \ + -Dexec.executable=echo -Dexec.args='${project.version}' \ + --non-recursive exec:exec) + +java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 -Dlog4j.configuration=file://`pwd`/log4j.properties -cp examples/beam/target/nemo-examples-beam-${VERSION}-shaded.jar:client/target/nemo-client-${VERSION}-shaded.jar:`$YARN_HOME/bin/yarn classpath` org.apache.nemo.client.SimulatorLauncher "$@" diff --git a/checkstyle.license b/checkstyle.license deleted file mode 100644 index 5d89b37d73..0000000000 --- a/checkstyle.license +++ /dev/null @@ -1,18 +0,0 @@ -(\s*[/\*|\/\/|