From c5867cf18d49b3ad64d1de3451b63b5334dc479d Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Thu, 23 May 2024 07:51:03 +0200 Subject: [PATCH] fix(#2890): Properly cleanup WS connection (#2893) * fix(#2890): Properly cleanup WS connection * Replace deprecated asset type * Fix checkstyle * fix(#2893): Fix checkstyle --------- Co-authored-by: Philipp Zehnder --- pom.xml | 2 +- .../projection/ProjectionProcessor.java | 4 +- .../brokers/jvm/websocket/SocketServer.java | 17 ++++++--- .../jvm/websocket/WebsocketServerSink.java | 37 +++++++++---------- .../StandaloneEventProcessorRuntime.java | 1 + .../runtime/StandaloneEventSinkRuntime.java | 1 + 6 files changed, 34 insertions(+), 28 deletions(-) diff --git a/pom.xml b/pom.xml index 9a713f93ea..f6b8e02d64 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ 6.0.0 4.0.0 2.4.0-b180725.0427 - 1.5.0 + 1.5.6 2.3.2 1.1 3.0.1 diff --git a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/projection/ProjectionProcessor.java b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/projection/ProjectionProcessor.java index a8642a7e9a..c933da7bf5 100644 --- a/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/projection/ProjectionProcessor.java +++ b/streampipes-extensions/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/projection/ProjectionProcessor.java @@ -22,6 +22,7 @@ import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext; import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector; import org.apache.streampipes.model.DataProcessorType; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.sdk.builder.ProcessingElementBuilder; @@ -29,7 +30,6 @@ import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Locales; import org.apache.streampipes.sdk.helpers.OutputStrategies; -import org.apache.streampipes.sdk.utils.Assets; import org.apache.streampipes.wrapper.params.compat.ProcessorParams; import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor; @@ -43,7 +43,7 @@ public class ProjectionProcessor extends StreamPipesDataProcessor { public DataProcessorDescription declareModel() { return ProcessingElementBuilder.create("org.apache.streampipes.processors.filters.jvm.project") .category(DataProcessorType.TRANSFORM) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .withLocales(Locales.EN) .requiredStream(StreamRequirementsBuilder .create() diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java index b42cbad301..babd6f8a3f 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/SocketServer.java @@ -24,16 +24,21 @@ import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.Map; public class SocketServer extends WebSocketServer { - private JsonDataFormatDefinition dataFormatDefinition; + private static final Logger LOG = LoggerFactory.getLogger(SocketServer.class); + + private final JsonDataFormatDefinition dataFormatDefinition; public SocketServer(int port) { super(new InetSocketAddress(port)); + setReuseAddr(true); dataFormatDefinition = new JsonDataFormatDefinition(); } @@ -42,27 +47,27 @@ public void onOpen(WebSocket conn, ClientHandshake handshake) { conn.send("Welcome!"); //This method sends a message to the new client broadcast( "New connection: " + handshake.getResourceDescriptor()); //This method sends a message to all clients connected - System.out.println(conn.getRemoteSocketAddress().getAddress().getHostAddress() + " connected."); + LOG.info("{} connected.", conn.getRemoteSocketAddress().getAddress().getHostAddress()); } @Override public void onClose(WebSocket conn, int code, String reason, boolean remote) { - // do nothing special + LOG.info("{} closed.", conn.getRemoteSocketAddress().getAddress().getHostAddress()); } @Override public void onMessage(WebSocket conn, String message) { - System.out.println(conn + ": " + message); + LOG.debug(" {}: {}", conn, message); } @Override public void onError(WebSocket conn, Exception ex) { - ex.printStackTrace(); + LOG.error("Error in websocket connection", ex); } @Override public void onStart() { - System.out.println("Server started!"); + LOG.info("Server started at port {}", getPort()); setConnectionLostTimeout(0); setConnectionLostTimeout(100); } diff --git a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/WebsocketServerSink.java b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/WebsocketServerSink.java index d4b4aaae3f..2cbba45c9c 100644 --- a/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/WebsocketServerSink.java +++ b/streampipes-extensions/streampipes-sinks-brokers-jvm/src/main/java/org/apache/streampipes/sinks/brokers/jvm/websocket/WebsocketServerSink.java @@ -19,46 +19,46 @@ package org.apache.streampipes.sinks.brokers.jvm.websocket; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.extensions.api.pe.IStreamPipesDataSink; +import org.apache.streampipes.extensions.api.pe.config.IDataSinkConfiguration; import org.apache.streampipes.extensions.api.pe.context.EventSinkRuntimeContext; +import org.apache.streampipes.extensions.api.pe.param.IDataSinkParameters; import org.apache.streampipes.model.DataSinkType; -import org.apache.streampipes.model.graph.DataSinkDescription; +import org.apache.streampipes.model.extensions.ExtensionAssetType; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.sdk.builder.DataSinkBuilder; import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; +import org.apache.streampipes.sdk.builder.sink.DataSinkConfiguration; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; import org.apache.streampipes.sdk.helpers.Locales; -import org.apache.streampipes.sdk.utils.Assets; -import org.apache.streampipes.wrapper.params.compat.SinkParams; -import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink; -import java.io.IOException; - - -public class WebsocketServerSink extends StreamPipesDataSink { +public class WebsocketServerSink implements IStreamPipesDataSink { private static final String PORT_KEY = "port"; - private Integer port; private SocketServer server; @Override - public DataSinkDescription declareModel() { - return DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket", 0) + public IDataSinkConfiguration declareConfig() { + return DataSinkConfiguration.create( + WebsocketServerSink::new, + DataSinkBuilder.create("org.apache.streampipes.sinks.brokers.jvm.websocket", 0) .category(DataSinkType.MESSAGING) .withLocales(Locales.EN) - .withAssets(Assets.DOCUMENTATION, Assets.ICON) + .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON) .requiredStream(StreamRequirementsBuilder .create() .requiredProperty(EpRequirements.anyProperty()) .build()) .requiredIntegerParameter(Labels.withId(PORT_KEY)) - .build(); + .build()); } @Override - public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException { - port = parameters.extractor().singleValueParameter(PORT_KEY, Integer.class); + public void onPipelineStarted(IDataSinkParameters params, + EventSinkRuntimeContext runtimeContext) { + var port = params.extractor().singleValueParameter(PORT_KEY, Integer.class); server = new SocketServer(port); server.setReuseAddr(true); server.start(); @@ -70,11 +70,10 @@ public void onEvent(Event event) throws SpRuntimeException { } @Override - public void onDetach() throws SpRuntimeException { + public void onPipelineStopped() { try { - server.stop(); - server = null; - } catch (IOException | InterruptedException e) { + server.stop(0); + } catch (InterruptedException e) { throw new SpRuntimeException(e.getMessage()); } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java index c2b70c5401..356f57b8c7 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java @@ -96,6 +96,7 @@ protected void beforeStart() { @Override protected void afterStop() { disconnectInputCollectors(); + pipelineElement.onPipelineStopped(); outputCollector.disconnect(); } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java index 7c7f407c14..8dc453924b 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventSinkRuntime.java @@ -81,6 +81,7 @@ protected void beforeStart() { @Override protected void afterStop() { inputCollectors.forEach(is -> is.unregisterConsumer(instanceId)); + pipelineElement.onPipelineStopped(); postDiscard(); } }