From faba83a29e05bec988075c689105a96572ca40f7 Mon Sep 17 00:00:00 2001 From: timtay-microsoft Date: Thu, 5 Aug 2021 20:17:11 -0700 Subject: [PATCH] fix(iot-dev): Fix bug where multiplexed deviceClients did not maintain twin and/or method subscriptions after reconnection (#1298) If a multiplexed device is subscribed to twin and/or methods, then loses its session due to network issues, it should still be subscribed to twin and/or methods after it finishes reconnection --- .../iot/device/transport/IotHubTransport.java | 6 +- .../amqps/AmqpsIotHubConnection.java | 70 +++++++++--- .../iot/iothub/MultiplexingClientTests.java | 101 +++++++++++++++++- 3 files changed, 158 insertions(+), 19 deletions(-) diff --git a/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java b/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java index b4aa085b58..243f8a73ed 100644 --- a/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java +++ b/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java @@ -833,7 +833,7 @@ public void registerMultiplexedDeviceClient(List configs, lo // Since the registration failed, need to remove the device from the list of multiplexed devices DeviceClientConfig configThatFailedToRegister = this.deviceClientConfigs.remove(deviceId); - ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configThatFailedToRegister); + ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configThatFailedToRegister, false); } } @@ -856,7 +856,7 @@ public void unregisterMultiplexedDeviceClient(List configs, if (this.iotHubTransportConnection != null) { // Safe cast since amqps and amqps_ws always use this transport connection type. - ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configToRegister); + ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configToRegister, false); } else { @@ -1286,7 +1286,7 @@ private void singleDeviceReconnectAttemptAsync(String deviceId) return; } - ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(config); + ((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(config, true); ((AmqpsIotHubConnection) this.iotHubTransportConnection).registerMultiplexedDevice(config); } diff --git a/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java b/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java index aefcc168f6..45fc3675b0 100644 --- a/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java +++ b/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/amqps/AmqpsIotHubConnection.java @@ -90,6 +90,7 @@ public final class AmqpsIotHubConnection extends BaseHandler implements IotHubTr // Proton-j primitives and wrappers for the device and authentication sessions private Connection connection; private Reactor reactor; + private final Queue reconnectingDeviceSessionHandlers = new ConcurrentLinkedQueue<>(); private final Queue sessionHandlers = new ConcurrentLinkedQueue<>(); private final Queue sasTokenRenewalHandlers = new ConcurrentLinkedQueue<>(); private AmqpsCbsSessionHandler amqpsCbsSessionHandler; @@ -97,7 +98,9 @@ public final class AmqpsIotHubConnection extends BaseHandler implements IotHubTr // Multiplexed device registrations and un-registrations come from a non-reactor thread, so they get queued into these // queues and are executed when onTimerTask checks them. private final Set multiplexingClientsToRegister; - private final Set multiplexingClientsToUnregister; + + // keys are the configs of the clients to unregister, values are the flag that determines if the session should be cached locally for re-use upon reconnection + private final Map multiplexingClientsToUnregister; private final boolean isMultiplexing; @@ -106,7 +109,7 @@ public AmqpsIotHubConnection(DeviceClientConfig config, boolean isMultiplexing) // This allows us to create thread safe sets despite there being no such type default in Java 7 or 8 this.deviceClientConfigs = Collections.newSetFromMap(new ConcurrentHashMap()); this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap()); - this.multiplexingClientsToUnregister = Collections.newSetFromMap(new ConcurrentHashMap()); + this.multiplexingClientsToUnregister = new ConcurrentHashMap<>(); this.deviceClientConfigs.add(config); @@ -139,7 +142,7 @@ public AmqpsIotHubConnection(String hostName, boolean isWebsocketConnection, SSL // This allows us to create thread safe sets despite there being no such type default in Java 7 or 8 this.deviceClientConfigs = Collections.newSetFromMap(new ConcurrentHashMap()); this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap()); - this.multiplexingClientsToUnregister = Collections.newSetFromMap(new ConcurrentHashMap()); + this.multiplexingClientsToUnregister = new ConcurrentHashMap<>(); this.isWebsocketConnection = isWebsocketConnection; @@ -172,14 +175,28 @@ public void registerMultiplexedDevice(DeviceClientConfig config) deviceClientConfigs.add(config); } - public void unregisterMultiplexedDevice(DeviceClientConfig config) + /** + * Asynchronously unregister a multiplexed device from an active multiplexed connection or synchronously unregister + * a multiplexed device from a closed multiplexed connection. + * @param config the config of the device that should be unregistered. + * @param willReconnect true if the device will be re-registered soon because it is reconnecting. + */ + public void unregisterMultiplexedDevice(DeviceClientConfig config, boolean willReconnect) { if (this.state == IotHubConnectionStatus.CONNECTED) { // session closing logic should be done from a proton reactor thread, not this thread. This queue gets polled // onTimerTask so that this client gets unregistered on that thread instead. - log.trace("Queuing the unregistration of device {} from an active multiplexed connection", config.getDeviceId()); - this.multiplexingClientsToUnregister.add(config); + if (willReconnect) + { + log.trace("Queuing the unregistration of device {} from an active multiplexed connection. The device will be re-registered for reconnection purposes.", config.getDeviceId()); + } + else + { + log.trace("Queuing the unregistration of device {} from an active multiplexed connection", config.getDeviceId()); + } + + this.multiplexingClientsToUnregister.put(config, willReconnect); } deviceClientConfigs.remove(config); @@ -197,7 +214,7 @@ public void open() throws TransportException { for (DeviceClientConfig clientConfig : deviceClientConfigs) { - this.createSessionHandler(clientConfig); + this.addSessionHandler(clientConfig); } initializeStateLatches(); @@ -927,7 +944,7 @@ private void releaseDeviceSessionLatches() } } - private AmqpsSessionHandler createSessionHandler(DeviceClientConfig deviceClientConfig) + private AmqpsSessionHandler addSessionHandler(DeviceClientConfig deviceClientConfig) { // Check if the device session still exists from a previous connection AmqpsSessionHandler amqpsSessionHandler = null; @@ -940,14 +957,26 @@ private AmqpsSessionHandler createSessionHandler(DeviceClientConfig deviceClient } } + // If the device session was temporarily unregistered during reconnection, reuse the cached session handler + // since it holds the subscription information needed to fully reconnect all links that were open prior to reconnection. + for (AmqpsSessionHandler cachedDeviceSessionHandler : this.reconnectingDeviceSessionHandlers) + { + if (cachedDeviceSessionHandler.getDeviceId().equals(deviceClientConfig.getDeviceId())) + { + amqpsSessionHandler = cachedDeviceSessionHandler; + break; + } + } + // If the device session did not exist in the previous connection, or if there was no previous connection, // create a new session if (amqpsSessionHandler == null) { amqpsSessionHandler = new AmqpsSessionHandler(deviceClientConfig, this); - this.sessionHandlers.add(amqpsSessionHandler); } + this.sessionHandlers.add(amqpsSessionHandler); + return amqpsSessionHandler; } @@ -960,7 +989,7 @@ private void checkForNewlyRegisteredMultiplexedClientsToStart() Set configsRegisteredSuccessfully = new HashSet<>(); while (configToRegister != null) { - AmqpsSessionHandler amqpsSessionHandler = createSessionHandler(configToRegister); + AmqpsSessionHandler amqpsSessionHandler = addSessionHandler(configToRegister); log.trace("Adding device session for device {} to an active connection", configToRegister.getDeviceId()); amqpsSessionHandler.setSession(this.connection.session()); @@ -993,7 +1022,7 @@ private void checkForNewlyRegisteredMultiplexedClientsToStart() // can be opened on a reactor thread instead of from one of our threads. private void checkForNewlyUnregisteredMultiplexedClientsToStop() { - Iterator configsToUnregisterIterator = this.multiplexingClientsToUnregister.iterator(); + Iterator configsToUnregisterIterator = this.multiplexingClientsToUnregister.keySet().iterator(); DeviceClientConfig configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null; Set configsUnregisteredSuccessfully = new HashSet<>(); while (configToUnregister != null) @@ -1019,6 +1048,19 @@ private void checkForNewlyUnregisteredMultiplexedClientsToStop() log.trace("Removing session handler for device {}", amqpsSessionHandler.getDeviceId()); this.sessionHandlers.remove(amqpsSessionHandler); + // if the client being unregistered is doing so for reconnection purposes + boolean isSessionReconnecting = this.multiplexingClientsToUnregister.get(configToUnregister); + if (isSessionReconnecting) + { + // save the session handler for later since it has state for what subscriptions the device had before this reconnection + this.reconnectingDeviceSessionHandlers.add(amqpsSessionHandler); + } + else + { + // remove the cached session handler since the device is being unregistered manually if it is cached + this.reconnectingDeviceSessionHandlers.remove(amqpsSessionHandler); + } + // Need to find the sas token renewal handler that is tied to this device AmqpsSasTokenRenewalHandler sasTokenRenewalHandlerToRemove = null; for (AmqpsSasTokenRenewalHandler existingSasTokenRenewalHandler : this.sasTokenRenewalHandlers) @@ -1049,7 +1091,11 @@ private void checkForNewlyUnregisteredMultiplexedClientsToStop() configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null; } - this.multiplexingClientsToUnregister.removeAll(configsUnregisteredSuccessfully); + for (DeviceClientConfig successfullyUnregisteredConfig : configsUnregisteredSuccessfully) + { + this.multiplexingClientsToUnregister.remove(successfullyUnregisteredConfig); + } + this.deviceClientConfigs.removeAll(configsUnregisteredSuccessfully); } diff --git a/iot-e2e-tests/common/src/test/java/tests/integration/com/microsoft/azure/sdk/iot/iothub/MultiplexingClientTests.java b/iot-e2e-tests/common/src/test/java/tests/integration/com/microsoft/azure/sdk/iot/iothub/MultiplexingClientTests.java index e38623313a..1ab6d4dc2b 100644 --- a/iot-e2e-tests/common/src/test/java/tests/integration/com/microsoft/azure/sdk/iot/iothub/MultiplexingClientTests.java +++ b/iot-e2e-tests/common/src/test/java/tests/integration/com/microsoft/azure/sdk/iot/iothub/MultiplexingClientTests.java @@ -35,6 +35,7 @@ import com.microsoft.azure.sdk.iot.service.ServiceClient; import com.microsoft.azure.sdk.iot.service.auth.AuthenticationType; import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceMethod; +import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceMethodClientOptions; import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwin; import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinClientOptions; import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinDevice; @@ -1110,8 +1111,8 @@ public void multiplexedConnectionRecoversFromDeviceSessionDropsSequential() thro for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) { log.info("Starting loop for device {}", testInstance.deviceClientArray.get(i).getConfig().getDeviceId()); - Message errorIjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10); - Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorIjectionMessage); + Message errorInjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10); + Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorInjectionMessage); waitForMessageToBeAcknowledged(messageSendSuccess, "Timed out waiting for error injection message to be acknowledged"); // Now that error injection message has been sent, need to wait for the device session to drop @@ -1164,8 +1165,8 @@ public void multiplexedConnectionRecoversFromDeviceSessionDropsParallel() throws // For each multiplexed device, use fault injection to drop the session and see if it can recover, one device at a time for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) { - Message errorIjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10); - Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorIjectionMessage); + Message errorInjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10); + Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorInjectionMessage); waitForMessageToBeAcknowledged(messageSendSuccess, "Timed out waiting for error injection message to be acknowledged"); } @@ -1779,6 +1780,98 @@ public void failedRegistrationDoesNotAffectSubsequentRegistrations() throws Exce testInstance.multiplexingClient.close(); } + // If a multiplexed device is subscribed to twin and/or methods, then loses its session due to network issues, + // it should still be subscribed to twin and/or methods after it finishes reconnection + @StandardTierHubOnlyTest + @ContinuousIntegrationTest + @Test + public void multiplexedSessionsRecoverTwinAndMethodSubscriptionsFromDeviceSessionDrops() throws Exception + { + testInstance.setup(DEVICE_MULTIPLEX_COUNT); + ConnectionStatusChangeTracker multiplexedConnectionStatusChangeTracker = new ConnectionStatusChangeTracker(); + testInstance.multiplexingClient.registerConnectionStatusChangeCallback(multiplexedConnectionStatusChangeTracker, null); + ConnectionStatusChangeTracker[] connectionStatusChangeTrackers = new ConnectionStatusChangeTracker[DEVICE_MULTIPLEX_COUNT]; + + DeviceTwin deviceTwinServiceClient = + new DeviceTwin(iotHubConnectionString, DeviceTwinClientOptions.builder().httpReadTimeout(0).build()); + + DeviceMethod deviceMethodServiceClient = + new DeviceMethod(iotHubConnectionString, DeviceMethodClientOptions.builder().httpReadTimeout(0).build()); + + for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) + { + connectionStatusChangeTrackers[i] = new ConnectionStatusChangeTracker(); + testInstance.deviceClientArray.get(i).registerConnectionStatusChangeCallback(connectionStatusChangeTrackers[i], null); + } + + testInstance.multiplexingClient.open(); + + // Subscribe to methods for all multiplexed clients + DeviceMethodCallback[] deviceMethodCallbacks = new DeviceMethodCallback[DEVICE_MULTIPLEX_COUNT]; + String[] expectedMethodNames = new String[DEVICE_MULTIPLEX_COUNT]; + for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) + { + expectedMethodNames[i] = UUID.randomUUID().toString(); + deviceMethodCallbacks[i] = new DeviceMethodCallback(expectedMethodNames[i]); + subscribeToDeviceMethod(testInstance.deviceClientArray.get(i), deviceMethodCallbacks[i]); + } + + // Start twin for all multiplexed clients + String[] expectedPropertyKeys = new String[DEVICE_MULTIPLEX_COUNT]; + String[] expectedPropertyValues = new String[DEVICE_MULTIPLEX_COUNT]; + TwinPropertyCallBackImpl[] twinPropertyCallBacks = new TwinPropertyCallBackImpl[DEVICE_MULTIPLEX_COUNT]; + for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) + { + expectedPropertyKeys[i] = UUID.randomUUID().toString(); + expectedPropertyValues[i] = UUID.randomUUID().toString(); + twinPropertyCallBacks[i] = new TwinPropertyCallBackImpl(expectedPropertyKeys[i], expectedPropertyValues[i]); + startTwin(testInstance.deviceClientArray.get(i), new EventCallback(IotHubStatusCode.OK), twinPropertyCallBacks[i]); + } + + // For each multiplexed device, use fault injection to drop the session and see if it can recover, one device at a time + for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) + { + log.info("Starting loop for device {}", testInstance.deviceClientArray.get(i).getConfig().getDeviceId()); + Message errorInjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10); + Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorInjectionMessage); + waitForMessageToBeAcknowledged(messageSendSuccess, "Timed out waiting for error injection message to be acknowledged"); + + // Now that error injection message has been sent, need to wait for the device session to drop + assertConnectionStateCallbackFiredDisconnectedRetrying(connectionStatusChangeTrackers[i]); + + // Next, the faulted device should eventually recover + log.info("Waiting for device {} to reconnect", testInstance.deviceClientArray.get(i).getConfig().getDeviceId()); + assertConnectionStateCallbackFiredConnected(connectionStatusChangeTrackers[i], FAULT_INJECTION_RECOVERY_TIMEOUT_MILLIS); + + for (int j = i + 1; j < DEVICE_MULTIPLEX_COUNT; j++) + { + // devices above index i have not been deliberately faulted yet, so make sure they haven't seen a DISCONNECTED_RETRYING event yet. + assertFalse("Multiplexed device that hasn't been deliberately faulted yet saw an unexpected DISCONNECTED_RETRYING connection status callback", connectionStatusChangeTrackers[j].wentDisconnectedRetrying); + } + } + + for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++) + { + // test d2c telemetry + testSendingMessagesFromMultiplexedClients(testInstance.deviceClientArray); + + // test receiving direct methods + testDeviceMethod(deviceMethodServiceClient, testInstance.deviceIdentityArray.get(i).getDeviceId(), expectedMethodNames[i], deviceMethodCallbacks[i]); + + // Send desired property update to multiplexed device + testDesiredPropertiesFlow(testInstance.deviceClientArray.get(i), deviceTwinServiceClient, twinPropertyCallBacks[i], expectedPropertyKeys[i], expectedPropertyValues[i]); + + // Testing sending reported properties + testReportedPropertiesFlow(testInstance.deviceClientArray.get(i), deviceTwinServiceClient, expectedPropertyKeys[i], expectedPropertyValues[i]); + } + + assertFalse(multiplexedConnectionStatusChangeTracker.wentDisconnectedRetrying); + + testInstance.multiplexingClient.close(); + + assertMultiplexedDevicesClosedGracefully(connectionStatusChangeTrackers); + } + private static void assertMultiplexedDevicesClosedGracefully(ConnectionStatusChangeTracker[] connectionStatusChangeTrackers) { for (ConnectionStatusChangeTracker connectionStatusChangeTracker : connectionStatusChangeTrackers)