Skip to content

Commit

Permalink
fix(iot-dev): Fix bug where multiplexed deviceClients did not maintai…
Browse files Browse the repository at this point in the history
…n twin and/or method subscriptions after reconnection (#1298)

If a multiplexed device is subscribed to twin and/or methods, then loses its session due to network issues, it should still be subscribed to twin and/or methods after it finishes reconnection
  • Loading branch information
timtay-microsoft authored Aug 6, 2021
1 parent aee2d06 commit faba83a
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ public void registerMultiplexedDeviceClient(List<DeviceClientConfig> configs, lo

// Since the registration failed, need to remove the device from the list of multiplexed devices
DeviceClientConfig configThatFailedToRegister = this.deviceClientConfigs.remove(deviceId);
((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configThatFailedToRegister);
((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configThatFailedToRegister, false);
}
}

Expand All @@ -856,7 +856,7 @@ public void unregisterMultiplexedDeviceClient(List<DeviceClientConfig> configs,
if (this.iotHubTransportConnection != null)
{
// Safe cast since amqps and amqps_ws always use this transport connection type.
((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configToRegister);
((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(configToRegister, false);
}
else
{
Expand Down Expand Up @@ -1286,7 +1286,7 @@ private void singleDeviceReconnectAttemptAsync(String deviceId)
return;
}

((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(config);
((AmqpsIotHubConnection) this.iotHubTransportConnection).unregisterMultiplexedDevice(config, true);
((AmqpsIotHubConnection) this.iotHubTransportConnection).registerMultiplexedDevice(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,17 @@ public final class AmqpsIotHubConnection extends BaseHandler implements IotHubTr
// Proton-j primitives and wrappers for the device and authentication sessions
private Connection connection;
private Reactor reactor;
private final Queue<AmqpsSessionHandler> reconnectingDeviceSessionHandlers = new ConcurrentLinkedQueue<>();
private final Queue<AmqpsSessionHandler> sessionHandlers = new ConcurrentLinkedQueue<>();
private final Queue<AmqpsSasTokenRenewalHandler> sasTokenRenewalHandlers = new ConcurrentLinkedQueue<>();
private AmqpsCbsSessionHandler amqpsCbsSessionHandler;

// Multiplexed device registrations and un-registrations come from a non-reactor thread, so they get queued into these
// queues and are executed when onTimerTask checks them.
private final Set<DeviceClientConfig> multiplexingClientsToRegister;
private final Set<DeviceClientConfig> multiplexingClientsToUnregister;

// Keys are the configs of the clients to unregister; each value is a flag that determines whether the client's session handler should be cached locally for re-use upon reconnection
private final Map<DeviceClientConfig, Boolean> multiplexingClientsToUnregister;

private final boolean isMultiplexing;

Expand All @@ -106,7 +109,7 @@ public AmqpsIotHubConnection(DeviceClientConfig config, boolean isMultiplexing)
// This allows us to create thread safe sets despite there being no such type default in Java 7 or 8
this.deviceClientConfigs = Collections.newSetFromMap(new ConcurrentHashMap<DeviceClientConfig, Boolean>());
this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap<DeviceClientConfig, Boolean>());
this.multiplexingClientsToUnregister = Collections.newSetFromMap(new ConcurrentHashMap<DeviceClientConfig, Boolean>());
this.multiplexingClientsToUnregister = new ConcurrentHashMap<>();

this.deviceClientConfigs.add(config);

Expand Down Expand Up @@ -139,7 +142,7 @@ public AmqpsIotHubConnection(String hostName, boolean isWebsocketConnection, SSL
// This allows us to create thread safe sets despite there being no such type default in Java 7 or 8
this.deviceClientConfigs = Collections.newSetFromMap(new ConcurrentHashMap<DeviceClientConfig, Boolean>());
this.multiplexingClientsToRegister = Collections.newSetFromMap(new ConcurrentHashMap<DeviceClientConfig, Boolean>());
this.multiplexingClientsToUnregister = Collections.newSetFromMap(new ConcurrentHashMap<DeviceClientConfig, Boolean>());
this.multiplexingClientsToUnregister = new ConcurrentHashMap<>();

this.isWebsocketConnection = isWebsocketConnection;

Expand Down Expand Up @@ -172,14 +175,28 @@ public void registerMultiplexedDevice(DeviceClientConfig config)
deviceClientConfigs.add(config);
}

public void unregisterMultiplexedDevice(DeviceClientConfig config)
/**
* Asynchronously unregister a multiplexed device from an active multiplexed connection or synchronously unregister
* a multiplexed device from a closed multiplexed connection.
* @param config the config of the device that should be unregistered.
* @param willReconnect true if the device will be re-registered soon because it is reconnecting.
*/
public void unregisterMultiplexedDevice(DeviceClientConfig config, boolean willReconnect)
{
if (this.state == IotHubConnectionStatus.CONNECTED)
{
// session closing logic should be done from a proton reactor thread, not this thread. This queue gets polled
// onTimerTask so that this client gets unregistered on that thread instead.
log.trace("Queuing the unregistration of device {} from an active multiplexed connection", config.getDeviceId());
this.multiplexingClientsToUnregister.add(config);
if (willReconnect)
{
log.trace("Queuing the unregistration of device {} from an active multiplexed connection. The device will be re-registered for reconnection purposes.", config.getDeviceId());
}
else
{
log.trace("Queuing the unregistration of device {} from an active multiplexed connection", config.getDeviceId());
}

this.multiplexingClientsToUnregister.put(config, willReconnect);
}

deviceClientConfigs.remove(config);
Expand All @@ -197,7 +214,7 @@ public void open() throws TransportException
{
for (DeviceClientConfig clientConfig : deviceClientConfigs)
{
this.createSessionHandler(clientConfig);
this.addSessionHandler(clientConfig);
}

initializeStateLatches();
Expand Down Expand Up @@ -927,7 +944,7 @@ private void releaseDeviceSessionLatches()
}
}

private AmqpsSessionHandler createSessionHandler(DeviceClientConfig deviceClientConfig)
private AmqpsSessionHandler addSessionHandler(DeviceClientConfig deviceClientConfig)
{
// Check if the device session still exists from a previous connection
AmqpsSessionHandler amqpsSessionHandler = null;
Expand All @@ -940,14 +957,26 @@ private AmqpsSessionHandler createSessionHandler(DeviceClientConfig deviceClient
}
}

// If the device session was temporarily unregistered during reconnection, reuse the cached session handler
// since it holds the subscription information needed to fully reconnect all links that were open prior to reconnection.
for (AmqpsSessionHandler cachedDeviceSessionHandler : this.reconnectingDeviceSessionHandlers)
{
if (cachedDeviceSessionHandler.getDeviceId().equals(deviceClientConfig.getDeviceId()))
{
amqpsSessionHandler = cachedDeviceSessionHandler;
break;
}
}

// If the device session did not exist in the previous connection, or if there was no previous connection,
// create a new session
if (amqpsSessionHandler == null)
{
amqpsSessionHandler = new AmqpsSessionHandler(deviceClientConfig, this);
this.sessionHandlers.add(amqpsSessionHandler);
}

this.sessionHandlers.add(amqpsSessionHandler);

return amqpsSessionHandler;
}

Expand All @@ -960,7 +989,7 @@ private void checkForNewlyRegisteredMultiplexedClientsToStart()
Set<DeviceClientConfig> configsRegisteredSuccessfully = new HashSet<>();
while (configToRegister != null)
{
AmqpsSessionHandler amqpsSessionHandler = createSessionHandler(configToRegister);
AmqpsSessionHandler amqpsSessionHandler = addSessionHandler(configToRegister);

log.trace("Adding device session for device {} to an active connection", configToRegister.getDeviceId());
amqpsSessionHandler.setSession(this.connection.session());
Expand Down Expand Up @@ -993,7 +1022,7 @@ private void checkForNewlyRegisteredMultiplexedClientsToStart()
// can be opened on a reactor thread instead of from one of our threads.
private void checkForNewlyUnregisteredMultiplexedClientsToStop()
{
Iterator<DeviceClientConfig> configsToUnregisterIterator = this.multiplexingClientsToUnregister.iterator();
Iterator<DeviceClientConfig> configsToUnregisterIterator = this.multiplexingClientsToUnregister.keySet().iterator();
DeviceClientConfig configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null;
Set<DeviceClientConfig> configsUnregisteredSuccessfully = new HashSet<>();
while (configToUnregister != null)
Expand All @@ -1019,6 +1048,19 @@ private void checkForNewlyUnregisteredMultiplexedClientsToStop()
log.trace("Removing session handler for device {}", amqpsSessionHandler.getDeviceId());
this.sessionHandlers.remove(amqpsSessionHandler);

// check whether the client is being unregistered for reconnection purposes
boolean isSessionReconnecting = this.multiplexingClientsToUnregister.get(configToUnregister);
if (isSessionReconnecting)
{
// save the session handler for later since it has state for what subscriptions the device had before this reconnection
this.reconnectingDeviceSessionHandlers.add(amqpsSessionHandler);
}
else
{
// the device is being unregistered manually, so remove its session handler from the cache if it is cached
this.reconnectingDeviceSessionHandlers.remove(amqpsSessionHandler);
}

// Need to find the sas token renewal handler that is tied to this device
AmqpsSasTokenRenewalHandler sasTokenRenewalHandlerToRemove = null;
for (AmqpsSasTokenRenewalHandler existingSasTokenRenewalHandler : this.sasTokenRenewalHandlers)
Expand Down Expand Up @@ -1049,7 +1091,11 @@ private void checkForNewlyUnregisteredMultiplexedClientsToStop()
configToUnregister = configsToUnregisterIterator.hasNext() ? configsToUnregisterIterator.next() : null;
}

this.multiplexingClientsToUnregister.removeAll(configsUnregisteredSuccessfully);
for (DeviceClientConfig successfullyUnregisteredConfig : configsUnregisteredSuccessfully)
{
this.multiplexingClientsToUnregister.remove(successfullyUnregisteredConfig);
}

this.deviceClientConfigs.removeAll(configsUnregisteredSuccessfully);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.microsoft.azure.sdk.iot.service.ServiceClient;
import com.microsoft.azure.sdk.iot.service.auth.AuthenticationType;
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceMethod;
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceMethodClientOptions;
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwin;
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinClientOptions;
import com.microsoft.azure.sdk.iot.service.devicetwin.DeviceTwinDevice;
Expand Down Expand Up @@ -1110,8 +1111,8 @@ public void multiplexedConnectionRecoversFromDeviceSessionDropsSequential() thro
for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++)
{
log.info("Starting loop for device {}", testInstance.deviceClientArray.get(i).getConfig().getDeviceId());
Message errorIjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10);
Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorIjectionMessage);
Message errorInjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10);
Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorInjectionMessage);
waitForMessageToBeAcknowledged(messageSendSuccess, "Timed out waiting for error injection message to be acknowledged");

// Now that error injection message has been sent, need to wait for the device session to drop
Expand Down Expand Up @@ -1164,8 +1165,8 @@ public void multiplexedConnectionRecoversFromDeviceSessionDropsParallel() throws
// For each multiplexed device, use fault injection to drop the session and see if it can recover, one device at a time
for (int i = 0; i < DEVICE_MULTIPLEX_COUNT; i++)
{
Message errorIjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10);
Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorIjectionMessage);
Message errorInjectionMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10);
Success messageSendSuccess = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(i), errorInjectionMessage);
waitForMessageToBeAcknowledged(messageSendSuccess, "Timed out waiting for error injection message to be acknowledged");
}

Expand Down Expand Up @@ -1779,6 +1780,98 @@ public void failedRegistrationDoesNotAffectSubsequentRegistrations() throws Exce
testInstance.multiplexingClient.close();
}

// Verifies that a multiplexed device which subscribed to twin and/or methods is still subscribed to them after
// its device session is dropped (through fault injection) and the device has finished reconnecting.
@StandardTierHubOnlyTest
@ContinuousIntegrationTest
@Test
public void multiplexedSessionsRecoverTwinAndMethodSubscriptionsFromDeviceSessionDrops() throws Exception
{
    testInstance.setup(DEVICE_MULTIPLEX_COUNT);

    // Tracker for the multiplexed connection as a whole; it must never report DISCONNECTED_RETRYING in this test
    ConnectionStatusChangeTracker sharedConnectionTracker = new ConnectionStatusChangeTracker();
    testInstance.multiplexingClient.registerConnectionStatusChangeCallback(sharedConnectionTracker, null);

    // One tracker per multiplexed device, indexed the same way as testInstance.deviceClientArray
    ConnectionStatusChangeTracker[] perDeviceTrackers = new ConnectionStatusChangeTracker[DEVICE_MULTIPLEX_COUNT];

    // Service-side clients used later to drive desired property updates and direct method invocations
    DeviceTwin twinServiceClient = new DeviceTwin(
        iotHubConnectionString,
        DeviceTwinClientOptions.builder().httpReadTimeout(0).build());

    DeviceMethod methodServiceClient = new DeviceMethod(
        iotHubConnectionString,
        DeviceMethodClientOptions.builder().httpReadTimeout(0).build());

    for (int deviceIndex = 0; deviceIndex < DEVICE_MULTIPLEX_COUNT; deviceIndex++)
    {
        ConnectionStatusChangeTracker tracker = new ConnectionStatusChangeTracker();
        perDeviceTrackers[deviceIndex] = tracker;
        testInstance.deviceClientArray.get(deviceIndex).registerConnectionStatusChangeCallback(tracker, null);
    }

    testInstance.multiplexingClient.open();

    // Every multiplexed client subscribes to direct methods, each with its own randomly named method
    DeviceMethodCallback[] methodCallbacks = new DeviceMethodCallback[DEVICE_MULTIPLEX_COUNT];
    String[] methodNames = new String[DEVICE_MULTIPLEX_COUNT];
    for (int deviceIndex = 0; deviceIndex < DEVICE_MULTIPLEX_COUNT; deviceIndex++)
    {
        String methodName = UUID.randomUUID().toString();
        methodNames[deviceIndex] = methodName;
        methodCallbacks[deviceIndex] = new DeviceMethodCallback(methodName);
        subscribeToDeviceMethod(testInstance.deviceClientArray.get(deviceIndex), methodCallbacks[deviceIndex]);
    }

    // Every multiplexed client starts twin, each with its own random property key/value pair
    String[] propertyKeys = new String[DEVICE_MULTIPLEX_COUNT];
    String[] propertyValues = new String[DEVICE_MULTIPLEX_COUNT];
    TwinPropertyCallBackImpl[] twinCallbacks = new TwinPropertyCallBackImpl[DEVICE_MULTIPLEX_COUNT];
    for (int deviceIndex = 0; deviceIndex < DEVICE_MULTIPLEX_COUNT; deviceIndex++)
    {
        String propertyKey = UUID.randomUUID().toString();
        String propertyValue = UUID.randomUUID().toString();
        propertyKeys[deviceIndex] = propertyKey;
        propertyValues[deviceIndex] = propertyValue;
        twinCallbacks[deviceIndex] = new TwinPropertyCallBackImpl(propertyKey, propertyValue);
        startTwin(testInstance.deviceClientArray.get(deviceIndex), new EventCallback(IotHubStatusCode.OK), twinCallbacks[deviceIndex]);
    }

    // Drop the session of one device at a time through fault injection and wait for that device to recover
    for (int faultedIndex = 0; faultedIndex < DEVICE_MULTIPLEX_COUNT; faultedIndex++)
    {
        log.info("Starting loop for device {}", testInstance.deviceClientArray.get(faultedIndex).getConfig().getDeviceId());
        Message sessionDropMessage = ErrorInjectionHelper.amqpsSessionDropErrorInjectionMessage(1, 10);
        Success sessionDropMessageSent = testSendingMessageFromDeviceClient(testInstance.deviceClientArray.get(faultedIndex), sessionDropMessage);
        waitForMessageToBeAcknowledged(sessionDropMessageSent, "Timed out waiting for error injection message to be acknowledged");

        // The service acknowledged the error injection message, so the device session is expected to drop next
        assertConnectionStateCallbackFiredDisconnectedRetrying(perDeviceTrackers[faultedIndex]);

        // After the drop, the faulted device is expected to reconnect within the recovery timeout
        log.info("Waiting for device {} to reconnect", testInstance.deviceClientArray.get(faultedIndex).getConfig().getDeviceId());
        assertConnectionStateCallbackFiredConnected(perDeviceTrackers[faultedIndex], FAULT_INJECTION_RECOVERY_TIMEOUT_MILLIS);

        // Devices at higher indices have not been faulted yet, so none of them may have seen DISCONNECTED_RETRYING
        int notYetFaultedIndex = faultedIndex + 1;
        while (notYetFaultedIndex < DEVICE_MULTIPLEX_COUNT)
        {
            assertFalse(
                "Multiplexed device that hasn't been deliberately faulted yet saw an unexpected DISCONNECTED_RETRYING connection status callback",
                perDeviceTrackers[notYetFaultedIndex].wentDisconnectedRetrying);
            notYetFaultedIndex++;
        }
    }

    // With every device recovered, confirm that telemetry, methods and twin all still work for each device
    for (int deviceIndex = 0; deviceIndex < DEVICE_MULTIPLEX_COUNT; deviceIndex++)
    {
        // d2c telemetry
        testSendingMessagesFromMultiplexedClients(testInstance.deviceClientArray);

        // direct method invoked from the service side
        testDeviceMethod(methodServiceClient, testInstance.deviceIdentityArray.get(deviceIndex).getDeviceId(), methodNames[deviceIndex], methodCallbacks[deviceIndex]);

        // desired property update sent to the multiplexed device
        testDesiredPropertiesFlow(testInstance.deviceClientArray.get(deviceIndex), twinServiceClient, twinCallbacks[deviceIndex], propertyKeys[deviceIndex], propertyValues[deviceIndex]);

        // reported properties sent from the multiplexed device
        testReportedPropertiesFlow(testInstance.deviceClientArray.get(deviceIndex), twinServiceClient, propertyKeys[deviceIndex], propertyValues[deviceIndex]);
    }

    // Individual session drops must not have been surfaced as a drop of the multiplexed connection itself
    assertFalse(sharedConnectionTracker.wentDisconnectedRetrying);

    testInstance.multiplexingClient.close();

    assertMultiplexedDevicesClosedGracefully(perDeviceTrackers);
}

private static void assertMultiplexedDevicesClosedGracefully(ConnectionStatusChangeTracker[] connectionStatusChangeTrackers)
{
for (ConnectionStatusChangeTracker connectionStatusChangeTracker : connectionStatusChangeTrackers)
Expand Down

0 comments on commit faba83a

Please sign in to comment.