Mutator I/O is a tiny library to handle data transformations. It uses RxJS to compose streams of data from a source (inputStream) to a destination (outputStream)
npm i mutator-io
The concept of a "pipe" in Mutator I/O is simply an input stream piped into an output stream
interface Pipe {
name: string
in: InputStream<any>
out: OutputStream<any>
}
import { MutatorIO } from 'mutator-io'
import * as MqttInputStream from 'mutator-io-plugin-in-mqtt'
import * as DynamoDBOutputStream from 'mutator-io-plugin-out-dynamodb'
const myPipe = {
name: 'myPipeName',
in: new MqttInputStream({
url: 'mqtt://localhost:1883',
topics: ['mytopic']
}),
out: new DynamoDBOutputStream(),
}
const mutator = new MutatorIO([myPipe])
mutator.start()
interface Config {
COLORS?: boolean
LOG_LEVEL?: 'NONE' | 'INFO' | 'DEBUG'
}
class MutatorIO {
// Constructor receives an array of Pipes and an optional Config object
constructor(pipes: Array<Pipe>, config?: Config)
// Used internally by subscription to remove a trasnformer
removeTransformer(pipeName: string, index: number): boolean
// Main method used to add a transformation to an existing pipe
transform(pipeName: string, transform: TransformStream<any>): Subscription
// Subscribes to the streams and start listening to inputStreams
start(): void
}
-
- MQTT available through
npm i mutator-io-plugin-in-mqtt
- MQTT available through
-
- DynamoDB available through
npm i mutator-io-plugin-out-dynamodb
- DynamoDB available through
Once you create a pipe, you have the possiblity to append data transformations to manipulate / aggregate / mutate the incoming data as you please (before it gets to the outputStream in the pipe)
import * as DynamoDBOutputStream from 'mutator-io-plugin-out-dynamodb'
// Transform can be a simple function
// or a function returning an Observable or a Promise
mutator.transform('myPipeName', (msg) => msg + 'something')
mutator.transform('myPipeName', (msg) => Rx.Observable.from([msg + 'something']))
mutator.transform('myPipeName', (msg) => Promise.resolve(msg + 'something else delayed'))
// Transformers can implement Typescript interfaces provided by output Streams
// E.G. this transform operation transforms the incoming payload from myPipeName
// into a DynamoDB delete query, treating payload as the key of the item to delete
mutator.transform('myPipeName', (msg): DynamoDBOutputStream.Message => {
operation: DynamoDBOutputStream.Operations.DELETE,
params: {
TableName: 'processor_test',
Key: {
id: msg.payload,
}
}
})
This project leverages lerna.js to handle multiple npm packages at the same time. Use npm i && lerna bootstrap
to initialise the main repository as well as all the sub-packages. This will also act just like npm link
so the packages that depends on eachothers will keep working.
pre-commit
ensures that commits are linted and tests are green in order to perform a new commit.
"build" // Builds all packages via tsc
"test" // Runs lerna bootstrap and runs tests across all packages
"lint" // Lints (and tries to fix) source code for all the packages
Each package has the possibility to run the following scripts:
"build" // Builds the files using tsc
"build:watch" // Builds the files using tsc with -w argument recompile on change
"doc" // Generates the documentation leveraging Typescript via Typedoc
"test" // Launches mocha tests
"test:watch" // Launches mocha tests with watch optin to retrigger on change
"test:debug" // Launches iron-node with source maps support to debug tests