Skip to content

Commit

Permalink
fix(reconnect): Also reconnect pending connections. (#65)
Browse files Browse the repository at this point in the history
Commit 28bf0d introduced a feature where active subscriptions made via the
real-time client would resubscribe upon a disconnect and reconnect.

This could pose a problem if Track API was having errors connecting to its
backing services, because the client might get into a state where only some of
the subscriptions it tries to restart will actually restart.  Then, on the next
disconnect/reconnect, it will only restart the ones that started successfully.

This PR makes sure that subscriptions that were pending at the time of the
disconnect are also restarted.

Signed-off-by: Jeff Cuevas-Koch <[email protected]>
  • Loading branch information
cuevaskoch authored Oct 14, 2019
1 parent d8bf0d2 commit 4b2fce1
Showing 1 changed file with 42 additions and 27 deletions.
69 changes: 42 additions & 27 deletions src/RealTimeClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class RealTimeClient {
this.onConnectionOpened = this.onConnectionOpened.bind(this);
this.onMessageReceived = this.onMessageReceived.bind(this);
this.sendMessage = this.sendMessage.bind(this);
this.queueAndRemoveSubscriptions = this.queueAndRemoveSubscriptions.bind(this);
}

/**
Expand Down Expand Up @@ -120,37 +121,16 @@ class RealTimeClient {
reconnectOnClose = false,
reconnectTimeout = 0,
} = this.options;

if (reconnectOnClose) {
if (!this.isInReconnectLoop) {
this.isInReconnectLoop = true;
this.queueAndRemoveSubscriptions(x => x.openRequests);
this.queueAndRemoveSubscriptions(x => x.subscriptions);
}
this.initializing = false;
// the WebSocket API doesn't offer a way to re-open a closed connection.
// remove our connection and create a new one.
const reconnect = () => {
this.createConnection(() => Object.keys(this.subscriptions)
.map(subscriptionId => ({
...this.subscriptions[subscriptionId],
subscriptionId,
}))
.forEach(({
subscriptionId,
handler,
messageCreator,
mutableSubscriptionContainer,
subscriptionEndRejecter,
subscriptionEndResolver,
}) => {
this.sendStartSubscriptionMessage(
messageCreator,
handler,
mutableSubscriptionContainer,
() => { },
() => new Error('Could not resume subscription'),
subscriptionEndResolver,
subscriptionEndRejecter,
);
delete this.subscriptions[subscriptionId];
}));
};
const reconnect = () => this.createConnection();
this.timeoutHandles.push(setTimeout(reconnect, reconnectTimeout));
}
}
Expand All @@ -162,6 +142,9 @@ class RealTimeClient {
*/
onConnectionOpened() {
this.initializing = false;
if (this.isInReconnectLoop) {
delete this.isInReconnectLoop;
}
this.sendAuthentication()
.then(() => {
this.queuedMessages.forEach((queuedMessage) => {
Expand Down Expand Up @@ -407,6 +390,38 @@ class RealTimeClient {
removeHeartbeatHandler(heartbeatHandler) {
this.heartbeatHandlers.delete(heartbeatHandler);
}

queueAndRemoveSubscriptions(mutableBucketSelector) {
const mutableBucket = mutableBucketSelector(this);
const defaultSubscriptionStartResolver = () => {};
const defaultSubscriptionStartRejector = () => new Error('Could not resume subscription');
Object.keys(mutableBucket)
.map(id => ({
...mutableBucket[id],
id,
}))
.forEach(({
id,
handler,
messageCreator,
mutableSubscriptionContainer,
subscriptionStartResolver,
subscriptionStartRejecter,
subscriptionEndRejecter,
subscriptionEndResolver,
}) => {
this.sendStartSubscriptionMessage(
messageCreator,
handler,
mutableSubscriptionContainer,
subscriptionStartResolver || defaultSubscriptionStartResolver,
subscriptionStartRejecter || defaultSubscriptionStartRejector,
subscriptionEndResolver,
subscriptionEndRejecter,
);
delete mutableBucket[id];
});
}
}

export default RealTimeClient;

0 comments on commit 4b2fce1

Please sign in to comment.