Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for History Iterator in Replayer #980

Merged
merged 7 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,8 @@ public DecisionEvents next() {
}
decisionEvents.add(events.next());
}
DecisionEvents result =
new DecisionEvents(
newEvents,
decisionEvents,
replay,
replayCurrentTimeMilliseconds,
nextDecisionEventId);
return result;
return new DecisionEvents(
newEvents, decisionEvents, replay, replayCurrentTimeMilliseconds, nextDecisionEventId);
}
}

Expand Down
36 changes: 13 additions & 23 deletions src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;

import com.google.common.annotations.VisibleForTesting;
import com.uber.cadence.EventType;
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
Expand Down Expand Up @@ -180,9 +181,9 @@ private void processEvent(HistoryEvent event) {
context.handleChildWorkflowExecutionTimedOut(event);
break;
case DecisionTaskCompleted:
// NOOP
break;
case DecisionTaskScheduled:
case WorkflowExecutionTimedOut:
case WorkflowExecutionTerminated:
// NOOP
break;
case DecisionTaskStarted:
Expand All @@ -208,12 +209,6 @@ private void processEvent(HistoryEvent event) {
case WorkflowExecutionStarted:
handleWorkflowExecutionStarted(event);
break;
case WorkflowExecutionTerminated:
// NOOP
break;
case WorkflowExecutionTimedOut:
// NOOP
break;
case ActivityTaskScheduled:
decisionsHelper.handleActivityTaskScheduled(event);
break;
Expand All @@ -227,11 +222,8 @@ private void processEvent(HistoryEvent event) {
context.handleMarkerRecorded(event);
break;
case WorkflowExecutionCompleted:
break;
case WorkflowExecutionFailed:
break;
case WorkflowExecutionCanceled:
break;
case WorkflowExecutionContinuedAsNew:
break;
case TimerStarted:
Expand Down Expand Up @@ -410,7 +402,7 @@ private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQue
return queries
.entrySet()
.stream()
.collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
.collect(Collectors.toMap(Map.Entry::getKey, q -> queryWorkflow(q.getValue())));
}

private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
Expand Down Expand Up @@ -632,9 +624,9 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis
private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
private Duration decisionTaskStartToCloseTimeout;
private final Duration decisionTaskStartToCloseTimeout;

private final Duration decisionTaskRemainingTime() {
private Duration decisionTaskRemainingTime() {
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
return decisionTaskStartToCloseTimeout.minus(passed);
}
Expand All @@ -643,6 +635,7 @@ private final Duration decisionTaskRemainingTime() {
private Iterator<HistoryEvent> current;
private byte[] nextPageToken;

@VisibleForTesting
DecisionTaskWithHistoryIteratorImpl(
PollForDecisionTaskResponse task, Duration decisionTaskStartToCloseTimeout) {
this.task = Objects.requireNonNull(task);
Expand Down Expand Up @@ -692,7 +685,7 @@ public HistoryEvent next() {
.setExpiration(decisionTaskRemainingTime)
.setInitialInterval(retryServiceOperationInitialInterval)
.setMaximumInterval(retryServiceOperationMaxInterval)
.build();
.validateBuildWithDefaults();

GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
request
Expand All @@ -715,14 +708,11 @@ public HistoryEvent next() {
}
if (!current.hasNext()) {
log.error(
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:"
+ request.execution.workflowId
+ ", runID:"
+ request.execution.runId
+ ", domain:"
+ request.domain
+ " token:"
+ Arrays.toString(request.getNextPageToken()));
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:{}, runID:{}, domain:{} token:{}",
request.execution.workflowId,
request.execution.runId,
request.domain,
Arrays.toString(request.getNextPageToken()));
throw new Error(
"GetWorkflowExecutionHistory return empty history, maybe a bug in server");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.uber.cadence.internal.replay;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.uber.cadence.*;
import com.uber.cadence.client.WorkflowClientOptions;
import com.uber.cadence.internal.worker.SingleWorkerOptions;
import com.uber.cadence.serviceclient.IWorkflowService;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;
import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

public class ReplaceDeciderDecisionTaskWithHistoryIteratorTest {
@Mock private IWorkflowService mockService;

@Mock private DecisionContextImpl mockContext;

@Mock private DecisionsHelper mockedHelper;

private static final int MAXIMUM_PAGE_SIZE = 10000;
private final String WORKFLOW_ID = "testWorkflowId";
private final String RUN_ID = "testRunId";
private final String DOMAIN = "testDomain";
private final String START_PAGE_TOKEN = "testPageToken";
private final WorkflowExecution WORKFLOW_EXECUTION =
new WorkflowExecution().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID);
private final HistoryEvent START_EVENT =
new HistoryEvent()
.setWorkflowExecutionStartedEventAttributes(new WorkflowExecutionStartedEventAttributes())
.setEventId(1);
private final History HISTORY = new History().setEvents(Collections.singletonList(START_EVENT));
private final PollForDecisionTaskResponse task =
new PollForDecisionTaskResponse()
.setWorkflowExecution(WORKFLOW_EXECUTION)
.setHistory(HISTORY)
.setNextPageToken(START_PAGE_TOKEN.getBytes());

private Object iterator;

private void setupDecisionTaskWithHistoryIteratorImpl() {
try {
// Find the inner class first
Class<?> innerClass = findDecisionTaskWithHistoryIteratorImplClass();

// Get the constructor with the specific parameter types
Constructor<?> constructor =
innerClass.getDeclaredConstructor(
ReplayDecider.class, PollForDecisionTaskResponse.class, Duration.class);

when(mockedHelper.getTask()).thenReturn(task);
when(mockContext.getDomain()).thenReturn(DOMAIN);

// Create an instance of the outer class
ReplayDecider outerInstance =
new ReplayDecider(
mockService,
DOMAIN,
new WorkflowType().setName("testWorkflow"),
null,
mockedHelper,
SingleWorkerOptions.newBuilder()
.setMetricsScope(WorkflowClientOptions.defaultInstance().getMetricsScope())
.build(),
null);

// Create the instance
iterator = constructor.newInstance(outerInstance, task, Duration.ofSeconds(10));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Failed to set up test: " + e.getMessage(), e);
}
}

// Helper method to find the inner class
private Class<?> findDecisionTaskWithHistoryIteratorImplClass() {
for (Class<?> declaredClass : ReplayDecider.class.getDeclaredClasses()) {
if (declaredClass.getSimpleName().equals("DecisionTaskWithHistoryIteratorImpl")) {
return declaredClass;
}
}
throw new RuntimeException("Could not find DecisionTaskWithHistoryIteratorImpl inner class");
}

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
setupDecisionTaskWithHistoryIteratorImpl();
}

@Test
public void testGetHistoryWithSinglePageOfEvents()
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// Arrange
List<HistoryEvent> events = Arrays.asList(createMockHistoryEvent(2), createMockHistoryEvent(3));
History mockHistory = new History().setEvents(events);
when(mockService.GetWorkflowExecutionHistory(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(mockHistory));

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
assertTrue(historyIterator.hasNext());
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
assertTrue(historyIterator.hasNext());
assertEquals(events.get(0).getEventId(), historyIterator.next().getEventId());
assertTrue(historyIterator.hasNext());
assertEquals(events.get(1).getEventId(), historyIterator.next().getEventId());
assertFalse(historyIterator.hasNext());
}

@Test
public void testGetHistoryWithMultiplePages()
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// First page events
List<HistoryEvent> firstPageEvents =
Arrays.asList(createMockHistoryEvent(1), createMockHistoryEvent(2));
History firstHistory = new History().setEvents(firstPageEvents);
String firstPageToken = "firstPageToken";
when(mockService.GetWorkflowExecutionHistory(
eq(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
.thenReturn(
new GetWorkflowExecutionHistoryResponse()
.setHistory(firstHistory)
.setNextPageToken(firstPageToken.getBytes()));

// Second page events
List<HistoryEvent> secondPageEvents =
Arrays.asList(createMockHistoryEvent(3), createMockHistoryEvent(4));
History secondHistory = new History().setEvents(secondPageEvents);
when(mockService.GetWorkflowExecutionHistory(
eq(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(firstPageToken.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(secondHistory));

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
// Check first page events
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
assertEquals(firstPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
assertEquals(firstPageEvents.get(1).getEventId(), historyIterator.next().getEventId());

// Check second page events
assertEquals(secondPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
assertEquals(secondPageEvents.get(1).getEventId(), historyIterator.next().getEventId());

assertFalse(historyIterator.hasNext());
}

@Test(expected = Error.class)
public void testGetHistoryFailure()
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
when(mockService.GetWorkflowExecutionHistory(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
.thenThrow(new TException());

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
historyIterator.next();

historyIterator.next(); // This should throw an Error due to timeout
}

@Test(expected = Error.class)
public void testEmptyHistory()
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
when(mockService.GetWorkflowExecutionHistory(
new GetWorkflowExecutionHistoryRequest()
.setDomain(DOMAIN)
.setNextPageToken(START_PAGE_TOKEN.getBytes())
.setExecution(WORKFLOW_EXECUTION)
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
.thenReturn(
new GetWorkflowExecutionHistoryResponse()
.setHistory(new History().setEvents(new ArrayList<>())));

// Act & Assert
Method wrapperMethod = iterator.getClass().getMethod("getHistory");

Object result = wrapperMethod.invoke(iterator);
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
historyIterator.next();

historyIterator.next(); // This should throw an Error due to timeout
}

// Helper method to create mock HistoryEvent
private HistoryEvent createMockHistoryEvent(int eventId) {
return new HistoryEvent().setEventId(eventId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ public void testWorkflowServiceWrapperMethodDelegation() throws Exception {
// Prepare test cases
List<MethodTestCase> testCases = prepareMethodTestCases();

System.out.println(testCases);

// Test each method
for (MethodTestCase testCase : testCases) {
try {
Expand Down