From 63471b920db0cf383169b54c990466ca46aee8a1 Mon Sep 17 00:00:00 2001 From: Jonathan Muller Date: Tue, 12 Sep 2017 16:30:30 +0200 Subject: [PATCH] Shared env features / context debugToken (#82) * Shared env features / context debugToken * Review fixes * Fixes on shared env after some testing :) --- README.md | 7 ++ src/configs.js | 3 +- src/describe.js | 9 +++ src/index.js | 156 ++++++++++++++++++++++----------------- src/utils.js | 63 +++++++++++++++- tests/shared-env.spec.js | 80 ++++++++++++++++++++ 6 files changed, 247 insertions(+), 71 deletions(-) create mode 100644 tests/shared-env.spec.js diff --git a/README.md b/README.md index 6b1954b..bf7f460 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ It is part of a lightweight microservice framework that we are cooking here at C - Tiny (depends on amqplib, debug, and puid) - Built-in [distributed tracing](http://microservices.io/patterns/observability/distributed-tracing.html), see doc - Provide your own transport to log every microservice message + - Allow to share your environments and services between developers without launching the whole stack **Compatible with**: - Auto-documentation of your microservices with [carotte-dashboard](https://github.com/cubyn/carotte-dashboard) @@ -178,3 +179,9 @@ carotte.invoke('direct/increment:describe') ``` This structure is also used in [carotte-dashboard](https://github.com/cubyn/carotte-dashboard) to auto-document your microservices architecture. You can find more information about how it works on the dashboard repository. + + +## Working together +When multiple devs are working on multiple microservices, you can use environment variables to be able to communicate with each other. To do so, the developers must set the `CAROTTE_DEBUG_TOKEN` env variable to a shared string before launching their services. + +carotte will then automatically reach each-others services. diff --git a/src/configs.js b/src/configs.js index fb43221..f4d710a 100644 --- a/src/configs.js +++ b/src/configs.js @@ -80,5 +80,6 @@ module.exports = { parseSubscriptionOptions, getPackageJson, getExchangeName, - getQueueName + getQueueName, + debugToken: process.env.CAROTTE_DEBUG_TOKEN }; diff --git a/src/describe.js b/src/describe.js index 4f139e5..5864f1f 100644 --- a/src/describe.js +++ b/src/describe.js @@ -1,3 +1,5 @@ +const configs = require('./configs'); + const replaceNonDirect = /^(topic|fanout)\//; module.exports.subscribeToDescribe = function (carotte, qualifier, meta) { @@ -7,6 +9,13 @@ module.exports.subscribeToDescribe = function (carotte, qualifier, meta) { qualifier = `${parts[0]}/${parts[parts.length - 1]}`; } + // remove debug token from qualifier before subscribing to the describe channel + // because previous queue is already suffixed with debug token and subscribe will do it + // again + if (configs.debugToken) { + qualifier = qualifier.replace(`:${configs.debugToken}`, ''); + } + carotte.subscribe(`${qualifier}:describe`, { queue: { durable: false, autoDelete: true } }, () => { return meta; }); diff --git a/src/index.js b/src/index.js index 0c6af16..fdc1c0b 100644 --- a/src/index.js +++ b/src/index.js @@ -14,7 +14,9 @@ const { serializeError, extend, emptyTransport, - getTransactionStack + getTransactionStack, + debugDestinationExists, + getDebugQueueName } = require('./utils'); const { parseQualifier, @@ -102,7 +104,7 @@ function Carotte(config) { * @param {number} [prefetch] The channel prefetch settings * @return {promise} return the channel created */ - carotte.getChannel = function getChannel(name = '', prefetch = 0) { + carotte.getChannel = function getChannel(name = '', prefetch = 0, isDebug = false) { prefetch = Number(prefetch); const channelKey = (prefetch > 0) ? `${name}:${prefetch}` : 0; @@ -114,16 +116,19 @@ function Carotte(config) { .then(conn => conn.createChannel()) .then(chan => { chan.prefetch(prefetch, (process.env.RABBITMQ_PREFETCH === 'legacy') ? undefined : true); return chan; }) .then(chan => { - initDebug('channel created correctly'); + initDebug(`channel ${channelKey} created correctly`); chan.on('close', (err) => { channels[channelKey] = undefined; - carotte.cleanExchangeCache(); - carotte.onClose(err); + if (!isDebug) { + carotte.cleanExchangeCache(); + carotte.onClose(err); + } }); + // this allow chan to throw on errors - chan.once('error', carotte.onError); + chan.once('error', !isDebug ? carotte.onError : () => {}); - if (config.enableDeadLetter) { + if (config.enableDeadLetter && !isDebug) { return chan.assertQueue(config.deadLetterQualifier) .then(q => chan.bindQueue(q.queue, 'amq.direct', q.queue)) .then(() => chan); @@ -132,8 +137,10 @@ function Carotte(config) { }) .catch(err => { channels[channelKey] = undefined; - carotte.cleanExchangeCache(); - throw err; + if (!isDebug) { + carotte.cleanExchangeCache(); + throw err; + } }); return channels[channelKey]; @@ -220,76 +227,81 @@ function Carotte(config) { options.headers['x-destination'] = qualifier; } - const exchangeName = getExchangeName(options); - const rpc = options.headers['x-reply-to'] !== undefined; - const { log = true } = options; - - // isContentBuffer is used by internal functions who don't modify the content - const buffer = options.isContentBuffer - ? payload - : Buffer.from(JSON.stringify({ - data: payload, - context: Object.assign({}, options.context, { - transactionStack: getTransactionStack(options.context) - }) - })); + // get updated routing key for debug, if dest queue exists + return debugDestinationExists(carotte, options.routingKey, options.context) + .then(routingKey => { + const exchangeName = getExchangeName(options); + const rpc = options.headers['x-reply-to'] !== undefined; + const { log = true } = options; + + // isContentBuffer is used by internal functions who don't modify the content + const buffer = options.isContentBuffer + ? payload + : Buffer.from(JSON.stringify({ + data: payload, + context: Object.assign({}, options.context, { + transactionStack: getTransactionStack(options.context) + }) + })); - producerDebug('called'); - return carotte.getChannel() - .then(chan => { - let ok; + producerDebug('called'); - if (!exchangeCache[exchangeName]) { - producerDebug(`create exchange ${exchangeName}`); - ok = chan.assertExchange(exchangeName, options.type, { - durable: options.durable - }); - exchangeCache[exchangeName] = ok; - } else { - producerDebug(`use exchange ${exchangeName} from cache`); - ok = exchangeCache[exchangeName]; - } + return carotte.getChannel() + .then(chan => { + let ok; - return ok.then(() => { - producerDebug(`publishing to ${options.routingKey} on ${exchangeName}`); - if (log) { - config.transport.info(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, { + if (!exchangeCache[exchangeName]) { + producerDebug(`create exchange ${exchangeName}`); + ok = chan.assertExchange(exchangeName, options.type, { + durable: options.durable + }); + exchangeCache[exchangeName] = ok; + } else { + producerDebug(`use exchange ${exchangeName} from cache`); + ok = exchangeCache[exchangeName]; + } + + return ok.then(() => { + producerDebug(`publishing to ${options.routingKey} on ${exchangeName}`); + if (log) { + config.transport.info(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, { + context: options.context, + headers: options.headers, + request: payload, + subscriber: options.context['origin-consumer'] || '', + destination: qualifier + }); + } + + return chan.publish( + exchangeName, + routingKey, + buffer, { + headers: Object.assign({}, options.headers, { + 'x-carotte-version': carottePackage.version, + 'x-origin-service': pkg.name + }), + contentType: 'application/json' + } + ); + }); + }) + .catch(err => { + config.transport.error(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, { context: options.context, headers: options.headers, request: payload, subscriber: options.context['origin-consumer'] || '', - destination: qualifier + destination: qualifier, + error: err }); - } - return chan.publish( - exchangeName, - options.routingKey, - buffer, { - headers: Object.assign({}, options.headers, { - 'x-carotte-version': carottePackage.version, - 'x-origin-service': pkg.name - }), - contentType: 'application/json' + if (err.message.match(errorToRetryRegex)) { + return carotte.publish(qualifier, options, payload); } - ); - }); - }) - .catch(err => { - config.transport.error(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, { - context: options.context, - headers: options.headers, - request: payload, - subscriber: options.context['origin-consumer'] || '', - destination: qualifier, - error: err - }); - - if (err.message.match(errorToRetryRegex)) { - return carotte.publish(qualifier, options, payload); - } - throw err; + throw err; + }); }); }; @@ -409,6 +421,12 @@ function Carotte(config) { options = {}; } + // don't use debug queue on fanout exchange types as it has no effect + // it will likely bork the channel + if (qualifier !== 'fanout') { + qualifier = getDebugQueueName(qualifier, options); + } + if (meta) { autodocAgent.addSubscriber(qualifier, meta); if (config.autoDescribe) { @@ -517,7 +535,7 @@ function Carotte(config) { carotte.handleRetry = function handleRetry(qualifier, options, meta = {}, headers, context, message) { return err => { - return carotte.getChannel(qualifier, options.prefetch) + return carotte.getChannel() .then(chan => { const retry = meta.retry || { max: 5, strategy: 'exponential', interval: 1 }; diff --git a/src/utils.js b/src/utils.js index afdd55d..9aa7546 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,3 +1,5 @@ +const configs = require('./configs'); + function createDeferred(timeout) { const deferred = {}; @@ -85,6 +87,63 @@ function serializeError(err) { return extend(extractedError, err); } +/** + * Return queue name for destination (when debug mode enabled). + * Also able to modify queue options to make them non-durable, auto-delete + * @param {string}} queue The initial queue name + * @param {object} [options] Provide amqp options for queue if you are subscribing + * @param {string} tokenOverride override the config token with provided string + * @return {string} The queue name to use for your call + */ +function getDebugQueueName(queue, options, tokenOverride) { + // debug token can be extracted from config (env) or context + const debugToken = tokenOverride || configs.debugToken; + + if (!debugToken) { + return queue; + } + + // if caller provides an option, it means he is probably subscribing + // so in case we set the queue as trashable (erase it on disconnect) + if (options) { + options.queue = options.queue || {}; + options.queue.durable = false; + options.queue.autoDelete = true; + } + + return `${queue}:${debugToken}`; +} + +/** + * Checks if queue exists in the broker and resolves with destination queue name + * @param {object} carotte a carotte instance + * @param {string} queue The initial queue name + * @param {object} context The context object + * @return {string} The destination queue name + */ +function debugDestinationExists(carotte, queue, context) { + // only if there is a debug token + const debugToken = context.debugToken || configs.debugToken; + + // and don't bother with RPC answers + if (debugToken && !queue.startsWith('amq.gen')) { + // get our trashable channel for existance check + // because amqp.lib trash the channel with checkQueue :D + return carotte.getChannel(debugToken, 1, true) + .then(channel => { + const dest = getDebugQueueName(queue, undefined, debugToken); + return channel.checkQueue(dest) + .then(() => { + // if it pass queue exists, we add token to context for future calls + context.debugToken = debugToken; + return dest; + }).catch(err => queue); + }); + } + + return Promise.resolve(queue); +} + const emptyTransport = { log: noop, info: noop, @@ -120,5 +179,7 @@ module.exports = { deserializeError, extend, emptyTransport, - getTransactionStack + getTransactionStack, + debugDestinationExists, + getDebugQueueName }; diff --git a/tests/shared-env.spec.js b/tests/shared-env.spec.js new file mode 100644 index 0000000..9c00fcc --- /dev/null +++ b/tests/shared-env.spec.js @@ -0,0 +1,80 @@ +const expect = require('chai').expect; +const carotte = require('./client')(); +const configs = require('../src/configs'); + +describe('shared env', () => { + const initialConfigToken = configs.debugToken; + + afterEach(() => { + configs.debugToken = initialConfigToken; + }); + + it('should be able to communicate with overloaded service', async () => { + await carotte.subscribe('direct/random-queue-aihie', { queue: { exclusive: true } }, ({ data }) => { + return 3; + }); + + await carotte.subscribe('direct/random-queue-aihie', { queue: { exclusive: true } }, ({ data }) => { + return 2; + }); + + configs.debugToken = 'I can do it'; + + await carotte.subscribe('direct/random-queue-aihie', { queue: { exclusive: true } }, ({ data }) => { + return 4; + }); + + return carotte.invoke('direct/random-queue-aihie', {}) + .then(data => { + expect(data).to.eql(4); + }); + }); + + + it('should be able to propagate debug token accross calls', async () => { + // this will be our regular service, not using tokens + await carotte.subscribe('direct/random-queue-helper', { queue: { exclusive: true } }, ({ data, invoke }) => { + // reset env token to simulate remote config + configs.debugToken = ''; + return invoke('final-dest', {}); + }); + + // this will be our regular service, another lambda + await carotte.subscribe('direct/final-dest', { queue: { exclusive: true } }, ({ data }) => { + // this should never been called as overloaded by the one below + return 'remote-final'; + }); + + // this will be our overloaded queue on dev computer + await carotte.subscribe('direct/final-dest:token', { queue: { exclusive: true } }, ({ data }) => { + // local dev reached! yay! + return 'local-final'; + }); + + // a remote queue, non-debug + await carotte.subscribe('direct/random-queue-xxaxa', { queue: { exclusive: true } }, ({ data }) => { + // should never be called + return 'not good'; + }); + + // a remote queue, non debug + await carotte.subscribe('direct/random-queue-xxaxa', { queue: { exclusive: true } }, ({ data }) => { + // should never be called + return 'not good'; + }); + + // simulate env token + configs.debugToken = 'token'; + + // local dev queue firstly called + await carotte.subscribe('direct/random-queue-xxaxa', { queue: { exclusive: true } }, ({ data, invoke }) => { + // local dev queue call remote queue + return invoke('random-queue-helper', {}); + }); + + return carotte.invoke('direct/random-queue-xxaxa', {}) + .then(data => { + expect(data).to.eql('local-final'); + }); + }); +});