Skip to content

Commit

Permalink
Modified config option to be local_mode
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Mar 20, 2024
1 parent a158645 commit e9f8bdc
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class AggregateProcessor extends AbstractProcessor<Record<Event>, Record<
private final AggregateAction aggregateAction;

private boolean forceConclude = false;
private boolean localOnly = false;
private boolean localMode = false;
private final String whenCondition;
private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -70,7 +70,7 @@ public AggregateProcessor(final AggregateProcessorConfig aggregateProcessorConfi
this.actionHandleEventsOutCounter = pluginMetrics.counter(ACTION_HANDLE_EVENTS_OUT);
this.actionHandleEventsDroppedCounter = pluginMetrics.counter(ACTION_HANDLE_EVENTS_DROPPED);
this.whenCondition = aggregateProcessorConfig.getWhenCondition();
this.localOnly = aggregateProcessorConfig.getLocalOnly();
this.localMode = aggregateProcessorConfig.getLocalMode();

pluginMetrics.gauge(CURRENT_AGGREGATE_GROUPS, aggregateGroupManager, AggregateGroupManager::getAllGroupsSize);
}
Expand Down Expand Up @@ -155,7 +155,7 @@ public boolean isApplicableEventForPeerForwarding(Event event) {
if (whenCondition == null) {
return true;
}
if (localOnly) {
if (localMode) {
return false;
}
return expressionEvaluator.evaluateConditional(whenCondition, event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class AggregateProcessorConfig {
@NotNull
private PluginModel aggregateAction;

@JsonProperty("local_only")
@JsonProperty("local_mode")
@NotNull
private Boolean localOnly = false;
private Boolean localMode = false;

@JsonProperty("aggregate_when")
private String whenCondition;
Expand All @@ -47,8 +47,8 @@ public String getWhenCondition() {
return whenCondition;
}

public Boolean getLocalOnly() {
return localOnly;
public Boolean getLocalMode() {
return localMode;
}

public PluginModel getAggregateAction() { return aggregateAction; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ public void testDefault() {
final AggregateProcessorConfig aggregateConfig = new AggregateProcessorConfig();

assertThat(aggregateConfig.getGroupDuration(), equalTo(Duration.ofSeconds(AggregateProcessorConfig.DEFAULT_GROUP_DURATION_SECONDS)));
assertThat(aggregateConfig.getLocalOnly(), equalTo(false));
assertThat(aggregateConfig.getLocalMode(), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private AggregateProcessor createObjectUnderTest() {
@BeforeEach
void setUp() {
when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration);
when(aggregateProcessorConfig.getLocalOnly()).thenReturn(false);
when(aggregateProcessorConfig.getLocalMode()).thenReturn(false);
when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString());
when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap());
when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
Expand Down Expand Up @@ -280,7 +280,7 @@ void handleEvent_returning_with_condition_eliminates_one_record_local_only() {
when(expressionEvaluator.evaluateConditional(condition, firstEvent)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(condition, secondEvent)).thenReturn(false);
when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition);
when(aggregateProcessorConfig.getLocalOnly()).thenReturn(true);
when(aggregateProcessorConfig.getLocalMode()).thenReturn(true);
final AggregateProcessor objectUnderTest = createObjectUnderTest();
when(aggregateGroupManager.getGroupsToConclude(eq(false))).thenReturn(Collections.emptyList());
when(aggregateActionResponse.getEvent()).thenReturn(event);
Expand Down

0 comments on commit e9f8bdc

Please sign in to comment.