Skip to content

Commit

Permalink
Merge pull request #1 from tusharmath/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
tusharmath authored Oct 8, 2016
2 parents 1a76442 + 462f6b0 commit f4e3a8e
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 37 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Observable Air

[![Build Status](https://travis-ci.org/tusharmath/rwc.svg?branch=master)](https://travis-ci.org/tusharmath/observable-air)
[![npm](https://img.shields.io/npm/v/observable-air.svg)](https://www.npmjs.com/package/observable-air)
[![Coverage Status](https://coveralls.io/repos/github/tusharmath/observable-air/badge.svg)](https://coveralls.io/github/tusharmath/observable-air)

A light weight and performance focused implementation of [Observables].
Other mainstream (no pun intended) alternatives —

[Observables]: https://github.com/tc39/proposal-observable

- [most]
- [rxjs]
- [kefir]
- [bacon]
- [highland]
- [zen observable]

[most]: https://github.com/cujojs/most
[rxjs]: https://github.com/ReactiveX/rxjs
[kefir]: https://rpominov.github.io/kefir/
[bacon]: https://baconjs.github.io/
[highland]: http://highlandjs.org/
[zen observable]: https://github.com/zenparsing/zen-observable

### Key Features

1. **Ultra high performance**
2. **Small footprint**
3. **Ease of testability**
26 changes: 20 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,31 @@
"description": "Observables for the calorie conscious",
"main": "src/Observable",
"scripts": {
"test": "ava",
"lint": "tslint ./src/**.ts",
"hydra": "node --trace-hydrogen --trace-phase=Z --trace-deopt --code-comments --hydrogen-track-positions --redirect-code-traces --redirect-code-traces-to=code.asm chore/benchmark.js",
"rfc": "node chore/rfc",
"benchmark": "date >> benchmark.md && node chore/benchmark >> benchmark.md",
"cleanup": "rm -rf ./src/**/*.js && rm -rf ./src/**/*.map",
"coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls",
"hydra": "node --trace-hydrogen --trace-phase=Z --trace-deopt --code-comments --hydrogen-track-positions --redirect-code-traces --redirect-code-traces-to=code.asm chore/benchmark.js",
"lint": "tslint ./src/**.ts",
"postinstall": "tsc",
"semantic-release": "semantic-release pre && npm publish && semantic-release post"
"rfc": "node chore/rfc",
"semantic-release": "semantic-release pre && npm publish && semantic-release post",
"test": "ava"
},
"author": "",
"license": "ISC",
"devDependencies": {
"ava": "^0.16.0",
"benchmark": "^2.1.1",
"coveralls": "^2.11.14",
"cz-conventional-changelog": "^1.2.0",
"es-observable-tests": "^0.3.0",
"ghooks": "^1.3.2",
"nyc": "^8.3.1",
"semantic-release": "^4.3.5",
"tslint": "^3.15.1",
"tslint-microsoft-contrib": "^2.0.12",
"typescript": "^2.0.3"
"typescript": "^2.0.3",
"validate-commit-msg": "^2.8.2"
},
"repository": {
"type": "git",
Expand All @@ -29,6 +36,13 @@
"config": {
"commitizen": {
"path": "./node_modules/cz-conventional-changelog"
},
"ghooks": {
"pre-commit": "npm run lint",
"commit-msg": "validate-commit-msg",
"pre-push": "npm test",
"post-merge": "npm install",
"post-rewrite": "npm install"
}
}
}
26 changes: 15 additions & 11 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ import {Subscription, CompositeSubscription} from './Subscription';
import {SubscriptionObserver} from './SubscriptionObserver';
import {SafeExecutor} from './lib/SafeExecutor';
import {Safety} from './types/ISafeValue';
import {ISubscriptionObserver} from './types/core/ISubscriptionObserver';


function startObserver <T> (observer: IObserver<T>,
subscription: CompositeSubscription) {
if (observer.start)
observer.start(subscription)
}

export class Observable<T> implements IObservable<T> {
constructor (private func: ISubscriberFunction<T>) {
}
Expand All @@ -27,21 +34,18 @@ export class Observable<T> implements IObservable<T> {
})
}

safelyExecuteFunc (observer: IObserver<T>, cSub: CompositeSubscription) {
const r = SafeExecutor(() => {
cSub.add(Subscription.from(this.func(SubscriptionObserver.from(observer))))
})
if (r.type === Safety.error && observer.error) {
observer.error(r.value as Error)
}
private safelyExecuteFunc (observer: ISubscriptionObserver<T>, cSub: CompositeSubscription) {
const r = SafeExecutor(() => cSub.add(Subscription.from(this.func(observer))))
if (r.type === Safety.error) observer.error(r.value as Error)
}

subscribe (observer: IObserver<T>, scheduler: IScheduler = new DefaultScheduler()): ISubscription {
if (typeof observer !== 'object') throw new TypeError('Observer should be of object type')
subscribe (observer: IObserver<T> | ((t: T) => void), scheduler: IScheduler = new DefaultScheduler()): ISubscription {
const subObserver = SubscriptionObserver.from(observer)
const subscription = new CompositeSubscription()
const task = () => this.safelyExecuteFunc(observer, subscription);
const task = () => this.safelyExecuteFunc(subObserver, subscription);
subscription.add(scheduler.scheduleNow(task))
if (observer.start) observer.start(subscription)
startObserver(observer as IObserver<T>, subscription);
return subscription
}

}
41 changes: 22 additions & 19 deletions src/SubscriptionObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,45 @@
import {ISubscriptionObserver} from './types/core/ISubscriptionObserver';
import {IObserver} from './types/core/IObserver';

export class SubscriptionObserverStub <T> implements ISubscriptionObserver<T> {
closed: boolean;

next (val: T): void {
}

error (err: Error): void {
}

complete (): void {
}
export function noop () {
}
export function defaultOnError (err: Error) {
throw err
}


export class SubscriptionObserver<T> implements ISubscriptionObserver<T> {
closed: boolean;

constructor (private sink: IObserver<T>) {
constructor (private onNext: (val: T) => void,
private onError: (err: Error) => void,
private onComplete: () => void) {
this.closed = false
}

next (val: T): void {
this.sink.next(val)
this.onNext(val)
}

error (err: Error): void {
this.sink.error(err)
this.onError(err)
}

complete (): void {
this.sink.complete()
this.onComplete()
this.closed = true
}

static from<T> (observer: IObserver<T>) {
if (!observer.next || !observer.complete || !observer.error)
return new SubscriptionObserverStub()
return new SubscriptionObserver(observer)
static from<T> (observer: IObserver<T> | ((t: T) => void)): ISubscriptionObserver<T> {
const type = typeof observer
if (type !== 'object' && type !== 'function') throw new TypeError()
if (observer instanceof SubscriptionObserver) return observer
if (typeof observer === 'function')
return new SubscriptionObserver(observer, defaultOnError, noop)
return new SubscriptionObserver(
observer.next ? (t: T) => observer.next(t) : noop,
observer.error ? (t: Error) => observer.error(t) : defaultOnError,
observer.complete ? () => observer.complete() : noop
)
}
}
2 changes: 1 addition & 1 deletion src/testing/TestObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ export class TestObservable<T> implements IObservable<T> {
}

subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
return Subscription.from(this.func(new SubscriptionObserver(observer)))
return Subscription.from(this.func(SubscriptionObserver.from(observer)))
}
}

0 comments on commit f4e3a8e

Please sign in to comment.