Skip to content

Commit

Permalink
changed config option to 'recursive'
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Mar 25, 2024
1 parent 8a7132d commit 3e028c6
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ public class Router {
private final RouteEventEvaluator routeEventEvaluator;
private final DataFlowComponentRouter dataFlowComponentRouter;
private final Consumer<Event> noRouteHandler;
private Set<Record> recordsUnRouted;

Router(final RouteEventEvaluator routeEventEvaluator, final DataFlowComponentRouter dataFlowComponentRouter, final Consumer<Event> noRouteHandler) {
this.routeEventEvaluator = Objects.requireNonNull(routeEventEvaluator);
this.dataFlowComponentRouter = dataFlowComponentRouter;
this.noRouteHandler = noRouteHandler;
this.recordsUnRouted = null;
}

public <C> void route(
Expand All @@ -42,19 +44,37 @@ public <C> void route(
Objects.requireNonNull(componentRecordsConsumer);

final Map<Record, Set<String>> recordsToRoutes = routeEventEvaluator.evaluateEventRoutes(allRecords);
recordsUnRouted = null;

Set<Record> recordsUnRouted = new HashSet<>(allRecords);
boolean allRecordsRouted = false;

for (DataFlowComponent<C> dataFlowComponent : dataFlowComponents) {
if (dataFlowComponent.getRoutes().isEmpty()) {
allRecordsRouted = true;
break;
}
}

if (!allRecordsRouted) {
recordsUnRouted = new HashSet<>(allRecords);
}

for (DataFlowComponent<C> dataFlowComponent : dataFlowComponents) {
dataFlowComponentRouter.route(allRecords, dataFlowComponent, recordsToRoutes, getRecordStrategy, (component, records) -> {
recordsUnRouted.removeAll(records);
if (recordsUnRouted != null) {
for (final Record record: records) {
recordsUnRouted.remove(record);
}
}
componentRecordsConsumer.accept(component, records);
});
}

for (Record record: recordsUnRouted) {
if (record.getData() instanceof Event) {
noRouteHandler.accept((Event)record.getData());
if (recordsUnRouted != null) {
for (Record record: recordsUnRouted) {
if (record.getData() instanceof Event) {
noRouteHandler.accept((Event)record.getData());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -170,9 +171,9 @@ void route_with_multiple_DataFlowComponents_with_unrouted_events() {
Record record2 = mock(Record.class);
Record record3 = mock(Record.class);
Record record4 = mock(Record.class);
when(record3.getData()).thenReturn(event3);
lenient().when(record3.getData()).thenReturn(event3);
Object notAnEvent = mock(Object.class);
when(record4.getData()).thenReturn(notAnEvent);
lenient().when(record4.getData()).thenReturn(notAnEvent);
List<Record> recordsIn = List.of(record1, record2, record3, record4);
Map<Record, Set<String>> recordsToRoutes = new HashMap<>();
recordsToRoutes.put(record1, Set.of(UUID.randomUUID().toString()));
Expand All @@ -184,6 +185,8 @@ void route_with_multiple_DataFlowComponents_with_unrouted_events() {
for (int i = 0; i < 5; i++) {
final DataFlowComponent dataFlowComponent = mock(DataFlowComponent.class);
dataFlowComponents.add(dataFlowComponent);
final Set<String> routes = Set.of(UUID.randomUUID().toString());
when(dataFlowComponent.getRoutes()).thenReturn(routes);
}

createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);
Expand All @@ -196,6 +199,44 @@ void route_with_multiple_DataFlowComponents_with_unrouted_events() {
verify(noRouteHandler, times(1)).accept(any());
}

@Test
void route_with_multiple_DataFlowComponents_with_unrouted_events_with_allrouted() {
Event event1 = mock(Event.class);
Event event2 = mock(Event.class);
Event event3 = mock(Event.class);
EventHandle eventHandle3 = mock(EventHandle.class);
Record record1 = mock(Record.class);
Record record2 = mock(Record.class);
Record record3 = mock(Record.class);
Record record4 = mock(Record.class);
lenient().when(record3.getData()).thenReturn(event3);
Object notAnEvent = mock(Object.class);
lenient().when(record4.getData()).thenReturn(notAnEvent);
List<Record> recordsIn = List.of(record1, record2, record3, record4);
Map<Record, Set<String>> recordsToRoutes = new HashMap<>();
recordsToRoutes.put(record1, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record2, Set.of(UUID.randomUUID().toString()));
recordsToRoutes.put(record3, Set.of());
recordsToRoutes.put(record4, Set.of());
when(routeEventEvaluator.evaluateEventRoutes(recordsIn)).thenReturn(recordsToRoutes);
dataFlowComponents = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final DataFlowComponent dataFlowComponent = mock(DataFlowComponent.class);
dataFlowComponents.add(dataFlowComponent);
final Set<String> routes = i ==0 ? Collections.emptySet() : Set.of(UUID.randomUUID().toString());
lenient().when(dataFlowComponent.getRoutes()).thenReturn(routes);
}

createObjectUnderTest().route(recordsIn, dataFlowComponents, getRecordStrategy, componentRecordsConsumer);

for (DataFlowComponent<TestComponent> dataFlowComponent : dataFlowComponents) {
verify(dataFlowComponentRouter).route(eq(recordsIn), eq(dataFlowComponent), eq(recordsToRoutes), eq(getRecordStrategy), any(BiConsumer.class));
}
// Verify noRouteHandler gets invoked only for record3 and not
// for record4, because record4 has non-Event type data
verify(noRouteHandler, times(0)).accept(any());
}

@Test
void route_with_multiple_DataFlowComponents_with_unrouted_events_and_sink_with_noroutes() {
Event event1 = mock(Event.class);
Expand Down

0 comments on commit 3e028c6

Please sign in to comment.