Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blackbox #21

Draft
wants to merge 23 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,4 @@ smpc-user-db
certs
.env.*
key_store
scripts/*.json
83 changes: 1 addition & 82 deletions helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ const { appEmitter } = require('./emitters.js')
const { status } = require('./config/constants')
const totalAttributes = require('./smpc-global/attributes.json')
const algorithms = require('./smpc-global/algorithms.json')
const mapping = require('../smpc-global/mapping.json')
const meshTerms = require('../smpc-global/meshTerms.json')

const pack = (msg) => {
return JSON.stringify(msg)
Expand Down Expand Up @@ -43,90 +41,11 @@ const constructJob = request => {
return { ...request, timestamps: { accepted: Date.now() }, status: status.PENDING }
}

const getNumericalAttribute = (attributes) => {
return attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'numerical'))
}

const getCatecoricalAttribute = (attributes) => {
return attributes.find(a => attributes.find(b => b.name === a.name && b.type === 'categorical'))
}

const getNumericCell = (attributes) => {
return getCell(getNumericalAttribute(attributes))
}

const getCell = (attr) => {
const defaultCells = Math.floor(this.state.dataInfo.dataSize / 10)
const cells = attr.cells || defaultCells
return Math.min(cells, defaultCells)
}

const getHistogramMinMax = (data) => {
const m = data.replace(/\s/g, '').split(',')
return { min: Number(m[0]), max: Number(m[1]) }
}

const fillHistogramArray = (x, y = 0, value = 0) => {
if (y === 0) {
return Array(x).fill(value)
}

return Array.from(Array(x), _ => Array(y).fill(value))
}

const constructHistogram2DArray = (data, cellsX, cellsY) => {
let arr = fillHistogramArray(cellsX, cellsY)

for (let i = 0; i < data.length; i++) {
let b = data[i].replace(/\s/g, '').split(',')
arr[Number(b[0])][Number(b[1])] = b[2]
}

return arr
}

const histogram2DArrayFromFlattenArray = (data, cellsX, cellsY) => {
const y = Math.ceil(cellsY / cellsX)
return _.chunk(data, y).map(arr => arr.map(i => i.replace(/\s/g, '').split(',')[1]))
}

const getAttributeNames = (mesh) => {
// Take the vales of the childer of the attribute
// Sort children by mapping number
// Get only mesh term
// Get mesh's term name
return Object
.entries(mapping[mesh])
.sort((a, b) => a[1] - b[1])
.map(t => t[0])
.map(m => meshTerms[m].name)
}

const computeAxisLabels = (min, max, width, cells) => {
let start = min
let end = max
const ticks = []
for (const _ of Array(cells - 1).keys()) { // eslint-disable-line no-unused-vars
end = start + width
ticks.push(`[${start}, ${end})`)
start = end
}
end = start + width
ticks.push(`[${start}, ${end}]`)
return ticks
}

module.exports = {
getHistogramType,
pack,
unpack,
sha256,
updateJobStatus,
constructJob,
getNumericCell,
getCell,
getAttributeNames,
getHistogramMinMax,
constructHistogram2DArray,
histogram2DArrayFromFlattenArray
constructJob
}
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
},
"repository": {
"type": "git",
"url": "https://github.com/Athena-MHMD/smpc-coordinator.git"
"url": "https://github.com/athenarc/smpc-coordinator.git"
},
"homepage": "https://github.com/Athena-MHMD/smpc-coordinator",
"bugs": "https://github.com/Athena-MHMD/smpc-coordinator/issues",
"homepage": "https://github.com/athenarc/smpc-coordinator",
"bugs": "https://github.com/athenarc/smpc-coordinator/issues",
"license": "MIT",
"standard": {
"ignore": []
Expand Down
48 changes: 48 additions & 0 deletions protocols/DockerImageProtocol.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const Protocol = require('./Protocol')
const logger = require('../config/winston')

const { pack } = require('../helpers')

class DockerImageProtocol extends Protocol {
constructor (job) {
super({ job, name: 'dockerImage', opts: { entities: 'clients' } })
this.state = {
responses: 0
}
}

_execute () {
logger.info('Initiating Docker Image Protocol...')
}

handleOpen ({ ws, entity }) {
ws.send(pack({ message: 'import-image', job: this.job.data }))
}

handleClose ({ ws, code, reason, entity }) {
if (this.state.responses < this.clients.length) {
this.reject(new Error(`Client ${entity.id} closed before the end of the computation. Reason: ${reason}`))
}
}

handleError ({ ws, err, entity }) {}

handleMessage ({ ws, msg, entity }) {
switch (msg.message) {
case 'image-imported':
this.handleImageImported(msg)
break
default:
logger.info(msg)
}
}

handleImageImported (msg) {
this.state.responses++
if (this.state.responses >= this.clients.length) {
this.resolve({})
}
}
}

module.exports = DockerImageProtocol
63 changes: 16 additions & 47 deletions protocols/HistogramProtocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ const Protocol = require('./Protocol')
const logger = require('../config/winston')
const { step } = require('../config')
const {
pack,
unpack,
getNumericCell,
getCell,
getAttributeNames,
getHistogramMinMax,
constructHistogram2DArray,
histogram2DArrayFromFlattenArray,
getCatecoricalAttribute
} = require('../helpers')
} = require('./utils')

const { pack, unpack } = require('../helpers')

class HistogramProtocol extends Protocol {
constructor (job) {
Expand Down Expand Up @@ -52,62 +52,40 @@ class HistogramProtocol extends Protocol {
this.emitter.on('importation-finished', (msg) => this._eventMiddleware('importation-finished', msg, this.handleImportationFinished.bind(this)))
}

_eventMiddleware (event, msg, next) {
if (msg.data) {
if (msg.data.errors && msg.data.errors.length > 0) {
return this.handleError({ err: new Error(msg) })
}

if (msg.data.code && msg.data.code !== 0) {
return this.handleError({ err: new Error(msg) })
}
_eventMiddleware (event, data, next) {
const { ws, entity, msg } = data
if (msg && msg.code && msg.code !== 0) {
return this.error({ ws, err: { message: `Entity ${entity.type} exitted with code ${msg.code}. Errors: ${msg.error.message}` }, entity })
}

next(msg)
next(data)
}

handleOpen ({ ws, entity }) {
if (entity.type === 'player') {
logger.info(`Connected to player ${entity.id}.`)
ws.send(pack({ message: 'job-info', job: this.job.data }))
}

if (entity.type === 'client') {
logger.info(`Connected to client ${entity.id}.`)
ws.send(pack({ message: 'job-info', job: this.job.data }))
ws.send(pack({ message: 'data-info', job: this.job.data }))
}
}

handleClose ({ ws, code, reason, entity }) {
if (entity.type === 'player') {
logger.info(`Disconnected from player ${entity.id}.`)
this.players[ws._index].socket = null

if (this.state.step < step.COMPUTATION_END) {
this.restart()
this.reject(new Error(`Player ${entity.id} closed before the end of the computation. Reason: ${reason}`))
}
if (entity.type === 'player' && this.state.step < step.COMPUTATION_END) {
this.restart()
// this.close()
this.reject(new Error(`Player ${entity.id} closed before the end of the computation. Reason: ${reason}`))
}

if (entity.type === 'client') {
logger.info(`Disconnected from client ${entity.id}.`)
this.clients[ws._index].socket = null
if (this.state.step < step.IMPORT_END) {
this.restart()
this.reject(new Error(`Client ${entity.id} closed before the end of the importation. Reason: ${reason}`))
}
if (entity.type === 'client' && this.state.step < step.IMPORT_END) {
this.restart()
// this.close()
this.reject(new Error(`Client ${entity.id} closed before the end of the importation. Reason: ${reason}`))
}
}

handleError ({ ws, err, entity }) {
logger.error(err)
this.restart()
this.reject(new Error('An error has occured!'))
}

handleMessage ({ ws, msg, entity }) {
msg = unpack(msg)
switch (msg.message) {
case 'data-info':
this.emitter.emit('data-info-received', { entity, ws, msg })
Expand All @@ -121,9 +99,6 @@ class HistogramProtocol extends Protocol {
case 'exit':
this.emitter.emit('exit', { entity, ws, msg })
break
case 'error':
this.handleError({ msg })
break
default:
logger.info(msg)
}
Expand Down Expand Up @@ -213,12 +188,6 @@ class HistogramProtocol extends Protocol {
this.updateStep(step.IMPORT_END)
}

restart () {
const msg = pack({ message: 'restart', job: this.job.data })
this.sendToAll(msg, this.players)
this.sendToAll(msg, this.clients)
}

listen () {
this.state.listen += 1
if (this.state.listen === this.players.length) {
Expand Down
Loading