-
Notifications
You must be signed in to change notification settings - Fork 232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactors code to support inheritance #198
Open
anuragagarwal561994
wants to merge
7
commits into
bee-queue:master
Choose a base branch
from
anuragagarwal561994:using-inheritance
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
010b1e4
Refactors code to support inheritance
anuragagarwal561994 68021e8
Removes unused lodash require
anuragagarwal561994 34b9922
Apply suggestions from code review for removing redundancy
anuragagarwal561994 0a2f7d1
Fixes the flow for adding job
anuragagarwal561994 584b3e0
Removes implicit constructors
anuragagarwal561994 69e0aaf
Adds NotImplementedError and uses it in every abstract method
anuragagarwal561994 dd8532d
Apply suggestions from code review to change the api return values
anuragagarwal561994 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
module.exports = class NotImplementedError extends Error { | ||
constructor(message = "") { | ||
super(); | ||
this.message = message; | ||
this.name = this.constructor.name; | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
const Queue = require('./queue'); | ||
const Job = require('./job'); | ||
const JobData = require('./jobData'); | ||
|
||
class BeeJob extends Job { | ||
async remove() { | ||
await this._job.remove(); | ||
} | ||
|
||
async getStatus() { | ||
return this._job.status; | ||
} | ||
|
||
async toJSON() { | ||
const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job; | ||
return new JobData({id, progress, data, timestamp, stacktrace, delay}); | ||
} | ||
} | ||
|
||
const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed']; | ||
const SUPPORTED_ACTIONS = ['remove']; | ||
|
||
module.exports = class BeeQueue extends Queue { | ||
constructor(queueConfig) { | ||
const {name} = queueConfig; | ||
const options = BeeQueue.parseConfig(queueConfig); | ||
const queue = new BeeQueue(name, options); | ||
super(queue); | ||
} | ||
|
||
static parseConfig(queueConfig) { | ||
const options = { | ||
redis: this.parseRedisConfig(queueConfig), | ||
isWorker: false, | ||
getEvents: false, | ||
sendEvents: false, | ||
storeJobs: false, | ||
}; | ||
const {prefix} = queueConfig; | ||
if (prefix) options.prefix = prefix; | ||
return options; | ||
} | ||
|
||
async getJob(id) { | ||
const job = this._queue.getJob(id); | ||
return new BeeJob(job); | ||
} | ||
|
||
async getJobCounts() { | ||
const jobCounts = this._queue.checkHealth(); | ||
delete jobCounts.newestJob; | ||
return jobCounts; | ||
} | ||
|
||
async getJobs(state, start, size) { | ||
const page = {}; | ||
|
||
if (['failed', 'succeeded'].includes(state)) { | ||
page.size = size; | ||
} else { | ||
page.start = start; | ||
page.end = start + size - 1; | ||
} | ||
|
||
let jobs = await this._queue.getJobs(state, page); | ||
// Filter out Bee jobs that have already been removed by the time the promise resolves | ||
jobs = jobs.filter((job) => job); | ||
return jobs.map((j) => new BeeJob(j)); | ||
} | ||
|
||
async addJob(data) { | ||
const job = await this._queue.createJob(data).save(); | ||
return new BeeJob(job); | ||
} | ||
|
||
isValidState(state) { | ||
return VALID_STATES.includes(state); | ||
} | ||
|
||
isActionSupported(action) { | ||
return SUPPORTED_ACTIONS.includes(action); | ||
} | ||
|
||
isPaginationSupported(state) { | ||
return state !== 'succeeded' && state !== 'failed'; | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
const {capitalize} = require('lodash'); | ||
const Bull = require('bull'); | ||
const Queue = require('./queue'); | ||
const Job = require('./job'); | ||
const JobData = require('./jobData'); | ||
|
||
const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed']; | ||
const SUPPORTED_ACTIONS = ['remove', 'retry']; | ||
|
||
class BullJob extends Job { | ||
async remove() { | ||
await this._job.remove(); | ||
} | ||
|
||
async retry() { | ||
await this._job.retry(); | ||
} | ||
|
||
async getStatus() { | ||
return this._job.getState(); | ||
} | ||
|
||
|
||
async toJSON() { | ||
const { | ||
id, | ||
name, | ||
data, | ||
attemptsMade, | ||
failedReason, | ||
stacktrace, | ||
returnvalue: returnValue, | ||
timestamp, | ||
delay, | ||
progress | ||
} = this._job.toJSON(); | ||
return new JobData({ | ||
id, | ||
name, | ||
data, | ||
attemptsMade, | ||
failedReason, | ||
stacktrace, | ||
timestamp, | ||
delay, | ||
progress, | ||
returnValue, | ||
}); | ||
} | ||
} | ||
|
||
module.exports = class BullQueue extends Queue { | ||
constructor(queueConfig) { | ||
const {name} = queueConfig; | ||
const options = BullQueue.parseConfig(queueConfig); | ||
const queue = Bull(name, options); | ||
super(queue); | ||
} | ||
|
||
static parseConfig(queueConfig) { | ||
const options = {redis: this.parseRedisConfig(queueConfig)}; | ||
const {createClient, prefix} = queueConfig; | ||
if (createClient) options.createClient = createClient; | ||
if (prefix) options.prefix = prefix; | ||
return options; | ||
} | ||
|
||
async getJob(id) { | ||
const job = await this._queue.getJob(id); | ||
return new BullJob(job); | ||
} | ||
|
||
async getJobCounts() { | ||
return this._queue.getJobCounts(); | ||
} | ||
|
||
async getJobs(state, start, size) { | ||
const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1); | ||
return jobs.map((j) => new BullJob(j)); | ||
} | ||
|
||
async addJob(data) { | ||
const job = await this._queue.add(data, { | ||
removeOnComplete: false, | ||
removeOnFail: false | ||
}); | ||
return new BullJob(job); | ||
} | ||
|
||
isValidState(state) { | ||
return VALID_STATES.includes(state); | ||
} | ||
|
||
isActionSupported(action) { | ||
return SUPPORTED_ACTIONS.includes(action); | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
const NotImplementedError = require('../error/NotImplementedError'); | ||
|
||
module.exports = class Job { | ||
constructor(job) { | ||
this._job = job; | ||
if (new.target === Job) { | ||
throw new TypeError("Cannot construct Job instances directly"); | ||
} | ||
} | ||
|
||
async remove() { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
async getStatus() { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
async toJSON() { | ||
throw new NotImplementedError(); | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
module.exports = class JobData { | ||
constructor({id, name, data, stacktrace, timestamp, progress, delay, attemptsMade, returnValue, failedReason}) { | ||
this.id = id; | ||
this.name = name; | ||
this.data = data; | ||
this.progress = progress; | ||
this.attemptsMade = attemptsMade; | ||
this.returnValue = returnValue; | ||
this.failedReason = failedReason; | ||
this.options = { | ||
stacktrace, | ||
timestamp, | ||
delay, | ||
}; | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
const NotImplementedError = require('../error/NotImplementedError'); | ||
|
||
module.exports = class Queue { | ||
constructor(queue) { | ||
this._queue = queue; | ||
if (new.target === Queue) { | ||
throw new TypeError("Cannot construct Queue instances directly"); | ||
} | ||
} | ||
|
||
static parseRedisConfig({port, host, db, password, url, redis, tls}) { | ||
const redisHost = {host}; | ||
if (password) redisHost.password = password; | ||
if (port) redisHost.port = port; | ||
if (db) redisHost.db = db; | ||
if (tls) redisHost.tls = tls; | ||
return redis || url || redisHost; | ||
} | ||
|
||
get redisClient() { | ||
return this._queue.client; | ||
} | ||
|
||
async getJob(_id) { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
async getJobCounts() { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
async getJobs(_state, _start, _size) { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
async addJob(_data, _options) { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
isValidState(_state) { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
isActionSupported(_action) { | ||
throw new NotImplementedError(); | ||
} | ||
|
||
isPaginationSupported(_state) { | ||
return true; | ||
} | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have these abstract methods throw a not implemented exception? This would both make it clear that these should be overridden, and avoid weird cases down the road where we instantiate a
Job
but avoid the constructor.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, just check once. Didn't use a message because the stacktrace should already cover the method name from where the error originated.