Skip to content

Commit

Permalink
[Security Solutions] Sync EA transform with risk engine (#167371)
Browse files Browse the repository at this point in the history
## Summary

@elastic/security-entity-analytics

When the risk engine is enabled, users must wait at least one hour to
see risk score data.
This PR fixes the issue by scheduling the latest transform after the
risk score task finishes.

The first time the risk scoring task runs, it calls `start transform`, which
ensures the transform never runs on an empty index. On subsequent runs, it
calls `schedule transform now`.

How to test it?
* Open Kibana with an empty Elasticsearch instance
* Make sure you have the appropriate license
* Ingest some events and generate alerts
* Enable the risk engine
* Open the Entity Analytics Dashboard 
* You should see risk score data (if it isn't there yet, refresh the
page)

### Checklist

Delete any items that are not applicable to this PR.

- [x] Any text added follows [EUI's writing
guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses
sentence case text and includes [i18n
support](https://github.com/elastic/kibana/blob/main/packages/kbn-i18n/README.md)
- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios

---------

Co-authored-by: Ryland Herrick <[email protected]>
  • Loading branch information
machadoum and rylnd authored Oct 9, 2023
1 parent 474ce1e commit 42ba72a
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ export const getIndexPatternDataStream = (namespace: string): IIndexPatternStrin
alias: `${riskScoreBaseIndexName}.${riskScoreBaseIndexName}-${namespace}`,
});

export const getLatestTransformId = (namespace: string): string =>
`risk_score_latest_transform_${namespace}`;

export const getTransformOptions = ({ dest, source }: { dest: string; source: string[] }) => ({
dest: {
index: dest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,6 @@ describe('RiskEngineDataClient', () => {
transform_id: 'risk_score_latest_transform_default',
},
});

expect(transforms.startTransform).toHaveBeenCalledWith({
esClient,
transformId: 'risk_score_latest_transform_default',
});
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
mappingComponentName,
ilmPolicyName,
ilmPolicy,
getLatestTransformId,
getTransformOptions,
} from './configurations';
import { createDataStream } from './utils/create_datastream';
Expand All @@ -40,8 +39,8 @@ import {
} from '../../../common/risk_engine';
import {
getLegacyTransforms,
getLatestTransformId,
removeLegacyTransforms,
startTransform,
createTransform,
} from './utils/transforms';
import {
Expand Down Expand Up @@ -362,8 +361,6 @@ export class RiskEngineDataClient {
}),
},
});

await startTransform({ esClient, transformId });
} catch (error) {
this.options.logger.error(`Error initializing risk engine resources: ${error.message}`);
throw error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const createRiskScoreServiceMock = (): jest.Mocked<RiskScoreService> => ({
calculateAndPersistScores: jest.fn(),
getConfiguration: jest.fn(),
getRiskInputsIndex: jest.fn(),
scheduleLatestTransformNow: jest.fn(),
});

export const riskScoreServiceMock = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { calculateRiskScores } from './calculate_risk_scores';
import { calculateAndPersistRiskScores } from './calculate_and_persist_risk_scores';
import type { RiskEngineDataClient } from './risk_engine_data_client';
import type { RiskInputsIndexResponse } from './get_risk_inputs_index';
import { scheduleLatestTransformNow } from './utils/transforms';

export interface RiskScoreService {
calculateScores: (params: CalculateScoresParams) => Promise<CalculateScoresResponse>;
Expand All @@ -25,6 +26,7 @@ export interface RiskScoreService {
) => Promise<CalculateAndPersistScoresResponse>;
getConfiguration: () => Promise<RiskEngineConfiguration | null>;
getRiskInputsIndex: ({ dataViewId }: { dataViewId: string }) => Promise<RiskInputsIndexResponse>;
scheduleLatestTransformNow: () => Promise<void>;
}

export interface RiskScoreServiceFactoryParams {
Expand All @@ -45,4 +47,5 @@ export const riskScoreServiceFactory = ({
calculateAndPersistRiskScores({ ...params, esClient, logger, riskEngineDataClient, spaceId }),
getConfiguration: async () => riskEngineDataClient.getConfiguration(),
getRiskInputsIndex: async (params) => riskEngineDataClient.getRiskInputsIndex(params),
scheduleLatestTransformNow: () => scheduleLatestTransformNow({ namespace: spaceId, esClient }),
});
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,21 @@ describe('Risk Scoring Task', () => {
expect.stringContaining('task was cancelled')
);
});

// Runs the task once with the shared mocks and verifies that it asks the risk
// score service to schedule the latest transform exactly one time.
it('schedules the transform to run now', async () => {
await runTask({
getRiskScoreService,
isCancelled: mockIsCancelled,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});

// A single task run should produce a single scheduling request.
expect(mockRiskScoreService.scheduleLatestTransformNow).toHaveBeenCalledTimes(1);
});
});

describe('telemetry', () => {
describe('when execution was successful', () => {
it('send success telemetry event', async () => {
await runTask({
getRiskScoreService,
Expand All @@ -468,12 +480,28 @@ describe('Risk Scoring Task', () => {
});
});

it('send error telemetry event', async () => {
it('schedules the transform to run now', async () => {
await runTask({
getRiskScoreService,
isCancelled: mockIsCancelled,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
});

expect(mockRiskScoreService.scheduleLatestTransformNow).toHaveBeenCalledTimes(1);
});
});

describe('when execution was unsuccessful', () => {
beforeEach(() => {
mockRiskScoreService.calculateAndPersistScores.mockReset();
mockRiskScoreService.calculateAndPersistScores.mockImplementationOnce(() => {
throw new Error();
});
});

it('send error telemetry event', async () => {
try {
await runTask({
getRiskScoreService,
Expand All @@ -491,6 +519,20 @@ describe('Risk Scoring Task', () => {
}
});

// Verifies that a failing task run never requests a transform run.
// NOTE(review): presumably relies on the enclosing `beforeEach`, which makes
// `calculateAndPersistScores` throw — confirm this test lives in that describe.
it('does not schedules the transform to run now', async () => {
// The task is expected to surface the failure by rejecting.
await expect(
runTask({
getRiskScoreService,
isCancelled: mockIsCancelled,
logger: mockLogger,
taskInstance: riskScoringTaskInstanceMock,
telemetry: mockTelemetry,
})
).rejects.toThrow();

// No scheduling request may be issued when the run failed.
expect(mockRiskScoreService.scheduleLatestTransformNow).not.toHaveBeenCalled();
});

it('sends a cancellation telemetry event if the task was cancelled', async () => {
mockIsCancelled.mockReturnValue(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,10 @@ export const runTask = async ({
taskDurationInSeconds,
interval: taskInstance?.schedule?.interval,
};

telemetry.reportEvent(RISK_SCORE_EXECUTION_SUCCESS_EVENT.eventType, telemetryEvent);

riskScoreService.scheduleLatestTransformNow();

if (isCancelled()) {
log('task was cancelled');
telemetry.reportEvent(RISK_SCORE_EXECUTION_CANCELLATION_EVENT.eventType, telemetryEvent);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { TransformGetTransformStatsResponse } from '@elastic/elasticsearch/lib/api/types';
import { elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { scheduleTransformNow } from './transforms';

const transformId = 'test_transform_id';

/**
 * Builds a minimal "get transform stats" response holding a single transform.
 * Only the fields the code under test reads are populated, hence the cast.
 */
const buildTransformStatsMock = (id: string, state: string) =>
  ({
    count: 1,
    transforms: [{ id, state }],
  } as TransformGetTransformStatsResponse);

// Stats response whose only transform reports the 'started' state.
const startedTransformsMock = buildTransformStatsMock('test_transform_id_1', 'started');

// Stats response whose only transform reports the 'stopped' state.
const stoppedTransformsMock = buildTransformStatsMock('test_transform_id_2', 'stopped');

/**
 * Unit tests for the transform helpers.
 *
 * `scheduleTransformNow` first reads the transform stats and then either
 * starts the transform (when it is not running yet) or asks Elasticsearch to
 * run it immediately (when it is already running). Each test creates a fresh
 * Elasticsearch client mock and stubs one `getTransformStats` response.
 */
describe('transforms utils', () => {
  beforeEach(() => {
    jest.resetAllMocks();
  });

  describe('scheduleTransformNow', () => {
    it('calls startTransform when the transform state is stopped', async () => {
      const esClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser;
      esClient.transform.getTransformStats.mockResolvedValueOnce(stoppedTransformsMock);

      await scheduleTransformNow({ esClient, transformId });

      // Pin the request as well as the call, and make sure only one branch ran.
      expect(esClient.transform.startTransform).toHaveBeenCalledWith({
        transform_id: transformId,
      });
      expect(esClient.transform.scheduleNowTransform).not.toHaveBeenCalled();
    });

    it('calls scheduleNowTransform when the transform state is started', async () => {
      const esClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser;
      esClient.transform.getTransformStats.mockResolvedValueOnce(startedTransformsMock);

      await scheduleTransformNow({ esClient, transformId });

      expect(esClient.transform.scheduleNowTransform).toHaveBeenCalledWith({
        transform_id: transformId,
      });
      expect(esClient.transform.startTransform).not.toHaveBeenCalled();
    });

    it('throws when no stats are returned for the transform', async () => {
      const esClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser;
      esClient.transform.getTransformStats.mockResolvedValueOnce({ count: 0, transforms: [] });

      await expect(scheduleTransformNow({ esClient, transformId })).rejects.toThrow(
        `Unable to find transform status for [${transformId}] while attempting to schedule now`
      );

      // Without stats, neither API may be invoked.
      expect(esClient.transform.startTransform).not.toHaveBeenCalled();
      expect(esClient.transform.scheduleNowTransform).not.toHaveBeenCalled();
    });
  });
});
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
TransformPutTransformResponse,
TransformGetTransformTransformSummary,
TransformPutTransformRequest,
TransformGetTransformStatsTransformStats,
} from '@elastic/elasticsearch/lib/api/types';
import { RiskScoreEntity } from '../../../../common/search_strategy';
import {
Expand Down Expand Up @@ -99,6 +100,13 @@ export const createTransform = async ({
}
};

/**
 * Builds the id of the "latest" risk score transform for a namespace.
 *
 * @param namespace - namespace the transform belongs to
 * @returns the namespace prefixed with `risk_score_latest_transform_`
 */
export function getLatestTransformId(namespace: string): string {
  return `risk_score_latest_transform_${namespace}`;
}

/**
 * Tells whether a transform counts as already running.
 *
 * @param transformStats - stats entry of a single transform
 * @returns `true` when the reported state is `'indexing'` or `'started'`,
 *          `false` for every other state
 */
const hasTransformStarted = (transformStats: TransformGetTransformStatsTransformStats): boolean => {
  switch (transformStats.state) {
    case 'indexing':
    case 'started':
      return true;
    default:
      return false;
  }
};

export const startTransform = async ({
esClient,
transformId,
Expand All @@ -110,13 +118,51 @@ export const startTransform = async ({
transform_id: transformId,
});
if (transformStats.count <= 0) {
throw new Error(`Can't check ${transformId} status`);
throw new Error(
`Unable to find transform status for [${transformId}] while attempting to start`
);
}
if (
transformStats.transforms[0].state === 'indexing' ||
transformStats.transforms[0].state === 'started'
) {
if (hasTransformStarted(transformStats.transforms[0])) {
return;
}

return esClient.transform.startTransform({ transform_id: transformId });
};

/**
 * Requests an immediate run of a transform.
 *
 * The transform stats are read first. A transform that has not started yet is
 * started; one that is already running is scheduled to run now.
 *
 * @param esClient - Elasticsearch client used for all transform requests
 * @param transformId - id of the transform to run
 * @throws Error when the stats response reports no transform for `transformId`
 */
export const scheduleTransformNow = async ({
  esClient,
  transformId,
}: {
  esClient: ElasticsearchClient;
  transformId: string;
}): Promise<TransformStartTransformResponse | void> => {
  const statsResponse = await esClient.transform.getTransformStats({
    transform_id: transformId,
  });

  // Without a stats entry there is no state to base the decision on.
  if (statsResponse.count <= 0) {
    throw new Error(
      `Unable to find transform status for [${transformId}] while attempting to schedule now`
    );
  }

  const alreadyRunning = hasTransformStarted(statsResponse.transforms[0]);

  if (!alreadyRunning) {
    // Not running yet: start it instead of scheduling it.
    await esClient.transform.startTransform({
      transform_id: transformId,
    });
    return;
  }

  await esClient.transform.scheduleNowTransform({
    transform_id: transformId,
  });
};

/**
 * Requests an immediate run of the "latest" risk score transform of a namespace.
 *
 * Resolves the transform id from the namespace and delegates to
 * `scheduleTransformNow`; any error raised there propagates to the caller.
 *
 * @param namespace - namespace whose latest transform should run
 * @param esClient - Elasticsearch client used for the transform requests
 */
export const scheduleLatestTransformNow = async ({
  namespace,
  esClient,
}: {
  namespace: string;
  esClient: ElasticsearchClient;
}): Promise<void> => {
  await scheduleTransformNow({
    esClient,
    transformId: getLatestTransformId(namespace),
  });
};
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export default ({ getService }: FtrProviderContext) => {
transform_id: transformId,
});

expect(transformStats.transforms[0].state).to.eql('started');
expect(transformStats.transforms[0].state).to.eql('stopped');
});

it('should create configuration saved object', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ export default ({ getService }: FtrProviderContext): void => {
);
});

// Checks that the latest transform of the default space reports the 'started'
// state once risk scores exist.
it('starts the latest transform', async () => {
// NOTE(review): presumably blocks until 10 risk scores have been written — confirm in the helper.
await waitForRiskScoresToBePresent({ es, log, scoreCount: 10 });

const transformStats = await es.transform.getTransformStats({
transform_id: 'risk_score_latest_transform_default',
});

expect(transformStats.transforms[0].state).to.eql('started');
});

describe('disabling and re-enabling the risk engine', () => {
beforeEach(async () => {
await waitForRiskScoresToBePresent({ es, log, scoreCount: 10 });
Expand Down

0 comments on commit 42ba72a

Please sign in to comment.