Skip to content

Commit

Permalink
feat(mqtt5): resume session on reconnect (#1558)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcosentino11 authored Nov 9, 2023
1 parent 4276d74 commit e8da9a7
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public void onStopped(Mqtt5Client client, OnStoppedReturn onStoppedReturn) {
this.builderProvider = builderProvider;
}

synchronized Mqtt5Client getClient() { // for testing
return client;
}

void disableRateLimiting() {
bandwidthLimiter.setRate(Double.MAX_VALUE);
transactionLimiter.setRate(Double.MAX_VALUE);
Expand Down Expand Up @@ -306,7 +310,11 @@ private synchronized void internalConnect() {

builder.withLifeCycleEvents(this.connectionEventCallback)
.withPublishEvents(this.messageHandler)
.withSessionBehavior(Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_POST_SUCCESS)
// reset the session on initial connect,
// but when we reconnect purposefully,
// attempt to resume the session rather than clear it again
.withSessionBehavior(hasConnectedOnce.get() ? Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_ALWAYS
: Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_POST_SUCCESS)
.withOfflineQueueBehavior(
Mqtt5ClientOptions.ClientOfflineQueueBehavior.FAIL_ALL_ON_DISCONNECT)
.withMinReconnectDelayMs(minReconnectSeconds == 0 ? null : minReconnectSeconds * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,43 @@

package com.aws.greengrass.mqttclient;

import com.aws.greengrass.componentmanager.KernelConfigResolver;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.dependency.Context;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;
import software.amazon.awssdk.crt.mqtt5.OnConnectionSuccessReturn;
import software.amazon.awssdk.crt.mqtt5.OnDisconnectionReturn;
import software.amazon.awssdk.crt.mqtt5.packets.ConnAckPacket;
import software.amazon.awssdk.iot.AwsIotMqtt5ClientBuilder;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("PMD.CloseResource")
@ExtendWith({GGExtension.class, MockitoExtension.class})
public class AwsIotMqtt5ClientTest {
class AwsIotMqtt5ClientTest {
@Mock
AwsIotMqtt5ClientBuilder builder;

Expand All @@ -41,29 +51,34 @@ public class AwsIotMqtt5ClientTest {
@Mock
MqttClientConnectionEvents mockCallback2;

@Spy
CallbackEventManager callbackEventManager;
Topics mockTopic;

// same as what we use in Kernel
private ExecutorService executorService;
private ScheduledExecutorService ses;
Topics topics = Topics.of(new Context(), KernelConfigResolver.CONFIGURATION_CONFIG_KEY, null);

ExecutorService executorService = Executors.newFixedThreadPool(1);

@Mock
ScheduledExecutorService ses;

@BeforeEach
void beforeEach() {
callbackEventManager = spy(new CallbackEventManager());
callbackEventManager.addToCallbackEvents(mockCallback1);
callbackEventManager.addToCallbackEvents(mockCallback2);
mockTopic = mock(Topics.class);
executorService = Executors.newCachedThreadPool();
ses = new ScheduledThreadPoolExecutor(4);
}

@AfterEach
void tearDown() throws IOException {
executorService.shutdownNow();
topics.getContext().close();
}

@Test
void GIVEN_multiple_callbacks_in_callbackEventManager_WHEN_connections_are_interrupted_purposely_THEN_no_callbacks_are_called() {
AwsIotMqtt5Client client1 = new AwsIotMqtt5Client(() -> builder, (x) -> null, "A", 0, mockTopic,
AwsIotMqtt5Client client1 = new AwsIotMqtt5Client(() -> builder, (x) -> null, "A", 0, topics,
callbackEventManager, executorService, ses);
client1.disableRateLimiting();
AwsIotMqtt5Client client2 = new AwsIotMqtt5Client(() -> builder, (x) -> null, "B", 0, mockTopic,
AwsIotMqtt5Client client2 = new AwsIotMqtt5Client(() -> builder, (x) -> null, "B", 0, topics,
callbackEventManager, executorService, ses);
client2.disableRateLimiting();
callbackEventManager.runOnConnectionResumed(false);
Expand All @@ -84,4 +99,39 @@ void GIVEN_multiple_callbacks_in_callbackEventManager_WHEN_connections_are_inter

assertTrue(callbackEventManager.hasCallbacked());
}

@Test
void GIVEN_connected_client_WHEN_reconnect_THEN_client_configured_to_resume_session() {
try (AwsIotMqtt5ClientBuilder builder = AwsIotMqtt5ClientBuilder.newMqttBuilder("localhost");
AwsIotMqtt5Client client = new AwsIotMqtt5Client(() -> builder, (x) -> null, "A", 0, topics,
callbackEventManager, executorService, ses)) {
Runnable reconnectSuccessfully = () -> {
executorService.submit(() -> {
try {
Thread.sleep(1000L);
client.getConnectionEventCallback().onConnectionSuccess(client.getClient(), connectionSuccess());
} catch (InterruptedException ignore) {
}
});
try {
client.reconnect(5000L);
} catch (ExecutionException | InterruptedException | TimeoutException e) {
fail(e);
}
};

// even on reconnect, if first time connection, use cleanSession=True then cleanSession=False
reconnectSuccessfully.run();
assertEquals(Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_POST_SUCCESS, client.getClient().getClientOptions().getSessionBehavior());
// subsequent connects use rejoin always
reconnectSuccessfully.run();
assertEquals(Mqtt5ClientOptions.ClientSessionBehavior.REJOIN_ALWAYS, client.getClient().getClientOptions().getSessionBehavior());
}
}

private OnConnectionSuccessReturn connectionSuccess() {
OnConnectionSuccessReturn ret = mock(OnConnectionSuccessReturn.class);
when(ret.getConnAckPacket()).thenReturn(mock(ConnAckPacket.class));
return ret;
}
}

0 comments on commit e8da9a7

Please sign in to comment.