Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gce] Ensure ressources are freed #17949

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.net.UnknownHostException;
import java.util.Random;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.gce.internal.model.M2MMessageParser;
import org.openhab.binding.gce.internal.model.PortDefinition;
import org.openhab.binding.gce.internal.model.StatusFile;
import org.openhab.binding.gce.internal.model.StatusFileAccessor;
import org.openhab.core.thing.ThingUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

/**
* The {@link Ipx800DeviceConnector} is responsible for connecting,
Expand All @@ -35,156 +40,142 @@
*/
@NonNullByDefault
public class Ipx800DeviceConnector extends Thread {
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 5000;
private static final int DEFAULT_RECONNECT_TIMEOUT_MS = 5000;
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 10000;
private static final int MAX_KEEPALIVE_FAILURE = 3;
private static final String ENDL = "\r\n";

private final Logger logger = LoggerFactory.getLogger(Ipx800DeviceConnector.class);
private final Random randomizer = new Random();

private final String hostname;
private final int portNumber;

private Optional<M2MMessageParser> messageParser = Optional.empty();
private Optional<Socket> socket = Optional.empty();
private Optional<BufferedReader> input = Optional.empty();
private Optional<PrintWriter> output = Optional.empty();
private final M2MMessageParser parser;
private final StatusFileAccessor statusAccessor;
private final Ipx800EventListener listener;
private final Socket socket;
private final BufferedReader input;
private final PrintWriter output;

private int failedKeepalive = 0;
private boolean waitingKeepaliveResponse = false;
private boolean interrupted = false;

public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid) {
public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid, Ipx800EventListener listener)
throws UnknownHostException, IOException {
super("OH-binding-" + uid);
this.hostname = hostname;
this.portNumber = portNumber;
this.listener = listener;

logger.debug("Connecting to {}:{}...", hostname, portNumber);
Socket socket = new Socket(hostname, portNumber);
socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MS);
this.socket = socket;

output = new PrintWriter(socket.getOutputStream(), true);
input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
parser = new M2MMessageParser(listener);
statusAccessor = new StatusFileAccessor(hostname);
setDaemon(true);
}

/**
* Stop the device thread
*/
public void dispose() {
interrupted = true;
}

public synchronized void send(String message) {
output.ifPresentOrElse(out -> {
logger.debug("Sending '{}' to Ipx800", message);
out.write(message + ENDL);
out.flush();
}, () -> logger.warn("Trying to send '{}' while the output stream is closed.", message));
logger.debug("Sending '{}' to Ipx800", message);
output.println(message);
}

/**
* Connect to the ipx800
*
* @throws IOException
* Send a random keepalive command which cause the IPX to send an update.
* If we don't receive the update maxKeepAliveFailure time, the connection is closed
*/
private void connect() throws IOException {
disconnect();
private void sendKeepalive() {
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);

logger.debug("Connecting to {}:{}...", hostname, portNumber);
Socket socket = new Socket(hostname, portNumber);
socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MS);
socket.getInputStream().skip(socket.getInputStream().available());
this.socket = Optional.of(socket);
if (waitingKeepaliveResponse) {
failedKeepalive++;
logger.debug("Sending keepalive {}, attempt {}", command, failedKeepalive);
} else {
failedKeepalive = 0;
logger.debug("Sending keepalive {}", command);
}

output.println(command);
parser.setExpectedResponse(command);

input = Optional.of(new BufferedReader(new InputStreamReader(socket.getInputStream())));
output = Optional.of(new PrintWriter(socket.getOutputStream(), true));
waitingKeepaliveResponse = true;
}

/**
* Disconnect the device
*/
private void disconnect() {
logger.debug("Disconnecting");
@Override
public void run() {
while (!interrupted) {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
interrupted = true;
listener.errorOccurred(new IOException("Max keep alive attempts has been reached"));
}
try {
String command = input.readLine();
waitingKeepaliveResponse = false;
parser.unsolicitedUpdate(command);
} catch (SocketTimeoutException e) {
sendKeepalive();
} catch (IOException e) {
interrupted = true;
listener.errorOccurred(e);
}
}
if (output instanceof PrintWriter out) {
out.close();
}

input.ifPresent(in -> {
if (input instanceof BufferedReader in) {
try {
in.close();
} catch (IOException ignore) {
} catch (IOException e) {
logger.warn("Exception input stream: {}", e.getMessage());
}
input = Optional.empty();
});

output.ifPresent(PrintWriter::close);
output = Optional.empty();
}

socket.ifPresent(client -> {
if (socket instanceof Socket client) {
try {
logger.debug("Closing socket");
client.close();
} catch (IOException ignore) {
} catch (IOException e) {
logger.warn("Exception closing socket: {}", e.getMessage());
}
socket = Optional.empty();
});
}

logger.debug("Disconnected");
}

/**
* Stop the device thread
*/
public void dispose() {
interrupt();
disconnect();
public StatusFile readStatusFile() throws SAXException, IOException {
return statusAccessor.read();
}

/**
* Send an arbitrary keepalive command which cause the IPX to send an update.
* If we don't receive the update maxKeepAliveFailure time, the connection is closed and reopened
* Set output of the device sending the corresponding command
*
* @param targetPort
* @param targetValue
*/
private void sendKeepalive() {
output.ifPresent(out -> {
if (waitingKeepaliveResponse) {
failedKeepalive++;
logger.debug("Sending keepalive, attempt {}", failedKeepalive);
} else {
failedKeepalive = 0;
logger.debug("Sending keepalive");
}
out.println("GetIn01");
out.flush();
waitingKeepaliveResponse = true;
});
public void setOutput(String targetPort, int targetValue, boolean pulse) {
logger.debug("Sending {} to {}", targetValue, targetPort);
String command = "Set%02d%s%s".formatted(Integer.parseInt(targetPort), targetValue, pulse ? "p" : "");
send(command);
}

@Override
public void run() {
try {
waitingKeepaliveResponse = false;
failedKeepalive = 0;
connect();
while (!interrupted()) {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
throw new IOException("Max keep alive attempts has been reached");
}
input.ifPresent(in -> {
try {
String command = in.readLine();
waitingKeepaliveResponse = false;
messageParser.ifPresent(parser -> parser.unsolicitedUpdate(command));
} catch (IOException e) {
handleException(e);
}
});
}
disconnect();
} catch (IOException e) {
handleException(e);
}
try {
Thread.sleep(DEFAULT_RECONNECT_TIMEOUT_MS);
} catch (InterruptedException e) {
dispose();
}
}

private void handleException(Exception e) {
if (!interrupted()) {
if (e instanceof SocketTimeoutException) {
sendKeepalive();
return;
} else if (e instanceof IOException) {
logger.warn("Communication error: '{}'. Will retry in {} ms", e, DEFAULT_RECONNECT_TIMEOUT_MS);
}
messageParser.ifPresent(parser -> parser.errorOccurred(e));
}
/**
* Resets the counter value to 0
*
* @param targetCounter
*/
public void resetCounter(int targetCounter) {
logger.debug("Resetting counter {} to 0", targetCounter);
send("ResetCount%d".formatted(targetCounter));
}

public void setParser(M2MMessageParser parser) {
this.messageParser = Optional.of(parser);
public void resetPLC() {
send("Reset");
}
}
Loading
Loading