Skip to content

Commit

Permalink
dbeaver/pro#3772 Move WS client to common module (#66)
Browse files Browse the repository at this point in the history
* dbeaver/pro#3772 Move WS client to common module

* dbeaver/pro#3885 Fix

---------

Co-authored-by: Ivan Gagarkin <[email protected]>
  • Loading branch information
ivgag and Ivan Gagarkin authored Jan 14, 2025
1 parent bc3c2e6 commit 260f32c
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 2 deletions.
7 changes: 5 additions & 2 deletions modules/com.dbeaver.rpc/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Bundle-Version: 2.3.1.qualifier
Bundle-Release-Date: 20240205
Bundle-RequiredExecutionEnvironment: JavaSE-11
Require-Bundle: org.jkiss.utils,
com.google.gson
Export-Package: com.dbeaver.rpc
org.jkiss.bundle.jakarta.jetty.websocket,
com.google.gson,
slf4j.api
Export-Package: com.dbeaver.rpc,
com.dbeaver.rpc.ws
Automatic-Module-Name: com.dbeaver.rpc
8 changes: 8 additions & 0 deletions modules/com.dbeaver.rpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
<description>RPC over HTTP.</description>
<url>https://github.com/dbeaver/dbeaver-common</url>

<repositories>
<repository>
<id>local-contrib</id>
<url>${local-p2-repo.url}</url>
<layout>p2</layout>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down
133 changes: 133 additions & 0 deletions modules/com.dbeaver.rpc/src/com/dbeaver/rpc/ws/WsClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2025 DBeaver Corp
*
* All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of DBeaver Corp and its suppliers, if any.
* The intellectual and technical concepts contained
* herein are proprietary to DBeaver Corp and its suppliers
* and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from DBeaver Corp.
*/
package com.dbeaver.rpc.ws;

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import jakarta.websocket.MessageHandler;
import jakarta.websocket.Session;
import org.jkiss.utils.rest.RpcConstants;
import org.jkiss.utils.rest.RpcException;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
* A WebSocket client for sending and receiving messages with correlation IDs.
*/
public final class WsClient implements MessageHandler.Whole<String> {
private static final Gson GSON = RpcConstants.COMPACT_GSON;

private final Session session;
private final Map<UUID, CompletableFuture<String>> pendingMessages = new ConcurrentHashMap<>();

/**
* Creates a new client using an existing WebSocket Session.
*/
public WsClient(Session session) {
this.session = session;
// Register this instance as the message handler.
this.session.addMessageHandler(this);
}

/**
* Sends a message synchronously and blocks until a response is received.
*
* @param payload The data to send.
* @return The response message.
* @throws IOException if the message fails to send.
*/
public String sendMessage(String payload) throws IOException {
return sendMessageAsync(payload).join();
}

/**
* Sends a message asynchronously.
*
* @param payload The data to send.
* @return A CompletableFuture that will complete when a response arrives.
* @throws IOException if the message fails to send.
*/
public CompletableFuture<String> sendMessageAsync(String payload) throws IOException {
UUID messageId = UUID.randomUUID();

// Send the text message over the WebSocket.
WsRequest methodInvocation = new WsRequest(messageId, payload);
String message = GSON.toJson(methodInvocation);
session.getBasicRemote().sendText(message);

// Store a future so we can complete it when the response arrives.
CompletableFuture<String> future = new CompletableFuture<>();
pendingMessages.put(messageId, future);

return future;
}

@Override
public void onMessage(String rawMessage) {
WsResponse result = GSON.fromJson(rawMessage, WsResponse.class);

CompletableFuture<String> future = pendingMessages.remove(result.messageId());
if (future == null) {
// No future found for this message ID.
return;
}

if (result.error() != null) {
future.completeExceptionally(parseError(result.error()));
} else {
future.complete(result.result());
}
}

/**
* Closes the WebSocket session.
*/
public void close() {
try {
session.close();
} catch (IOException e) {
// Handle close error as needed.
}
}

private static Exception parseError(String contents) {
try {
Map<?, ?> map = GSON.fromJson(contents, Map.class);
Map<String, Object> error = (Map<String, Object>) map.get("error");
if (error != null) {
Object errorClass = error.get("exceptionClass");
Object message = error.get("message");
Object stacktrace = error.get("stacktrace");
if (message != null) {
RpcException rpcException = new RpcException(
message.toString(),
errorClass == null ? null : errorClass.toString());
//rpcException.setStackTrace();
return rpcException;
}
}
} catch (JsonSyntaxException ignored) {
return new IOException(contents);
}

return new IOException(contents);
}
}
113 changes: 113 additions & 0 deletions modules/com.dbeaver.rpc/src/com/dbeaver/rpc/ws/WsClientBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2025 DBeaver Corp
*
* All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of DBeaver Corp and its suppliers, if any.
* The intellectual and technical concepts contained
* herein are proprietary to DBeaver Corp and its suppliers
* and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from DBeaver Corp.
*/
package com.dbeaver.rpc.ws;

import jakarta.websocket.*;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.ee10.websocket.jakarta.client.JakartaWebSocketClientContainer;
import org.eclipse.jetty.util.component.LifeCycle;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Builder for configuring and creating WebSocketClient instances.
*/
public class WsClientBuilder {
private static final Logger logger = Logger.getLogger(WsClient.class.getName());

private String url;
private Map<String, String> headers;
private Duration timeout;

public WsClientBuilder url(String url) {
this.url = url;
return this;
}

public WsClientBuilder headers(Map<String, String> headers) {
this.headers = headers;
return this;
}

public WsClientBuilder timeout(Duration timeout) {
this.timeout = timeout;
return this;
}

/**
* Builds and connects a WebSocketClient using the given configuration.
*
* @return A connected WebSocketClient instance.
* @throws DeploymentException if the WebSocket deployment fails.
* @throws IOException if a connection error occurs.
*/
public WsClient connect() throws DeploymentException, IOException {
ClientEndpointConfig.Configurator configurator = new ClientEndpointConfig.Configurator() {
@Override
public void beforeRequest(Map<String, List<String>> headers) {
if (WsClientBuilder.this.headers != null) {
WsClientBuilder.this.headers.forEach((key, value) -> headers.put(key, List.of(value)));
}
}

@Override
public void afterResponse(HandshakeResponse hr) {
List<String> error = hr.getHeaders().get(WsConstants.HANDSHAKE_ERROR_HEADER);
if (error != null && !error.isEmpty()) {
throw new RuntimeException("Handshake error: " + error.get(0));
}
}
};

ClientEndpointConfig config = ClientEndpointConfig
.Builder
.create()
.configurator(configurator)
.build();

Endpoint endpoint = new Endpoint() {
@Override
public void onOpen(Session session, EndpointConfig endpointConfig) {
session.setMaxIdleTimeout(timeout.toMillis());
session.setMaxTextMessageBufferSize(Integer.MAX_VALUE);
}

@Override
public void onError(Session session, Throwable thr) {
logger.log(Level.SEVERE, "WebSocket error", thr);
}

@Override
public void onClose(Session session, CloseReason closeReason) {
logger.log(Level.INFO, "WebSocket closed: " + closeReason);
}
};

JakartaWebSocketClientContainer clientContainer = new JakartaWebSocketClientContainer((HttpClient) null);
LifeCycle.start(clientContainer);

Session session = clientContainer.connectToServer(endpoint, config, URI.create(url));

return new WsClient(session);
}
}
24 changes: 24 additions & 0 deletions modules/com.dbeaver.rpc/src/com/dbeaver/rpc/ws/WsConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2025 DBeaver Corp
*
* All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of DBeaver Corp and its suppliers, if any.
* The intellectual and technical concepts contained
* herein are proprietary to DBeaver Corp and its suppliers
* and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from DBeaver Corp.
*/
package com.dbeaver.rpc.ws;

public final class WsConstants {
public static final String HANDSHAKE_ERROR_HEADER = "X-Handshake-Error";

private WsConstants() {
}
}
37 changes: 37 additions & 0 deletions modules/com.dbeaver.rpc/src/com/dbeaver/rpc/ws/WsRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2025 DBeaver Corp
*
* All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of DBeaver Corp and its suppliers, if any.
* The intellectual and technical concepts contained
* herein are proprietary to DBeaver Corp and its suppliers
* and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from DBeaver Corp.
*/
package com.dbeaver.rpc.ws;

import java.util.UUID;

public class WsRequest {
private final UUID messageId;
private final String payload;

public WsRequest(UUID messageId, String payload) {
this.messageId = messageId;
this.payload = payload;
}

public UUID messageId() {
return messageId;
}

public String payload() {
return payload;
}
}
43 changes: 43 additions & 0 deletions modules/com.dbeaver.rpc/src/com/dbeaver/rpc/ws/WsResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* DBeaver - Universal Database Manager
* Copyright (C) 2010-2025 DBeaver Corp
*
* All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains
* the property of DBeaver Corp and its suppliers, if any.
* The intellectual and technical concepts contained
* herein are proprietary to DBeaver Corp and its suppliers
* and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from DBeaver Corp.
*/
package com.dbeaver.rpc.ws;

import java.util.UUID;

public class WsResponse {
private final UUID messageId;
private final String result;
private final String error;

public WsResponse(UUID messageId, String result, String error) {
this.messageId = messageId;
this.result = result;
this.error = error;
}

public UUID messageId() {
return messageId;
}

public String result() {
return result;
}

public String error() {
return error;
}
}

0 comments on commit 260f32c

Please sign in to comment.