Skip to content

Commit

Permalink
Merge pull request #642 from Zooz/reload-experiments
Browse files Browse the repository at this point in the history
feat: reload chaos experiments on service startup
  • Loading branch information
GuyAb authored Oct 1, 2023
2 parents 0834846 + 528364e commit 8ff3e15
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 20 deletions.
10 changes: 5 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module.exports = async () => {
await database.init();
await jobsManager.init();
await jobsManager.reloadCronJobs();
await jobsManager.reloadChaosExperiments();
await jobsManager.scheduleFinishedContainersCleanup();
if (streamingConfig.platform) {
const eventStreamerPlatformConfig = require(`./config/${streamingConfig.platform}Config`);
Expand Down
4 changes: 4 additions & 0 deletions src/chaos-experiments/models/chaosExperimentsManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,7 @@ module.exports.runChaosExperiment = async (kubernetesChaosConfig, jobExperimentI
logger.error(error, `Error while running chaos job experiment ${jobExperimentId}`);
}
};

module.exports.getFutureJobExperiments = async function (timestamp, contextId) {
return databaseConnector.getFutureJobExperiments(contextId);
};
5 changes: 5 additions & 0 deletions src/chaos-experiments/models/database/databaseConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module.exports = {
insertChaosJobExperiment,
getChaosJobExperimentById,
getChaosJobExperimentByJobId,
getFutureJobExperiments,
setChaosJobExperimentTriggered,
updateChaosExperiment,
closeConnection
Expand Down Expand Up @@ -64,6 +65,10 @@ async function getChaosJobExperimentByJobId(jobId, contextId) {
return databaseConnector.getChaosJobExperimentById(jobId, contextId);
}

async function getFutureJobExperiments(contextId) {
return databaseConnector.getFutureJobExperiments(contextId);
}

async function setChaosJobExperimentTriggered(jobExperimentId, isTriggered, contextId) {
return databaseConnector.setChaosJobExperimentTriggered(jobExperimentId, isTriggered, contextId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Sequelize = require('sequelize');
const { CHAOS_EXPERIMENTS_TABLE_NAME, CHAOS_JOB_EXPERIMENTS_TABLE_NAME } = require('../../../../database/sequlize-handler/consts');
const { Op } = require('sequelize');
const KUBEOBJECT = 'kubeObject';
let client;

Expand All @@ -17,6 +18,7 @@ module.exports = {
insertChaosJobExperiment,
getChaosJobExperimentById,
getChaosJobExperimentByJobId,
getFutureJobExperiments,
setChaosJobExperimentTriggered
};

Expand Down Expand Up @@ -173,6 +175,23 @@ async function getChaosJobExperimentByJobId(jobId, contextId) {
return chaosExperiment;
}

async function getFutureJobExperiments(timestamp, contextId) {
const options = {
where: {
is_triggered: false,
start_time: { [Op.gt]: timestamp }
}
};

if (contextId) {
options.where.context_id = contextId;
}

const chaosJobExperimentModel = client.model(CHAOS_JOB_EXPERIMENTS_TABLE_NAME);
const allJobExperiments = await chaosJobExperimentModel.findAll(options);
return allJobExperiments;
}

async function setChaosJobExperimentTriggered(id, isTriggered, contextId) {
const chaosJobExperimentModel = client.model(CHAOS_EXPERIMENTS_TABLE_NAME);
const options = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ module.exports.runChaosExperiment = async (kubernetesExperimentConfig) => {
namespace: response.namespace
};
return genericJobResponse;
};
};
25 changes: 15 additions & 10 deletions src/jobs/models/jobExperimentsHandler.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

const chaosExperimentManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
const chaosExperimentsManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
{ v4: uuid } = require('uuid'),
logger = require('../../common/logger');

Expand All @@ -16,31 +16,35 @@ async function setChaosExperimentsIfExist(jobId, jobExperiments) {
try {
const baseTimestamp = Date.now();
const experimentIds = jobExperiments.map(experiment => experiment.experiment_id);
const experimentsFromDb = await chaosExperimentManager.getChaosExperimentsByIds(experimentIds);
const chaoExperimentsFromDb = await chaosExperimentsManager.getChaosExperimentsByIds(experimentIds);
await Promise.all(jobExperiments.map(async(experimentRequest) =>
await setSingleJobExperiment(experimentRequest, experimentsFromDb, baseTimestamp, jobId)
await setSingleJobExperiment(experimentRequest, chaoExperimentsFromDb, baseTimestamp, jobId)
));
} catch (error){
logger.error(error, `error while setting chaos experiments for job ${jobId}`);
}
};

async function setSingleJobExperiment(experimentRequest, experimentsFromDb, baseTimestamp, jobId) {
async function setSingleJobExperiment(experimentRequest, chaoExperimentsFromDb, baseTimestamp, jobId) {
try {
const experiment = experimentsFromDb.find(e => e.id === experimentRequest.experiment_id);
const experiment = chaoExperimentsFromDb.find(e => e.id === experimentRequest.experiment_id);
const startTime = baseTimestamp + experimentRequest.start_after;
const endTime = startTime + convertDurationStringToMillisecond(experiment.kubeObject.spec.duration);
const jobExperimentId = uuid();
await chaosExperimentManager.insertChaosJobExperiment(jobExperimentId, jobId, experiment.id, startTime, endTime);
await chaosExperimentsManager.insertChaosJobExperiment(jobExperimentId, jobId, experiment.id, startTime, endTime);
const kubeObject = experiment.kubeObject;
kubeObject.metadata.name = kubeObject.metadata.name.concat(`-${jobExperimentId}`);
const timeout = setTimeout(() => chaosExperimentManager.runChaosExperiment(kubeObject, jobExperimentId), experimentRequest.start_after);
jobExperimentsIdToTimeout.set(jobExperimentId, timeout);
scheduleChaosExperiment(kubeObject, jobExperimentId, experimentRequest.start_after);
} catch (error){
logger.error(error, `error while setting chaos experiment ${experimentRequest.experiment_id} for job ${jobId}`);
}
}

function scheduleChaosExperiment(kubeObject, jobExperimentId, startAfter) {
const timeout = setTimeout(() => chaosExperimentsManager.runChaosExperiment(kubeObject, jobExperimentId), startAfter);
jobExperimentsIdToTimeout.set(jobExperimentId, timeout);
}

function convertDurationStringToMillisecond(durationString) {
if (durationString.endsWith('s')){
return durationString.split('s')[0] * SEC_TO_MS;
Expand All @@ -59,5 +63,6 @@ function convertDurationStringToMillisecond(durationString) {

module.exports = {
jobExperimentsIdToTimeout,
setChaosExperimentsIfExist
};
setChaosExperimentsIfExist,
scheduleChaosExperiment
};
33 changes: 32 additions & 1 deletion src/jobs/models/jobManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ const logger = require('../../common/logger'),
databaseConnector = require('./database/databaseConnector'),
webhooksManager = require('../../webhooks/models/webhookManager'),
streamingManager = require('../../streaming/manager'),
chaosExperimentsManager = require('../../chaos-experiments/models/chaosExperimentsManager'),
{ STREAMING_EVENT_TYPES } = require('../../streaming/entities/common'),
{ CONFIG, CONTEXT_ID, JOB_TYPE_FUNCTIONAL_TEST } = require('../../common/consts'),
{
CONFIG, CONTEXT_ID, JOB_TYPE_FUNCTIONAL_TEST,
KUBERNETES
} = require('../../common/consts'),
generateError = require('../../common/generateError'),
{ version: PREDATOR_VERSION } = require('../../../package.json'),
jobExperimentHandler = require('./jobExperimentsHandler');
Expand Down Expand Up @@ -44,6 +48,23 @@ module.exports.reloadCronJobs = async () => {
}
};

module.exports.reloadChaosExperiments = async () => {
const contextId = httpContext.get(CONTEXT_ID);
const jobPlatform = await configHandler.getConfigValue(CONFIG.JOB_PLATFORM);
if (jobPlatform.toUpperCase() !== KUBERNETES) {
return;
}
try {
const timestamp = Date.now();
const futureJobExperiments = await chaosExperimentsManager.getFutureJobExperiments(timestamp, contextId);
for (const futureJobExperiment of futureJobExperiments) {
await reloadSingleChaosExperiment(futureJobExperiment, timestamp);
}
} catch (error) {
throw new Error('Unable to reload job experiments , error: ' + error);
}
};

module.exports.scheduleFinishedContainersCleanup = async () => {
const interval = await configHandler.getConfigValue(CONFIG.INTERVAL_CLEANUP_FINISHED_CONTAINERS_MS);
if (interval > 0) {
Expand Down Expand Up @@ -381,3 +402,13 @@ function produceJobToStreamingPlatform(jobResponse) {
};
streamingManager.produce({}, STREAMING_EVENT_TYPES.JOB_CREATED, streamingResource);
}

async function reloadSingleChaosExperiment(futureJobExperiment, timestamp){
try {
const calculatedStartAfter = futureJobExperiment.start_time - timestamp;
const chaosExperiment = await chaosExperimentsManager.getChaosExperimentById(futureJobExperiment.experiment_id);
jobExperimentHandler.scheduleChaosExperiment(chaosExperiment.kubeObject, futureJobExperiment.job_id, futureJobExperiment.id, calculatedStartAfter);
} catch (error) {
throw new Error('Unable to reload job experiments ' + futureJobExperiment.id + ' , error: ' + error);
}
}
47 changes: 46 additions & 1 deletion tests/unit-tests/jobs/helpers/jobVerifier-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ const sinon = require('sinon'),
should = require('should'),
jobVerifier = require('../../../../src/jobs/helpers/jobVerifier'),
testsManager = require('../../../../src/tests/models/manager'),
chaosExperimentsManager = require('../../../../src/chaos-experiments/models/chaosExperimentsManager'),
configHandler = require('../../../../src/configManager/models/configHandler'),
consts = require('../../../../src/common/consts');

describe('Jobs verifier tests', function () {
let req, res, sandbox, nextStub, resJsonStub, resStatusStub, testsManagerStub, configHandlerStub;
let req, res, sandbox, nextStub, resJsonStub, resStatusStub, testsManagerStub, configHandlerStub, getChaosExperimentsByIdsStub;

before(() => {
sandbox = sinon.createSandbox();
Expand All @@ -15,6 +16,7 @@ describe('Jobs verifier tests', function () {
resStatusStub = sandbox.stub();
configHandlerStub = sandbox.stub(configHandler, 'getConfigValue');
testsManagerStub = sandbox.stub(testsManager, 'getTest');
getChaosExperimentsByIdsStub = sandbox.stub(chaosExperimentsManager, 'getChaosExperimentsByIds');
res = {
json: (json) => {
resJsonStub(json);
Expand Down Expand Up @@ -179,4 +181,47 @@ describe('Jobs verifier tests', function () {
should(nextStub.args[0][0].statusCode).eql(400);
});
});

describe('verifyExperimentsExist tests', () => {
it('if job does not have experiments array, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { experiments: undefined } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('if job experiments array is empty, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { experiments: [] } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('if job platform is not KUBERNETES, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('DOCKER');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: false } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('if chaos experiments mentioned in the job exist, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: true, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
getChaosExperimentsByIdsStub.resolves([{ id: '1234' }]);
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0]).eql(undefined);
});
it('if chaos experiments mentioned in the job do not exist, should fail', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('KUBERNETES');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: true, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
getChaosExperimentsByIdsStub.resolves([]);
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0].message).eql('One or more chaos experiments are not configured. Job can not be created');
should(nextStub.args[0][0].statusCode).eql(400);
});
it('if job platform is not KUBERNETES and job has experiments job does not have experiments, should pass', async () => {
configHandlerStub.withArgs(consts.CONFIG.JOB_PLATFORM).resolves('DOCKER');
req = { body: { run_immediately: true, cron_expression: '* * * * *', enabled: false, experiments: [{ experiment_id: '1234', start_after: 1000 }] } };
await jobVerifier.verifyExperimentsExist(req, res, nextStub);
should(nextStub.args[0][0].message).eql('Chaos experiment is supported only in kubernetes jobs');
should(nextStub.args[0][0].statusCode).eql(400);
});
});
});
11 changes: 9 additions & 2 deletions tests/unit-tests/jobs/models/jobExperimentsHandler-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ describe('Job experiments handler tests', function () {
let experimentsManagerGetStub;
let experimentsManagerRunJobStub;
let sandbox;
let clock;

before(() => {
sandbox = sinon.sandbox.create();
Expand All @@ -38,6 +39,13 @@ describe('Job experiments handler tests', function () {
sandbox.reset();
});

afterEach(async () => {
if (clock){
clock.restore();
clock = undefined;
}
});

after(() => {
sandbox.restore();
});
Expand All @@ -60,7 +68,7 @@ describe('Job experiments handler tests', function () {
}
];
const jobId = uuid();
const clock = sinon.useFakeTimers();
clock = sinon.useFakeTimers();
clock.tick(1000);
await jobExperimentHandler.setChaosExperimentsIfExist(jobId, jobExperiments);
clock.tick(3000);
Expand All @@ -74,7 +82,6 @@ describe('Job experiments handler tests', function () {
experimentsManagerInsertStub.args[1][1].should.eql(jobId);
experimentsManagerInsertStub.args[1][2].should.eql(secondExperiment.id);
(experimentsManagerInsertStub.args[1][3] - experimentsManagerInsertStub.args[0][3]).should.eql(1000);
clock.restore();
});

it('set chaos experiments with same experiment in different times', async () => {
Expand Down
Loading

0 comments on commit 8ff3e15

Please sign in to comment.