Skip to content

Commit

Permalink
Merge pull request #11 from tusharmath/flat-map
Browse files Browse the repository at this point in the history
Add Join functionality
  • Loading branch information
tusharmath authored Oct 10, 2016
2 parents 201f1f8 + a201f83 commit 24d65e6
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 0 deletions.
66 changes: 66 additions & 0 deletions src/lib/LinkedList.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Created by tushar.mathur on 09/10/16.
*/


export class Node<T> {
public left: Node<T> | null;
public right: Node<T> | null;

constructor (public value: T) {
this.right = null
this.left = null
}

static of <T> (val: T) {
return new Node(val)
}
}

export class LinkedList<T> {
public length: number
private __head: Node<T> | null;

constructor () {
this.length = 0
this.__head = null
}

element () {
return this.__head
}

add (val: T) {
const node = Node.of(val)
if (!this.__head) {
this.__head = node
} else {
this.__head.right = node
node.left = this.__head
this.__head = node
}
this.length++
return node
}

remove (n: Node<T>) {
if (n.left && n.right) {
n.left.right = n.right
n.right.left = n.left
}
else if (n.left) {
this.__head = n.left
n.left.right = null
}
else if (n.right) {
n.right.left = null
} else {
this.__head = null
}
this.length--
}

static of<T> () {
return new LinkedList<T>()
}
}
86 changes: 86 additions & 0 deletions src/operators/Join.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Created by tushar.mathur on 10/10/16.
*/


import {IObservable} from '../types/core/IObservable';
import {IObserver} from '../types/core/IObserver';
import {IScheduler} from '../types/IScheduler';
import {ISubscription} from '../types/core/ISubscription';
import {CompositeSubscription} from '../testing/Subscription';


export class JoinValueObserver<T> implements IObserver<T> {
constructor (private sink: IObserver<T>, private root: JoinObserver<T>) {
}

next (val: T): void {
this.sink.next(val)
}

error (err: Error): void {
this.sink.error(err)
}

complete (): void {
this.root.subscriptionCompleted()
}

}

export class JoinObserver<T> implements IObserver<IObservable<T>> {
private count: number;
private sourceCompleted: boolean;

constructor (private sink: IObserver<T>, private scheduler: IScheduler, private subscriptions: CompositeSubscription) {
this.sourceCompleted = false
this.count = 0
}

subscriptionCompleted () {
this.count--
this.completeSink()
}

completeSink () {
if (this.sourceCompleted && this.count === 0) {
this.sink.complete()
}
}


next (val: IObservable<T>): void {
const joinValueObserver = new JoinValueObserver(this.sink, this);
this.count++
this.subscriptions.add(
val.subscribe(joinValueObserver, this.scheduler)
)
}

error (err: Error): void {
this.sink.error(err)
}

complete (): void {
this.sourceCompleted = true
this.completeSink()
}
}


export class JoinObservable<T> implements IObservable<T> {
constructor (private source: IObservable<IObservable<T>>) {
}

subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
const subscription = new CompositeSubscription()
subscription.add(
this.source.subscribe(new JoinObserver(observer, scheduler, subscription), scheduler)
)
return subscription
}
}

export function join <T> (source: IObservable<IObservable<T>>) {
return new JoinObservable(source)
}
41 changes: 41 additions & 0 deletions test/test.JoinObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Created by tushar.mathur on 10/10/16.
*/

import test from 'ava';
import {join} from '../src/operators/Join';
import {TestScheduler} from '../src/testing/TestScheduler';
import {ReactiveTest} from '../src/testing/ReactiveTest';
const {next, complete} = ReactiveTest

test('subscribe()', t => {
const sh = TestScheduler.of()
const sa$$ = sh.createColdObservable([
next(10, 'A0'),
next(20, 'A1'),
next(30, 'A2'),
complete(40)
])
const sb$$ = sh.createColdObservable([
next(10, 'B0'),
next(20, 'B1'),
next(30, 'B2'),
complete(40)
])
const s$$ = sh.createColdObservable([
next(10, sa$$),
next(20, sb$$),
complete(100)
])
const {results} = sh.startScheduler<number>(() => join(s$$))

t.deepEqual(results, [
next(220, 'A0'),
next(230, 'A1'),
next(230, 'B0'),
next(240, 'A2'),
next(240, 'B1'),
next(250, 'B2'),
complete(300)
])
})
72 changes: 72 additions & 0 deletions test/test.LinkedList.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Created by tushar.mathur on 09/10/16.
*/


import test from 'ava';
import {LinkedList} from '../src/lib/LinkedList';

function toArray<T> (q: LinkedList<T>) {
let n = q.element()
if (!n) return []
const arr = [n.value]
while (n.left) {
n = n.left
arr.push(n.value)
}
return arr.reverse()
}
test('constructor()', t => {
t.true(LinkedList.of() instanceof LinkedList)
})
test('add()', t => {
const q = LinkedList.of()
q.add('A')
q.add('B')
q.add('C')
q.add('D')
t.deepEqual(toArray(q), ['A', 'B', 'C', 'D'])
t.is(q.length, 4)
})

test('remove(): Remove First (non-empty)', t => {
const q = LinkedList.of()
const a = q.add('A')
const b = q.add('B')
const c = q.add('C')

q.remove(a)
t.deepEqual(toArray(q), ['B', 'C'])
t.is(q.length, 2)
})

test('remove(): Remove LAST (non-empty)', t => {
const q = LinkedList.of()
const a = q.add('A')
const b = q.add('B')
const c = q.add('C')
q.remove(c)
t.deepEqual(toArray(q), ['A', 'B'])
t.is(q.length, 2)
})


test('remove(): Remove MIDDLE (non-empty)', t => {
const q = LinkedList.of()
const a = q.add('A')
const b = q.add('B')
const c = q.add('C')

q.remove(b)
t.deepEqual(toArray(q), ['A', 'C'])
t.is(q.length, 2)
})


test('remove(): Remove LAST', t => {
const q = LinkedList.of()
const a = q.add('A')
q.remove(a)
t.deepEqual(toArray(q), [])
t.is(q.length, 0)
})

0 comments on commit 24d65e6

Please sign in to comment.