Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new example: async-retry-counter #397

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions async-retry-counter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.camunda.consulting</groupId>
<artifactId>async-retry-counter</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<version.spring-boot-starter-camunda>8.5.4</version.spring-boot-starter-camunda>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>3.3.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.camunda.spring</groupId>
<artifactId>spring-boot-starter-camunda</artifactId>
<version>${version.spring-boot-starter-camunda}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.camunda.spring</groupId>
<artifactId>spring-boot-starter-camunda-test</artifactId>
<version>${version.spring-boot-starter-camunda}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.camunda.consulting;

import io.camunda.zeebe.spring.client.annotation.Deployment;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@Deployment(resources = "classpath*:*.bpmn")
public class ExampleApp {
public static void main(String[] args) {
SpringApplication.run(ExampleApp.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.camunda.consulting;

public class ResponseController {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.camunda.consulting;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.spring.client.annotation.JobWorker;
import io.camunda.zeebe.spring.client.annotation.Variable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

@Component
public class RetryableJobWorker {
private static final Logger LOG = LoggerFactory.getLogger(RetryableJobWorker.class);

@JobWorker(autoComplete = false)
public void sendMessage(
@Variable Integer retryCounter, @Variable String callbackId, JobClient jobClient, ActivatedJob job
) {
LOG.info("retryCounter is {}", retryCounter);
if (retryCounter != null && retryCounter < 1) {
jobClient
.newFailCommand(job)
.retries(0)
.errorMessage("No retries left")
.variable("retryCounter", 1)
.send()
.join();
return;
}
if (callbackId == null) {
callbackId = UUID
.randomUUID()
.toString();
}
LOG.info("Setting callbackId {}", callbackId);
sendMessage();
int nextRetryCounter = Optional
.ofNullable(retryCounter)
.map(i -> i - 1)
.orElse(2);
LOG.info("Setting retryCounter {}", nextRetryCounter);
jobClient
.newCompleteCommand(job.getKey())
.variables(Map.of("retryCounter", nextRetryCounter, "callbackId", callbackId))
.send()
.join();
}

private void sendMessage() {
LOG.info("message sent");
}

}
3 changes: 3 additions & 0 deletions async-retry-counter/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
camunda:
client:
mode: simple
142 changes: 142 additions & 0 deletions async-retry-counter/src/main/resources/asnyc-retry-counter.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0n9tswy" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="AsyncRetryCounterProcess" name="Async Retry Counter" isExecutable="true">
<bpmn:startEvent id="AsyncMessagingRequiredStartEvent" name="Async messaging required">
<bpmn:outgoing>Flow_1g8wwcp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:exclusiveGateway id="Gateway_1uhx8ty">
<bpmn:incoming>Flow_1g8wwcp</bpmn:incoming>
<bpmn:incoming>Flow_0n8m323</bpmn:incoming>
<bpmn:outgoing>Flow_0uebumt</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="Flow_1g8wwcp" sourceRef="AsyncMessagingRequiredStartEvent" targetRef="Gateway_1uhx8ty" />
<bpmn:sequenceFlow id="Flow_0uebumt" sourceRef="Gateway_1uhx8ty" targetRef="SendMessageTask" />
<bpmn:sendTask id="SendMessageTask" name="Send message">
<bpmn:extensionElements>
<zeebe:taskDefinition type="sendMessage" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0uebumt</bpmn:incoming>
<bpmn:outgoing>Flow_0ga8sdi</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_0ga8sdi" sourceRef="SendMessageTask" targetRef="Gateway_0lo9ey2" />
<bpmn:eventBasedGateway id="Gateway_0lo9ey2">
<bpmn:incoming>Flow_0ga8sdi</bpmn:incoming>
<bpmn:outgoing>Flow_0fybakl</bpmn:outgoing>
<bpmn:outgoing>Flow_17w47wv</bpmn:outgoing>
</bpmn:eventBasedGateway>
<bpmn:intermediateCatchEvent id="ResponseReceivedEvent" name="Response received">
<bpmn:incoming>Flow_0fybakl</bpmn:incoming>
<bpmn:outgoing>Flow_0w6kefz</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1fddpep" messageRef="Message_0s95vg4" />
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_0fybakl" sourceRef="Gateway_0lo9ey2" targetRef="ResponseReceivedEvent" />
<bpmn:intermediateCatchEvent id="ErrorReceivedEvent" name="Error received">
<bpmn:incoming>Flow_17w47wv</bpmn:incoming>
<bpmn:outgoing>Flow_0t44whk</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_10g3cex" messageRef="Message_058d1q8" />
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_17w47wv" sourceRef="Gateway_0lo9ey2" targetRef="ErrorReceivedEvent" />
<bpmn:sequenceFlow id="Flow_0t44whk" sourceRef="ErrorReceivedEvent" targetRef="DeferRetryEvent" />
<bpmn:intermediateCatchEvent id="DeferRetryEvent" name="Defer retry">
<bpmn:incoming>Flow_0t44whk</bpmn:incoming>
<bpmn:outgoing>Flow_0n8m323</bpmn:outgoing>
<bpmn:timerEventDefinition id="TimerEventDefinition_0dz87wj">
<bpmn:timeDuration xsi:type="bpmn:tFormalExpression">PT5S</bpmn:timeDuration>
</bpmn:timerEventDefinition>
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_0n8m323" sourceRef="DeferRetryEvent" targetRef="Gateway_1uhx8ty" />
<bpmn:endEvent id="AsyncMessagingDoneEndEvent" name="Async messaging done">
<bpmn:incoming>Flow_0w6kefz</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0w6kefz" sourceRef="ResponseReceivedEvent" targetRef="AsyncMessagingDoneEndEvent" />
</bpmn:process>
<bpmn:message id="Message_0s95vg4" name="success">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=callbackId" />
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_058d1q8" name="fail">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=callbackId" />
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="AsyncRetryCounterProcess">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="AsyncMessagingRequiredStartEvent">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="154" y="142" width="87" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_1uhx8ty_di" bpmnElement="Gateway_1uhx8ty" isMarkerVisible="true">
<dc:Bounds x="265" y="92" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0p4e2cq_di" bpmnElement="SendMessageTask">
<dc:Bounds x="370" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0093v8l_di" bpmnElement="Gateway_0lo9ey2">
<dc:Bounds x="525" y="92" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0m0k2pk_di" bpmnElement="ResponseReceivedEvent">
<dc:Bounds x="632" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="625" y="142" width="50" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0r7jt4i_di" bpmnElement="ErrorReceivedEvent">
<dc:Bounds x="632" y="212" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="615" y="255" width="70" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1dxbo7e_di" bpmnElement="DeferRetryEvent">
<dc:Bounds x="452" y="302" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="444" y="345" width="52" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1wyqlll_di" bpmnElement="AsyncMessagingDoneEndEvent">
<dc:Bounds x="732" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="707" y="142" width="87" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1g8wwcp_di" bpmnElement="Flow_1g8wwcp">
<di:waypoint x="215" y="117" />
<di:waypoint x="265" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0n8m323_di" bpmnElement="Flow_0n8m323">
<di:waypoint x="452" y="320" />
<di:waypoint x="290" y="320" />
<di:waypoint x="290" y="142" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0uebumt_di" bpmnElement="Flow_0uebumt">
<di:waypoint x="315" y="117" />
<di:waypoint x="370" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0ga8sdi_di" bpmnElement="Flow_0ga8sdi">
<di:waypoint x="470" y="117" />
<di:waypoint x="525" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0fybakl_di" bpmnElement="Flow_0fybakl">
<di:waypoint x="575" y="117" />
<di:waypoint x="632" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_17w47wv_di" bpmnElement="Flow_17w47wv">
<di:waypoint x="550" y="142" />
<di:waypoint x="550" y="230" />
<di:waypoint x="632" y="230" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0w6kefz_di" bpmnElement="Flow_0w6kefz">
<di:waypoint x="668" y="117" />
<di:waypoint x="732" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0t44whk_di" bpmnElement="Flow_0t44whk">
<di:waypoint x="668" y="230" />
<di:waypoint x="730" y="230" />
<di:waypoint x="730" y="320" />
<di:waypoint x="488" y="320" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.camunda.consulting;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.filters.StreamFilter;
import io.camunda.zeebe.process.test.inspections.model.InspectedProcessInstance;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.*;
import static io.camunda.zeebe.spring.test.ZeebeTestThreadSupport.*;

@ZeebeSpringTest
@SpringBootTest
public class ProcessTest {
@Autowired
ZeebeClient zeebeClient;
@Autowired
ZeebeTestEngine zeebeTestEngine;

@Test
void shouldRetryThreeTimes() throws InterruptedException, TimeoutException {
ProcessInstanceEvent process = zeebeClient
.newCreateInstanceCommand()
.bpmnProcessId("AsyncRetryCounterProcess")
.latestVersion()
.variable("callbackId", "123")
.send()
.join();
waitForProcessInstanceHasPassedElement(process, "SendMessageTask");
assertThat(process).hasVariableWithValue("retryCounter", 2);
zeebeClient
.newPublishMessageCommand()
.messageName("fail")
.correlationKey("123")
.send()
.join();
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
zeebeTestEngine.increaseTime(Duration.ofSeconds(5));
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(process.getProcessInstanceKey()),
"SendMessageTask",
Duration.ofSeconds(10),
2
);
assertThat(process).hasVariableWithValue("retryCounter", 1);
zeebeClient
.newPublishMessageCommand()
.messageName("fail")
.correlationKey("123")
.send()
.join();
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
zeebeTestEngine.increaseTime(Duration.ofSeconds(5));
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(process.getProcessInstanceKey()),
"SendMessageTask",
Duration.ofSeconds(10),
3
);
assertThat(process).hasVariableWithValue("retryCounter", 0);
zeebeClient
.newPublishMessageCommand()
.messageName("fail")
.correlationKey("123")
.send()
.join();
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
zeebeTestEngine.increaseTime(Duration.ofSeconds(5));
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(process.getProcessInstanceKey()),
"Gateway_1uhx8ty",
Duration.ofSeconds(10),
4
);
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
long incidentKey = assertThat(process)
.hasAnyIncidents()
.extractingLatestIncident()
.getIncidentKey();
long jobKey = StreamFilter
.jobRecords(RecordStream.of(zeebeTestEngine.getRecordStreamSource()))
.withIntent(JobIntent.FAILED)
.stream()
.findFirst()
.get()
.getKey();
zeebeClient
.newUpdateRetriesCommand(jobKey)
.retries(1)
.send()
.join();
zeebeClient
.newResolveIncidentCommand(incidentKey)
.send()
.join();
zeebeClient
.newPublishMessageCommand()
.messageName("success")
.correlationKey("123")
.send()
.join();
waitForProcessInstanceCompleted(process);
}
}