Skip to content

Commit

Permalink
Merge pull request #773 from kamalbennani/impr/expose-switch-cluster-…
Browse files Browse the repository at this point in the history
…func

Expose a way to switch from one cluster/app to the other
  • Loading branch information
MeenaAlfons authored Nov 29, 2023
2 parents ed900f8 + 83f4db8 commit 32d4a23
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 8 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions spec/javascripts/helpers/mocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ var Mocks = {
manager.disconnect = jasmine.createSpy("disconnect");
manager.send_event = jasmine.createSpy("send_event");
manager.isUsingTLS = jasmine.createSpy("isUsingTLS").and.returnValue(false);
manager.switchCluster = jasmine.createSpy("switchCluster");
return manager;
},

Expand Down
43 changes: 43 additions & 0 deletions spec/javascripts/unit/core/connection/connection_manager_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,49 @@ describe("ConnectionManager", function() {
});
});

describe("#switchCluster", function() {
it("should update cluster key", function() {
expect(manager.key).toEqual("foo");
manager.switchCluster("bar");
expect(manager.key).toEqual("bar");
});

it("should re-build the strategy", function() {
expect(managerOptions.getStrategy.calls.count()).toEqual(1);
manager.switchCluster("bar");
expect(managerOptions.getStrategy.calls.count()).toEqual(2);
expect(managerOptions.getStrategy).toHaveBeenCalledWith({
key: "bar",
useTLS: false,
timeline: timeline
});
});

it("should try to connect using the strategy", function() {
manager.switchCluster("bar");
// connection is retried with a zero delay
jasmine.clock().tick(0);
expect(strategy.connect).toHaveBeenCalled();
});

it("should transition to connecting", function() {
var onConnecting = jasmine.createSpy("onConnecting");
var onStateChange = jasmine.createSpy("onStateChange");
manager.bind("connecting", onConnecting);
manager.bind("state_change", onStateChange);

manager.switchCluster("bar");// connection is retried with a zero delay
jasmine.clock().tick(0);

expect(manager.state).toEqual("connecting");
expect(onConnecting).toHaveBeenCalled();
expect(onStateChange).toHaveBeenCalledWith({
previous: "initialized",
current: "connecting"
});
});
});

describe("before establishing a connection", function() {
beforeEach(function() {
manager.connect();
Expand Down
41 changes: 41 additions & 0 deletions spec/javascripts/unit/core/pusher_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,47 @@ describe("Pusher", function() {
});
});

describe("switch cluster", function() {
var pusher;
var subscribedChannels

beforeEach(function() {
pusher = new Pusher("foo", {cluster: "mt1"});

subscribedChannels = {
channel1: pusher.subscribe("channel1"),
channel2: pusher.subscribe("channel2")
};

pusher.connect();
pusher.connection.state = "connected";
pusher.connection.emit("connected");
});

it("should resubscribe to all channels", function() {
expect(subscribedChannels.channel1.subscribe).toHaveBeenCalledTimes(1);
expect(subscribedChannels.channel2.subscribe).toHaveBeenCalledTimes(1);

pusher.switchCluster({ appKey: 'bar', cluster: 'us3' });
pusher.connect();
pusher.connection.state = 'connected';
pusher.connection.emit('connected');

expect(subscribedChannels.channel1.subscribe).toHaveBeenCalledTimes(2);
expect(subscribedChannels.channel2.subscribe).toHaveBeenCalledTimes(2);
});

it("should send events via the connection manager", function() {
pusher.switchCluster({ appKey: 'bar', cluster: 'us3' });
pusher.send_event("event", { key: "value" }, "channel");
expect(pusher.connection.send_event).toHaveBeenCalledWith(
"event",
{ key: "value" },
"channel"
);
});
})

describe("#unsubscribe", function() {
it("should unsubscribe the channel if subscription is not pending", function() {
var channel = pusher.subscribe("yyy");
Expand Down
9 changes: 9 additions & 0 deletions src/core/connection/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ export default class ConnectionManager extends EventsDispatcher {
this.updateStrategy();
}

switchCluster(key: string) {
this.key = key;
// This ensures that the new config coming from
// pusher instance are taken into account
// such as appKey and cluster
this.updateStrategy();
this.retryIn(0);
}

/** Establishes a connection to Pusher.
*
* Does nothing when connection is already established. See top-level doc
Expand Down
5 changes: 5 additions & 0 deletions src/core/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ export interface Options {
wssPort?: number;
}

export interface ClusterOptions {
appKey: string;
cluster: string;
}

export function validateOptions(options) {
if (options == null) {
throw 'You must pass an options object';
Expand Down
20 changes: 15 additions & 5 deletions src/core/pusher.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import AbstractRuntime from '../runtimes/interface';
import Runtime from 'runtime';
import Util from './util';
import * as Collections from './utils/collections';
import Channels from './channels/channels';
import Channel from './channels/channel';
Expand All @@ -10,14 +9,11 @@ import TimelineSender from './timeline/timeline_sender';
import TimelineLevel from './timeline/level';
import { defineTransport } from './strategies/strategy_builder';
import ConnectionManager from './connection/connection_manager';
import ConnectionManagerOptions from './connection/connection_manager_options';
import { PeriodicTimer } from './utils/timers';
import Defaults from './defaults';
import * as DefaultConfig from './config';
import Logger from './logger';
import Factory from './utils/factory';
import UrlStore from 'core/utils/url_store';
import { Options, validateOptions } from './options';
import { Options, ClusterOptions, validateOptions } from './options';
import { Config, getConfig } from './config';
import StrategyOptions from './strategies/strategy_options';
import UserFacade from './user';
Expand Down Expand Up @@ -53,6 +49,7 @@ export default class Pusher {

/* INSTANCE PROPERTIES */
key: string;
options: Options;
config: Config;
channels: Channels;
global_emitter: EventsDispatcher;
Expand Down Expand Up @@ -141,6 +138,19 @@ export default class Pusher {
}
}

/**
* Allows you to switch Pusher cluster without
* losing all the channels/subscription binding
* as this is internally managed by the SDK.
*/
switchCluster(options: ClusterOptions) {
const { appKey, cluster } = options;
this.key = appKey;
this.options = { ...this.options, cluster };
this.config = getConfig(this.options, this);
this.connection.switchCluster(this.key);
}

channel(name: string): Channel {
return this.channels.find(name);
}
Expand Down
1 change: 1 addition & 0 deletions types/src/core/connection/connection_manager.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export default class ConnectionManager extends EventsDispatcher {
handshakeCallbacks: HandshakeCallbacks;
connectionCallbacks: ConnectionCallbacks;
constructor(key: string, options: ConnectionManagerOptions);
switchCluster(key: string): void;
connect(): void;
send(data: any): boolean;
send_event(name: string, data: any, channel?: string): boolean;
Expand Down
4 changes: 4 additions & 0 deletions types/src/core/options.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ export interface Options {
wsPort?: number;
wssPort?: number;
}
export interface ClusterOptions {
appKey: string;
cluster: string;
}
export declare function validateOptions(options: any): void;
4 changes: 3 additions & 1 deletion types/src/core/pusher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import Timeline from './timeline/timeline';
import TimelineSender from './timeline/timeline_sender';
import ConnectionManager from './connection/connection_manager';
import { PeriodicTimer } from './utils/timers';
import { Options } from './options';
import { Options, ClusterOptions } from './options';
import { Config } from './config';
import UserFacade from './user';
export default class Pusher {
Expand All @@ -21,6 +21,7 @@ export default class Pusher {
static log: (message: any) => void;
private static getClientFeatures;
key: string;
options: Options;
config: Config;
channels: Channels;
global_emitter: EventsDispatcher;
Expand All @@ -31,6 +32,7 @@ export default class Pusher {
timelineSenderTimer: PeriodicTimer;
user: UserFacade;
constructor(app_key: string, options: Options);
switchCluster(options: ClusterOptions): void;
channel(name: string): Channel;
allChannels(): Channel[];
connect(): void;
Expand Down

0 comments on commit 32d4a23

Please sign in to comment.