Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactors code to support inheritance #198

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/server/error/NotImplementedError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = class NotImplementedError extends Error {
constructor(message = "") {
super();
this.message = message;
this.name = this.constructor.name;
}
};
87 changes: 87 additions & 0 deletions src/server/queue/bee.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const Queue = require('./queue');
const Job = require('./job');
const JobData = require('./jobData');

class BeeJob extends Job {
async remove() {
await this._job.remove();
}

async getStatus() {
return this._job.status;
}

async toJSON() {
const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job;
return new JobData({id, progress, data, timestamp, stacktrace, delay});
}
}

const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed'];
const SUPPORTED_ACTIONS = ['remove'];

module.exports = class BeeQueue extends Queue {
constructor(queueConfig) {
const {name} = queueConfig;
const options = BeeQueue.parseConfig(queueConfig);
const queue = new BeeQueue(name, options);
super(queue);
}

static parseConfig(queueConfig) {
const options = {
redis: this.parseRedisConfig(queueConfig),
isWorker: false,
getEvents: false,
sendEvents: false,
storeJobs: false,
};
const {prefix} = queueConfig;
if (prefix) options.prefix = prefix;
return options;
}

async getJob(id) {
const job = this._queue.getJob(id);
return new BeeJob(job);
}

async getJobCounts() {
const jobCounts = this._queue.checkHealth();
delete jobCounts.newestJob;
return jobCounts;
}

async getJobs(state, start, size) {
const page = {};

if (['failed', 'succeeded'].includes(state)) {
page.size = size;
} else {
page.start = start;
page.end = start + size - 1;
}

let jobs = await this._queue.getJobs(state, page);
// Filter out Bee jobs that have already been removed by the time the promise resolves
jobs = jobs.filter((job) => job);
return jobs.map((j) => new BeeJob(j));
}

async addJob(data) {
const job = await this._queue.createJob(data).save();
return new BeeJob(job);
}

isValidState(state) {
return VALID_STATES.includes(state);
}

isActionSupported(action) {
return SUPPORTED_ACTIONS.includes(action);
}

isPaginationSupported(state) {
return state !== 'succeeded' && state !== 'failed';
}
};
97 changes: 97 additions & 0 deletions src/server/queue/bull.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
const {capitalize} = require('lodash');
const Bull = require('bull');
const Queue = require('./queue');
const Job = require('./job');
const JobData = require('./jobData');

const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed'];
const SUPPORTED_ACTIONS = ['remove', 'retry'];

class BullJob extends Job {
async remove() {
await this._job.remove();
}

async retry() {
await this._job.retry();
}

async getStatus() {
return this._job.getState();
}


async toJSON() {
const {
id,
name,
data,
attemptsMade,
failedReason,
stacktrace,
returnvalue: returnValue,
timestamp,
delay,
progress
} = this._job.toJSON();
return new JobData({
id,
name,
data,
attemptsMade,
failedReason,
stacktrace,
timestamp,
delay,
progress,
returnValue,
});
}
}

module.exports = class BullQueue extends Queue {
constructor(queueConfig) {
const {name} = queueConfig;
const options = BullQueue.parseConfig(queueConfig);
const queue = Bull(name, options);
super(queue);
}

static parseConfig(queueConfig) {
const options = {redis: this.parseRedisConfig(queueConfig)};
const {createClient, prefix} = queueConfig;
if (createClient) options.createClient = createClient;
if (prefix) options.prefix = prefix;
return options;
}

async getJob(id) {
const job = await this._queue.getJob(id);
return new BullJob(job);
}

async getJobCounts() {
return this._queue.getJobCounts();
}

async getJobs(state, start, size) {
const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1);
return jobs.map((j) => new BullJob(j));
}

async addJob(data) {
const job = await this._queue.add(data, {
removeOnComplete: false,
removeOnFail: false
});
return new BullJob(job);
}

isValidState(state) {
return VALID_STATES.includes(state);
}

isActionSupported(action) {
return SUPPORTED_ACTIONS.includes(action);
}
};
52 changes: 6 additions & 46 deletions src/server/queue/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const _ = require('lodash');
const Bull = require('bull');
const Bee = require('bee-queue');
const BullQueue = require('./bull');
const BeeQueue = require('./bee');

class Queues {
constructor(config) {
Expand Down Expand Up @@ -38,59 +38,19 @@ class Queues {
return this._queues[queueHost][queueName];
}

const { type, name, port, host, db, password, prefix, url, redis, tls } = queueConfig;

const redisHost = { host };
if (password) redisHost.password = password;
if (port) redisHost.port = port;
if (db) redisHost.db = db;
if (tls) redisHost.tls = tls;

const isBee = type === 'bee';

const options = {
redis: redis || url || redisHost
};
if (prefix) options.prefix = prefix;

const {type} = queueConfig;
let queue;
if (isBee) {
_.extend(options, {
isWorker: false,
getEvents: false,
sendEvents: false,
storeJobs: false
});

queue = new Bee(name, options);
queue.IS_BEE = true;
if (type === 'bee') {
queue = new BeeQueue(queueConfig);
} else {
if (queueConfig.createClient) options.createClient = queueConfig.createClient;
queue = new Bull(name, options);
queue = new BullQueue(queueConfig);
}

this._queues[queueHost] = this._queues[queueHost] || {};
this._queues[queueHost][queueName] = queue;

return queue;
}

/**
* Creates and adds a job with the given `data` to the given `queue`.
*
* @param {Object} queue A bee or bull queue class
* @param {Object} data The data to be used within the job
*/
async set(queue, data) {
if (queue.IS_BEE) {
return queue.createJob(data).save();
} else {
return queue.add(data, {
removeOnComplete: false,
removeOnFail: false
});
}
}
}

module.exports = Queues;
22 changes: 22 additions & 0 deletions src/server/queue/job.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const NotImplementedError = require('../error/NotImplementedError');

module.exports = class Job {
constructor(job) {
this._job = job;
if (new.target === Job) {
throw new TypeError("Cannot construct Job instances directly");
}
}

async remove() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have these abstract methods throw a not implemented exception? This would both make it clear that these should be overridden, and avoid weird cases down the road where we instantiate a Job but avoid the constructor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, just check once. Didn't use a message because the stacktrace should already cover the method name from where the error originated.

throw new NotImplementedError();
}

async getStatus() {
throw new NotImplementedError();
}

async toJSON() {
throw new NotImplementedError();
}
};
16 changes: 16 additions & 0 deletions src/server/queue/jobData.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module.exports = class JobData {
constructor({id, name, data, stacktrace, timestamp, progress, delay, attemptsMade, returnValue, failedReason}) {
this.id = id;
this.name = name;
this.data = data;
this.progress = progress;
this.attemptsMade = attemptsMade;
this.returnValue = returnValue;
this.failedReason = failedReason;
this.options = {
stacktrace,
timestamp,
delay,
};
}
};
51 changes: 51 additions & 0 deletions src/server/queue/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
const NotImplementedError = require('../error/NotImplementedError');

module.exports = class Queue {
constructor(queue) {
this._queue = queue;
if (new.target === Queue) {
throw new TypeError("Cannot construct Queue instances directly");
}
}

static parseRedisConfig({port, host, db, password, url, redis, tls}) {
const redisHost = {host};
if (password) redisHost.password = password;
if (port) redisHost.port = port;
if (db) redisHost.db = db;
if (tls) redisHost.tls = tls;
return redis || url || redisHost;
}

get redisClient() {
return this._queue.client;
}

async getJob(_id) {
throw new NotImplementedError();
}

async getJobCounts() {
throw new NotImplementedError();
}

async getJobs(_state, _start, _size) {
throw new NotImplementedError();
}

async addJob(_data, _options) {
throw new NotImplementedError();
}

isValidState(_state) {
throw new NotImplementedError();
}

isActionSupported(_action) {
throw new NotImplementedError();
}

isPaginationSupported(_state) {
return true;
}
};
18 changes: 8 additions & 10 deletions src/server/views/api/bulkAction.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
const _ = require('lodash');

const ACTIONS = ['remove', 'retry'];

function bulkAction(action) {
return async function handler(req, res) {
if (!_.includes(ACTIONS, action)) {
res.status(401).send({
error: 'unauthorized action',
details: `action ${action} not permitted`
});
}

const { queueName, queueHost } = req.params;
const {Queues} = req.app.locals;
const queue = await Queues.get(queueName, queueHost);
if (!queue) return res.status(404).send({error: 'queue not found'});
if (!queue) return void res.status(404).json({error: 'queue not found'});

if (!queue.isActionSupported(action)) {
return void res.status(401).json({
error: 'unauthorized action',
details: `queue does not support action ${action}`
});
}

const {jobs} = req.body;

Expand Down
2 changes: 1 addition & 1 deletion src/server/views/api/jobAdd.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ async function handler(req, res) {
if (!queue) return res.status(404).json({ error: 'queue not found' });

try {
await Queues.set(queue, data);
await queue.addJob(data);
} catch (err) {
return res.status(500).json({ error: err.message });
}
Expand Down
Loading