diff --git a/src/rx.ts b/src/rx.ts index 1a63fba..96c6f05 100644 --- a/src/rx.ts +++ b/src/rx.ts @@ -14,7 +14,7 @@ import {Request, Response, ResponseError, ResponseSuccess, ResponseSuccessType} let id = 0; export default class JsonRpcRxClient { - isReady$: Subject; + isReady$: Subject; message$: Subject | Request>; //@ts-ignore @@ -30,6 +30,7 @@ export default class JsonRpcRxClient { send(method: string, params?: any): Observable { const req: Request = {jsonrpc: '2.0', id: id++, method, params}; return this.isReady$.pipe( + filter(r => r), take(1), switchMap(() => { this._ws.send(JSON.stringify(req)); @@ -58,7 +59,7 @@ export default class JsonRpcRxClient { this._ws = new WebSocket(this.address); this._ws.onclose = this.onSocketClose; - this._ws.onopen = () => this.isReady$.next(this); + this._ws.onopen = () => this.isReady$.next(true); fromEvent(this._ws, 'message').pipe( map(({data}: any) => JSON.parse(data)) ).subscribe(message => this.message$.next(message)); @@ -71,6 +72,7 @@ export default class JsonRpcRxClient { if (this.isDestroyed) return; console.error(`disconnected from ${this.address} code: '${event.code}' reason: '${event.reason}'`); + this.isReady$.next(false); setTimeout((): void => { this.connect(); }, 1000);