Skip to content

Commit

Permalink
Addressed review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jan 12, 2024
1 parent 0f44e72 commit 734f261
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -130,7 +131,7 @@ void three_pipelines_with_route_and_multiple_records() {
.untilAsserted(() -> {
List<Record<Event>> outputRecords = inMemorySinkAccessor.get(IN_MEMORY_IDENTIFIER);
assertThat(outputRecords, not(empty()));
assertThat(outputRecords.size(), equalTo(numRecords));
assertThat(outputRecords.size(), lessThanOrEqualTo(numRecords));
});
assertTrue(inMemorySourceAccessor.getAckReceived());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ three-pipelines-route-test-1:
acknowledgments: true
route:
- 2xx_route: '/status >= 200 and /status < 300'
- other_route: '/status >= 300 or /status < 200'
- other_route: '/status >= 300'
sink:
- pipeline:
name: "three-pipelines-route-test-2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
public class Router {
private final RouteEventEvaluator routeEventEvaluator;
private final DataFlowComponentRouter dataFlowComponentRouter;
private final Consumer<Event> unroutedEventHandler;
private final Consumer<Event> noRouteHandler;

Router(final RouteEventEvaluator routeEventEvaluator, final DataFlowComponentRouter dataFlowComponentRouter, final Consumer<Event> handler) {
Router(final RouteEventEvaluator routeEventEvaluator, final DataFlowComponentRouter dataFlowComponentRouter, final Consumer<Event> noRouteHandler) {
this.routeEventEvaluator = Objects.requireNonNull(routeEventEvaluator);
this.dataFlowComponentRouter = dataFlowComponentRouter;
this.unroutedEventHandler = handler;
this.noRouteHandler = noRouteHandler;
}

public <C> void route(
Expand All @@ -41,11 +41,13 @@ public <C> void route(
Objects.requireNonNull(componentRecordsConsumer);

final Map<Record, Set<String>> recordsToRoutes = routeEventEvaluator.evaluateEventRoutes(allRecords);

// If there are any events that are not getting routed to any sink, invoke no route handler on it.
for (Map.Entry<Record, Set<String>> entry : recordsToRoutes.entrySet()) {
if (entry.getValue().size() == 0) {
Record record = entry.getKey();
if (record.getData() instanceof Event) {
unroutedEventHandler.accept((Event)record.getData());
if ((record.getData() instanceof Event) && (noRouteHandler != null)) {
noRouteHandler.accept((Event)record.getData());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,32 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;

import java.util.Collections;
import java.util.Set;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Collections;
import java.util.Set;
import java.util.function.Consumer;

@ExtendWith(MockitoExtension.class)
class RouterFactoryTest {

@Mock
private ExpressionEvaluator expressionEvaluator;
private Set<ConditionalRoute> routes;
private Consumer<Event> consumer;

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -58,4 +66,20 @@ void createRouter_returns_new_Router_with_empty_routes() {

assertThat(router, notNullValue());
}

@Test
void test_createRouterWithUnroutedHandler() {
try (final MockedConstruction<Router> ignored =
mockConstruction(Router.class, (mock, context) -> {
consumer = (Consumer<Event>) context.arguments().get(2);
})) {
Event event = mock(Event.class);
EventHandle eventHandle = mock(EventHandle.class);
when(event.getEventHandle()).thenReturn(eventHandle);
final Router router = createObjectUnderTest().createRouter(routes);
consumer.accept(event);
verify(event.getEventHandle()).release(true);
verify(eventHandle).release(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -34,6 +35,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.ArgumentMatchers.any;

@ExtendWith(MockitoExtension.class)
class RouterTest {
Expand All @@ -49,6 +51,8 @@ class RouterTest {
@Mock
private RouterGetRecordStrategy getRecordStrategy;

private Consumer<Event> noRouteHandler;

private Collection<Record> recordsIn;

private static class TestComponent {
Expand All @@ -62,7 +66,9 @@ void setUp() {
}

private Router createObjectUnderTest() {
return new Router(routeEventEvaluator, dataFlowComponentRouter, event -> event.getEventHandle().release(true));
//noRouteHandler = event -> event.getEventHandle().release(true);
noRouteHandler = mock(Consumer.class);
return new Router(routeEventEvaluator, dataFlowComponentRouter, noRouteHandler);
}

@Test
Expand Down Expand Up @@ -150,21 +156,24 @@ void route_with_multiple_DataFlowComponents() {
@Nested
class WithUnroutedRecords {
@Test
void route_with_multiple_DataFlowComponents2() {
void route_with_multiple_DataFlowComponents_with_unrouted_events() {
Event event1 = mock(Event.class);
Event event2 = mock(Event.class);
Event event3 = mock(Event.class);
EventHandle eventHandle3 = mock(EventHandle.class);
Record record1 = mock(Record.class);
Record record2 = mock(Record.class);
Record record3 = mock(Record.class);
Record record4 = mock(Record.class);
when(record3.getData()).thenReturn(event3);
Object notAnEvent = mock(Object.class);
when(record4.getData()).thenReturn(notAnEvent);
List<Record> recordsIn = List.of(record1, record2, record3);
Map<Record, Set<String>> recordsToRoutes = new HashMap<>();
recordsToRoutes.put(record1, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record2, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record3, Set.of());
when(event3.getEventHandle()).thenReturn(eventHandle3);
recordsToRoutes.put(record4, Set.of());
when(routeEventEvaluator.evaluateEventRoutes(recordsIn)).thenReturn(recordsToRoutes);
dataFlowComponents = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Expand All @@ -177,7 +186,9 @@ void route_with_multiple_DataFlowComponents2() {
for (DataFlowComponent<TestComponent> dataFlowComponent : dataFlowComponents) {
verify(dataFlowComponentRouter).route(recordsIn, dataFlowComponent, recordsToRoutes, getRecordStrategy, componentRecordsConsumer);
}
verify(event3.getEventHandle()).release(true);
// Verify noRouteHandler gets invoked only for record3 and not
// for record4, because record4 has non-Event type data
verify(noRouteHandler, times(1)).accept(any());
}
}

Expand Down

0 comments on commit 734f261

Please sign in to comment.