Skip to content

Commit

Permalink
Merge pull request #204 from tusharmath/fix-observable-ctor
Browse files Browse the repository at this point in the history
Fix observable ctor
  • Loading branch information
tusharmath authored Aug 30, 2017
2 parents c666c55 + 93b6a0f commit 865b2ee
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 42 deletions.
2 changes: 1 addition & 1 deletion benchmarks/bm.create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
* Created by tushar on 09/12/16.
*/
import {Suite} from 'benchmark'
import {Observable} from '../src/lib/Observable'
import {IObserver} from '../src/lib/Observer'
import {Observable} from '../src/sources/Create'
import {IDeferred, run} from './lib'

function subscriber(observer: IObserver<number>) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"main": "./src/main",
"typings": "./src/main.d.ts",
"scripts": {
"benchmark": "tsc && node ./benchmarks/run",
"benchmark": "tsc; node ./benchmarks/run",
"cleanup": "find ./src -type f -name '*.js' -delete && find ./src -type f -name '*.map' -delete",
"coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls",
"hydra": "node --trace-hydrogen --trace-phase=Z --trace-deopt --code-comments --hydrogen-track-positions --redirect-code-traces --redirect-code-traces-to=code.asm ./benchmarks/run",
Expand Down
25 changes: 24 additions & 1 deletion src/lib/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,31 @@
import {IObservable} from './Observable'
import {IObserver} from './Observer'
import {IScheduler} from './Scheduler'
import {ISubscription} from './Subscription'
import {ISubscriberFunction} from './SubscriberFunction'
import {
CompositeSubscription,
ISubscription,
Subscription
} from './Subscription'

export interface IObservable<T> {
subscribe(observer: IObserver<T>, scheduler: IScheduler): ISubscription
}

export class Observable<T> implements IObservable<T> {
constructor(private f: ISubscriberFunction<T>) {}

run(
cSub: CompositeSubscription,
observer: IObserver<T>,
scheduler: IScheduler
) {
cSub.add(new Subscription(this.f(observer, scheduler)))
}

subscribe(observer: IObserver<T>, scheduler: IScheduler): ISubscription {
const cSub = new CompositeSubscription()
cSub.add(scheduler.asap(this.run.bind(this, cSub, observer, scheduler)))
return cSub
}
}
7 changes: 3 additions & 4 deletions src/lib/SubscriberFunction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import {IObserver} from './Observer'
import {IScheduler} from './Scheduler'
import {ISubscription} from './Subscription'

export type SubscriberFunctionReturnType = ISubscription | void | (() => void)

export interface ISubscriberFunction<T> {
(observer: IObserver<T>, scheduler: IScheduler):
| ISubscription
| void
| (() => void)
(observer: IObserver<T>, scheduler: IScheduler): SubscriberFunctionReturnType
}
27 changes: 8 additions & 19 deletions src/lib/Subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,26 @@
*/

import {LinkedList, LinkedListNode} from './LinkedList'
import {SubscriberFunctionReturnType} from './SubscriberFunction'
import {ISubscription} from './Subscription'

export interface ISubscription {
unsubscribe(): void
readonly closed: boolean
}

export function isSubscription(subscription: ISubscription) {
return (
subscription instanceof BaseSubscription ||
(subscription && typeof subscription.unsubscribe === 'function')
)
}
export class Subscription implements ISubscription {
closed: boolean = false

export class BaseSubscription implements ISubscription {
constructor(private f: (() => void), public closed = false) {}
constructor(private dispose: SubscriberFunctionReturnType) {}

unsubscribe(): void {
this.f()
if (this.dispose) {
if (typeof this.dispose === 'function') this.dispose()
else if ('unsubscribe' in this.dispose) this.dispose.unsubscribe()
}
this.closed = true
}

static from(subscription: ISubscription | {(): void} | void): ISubscription {
if (isSubscription(subscription as ISubscription))
return subscription as ISubscription

if (typeof subscription === 'function')
return new BaseSubscription(subscription as {(): void})

return new BaseSubscription(() => undefined)
}
}

export class CompositeSubscription implements ISubscription {
Expand Down
3 changes: 2 additions & 1 deletion src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
* Created by tushar on 17/02/17.
*/

export {Observable, empty, just, never} from './sources/Create'
export {delay} from './operators/Delay'
export {empty, just, never} from './sources/Create'
export {filter} from './operators/Filter'
export {flatMap} from './operators/Join'
export {forEach} from './lib/ForEach'
Expand All @@ -22,6 +22,7 @@ export {mapTo} from './operators/Map'
export {map} from './operators/Map'
export {merge} from './operators/Merge'
export {multicast} from './operators/Multicast'
export {Observable} from './lib/Observable'
export {reduce} from './operators/Reduce'
export {sample} from './operators/Sample'
export {scan} from './operators/Scan'
Expand Down
11 changes: 1 addition & 10 deletions src/sources/Create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,7 @@
import {IObservable} from '../lib/Observable'
import {IObserver} from '../lib/Observer'
import {IScheduler} from '../lib/Scheduler'
import {ISubscriberFunction} from '../lib/SubscriberFunction'
import {BaseSubscription, ISubscription} from '../lib/Subscription'

export class Observable<T> implements IObservable<T> {
constructor(private f: ISubscriberFunction<T>) {}

subscribe(observer: IObserver<T>, scheduler: IScheduler): ISubscription {
return BaseSubscription.from(this.f(observer, scheduler))
}
}
import {ISubscription} from '../lib/Subscription'

class JustObservable<T> implements IObservable<T> {
constructor(private val: T) {}
Expand Down
3 changes: 1 addition & 2 deletions src/sources/FromPromise.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
/**
* Created by tushar.mathur on 16/10/16.
*/
import {IObservable} from '../lib/Observable'
import {IObservable, Observable} from '../lib/Observable'
import {IObserver} from '../lib/Observer'
import {Observable} from './Create'

export function onResult<T>(observer: IObserver<T>, result: T) {
observer.next(result)
Expand Down
32 changes: 29 additions & 3 deletions test/test.Create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
* Created by tushar on 08/12/16.
*/
import test from 'ava'
import {Observable} from '../src/sources/Create'
import {Observable} from '../src/lib/Observable'
import {slice} from '../src/main'
import {EVENT} from '../src/testing/Events'
import {TestScheduler} from '../src/testing/TestScheduler'

test(t => {
const sh = TestScheduler.of()
const {results} = sh.start(() => new Observable(ob => ob.next('A')))
t.deepEqual(results, [EVENT.next(200, 'A')])
t.deepEqual(results, [EVENT.next(201, 'A')])
})

test(t => {
Expand All @@ -20,5 +21,30 @@ test(t => {
sh.delay(() => ob.next('A'), 15)
})
)
t.deepEqual(results, [EVENT.next(215, 'A')])
t.deepEqual(results, [EVENT.next(216, 'A')])
})

test('should unsubscribe', t => {
const sh = TestScheduler.of()
const actual = sh.start(() =>
slice(
0,
3,
new Observable(ob => {
let done = false
for (let i = 0; i < 10 && !done; i++) ob.next(i)
ob.complete()
return () => (done = true)
})
)
).results

const expected = [
EVENT.next(201, 0),
EVENT.next(201, 1),
EVENT.next(201, 2),
EVENT.complete(201)
]

t.deepEqual(actual, expected)
})

0 comments on commit 865b2ee

Please sign in to comment.