Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSSDK-11003] disposable service implementation #981

Merged
Merged
44 changes: 37 additions & 7 deletions lib/event_processor/batch_event_processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ describe('QueueingEventProcessor', async () => {

processor.start();
await processor.onRunning();
for(let i = 0; i < 100; i++) {
for(let i = 0; i < 99; i++) {
const event = createImpressionEvent(`id-${i}`);
await processor.process(event);
}
Expand All @@ -190,30 +190,33 @@ describe('QueueingEventProcessor', async () => {
await processor.onRunning();

let events: ProcessableEvent[] = [];
for(let i = 0; i < 100; i++) {
for(let i = 0; i < 99; i++){
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);

let event = createImpressionEvent('id-100');
let event = createImpressionEvent('id-99');
events.push(event);
await processor.process(event);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));

events = [event];
for(let i = 101; i < 200; i++) {
events = [];

for(let i = 100; i < 199; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);

event = createImpressionEvent('id-200');
event = createImpressionEvent('id-199');
events.push(event);
await processor.process(event);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -257,6 +260,33 @@ describe('QueueingEventProcessor', async () => {
expect(eventDispatcher.dispatchEvent.mock.calls[1][0]).toEqual(buildLogEvent([newEvent]));
});

it('should flush queue immediately regardless of batchSize, if event processor is disposable', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});

const dispatchRepeater = getMockRepeater();

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater,
batchSize: 100,
});

processor.makeDisposable();
processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
const event = createImpressionEvent('id-1');
events.push(event);
await processor.process(event);

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(eventDispatcher.dispatchEvent.mock.calls[0][0]).toEqual(buildLogEvent(events));
expect(dispatchRepeater.reset).toHaveBeenCalledTimes(1);
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add tests for the following as well when disposable

  1. FailedEventRepeater is not started
  2. dispatchRepeater is not started
  3. maxRetry is limited

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding your number 2 - Current logic is - dispatchRepeater starts, dispatch immediately and then stops. Isn't this expected ? Or am I missing something here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the refactored logic in this PR, if batchSize == 1, dispatch repeater should never start. It will immediately dispatch the event, the repeater start is in the else branch

it('should store the event in the eventStore with increasing ids', async () => {
const eventDispatcher = getMockDispatcher();
const eventStore = getMockSyncCache<EventWithId>();
Expand Down
30 changes: 23 additions & 7 deletions lib/event_processor/batch_event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import { areEventContextsEqual } from "./event_builder/user_event";
import { EVENT_PROCESSOR_STOPPED, FAILED_TO_DISPATCH_EVENTS, FAILED_TO_DISPATCH_EVENTS_WITH_ARG } from "../exception_messages";
import { sprintf } from "../utils/fns";

export const DEFAULT_MIN_BACKOFF = 1000;
export const DEFAULT_MAX_BACKOFF = 32000;

export type EventWithId = {
id: string;
event: ProcessableEvent;
Expand Down Expand Up @@ -209,7 +212,8 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
if (!batch) {
return;
}


this.dispatchRepeater.reset();
this.dispatchBatch(batch, closing);
}

Expand All @@ -218,10 +222,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
return Promise.reject('Event processor is not running');
}

if (this.eventQueue.length == this.batchSize) {
this.flush();
}

const eventWithId = {
id: this.idGenerator.getId(),
event: event,
Expand All @@ -232,13 +232,30 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
if (this.eventQueue.length > 0 && !areEventContextsEqual(this.eventQueue[0].event, event)) {
this.flush();
}
this.eventQueue.push(eventWithId);

this.eventQueue.push(eventWithId);

if (this.eventQueue.length == this.batchSize) {
this.flush();
} else if (!this.dispatchRepeater.isRunning()) {
this.dispatchRepeater.start();
}

}

start(): void {
if (!this.isNew()) {
return;
}
if(this.disposable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we override the makeDisposable method and do this inside that

this.batchSize = 1;
this.retryConfig = {
maxRetries: Math.min(this.retryConfig?.maxRetries ?? 5, 5),
backoffProvider:
this.retryConfig?.backoffProvider ||
(() => new ExponentialBackoff(DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, 500)),
};
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on old line 245 below, we should check disposable before starting failedEventRepeater.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we will not start the "failedEventRepeater" if event processor is disposable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

super.start();
this.state = ServiceState.Running;
this.dispatchRepeater.start();
Expand All @@ -254,7 +271,6 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
}

if (this.isNew()) {
// TOOD: replace message with imported constants
this.startPromise.reject(new Error(EVENT_PROCESSOR_STOPPED));
}

Expand Down
4 changes: 2 additions & 2 deletions lib/event_processor/event_processor_factory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

import { describe, it, expect, beforeEach, vi, MockInstance } from 'vitest';
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, getBatchEventProcessor } from './event_processor_factory';
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId } from './batch_event_processor';
import { DEFAULT_EVENT_BATCH_SIZE, DEFAULT_EVENT_FLUSH_INTERVAL, getBatchEventProcessor } from './event_processor_factory';
import { BatchEventProcessor, BatchEventProcessorConfig, EventWithId,DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF } from './batch_event_processor';
import { ExponentialBackoff, IntervalRepeater } from '../utils/repeater/repeater';
import { getMockSyncCache } from '../tests/mock/mock_cache';
import { LogLevel } from '../modules/logging';
Expand Down
4 changes: 1 addition & 3 deletions lib/event_processor/event_processor_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import { StartupLog } from "../service";
import { ExponentialBackoff, IntervalRepeater } from "../utils/repeater/repeater";
import { EventDispatcher } from "./event_dispatcher/event_dispatcher";
import { EventProcessor } from "./event_processor";
import { BatchEventProcessor, EventWithId, RetryConfig } from "./batch_event_processor";
import { BatchEventProcessor, DEFAULT_MAX_BACKOFF, DEFAULT_MIN_BACKOFF, EventWithId, RetryConfig } from "./batch_event_processor";
import { AsyncPrefixCache, Cache, SyncPrefixCache } from "../utils/cache/cache";

export const DEFAULT_EVENT_BATCH_SIZE = 10;
export const DEFAULT_EVENT_FLUSH_INTERVAL = 1000;
export const DEFAULT_EVENT_MAX_QUEUE_SIZE = 10000;
export const DEFAULT_MIN_BACKOFF = 1000;
export const DEFAULT_MAX_BACKOFF = 32000;
export const FAILED_EVENT_RETRY_INTERVAL = 20 * 1000;
export const EVENT_STORE_PREFIX = 'optly_event:';

Expand Down
1 change: 0 additions & 1 deletion lib/exception_messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export const DATAFILE_MANAGER_STOPPED = 'Datafile manager stopped before it coul
export const DATAFILE_MANAGER_FAILED_TO_START = 'Datafile manager failed to start';
export const FAILED_TO_FETCH_DATAFILE = 'Failed to fetch datafile';
export const FAILED_TO_STOP = 'Failed to stop';
export const YOU_MUST_PROVIDE_DATAFILE_IN_SSR = 'You must provide datafile in SSR';
export const YOU_MUST_PROVIDE_AT_LEAST_ONE_OF_SDKKEY_OR_DATAFILE = 'You must provide at least one of sdkKey or datafile';
export const RETRY_CANCELLED = 'Retry cancelled';
export const REQUEST_TIMEOUT = 'Request timeout';
Expand Down
33 changes: 33 additions & 0 deletions lib/odp/event_manager/odp_event_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,39 @@ describe('DefaultOdpEventManager', () => {
}
});

it('should flush the queue immediately if disposable, regardless of the batchSize', async () => {
const apiManager = getMockApiManager();
const repeater = getMockRepeater()
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });
// spy on the flush method
const odpEventManager = new DefaultOdpEventManager({
repeater,
apiManager: apiManager,
batchSize: 10,
retryConfig: {
maxRetries: 3,
backoffProvider: vi.fn(),
},
});

odpEventManager.updateConfig({
integrated: true,
odpConfig: config,
});
odpEventManager.makeDisposable();
odpEventManager.start();

await expect(odpEventManager.onRunning()).resolves.not.toThrow();

const event = makeEvent(0);
odpEventManager.sendEvent(event);
await exhaustMicrotasks();

expect(apiManager.sendEvents).toHaveBeenCalledTimes(1);
expect(apiManager.sendEvents).toHaveBeenNthCalledWith(1, config, [event]);
expect(repeater.reset).toHaveBeenCalledTimes(1);
})

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add tests for repeater stop when disposable = true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the actual implementationrepeater.reset calls repeater.stop. But in the mock version of the repeater thats not the case. Whats your suggestion on that? Should I update the mock to support this ? (Not meaningful IMO).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see we have assertions for repeater.reset here. We can ignore this comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add a test to assert the repeater is not started, similar to eventProcessor

it('drops events and logs if the state is not running', async () => {
const apiManager = getMockApiManager();
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });
Expand Down
5 changes: 5 additions & 0 deletions lib/odp/event_manager/odp_event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ export class DefaultOdpEventManager extends BaseService implements OdpEventManag
if (!this.isNew) {
return;
}
// Override for disposable event manager
if(this.disposable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we ovverride makeDisposable() and do this inside that?

this.retryConfig.maxRetries = Math.min(this.retryConfig.maxRetries, 5);
this.batchSize = 1
}

super.start();
if (this.odpIntegrationConfig) {
Expand Down
1 change: 1 addition & 0 deletions lib/odp/odp_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const getMockOdpEventManager = () => {
getState: vi.fn(),
updateConfig: vi.fn(),
sendEvent: vi.fn(),
makeDisposable: vi.fn(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add test that it makes the eventManager disposable when its makeDisposable() is called?

};
};

Expand Down
4 changes: 4 additions & 0 deletions lib/odp/odp_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ export class DefaultOdpManager extends BaseService implements OdpManager {
if (!this.isNew()) {
return;
}

if(this.disposable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we ovverride the makeDisposable() method and do this inside that?

this.eventManager.makeDisposable();
}

this.state = ServiceState.Starting;

Expand Down
15 changes: 6 additions & 9 deletions lib/optimizely/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
import { describe, it, expect, vi } from 'vitest';
import Optimizely from '.';
import { getMockProjectConfigManager } from '../tests/mock/mock_project_config_manager';
import * as logger from '../plugins/logger';
import * as jsonSchemaValidator from '../utils/json_schema_validator';
import { LOG_LEVEL } from '../common_exports';
import { createNotificationCenter } from '../notification_center';
import testData from '../tests/test_data';
import { getForwardingEventProcessor } from '../event_processor/forwarding_event_processor';
import { LoggerFacade } from '../modules/logging';
import { createProjectConfig } from '../project_config/project_config';
import { getMockLogger } from '../tests/mock/mock_logger';

Expand All @@ -39,12 +36,12 @@ describe('Optimizely', () => {

const notificationCenter = createNotificationCenter({ logger, errorHandler });

it('should pass ssr to the project config manager', () => {
it('should pass disposable option to the project config manager', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should pass the disposable option to eventProcessor and odpManager as well, can we modify this test to assert those as well?

const projectConfigManager = getMockProjectConfigManager({
initConfig: createProjectConfig(testData.getTestProjectConfig()),
});

vi.spyOn(projectConfigManager, 'setSsr');
vi.spyOn(projectConfigManager, 'makeDisposable');

const instance = new Optimizely({
clientEngine: 'node-sdk',
Expand All @@ -54,16 +51,16 @@ describe('Optimizely', () => {
logger,
notificationCenter,
eventProcessor,
isSsr: true,
disposable: true,
isValidInstance: true,
});

expect(projectConfigManager.setSsr).toHaveBeenCalledWith(true);
expect(projectConfigManager.makeDisposable).toHaveBeenCalled();
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
expect(instance.getProjectConfig()).toBe(projectConfigManager.config);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
expect(projectConfigManager.isSsr).toBe(true);
expect(projectConfigManager.disposable).toBe(true);
});
});
18 changes: 12 additions & 6 deletions lib/optimizely/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,20 @@ export default class Optimizely implements Client {
this.errorHandler = config.errorHandler;
this.isOptimizelyConfigValid = config.isValidInstance;
this.logger = config.logger;
this.projectConfigManager = config.projectConfigManager;
this.notificationCenter = config.notificationCenter;
this.odpManager = config.odpManager;
this.vuidManager = config.vuidManager;
this.eventProcessor = config.eventProcessor;

if(config.disposable) {
this.projectConfigManager.makeDisposable();
this.eventProcessor?.makeDisposable();
this.odpManager?.makeDisposable();
}

let decideOptionsArray = config.defaultDecideOptions ?? [];

if (!Array.isArray(decideOptionsArray)) {
this.logger.log(LOG_LEVEL.DEBUG, INVALID_DEFAULT_DECIDE_OPTIONS, MODULE_NAME);
decideOptionsArray = [];
Expand All @@ -160,7 +170,6 @@ export default class Optimizely implements Client {
}
});
this.defaultDecideOptions = defaultDecideOptions;
this.projectConfigManager = config.projectConfigManager;

this.disposeOnUpdate = this.projectConfigManager.onUpdate((configObj: projectConfig.ProjectConfig) => {
this.logger.log(
Expand All @@ -176,7 +185,6 @@ export default class Optimizely implements Client {
this.updateOdpSettings();
});

this.projectConfigManager.setSsr(config.isSsr)
this.projectConfigManager.start();
const projectConfigManagerRunningPromise = this.projectConfigManager.onRunning();

Expand All @@ -198,10 +206,6 @@ export default class Optimizely implements Client {
UNSTABLE_conditionEvaluators: config.UNSTABLE_conditionEvaluators,
});

this.notificationCenter = config.notificationCenter;

this.eventProcessor = config.eventProcessor;

this.eventProcessor?.start();
const eventProcessorRunningPromise = this.eventProcessor ? this.eventProcessor.onRunning() :
Promise.resolve(undefined);
Expand All @@ -210,6 +214,8 @@ export default class Optimizely implements Client {
this.notificationCenter.sendNotifications(NOTIFICATION_TYPES.LOG_EVENT, event);
});

this.odpManager?.start();

this.readyPromise = Promise.all([
projectConfigManagerRunningPromise,
eventProcessorRunningPromise,
Expand Down
19 changes: 19 additions & 0 deletions lib/project_config/polling_datafile_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,25 @@ describe('PollingDatafileManager', () => {
expect(repeater.stop).toHaveBeenCalled();
});

it('stops repeater after successful initialization if disposable is true', async () => {
const repeater = getMockRepeater();
const requestHandler = getMockRequestHandler();
const mockResponse = getMockAbortableRequest(Promise.resolve({ statusCode: 200, body: '{"foo": "bar"}', headers: {} }));
requestHandler.makeRequest.mockReturnValueOnce(mockResponse);

const manager = new PollingDatafileManager({
repeater,
requestHandler,
sdkKey: 'keyThatExists',
});
manager.makeDisposable();
manager.start();
repeater.execute(0);

await expect(manager.onRunning()).resolves.not.toThrow();
expect(repeater.stop).toHaveBeenCalled();
});

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also add tests that stops retrying to initialize after 5 failed attempts if disposable is true?

it('saves the datafile in cache', async () => {
const repeater = getMockRepeater();
const requestHandler = getMockRequestHandler();
Expand Down
Loading
Loading