Skip to content

Commit

Permalink
feat(forEach): forEach can also take in an observer
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Sep 9, 2017
1 parent 6f5ca75 commit 55c5b07
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 16 deletions.
37 changes: 21 additions & 16 deletions src/lib/ForEach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,35 @@
* Created by tushar.mathur on 01/11/16.
*/
import {IObservable} from './Observable'
import {IObserver} from './Observer'
import {createScheduler} from './Scheduler'
import {ISubscription} from './Subscription'
import {curry} from './Utils'

export type TOnNext<T> = {(value: T): void}
export type TOnNext<T> = {(value: T): void} | IObserver<T>
export type TSource<T> = IObservable<T>
export type TResult = ISubscription

const error = (err: Error) => {
throw err
}
const complete = () => void 0

export type ForEachType = {
<T>(onNext: TOnNext<T>, source: TSource<T>): TResult
<T>(onNext: TOnNext<T>): {(source: TSource<T>): TResult}
}

export const forEach = curry(function<T>(
onNext: TOnNext<T>,
next: TOnNext<T>,
observable: TSource<T>
) {
return observable.subscribe(
{
next(value: T) {
onNext(value)
},
complete() {},
error(err: Error) {
throw err
const observer = typeof next === 'function'
? {
next: next,
complete: complete,
error: error
}
},
createScheduler()
)
}) as {<T>(onNext: TOnNext<T>, source: TSource<T>): TResult} & {
<T>(onNext: TOnNext<T>): {(source: TSource<T>): TResult}
}
: next
return observable.subscribe(observer, createScheduler())
}) as ForEachType
34 changes: 34 additions & 0 deletions test/test.ForEach.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Created by tushar on 09/09/17.
*/

import * as assert from 'assert'
import {forEach} from '../src/lib/ForEach'
import {fromMarble} from '../src/testing/Marble'
import {TestScheduler} from '../src/testing/TestScheduler'

describe('forEach()', () => {
it('should take a function as the default next', () => {
const sh = TestScheduler.of()
const $ = sh.Cold('-1234')
const actual: number[] = []
forEach((i: number) => actual.push(i), $)
const expected = ['1', '2', '3', '4']
sh.advanceBy(300)
assert.deepEqual(actual, expected)
})

it('should take an observer', () => {
const sh = TestScheduler.of()
const $ = sh.Cold('-1234|')
const testObserver = sh.Observer()

forEach(testObserver, $)
sh.advanceBy(300)

const actual = testObserver.results
const expected = fromMarble('-1234|')

assert.deepEqual(actual, expected)
})
})

0 comments on commit 55c5b07

Please sign in to comment.