diff --git a/src/lib/ForEach.ts b/src/lib/ForEach.ts index e2f1bfa..e5bd26d 100644 --- a/src/lib/ForEach.ts +++ b/src/lib/ForEach.ts @@ -2,30 +2,35 @@ * Created by tushar.mathur on 01/11/16. */ import {IObservable} from './Observable' +import {IObserver} from './Observer' import {createScheduler} from './Scheduler' import {ISubscription} from './Subscription' import {curry} from './Utils' -export type TOnNext = {(value: T): void} +export type TOnNext = {(value: T): void} | IObserver export type TSource = IObservable export type TResult = ISubscription +const error = (err: Error) => { + throw err +} +const complete = () => void 0 + +export type ForEachType = { + (onNext: TOnNext, source: TSource): TResult + (onNext: TOnNext): {(source: TSource): TResult} +} + export const forEach = curry(function( - onNext: TOnNext, + next: TOnNext, observable: TSource ) { - return observable.subscribe( - { - next(value: T) { - onNext(value) - }, - complete() {}, - error(err: Error) { - throw err + const observer = typeof next === 'function' + ? { + next: next, + complete: complete, + error: error } - }, - createScheduler() - ) -}) as {(onNext: TOnNext, source: TSource): TResult} & { - (onNext: TOnNext): {(source: TSource): TResult} -} + : next + return observable.subscribe(observer, createScheduler()) +}) as ForEachType diff --git a/test/test.ForEach.ts b/test/test.ForEach.ts new file mode 100644 index 0000000..cb33116 --- /dev/null +++ b/test/test.ForEach.ts @@ -0,0 +1,34 @@ +/** + * Created by tushar on 09/09/17. + */ + +import * as assert from 'assert' +import {forEach} from '../src/lib/ForEach' +import {fromMarble} from '../src/testing/Marble' +import {TestScheduler} from '../src/testing/TestScheduler' + +describe('forEach()', () => { + it('should take a function as the default next', () => { + const sh = TestScheduler.of() + const $ = sh.Cold('-1234') + const actual: number[] = [] + forEach((i: number) => actual.push(i), $) + const expected = ['1', '2', '3', '4'] + sh.advanceBy(300) + assert.deepEqual(actual, expected) + }) + + it('should take an observer', () => { + const sh = TestScheduler.of() + const $ = sh.Cold('-1234|') + const testObserver = sh.Observer() + + forEach(testObserver, $) + sh.advanceBy(300) + + const actual = testObserver.results + const expected = fromMarble('-1234|') + + assert.deepEqual(actual, expected) + }) +})