Skip to content

Commit

Permalink
fix(mounting): differentiate base and appBase paths when disableListen (
Browse files Browse the repository at this point in the history
roggervalf authored Feb 18, 2023
1 parent 790f623 commit 387e3ac
Showing 7 changed files with 291 additions and 11 deletions.
134 changes: 134 additions & 0 deletions example/express.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
const Arena = require('../');
const express = require('express');
const {Queue, Worker, FlowProducer} = require('bullmq');
const RedisServer = require('redis-server');

// Select ports that are unlikely to be used by other services a developer might be running locally.
const HTTP_SERVER_PORT = 4735;
const REDIS_SERVER_PORT = 4736;

// Create a Redis server. This is only for convenience

async function main() {
const app = express();
const server = new RedisServer(REDIS_SERVER_PORT);
await server.open();
const queueName = 'name_of_my_queue';
const parentQueueName = 'name_of_my_parent_queue';

const queue = new Queue(queueName, {
connection: {port: REDIS_SERVER_PORT},
});
new Queue(parentQueueName, {
connection: {port: REDIS_SERVER_PORT},
});

const flow = new FlowProducer({
connection: {port: REDIS_SERVER_PORT},
});

new Worker(
queueName,
async function (job) {
await job.updateProgress(20);

// Wait 5sec
await new Promise((res) => setTimeout(res, 5000));

// Randomly succeeds or fails the job to put some jobs in completed and some in failed.
if (Math.random() > 0.5) {
throw new Error('fake error');
}
},
{
concurrency: 3,
connection: {port: REDIS_SERVER_PORT},
}
);

new Worker(
parentQueueName,
async function () {
// Wait 10sec
await new Promise((res) => setTimeout(res, 10000));

// Randomly succeeds or fails the job to put some jobs in completed and some in failed.
if (Math.random() > 0.5) {
throw new Error('fake error');
}
},
{
connection: {port: REDIS_SERVER_PORT},
}
);

const children = Array.from(Array(65).keys()).map((index) => ({
name: 'child',
data: {idx: index, foo: 'bar'},
queueName,
}));
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children,
});

// adding delayed jobs
const delayedJob = await queue.add('delayed', {}, {delay: 60 * 1000});
delayedJob.log('Log message');

const arena = Arena(
{
BullMQ: Queue,

queues: [
{
// Required for each queue definition.
name: queueName,

// User-readable display name for the host. Required.
hostId: 'Queue Server 1',

// Queue type (Bull or Bullmq or Bee - default Bull).
type: 'bullmq',

redis: {
// host: 'localhost',
port: REDIS_SERVER_PORT,
},
},
{
// Required for each queue definition.
name: parentQueueName,

// User-readable display name for the host. Required.
hostId: 'Queue Server 2',

// Queue type (Bull or Bullmq or Bee - default Bull).
type: 'bullmq',

redis: {
// host: 'localhost',
port: REDIS_SERVER_PORT,
},
},
],
},
{
basePath: '/',
disableListen: true,
}
);

app.use('/arena', arena);

app.listen(HTTP_SERVER_PORT, () =>
console.log(`Arena listening on port ${HTTP_SERVER_PORT}!`)
);
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
136 changes: 136 additions & 0 deletions example/fastify.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
const Arena = require('../');
const fastify = require('fastify');
const {Queue, Worker, FlowProducer} = require('bullmq');
const RedisServer = require('redis-server');

// Select ports that are unlikely to be used by other services a developer might be running locally.
const HTTP_SERVER_PORT = 4735;
const REDIS_SERVER_PORT = 4736;

// Create a Redis server. This is only for convenience

async function main() {
const app = fastify();
const server = new RedisServer(REDIS_SERVER_PORT);
await server.open();
const queueName = 'name_of_my_queue';
const parentQueueName = 'name_of_my_parent_queue';

const queue = new Queue(queueName, {
connection: {port: REDIS_SERVER_PORT},
});
new Queue(parentQueueName, {
connection: {port: REDIS_SERVER_PORT},
});

const flow = new FlowProducer({
connection: {port: REDIS_SERVER_PORT},
});

new Worker(
queueName,
async function (job) {
await job.updateProgress(20);

// Wait 5sec
await new Promise((res) => setTimeout(res, 5000));

// Randomly succeeds or fails the job to put some jobs in completed and some in failed.
if (Math.random() > 0.5) {
throw new Error('fake error');
}
},
{
concurrency: 3,
connection: {port: REDIS_SERVER_PORT},
}
);

new Worker(
parentQueueName,
async function () {
// Wait 10sec
await new Promise((res) => setTimeout(res, 10000));

// Randomly succeeds or fails the job to put some jobs in completed and some in failed.
if (Math.random() > 0.5) {
throw new Error('fake error');
}
},
{
connection: {port: REDIS_SERVER_PORT},
}
);

const children = Array.from(Array(65).keys()).map((index) => ({
name: 'child',
data: {idx: index, foo: 'bar'},
queueName,
}));
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children,
});

// adding delayed jobs
const delayedJob = await queue.add('delayed', {}, {delay: 60 * 1000});
delayedJob.log('Log message');

const arena = Arena(
{
BullMQ: Queue,

queues: [
{
// Required for each queue definition.
name: queueName,

// User-readable display name for the host. Required.
hostId: 'Queue Server 1',

// Queue type (Bull or Bullmq or Bee - default Bull).
type: 'bullmq',

redis: {
// host: 'localhost',
port: REDIS_SERVER_PORT,
},
},
{
// Required for each queue definition.
name: parentQueueName,

// User-readable display name for the host. Required.
hostId: 'Queue Server 2',

// Queue type (Bull or Bullmq or Bee - default Bull).
type: 'bullmq',

redis: {
// host: 'localhost',
port: REDIS_SERVER_PORT,
},
},
],
},
{
basePath: '/',
disableListen: true,
}
);

await app.register(require('@fastify/express'));
app.use('/arena', arena);

app.listen({port: HTTP_SERVER_PORT}, (err, address) => {
if (err) throw err;
console.log(`Arena listening on port ${HTTP_SERVER_PORT}!`);
});
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
4 changes: 4 additions & 0 deletions example/package.json
Original file line number Diff line number Diff line change
@@ -4,6 +4,8 @@
"description": "An example project that uses Arena",
"main": "bee.js",
"scripts": {
"start:fastify": "node fastify.js",
"start:express": "node express.js",
"start:bee": "node bee.js",
"start:bull": "node bull.js",
"start:bullmq": "node bullmq.js",
@@ -13,10 +15,12 @@
"author": "",
"license": "MIT",
"dependencies": {
"@fastify/express": "^2.3.0",
"bee-queue": "^1.4.0",
"bull": "^3.22.6",
"bullmq": "^3.0.0",
"express": "^4.17.1",
"fastify": "^4.13.0",
"redis-server": "^1.2.2"
}
}
25 changes: 14 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
@@ -9,17 +9,20 @@ function run(config, listenOpts = {}) {
Queues.useCdn =
typeof listenOpts.useCdn !== 'undefined' ? listenOpts.useCdn : true;

app.locals.appBasePath = listenOpts.basePath || app.locals.appBasePath;

app.use(
app.locals.appBasePath,
express.static(path.join(__dirname, 'public'))
);
app.use(app.locals.appBasePath, routes);

const port = listenOpts.port || 4567;
const host = listenOpts.host || '0.0.0.0'; // Default: listen to all network interfaces.
if (!listenOpts.disableListen) {
if (listenOpts.disableListen) {
app.locals.appBasePath =
listenOpts.basePath == '/' ? app.locals.appBasePath : listenOpts.basePath;
app.use(
listenOpts.basePath ? listenOpts.basePath : '/',
express.static(path.join(__dirname, 'public'))
);
app.use(listenOpts.basePath ? listenOpts.basePath : '/', routes);
} else {
const appBasePath = listenOpts.basePath || app.locals.appBasePath;
app.use(appBasePath, express.static(path.join(__dirname, 'public')));
app.use(appBasePath, routes);
const port = listenOpts.port || 4567;
const host = listenOpts.host || '0.0.0.0'; // Default: listen to all network interfaces.
app.listen(port, host, () => {
console.log(`Arena is running on port ${port} at host ${host}`);
});
1 change: 1 addition & 0 deletions src/server/views/dashboard/flowDetails.js
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ async function handler(req, res) {
const {Flows} = req.app.locals;
const flow = await Flows.get(connectionName, flowHost);
const basePath = req.baseUrl;

if (!flow)
return res.status(404).render('dashboard/templates/flowNotFound', {
basePath,
1 change: 1 addition & 0 deletions src/server/views/dashboard/queueDetails.js
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ async function handler(req, res) {
const {Queues, Flows} = req.app.locals;
const queue = await Queues.get(queueName, queueHost);
const basePath = req.baseUrl;

if (!queue)
return res.status(404).render('dashboard/templates/queueNotFound', {
basePath,
1 change: 1 addition & 0 deletions src/server/views/dashboard/queueJobsByState.js
Original file line number Diff line number Diff line change
@@ -80,6 +80,7 @@ async function _html(req, res) {
const {Queues, Flows} = req.app.locals;
const queue = await Queues.get(queueName, queueHost);
const basePath = req.baseUrl;

if (!queue)
return res.status(404).render('dashboard/templates/queueNotFound', {
basePath,

0 comments on commit 387e3ac

Please sign in to comment.