diff --git a/package-lock.json b/package-lock.json
index 148e8b467..8dd243169 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1671,7 +1671,7 @@
     "any-promise": {
       "version": "1.3.0",
       "resolved": "https://registry.npmjs.org/any-promise/-/any-promise-1.3.0.tgz",
-      "integrity": "sha512-7UvmKalWRt1wgjL1RrGxoSJW/0QZFIegpeGvZG9kjp8vrRu55XTHbwnqq2GpXm9uLbcuhxm3IqX9OB4MZR1b2A=="
+      "integrity": "sha1-q8av7tzqUugJzcA3au0845Y10X8="
     },
     "anymatch": {
       "version": "3.1.1",
@@ -2829,7 +2829,7 @@
     "cls-bluebird": {
       "version": "2.1.0",
       "resolved": "https://registry.npmjs.org/cls-bluebird/-/cls-bluebird-2.1.0.tgz",
-      "integrity": "sha512-XVb0RPmHQyy35Tz9z34gvtUcBKUK8A/1xkGCyeFc9B0C7Zr5SysgFaswRVdwI5NEMcO+3JKlIDGIOgERSn9NdA==",
+      "integrity": "sha1-N+8eCAqP+1XC9BZPU28ZGeeWiu4=",
       "requires": {
         "is-bluebird": "^1.0.2",
         "shimmer": "^1.1.0"
@@ -7115,7 +7115,7 @@
     "inflection": {
       "version": "1.12.0",
       "resolved": "https://registry.npmjs.org/inflection/-/inflection-1.12.0.tgz",
-      "integrity": "sha512-lRy4DxuIFWXlJU7ed8UiTJOSTqStqYdEb4CEbtXfNbkdj3nH1L+reUWiE10VWcJS2yR7tge8Z74pJjtBjNwj0w=="
+      "integrity": "sha1-ogCTVlbW9fa8TcdQLhrstwMihBY="
     },
     "inflight": {
       "version": "1.0.6",
@@ -7320,7 +7320,7 @@
     "is-bluebird": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/is-bluebird/-/is-bluebird-1.0.2.tgz",
-      "integrity": "sha512-PDRu1vVip5dGQg5tfn2qVCCyxbBYu5MhYUJwSfL/RoGBI97n1fxvilVazxzptZW0gcmsMH17H4EVZZI5E/RSeA=="
+      "integrity": "sha1-CWQ5Bg9KpBGr7hkUOoTWpVNG1uI="
     },
     "is-buffer": {
       "version": "1.1.6",
@@ -12302,7 +12302,7 @@
     "toposort-class": {
       "version": "1.0.1",
       "resolved": "https://registry.npmjs.org/toposort-class/-/toposort-class-1.0.1.tgz",
-      "integrity": "sha512-OsLcGGbYF3rMjPUf8oKktyvCiUxSbqMMS39m33MAjLTC1DVIH6x3WSt63/M77ihI09+Sdfk1AXvfhCEeUmC7mg=="
+      "integrity": "sha1-f/0feMi+KMO6Rc1OGj9e4ZO9mYg="
     },
     "tough-cookie": {
       "version": "2.5.0",
diff --git a/src/app.js b/src/app.js
index 40e1ff19e..270fbae3b 100644
--- a/src/app.js
+++ b/src/app.js
@@ -31,6 +31,7 @@ module.exports = async () => {
     await database.init();
     await jobsManager.init();
     await jobsManager.reloadCronJobs();
+    await jobsManager.reloadChaosExperiments();
     await jobsManager.scheduleFinishedContainersCleanup();
     if (streamingConfig.platform) {
         const eventStreamerPlatformConfig = require(`./config/${streamingConfig.platform}Config`);
diff --git a/src/chaos-experiments/models/chaosExperimentsManager.js b/src/chaos-experiments/models/chaosExperimentsManager.js
index 730fe4fc0..6ef808bd4 100644
--- a/src/chaos-experiments/models/chaosExperimentsManager.js
+++ b/src/chaos-experiments/models/chaosExperimentsManager.js
@@ -90,3 +90,7 @@ module.exports.runChaosExperiment = async (kubernetesChaosConfig, jobExperimentI
         logger.error(error, `Error while running chaos job experiment ${jobExperimentId}`);
     }
 };
+
+module.exports.getFutureJobExperiments = async function (timestamp, contextId) {
+    return databaseConnector.getFutureJobExperiments(contextId);
+};
diff --git a/src/chaos-experiments/models/database/databaseConnector.js b/src/chaos-experiments/models/database/databaseConnector.js
index a12f6d708..04783f22f 100644
--- a/src/chaos-experiments/models/database/databaseConnector.js
+++ b/src/chaos-experiments/models/database/databaseConnector.js
@@ -11,6 +11,7 @@ module.exports = {
     insertChaosJobExperiment,
     getChaosJobExperimentById,
     getChaosJobExperimentByJobId,
+    getFutureJobExperiments,
     setChaosJobExperimentTriggered,
     updateChaosExperiment,
     closeConnection
@@ -64,6 +65,10 @@ async function getChaosJobExperimentByJobId(jobId, contextId) {
     return databaseConnector.getChaosJobExperimentById(jobId, contextId);
 }
 
+async function getFutureJobExperiments(contextId) {
+    return databaseConnector.getFutureJobExperiments(contextId);
+}
+
 async function setChaosJobExperimentTriggered(jobExperimentId, isTriggered, contextId) {
     return databaseConnector.setChaosJobExperimentTriggered(jobExperimentId, isTriggered, contextId);
 }
diff --git a/src/chaos-experiments/models/database/sequelize/sequelizeConnector.js b/src/chaos-experiments/models/database/sequelize/sequelizeConnector.js
index 117389f63..b556c01c5 100644
--- a/src/chaos-experiments/models/database/sequelize/sequelizeConnector.js
+++ b/src/chaos-experiments/models/database/sequelize/sequelizeConnector.js
@@ -2,6 +2,7 @@ const Sequelize = require('sequelize');
 
 const { CHAOS_EXPERIMENTS_TABLE_NAME, CHAOS_JOB_EXPERIMENTS_TABLE_NAME } = require('../../../../database/sequlize-handler/consts');
+const { Op } = require('sequelize');
 const KUBEOBJECT = 'kubeObject';
 
 let client;
 
@@ -17,6 +18,7 @@ module.exports = {
     insertChaosJobExperiment,
     getChaosJobExperimentById,
     getChaosJobExperimentByJobId,
+    getFutureJobExperiments,
     setChaosJobExperimentTriggered
 };
 
@@ -173,6 +175,23 @@ async function getChaosJobExperimentByJobId(jobId, contextId) {
     return chaosExperiment;
 }
 
+async function getFutureJobExperiments(timestamp, contextId) {
+    const options = {
+        where: {
+            is_triggered: false,
+            start_time: { [Op.gt]: timestamp }
+        }
+    };
+
+    if (contextId) {
+        options.where.context_id = contextId;
+    }
+
+    const chaosJobExperimentModel = client.model(CHAOS_JOB_EXPERIMENTS_TABLE_NAME);
+    const allJobExperiments = await chaosJobExperimentModel.findAll(options);
+    return allJobExperiments;
+}
+
 async function setChaosJobExperimentTriggered(id, isTriggered, contextId) {
     const chaosJobExperimentModel = client.model(CHAOS_EXPERIMENTS_TABLE_NAME);
     const options = {
diff --git a/src/chaos-experiments/models/kubernetes/chaosExperimentConnector.js b/src/chaos-experiments/models/kubernetes/chaosExperimentConnector.js
index c48963621..79bbe31bb 100644
--- a/src/chaos-experiments/models/kubernetes/chaosExperimentConnector.js
+++ b/src/chaos-experiments/models/kubernetes/chaosExperimentConnector.js
@@ -38,4 +38,4 @@ module.exports.runChaosExperiment = async (kubernetesExperimentConfig) => {
         namespace: response.namespace
     };
     return genericJobResponse;
-};
\ No newline at end of file
+};
diff --git a/src/jobs/models/jobExperimentsHandler.js b/src/jobs/models/jobExperimentsHandler.js
index c05d973c5..9f29c9129 100644
--- a/src/jobs/models/jobExperimentsHandler.js
+++ b/src/jobs/models/jobExperimentsHandler.js
@@ -1,5 +1,5 @@
 
-const chaosExperimentManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
+const chaosExperimentsManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
     { v4: uuid } = require('uuid'),
     logger = require('../../common/logger');
 
@@ -16,31 +16,35 @@ async function setChaosExperimentsIfExist(jobId, jobExperiments) {
     try {
         const baseTimestamp = Date.now();
         const experimentIds = jobExperiments.map(experiment => experiment.experiment_id);
-        const experimentsFromDb = await chaosExperimentManager.getChaosExperimentsByIds(experimentIds);
+        const chaoExperimentsFromDb = await chaosExperimentsManager.getChaosExperimentsByIds(experimentIds);
         await Promise.all(jobExperiments.map(async(experimentRequest) =>
-            await setSingleJobExperiment(experimentRequest, experimentsFromDb, baseTimestamp, jobId)
+            await setSingleJobExperiment(experimentRequest, chaoExperimentsFromDb, baseTimestamp, jobId)
         ));
     } catch (error){
         logger.error(error, `error while setting chaos experiments for job ${jobId}`);
     }
 };
 
-async function setSingleJobExperiment(experimentRequest, experimentsFromDb, baseTimestamp, jobId) {
+async function setSingleJobExperiment(experimentRequest, chaoExperimentsFromDb, baseTimestamp, jobId) {
     try {
-        const experiment = experimentsFromDb.find(e => e.id === experimentRequest.experiment_id);
+        const experiment = chaoExperimentsFromDb.find(e => e.id === experimentRequest.experiment_id);
         const startTime = baseTimestamp + experimentRequest.start_after;
         const endTime = startTime + convertDurationStringToMillisecond(experiment.kubeObject.spec.duration);
         const jobExperimentId = uuid();
-        await chaosExperimentManager.insertChaosJobExperiment(jobExperimentId, jobId, experiment.id, startTime, endTime);
+        await chaosExperimentsManager.insertChaosJobExperiment(jobExperimentId, jobId, experiment.id, startTime, endTime);
         const kubeObject = experiment.kubeObject;
         kubeObject.metadata.name = kubeObject.metadata.name.concat(`-${jobExperimentId}`);
-        const timeout = setTimeout(() => chaosExperimentManager.runChaosExperiment(kubeObject, jobExperimentId), experimentRequest.start_after);
-        jobExperimentsIdToTimeout.set(jobExperimentId, timeout);
+        scheduleChaosExperiment(kubeObject, jobExperimentId, experimentRequest.start_after);
     } catch (error){
         logger.error(error, `error while setting chaos experiment ${experimentRequest.experiment_id} for job ${jobId}`);
     }
 }
 
+function scheduleChaosExperiment(kubeObject, jobExperimentId, startAfter) {
+    const timeout = setTimeout(() => chaosExperimentsManager.runChaosExperiment(kubeObject, jobExperimentId), startAfter);
+    jobExperimentsIdToTimeout.set(jobExperimentId, timeout);
+}
+
 function convertDurationStringToMillisecond(durationString) {
     if (durationString.endsWith('s')){
         return durationString.split('s')[0] * SEC_TO_MS;
@@ -59,5 +63,6 @@ function convertDurationStringToMillisecond(durationString) {
 
 module.exports = {
     jobExperimentsIdToTimeout,
-    setChaosExperimentsIfExist
-};
\ No newline at end of file
+    setChaosExperimentsIfExist,
+    scheduleChaosExperiment
+};
diff --git a/src/jobs/models/jobManager.js b/src/jobs/models/jobManager.js
index 6fd52a8e2..ca4a60978 100644
--- a/src/jobs/models/jobManager.js
+++ b/src/jobs/models/jobManager.js
@@ -13,8 +13,12 @@ const logger = require('../../common/logger'),
     databaseConnector = require('./database/databaseConnector'),
     webhooksManager = require('../../webhooks/models/webhookManager'),
     streamingManager = require('../../streaming/manager'),
+    chaosExperimentsManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
     { STREAMING_EVENT_TYPES } = require('../../streaming/entities/common'),
-    { CONFIG, CONTEXT_ID, JOB_TYPE_FUNCTIONAL_TEST } = require('../../common/consts'),
+    {
+        CONFIG, CONTEXT_ID, JOB_TYPE_FUNCTIONAL_TEST,
+        KUBERNETES
+    } = require('../../common/consts'),
     generateError = require('../../common/generateError'),
     { version: PREDATOR_VERSION } = require('../../../package.json'),
     jobExperimentHandler = require('./jobExperimentsHandler');
@@ -44,6 +48,23 @@ module.exports.reloadCronJobs = async () => {
     }
 };
 
+module.exports.reloadChaosExperiments = async () => {
+    const contextId = httpContext.get(CONTEXT_ID);
+    const jobPlatform = await configHandler.getConfigValue(CONFIG.JOB_PLATFORM);
+    if (jobPlatform.toUpperCase() !== KUBERNETES) {
+        return;
+    }
+    try {
+        const timestamp = Date.now();
+        const futureJobExperiments = await chaosExperimentsManager.getFutureJobExperiments(timestamp, contextId);
+        for (const futureJobExperiment of futureJobExperiments) {
+            await reloadSingleChaosExperiment(futureJobExperiment, timestamp);
+        }
+    } catch (error) {
+        throw new Error('Unable to reload job experiments , error: ' + error);
+    }
+};
+
 module.exports.scheduleFinishedContainersCleanup = async () => {
     const interval = await configHandler.getConfigValue(CONFIG.INTERVAL_CLEANUP_FINISHED_CONTAINERS_MS);
     if (interval > 0) {
@@ -381,3 +402,13 @@ function produceJobToStreamingPlatform(jobResponse) {
     };
     streamingManager.produce({}, STREAMING_EVENT_TYPES.JOB_CREATED, streamingResource);
 }
+
+async function reloadSingleChaosExperiment(futureJobExperiment, timestamp){
+    try {
+        const calculatedStartAfter = futureJobExperiment.start_time - timestamp;
+        const chaosExperiment = await chaosExperimentsManager.getChaosExperimentById(futureJobExperiment.experiment_id);
+        jobExperimentHandler.scheduleChaosExperiment(chaosExperiment.kubeObject, futureJobExperiment.job_id, futureJobExperiment.id, calculatedStartAfter);
+    } catch (error) {
+        throw new Error('Unable to reload job experiments ' + futureJobExperiment.id + ' , error: ' + error);
+    }
+}
diff --git a/tests/unit-tests/jobs/helpers/jobVerifier-test.js b/tests/unit-tests/jobs/helpers/jobVerifier-test.js
index 4a55809e3..877ceb511 100644
--- a/tests/unit-tests/jobs/helpers/jobVerifier-test.js
+++ b/tests/unit-tests/jobs/helpers/jobVerifier-test.js
@@ -2,11 +2,12 @@ const sinon = require('sinon'),
     should = require('should'),
     jobVerifier = require('../../../../src/jobs/helpers/jobVerifier'),
     testsManager = require('../../../../src/tests/models/manager'),
+    chaosExperimentsManager = require('../../../../src/chaos-experiments/models/chaosExperimentsManager'),
    configHandler = require('../../../../src/configManager/models/configHandler'),
     consts = require('../../../../src/common/consts');
 
 describe('Jobs verifier tests', function () {
-    let req, res, sandbox, nextStub, resJsonStub, resStatusStub, testsManagerStub, configHandlerStub;
+    let req, res, sandbox, nextStub, resJsonStub, resStatusStub, testsManagerStub, configHandlerStub, getChaosExperimentsByIdsStub;
 
     before(() => {
         sandbox = sinon.createSandbox();
@@ -15,6 +16,7 @@ describe('Jobs verifier tests', function () {
         resStatusStub = sandbox.stub();
         configHandlerStub = sandbox.stub(configHandler, 'getConfigValue');
         testsManagerStub = sandbox.stub(testsManager, 'getTest');
+        getChaosExperimentsByIdsStub = sandbox.stub(chaosExperimentsManager, 'getChaosExperimentsByIds');
         res = {
             json: (json) => {
                 resJsonStub(json);
@@ -179,4 +181,47 @@ describe('Jobs verifier tests', function () {
             should(nextStub.args[0][0].statusCode).eql(400);
         });
     });
+
+    describe('verifyExperimentsExist tests', () => {
+        it('if job does not have experiments array, should pass', async () => {
+            configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
+            req = { body: { experiments: undefined } };
+            await jobVerifier.verifyExperimentsExist(req, res, nextStub);
+            should(nextStub.args[0][0]).eql(undefined);
+        });
+        it('if job experiments array is empty, should pass', async () => {
+            configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
+            req = { body: { experiments: [] } };
+            await jobVerifier.verifyExperimentsExist(req, res, nextStub);
+            should(nextStub.args[0][0]).eql(undefined);
+        });
+        it('if job platform is not KUBERNETES, should pass', async () => {
+            configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('DOCKER');
+            req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: false } };
+            await jobVerifier.verifyExperimentsExist(req, res, nextStub);
+            should(nextStub.args[0][0]).eql(undefined);
+        });
+        it('if chaos experiments mentioned in the job exist, should pass', async () => {
+            configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
+            req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: true, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
+            getChaosExperimentsByIdsStub.resolves([{ id: '1234' }]);
+            await jobVerifier.verifyExperimentsExist(req, res, nextStub);
+            should(nextStub.args[0][0]).eql(undefined);
+        });
+        it('if chaos experiments mentioned in the job do not exist, should fail', async () => {
+            configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
+            req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: true, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
+            getChaosExperimentsByIdsStub.resolves([]);
+            await jobVerifier.verifyExperimentsExist(req, res, nextStub);
+            should(nextStub.args[0][0].message).eql('One or more chaos experiments are not configured. Job can not be created');
+            should(nextStub.args[0][0].statusCode).eql(400);
+        });
+        it('if job platform is not KUBERNETES and job has experiments, should fail', async () => {
+            configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('DOCKER');
+            req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: false, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
+            await jobVerifier.verifyExperimentsExist(req, res, nextStub);
+            should(nextStub.args[0][0].message).eql('Chaos experiment is supported only in kubernetes jobs');
+            should(nextStub.args[0][0].statusCode).eql(400);
+        });
+    });
 });
diff --git a/tests/unit-tests/jobs/models/jobExperimentsHandler-test.js b/tests/unit-tests/jobs/models/jobExperimentsHandler-test.js
index d7ed5daa8..47a5b2399 100644
--- a/tests/unit-tests/jobs/models/jobExperimentsHandler-test.js
+++ b/tests/unit-tests/jobs/models/jobExperimentsHandler-test.js
@@ -27,6 +27,7 @@ describe('Job experiments handler tests', function () {
     let experimentsManagerGetStub;
     let experimentsManagerRunJobStub;
     let sandbox;
+    let clock;
 
     before(() => {
         sandbox = sinon.sandbox.create();
@@ -38,6 +39,13 @@ describe('Job experiments handler tests', function () {
         sandbox.reset();
     });
 
+    afterEach(async () => {
+        if (clock){
+            clock.restore();
+            clock = undefined;
+        }
+    });
+
     after(() => {
         sandbox.restore();
     });
@@ -60,7 +68,7 @@ describe('Job experiments handler tests', function () {
             }
         ];
         const jobId = uuid();
-        const clock = sinon.useFakeTimers();
+        clock = sinon.useFakeTimers();
         clock.tick(1000);
         await jobExperimentHandler.setChaosExperimentsIfExist(jobId, jobExperiments);
         clock.tick(3000);
@@ -74,7 +82,6 @@ describe('Job experiments handler tests', function () {
         experimentsManagerInsertStub.args[1][1].should.eql(jobId);
         experimentsManagerInsertStub.args[1][2].should.eql(secondExperiment.id);
         (experimentsManagerInsertStub.args[1][3] - experimentsManagerInsertStub.args[0][3]).should.eql(1000);
-        clock.restore();
     });
 
     it('set chaos experiments with same experiment in different times', async () => {
diff --git a/tests/unit-tests/jobs/models/jobManager-test.js b/tests/unit-tests/jobs/models/jobManager-test.js
index 41d909d2b..6418a7ae2 100644
--- a/tests/unit-tests/jobs/models/jobManager-test.js
+++ b/tests/unit-tests/jobs/models/jobManager-test.js
@@ -15,6 +15,8 @@ const databaseConnector = require('../../../../src/jobs/models/database/database
     webhooksManager = require('../../../../src/webhooks/models/webhookManager'),
     basicTest = require('../../../testExamples/Basic_test.json'),
     reportsManager = require('../../../../src/reports/models/reportsManager'),
+    chaosExperimentsManager = require('../../../../src/chaos-experiments/models/chaosExperimentsManager'),
+    chaosExperimentConnector = require('../../../../src/chaos-experiments/models/kubernetes/chaosExperimentConnector'),
     config = require('../../../../src/common/consts').CONFIG;
 
 let manager;
@@ -47,6 +49,11 @@ describe('Manager jobs', function () {
 
     let testsManagerGetStub;
 
+    let getFutureJobExperimentsStub;
+    let getChaosExperimentByIdStub;
+
+    let runChaosExperimentStub;
+
     before(() => {
         sandbox = sinon.sandbox.create();
 
@@ -63,6 +70,11 @@ describe('Manager jobs', function () {
 
         testsManagerGetStub = sandbox.stub(testsManager, 'getTest');
 
+        getFutureJobExperimentsStub = sandbox.stub(chaosExperimentsManager, 'getFutureJobExperiments');
+        getChaosExperimentByIdStub = sandbox.stub(chaosExperimentsManager, 'getChaosExperimentById');
+
+        runChaosExperimentStub = sandbox.stub(chaosExperimentConnector, 'runChaosExperiment');
+
         jobGetLogsStub = sandbox.stub(jobConnector, 'getLogs');
         jobDeleteContainerStub = sandbox.stub(jobConnector, 'deleteAllContainers');
         jobStopRunStub = sandbox.stub(jobConnector, 'stopRun');
@@ -161,6 +173,33 @@ describe('Manager jobs', function () {
         });
     });
 
+    describe('Reload job experiments', function () {
+        it('found future experiments to reload', async () => {
+            const timestamp = 500;
+            const jobExperiment = { start_time: timestamp, job_id: '1234', experiment_id: '4321', id: '2468' };
+            const chaosExperiment = { kubeObject: { hello: 1 }, experiment_id: '4321' };
+            getFutureJobExperimentsStub.resolves([jobExperiment]);
+            getChaosExperimentByIdStub.resolves(chaosExperiment);
+            runChaosExperimentStub.returns();
+
+            const clock = sinon.useFakeTimers();
+            clock.tick(1000);
+            await manager.reloadChaosExperiments();
+            clock.tick(3000);
+            sinon.assert.calledOnce(runChaosExperimentStub);
+            sinon.assert.calledWith(runChaosExperimentStub, chaosExperiment.kubeObject);
+            clock.restore();
+        });
+        it('future experiments not found - nothing to reload', async () => {
+            getFutureJobExperimentsStub.resolves([]);
+            runChaosExperimentStub.returns();
+
+            await manager.reloadChaosExperiments();
+            sinon.assert.notCalled(getChaosExperimentByIdStub);
+            sinon.assert.notCalled(runChaosExperimentStub);
+        });
+    });
+
     describe('schedule Finished Containers Cleanup', function () {
         it('Interval is set to 0, no automatic cleanup is scheduled', (done) => {
             getConfigValueStub.withArgs(config.INTERVAL_CLEANUP_FINISHED_CONTAINERS_MS).returns(0);