Skip to content

Commit

Permalink
fix(#2890): Properly cleanup WS connection (#2893)
Browse files Browse the repository at this point in the history
* fix(#2890): Properly cleanup WS connection

* Replace deprecated asset type

* Fix checkstyle

* fix(#2893): Fix checkstyle

---------

Co-authored-by: Philipp Zehnder <[email protected]>
  • Loading branch information
dominikriemer and tenthe authored May 23, 2024
1 parent d14196a commit c5867cf
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
<jakarta-servlet-api.version>6.0.0</jakarta-servlet-api.version>
<jakarta-xml-bind-api.version>4.0.0</jakarta-xml-bind-api.version>
<javax.xml.bind.version>2.4.0-b180725.0427</javax.xml.bind.version>
<java-websocket.version>1.5.0</java-websocket.version>
<java-websocket.version>1.5.6</java-websocket.version>
<jaxb-runtime.version>2.3.2</jaxb-runtime.version>
<javax-websocket-api.version>1.1</javax-websocket-api.version>
<jakarta-persistence-api.version>3.0.1</jakarta-persistence-api.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

Expand All @@ -43,7 +43,7 @@ public class ProjectionProcessor extends StreamPipesDataProcessor {
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.project")
.category(DataProcessorType.TRANSFORM)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,21 @@
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Map;

public class SocketServer extends WebSocketServer {

private JsonDataFormatDefinition dataFormatDefinition;
private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class);

private final JsonDataFormatDefinition dataFormatDefinition;

public SocketServer(int port) {
super(new InetSocketAddress(port));
setReuseAddr(true);
dataFormatDefinition = new JsonDataFormatDefinition();
}

Expand All @@ -42,27 +47,27 @@ public void onOpen(WebSocket conn, ClientHandshake handshake) {
conn.send("Welcome!"); //This method sends a message to the new client
broadcast(
"New connection: " + handshake.getResourceDescriptor()); //This method sends a message to all clients connected
System.out.println(conn.getRemoteSocketAddress().getAddress().getHostAddress() + " connected.");
LOG.info("{} connected.", conn.getRemoteSocketAddress().getAddress().getHostAddress());
}

@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
// do nothing special
LOG.info("{} closed.", conn.getRemoteSocketAddress().getAddress().getHostAddress());
}

@Override
public void onMessage(WebSocket conn, String message) {
System.out.println(conn + ": " + message);
LOG.debug(" {}: {}", conn, message);
}

@Override
public void onError(WebSocket conn, Exception ex) {
ex.printStackTrace();
LOG.error("Error in websocket connection", ex);
}

@Override
public void onStart() {
System.out.println("Server started!");
LOG.info("Server started at port {}", getPort());
setConnectionLostTimeout(0);
setConnectionLostTimeout(100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,46 @@
package org.apache.streampipes.sinks.brokers.jvm.websocket;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink;
import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration;
import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext;
import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters;
import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.extensions.ExtensionAssetType;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.params.compat.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;

import java.io.IOException;


public class WebsocketServerSink extends StreamPipesDataSink {
public class WebsocketServerSink implements IStreamPipesDataSink {

private static final String PORT_KEY = "port";
private Integer port;

private SocketServer server;

@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket", 0)
public IDataSinkConfiguration declareConfig() {
return DataSinkConfiguration.create(
WebsocketServerSink::new,
DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket", 0)
.category(DataSinkType.MESSAGING)
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredProperty(EpRequirements.anyProperty())
.build())
.requiredIntegerParameter(Labels.withId(PORT_KEY))
.build();
.build());
}

@Override
public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
port = parameters.extractor().singleValueParameter(PORT_KEY, Integer.class);
public void onPipelineStarted(IDataSinkParameters params,
EventSinkRuntimeContext runtimeContext) {
var port = params.extractor().singleValueParameter(PORT_KEY, Integer.class);
server = new SocketServer(port);
server.setReuseAddr(true);
server.start();
Expand All @@ -70,11 +70,10 @@ public void onEvent(Event event) throws SpRuntimeException {
}

@Override
public void onDetach() throws SpRuntimeException {
public void onPipelineStopped() {
try {
server.stop();
server = null;
} catch (IOException | InterruptedException e) {
server.stop(0);
} catch (InterruptedException e) {
throw new SpRuntimeException(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ protected void beforeStart() {
@Override
protected void afterStop() {
disconnectInputCollectors();
pipelineElement.onPipelineStopped();
outputCollector.disconnect();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected void beforeStart() {
@Override
protected void afterStop() {
inputCollectors.forEach(is -> is.unregisterConsumer(instanceId));
pipelineElement.onPipelineStopped();
postDiscard();
}
}

0 comments on commit c5867cf

Please sign in to comment.