diff --git a/benchmarks/bm.create.ts b/benchmarks/bm.create.ts index acd7008..26d5cc9 100644 --- a/benchmarks/bm.create.ts +++ b/benchmarks/bm.create.ts @@ -2,8 +2,8 @@ * Created by tushar on 09/12/16. */ import {Suite} from 'benchmark' +import {Observable} from '../src/lib/Observable' import {IObserver} from '../src/lib/Observer' -import {Observable} from '../src/sources/Create' import {IDeferred, run} from './lib' function subscriber(observer: IObserver) { diff --git a/package.json b/package.json index 2fb9670..c928885 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "main": "./src/main", "typings": "./src/main.d.ts", "scripts": { - "benchmark": "tsc && node ./benchmarks/run", + "benchmark": "tsc; node ./benchmarks/run", "cleanup": "find ./src -type f -name '*.js' -delete && find ./src -type f -name '*.map' -delete", "coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls", "hydra": "node --trace-hydrogen --trace-phase=Z --trace-deopt --code-comments --hydrogen-track-positions --redirect-code-traces --redirect-code-traces-to=code.asm ./benchmarks/run", diff --git a/src/lib/Observable.ts b/src/lib/Observable.ts index 5cbbcea..2061161 100644 --- a/src/lib/Observable.ts +++ b/src/lib/Observable.ts @@ -4,8 +4,31 @@ import {IObservable} from './Observable' import {IObserver} from './Observer' import {IScheduler} from './Scheduler' -import {ISubscription} from './Subscription' +import {ISubscriberFunction} from './SubscriberFunction' +import { + CompositeSubscription, + ISubscription, + Subscription +} from './Subscription' export interface IObservable { subscribe(observer: IObserver, scheduler: IScheduler): ISubscription } + +export class Observable implements IObservable { + constructor(private f: ISubscriberFunction) {} + + run( + cSub: CompositeSubscription, + observer: IObserver, + scheduler: IScheduler + ) { + cSub.add(new Subscription(this.f(observer, scheduler))) + } + + subscribe(observer: IObserver, scheduler: IScheduler): ISubscription { + const cSub = new CompositeSubscription() + cSub.add(scheduler.asap(this.run.bind(this, cSub, observer, scheduler))) + return cSub + } +} diff --git a/src/lib/SubscriberFunction.ts b/src/lib/SubscriberFunction.ts index 202035a..eb18edc 100644 --- a/src/lib/SubscriberFunction.ts +++ b/src/lib/SubscriberFunction.ts @@ -6,9 +6,8 @@ import {IObserver} from './Observer' import {IScheduler} from './Scheduler' import {ISubscription} from './Subscription' +export type SubscriberFunctionReturnType = ISubscription | void | (() => void) + export interface ISubscriberFunction { - (observer: IObserver, scheduler: IScheduler): - | ISubscription - | void - | (() => void) + (observer: IObserver, scheduler: IScheduler): SubscriberFunctionReturnType } diff --git a/src/lib/Subscription.ts b/src/lib/Subscription.ts index d879634..447f796 100644 --- a/src/lib/Subscription.ts +++ b/src/lib/Subscription.ts @@ -3,6 +3,7 @@ */ import {LinkedList, LinkedListNode} from './LinkedList' +import {SubscriberFunctionReturnType} from './SubscriberFunction' import {ISubscription} from './Subscription' export interface ISubscription { @@ -10,30 +11,18 @@ export interface ISubscription { readonly closed: boolean } -export function isSubscription(subscription: ISubscription) { - return ( - subscription instanceof BaseSubscription || - (subscription && typeof subscription.unsubscribe === 'function') - ) -} +export class Subscription implements ISubscription { + closed: boolean = false -export class BaseSubscription implements ISubscription { - constructor(private f: (() => void), public closed = false) {} + constructor(private dispose: SubscriberFunctionReturnType) {} unsubscribe(): void { - this.f() + if (this.dispose) { + if (typeof this.dispose === 'function') this.dispose() + else if ('unsubscribe' in this.dispose) this.dispose.unsubscribe() + } this.closed = true } - - static from(subscription: ISubscription | {(): void} | void): ISubscription { - if (isSubscription(subscription as ISubscription)) - return subscription as ISubscription - - if (typeof subscription === 'function') - return new BaseSubscription(subscription as {(): void}) - - return new BaseSubscription(() => undefined) - } } export class CompositeSubscription implements ISubscription { diff --git a/src/main.ts b/src/main.ts index c438a6b..df40302 100644 --- a/src/main.ts +++ b/src/main.ts @@ -2,8 +2,8 @@ * Created by tushar on 17/02/17. */ -export {Observable, empty, just, never} from './sources/Create' export {delay} from './operators/Delay' +export {empty, just, never} from './sources/Create' export {filter} from './operators/Filter' export {flatMap} from './operators/Join' export {forEach} from './lib/ForEach' @@ -22,6 +22,7 @@ export {mapTo} from './operators/Map' export {map} from './operators/Map' export {merge} from './operators/Merge' export {multicast} from './operators/Multicast' +export {Observable} from './lib/Observable' export {reduce} from './operators/Reduce' export {sample} from './operators/Sample' export {scan} from './operators/Scan' diff --git a/src/sources/Create.ts b/src/sources/Create.ts index da7a521..b4d1bc5 100644 --- a/src/sources/Create.ts +++ b/src/sources/Create.ts @@ -4,16 +4,7 @@ import {IObservable} from '../lib/Observable' import {IObserver} from '../lib/Observer' import {IScheduler} from '../lib/Scheduler' -import {ISubscriberFunction} from '../lib/SubscriberFunction' -import {BaseSubscription, ISubscription} from '../lib/Subscription' - -export class Observable implements IObservable { - constructor(private f: ISubscriberFunction) {} - - subscribe(observer: IObserver, scheduler: IScheduler): ISubscription { - return BaseSubscription.from(this.f(observer, scheduler)) - } -} +import {ISubscription} from '../lib/Subscription' class JustObservable implements IObservable { constructor(private val: T) {} diff --git a/src/sources/FromPromise.ts b/src/sources/FromPromise.ts index 399c119..6fe7f4e 100644 --- a/src/sources/FromPromise.ts +++ b/src/sources/FromPromise.ts @@ -1,9 +1,8 @@ /** * Created by tushar.mathur on 16/10/16. */ -import {IObservable} from '../lib/Observable' +import {IObservable, Observable} from '../lib/Observable' import {IObserver} from '../lib/Observer' -import {Observable} from './Create' export function onResult(observer: IObserver, result: T) { observer.next(result) diff --git a/test/test.Create.ts b/test/test.Create.ts index 6546d18..3736e97 100644 --- a/test/test.Create.ts +++ b/test/test.Create.ts @@ -2,14 +2,15 @@ * Created by tushar on 08/12/16. */ import test from 'ava' -import {Observable} from '../src/sources/Create' +import {Observable} from '../src/lib/Observable' +import {slice} from '../src/main' import {EVENT} from '../src/testing/Events' import {TestScheduler} from '../src/testing/TestScheduler' test(t => { const sh = TestScheduler.of() const {results} = sh.start(() => new Observable(ob => ob.next('A'))) - t.deepEqual(results, [EVENT.next(200, 'A')]) + t.deepEqual(results, [EVENT.next(201, 'A')]) }) test(t => { @@ -20,5 +21,30 @@ test(t => { sh.delay(() => ob.next('A'), 15) }) ) - t.deepEqual(results, [EVENT.next(215, 'A')]) + t.deepEqual(results, [EVENT.next(216, 'A')]) +}) + +test('should unsubscribe', t => { + const sh = TestScheduler.of() + const actual = sh.start(() => + slice( + 0, + 3, + new Observable(ob => { + let done = false + for (let i = 0; i < 10 && !done; i++) ob.next(i) + ob.complete() + return () => (done = true) + }) + ) + ).results + + const expected = [ + EVENT.next(201, 0), + EVENT.next(201, 1), + EVENT.next(201, 2), + EVENT.complete(201) + ] + + t.deepEqual(actual, expected) })