Mutator I/O is a tiny library to handle data transformations. It uses RxJS to compose streams of data from a source (inputStream) to a destination (outputStream)
npm i mutator-io
The concept of a "pipe" in Mutator I/O is simply an input stream piped into an output stream
interface Pipe {
name: string
in: InputStream<any>
out: OutputStream<any>
import { MutatorIO } from 'mutator-io'
import * as MqttInputStream from 'mutator-io-plugin-in-mqtt'
import * as DynamoDBOutputStream from 'mutator-io-plugin-out-dynamodb'
const myPipe = {
name: 'myPipeName',
in: new MqttInputStream({
url: 'mqtt://localhost:1883',
topics: ['mytopic']
out: new DynamoDBOutputStream(),
const mutator = new MutatorIO([myPipe])
interface Config {
COLORS?: boolean
class MutatorIO {
// Constructor receives an array of Pipes and an optional Config object
constructor(pipes: Array<Pipe>, config?: Config)
// Used internally by subscription to remove a trasnformer
removeTransformer(pipeName: string, index: number): boolean
// Main method used to add a transformation to an existing pipe
transform(pipeName: string, transform: TransformStream<any>): Subscription
// Subscribes to the streams and start listening to inputStreams
start(): void
- MQTT available through
npm i mutator-io-plugin-in-mqtt
- MQTT available through
- DynamoDB available through
npm i mutator-io-plugin-out-dynamodb
- DynamoDB available through
Once you create a pipe, you have the possiblity to append data transformations to manipulate / aggregate / mutate the incoming data as you please (before it gets to the outputStream in the pipe)
import * as DynamoDBOutputStream from 'mutator-io-plugin-out-dynamodb'
// Transform can be a simple function
// or a function returning an Observable or a Promise
mutator.transform('myPipeName', (msg) => msg + 'something')
mutator.transform('myPipeName', (msg) => Rx.Observable.from([msg + 'something']))
mutator.transform('myPipeName', (msg) => Promise.resolve(msg + 'something else delayed'))
// Transformers can implement Typescript interfaces provided by output Streams
// E.G. this transform operation transforms the incoming payload from myPipeName
// into a DynamoDB delete query, treating payload as the key of the item to delete
mutator.transform('myPipeName', (msg): DynamoDBOutputStream.Message => {
operation: DynamoDBOutputStream.Operations.DELETE,
params: {
TableName: 'processor_test',
Key: {
id: msg.payload,
This project leverages lerna.js to handle multiple npm packages at the same time. Use npm i && lerna bootstrap
to initialise the main repository as well as all the sub-packages. This will also act just like npm link
so the packages that depends on eachothers will keep working.
ensures that commits are linted and tests are green in order to perform a new commit.
"build" // Builds all packages via tsc
"test" // Runs lerna bootstrap and runs tests across all packages
"lint" // Lints (and tries to fix) source code for all the packages
Each package has the possibility to run the following scripts:
"build" // Builds the files using tsc
"build:watch" // Builds the files using tsc with -w argument recompile on change
"doc" // Generates the documentation leveraging Typescript via Typedoc
"test" // Launches mocha tests
"test:watch" // Launches mocha tests with watch optin to retrigger on change
"test:debug" // Launches iron-node with source maps support to debug tests