From 370508b2b96b38ff61442179ac9499934c583328 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Fri, 11 Oct 2019 18:38:05 +0300 Subject: [PATCH 01/23] Seperate protocol helpers --- helpers.js | 83 +-------------------------------- protocols/HistogramProtocol.js | 2 +- protocols/helpers.js | 85 ++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 83 deletions(-) create mode 100644 protocols/helpers.js diff --git a/helpers.js b/helpers.js index 5a84cad..d0b56b6 100644 --- a/helpers.js +++ b/helpers.js @@ -5,8 +5,6 @@ const { appEmitter } = require('./emitters.js') const { status } = require('./config/constants') const totalAttributes = require('./smpc-global/attributes.json') const algorithms = require('./smpc-global/algorithms.json') -const mapping = require('../smpc-global/mapping.json') -const meshTerms = require('../smpc-global/meshTerms.json') const pack = (msg) => { return JSON.stringify(msg) @@ -43,90 +41,11 @@ const constructJob = request => { return { ...request, timestamps: { accepted: Date.now() }, status: status.PENDING } } -const getNumericalAttribute = (attributes) => { - return attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'numerical')) -} - -const getCatecoricalAttribute = (attributes) => { - return attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'categorical')) -} - -const getNumericCell = (attributes) => { - return getCell(getNumericalAttribute(attributes)) -} - -const getCell = (attr) => { - const defaultCells = Math.floor(this.state.dataInfo.dataSize / 10) - const cells = attr.cells || defaultCells - return Math.min(cells, defaultCells) -} - -const getHistogramMinMax = (data) => { - const m = data.replace(/\s/g, '').split(',') - return { min: Number(m[0]), max: Number(m[1]) } -} - -const fillHistogramArray = (x, y = 0, value = 0) => { - if (y === 0) { - return Array(x).fill(value) - } - - return Array.from(Array(x), _ => Array(y).fill(value)) -} - -const constructHistogram2DArray = (data, cellsX, cellsY) => { - let arr = fillHistogramArray(cellsX, cellsY) - - for (let i = 0; i < data.length; i++) { - let b = data[i].replace(/\s/g, '').split(',') - arr[Number(b[0])][Number(b[1])] = b[2] - } - - return arr -} - -const histogram2DArrayFromFlattenArray = (data, cellsX, cellsY) => { - const y = Math.ceil(cellsY / cellsX) - return _.chunk(data, y).map(arr => arr.map(i => i.replace(/\s/g, '').split(',')[1])) -} - -const getAttributeNames = (mesh) => { - // Take the vales of the childer of the attribute - // Sort children by mapping number - // Get only mesh term - // Get mesh's term name - return Object - .entries(mapping[mesh]) - .sort((a, b) => a[1] - b[1]) - .map(t => t[0]) - .map(m => meshTerms[m].name) -} - -const computeAxisLabels = (min, max, width, cells) => { - let start = min - let end = max - const ticks = [] - for (const _ of Array(cells - 1).keys()) { // eslint-disable-line no-unused-vars - end = start + width - ticks.push(`[${start}, ${end})`) - start = end - } - end = start + width - ticks.push(`[${start}, ${end}]`) - return ticks -} - module.exports = { getHistogramType, pack, unpack, sha256, updateJobStatus, - constructJob, - getNumericCell, - getCell, - getAttributeNames, - getHistogramMinMax, - constructHistogram2DArray, - histogram2DArrayFromFlattenArray + constructJob } diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index cee8811..c142084 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -11,7 +11,7 @@ const { constructHistogram2DArray, histogram2DArrayFromFlattenArray, getCatecoricalAttribute -} = require('../helpers') +} = require('./helpers') class HistogramProtocol extends Protocol { constructor (job) { diff --git a/protocols/helpers.js b/protocols/helpers.js new file mode 100644 index 0000000..13794fa --- /dev/null +++ b/protocols/helpers.js @@ -0,0 +1,85 @@ +const _ = require('lodash') +const mapping = require('../smpc-global/mapping.json') +const meshTerms = require('../smpc-global/meshTerms.json') + +const getNumericalAttribute = (attributes) => { + return attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'numerical')) +} + +const getCatecoricalAttribute = (attributes) => { + return attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'categorical')) +} + +const getNumericCell = (attributes) => { + return getCell(getNumericalAttribute(attributes)) +} + +const getCell = (attr) => { + const defaultCells = Math.floor(this.state.dataInfo.dataSize / 10) + const cells = attr.cells || defaultCells + return Math.min(cells, defaultCells) +} + +const getHistogramMinMax = (data) => { + const m = data.replace(/\s/g, '').split(',') + return { min: Number(m[0]), max: Number(m[1]) } +} + +const fillHistogramArray = (x, y = 0, value = 0) => { + if (y === 0) { + return Array(x).fill(value) + } + + return Array.from(Array(x), _ => Array(y).fill(value)) +} + +const constructHistogram2DArray = (data, cellsX, cellsY) => { + let arr = fillHistogramArray(cellsX, cellsY) + + for (let i = 0; i < data.length; i++) { + let b = data[i].replace(/\s/g, '').split(',') + arr[Number(b[0])][Number(b[1])] = b[2] + } + + return arr +} + +const histogram2DArrayFromFlattenArray = (data, cellsX, cellsY) => { + const y = Math.ceil(cellsY / cellsX) + return _.chunk(data, y).map(arr => arr.map(i => i.replace(/\s/g, '').split(',')[1])) +} + +const getAttributeNames = (mesh) => { + // Take the vales of the childer of the attribute + // Sort children by mapping number + // Get only mesh term + // Get mesh's term name + return Object + .entries(mapping[mesh]) + .sort((a, b) => a[1] - b[1]) + .map(t => t[0]) + .map(m => meshTerms[m].name) +} + +const computeAxisLabels = (min, max, width, cells) => { + let start = min + let end = max + const ticks = [] + for (const _ of Array(cells - 1).keys()) { // eslint-disable-line no-unused-vars + end = start + width + ticks.push(`[${start}, ${end})`) + start = end + } + end = start + width + ticks.push(`[${start}, ${end}]`) + return ticks +} + +module.exports = { + getNumericCell, + getCell, + getAttributeNames, + getHistogramMinMax, + constructHistogram2DArray, + histogram2DArrayFromFlattenArray +} From d230feb9ce288af0906ad8895a3237a7521349d2 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Fri, 11 Oct 2019 18:38:32 +0300 Subject: [PATCH 02/23] Create queue route --- routes/index.js | 2 ++ routes/queue.js | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 routes/queue.js diff --git a/routes/index.js b/routes/index.js index dffb38d..33ee6e3 100644 --- a/routes/index.js +++ b/routes/index.js @@ -1,5 +1,6 @@ const express = require('express') const smpc = require('./smpc') +const queue = require('./queue') const search = require('./search') const auth = require('./auth') @@ -11,6 +12,7 @@ router.get('/health', (_, res) => res.status(204).send()) module.exports = { '/api': router, '/api/smpc': smpc, + '/api/queue': queue, '/api/search': search, '/api/auth': auth } diff --git a/routes/queue.js b/routes/queue.js new file mode 100644 index 0000000..ea12e83 --- /dev/null +++ b/routes/queue.js @@ -0,0 +1,36 @@ +const express = require('express') + +const { getJob } = require('../db') +const { status } = require('../config/constants') +const { HTTPError } = require('../errors') + +const auth = require('../auth') + +let router = express.Router() + +router.get('/:id', [auth.authenticate], async (req, res, next) => { + try { + let value = await getJob(req.params.id) + + if (value.status !== status.COMPLETED) { + return res.status(200).json({ + ...value, + status: status.properties[value.status].msg + }) + } + + /* TODO: Take results url from express */ + const location = `/api/results/${req.params.id}` + + res.set('Location', location) + return res.status(303).json({ location }) + } catch (err) { + if (err.notFound) { + next(new HTTPError(404, 'Not found')) + } + + next(err) + } +}) + +module.exports = router From fd36e1e5b71b063ac72d19424f1db65127c73929 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Fri, 11 Oct 2019 18:38:57 +0300 Subject: [PATCH 03/23] Create results route --- routes/index.js | 2 ++ routes/results.js | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 routes/results.js diff --git a/routes/index.js b/routes/index.js index 33ee6e3..df321db 100644 --- a/routes/index.js +++ b/routes/index.js @@ -1,6 +1,7 @@ const express = require('express') const smpc = require('./smpc') const queue = require('./queue') +const results = require('./results') const search = require('./search') const auth = require('./auth') @@ -13,6 +14,7 @@ module.exports = { '/api': router, '/api/smpc': smpc, '/api/queue': queue, + '/api/results': results, '/api/search': search, '/api/auth': auth } diff --git a/routes/results.js b/routes/results.js new file mode 100644 index 0000000..a3b3fec --- /dev/null +++ b/routes/results.js @@ -0,0 +1,46 @@ +const express = require('express') +const contentDisposition = require('content-disposition') + +const { getJob } = require('../db') +const { status } = require('../config/constants') +const { HTTPError } = require('../errors') + +const auth = require('../auth') + +let router = express.Router() + +const getResults = async (req, res, next, download = false) => { + try { + let value = await getJob(req.params.id) + value = { ...value, status: status.properties[value.status].msg } + res.set({ + 'Content-Type': 'application/json' + }) + + if (download) { + res.set({ + 'Content-Disposition': contentDisposition(`${req.params.id}.json`) + }) + + value = JSON.stringify(value) + } + + return res.status(200).send(value) + } catch (err) { + if (err.notFound) { + next(new HTTPError(404, 'Not found')) + } + + next(err) + } +} + +router.get('/:id', [auth.authenticate], async (req, res, next) => { + await getResults(req, res, next) +}) + +router.get('/:id/download', [auth.authenticate], async (req, res, next) => { + await getResults(req, res, next, true) +}) + +module.exports = router From d0a93d5ddb5ce45a9ecbdf37265d4d38a151d7f1 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Fri, 11 Oct 2019 18:39:22 +0300 Subject: [PATCH 04/23] Add deprecated comment --- routes/smpc.js | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/routes/smpc.js b/routes/smpc.js index 33821f2..a3214a8 100644 --- a/routes/smpc.js +++ b/routes/smpc.js @@ -12,6 +12,11 @@ const auth = require('../auth') let router = express.Router() +/* + * Deprecated + * Use createJobRouter +*/ + const createSimpleSMPCRouter = (router, path, middlewares, protocol) => { router.post(path, middlewares, async (req, res, next) => { const job = constructJob(req.body) @@ -33,6 +38,11 @@ const createSimpleSMPCRouter = (router, path, middlewares, protocol) => { return router } +/* + * Deprecated + * Use api/queue/:id +*/ + router.get('/queue/:id', [auth.authenticate], async (req, res, next) => { try { let value = await getJob(req.params.id) @@ -83,10 +93,20 @@ const getResults = async (req, res, next, download = false) => { } } +/* + * Deprecated + * Use api/results/:id +*/ + router.get('/results/:id', [auth.authenticate], async (req, res, next) => { await getResults(req, res, next) }) +/* + * Deprecated + * Use api/results/:id/download +*/ + router.get('/:id/download', [auth.authenticate], async (req, res, next) => { await getResults(req, res, next, true) }) From 825f9e0ac918cc0b964a05b7792ca49b5405d6d4 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Fri, 11 Oct 2019 18:40:07 +0300 Subject: [PATCH 05/23] Job route factory function --- routes/utils.js | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 routes/utils.js diff --git a/routes/utils.js b/routes/utils.js new file mode 100644 index 0000000..5cb4e0a --- /dev/null +++ b/routes/utils.js @@ -0,0 +1,30 @@ +const { addJobToDB } = require('../db') +const { constructJob } = require('../helpers') +const { status } = require('../config/constants') +const { addJobToQueue } = require('../queue') + +const createJobRouter = (router, path, middlewares, protocol) => { + router.post(path, middlewares, async (req, res, next) => { + const job = constructJob(req.body) + job.protocol = protocol + + try { + /* TODO: Take results url from express */ + const location = `/api/queue/${job.id}` + res.set('Location', location) + res.status(202).json({ location, ...job, status: status.properties[status.PENDING].msg }) + + await addJobToDB({ ...job }) + await addJobToQueue({ ...job }) + } catch (err) { + job.status = status.FAILED + next(err) + } + }) + + return router +} + +module.exports = { + createJobRouter +} From c98c5f6f1c758aafda5c75b15d8e94daaace2a6b Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Fri, 11 Oct 2019 18:48:02 +0300 Subject: [PATCH 06/23] Fix repo url --- package.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index a5390fd..d7b0822 100644 --- a/package.json +++ b/package.json @@ -18,10 +18,10 @@ }, "repository": { "type": "git", - "url": "https://github.com/Athena-MHMD/smpc-coordinator.git" + "url": "https://github.com/athenarc/smpc-coordinator.git" }, - "homepage": "https://github.com/Athena-MHMD/smpc-coordinator", - "bugs": "https://github.com/Athena-MHMD/smpc-coordinator/issues", + "homepage": "https://github.com/athenarc/smpc-coordinator", + "bugs": "https://github.com/athenarc/smpc-coordinator/issues", "license": "MIT", "standard": { "ignore": [] From 555c92952686db0e71ed4ee94d69c7494cc4e31f Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:24:49 +0300 Subject: [PATCH 07/23] Refactor protocol utils --- protocols/HistogramProtocol.js | 6 +++--- protocols/{helpers.js => utils.js} | 0 2 files changed, 3 insertions(+), 3 deletions(-) rename protocols/{helpers.js => utils.js} (100%) diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index c142084..9c4d777 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -2,8 +2,6 @@ const Protocol = require('./Protocol') const logger = require('../config/winston') const { step } = require('../config') const { - pack, - unpack, getNumericCell, getCell, getAttributeNames, @@ -11,7 +9,9 @@ const { constructHistogram2DArray, histogram2DArrayFromFlattenArray, getCatecoricalAttribute -} = require('./helpers') +} = require('./utils') + +const { pack, unpack } = require('../helpers') class HistogramProtocol extends Protocol { constructor (job) { diff --git a/protocols/helpers.js b/protocols/utils.js similarity index 100% rename from protocols/helpers.js rename to protocols/utils.js From d753abf307b3ab0653bf86e1ae06481a75f8e6f8 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:25:08 +0300 Subject: [PATCH 08/23] Move compute to protocol --- protocols/index.js | 13 ++++++++++++- queue.js | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/protocols/index.js b/protocols/index.js index 8c5b157..d4591a6 100644 --- a/protocols/index.js +++ b/protocols/index.js @@ -5,6 +5,17 @@ const protocolMapping = new Map() /* All available protocols */ protocolMapping.set('histogram', HistogramProtocol) +const compute = async job => { + if (protocolMapping.has(job.data.protocol)) { + const Protocol = protocolMapping.get(job.data.protocol) + const computation = new Protocol(job) + let out = await computation.execute() + return out + } else { + throw new Error('Compute: Protocol not supported!') + } +} + module.exports = { - protocolMapping + compute } diff --git a/queue.js b/queue.js index 4021455..c62d8e7 100644 --- a/queue.js +++ b/queue.js @@ -1,6 +1,6 @@ const Queue = require('bee-queue') const { URL } = require('url') -const { compute } = require('./smpc/smpc') +const { compute } = require('./protocols') const { HTTPError } = require('./errors') const { status, REDIS_URL } = require('./config/constants') const { appEmitter } = require('./emitters.js') From 26df6c6d472278a32cbff677389e7ff4c5e8dead Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:25:32 +0300 Subject: [PATCH 09/23] Remove smpc folder --- smpc/Computation.js | 310 ----------------------------------- smpc/HistogramComputation.js | 170 ------------------- smpc/smpc.js | 16 -- 3 files changed, 496 deletions(-) delete mode 100644 smpc/Computation.js delete mode 100644 smpc/HistogramComputation.js delete mode 100644 smpc/smpc.js diff --git a/smpc/Computation.js b/smpc/Computation.js deleted file mode 100644 index 33357d7..0000000 --- a/smpc/Computation.js +++ /dev/null @@ -1,310 +0,0 @@ -const _ = require('lodash') -const WebSocket = require('ws') -const EventEmitter = require('events') - -const { pack, unpack } = require('../helpers') -const { step, players, clients, ROOT_CA, KEY, CERT } = require('../config') -const logger = require('../config/winston') - -class Computation { - constructor (job) { - this.job = job - this.emitter = new EventEmitter() - - this.players = players.map(p => ({ ...p, socket: null })) - this.clients = clients - .filter(c => _.includes(this.job.data.dataProviders, c.id)) - .map(c => ({ ...c, socket: null })) - - this.job.data.totalClients = this.clients.length - - this.state = { - dataInfoReceived: 0, - compiled: 0, - listen: 0, - import: 0, - exit: 0, - step: step.INIT, - results: '' - } - - this.opts = { - ca: ROOT_CA, - key: KEY, - cert: CERT, - rejectUnauthorized: true, - requestCert: true, - checkServerIdentity: (host, cert) => { - return undefined - } - } - - this.resolve = null - this.reject = null - this._register() - } - - _execute (resolve, reject) { - logger.info('Initiating SMPC Engine...') - logger.info(`Total Clients: ${this.job.data.totalClients}`) - this.job.reportProgress(0) // Next release will inlude the feature to pass arbitrary data in reportProgress - // this.job.reportProgress({ step: this.state.step }) // Next release will inlude the feature to pass arbitrary data in reportProgress - this.resolve = resolve - this.reject = reject - this.setupPlayers() - this.setupClients() - } - - _register () { - this.emitter.on('data-info-received', (msg) => this._eventMiddleware('data-info-received', msg, this.handleDataInfo.bind(this))) - this.emitter.on('compilation-ended', (msg) => this._eventMiddleware('compilation-ended', msg, this.handleCompilation.bind(this))) - this.emitter.on('listen', (msg) => this._eventMiddleware('listen', msg, this.listen.bind(this))) - this.emitter.on('exit', (msg) => this._eventMiddleware('exit', msg, this.handleExit.bind(this))) - this.emitter.on('computation-finished', (msg) => this._eventMiddleware('computation-finished', msg, this.handleComputationFinished.bind(this))) - this.emitter.on('importation-finished', (msg) => this._eventMiddleware('importation-finished', msg, this.handleImportationFinished.bind(this))) - } - - _eventMiddleware (event, msg, next) { - if (msg.data) { - if (msg.data.errors && msg.data.errors.length > 0) { - return this.handleError(msg) - } - - if (msg.data.code && msg.data.code !== 0) { - return this.handleError(msg) - } - } - - next(msg) - } - - setupPlayers () { - for (const [index, p] of this.players.entries()) { - this.players[index].socket = new WebSocket(p.address, { ...this.opts }) // connection errors are handle on ws.on('error') - this.players[index].socket._index = index - const ws = this.players[index].socket - - ws.on('open', () => { - ws.send(pack({ message: 'job-info', job: this.job.data })) - logger.info(`Connected to player ${p.id}.`) - }) - - ws.on('close', (code, reason) => { - logger.info(`Disconnected from player ${index}.`) - this.players[ws._index].socket = null - - if (this.state.step < step.COMPUTATION_END) { - this.restart() - this.reject(new Error(`Player ${p.id} closed before the end of the computation. Reason: ${reason}`)) - } - }) - - ws.on('error', (err) => { - logger.error('Player websocket error: ', err) - this.handleError(err) - }) - - ws.on('message', (data) => { - data = unpack(data) - this.handleMessage('player', ws, data) - }) - } - } - - setupClients () { - for (const [index, c] of this.clients.entries()) { - this.clients[index].socket = new WebSocket(c.address, { ...this.opts }) - this.clients[index].socket._index = index - const ws = this.clients[index].socket - - ws.on('open', () => { - logger.info(`Connected to client ${c.id}.`) - ws.send(pack({ message: 'job-info', job: this.job.data })) - ws.send(pack({ message: 'data-info', job: this.job.data })) - }) - - ws.on('close', (code, reason) => { - logger.info(`Disconnected from client ${index}.`) - this.clients[ws._index].socket = null - if (this.state.step < step.IMPORT_END) { - this.restart() - this.reject(new Error(`Client ${c.id} closed before the end of the importation. Reason: ${reason}`)) - } - }) - - ws.on('message', (data) => { - data = unpack(data) - this.handleMessage('client', ws, data) - }) - - ws.on('error', (err) => { - logger.error('Client websocket error: ', err) - this.handleError(err) - }) - } - } - - handleMessage (entity, ws, data) { - switch (data.message) { - case 'data-info': - this.emitter.emit('data-info-received', { entity, ws, data }) - break - case 'compilation-ended': - this.emitter.emit('compilation-ended', { entity, ws, data }) - break - case 'listen': - this.emitter.emit('listen', { entity, ws, data }) - break - case 'exit': - this.emitter.emit('exit', { entity, ws, data }) - break - case 'error': - this.handleError({ data }) - break - default: - logger.info(data) - } - } - - handleError ({ data }) { - logger.error(data) - this.restart() - this.reject(new Error('An error has occured!')) - } - - handleExit ({ entity, data }) { - switch (entity) { - case 'player': - this.state.exit += 1 - if (data.id === '0') { - this.state.results = data.output - } - if (this.state.exit === this.players.length) { - this.emitter.emit('computation-finished', { data }) - } - break - case 'client': - this.state.import += 1 - if (this.state.import === this.clients.length) { - this.emitter.emit('importation-finished', { data }) - } - break - default: - } - } - - handleDataInfo ({ data }) { - this.state.dataInfoReceived += 1 - this.processDataInfo(data.datasetInfo) - - if (this.state.dataInfoReceived === this.clients.length) { - if (this.state.dataInfo.dataSize === 0) { - return this.handleComputationFinished({ data }) - } - - this.updateStep(step.DATA_SIZE_ACCEPTED) - this.state.dataInfoReceived = 0 - this.sendToAll(pack({ message: 'compile', job: this.job.data, dataInfo: this.state.dataInfo }), this.players) - } - } - - handleCompilation ({ data }) { - this.state.compiled += 1 - if (this.state.compiled === this.players.length) { - logger.info('Compilation finished.') - this.updateStep(step.COMPILE_END) - this.state.compiled = 0 - this.sendToAll(pack({ message: 'start', job: this.job.data }), this.players) - } - } - - handleComputationFinished ({ data }) { - logger.info('Computation Finished') - this.updateStep(step.COMPUTATION_END) - this.state.exit = 0 - this.cleanUpPlayers() - this.cleanUpClients() - this.processResults() - this.state.results = this.postProcess([ ...this.state.results ]) - this.resolve(this.state.results) - } - - processResults () { - let results = [] - - if (this.state.dataInfo.dataSize === 0) { - return [] - } - - for (let r of this.state.results.split('\n')) { - if (r.includes('#') || r.includes('START')) { - continue - } - - if (r.includes('$') || r.includes('END')) { - break - } - - results.push(r) - } - - this.state.results = [...results] - } - - handleImportationFinished () { - logger.info('Importation Finished') - this.updateStep(step.IMPORT_END) - } - - execute () { - return new Promise((resolve, reject) => this._execute(resolve, reject)) - } - - restart () { - const msg = pack({ message: 'restart', job: this.job.data }) - this.sendToAll(msg, this.players) - this.sendToAll(msg, this.clients) - } - - sendToAll (message, entities) { - for (const e of entities) { - if (e.socket && e.socket.readyState === WebSocket.OPEN) { - e.socket.send(message) // Assume message is already packed. Message must be packed beforehand to avoid expessive call to JSON API - } - } - } - - listen () { - this.state.listen += 1 - if (this.state.listen === this.players.length) { - logger.info('Players are listening...') - this.updateStep(step.IMPORT_START) - this.state.listen = 0 - this.sendToAll(pack({ message: 'import', job: this.job.data }), this.clients) - } - } - - updateStep (_step) { - this.state.step = _step - // this.job.reportProgress({ step: this.state.step }) // Next release will inlude the feature to pass arbitrary data in reportProgress - this.job.reportProgress((_step / (Object.keys(step).length - 1)) * 100) - } - - cleanUpClients () { - this.cleanUp(this.clients) - } - - cleanUpPlayers () { - this.cleanUp(this.players) - } - - cleanUp (entities) { - for (const e of entities) { - if (e.socket) { - e.socket.close(1000) - } - } - } -} - -module.exports = Computation diff --git a/smpc/HistogramComputation.js b/smpc/HistogramComputation.js deleted file mode 100644 index fda7a9d..0000000 --- a/smpc/HistogramComputation.js +++ /dev/null @@ -1,170 +0,0 @@ -const _ = require('lodash') -const Computation = require('./Computation') -const attributes = require('../smpc-global/attributes.json') -const mapping = require('../smpc-global/mapping.json') -const meshTerms = require('../smpc-global/meshTerms.json') - -class HistogramComputation extends Computation { - constructor (job) { - super(job) - this.state.dataInfo = { - precision: 0.00001, - sizeAlloc: 0, - cellsX: null, - cellsY: null, - dataSize: 0 - } - } - - processDataInfo (info) { - this.state.dataInfo.sizeAlloc += Number(info.sizeAlloc) - this.state.dataInfo.dataSize += Number(info.dataSize) - this.state.dataInfo.precision = Math.min(Number(info.precision)) - this.state.dataInfo.attributeToInt = info.attributeToInt - this.state.dataInfo.intToAttribute = info.intToAttribute - - if (this.job.data.algorithm === '1d_categorical_histogram') { - this.state.dataInfo.cellsX = Number(info.cellsX) - } - - if (this.job.data.algorithm === '2d_mixed_histogram') { - this.state.dataInfo.cellsX = Number(info.cellsX) - this.state.dataInfo.cellsY = this.getNumericCell() - } - - if (this.job.data.algorithm === '2d_categorical_histogram') { - this.state.dataInfo.cellsX = Number(info.cellsX) - this.state.dataInfo.cellsY = Number(info.cellsY) - } - - if (this.job.data.algorithm === '1d_numerical_histogram') { - this.state.dataInfo.cellsX = this.getNumericCell() - } - - if (this.job.data.algorithm === '2d_numerical_histogram') { - this.state.dataInfo.cellsX = this.getCell(this.job.data.attributes[0]) - this.state.dataInfo.cellsY = this.getCell(this.job.data.attributes[1]) - } - } - - getNumericalAttribute () { - return this.job.data.attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'numerical')) - } - getCatecoricalAttribute () { - return this.job.data.attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'categorical')) - } - - getNumericCell () { - return this.getCell(this.getNumericalAttribute()) - } - - getCell (attr) { - const defaultCells = Math.floor(this.state.dataInfo.dataSize / 10) - const cells = attr.cells || defaultCells - return Math.min(cells, defaultCells) - } - - getMinMax (data) { - const m = data.replace(/\s/g, '').split(',') - return { min: Number(m[0]), max: Number(m[1]) } - } - - fill (x, y = 0, value = 0) { - if (y === 0) { - return Array(x).fill(value) - } - - return Array.from(Array(x), _ => Array(y).fill(value)) - } - - construct2DArray (data, cellsX, cellsY) { - let arr = this.fill(cellsX, cellsY) - - for (let i = 0; i < data.length; i++) { - let b = data[i].replace(/\s/g, '').split(',') - arr[Number(b[0])][Number(b[1])] = b[2] - } - - return arr - } - - construct2DArrayFromFlattenArray (data, cellsX, cellsY) { - const y = Math.ceil(cellsY / cellsX) - return _.chunk(data, y).map(arr => arr.map(i => i.replace(/\s/g, '').split(',')[1])) - } - - getAttributeNames (mesh) { - // Take the values of the childer of the attribute - // Sort children by mapping number - // Get only mesh term - // Get mesh's term name - return Object - .entries(mapping[mesh]) - .sort((a, b) => a[1] - b[1]) - .map(t => t[0]) - .map(m => meshTerms[m].name) - } - - computeAxisLabels (min, max, width, cells) { - let start = min - let end = max - const ticks = [] - for (const _ of Array(cells - 1).keys()) { // eslint-disable-line no-unused-vars - end = start + width - ticks.push(`[${start}, ${end})`) - start = end - } - end = start + width - ticks.push(`[${start}, ${end}]`) - return ticks - } - - postProcess (data) { - let results = data - - if (this.job.data.algorithm === '1d_categorical_histogram') { - results = data.reduce((previous, current) => { - const xy = current.replace(/\s/g, '').split(',') - if (!Number.isNaN(Number(xy[1]))) { - previous.y.push(Number(xy[1])) - } - return previous - }, { x: this.getAttributeNames(this.job.data.attributes[0].name), y: [] }) - } - - if (this.job.data.algorithm === '2d_mixed_histogram') { - const m = this.getMinMax(data[0]) - data = data.slice(1) - data = this.construct2DArray(data, this.state.dataInfo.cellsX, this.getNumericCell()) - let cells = Number(this.getNumericCell()) - - results = { ...m, z: [...data], y: this.getAttributeNames(this.getCatecoricalAttribute().name), cells } - } - - if (this.job.data.algorithm === '2d_categorical_histogram') { - data = this.construct2DArrayFromFlattenArray(data, this.state.dataInfo.cellsX, this.state.dataInfo.cellsY) - results = { z: [...data], labels: { y: this.getAttributeNames(this.job.data.attributes[0].name), x: this.getAttributeNames(this.job.data.attributes[1].name) } } - } - - if (this.job.data.algorithm === '1d_numerical_histogram') { - const m = this.getMinMax(data[0]) - data = data.slice(1) - results = { ...m, y: data.map(item => item.replace(/\s/g, '').split(',')[1]), cells: this.getCell(this.job.data.attributes[0]) } - } - - if (this.job.data.algorithm === '2d_numerical_histogram') { - const m0 = this.getMinMax(data[0]) - const m1 = this.getMinMax(data[1]) - let cellsX = this.getCell(this.job.data.attributes[0]) - let cellsY = this.getCell(this.job.data.attributes[1]) - - data = data.slice(2) - data = this.construct2DArray(data, cellsX, cellsY) - results = { min: [m0.min, m1.min], max: [m0.max, m1.max], z: data, cellsX, cellsY } - } - - return results - } -} - -module.exports = HistogramComputation diff --git a/smpc/smpc.js b/smpc/smpc.js deleted file mode 100644 index 0d6f3ba..0000000 --- a/smpc/smpc.js +++ /dev/null @@ -1,16 +0,0 @@ -const { protocolMapping } = require('../protocols') - -const compute = async (job) => { - if (protocolMapping.has(job.data.protocol)) { - const Protocol = protocolMapping.get(job.data.protocol) - const computation = new Protocol(job) - let out = await computation.execute() - return out - } else { - throw new Error('Compute: Protocol not supported!') - } -} - -module.exports = { - compute -} From 56ee6f5ddfb87883aff9bd32934be890ea4b16e0 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:25:51 +0300 Subject: [PATCH 10/23] Refactor smpc routes --- routes/smpc.js | 109 +------------------------------------------------ 1 file changed, 2 insertions(+), 107 deletions(-) diff --git a/routes/smpc.js b/routes/smpc.js index a3214a8..b028e46 100644 --- a/routes/smpc.js +++ b/routes/smpc.js @@ -1,117 +1,12 @@ const express = require('express') -const contentDisposition = require('content-disposition') - -const { addJobToDB, getJob } = require('../db') -const { constructJob } = require('../helpers') -const { status } = require('../config/constants') -const { HTTPError } = require('../errors') -const { addJobToQueue } = require('../queue') +const { createJobRouter } = require('./utils') const validateHistogram = require('../validators/histogram') const { processAttributes, processDataProviders, preprocess, cache } = require('../middlewares') const auth = require('../auth') let router = express.Router() -/* - * Deprecated - * Use createJobRouter -*/ - -const createSimpleSMPCRouter = (router, path, middlewares, protocol) => { - router.post(path, middlewares, async (req, res, next) => { - const job = constructJob(req.body) - job.protocol = protocol - - try { - const location = `/api/smpc/queue/${job.id}` - res.set('Location', location) - res.status(202).json({ location, ...job, status: status.properties[status.PENDING].msg }) - - await addJobToDB({ ...job }) - await addJobToQueue({ ...job }) - } catch (err) { - job.status = status.FAILED - next(err) - } - }) - - return router -} - -/* - * Deprecated - * Use api/queue/:id -*/ - -router.get('/queue/:id', [auth.authenticate], async (req, res, next) => { - try { - let value = await getJob(req.params.id) - - if (value.status !== status.COMPLETED) { - return res.status(200).json({ - ...value, - status: status.properties[value.status].msg - }) - } - - const location = `/api/smpc/results/${req.params.id}` - - res.set('Location', location) - return res.status(303).json({ location }) - } catch (err) { - if (err.notFound) { - next(new HTTPError(404, 'Not found')) - } - - next(err) - } -}) - -const getResults = async (req, res, next, download = false) => { - try { - let value = await getJob(req.params.id) - value = { ...value, status: status.properties[value.status].msg } - res.set({ - 'Content-Type': 'application/json' - }) - - if (download) { - res.set({ - 'Content-Disposition': contentDisposition(`${req.params.id}.json`) - }) - - value = JSON.stringify(value) - } - - return res.status(200).send(value) - } catch (err) { - if (err.notFound) { - next(new HTTPError(404, 'Not found')) - } - - next(err) - } -} - -/* - * Deprecated - * Use api/results/:id -*/ - -router.get('/results/:id', [auth.authenticate], async (req, res, next) => { - await getResults(req, res, next) -}) - -/* - * Deprecated - * Use api/results/:id/download -*/ - -router.get('/:id/download', [auth.authenticate], async (req, res, next) => { - await getResults(req, res, next, true) -}) - -router = createSimpleSMPCRouter( +router = createJobRouter( router, '/histogram', [auth.authenticate, processAttributes, processDataProviders, validateHistogram, preprocess, cache], From 87db02fc6d7f625099bb66ce9fe98a2986b8bc8c Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:26:06 +0300 Subject: [PATCH 11/23] Add comment --- routes/utils.js | 1 + 1 file changed, 1 insertion(+) diff --git a/routes/utils.js b/routes/utils.js index 5cb4e0a..2f4ff61 100644 --- a/routes/utils.js +++ b/routes/utils.js @@ -5,6 +5,7 @@ const { addJobToQueue } = require('../queue') const createJobRouter = (router, path, middlewares, protocol) => { router.post(path, middlewares, async (req, res, next) => { + // Consider making it a middlware in case the job desc is different among routes const job = constructJob(req.body) job.protocol = protocol From 824a8ee075a207b5b8df6e5ca494034ebb1c861f Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:27:51 +0300 Subject: [PATCH 12/23] Ignore json files inside scripts --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 11e2d2e..4051444 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,4 @@ smpc-user-db certs .env.* key_store +scripts/*.json From 69de1a99b1642f9dba67366c7e0fc51b4036d290 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:28:16 +0300 Subject: [PATCH 13/23] Create draft ml routes --- routes/index.js | 2 ++ routes/ml.js | 27 +++++++++++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 routes/ml.js diff --git a/routes/index.js b/routes/index.js index df321db..626b299 100644 --- a/routes/index.js +++ b/routes/index.js @@ -4,6 +4,7 @@ const queue = require('./queue') const results = require('./results') const search = require('./search') const auth = require('./auth') +const ml = require('./ml') const router = express.Router() @@ -16,5 +17,6 @@ module.exports = { '/api/queue': queue, '/api/results': results, '/api/search': search, + '/api/ml': ml, '/api/auth': auth } diff --git a/routes/ml.js b/routes/ml.js new file mode 100644 index 0000000..3144b30 --- /dev/null +++ b/routes/ml.js @@ -0,0 +1,27 @@ +const express = require('express') + +const { processDataProviders, preprocess } = require('../middlewares') +const { createJobRouter } = require('./utils') +const { addJobToDB, getJob } = require('../db') + +let router = express.Router() + +router = createJobRouter( + router, + '/blackbox/create', + [preprocess], + 'dockerImage' +) + +router.get('/blackbox/:id', [processDataProviders, preprocess], async (req, res, next) => { + +}) + +router = createJobRouter( + router, + '/blackbox/:id/train', + [preprocess], + 'blackbox' +) + +module.exports = router From ebe1d4dbcea9aa9ed5a422eb512f784d265fb81d Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:41:18 +0300 Subject: [PATCH 14/23] Add option to choose which entity type will participate --- protocols/Protocol.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/protocols/Protocol.js b/protocols/Protocol.js index e9a4dec..6634ffe 100644 --- a/protocols/Protocol.js +++ b/protocols/Protocol.js @@ -5,7 +5,7 @@ const EventEmitter = require('events') const { players, clients, ROOT_CA, KEY, CERT } = require('../config') class Protocol { - constructor ({ job, name }) { + constructor ({ job, name, opts = { entities: 'both' } }) { if (new.target === Protocol) { throw new TypeError('Cannot construct abstract Protocol instances directly') } @@ -33,8 +33,13 @@ class Protocol { } } - this._initPlayers() - this._initClients() + if (opts.entities === 'clients' || opts.entities === 'both') { + this._initClients() + } + + if (opts.entities === 'players' || opts.entities === 'both') { + this._initPlayers() + } } _validate ({ job }) { From b0b3ce509da687ebd78b7466ca768f7291a8089a Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:41:33 +0300 Subject: [PATCH 15/23] Fix path --- protocols/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/index.js b/protocols/index.js index d4591a6..cc5d909 100644 --- a/protocols/index.js +++ b/protocols/index.js @@ -1,4 +1,4 @@ -const HistogramProtocol = require('../protocols/HistogramProtocol') +const HistogramProtocol = require('./HistogramProtocol') const protocolMapping = new Map() From 52e34676056aa00626e62b72a3b7ac4cd5f59aa6 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 18:41:59 +0300 Subject: [PATCH 16/23] Create DockerImageProtocol --- protocols/DockerImageProtocol.js | 28 ++++++++++++++++++++++++++++ protocols/index.js | 2 ++ 2 files changed, 30 insertions(+) create mode 100644 protocols/DockerImageProtocol.js diff --git a/protocols/DockerImageProtocol.js b/protocols/DockerImageProtocol.js new file mode 100644 index 0000000..00ed264 --- /dev/null +++ b/protocols/DockerImageProtocol.js @@ -0,0 +1,28 @@ +const Protocol = require('./Protocol') +const logger = require('../config/winston') +const { step } = require('../config') + +const { pack, unpack } = require('../helpers') + +class DockerImageProtocol extends Protocol { + constructor (job) { + super({ job, name: 'dockerImage', opts: { entities: 'clients' } }) + } + + _execute () { + } + + handleOpen ({ ws, entity }) { + } + + handleClose ({ ws, code, reason, entity }) { + } + + handleError ({ ws, err, entity }) { + } + + handleMessage ({ ws, msg, entity }) { + } +} + +module.exports = DockerImageProtocol diff --git a/protocols/index.js b/protocols/index.js index cc5d909..38ba128 100644 --- a/protocols/index.js +++ b/protocols/index.js @@ -1,9 +1,11 @@ const HistogramProtocol = require('./HistogramProtocol') +const DockerImageProtocol = require('./DockerImageProtocol') const protocolMapping = new Map() /* All available protocols */ protocolMapping.set('histogram', HistogramProtocol) +protocolMapping.set('dockerImage', DockerImageProtocol) const compute = async job => { if (protocolMapping.has(job.data.protocol)) { From c779f3d14ef071869bd8d5191b6b09470fd4ec9a Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Mon, 14 Oct 2019 19:22:57 +0300 Subject: [PATCH 17/23] Create ml validator --- routes/ml.js | 4 ++-- validators/ml.js | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 validators/ml.js diff --git a/routes/ml.js b/routes/ml.js index 3144b30..b808b7a 100644 --- a/routes/ml.js +++ b/routes/ml.js @@ -2,14 +2,14 @@ const express = require('express') const { processDataProviders, preprocess } = require('../middlewares') const { createJobRouter } = require('./utils') -const { addJobToDB, getJob } = require('../db') +const { validateCreation } = require('../validators/ml') let router = express.Router() router = createJobRouter( router, '/blackbox/create', - [preprocess], + [preprocess, validateCreation], 'dockerImage' ) diff --git a/validators/ml.js b/validators/ml.js new file mode 100644 index 0000000..23aa24a --- /dev/null +++ b/validators/ml.js @@ -0,0 +1,20 @@ +const _ = require('lodash') +const { HTTPError } = require('../errors') + +const validateCreation = (req, res, next) => { + if (!req.body.image_url) { + next(new HTTPError(400, 'image_url: Bad request')) + return + } + + if (!_.isNumber(req.body.number_of_parameters)) { + next(new HTTPError(400, 'number_of_parameters: Must be a number')) + return + } + + next() +} + +module.exports = { + validateCreation +} From d2fab82195ca54ca1370ec306e34766a9e2e37eb Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Sun, 20 Oct 2019 20:29:03 +0300 Subject: [PATCH 18/23] Add decorators --- protocols/HistogramProtocol.js | 29 +++++--------------- protocols/Protocol.js | 48 +++++++++++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index 9c4d777..715e44d 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -67,41 +67,24 @@ class HistogramProtocol extends Protocol { } handleOpen ({ ws, entity }) { - if (entity.type === 'player') { - logger.info(`Connected to player ${entity.id}.`) - ws.send(pack({ message: 'job-info', job: this.job.data })) - } - if (entity.type === 'client') { - logger.info(`Connected to client ${entity.id}.`) - ws.send(pack({ message: 'job-info', job: this.job.data })) ws.send(pack({ message: 'data-info', job: this.job.data })) } } handleClose ({ ws, code, reason, entity }) { - if (entity.type === 'player') { - logger.info(`Disconnected from player ${entity.id}.`) - this.players[ws._index].socket = null - - if (this.state.step < step.COMPUTATION_END) { - this.restart() - this.reject(new Error(`Player ${entity.id} closed before the end of the computation. Reason: ${reason}`)) - } + if (entity.type === 'player' && this.state.step < step.COMPUTATION_END) { + this.restart() + this.reject(new Error(`Player ${entity.id} closed before the end of the computation. Reason: ${reason}`)) } - if (entity.type === 'client') { - logger.info(`Disconnected from client ${entity.id}.`) - this.clients[ws._index].socket = null - if (this.state.step < step.IMPORT_END) { - this.restart() - this.reject(new Error(`Client ${entity.id} closed before the end of the importation. Reason: ${reason}`)) - } + if (entity.type === 'client' && this.state.step < step.IMPORT_END) { + this.restart() + this.reject(new Error(`Client ${entity.id} closed before the end of the importation. Reason: ${reason}`)) } } handleError ({ ws, err, entity }) { - logger.error(err) this.restart() this.reject(new Error('An error has occured!')) } diff --git a/protocols/Protocol.js b/protocols/Protocol.js index 6634ffe..7fe9a5f 100644 --- a/protocols/Protocol.js +++ b/protocols/Protocol.js @@ -2,7 +2,9 @@ const _ = require('lodash') const WebSocket = require('ws') const EventEmitter = require('events') +const logger = require('../config/winston') const { players, clients, ROOT_CA, KEY, CERT } = require('../config') +const { pack } = require('../helpers') class Protocol { constructor ({ job, name, opts = { entities: 'both' } }) { @@ -55,13 +57,13 @@ class Protocol { const ws = this[key][index].socket - ws.on('open', () => this.handleOpen({ ws, entity })) + ws.on('open', () => this._openDecorator({ ws, entity })) - ws.on('close', (code, reason) => this.handleClose({ ws, code, reason, entity })) + ws.on('close', (code, reason) => this._closeDecorator({ ws, code, reason, entity })) - ws.on('error', (err) => this.handleError({ ws, err, entity })) + ws.on('error', (err) => this._errorDecorator({ ws, err, entity })) - ws.on('message', (msg) => this.handleMessage({ ws, msg, entity })) + ws.on('message', (msg) => this._messageDecorator({ ws, msg, entity })) } _initPlayers () { @@ -108,6 +110,44 @@ class Protocol { } } + /* Decorators */ + _openDecorator ({ ws, entity }) { + if (entity.type === 'player') { + logger.info(`Connected to player ${entity.id}.`) + ws.send(pack({ message: 'job-info', job: this.job.data })) + } + + if (entity.type === 'client') { + logger.info(`Connected to client ${entity.id}.`) + ws.send(pack({ message: 'job-info', job: this.job.data })) + } + + this.handleOpen({ ws, entity }) + } + + _closeDecorator ({ ws, code, reason, entity }) { + if (entity.type === 'player') { + logger.info(`Disconnected from player ${entity.id}.`) + this.players[ws._index].socket = null + } + + if (entity.type === 'client') { + logger.info(`Disconnected from client ${entity.id}.`) + this.clients[ws._index].socket = null + } + + this.handleClose({ ws, code, reason, entity }) + } + + _errorDecorator ({ ws, err, entity }) { + logger.error(err) + this.handleError({ ws, err, entity }) + } + + _messageDecorator ({ ws, msg, entity }) { + this.handleMessage({ ws, msg, entity }) + } + /* Abstract Methods */ _execute () { throw new Error('_execute: Implementation Missing!') From c2c71fccbb8f79aef6651d82b32934364cda752d Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Sun, 20 Oct 2019 21:04:09 +0300 Subject: [PATCH 19/23] Move function to base class --- protocols/HistogramProtocol.js | 6 ------ protocols/Protocol.js | 6 ++++++ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index 715e44d..9b1195d 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -196,12 +196,6 @@ class HistogramProtocol extends Protocol { this.updateStep(step.IMPORT_END) } - restart () { - const msg = pack({ message: 'restart', job: this.job.data }) - this.sendToAll(msg, this.players) - this.sendToAll(msg, this.clients) - } - listen () { this.state.listen += 1 if (this.state.listen === this.players.length) { diff --git a/protocols/Protocol.js b/protocols/Protocol.js index 7fe9a5f..66c58df 100644 --- a/protocols/Protocol.js +++ b/protocols/Protocol.js @@ -110,6 +110,12 @@ class Protocol { } } + restart () { + const msg = pack({ message: 'restart', job: this.job.data }) + this.sendToAll(msg, this.players) + this.sendToAll(msg, this.clients) + } + /* Decorators */ _openDecorator ({ ws, entity }) { if (entity.type === 'player') { From 578465fcb7ff362d2867b4a35fef4fb1b32d6873 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Sun, 20 Oct 2019 21:04:25 +0300 Subject: [PATCH 20/23] Move reject to base class --- protocols/HistogramProtocol.js | 1 - protocols/Protocol.js | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index 9b1195d..7d68700 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -86,7 +86,6 @@ class HistogramProtocol extends Protocol { handleError ({ ws, err, entity }) { this.restart() - this.reject(new Error('An error has occured!')) } handleMessage ({ ws, msg, entity }) { diff --git a/protocols/Protocol.js b/protocols/Protocol.js index 66c58df..b4516d0 100644 --- a/protocols/Protocol.js +++ b/protocols/Protocol.js @@ -148,6 +148,8 @@ class Protocol { _errorDecorator ({ ws, err, entity }) { logger.error(err) this.handleError({ ws, err, entity }) + // TODO: Get messages from err if exist + this.reject(new Error('An error has occured!')) } _messageDecorator ({ ws, msg, entity }) { From f337625a44282fdcb79730433668d71520de71af Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Sun, 20 Oct 2019 23:36:35 +0300 Subject: [PATCH 21/23] Unpack message to decorator --- protocols/HistogramProtocol.js | 1 - protocols/Protocol.js | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index 7d68700..7a511a6 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -89,7 +89,6 @@ class HistogramProtocol extends Protocol { } handleMessage ({ ws, msg, entity }) { - msg = unpack(msg) switch (msg.message) { case 'data-info': this.emitter.emit('data-info-received', { entity, ws, msg }) diff --git a/protocols/Protocol.js b/protocols/Protocol.js index b4516d0..5d66015 100644 --- a/protocols/Protocol.js +++ b/protocols/Protocol.js @@ -4,7 +4,7 @@ const EventEmitter = require('events') const logger = require('../config/winston') const { players, clients, ROOT_CA, KEY, CERT } = require('../config') -const { pack } = require('../helpers') +const { pack, unpack } = require('../helpers') class Protocol { constructor ({ job, name, opts = { entities: 'both' } }) { @@ -153,6 +153,7 @@ class Protocol { } _messageDecorator ({ ws, msg, entity }) { + msg = unpack(msg) this.handleMessage({ ws, msg, entity }) } From 5524f312fb2d92afbf727a55353f069612c627f3 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Wed, 23 Oct 2019 16:54:57 +0300 Subject: [PATCH 22/23] Change error handling based on new message layout --- protocols/HistogramProtocol.js | 20 +++++++------------- protocols/Protocol.js | 23 ++++++++++++++++++++--- queue.js | 1 - 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/protocols/HistogramProtocol.js b/protocols/HistogramProtocol.js index 7a511a6..825352c 100644 --- a/protocols/HistogramProtocol.js +++ b/protocols/HistogramProtocol.js @@ -52,18 +52,13 @@ class HistogramProtocol extends Protocol { this.emitter.on('importation-finished', (msg) => this._eventMiddleware('importation-finished', msg, this.handleImportationFinished.bind(this))) } - _eventMiddleware (event, msg, next) { - if (msg.data) { - if (msg.data.errors && msg.data.errors.length > 0) { - return this.handleError({ err: new Error(msg) }) - } - - if (msg.data.code && msg.data.code !== 0) { - return this.handleError({ err: new Error(msg) }) - } + _eventMiddleware (event, data, next) { + const { ws, entity, msg } = data + if (msg && msg.code && msg.code !== 0) { + return this.error({ ws, err: { message: `Entity ${entity.type} exitted with code ${msg.code}. Errors: ${msg.error.message}` }, entity }) } - next(msg) + next(data) } handleOpen ({ ws, entity }) { @@ -75,11 +70,13 @@ class HistogramProtocol extends Protocol { handleClose ({ ws, code, reason, entity }) { if (entity.type === 'player' && this.state.step < step.COMPUTATION_END) { this.restart() + // this.close() this.reject(new Error(`Player ${entity.id} closed before the end of the computation. Reason: ${reason}`)) } if (entity.type === 'client' && this.state.step < step.IMPORT_END) { this.restart() + // this.close() this.reject(new Error(`Client ${entity.id} closed before the end of the importation. Reason: ${reason}`)) } } @@ -102,9 +99,6 @@ class HistogramProtocol extends Protocol { case 'exit': this.emitter.emit('exit', { entity, ws, msg }) break - case 'error': - this.handleError({ msg }) - break default: logger.info(msg) } diff --git a/protocols/Protocol.js b/protocols/Protocol.js index 5d66015..c73254a 100644 --- a/protocols/Protocol.js +++ b/protocols/Protocol.js @@ -94,6 +94,11 @@ class Protocol { } } + close () { + this.cleanUpClients() + this.cleanUpPlayers() + } + cleanUpClients () { this.cleanUp(this.clients) } @@ -146,17 +151,29 @@ class Protocol { } _errorDecorator ({ ws, err, entity }) { - logger.error(err) this.handleError({ ws, err, entity }) - // TODO: Get messages from err if exist - this.reject(new Error('An error has occured!')) + this.error({ ws, err, entity }) } _messageDecorator ({ ws, msg, entity }) { msg = unpack(msg) + + if (msg.message === 'error') { + this._errorDecorator({ ws, err: msg.error, entity }) + return + } + this.handleMessage({ ws, msg, entity }) } + error ({ ws, err, entity }) { + this.close() + let errorMessage = (err && err.message) || 'An error has occured' + const error = `Error at entity ${entity.type} with ID ${entity.id}. Message: ${errorMessage}` + logger.error(error) + this.reject(new Error(error)) + } + /* Abstract Methods */ _execute () { throw new Error('_execute: Implementation Missing!') diff --git a/queue.js b/queue.js index c62d8e7..1b737ec 100644 --- a/queue.js +++ b/queue.js @@ -58,7 +58,6 @@ queue.on('ready', () => { const results = await compute(job) return results } catch (e) { - console.error(e) throw new Error(e.message) } }) From 9f7216592e66e6b085b38e9c9b171ca9fed1e5c5 Mon Sep 17 00:00:00 2001 From: Christos Nasikas Date: Wed, 23 Oct 2019 16:55:47 +0300 Subject: [PATCH 23/23] Finishe docker image protocol --- protocols/DockerImageProtocol.js | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/protocols/DockerImageProtocol.js b/protocols/DockerImageProtocol.js index 00ed264..a0f52bb 100644 --- a/protocols/DockerImageProtocol.js +++ b/protocols/DockerImageProtocol.js @@ -1,27 +1,47 @@ const Protocol = require('./Protocol') const logger = require('../config/winston') -const { step } = require('../config') -const { pack, unpack } = require('../helpers') +const { pack } = require('../helpers') class DockerImageProtocol extends Protocol { constructor (job) { super({ job, name: 'dockerImage', opts: { entities: 'clients' } }) + this.state = { + responses: 0 + } } _execute () { + logger.info('Initiating Docker Image Protocol...') } handleOpen ({ ws, entity }) { + ws.send(pack({ message: 'import-image', job: this.job.data })) } handleClose ({ ws, code, reason, entity }) { + if (this.state.responses < this.clients.length) { + this.reject(new Error(`Client ${entity.id} closed before the end of the computation. Reason: ${reason}`)) + } } - handleError ({ ws, err, entity }) { - } + handleError ({ ws, err, entity }) {} handleMessage ({ ws, msg, entity }) { + switch (msg.message) { + case 'image-imported': + this.handleImageImported(msg) + break + default: + logger.info(msg) + } + } + + handleImageImported (msg) { + this.state.responses++ + if (this.state.responses >= this.clients.length) { + this.resolve({}) + } } }