Skip to content

Commit

Permalink
build for tag - bug fix on session exists
Browse files Browse the repository at this point in the history
  • Loading branch information
simon.johnson committed May 20, 2022
1 parent 115c73f commit d2af50a
Show file tree
Hide file tree
Showing 23 changed files with 71 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void connect(int keepAlive, boolean cleanSession) throws MqttsnException,
try {
IMqttsnMessage message = registry.getMessageFactory().createConnect(
registry.getOptions().getContextId(), keepAlive,
registry.getWillRegistry().hasWillMessage(state.getContext()), cleanSession);
registry.getWillRegistry().hasWillMessage(state.getContext()), cleanSession, registry.getOptions().getMaxProtocolMessageSize());
MqttsnWaitToken token = registry.getMessageStateService().sendMessage(state.getContext(), message);
Optional<IMqttsnMessage> response =
registry.getMessageStateService().waitForCompletion(state.getContext(), token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.slj.mqtt.sn.cli.AbstractInteractiveCli;
import org.slj.mqtt.sn.client.MqttsnClientConnectException;
import org.slj.mqtt.sn.client.impl.MqttsnClient;
import org.slj.mqtt.sn.client.impl.MqttsnClientRuntimeRegistry;
import org.slj.mqtt.sn.codec.MqttsnCodecs;
import org.slj.mqtt.sn.impl.AbstractMqttsnRuntime;
import org.slj.mqtt.sn.impl.AbstractMqttsnRuntimeRegistry;
import org.slj.mqtt.sn.impl.ram.MqttsnInMemoryTopicRegistry;
Expand All @@ -37,7 +35,6 @@
import org.slj.mqtt.sn.net.MqttsnUdpTransport;
import org.slj.mqtt.sn.net.NetworkAddress;
import org.slj.mqtt.sn.spi.IMqttsnTransport;
import org.slj.mqtt.sn.spi.IMqttsnWillRegistry;
import org.slj.mqtt.sn.spi.MqttsnException;
import org.slj.mqtt.sn.utils.TopicPath;

Expand Down Expand Up @@ -504,7 +501,6 @@ protected void test()
protected IMqttsnTransport createTransport() {
MqttsnUdpOptions udpOptions = new MqttsnUdpOptions().withMtu(4096).withReceiveBuffer(4096).
withPort(MqttsnUdpOptions.DEFAULT_LOCAL_CLIENT_PORT);

return new MqttsnUdpTransport(udpOptions);
}

Expand All @@ -514,8 +510,7 @@ protected MqttsnOptions createOptions() throws UnknownHostException {
withNetworkAddressEntry("remote-gateway",
NetworkAddress.from(port, hostName)).
withContextId(clientId).
withMaxProtocolMessageSize(4096).
withSleepClearsRegistrations(false);
withMaxProtocolMessageSize(4096);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public static void launch(MqttsnInteractiveClient interactiveClient) throws Exce
interactiveClient.command();
interactiveClient.exit();
} catch (Exception e) {
e.printStackTrace();
System.err.println("A fatal error was encountered: " + e.getMessage());
e.printStackTrace(System.err);
} finally {
interactiveClient.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception {

//-- construct a connect message with your required configuration
IMqttsnMessage connect =
factory.createConnect("testClientId", 60, false, true);
factory.createConnect("testClientId", 60, false, true, 1024);

System.out.println(connect);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ IMqttsnMessage createGwinfo(int gatewayId, String gatewayAddress)
* @param willPrompt: if set, indicates that client is requesting for Will topic and Will message prompting.
* @param cleanSession: same meaning as with MQTT, however extended for Will topic and Will message.
*/
IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession)
IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession, int maxPacketSize)
throws MqttsnCodecException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public IMqttsnMessage createGwinfo(int gatewayId, String gatewayAddress) throws
}

@Override
public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession) throws MqttsnCodecException {
public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession, int maxPacketSize) throws MqttsnCodecException {

MqttsnConnect msg = new MqttsnConnect();
msg.setClientId(clientId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public static IMqttsnMessageFactory getInstance() {
}

@Override
public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession) throws MqttsnCodecException {
public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession, int maxPacketSize) throws MqttsnCodecException {

//TODO add session expiry interval
MqttsnConnect_V2_0 msg = new MqttsnConnect_V2_0();
msg.setClientId(clientId);
msg.setKeepAlive(keepAlive);
msg.setSessionExpiryInterval(keepAlive);
msg.setCleanStart(cleanSession);
msg.setWill(willPrompt);
msg.setMaxPacketSize(maxPacketSize);
msg.validate();
return msg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testMqttsnConnack() throws MqttsnCodecException {
public void testMqttsnConnect() throws MqttsnCodecException {

//-- test normal length clientId
IMqttsnMessage message = factory.createConnect(_clientid, MqttsnConstants.UNSIGNED_MAX_16, false, true);
IMqttsnMessage message = factory.createConnect(_clientid, MqttsnConstants.UNSIGNED_MAX_16, false, true, 1024);
testWireMessage(message);
}

Expand All @@ -85,7 +85,7 @@ public void testMqttsnConnectLongClientId() throws MqttsnCodecException {
sb.append("A");
}

IMqttsnMessage message = factory.createConnect(sb.toString(), MqttsnConstants.UNSIGNED_MAX_16, false, true);
IMqttsnMessage message = factory.createConnect(sb.toString(), MqttsnConstants.UNSIGNED_MAX_16, false, true, 1024);
testWireMessage(message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void setup(){

@Test
public void testMqttsnConnect() throws MqttsnCodecException {
IMqttsnMessage message = factory.createConnect("THIS-IS-CLIENT-ID",98, false, false);
IMqttsnMessage message = factory.createConnect("THIS-IS-CLIENT-ID",98, false, false, 500);
testWireMessage(message);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@

package org.slj.mqtt.sn.cli;

import org.slj.mqtt.sn.codec.MqttsnCodecs;
import org.slj.mqtt.sn.impl.AbstractMqttsnRuntime;
import org.slj.mqtt.sn.impl.AbstractMqttsnRuntimeRegistry;
import org.slj.mqtt.sn.model.IMqttsnContext;
import org.slj.mqtt.sn.model.INetworkContext;
import org.slj.mqtt.sn.model.MqttsnOptions;
import org.slj.mqtt.sn.model.MqttsnSecurityOptions;
import org.slj.mqtt.sn.spi.IMqttsnMessage;
import org.slj.mqtt.sn.spi.IMqttsnTrafficListener;
import org.slj.mqtt.sn.spi.IMqttsnTransport;
import org.slj.mqtt.sn.spi.MqttsnException;
import org.slj.mqtt.sn.spi.*;
import org.slj.mqtt.sn.utils.MqttsnUtils;
import org.slj.mqtt.sn.utils.ThreadDump;
import org.slj.mqtt.sn.utils.TopicPath;
Expand All @@ -54,6 +52,8 @@ public abstract class AbstractInteractiveCli {
static final String HOSTNAME = "hostName";
static final String CLIENTID = "clientId";
static final String PORT = "port";
static final String PROTOCOL_VERSION = "protocolVersion";


protected final AtomicInteger sentByteCount = new AtomicInteger(0);
protected final AtomicInteger receiveByteCount = new AtomicInteger(0);
Expand All @@ -67,6 +67,7 @@ public abstract class AbstractInteractiveCli {
protected String hostName;
protected int port;
protected String clientId;
protected int protocolVersion = 1;

protected MqttsnOptions options;
protected AbstractMqttsnRuntimeRegistry runtimeRegistry;
Expand All @@ -86,7 +87,7 @@ public void init(Scanner input, PrintStream output){

public void start() throws Exception {
if(input == null || output == null) throw new IllegalStateException("no init");
message(String.format("Starting up interactive CLI with port=%s, clientId=%s", port, clientId));
message(String.format("Starting up interactive CLI with clientId=%s, protocolVersion=%s", clientId, protocolVersion));
if(useHistory){
saveConfig();
}
Expand All @@ -95,6 +96,9 @@ public void start() throws Exception {
options = createOptions();
message(String.format("Creating runtime registry.. DONE"));
runtimeRegistry = createRuntimeRegistry(options, createTransport());
IMqttsnCodec codec = createCodec();
runtimeRegistry.withCodec(codec);
message(String.format("Creating runtime with codec version.. %s", codec.getProtocolVersion()));
if(options.getSecurityOptions() != null){
message(String.format("Creating security configuration.. DONE"));
message(String.format("Integrity type: %s", options.getSecurityOptions().getIntegrityType()));
Expand Down Expand Up @@ -202,9 +206,13 @@ protected void configure() throws IOException {

if(needsClientId()){
do{
clientId = captureString(input, output, "Please enter a valid clientId");
clientId = captureString(input, output, "Please enter a valid client or gateway Id");
} while(!validClientId(MAX_ALLOWED_CLIENTID_LENGTH));
}

do{
protocolVersion = captureMandatoryInt(input, output, "Please enter a protocol version (1 for 1.2 or 2 for 2.0)", new int[] {1, 2});
} while(!validProtocolVersion());
}

public void configureWithHistory() throws IOException {
Expand All @@ -230,21 +238,29 @@ protected boolean configOk(){
if(needsPort()){
if(port == 0) return false;
}
return true;
return validProtocolVersion();
}

protected void loadConfigHistory(Properties props) throws IOException {
hostName = props.getProperty(HOSTNAME);
port = Integer.valueOf(props.getProperty(PORT));
try {
port = Integer.valueOf(props.getProperty(PORT));
} catch(Exception e){
}
try {
protocolVersion = Integer.valueOf(props.getProperty(PROTOCOL_VERSION));
} catch(Exception e){
}
clientId = props.getProperty(CLIENTID);
}

protected void saveConfigHistory(Properties props) {
props.setProperty(HOSTNAME, hostName);
props.setProperty(PORT, String.valueOf(port));
if(clientId != null){
if(needsHostname() && hostName != null) props.setProperty(HOSTNAME, hostName);
if(needsPort()) props.setProperty(PORT, String.valueOf(port));
if(needsClientId() && clientId != null){
props.setProperty(CLIENTID, clientId);
}
props.setProperty(PROTOCOL_VERSION, String.valueOf(protocolVersion));
}

protected boolean loadConfig() throws IOException {
Expand Down Expand Up @@ -368,9 +384,13 @@ protected boolean validHost(){
return !p.matcher(hostName).find();
}

protected boolean validProtocolVersion(){
if(protocolVersion == 1 || protocolVersion == 2) return true;
return false;
}

protected boolean validClientId(int maxLength){
if(clientId == null) return true;
// if(clientId.trim().length() == 0) return false;
if(clientId.trim().length() > maxLength) return false;
Pattern p = Pattern.compile("[a-zA-Z0-9\\-]{1,65528}");
return p.matcher(clientId).find();
Expand Down Expand Up @@ -546,6 +566,11 @@ protected boolean needsPort(){

protected abstract AbstractMqttsnRuntime createRuntime(AbstractMqttsnRuntimeRegistry registry, MqttsnOptions options);

protected IMqttsnCodec createCodec(){
return protocolVersion == 1 ? MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2 :
MqttsnCodecs.MQTTSN_CODEC_VERSION_2_0;
}

protected AbstractMqttsnRuntimeRegistry getRuntimeRegistry(){
return runtimeRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="mqtt-sn-core" />
<orderEntry type="module" module-name="mqtt-sn-codec" />
<orderEntry type="module" module-name="mqtt-sn-gateway" />
<orderEntry type="library" name="Maven: com.google.guava:guava:30.0-jre" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:failureaccess:1.0.1" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:3.0.2" level="project" />
<orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.5.0" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.3.4" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.3" level="project" />
<orderEntry type="library" name="Maven: org.slj:mqtt-sn-gateway:1.0.0" level="project" />
<orderEntry type="library" name="Maven: com.amazonaws:aws-iot-device-sdk-java:1.3.9" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.12.0" level="project" />
<orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.12.0" level="project" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="mqtt-sn-core" />
<orderEntry type="module" module-name="mqtt-sn-codec" />
<orderEntry type="module" module-name="mqtt-sn-gateway" />
<orderEntry type="library" name="Maven: com.google.guava:guava:30.0-jre" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:failureaccess:1.0.1" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:3.0.2" level="project" />
<orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.5.0" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.3.4" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.3" level="project" />
<orderEntry type="library" name="Maven: org.slj:mqtt-sn-gateway:1.0.0" level="project" />
<orderEntry type="module" module-name="mqtt-sn-gateway-connector-paho" />
<orderEntry type="library" name="Maven: org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5" level="project" />
<orderEntry type="library" name="Maven: io.jsonwebtoken:jjwt:0.9.1" level="project" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package org.slj.mqtt.sn.gateway.connector.google.iotcore;

import org.slj.mqtt.sn.codec.MqttsnCodecs;
import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayLauncher;
import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayWithKeystore;
import org.slj.mqtt.sn.gateway.impl.MqttsnGatewayRuntimeRegistry;
Expand Down Expand Up @@ -53,8 +52,7 @@ protected AbstractMqttsnRuntimeRegistry createRuntimeRegistry(MqttsnOptions opti
return MqttsnGatewayRuntimeRegistry.defaultConfiguration(options).
withBrokerConnectionFactory(new GoogleIoTCoreMqttsnBrokerConnectionFactory()).
withBrokerService(new MqttsnAggregatingGateway(brokerOptions)).
withTransport(createTransport()).
withCodec(MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2);
withTransport(createTransport());
}
}, true, "Welcome to the Google IoT Core version of the gateway. You will need to connect your gateway to your Google IoT via the credentials available in your Google console.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="module" module-name="mqtt-sn-core" />
<orderEntry type="module" module-name="mqtt-sn-codec" />
<orderEntry type="module" module-name="mqtt-sn-gateway" />
<orderEntry type="library" name="Maven: com.google.guava:guava:30.0-jre" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:failureaccess:1.0.1" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava" level="project" />
<orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:3.0.2" level="project" />
<orderEntry type="library" name="Maven: org.checkerframework:checker-qual:3.5.0" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.3.4" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.3" level="project" />
<orderEntry type="library" name="Maven: org.slj:mqtt-sn-gateway:1.0.0" level="project" />
<orderEntry type="library" name="Maven: org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: junit:junit:4.13.1" level="project" />
<orderEntry type="library" scope="TEST" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@

package org.slj.mqtt.sn.gateway.connector.paho;

import org.slj.mqtt.sn.codec.MqttsnCodecs;
import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGateway;
import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayLauncher;
import org.slj.mqtt.sn.gateway.impl.MqttsnGatewayRuntimeRegistry;
import org.slj.mqtt.sn.gateway.impl.gateway.type.MqttsnAggregatingGateway;
import org.slj.mqtt.sn.gateway.spi.broker.MqttsnBackendOptions;
import org.slj.mqtt.sn.impl.AbstractMqttsnRuntimeRegistry;
import org.slj.mqtt.sn.model.MqttsnOptions;
import org.slj.mqtt.sn.model.MqttsnSecurityOptions;
import org.slj.mqtt.sn.spi.IMqttsnTransport;

public class AggregatingGatewayInteractiveMain {
Expand All @@ -49,8 +47,7 @@ protected AbstractMqttsnRuntimeRegistry createRuntimeRegistry(MqttsnOptions opti
return MqttsnGatewayRuntimeRegistry.defaultConfiguration(options).
withBrokerConnectionFactory(new PahoMqttsnBrokerConnectionFactory()).
withBrokerService(new MqttsnAggregatingGateway(brokerOptions)).
withTransport(createTransport()).
withCodec(MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2);
withTransport(createTransport());
}
}, true, "Welcome to the custom-broker version of the gateway. You will need to connect your gateway to your MQTT broker using a client connection host, port, username, password & clientId.");
}
Expand Down
2 changes: 1 addition & 1 deletion mqtt-sn-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
</dependencies>

<build>
<finalName>mqtt-sn-gateway-v12-loopback-${project.version}</finalName>
<finalName>mqtt-sn-gateway-loopback-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static void launch(MqttsnInteractiveGateway interactiveGateway, boolean n
interactiveGateway.exit();
} catch (Exception e) {
System.err.println("A fatal error was encountered: " + e.getMessage());
e.printStackTrace(System.err);
} finally {
interactiveGateway.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public void stopServices(IMqttsnRuntimeRegistry runtime) throws MqttsnException
callShutdown(runtime.getMessageStateService());

if(((MqttsnGatewayOptions)runtime.getOptions()).isRealtimeMessageCounters()){
sendCounter.stop();
receiveCounter.stop();
if(sendCounter != null) sendCounter.stop();
if(receiveCounter != null) receiveCounter.stop();
}
}

Expand Down
Loading

0 comments on commit d2af50a

Please sign in to comment.