Multi-threaded workers processing schema.org Actions concurrently.
Note: this module is automatically published to npm by CircleCI. Only run npm version patch|minor|major and let CI do the rest.
Workers work with schema.org Actions. Readers not familiar with Actions should refer to the schema.org Actions overview document for a quick introduction.
For an API endpoint receiving actions like:
{
  "@context": "http://schema.org",
  "@id-input": { "@type": "PropertyValueSpecification", "valueRequired": true },
  "@type-input": { "@type": "PropertyValueSpecification", "valueRequired": true },
  "actionStatus": "PotentialActionStatus",
  "agent-input": { "@type": "PropertyValueSpecification", "valueRequired": true },
  "object-input": {
    "@type": "PropertyValueSpecification",
    "valueRequired": true,
    "valueName": "objectId"
  },
  "result": {
    "@id-output": {
      "@type": "PropertyValueSpecification",
      "valueRequired": true,
      "valueName": "resultId"
    },
    "@type": "UpdateAction"
  },
  "target": {
    "@type": "EntryPoint",
    "httpMethod": "PUT",
    "urlTemplate": "http://example.com/{objectId}",
    "encodingType": "application/ld+json",
    "contentType": "application/ld+json"
  }
}
@scipe/workers provides everything required to create scalable action processing pipelines supporting cancellation and real-time progress events.
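In the PotentialAction above, object-input names its value objectId, and the same name appears as the {objectId} placeholder of target.urlTemplate. The sketch below shows how such a placeholder could be filled in once the action is dispatched. expandUrlTemplate is a hypothetical helper that only handles plain {name} placeholders (a small subset of RFC 6570 URI Templates); it is not part of @scipe/workers, and the value abc123 is made up.

```javascript
// Hypothetical helper, not part of @scipe/workers: expands plain {name}
// placeholders only (a small subset of RFC 6570 level 1 templates).
function expandUrlTemplate(template, values) {
  return template.replace(/\{([^}]+)\}/g, (match, name) => {
    if (!Object.prototype.hasOwnProperty.call(values, name)) {
      throw new Error(`Missing value for ${name}`);
    }
    return encodeURIComponent(String(values[name]));
  });
}

// The object-input above declares valueName "objectId", so the value supplied
// for the object fills the {objectId} placeholder (illustrative value).
const target = {
  '@type': 'EntryPoint',
  httpMethod: 'PUT',
  urlTemplate: 'http://example.com/{objectId}'
};
const url = expandUrlTemplate(target.urlTemplate, { objectId: 'abc123' });
console.log(target.httpMethod, url); // PUT http://example.com/abc123
```

Values are percent-encoded so that an identifier containing reserved characters cannot alter the structure of the URL.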
@scipe/workers provides a base Worker class. Worker implementors must extend this base class with:
- a handleAction method (required);
- a handleExit method (optional);
- life cycle methods (onActiveActionStatus, onCompletedActionStatus, onCanceledAction, onFailedActionStatus) (optional).
import { Worker } from '@scipe/workers';
class CustomWorker extends Worker {
  constructor(config) {
    super(config);
  }
  handleAction(action, callback) {
    // Do work, then call back with (err, handledAction, nextAction).
    // Pass null as err on success; nextAction is optional.
    const handledAction = Object.assign({}, action);
    callback(null, handledAction);
  }
  handleExit(err) {
    // err is an error in case of a crash or a status code in case of a clean exit.
    // Do cleanup, such as killing child processes.
  }
  onActiveActionStatus(action, callback) {
    // Called before the worker starts emitting the first
    // ActiveActionStatus message. Calling the callback with an error
    // aborts the work.
  }
  onCompletedActionStatus(handledAction, callback) {
    // Called when handleAction completed successfully, before the
    // CompletedActionStatus message is emitted. Calling the callback with an
    // error calls onFailedActionStatus (passing the error and the handledAction).
  }
  onCanceledAction(action, callback) {
    // Called when a user issues a CancelAction targeting `action`.
    // Calling the callback with an error aborts the cancellation.
  }
  onFailedActionStatus(err, action, callback) {
    // Called when handleAction or onCompletedActionStatus failed.
    // Calling the callback with an error whose code property is negative
    // triggers the suicide of the worker. After suicide, a new
    // worker node is automatically respawned.
  }
}
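The comments above describe the order in which the hooks are called. The following is a simplified, hypothetical model of that order (onActiveActionStatus, then handleAction, then onCompletedActionStatus, with failures routed through onFailedActionStatus), useful for reasoning about a custom worker. runLifecycle and the status strings it reports are illustrative assumptions, not the @scipe/workers implementation. The source does not say whether an error in onActiveActionStatus also goes through onFailedActionStatus, so the sketch simply stops there.

```javascript
// Hypothetical, simplified model of the hook order; not the @scipe/workers source.
function runLifecycle(worker, action, done) {
  // Optional hooks default to "proceed without error".
  const hook = (name, args, cb) =>
    typeof worker[name] === 'function' ? worker[name](...args, cb) : cb(null);

  // Failures go through onFailedActionStatus before being reported.
  const fail = (err, failedAction) =>
    hook('onFailedActionStatus', [err, failedAction], (hookErr) =>
      done(hookErr || err, 'FailedActionStatus'));

  hook('onActiveActionStatus', [action], (err) => {
    if (err) return done(err, null); // aborted before any work started
    worker.handleAction(action, (err, handledAction) => {
      if (err) return fail(err, action);
      hook('onCompletedActionStatus', [handledAction], (err) => {
        if (err) return fail(err, handledAction);
        done(null, 'CompletedActionStatus', handledAction);
      });
    });
  });
}

// Usage: a worker whose handleAction fails and that defines no optional hooks.
const demoWorker = {
  handleAction(action, callback) {
    callback(new Error('conversion failed'));
  }
};
runLifecycle(demoWorker, { '@id': 'scipe:actionId' }, (err, status) => {
  console.log(status, err && err.message); // FailedActionStatus conversion failed
});
```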
Workers are spawned (using the Node.js cluster module) and expose ZeroMQ sockets so that:
- work (actions) can be dispatched to the workers;
- workers can report their progress;
- ongoing work (actions) can be canceled.
If the handleAction method calls its completion callback with a nextAction argument, the next action is automatically dispatched.
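To illustrate how nextAction turns individual workers into a pipeline, here is an in-process sketch. dispatchChain is a hypothetical stand-in for the real ZeroMQ dispatch: it routes each action to a handler keyed by @type, and every handler follows the same handleAction(action, callback) convention. The two-step CreateAction then ShareAction pipeline is made up for the example.

```javascript
// Hypothetical in-process dispatcher: each handler follows the
// handleAction(action, callback) convention and may pass a nextAction,
// which is then routed to the handler registered for its @type.
function dispatchChain(handlersByType, action, done, history = []) {
  const handler = handlersByType[action['@type']];
  if (!handler) {
    return done(new Error(`No handler for ${action['@type']}`), history);
  }
  handler(action, (err, handledAction, nextAction) => {
    if (err) return done(err, history);
    history.push(handledAction);
    if (nextAction) return dispatchChain(handlersByType, nextAction, done, history);
    done(null, history);
  });
}

// Hypothetical two-step pipeline.
const handlers = {
  CreateAction(action, callback) {
    const handled = Object.assign({}, action, { actionStatus: 'CompletedActionStatus' });
    const next = { '@type': 'ShareAction', object: action.object };
    callback(null, handled, next);
  },
  ShareAction(action, callback) {
    callback(null, Object.assign({}, action, { actionStatus: 'CompletedActionStatus' }));
  }
};

dispatchChain(handlers, { '@type': 'CreateAction', object: 'scipe:objectId' }, (err, history) => {
  console.log(history.map((a) => a['@type'])); // [ 'CreateAction', 'ShareAction' ]
});
```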
Errors should be instances of Error. Errors may have a code property. Errors with a code < 0 trigger the suicide of the current worker. After suicide, a new worker node is automatically respawned.
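The error convention above can be summarized in a few lines. workerError and triggersSuicide are hypothetical helpers written for illustration; the only rule taken from the text is that an Error whose code is negative should take the worker down.

```javascript
// Hypothetical helpers illustrating the error convention described above.
function workerError(message, code) {
  const err = new Error(message);
  if (code !== undefined) err.code = code;
  return err;
}

// Only Error instances with a negative numeric code trigger worker suicide.
function triggersSuicide(err) {
  return err instanceof Error && typeof err.code === 'number' && err.code < 0;
}

console.log(triggersSuicide(workerError('bad input', 400))); // false
console.log(triggersSuicide(workerError('out of memory', -1))); // true
```

With the Node.js cluster module, respawning is typically done by listening for the cluster 'exit' event and calling cluster.fork() again; the text does not say exactly how @scipe/workers implements it.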
Worker:
import { Worker } from '@scipe/workers';
class CustomWorker extends Worker {
  constructor(config) {
    super(config);
  }
  handleAction(action, callback) {
    const processedAction = Object.assign({}, action);
    callback(null, processedAction);
  }
  handleExit(err) {
    // cleanup
  }
}
const w = new CustomWorker({ nWorkers: 1 });
w.listen();
// Later, to shut the workers down:
w.stop(() => {
  // stopped
});
Client:
import { Worker } from '@scipe/workers';
const w = new Worker();
w.dispatch({
  '@context': 'http://schema.org',
  '@id': 'http://example.com/actionId',
  '@type': 'Action',
  agent: 'http://example.com/agentId',
  object: 'http://example.com/objectId',
  result: {
    '@id-output': {
      '@type': 'PropertyValueSpecification',
      valueRequired: true,
      valueName: 'resultId'
    }
  },
  target: {
    '@type': 'EntryPoint',
    httpMethod: 'PUT',
    urlTemplate: 'http://example.com/{resultId}',
    encodingType: 'application/ld+json',
    contentType: 'application/ld+json'
  }
}, (err) => {
  // The worker acknowledges the dispatch as soon as it receives the action.
});
A broker is needed so that the client can reach the workers. The broker also balances the load among the connected workers using a least recently used (LRU) strategy.
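The least recently used strategy means that idle workers wait in a first-in, first-out queue, so the worker that has been idle the longest receives the next request. The class below is a hypothetical, in-memory model of that idea, not the Broker implementation; worker ids and request names are made up. Its two queues also correspond to the two numbers that the broker reports.

```javascript
// Hypothetical model of least-recently-used (LRU) load balancing.
class LruBalancer {
  constructor() {
    this.ready = []; // ids of idle workers, longest-idle first
    this.pending = []; // requests waiting for a worker
    this.assignments = []; // [workerId, request] pairs, for inspection
  }

  // A worker signals that it is idle (on start-up or after finishing a job).
  workerReady(workerId) {
    this.ready.push(workerId);
    this._drain();
  }

  request(req) {
    this.pending.push(req);
    this._drain();
  }

  // Pair the longest-idle worker with the oldest pending request.
  _drain() {
    while (this.ready.length && this.pending.length) {
      this.assignments.push([this.ready.shift(), this.pending.shift()]);
    }
  }
}

const balancer = new LruBalancer();
balancer.workerReady('w1');
balancer.workerReady('w2');
balancer.request('a1'); // goes to w1 (idle the longest)
balancer.workerReady('w1'); // w1 is idle again, queued behind w2
balancer.request('a2'); // goes to w2
balancer.request('a3'); // goes to w1
console.log(balancer.assignments); // [ [ 'w1', 'a1' ], [ 'w2', 'a2' ], [ 'w1', 'a3' ] ]
```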
Broker:
import { Broker } from '@scipe/workers';
const broker = new Broker();
broker.listen(err => {
  if (err) {
    throw err;
  }
});
broker.on('change', (data) => {
  console.log(data);
});
The broker is an EventEmitter and emits a change event that can be tracked to know:
- the number of pending requests;
- the number of available workers (in READY state).
This data can be used to auto-scale the workers based on the workload.
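As an example of such auto-scaling, a policy can be a pure function of the two numbers above. This is a hypothetical sketch: the field names pending and available are assumptions (log the actual change payload to find the real ones), and the thresholds are arbitrary.

```javascript
// Hypothetical scaling policy; `pending` and `available` are assumed field
// names for the broker's change data, and minIdle is an arbitrary threshold.
function scalingDecision({ pending, available }, { minIdle = 1 } = {}) {
  if (pending > 0 && available === 0) return 'scale-up'; // work is queuing up
  if (pending === 0 && available > minIdle) return 'scale-down'; // too many idle workers
  return 'hold';
}

console.log(scalingDecision({ pending: 5, available: 0 })); // scale-up
console.log(scalingDecision({ pending: 0, available: 4 })); // scale-down
console.log(scalingDecision({ pending: 0, available: 1 })); // hold

// e.g. broker.on('change', (data) => console.log(scalingDecision(data)));
```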
Cancellation (CancelAction)
Workers subscribe to a ZeroMQ SUB socket, and messages can be sent to this socket to administer the workers.
In particular, work related to a given action can be canceled by sending a CancelAction whose object is the @id of the action, under the worker topic. In the example below, the message is pushed to the worker PULL_ENDPOINT.
import zmq from 'zmq';
// `w` is the Worker instance created in the examples above.
const push = zmq.socket('push');
const topic = 'worker';
const cancelAction = {
  '@type': 'CancelAction',
  actionStatus: 'CompletedActionStatus',
  object: 'scipe:actionId'
};
push.connect(w.PULL_ENDPOINT);
push.send([topic, JSON.stringify(cancelAction)]);
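On the receiving side, a worker has to decide whether such a message targets the action it is currently processing. The function below is a hypothetical sketch of that check, assuming the [topic, JSON payload] framing used in the example above and accepting the object either as a plain @id string or as a node with an @id. It is not the library's code.

```javascript
// Hypothetical worker-side check: does an administrative message cancel the
// action currently being processed? Frames may be Buffers or strings.
function isCancellationOf(topic, payload, ongoingAction) {
  if (String(topic) !== 'worker') return false;
  let msg;
  try {
    msg = JSON.parse(String(payload));
  } catch (e) {
    return false; // ignore malformed messages
  }
  const targetId = msg.object && typeof msg.object === 'object'
    ? msg.object['@id']
    : msg.object;
  return msg['@type'] === 'CancelAction' && targetId === ongoingAction['@id'];
}

const ongoing = { '@id': 'scipe:actionId', '@type': 'Action' };
const frames = [
  Buffer.from('worker'),
  Buffer.from(JSON.stringify({
    '@type': 'CancelAction',
    actionStatus: 'CompletedActionStatus',
    object: 'scipe:actionId'
  }))
];
console.log(isCancellationOf(frames[0], frames[1], ongoing)); // true
```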
Workers publish the status of their work on a ZeroMQ PUB socket (exposed at XPUB_ENDPOINT), to which clients can subscribe with a SUB socket:
import zmq from 'zmq';
const sub = zmq.socket('sub');
sub.connect(w.XPUB_ENDPOINT);
sub.subscribe(''); // subscribe to all topics
sub.on('message', function(topic, action) {
  // topic is the action agent['@id']
  // note that topic and action are Buffers
});
When a worker starts (and while the job is running), it re-emits the action it received at regular intervals with an actionStatus of ActiveActionStatus.
If a user cancels a job, the worker emits the original action with an actionStatus of CanceledActionStatus.
If a worker fails, it emits the original action with an actionStatus of FailedActionStatus and an error property containing more information on the cause of the failure.
When a worker is done processing an action, it emits the handledAction returned by the handleAction method, usually with an actionStatus of CompletedActionStatus.
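A subscriber typically branches on actionStatus. The function below is a hypothetical sketch of such a handler for the message listener shown earlier: it decodes the two Buffers and turns each status into a log line. The shape of the error property is not specified in the text, so it is printed as JSON, and anything without a known status (for instance a progress event) falls back to its description.

```javascript
// Hypothetical subscriber-side handler; `topic` and `message` are Buffers,
// as delivered by the SUB socket.
function describeStatus(topic, message) {
  const agent = topic.toString();
  const action = JSON.parse(message.toString());
  const id = action['@id'];
  switch (action.actionStatus) {
    case 'ActiveActionStatus':
      return `${agent}: ${id} is running`;
    case 'CanceledActionStatus':
      return `${agent}: ${id} was canceled`;
    case 'FailedActionStatus':
      // The shape of `error` is not specified, so it is printed as JSON.
      return `${agent}: ${id} failed ${JSON.stringify(action.error)}`;
    case 'CompletedActionStatus':
      return `${agent}: ${id} completed`;
    default:
      // e.g. progress events, which carry a description instead of a status
      return `${agent}: ${action.description || 'unknown message'}`;
  }
}

const sampleTopic = Buffer.from('scipe:agentId');
const sampleMessage = Buffer.from(JSON.stringify({
  '@id': 'scipe:actionId',
  actionStatus: 'CompletedActionStatus'
}));
console.log(describeStatus(sampleTopic, sampleMessage)); // scipe:agentId: scipe:actionId completed
```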
Within a worker, further information can be published to the ZeroMQ PUB socket by calling the emitEvent(action, event) method. Calling emitEvent publishes a ProgressEvent to the PUB socket. The topic (required by ZeroMQ) is set to the @id of the action agent.
{
  "@context": "http://schema.org",
  "@id": "scipe:eventId",
  "@type": "Event",
  "about": "scipe:actionId",
  "description": "starting to process the action",
  "startDate": "2016-02-29T16:21:32.886Z"
}
In addition to publishing the ProgressEvent, the emitEvent method returns an object with:
- emitEndedEvent, a function returning the same ProgressEvent as the one emitted by the previous call, but with an added endDate property;
- emitEvent, a function returning a new ProgressEvent linked to the previous event through the superEvent property;
- toJSON, a function returning the emitted ProgressEvent as a JavaScript object (note that this function is called by JSON.stringify).
import { Worker } from '@scipe/workers';
class CustomWorker extends Worker {
  handleAction(action, callback) {
    const superEvent = this.emitEvent(action, 'starting to process the action');
    const imageConversionEvent = superEvent.emitEvent('starting image conversion');
    // convert images...
    imageConversionEvent.emitEndedEvent();
    superEvent.emitEndedEvent();
    const handledAction = Object.assign({}, action);
    callback(null, handledAction);
  }
}
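To make the structure of the returned object concrete, here is a hypothetical re-implementation of it in plain JavaScript, not the @scipe/workers source. publish(topic, event) stands in for the PUB socket, the scipe:event-N ids come from a counter purely for illustration, and superEvent is stored as the parent's @id (an assumption; the library may embed the whole event).

```javascript
// Hypothetical sketch of the object returned by emitEvent.
let eventCounter = 0;

function emitProgressEvent(publish, action, description, parentEvent) {
  // The topic is the @id of the action agent (a string or a node with an @id).
  const topic = action.agent && typeof action.agent === 'object'
    ? action.agent['@id']
    : action.agent;
  const event = {
    '@context': 'http://schema.org',
    '@id': `scipe:event-${++eventCounter}`, // illustrative id scheme
    '@type': 'Event',
    about: action['@id'],
    description,
    startDate: new Date().toISOString()
  };
  if (parentEvent) event.superEvent = parentEvent['@id'];
  publish(topic, event);

  return {
    // Re-publish the same event with an endDate added.
    emitEndedEvent() {
      const ended = Object.assign({}, event, { endDate: new Date().toISOString() });
      publish(topic, ended);
      return ended;
    },
    // Publish a child event linked to this one through superEvent.
    emitEvent(childDescription) {
      return emitProgressEvent(publish, action, childDescription, event);
    },
    // Called automatically by JSON.stringify.
    toJSON() {
      return event;
    }
  };
}

const published = [];
const action = { '@id': 'scipe:actionId', agent: 'scipe:agentId' };
const superEvent = emitProgressEvent((topic, e) => published.push([topic, e]), action,
  'starting to process the action');
const imageConversionEvent = superEvent.emitEvent('starting image conversion');
imageConversionEvent.emitEndedEvent();
superEvent.emitEndedEvent();

console.log(published.length); // 4
console.log(JSON.parse(JSON.stringify(imageConversionEvent)).superEvent); // scipe:event-1
```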
Workers can be configured by passing a config object to their constructor (see the worker source code for details).
import { ImageWorker, AudioVideoWorker, DocumentWorker } from '@scipe/workers';
The ImageWorker class extends the Worker class and processes Actions whose object is an ImageObject.
The AudioVideoWorker class extends the Worker class and processes Actions whose object is a VideoObject or an AudioObject.
The DocumentWorker class extends the Worker class and processes Actions whose object is a DocumentObject, a subclass of MediaObject.
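The object type therefore decides which specialized worker handles an action. The table below is a hypothetical routing sketch derived from the three paragraphs above; pickWorker and the choice of looking only at object['@type'] are assumptions, and the real workers may inspect more than the type.

```javascript
// Hypothetical routing table mirroring the specialized workers.
const WORKER_BY_OBJECT_TYPE = {
  ImageObject: 'ImageWorker',
  VideoObject: 'AudioVideoWorker',
  AudioObject: 'AudioVideoWorker',
  DocumentObject: 'DocumentWorker'
};

// Returns the worker class name for an action, or null if none applies.
function pickWorker(action) {
  const object = action.object;
  const type = object && typeof object === 'object' ? object['@type'] : undefined;
  return Object.prototype.hasOwnProperty.call(WORKER_BY_OBJECT_TYPE, type)
    ? WORKER_BY_OBJECT_TYPE[type]
    : null;
}

console.log(pickWorker({ '@type': 'Action', object: { '@type': 'VideoObject' } })); // AudioVideoWorker
```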
A CLI is available to quickly launch a broker and all the specialized workers. Run run-workers --help for more details.
- Install graphicsmagick (brew install graphicsmagick --with-libtiff on OSX).
- Install imagemagick (brew install imagemagick --with-libtiff on OSX).
- Install ffmpeg (brew install ffmpeg --with-libvpx --with-libvorbis --with-theora --with-aac --with-libx264 on OSX).
- Install LibreOffice (it needs to run headless).
- Run npm install.
Run npm test.
@scipe/workers is dual-licensed under commercial and open source (AGPLv3) licenses based on the intended use case. Contact us to learn which license applies to your use case.