diff --git a/src/RealTimeClient.js b/src/RealTimeClient.js index e195b13..8519f79 100644 --- a/src/RealTimeClient.js +++ b/src/RealTimeClient.js @@ -62,6 +62,7 @@ class RealTimeClient { this.onConnectionOpened = this.onConnectionOpened.bind(this); this.onMessageReceived = this.onMessageReceived.bind(this); this.sendMessage = this.sendMessage.bind(this); + this.queueAndRemoveSubscriptions = this.queueAndRemoveSubscriptions.bind(this); } /** @@ -120,37 +121,16 @@ class RealTimeClient { reconnectOnClose = false, reconnectTimeout = 0, } = this.options; - if (reconnectOnClose) { + if (!this.isInReconnectLoop) { + this.isInReconnectLoop = true; + this.queueAndRemoveSubscriptions(x => x.openRequests); + this.queueAndRemoveSubscriptions(x => x.subscriptions); + } this.initializing = false; // the WebSocket API doesn't offer a way to re-open a closed connection. // remove our connection and create a new one. - const reconnect = () => { - this.createConnection(() => Object.keys(this.subscriptions) - .map(subscriptionId => ({ - ...this.subscriptions[subscriptionId], - subscriptionId, - })) - .forEach(({ - subscriptionId, - handler, - messageCreator, - mutableSubscriptionContainer, - subscriptionEndRejecter, - subscriptionEndResolver, - }) => { - this.sendStartSubscriptionMessage( - messageCreator, - handler, - mutableSubscriptionContainer, - () => { }, - () => new Error('Could not resume subscription'), - subscriptionEndResolver, - subscriptionEndRejecter, - ); - delete this.subscriptions[subscriptionId]; - })); - }; + const reconnect = () => this.createConnection(); this.timeoutHandles.push(setTimeout(reconnect, reconnectTimeout)); } } @@ -162,6 +142,9 @@ class RealTimeClient { */ onConnectionOpened() { this.initializing = false; + if (this.isInReconnectLoop) { + delete this.isInReconnectLoop; + } this.sendAuthentication() .then(() => { this.queuedMessages.forEach((queuedMessage) => { @@ -407,6 +390,38 @@ class RealTimeClient { removeHeartbeatHandler(heartbeatHandler) { this.heartbeatHandlers.delete(heartbeatHandler); } + + queueAndRemoveSubscriptions(mutableBucketSelector) { + const mutableBucket = mutableBucketSelector(this); + const defaultSubscriptionStartResolver = () => {}; + const defaultSubscriptionStartRejector = () => new Error('Could not resume subscription'); + Object.keys(mutableBucket) + .map(id => ({ + ...mutableBucket[id], + id, + })) + .forEach(({ + id, + handler, + messageCreator, + mutableSubscriptionContainer, + subscriptionStartResolver, + subscriptionStartRejecter, + subscriptionEndRejecter, + subscriptionEndResolver, + }) => { + this.sendStartSubscriptionMessage( + messageCreator, + handler, + mutableSubscriptionContainer, + subscriptionStartResolver || defaultSubscriptionStartResolver, + subscriptionStartRejecter || defaultSubscriptionStartRejector, + subscriptionEndResolver, + subscriptionEndRejecter, + ); + delete mutableBucket[id]; + }); + } } export default RealTimeClient;