diff --git a/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/MqttsnClient.java b/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/MqttsnClient.java index 865c736b..a24ee787 100644 --- a/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/MqttsnClient.java +++ b/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/MqttsnClient.java @@ -201,7 +201,7 @@ public void connect(int keepAlive, boolean cleanSession) throws MqttsnException, try { IMqttsnMessage message = registry.getMessageFactory().createConnect( registry.getOptions().getContextId(), keepAlive, - registry.getWillRegistry().hasWillMessage(state.getContext()), cleanSession); + registry.getWillRegistry().hasWillMessage(state.getContext()), cleanSession, registry.getOptions().getMaxProtocolMessageSize()); MqttsnWaitToken token = registry.getMessageStateService().sendMessage(state.getContext(), message); Optional response = registry.getMessageStateService().waitForCompletion(state.getContext(), token); diff --git a/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClient.java b/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClient.java index a2e2434d..863334dd 100644 --- a/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClient.java +++ b/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClient.java @@ -27,8 +27,6 @@ import org.slj.mqtt.sn.cli.AbstractInteractiveCli; import org.slj.mqtt.sn.client.MqttsnClientConnectException; import org.slj.mqtt.sn.client.impl.MqttsnClient; -import org.slj.mqtt.sn.client.impl.MqttsnClientRuntimeRegistry; -import org.slj.mqtt.sn.codec.MqttsnCodecs; import org.slj.mqtt.sn.impl.AbstractMqttsnRuntime; import org.slj.mqtt.sn.impl.AbstractMqttsnRuntimeRegistry; import org.slj.mqtt.sn.impl.ram.MqttsnInMemoryTopicRegistry; @@ -37,7 +35,6 @@ import org.slj.mqtt.sn.net.MqttsnUdpTransport; import org.slj.mqtt.sn.net.NetworkAddress; import org.slj.mqtt.sn.spi.IMqttsnTransport; -import org.slj.mqtt.sn.spi.IMqttsnWillRegistry; import org.slj.mqtt.sn.spi.MqttsnException; import org.slj.mqtt.sn.utils.TopicPath; @@ -504,7 +501,6 @@ protected void test() protected IMqttsnTransport createTransport() { MqttsnUdpOptions udpOptions = new MqttsnUdpOptions().withMtu(4096).withReceiveBuffer(4096). withPort(MqttsnUdpOptions.DEFAULT_LOCAL_CLIENT_PORT); - return new MqttsnUdpTransport(udpOptions); } @@ -514,8 +510,7 @@ protected MqttsnOptions createOptions() throws UnknownHostException { withNetworkAddressEntry("remote-gateway", NetworkAddress.from(port, hostName)). withContextId(clientId). - withMaxProtocolMessageSize(4096). - withSleepClearsRegistrations(false); + withMaxProtocolMessageSize(4096); } @Override diff --git a/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClientLauncher.java b/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClientLauncher.java index efd1c369..601e10db 100644 --- a/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClientLauncher.java +++ b/mqtt-sn-client/src/main/java/org/slj/mqtt/sn/client/impl/cli/MqttsnInteractiveClientLauncher.java @@ -41,8 +41,8 @@ public static void launch(MqttsnInteractiveClient interactiveClient) throws Exce interactiveClient.command(); interactiveClient.exit(); } catch (Exception e) { - e.printStackTrace(); System.err.println("A fatal error was encountered: " + e.getMessage()); + e.printStackTrace(System.err); } finally { interactiveClient.stop(); } diff --git a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/ExampleUsage.java b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/ExampleUsage.java index bde962c7..cf4bf1e8 100644 --- a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/ExampleUsage.java +++ b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/ExampleUsage.java @@ -44,7 +44,7 @@ public static void main(String[] args) throws Exception { //-- construct a connect message with your required configuration IMqttsnMessage connect = - factory.createConnect("testClientId", 60, false, true); + factory.createConnect("testClientId", 60, false, true, 1024); System.out.println(connect); diff --git a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/spi/IMqttsnMessageFactory.java b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/spi/IMqttsnMessageFactory.java index bac25e03..837a7c24 100644 --- a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/spi/IMqttsnMessageFactory.java +++ b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/spi/IMqttsnMessageFactory.java @@ -81,7 +81,7 @@ IMqttsnMessage createGwinfo(int gatewayId, String gatewayAddress) * @param willPrompt: if set, indicates that client is requesting for Will topic and Will message prompting. * @param cleanSession: same meaning as with MQTT, however extended for Will topic and Will message. */ - IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession) + IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession, int maxPacketSize) throws MqttsnCodecException; /** diff --git a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version1_2/Mqttsn_v1_2_MessageFactory.java b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version1_2/Mqttsn_v1_2_MessageFactory.java index b5a63b99..a684b942 100644 --- a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version1_2/Mqttsn_v1_2_MessageFactory.java +++ b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version1_2/Mqttsn_v1_2_MessageFactory.java @@ -80,7 +80,7 @@ public IMqttsnMessage createGwinfo(int gatewayId, String gatewayAddress) throws } @Override - public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession) throws MqttsnCodecException { + public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession, int maxPacketSize) throws MqttsnCodecException { MqttsnConnect msg = new MqttsnConnect(); msg.setClientId(clientId); diff --git a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version2_0/Mqttsn_v2_0_MessageFactory.java b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version2_0/Mqttsn_v2_0_MessageFactory.java index f933252e..497c030e 100644 --- a/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version2_0/Mqttsn_v2_0_MessageFactory.java +++ b/mqtt-sn-codec/src/main/java/org/slj/mqtt/sn/wire/version2_0/Mqttsn_v2_0_MessageFactory.java @@ -53,15 +53,15 @@ public static IMqttsnMessageFactory getInstance() { } @Override - public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession) throws MqttsnCodecException { + public IMqttsnMessage createConnect(String clientId, int keepAlive, boolean willPrompt, boolean cleanSession, int maxPacketSize) throws MqttsnCodecException { - //TODO add session expiry interval MqttsnConnect_V2_0 msg = new MqttsnConnect_V2_0(); msg.setClientId(clientId); msg.setKeepAlive(keepAlive); msg.setSessionExpiryInterval(keepAlive); msg.setCleanStart(cleanSession); msg.setWill(willPrompt); + msg.setMaxPacketSize(maxPacketSize); msg.validate(); return msg; } diff --git a/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version1_2/payload/Mqttsn1_2WireTests.java b/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version1_2/payload/Mqttsn1_2WireTests.java index fec4c358..4abd8396 100644 --- a/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version1_2/payload/Mqttsn1_2WireTests.java +++ b/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version1_2/payload/Mqttsn1_2WireTests.java @@ -72,7 +72,7 @@ public void testMqttsnConnack() throws MqttsnCodecException { public void testMqttsnConnect() throws MqttsnCodecException { //-- test normal length clientId - IMqttsnMessage message = factory.createConnect(_clientid, MqttsnConstants.UNSIGNED_MAX_16, false, true); + IMqttsnMessage message = factory.createConnect(_clientid, MqttsnConstants.UNSIGNED_MAX_16, false, true, 1024); testWireMessage(message); } @@ -85,7 +85,7 @@ public void testMqttsnConnectLongClientId() throws MqttsnCodecException { sb.append("A"); } - IMqttsnMessage message = factory.createConnect(sb.toString(), MqttsnConstants.UNSIGNED_MAX_16, false, true); + IMqttsnMessage message = factory.createConnect(sb.toString(), MqttsnConstants.UNSIGNED_MAX_16, false, true, 1024); testWireMessage(message); } diff --git a/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version2_0/payload/Mqttsn2_0WireTests.java b/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version2_0/payload/Mqttsn2_0WireTests.java index b95f9831..793fd530 100644 --- a/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version2_0/payload/Mqttsn2_0WireTests.java +++ b/mqtt-sn-codec/src/test/java/org/slj/mqtt/sn/wire/version2_0/payload/Mqttsn2_0WireTests.java @@ -43,7 +43,7 @@ public void setup(){ @Test public void testMqttsnConnect() throws MqttsnCodecException { - IMqttsnMessage message = factory.createConnect("THIS-IS-CLIENT-ID",98, false, false); + IMqttsnMessage message = factory.createConnect("THIS-IS-CLIENT-ID",98, false, false, 500); testWireMessage(message); diff --git a/mqtt-sn-core/src/main/java/org/slj/mqtt/sn/cli/AbstractInteractiveCli.java b/mqtt-sn-core/src/main/java/org/slj/mqtt/sn/cli/AbstractInteractiveCli.java index d212f4d1..2b943dea 100644 --- a/mqtt-sn-core/src/main/java/org/slj/mqtt/sn/cli/AbstractInteractiveCli.java +++ b/mqtt-sn-core/src/main/java/org/slj/mqtt/sn/cli/AbstractInteractiveCli.java @@ -24,16 +24,14 @@ package org.slj.mqtt.sn.cli; +import org.slj.mqtt.sn.codec.MqttsnCodecs; import org.slj.mqtt.sn.impl.AbstractMqttsnRuntime; import org.slj.mqtt.sn.impl.AbstractMqttsnRuntimeRegistry; import org.slj.mqtt.sn.model.IMqttsnContext; import org.slj.mqtt.sn.model.INetworkContext; import org.slj.mqtt.sn.model.MqttsnOptions; import org.slj.mqtt.sn.model.MqttsnSecurityOptions; -import org.slj.mqtt.sn.spi.IMqttsnMessage; -import org.slj.mqtt.sn.spi.IMqttsnTrafficListener; -import org.slj.mqtt.sn.spi.IMqttsnTransport; -import org.slj.mqtt.sn.spi.MqttsnException; +import org.slj.mqtt.sn.spi.*; import org.slj.mqtt.sn.utils.MqttsnUtils; import org.slj.mqtt.sn.utils.ThreadDump; import org.slj.mqtt.sn.utils.TopicPath; @@ -54,6 +52,8 @@ public abstract class AbstractInteractiveCli { static final String HOSTNAME = "hostName"; static final String CLIENTID = "clientId"; static final String PORT = "port"; + static final String PROTOCOL_VERSION = "protocolVersion"; + protected final AtomicInteger sentByteCount = new AtomicInteger(0); protected final AtomicInteger receiveByteCount = new AtomicInteger(0); @@ -67,6 +67,7 @@ public abstract class AbstractInteractiveCli { protected String hostName; protected int port; protected String clientId; + protected int protocolVersion = 1; protected MqttsnOptions options; protected AbstractMqttsnRuntimeRegistry runtimeRegistry; @@ -86,7 +87,7 @@ public void init(Scanner input, PrintStream output){ public void start() throws Exception { if(input == null || output == null) throw new IllegalStateException("no init"); - message(String.format("Starting up interactive CLI with port=%s, clientId=%s", port, clientId)); + message(String.format("Starting up interactive CLI with clientId=%s, protocolVersion=%s", clientId, protocolVersion)); if(useHistory){ saveConfig(); } @@ -95,6 +96,9 @@ public void start() throws Exception { options = createOptions(); message(String.format("Creating runtime registry.. DONE")); runtimeRegistry = createRuntimeRegistry(options, createTransport()); + IMqttsnCodec codec = createCodec(); + runtimeRegistry.withCodec(codec); + message(String.format("Creating runtime with codec version.. %s", codec.getProtocolVersion())); if(options.getSecurityOptions() != null){ message(String.format("Creating security configuration.. DONE")); message(String.format("Integrity type: %s", options.getSecurityOptions().getIntegrityType())); @@ -202,9 +206,13 @@ protected void configure() throws IOException { if(needsClientId()){ do{ - clientId = captureString(input, output, "Please enter a valid clientId"); + clientId = captureString(input, output, "Please enter a valid client or gateway Id"); } while(!validClientId(MAX_ALLOWED_CLIENTID_LENGTH)); } + + do{ + protocolVersion = captureMandatoryInt(input, output, "Please enter a protocol version (1 for 1.2 or 2 for 2.0)", new int[] {1, 2}); + } while(!validProtocolVersion()); } public void configureWithHistory() throws IOException { @@ -230,21 +238,29 @@ protected boolean configOk(){ if(needsPort()){ if(port == 0) return false; } - return true; + return validProtocolVersion(); } protected void loadConfigHistory(Properties props) throws IOException { hostName = props.getProperty(HOSTNAME); - port = Integer.valueOf(props.getProperty(PORT)); + try { + port = Integer.valueOf(props.getProperty(PORT)); + } catch(Exception e){ + } + try { + protocolVersion = Integer.valueOf(props.getProperty(PROTOCOL_VERSION)); + } catch(Exception e){ + } clientId = props.getProperty(CLIENTID); } protected void saveConfigHistory(Properties props) { - props.setProperty(HOSTNAME, hostName); - props.setProperty(PORT, String.valueOf(port)); - if(clientId != null){ + if(needsHostname() && hostName != null) props.setProperty(HOSTNAME, hostName); + if(needsPort()) props.setProperty(PORT, String.valueOf(port)); + if(needsClientId() && clientId != null){ props.setProperty(CLIENTID, clientId); } + props.setProperty(PROTOCOL_VERSION, String.valueOf(protocolVersion)); } protected boolean loadConfig() throws IOException { @@ -368,9 +384,13 @@ protected boolean validHost(){ return !p.matcher(hostName).find(); } + protected boolean validProtocolVersion(){ + if(protocolVersion == 1 || protocolVersion == 2) return true; + return false; + } + protected boolean validClientId(int maxLength){ if(clientId == null) return true; -// if(clientId.trim().length() == 0) return false; if(clientId.trim().length() > maxLength) return false; Pattern p = Pattern.compile("[a-zA-Z0-9\\-]{1,65528}"); return p.matcher(clientId).find(); @@ -546,6 +566,11 @@ protected boolean needsPort(){ protected abstract AbstractMqttsnRuntime createRuntime(AbstractMqttsnRuntimeRegistry registry, MqttsnOptions options); + protected IMqttsnCodec createCodec(){ + return protocolVersion == 1 ? MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2 : + MqttsnCodecs.MQTTSN_CODEC_VERSION_2_0; + } + protected AbstractMqttsnRuntimeRegistry getRuntimeRegistry(){ return runtimeRegistry; } diff --git a/mqtt-sn-gateway-connector-aws-iotcore/mqtt-sn-gateway-connector-aws-iotcore.iml b/mqtt-sn-gateway-connector-aws-iotcore/mqtt-sn-gateway-connector-aws-iotcore.iml index 7d95d45f..bfc14ecf 100644 --- a/mqtt-sn-gateway-connector-aws-iotcore/mqtt-sn-gateway-connector-aws-iotcore.iml +++ b/mqtt-sn-gateway-connector-aws-iotcore/mqtt-sn-gateway-connector-aws-iotcore.iml @@ -13,14 +13,7 @@ - - - - - - - - + diff --git a/mqtt-sn-gateway-connector-google-iotcore/mqtt-sn-gateway-connector-google-iotcore.iml b/mqtt-sn-gateway-connector-google-iotcore/mqtt-sn-gateway-connector-google-iotcore.iml index e51c9441..66a97052 100644 --- a/mqtt-sn-gateway-connector-google-iotcore/mqtt-sn-gateway-connector-google-iotcore.iml +++ b/mqtt-sn-gateway-connector-google-iotcore/mqtt-sn-gateway-connector-google-iotcore.iml @@ -13,14 +13,7 @@ - - - - - - - - + diff --git a/mqtt-sn-gateway-connector-google-iotcore/src/main/java/org/slj/mqtt/sn/gateway/connector/google/iotcore/GoogleIoTCoreAggregatingGatewayInteractiveMain.java b/mqtt-sn-gateway-connector-google-iotcore/src/main/java/org/slj/mqtt/sn/gateway/connector/google/iotcore/GoogleIoTCoreAggregatingGatewayInteractiveMain.java index 4a016165..83d5729a 100644 --- a/mqtt-sn-gateway-connector-google-iotcore/src/main/java/org/slj/mqtt/sn/gateway/connector/google/iotcore/GoogleIoTCoreAggregatingGatewayInteractiveMain.java +++ b/mqtt-sn-gateway-connector-google-iotcore/src/main/java/org/slj/mqtt/sn/gateway/connector/google/iotcore/GoogleIoTCoreAggregatingGatewayInteractiveMain.java @@ -24,7 +24,6 @@ package org.slj.mqtt.sn.gateway.connector.google.iotcore; -import org.slj.mqtt.sn.codec.MqttsnCodecs; import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayLauncher; import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayWithKeystore; import org.slj.mqtt.sn.gateway.impl.MqttsnGatewayRuntimeRegistry; @@ -53,8 +52,7 @@ protected AbstractMqttsnRuntimeRegistry createRuntimeRegistry(MqttsnOptions opti return MqttsnGatewayRuntimeRegistry.defaultConfiguration(options). withBrokerConnectionFactory(new GoogleIoTCoreMqttsnBrokerConnectionFactory()). withBrokerService(new MqttsnAggregatingGateway(brokerOptions)). - withTransport(createTransport()). - withCodec(MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2); + withTransport(createTransport()); } }, true, "Welcome to the Google IoT Core version of the gateway. You will need to connect your gateway to your Google IoT via the credentials available in your Google console."); } diff --git a/mqtt-sn-gateway-connector-paho/mqtt-sn-gateway-connector-paho.iml b/mqtt-sn-gateway-connector-paho/mqtt-sn-gateway-connector-paho.iml index c210e1e6..53dac377 100644 --- a/mqtt-sn-gateway-connector-paho/mqtt-sn-gateway-connector-paho.iml +++ b/mqtt-sn-gateway-connector-paho/mqtt-sn-gateway-connector-paho.iml @@ -13,14 +13,7 @@ - - - - - - - - + diff --git a/mqtt-sn-gateway-connector-paho/src/main/java/org/slj/mqtt/sn/gateway/connector/paho/AggregatingGatewayInteractiveMain.java b/mqtt-sn-gateway-connector-paho/src/main/java/org/slj/mqtt/sn/gateway/connector/paho/AggregatingGatewayInteractiveMain.java index 1d1b6011..e932b0a5 100644 --- a/mqtt-sn-gateway-connector-paho/src/main/java/org/slj/mqtt/sn/gateway/connector/paho/AggregatingGatewayInteractiveMain.java +++ b/mqtt-sn-gateway-connector-paho/src/main/java/org/slj/mqtt/sn/gateway/connector/paho/AggregatingGatewayInteractiveMain.java @@ -24,7 +24,6 @@ package org.slj.mqtt.sn.gateway.connector.paho; -import org.slj.mqtt.sn.codec.MqttsnCodecs; import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGateway; import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayLauncher; import org.slj.mqtt.sn.gateway.impl.MqttsnGatewayRuntimeRegistry; @@ -32,7 +31,6 @@ import org.slj.mqtt.sn.gateway.spi.broker.MqttsnBackendOptions; import org.slj.mqtt.sn.impl.AbstractMqttsnRuntimeRegistry; import org.slj.mqtt.sn.model.MqttsnOptions; -import org.slj.mqtt.sn.model.MqttsnSecurityOptions; import org.slj.mqtt.sn.spi.IMqttsnTransport; public class AggregatingGatewayInteractiveMain { @@ -49,8 +47,7 @@ protected AbstractMqttsnRuntimeRegistry createRuntimeRegistry(MqttsnOptions opti return MqttsnGatewayRuntimeRegistry.defaultConfiguration(options). withBrokerConnectionFactory(new PahoMqttsnBrokerConnectionFactory()). withBrokerService(new MqttsnAggregatingGateway(brokerOptions)). - withTransport(createTransport()). - withCodec(MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2); + withTransport(createTransport()); } }, true, "Welcome to the custom-broker version of the gateway. You will need to connect your gateway to your MQTT broker using a client connection host, port, username, password & clientId."); } diff --git a/mqtt-sn-gateway/pom.xml b/mqtt-sn-gateway/pom.xml index 7a5c7fd3..10c6903f 100644 --- a/mqtt-sn-gateway/pom.xml +++ b/mqtt-sn-gateway/pom.xml @@ -65,7 +65,7 @@ - mqtt-sn-gateway-v12-loopback-${project.version} + mqtt-sn-gateway-loopback-${project.version} org.apache.maven.plugins diff --git a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/cli/MqttsnInteractiveGatewayLauncher.java b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/cli/MqttsnInteractiveGatewayLauncher.java index f4c987ff..3025dbb6 100644 --- a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/cli/MqttsnInteractiveGatewayLauncher.java +++ b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/cli/MqttsnInteractiveGatewayLauncher.java @@ -42,6 +42,7 @@ public static void launch(MqttsnInteractiveGateway interactiveGateway, boolean n interactiveGateway.exit(); } catch (Exception e) { System.err.println("A fatal error was encountered: " + e.getMessage()); + e.printStackTrace(System.err); } finally { interactiveGateway.stop(); } diff --git a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/MqttsnGateway.java b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/MqttsnGateway.java index 0a3c47c7..44f5ee15 100644 --- a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/MqttsnGateway.java +++ b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/MqttsnGateway.java @@ -172,8 +172,8 @@ public void stopServices(IMqttsnRuntimeRegistry runtime) throws MqttsnException callShutdown(runtime.getMessageStateService()); if(((MqttsnGatewayOptions)runtime.getOptions()).isRealtimeMessageCounters()){ - sendCounter.stop(); - receiveCounter.stop(); + if(sendCounter != null) sendCounter.stop(); + if(receiveCounter != null) receiveCounter.stop(); } } diff --git a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/connector/LoopbackGatewayInteractiveMain.java b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/connector/LoopbackGatewayInteractiveMain.java index cac0e64c..33b03c41 100644 --- a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/connector/LoopbackGatewayInteractiveMain.java +++ b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/connector/LoopbackGatewayInteractiveMain.java @@ -24,7 +24,6 @@ package org.slj.mqtt.sn.gateway.impl.connector; -import org.slj.mqtt.sn.codec.MqttsnCodecs; import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGateway; import org.slj.mqtt.sn.gateway.cli.MqttsnInteractiveGatewayLauncher; import org.slj.mqtt.sn.gateway.impl.MqttsnGatewayRuntimeRegistry; @@ -38,8 +37,13 @@ public class LoopbackGatewayInteractiveMain { public static void main(String[] args) throws Exception { MqttsnInteractiveGatewayLauncher.launch(new MqttsnInteractiveGateway() { protected AbstractMqttsnRuntimeRegistry createRuntimeRegistry(MqttsnOptions options, IMqttsnTransport transport) { - MqttsnBackendOptions brokerOptions = new MqttsnBackendOptions(). - withHost(hostName). + MqttsnBackendOptions brokerOptions = new MqttsnBackendOptions(){ + @Override + public boolean validConnectionDetails() { + return true; + } + }; + brokerOptions.withHost(hostName). withPort(port). withUsername(username). withPassword(password); @@ -47,8 +51,7 @@ protected AbstractMqttsnRuntimeRegistry createRuntimeRegistry(MqttsnOptions opti return MqttsnGatewayRuntimeRegistry.defaultConfiguration(options). withBrokerConnectionFactory(new LoopbackMqttsnBrokerConnectionFactory()). withBrokerService(new MqttsnAggregatingGateway(brokerOptions)). - withTransport(createTransport()). - withCodec(MqttsnCodecs.MQTTSN_CODEC_VERSION_1_2); + withTransport(createTransport()); } }, false, "Welcome to the loopback gateway. This version does NOT use a backend broker, instead brokering MQTT messages itself as a loopback to connected devices."); } diff --git a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewayMessageHandler.java b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewayMessageHandler.java index 523a2329..b40b01a9 100644 --- a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewayMessageHandler.java +++ b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewayMessageHandler.java @@ -171,8 +171,8 @@ else if(context.getProtocolVersion() == MqttsnConstants.PROTOCOL_VERSION_2_0){ } String assignedClientId = context.isAssignedClientId() ? context.getId() : null; + boolean stateExisted = hasSessionState(context); IMqttsnSessionState state = getSessionState(context, true); - ConnectResult result = registry.getGatewaySessionService().connect(state, connect); processSessionResult(result); if(result.isError()){ @@ -182,7 +182,6 @@ else if(context.getProtocolVersion() == MqttsnConstants.PROTOCOL_VERSION_2_0){ if(will){ return registry.getMessageFactory().createWillTopicReq(); } else { - boolean stateExisted = hasSessionState(context); long sessionExpiryIntervalRequested = sessionExpiryInterval; if(sessionExpiryInterval > registry.getOptions().getRemoveDisconnectedSessionsSeconds()){ diff --git a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewaySessionService.java b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewaySessionService.java index 27ac3cd7..f215ef7f 100644 --- a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewaySessionService.java +++ b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/impl/gateway/MqttsnGatewaySessionService.java @@ -264,6 +264,7 @@ public SubscribeResult subscribe(IMqttsnSessionState state, TopicInfo info, IMqt if(registry.getSubscriptionRegistry().subscribe(state.getContext(), topicPath, QoS)){ SubscribeResult result = registry.getBackendService().subscribe(context, new TopicPath(topicPath), message); result.setTopicInfo(info); + result.setGrantedQoS(QoS); return result; } else { SubscribeResult result = new SubscribeResult(Result.STATUS.NOOP); @@ -431,7 +432,8 @@ public void receiveToSessions(String topicPath, int qos, boolean retained, byte[ int q = Math.min(grantedQos,qos); IMqttsnSessionState sessionState = getSessionState(client, false); if(sessionState != null){ - if(payload.length > sessionState.getMaxPacketSize()){ + if(sessionState.getMaxPacketSize() != 0 && + payload.length + 9 > sessionState.getMaxPacketSize()){ logger.log(Level.WARNING, String.format("payload exceeded max size (%s) bytes configured by client, ignore this client [%s]", payload.length, client)); } else { PublishData data = new PublishData(topicPath, q, retained); diff --git a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/spi/broker/MqttsnBackendOptions.java b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/spi/broker/MqttsnBackendOptions.java index 0121a168..630046a2 100644 --- a/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/spi/broker/MqttsnBackendOptions.java +++ b/mqtt-sn-gateway/src/main/java/org/slj/mqtt/sn/gateway/spi/broker/MqttsnBackendOptions.java @@ -26,7 +26,7 @@ import java.util.Objects; -public final class MqttsnBackendOptions { +public class MqttsnBackendOptions { public static final boolean DEFAULT_CONNECT_ON_STARTUP = true; public static final boolean DEFAULT_MANAGED_CONNECTIONS = true; diff --git a/mqtt-sn-load-test/mqtt-sn-load-test.iml b/mqtt-sn-load-test/mqtt-sn-load-test.iml index b5c7d528..af05062d 100644 --- a/mqtt-sn-load-test/mqtt-sn-load-test.iml +++ b/mqtt-sn-load-test/mqtt-sn-load-test.iml @@ -11,7 +11,7 @@ - +