Skip to content

Commit

Permalink
Shared env features / context debugToken (#82)
Browse files Browse the repository at this point in the history
* Shared env features / context debugToken

* Review fixes

* Fixes on shared env after some testing :)
  • Loading branch information
PuKoren authored Sep 12, 2017
1 parent 0b78e1b commit 63471b9
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 71 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ It is part of a lightweight microservice framework that we are cooking here at C
- Tiny (depends on amqplib, debug, and puid)
- Built-in [distributed tracing](http://microservices.io/patterns/observability/distributed-tracing.html), see doc
- Provide your own transport to log every microservice message
- Allow to share your environments and services between developers without launching the whole stack

**Compatible with**:
- Auto-documentation of your microservices with [carotte-dashboard](https://github.com/cubyn/carotte-dashboard)
Expand Down Expand Up @@ -178,3 +179,9 @@ carotte.invoke('direct/increment:describe')
```

This structure is also used in [carotte-dashboard](https://github.com/cubyn/carotte-dashboard) to auto-document your microservices architecture. You can find more information about how it works on the dashboard repository.


## Working together
When multiple devs are working on multiple microservices, you can use environment variables to be able to communicate with each other. To do so, the developers must set the `CAROTTE_DEBUG_TOKEN` env variable to a shared string before launching their services.

carotte will then automatically reach each-others services.
3 changes: 2 additions & 1 deletion src/configs.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,6 @@ module.exports = {
parseSubscriptionOptions,
getPackageJson,
getExchangeName,
getQueueName
getQueueName,
debugToken: process.env.CAROTTE_DEBUG_TOKEN
};
9 changes: 9 additions & 0 deletions src/describe.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const configs = require('./configs');

const replaceNonDirect = /^(topic|fanout)\//;

module.exports.subscribeToDescribe = function (carotte, qualifier, meta) {
Expand All @@ -7,6 +9,13 @@ module.exports.subscribeToDescribe = function (carotte, qualifier, meta) {
qualifier = `${parts[0]}/${parts[parts.length - 1]}`;
}

// remove debug token from qualifier before subscribing to the describe channel
// because previous queue is already suffixed with debug token and subscribe will do it
// again
if (configs.debugToken) {
qualifier = qualifier.replace(`:${configs.debugToken}`, '');
}

carotte.subscribe(`${qualifier}:describe`, { queue: { durable: false, autoDelete: true } }, () => {
return meta;
});
Expand Down
156 changes: 87 additions & 69 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ const {
serializeError,
extend,
emptyTransport,
getTransactionStack
getTransactionStack,
debugDestinationExists,
getDebugQueueName
} = require('./utils');
const {
parseQualifier,
Expand Down Expand Up @@ -102,7 +104,7 @@ function Carotte(config) {
* @param {number} [prefetch] The channel prefetch settings
* @return {promise} return the channel created
*/
carotte.getChannel = function getChannel(name = '', prefetch = 0) {
carotte.getChannel = function getChannel(name = '', prefetch = 0, isDebug = false) {
prefetch = Number(prefetch);
const channelKey = (prefetch > 0) ? `${name}:${prefetch}` : 0;

Expand All @@ -114,16 +116,19 @@ function Carotte(config) {
.then(conn => conn.createChannel())
.then(chan => { chan.prefetch(prefetch, (process.env.RABBITMQ_PREFETCH === 'legacy') ? undefined : true); return chan; })
.then(chan => {
initDebug('channel created correctly');
initDebug(`channel ${channelKey} created correctly`);
chan.on('close', (err) => {
channels[channelKey] = undefined;
carotte.cleanExchangeCache();
carotte.onClose(err);
if (!isDebug) {
carotte.cleanExchangeCache();
carotte.onClose(err);
}
});

// this allow chan to throw on errors
chan.once('error', carotte.onError);
chan.once('error', !isDebug ? carotte.onError : () => {});

if (config.enableDeadLetter) {
if (config.enableDeadLetter && !isDebug) {
return chan.assertQueue(config.deadLetterQualifier)
.then(q => chan.bindQueue(q.queue, 'amq.direct', q.queue))
.then(() => chan);
Expand All @@ -132,8 +137,10 @@ function Carotte(config) {
})
.catch(err => {
channels[channelKey] = undefined;
carotte.cleanExchangeCache();
throw err;
if (!isDebug) {
carotte.cleanExchangeCache();
throw err;
}
});

return channels[channelKey];
Expand Down Expand Up @@ -220,76 +227,81 @@ function Carotte(config) {
options.headers['x-destination'] = qualifier;
}

const exchangeName = getExchangeName(options);
const rpc = options.headers['x-reply-to'] !== undefined;
const { log = true } = options;

// isContentBuffer is used by internal functions who don't modify the content
const buffer = options.isContentBuffer
? payload
: Buffer.from(JSON.stringify({
data: payload,
context: Object.assign({}, options.context, {
transactionStack: getTransactionStack(options.context)
})
}));
// get updated routing key for debug, if dest queue exists
return debugDestinationExists(carotte, options.routingKey, options.context)
.then(routingKey => {
const exchangeName = getExchangeName(options);
const rpc = options.headers['x-reply-to'] !== undefined;
const { log = true } = options;

// isContentBuffer is used by internal functions who don't modify the content
const buffer = options.isContentBuffer
? payload
: Buffer.from(JSON.stringify({
data: payload,
context: Object.assign({}, options.context, {
transactionStack: getTransactionStack(options.context)
})
}));

producerDebug('called');
return carotte.getChannel()
.then(chan => {
let ok;
producerDebug('called');

if (!exchangeCache[exchangeName]) {
producerDebug(`create exchange ${exchangeName}`);
ok = chan.assertExchange(exchangeName, options.type, {
durable: options.durable
});
exchangeCache[exchangeName] = ok;
} else {
producerDebug(`use exchange ${exchangeName} from cache`);
ok = exchangeCache[exchangeName];
}
return carotte.getChannel()
.then(chan => {
let ok;

return ok.then(() => {
producerDebug(`publishing to ${options.routingKey} on ${exchangeName}`);
if (log) {
config.transport.info(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, {
if (!exchangeCache[exchangeName]) {
producerDebug(`create exchange ${exchangeName}`);
ok = chan.assertExchange(exchangeName, options.type, {
durable: options.durable
});
exchangeCache[exchangeName] = ok;
} else {
producerDebug(`use exchange ${exchangeName} from cache`);
ok = exchangeCache[exchangeName];
}

return ok.then(() => {
producerDebug(`publishing to ${options.routingKey} on ${exchangeName}`);
if (log) {
config.transport.info(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, {
context: options.context,
headers: options.headers,
request: payload,
subscriber: options.context['origin-consumer'] || '',
destination: qualifier
});
}

return chan.publish(
exchangeName,
routingKey,
buffer, {
headers: Object.assign({}, options.headers, {
'x-carotte-version': carottePackage.version,
'x-origin-service': pkg.name
}),
contentType: 'application/json'
}
);
});
})
.catch(err => {
config.transport.error(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, {
context: options.context,
headers: options.headers,
request: payload,
subscriber: options.context['origin-consumer'] || '',
destination: qualifier
destination: qualifier,
error: err
});
}

return chan.publish(
exchangeName,
options.routingKey,
buffer, {
headers: Object.assign({}, options.headers, {
'x-carotte-version': carottePackage.version,
'x-origin-service': pkg.name
}),
contentType: 'application/json'
if (err.message.match(errorToRetryRegex)) {
return carotte.publish(qualifier, options, payload);
}
);
});
})
.catch(err => {
config.transport.error(`${rpc ? '▶ ' : '▷ '} ${options.type}/${options.routingKey}`, {
context: options.context,
headers: options.headers,
request: payload,
subscriber: options.context['origin-consumer'] || '',
destination: qualifier,
error: err
});

if (err.message.match(errorToRetryRegex)) {
return carotte.publish(qualifier, options, payload);
}

throw err;
throw err;
});
});
};

Expand Down Expand Up @@ -409,6 +421,12 @@ function Carotte(config) {
options = {};
}

// don't use debug queue on fanout exchange types as it has no effect
// it will likely bork the channel
if (qualifier !== 'fanout') {
qualifier = getDebugQueueName(qualifier, options);
}

if (meta) {
autodocAgent.addSubscriber(qualifier, meta);
if (config.autoDescribe) {
Expand Down Expand Up @@ -517,7 +535,7 @@ function Carotte(config) {
carotte.handleRetry =
function handleRetry(qualifier, options, meta = {}, headers, context, message) {
return err => {
return carotte.getChannel(qualifier, options.prefetch)
return carotte.getChannel()
.then(chan => {
const retry = meta.retry || { max: 5, strategy: 'exponential', interval: 1 };

Expand Down
63 changes: 62 additions & 1 deletion src/utils.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const configs = require('./configs');

function createDeferred(timeout) {
const deferred = {};

Expand Down Expand Up @@ -85,6 +87,63 @@ function serializeError(err) {
return extend(extractedError, err);
}

/**
* Return queue name for destination (when debug mode enabled).
* Also able to modify queue options to make them non-durable, auto-delete
* @param {string}} queue The initial queue name
* @param {object} [options] Provide amqp options for queue if you are subscribing
* @param {string} tokenOverride override the config token with provided string
* @return {string} The queue name to use for your call
*/
function getDebugQueueName(queue, options, tokenOverride) {
// debug token can be extracted from config (env) or context
const debugToken = tokenOverride || configs.debugToken;

if (!debugToken) {
return queue;
}

// if caller provides an option, it means he is probably subscribing
// so in case we set the queue as trashable (erase it on disconnect)
if (options) {
options.queue = options.queue || {};
options.queue.durable = false;
options.queue.autoDelete = true;
}

return `${queue}:${debugToken}`;
}

/**
* Checks if queue exists in the broker and resolves with destination queue name
* @param {object} carotte a carotte instance
* @param {string} queue The initial queue name
* @param {object} context The context object
* @return {string} The destination queue name
*/
function debugDestinationExists(carotte, queue, context) {
// only if there is a debug token
const debugToken = context.debugToken || configs.debugToken;

// and don't bother with RPC answers
if (debugToken && !queue.startsWith('amq.gen')) {
// get our trashable channel for existance check
// because amqp.lib trash the channel with checkQueue :D
return carotte.getChannel(debugToken, 1, true)
.then(channel => {
const dest = getDebugQueueName(queue, undefined, debugToken);
return channel.checkQueue(dest)
.then(() => {
// if it pass queue exists, we add token to context for future calls
context.debugToken = debugToken;
return dest;
}).catch(err => queue);
});
}

return Promise.resolve(queue);
}

const emptyTransport = {
log: noop,
info: noop,
Expand Down Expand Up @@ -120,5 +179,7 @@ module.exports = {
deserializeError,
extend,
emptyTransport,
getTransactionStack
getTransactionStack,
debugDestinationExists,
getDebugQueueName
};
Loading

0 comments on commit 63471b9

Please sign in to comment.