diff --git a/src/operators/Join.ts b/src/operators/Join.ts new file mode 100644 index 0000000..d971681 --- /dev/null +++ b/src/operators/Join.ts @@ -0,0 +1,86 @@ +/** + * Created by tushar.mathur on 10/10/16. + */ + + +import {IObservable} from '../types/core/IObservable'; +import {IObserver} from '../types/core/IObserver'; +import {IScheduler} from '../types/IScheduler'; +import {ISubscription} from '../types/core/ISubscription'; +import {CompositeSubscription} from '../testing/Subscription'; + + +export class JoinValueObserver implements IObserver { + constructor (private sink: IObserver, private root: JoinObserver) { + } + + next (val: T): void { + this.sink.next(val) + } + + error (err: Error): void { + this.sink.error(err) + } + + complete (): void { + this.root.subscriptionCompleted() + } + +} + +export class JoinObserver implements IObserver> { + private count: number; + private sourceCompleted: boolean; + + constructor (private sink: IObserver, private scheduler: IScheduler, private subscriptions: CompositeSubscription) { + this.sourceCompleted = false + this.count = 0 + } + + subscriptionCompleted () { + this.count-- + this.completeSink() + } + + completeSink () { + if (this.sourceCompleted && this.count === 0) { + this.sink.complete() + } + } + + + next (val: IObservable): void { + const joinValueObserver = new JoinValueObserver(this.sink, this); + this.count++ + this.subscriptions.add( + val.subscribe(joinValueObserver, this.scheduler) + ) + } + + error (err: Error): void { + this.sink.error(err) + } + + complete (): void { + this.sourceCompleted = true + this.completeSink() + } +} + + +export class JoinObservable implements IObservable { + constructor (private source: IObservable>) { + } + + subscribe (observer: IObserver, scheduler: IScheduler): ISubscription { + const subscription = new CompositeSubscription() + subscription.add( + this.source.subscribe(new JoinObserver(observer, scheduler, subscription), scheduler) + ) + return subscription + } +} + +export function join (source: IObservable>) { + return new JoinObservable(source) +} diff --git a/test/test.JoinObservable.ts b/test/test.JoinObservable.ts new file mode 100644 index 0000000..405ea29 --- /dev/null +++ b/test/test.JoinObservable.ts @@ -0,0 +1,41 @@ +/** + * Created by tushar.mathur on 10/10/16. + */ + +import test from 'ava'; +import {join} from '../src/operators/Join'; +import {TestScheduler} from '../src/testing/TestScheduler'; +import {ReactiveTest} from '../src/testing/ReactiveTest'; +const {next, complete} = ReactiveTest + +test('subscribe()', t => { + const sh = TestScheduler.of() + const sa$$ = sh.createColdObservable([ + next(10, 'A0'), + next(20, 'A1'), + next(30, 'A2'), + complete(40) + ]) + const sb$$ = sh.createColdObservable([ + next(10, 'B0'), + next(20, 'B1'), + next(30, 'B2'), + complete(40) + ]) + const s$$ = sh.createColdObservable([ + next(10, sa$$), + next(20, sb$$), + complete(100) + ]) + const {results} = sh.startScheduler(() => join(s$$)) + + t.deepEqual(results, [ + next(220, 'A0'), + next(230, 'A1'), + next(230, 'B0'), + next(240, 'A2'), + next(240, 'B1'), + next(250, 'B2'), + complete(300) + ]) +})