From 6f4ff53864d6d802859235d20e6ac0e559e91670 Mon Sep 17 00:00:00 2001 From: OleksiiSliusarenko Date: Fri, 26 Apr 2019 13:47:47 +0300 Subject: [PATCH] IN-4745: emit error if message does not corresponds to schema (#213) * IN-4745: emit error if message does not corresponds to schema * IN-4745: changed nodejs versions for build --- .travis.yml | 6 +----- lib/channelManager.js | 1 + lib/validateWithSchema.js | 2 ++ package.json | 2 +- test/channelManager.test.js | 26 ++++++++++++++++++++++++++ test/integration/integration.test.js | 15 ++++++++++++--- 6 files changed, 43 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index cba6df0..6b68b0f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,8 @@ language: node_js node_js: +- '8' - '7' - '6' -- '5' -- '4' -- '0.12' -- '0.11' -- '0.10' - iojs after_success: - npm run coveralls diff --git a/lib/channelManager.js b/lib/channelManager.js index 7ee8f30..8e5a191 100644 --- a/lib/channelManager.js +++ b/lib/channelManager.js @@ -143,6 +143,7 @@ channelManager.create = function() { } function onValidationError(err, message) { + channel.emit('error', err); channel.rejectMessage(message); } diff --git a/lib/validateWithSchema.js b/lib/validateWithSchema.js index f0cd1b9..5f8528c 100644 --- a/lib/validateWithSchema.js +++ b/lib/validateWithSchema.js @@ -126,3 +126,5 @@ function _errorsTrace(errors) { }); return str; } + +module.exports.SchemaValidationError = SchemaValidationError; diff --git a/package.json b/package.json index 2e3c5c4..a635d26 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "msb", - "version": "1.2.1", + "version": "1.2.2", "description": "A framework to simplify the implementation of an event-bus oriented microservices architecture", "license": "MIT", "main": "index.js", diff --git a/test/channelManager.test.js b/test/channelManager.test.js index 184a491..0e48144 100644 --- a/test/channelManager.test.js +++ b/test/channelManager.test.js @@ -9,16 +9,23 @@ var msb = require('..'); var config = require('../lib/config'); var createChannelManager = require('../lib/channelManager').create; var messageFactory = msb.messageFactory; +var SchemaValidationError = require('../lib/validateWithSchema').SchemaValidationError; describe('channelManager', function() { var adapter; var channelManager; before(function(done) { + process.env.NODE_ENV = 'test'; adapter = amqp.create(); done(); }); + after(function(done) { + delete process.env.NODE_ENV; + done(); + }); + beforeEach(function(done) { channelManager = createChannelManager(); @@ -212,6 +219,25 @@ describe('channelManager', function() { done(); }); + it('will emit `error` event if message does not corresponds to schema', function(done) { + simple.mock(config, 'schema', require('../schema')); + channelManager.configure(config); + + var mockSubscriber = {}; + simple.mock(mockSubscriber, 'on'); + simple.mock(channelManager, 'createRawConsumer').returnWith(mockSubscriber); + + var consumer = channelManager.findOrCreateConsumer('c:errorConsumer'); + + consumer.on('error', function(err) { + expect(err).to.be.an.instanceof(SchemaValidationError); + done(); + }); + + var onMessageFn = mockSubscriber.on.calls[0].args[1]; + onMessageFn({}); + }); + it('will listen for messages and emit a new message event', function(done) { var mockSubscriber = {}; simple.mock(mockSubscriber, 'on'); diff --git a/test/integration/integration.test.js b/test/integration/integration.test.js index 6a7dbc2..6a45245 100644 --- a/test/integration/integration.test.js +++ b/test/integration/integration.test.js @@ -100,8 +100,10 @@ describe('AMQP Integration', function() { 'rejectMessage'); var onMessageMethod = simple.mock(); + var onErrorMethod = simple.mock(); consumer.on('message', onMessageMethod); + consumer.on('error', onErrorMethod); publisher.publish([ fixtures.consumer_basic, @@ -111,7 +113,8 @@ describe('AMQP Integration', function() { if (err) return done(err); setTimeout(function() { - expect(onMessageMethod.callCount).equals(2) + expect(onErrorMethod.callCount).equals(1); + expect(onMessageMethod.callCount).equals(2); expect(onMessageMethod.calls[0].arg).deep.equals(fixtures.consumer_basic); expect(onMessageMethod.calls[1].arg).deep.equals(fixtures.consumer_basic); expect(_onMessageMethod.callCount).equals(3); @@ -133,8 +136,10 @@ describe('AMQP Integration', function() { 'rejectMessage'); var onMessageMethod = simple.mock(); + var onErrorMethod = simple.mock(); consumer.on('message', onMessageMethod); + consumer.on('error', onErrorMethod); publisher.publish([ fixtures.consumer_basic, @@ -144,7 +149,8 @@ describe('AMQP Integration', function() { if (err) return done(err); setTimeout(function() { - expect(onMessageMethod.callCount).equals(2) + expect(onErrorMethod.callCount).equals(1); + expect(onMessageMethod.callCount).equals(2); expect(onMessageMethod.calls[0].arg).deep.equals(fixtures.consumer_basic); expect(onMessageMethod.calls[1].arg).deep.equals(fixtures.consumer_basic); expect(_onMessageMethod.callCount).equals(3); @@ -166,8 +172,10 @@ describe('AMQP Integration', function() { 'rejectMessage'); var onMessageMethod = simple.mock(); + var onErrorMethod = simple.mock(); consumer.on('message', onMessageMethod); + consumer.on('error', onErrorMethod); publisher.publish([ fixtures.consumer_basic, @@ -177,7 +185,8 @@ describe('AMQP Integration', function() { if (err) return done(err); setTimeout(function() { - expect(onMessageMethod.callCount).equals(2) + expect(onErrorMethod.callCount).equals(1); + expect(onMessageMethod.callCount).equals(2); expect(onMessageMethod.calls[0].arg).deep.equals(fixtures.consumer_basic); expect(onMessageMethod.calls[1].arg).deep.equals(fixtures.consumer_basic); expect(rejectMethod.callCount).equals(1);