Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new example: Event processing #567

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,15 @@ jobs:
- name: Build with Maven
run: mvn verify -PcheckFormat -B
working-directory: element-template-generation
build-event-processing:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'temurin'
- name: Build with Maven
run: mvn verify -PcheckFormat -B
working-directory: event-processing
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ target
**/.project
**/.settings
**/.factorypath
.DS_Store
2 changes: 2 additions & 0 deletions event-processing/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
events.mv.db
events.trace.db
102 changes: 102 additions & 0 deletions event-processing/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Event processing

## Scenario

This example shows how you can achieve consistency using this architecture:

![img.png](docs/img.png)

An event is received via event system (RabbitMQ, Kafka, ...) and should first be saved to the business systems' database (a relational database in this case).

Then, the processing should start.

### The easy way

![img.png](docs/img2.png)

These 3 steps cannot happen in a transactional way (as Camunda does not support transaction contexts, and maybe some event system does not as well).

Now you can of course implement it like this and accept the risk that an event is read from the event system, saved to the database, but not processed.

### The correct way

![img.png](docs/img3.png)

While the likelihood that errors happen here is really low (most runtimes prevent shutdown while threads are active), it can still occur (SIGTERM or power outage).

The same architecture can have an event handled in 3 independent steps:

1. The event is read from the event system and saved to the database in the initial state (`CREATED`)
2. An event in the initial state is read from the database and published to the process engine, then its state is updated to `PUBLISHING`. The same step can happen to events being in this state for too long (coordinated with the message timeout)
3. The published event triggers a callback hook that sets the events' state to `PUBLISHED`, confirming that the event has been captured by the process engine and is now being processed.

All of these steps are idempotent, so they can be implemented with an at-least-once logic. This means that:

1. The read from the event system can happen multiple times without changing the event that has been captured once
2. The publishing of the event can be repeated as it relies on zeebe messaging pattern
3. The acknowledgement of the publishing can be repeated as this is the last state change happening to the event

### Implementation

This implementation simulates the event system with a rest api that allows for submitting events:

```
POST /events

{
"id": "123",
"name": "random",
"content": {
"foo": "baz"
}
}
>>>
{
"id": "123"
}
```

* `id`: Each event consists of an id. This is the unique identifier for a specific event. An event with an `id` can only be saved once, allowing for idempotency. This id will be used as correlation key and message id when publishing a message to zeebe. On top, it is set as variable `eventId`
* `name`: Each event has a name that will be used as message name when publishing a message to zeebe.
* `content`: Finally, each event has a content that is a JSON object without fixed schema. This is set as variable `content`

A message event that should be able to work with the event processor should:

* define a message name that matches the events' name
* if not a start event, define a correlation key that matches the events' id
* have an end listener with the job type `correlated`

Now, an event can be published and its state can be tracked with the rest api:

```
GET /events/{event-id}

>>>
{
"id": "123",
"name": "random",
"content": {
"foo": "baz"
},
"state": {
"name": "PUBLISHED",
"createdAt": "2024-12-12T14:51:28.945495+01:00",
"publishingAt": "2024-12-12T14:51:29.040717+01:00",
"publishedAt": "2024-12-12T14:51:29.140717+01:00"
}
}
```

### Setup

Prerequisities:

Only a running zeebe engine is required. By default, the zeebe client points to the gateway running on port 26500, so you use either [C8Run](https://docs.camunda.io/docs/self-managed/setup/deploy/local/c8run/) or the [docker-compose](https://docs.camunda.io/docs/self-managed/setup/deploy/local/docker-compose/) setup.

To run the example, just execute:

```shell
mvn spring-boot:run
```

To connect against another cluster (saas for example), you can adjust the application.yaml according to [this guide](https://docs.camunda.io/docs/apis-tools/spring-zeebe-sdk/getting-started/).
Binary file added event-processing/docs/img.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added event-processing/docs/img2.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added event-processing/docs/img3.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
188 changes: 188 additions & 0 deletions event-processing/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.camunda</groupId>
<artifactId>event-processing</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<version.camunda>8.6.6</version.camunda>
<version.spring-boot>3.4.0</version.spring-boot>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${version.spring-boot}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-bom</artifactId>
<version>${version.camunda}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>spring-boot-starter-camunda-sdk</artifactId>
<version>${version.camunda}</version>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>camunda-process-test-spring</artifactId>
<version>${version.camunda}</version>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>spring-boot-starter-camunda-sdk</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>camunda-process-test-spring</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>2.43.0</version>
<configuration>
<formats>
<format>
<includes>
<include>*.md</include>
<include>.gitignore</include>
</includes>
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
<spaces>true</spaces>
<spacesPerTab>2</spacesPerTab>
</indent>
</format>
</formats>
<java>
<googleJavaFormat/>
</java>
<pom/>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.5.2</version>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${version.spring-boot}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<!-- profile to auto format -->
<profile>
<id>autoFormat</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<executions>
<execution>
<id>spotless-format</id>
<goals>
<goal>apply</goal>
</goals>
<phase>process-sources</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>

<!-- profile to perform strict validation checks -->
<profile>
<id>checkFormat</id>
<build>
<plugins>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<executions>
<execution>
<id>spotless-check</id>
<goals>
<goal>check</goal>
</goals>
<phase>validate</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.camunda.consulting.eventprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.camunda.consulting.eventprocessing;

import static com.camunda.consulting.eventprocessing.Mapper.*;

import com.camunda.consulting.eventprocessing.EventService.Event;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.OffsetDateTime;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;
import org.springframework.data.domain.Pageable;
import org.springframework.data.web.PagedModel;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/events")
public class EventController {
private final EventService eventService;

public EventController(EventService eventService) {
this.eventService = eventService;
}

@PostMapping
public ResponseEntity<CreateEventResponse> createEvent(@RequestBody CreateEventRequest request) {
return ResponseEntity.ok(map(eventService.saveEvent(map(request))));
}

@GetMapping
public ResponseEntity<PagedModel<GetEventResponse>> getEvents(Pageable pageable) {
return ResponseEntity.ok(new PagedModel<>(eventService.getEvents(pageable).map(Mapper::map)));
}

@GetMapping("/{eventId}")
public ResponseEntity<?> getEvent(@PathVariable String eventId) {
Optional<Event> event = eventService.getEvent(eventId);
if (event.isPresent()) {
GetEventResponse response = map(event.get());
return ResponseEntity.ok(response);
}
return ResponseEntity.notFound().build();
}

public record CreateEventRequest(@NotNull String id, String name, ObjectNode content) {}

public record CreateEventResponse(String id) {}

public record GetEventResponse(String id, String name, JsonNode content, State state) {
public record State(
StateName name,
OffsetDateTime createdAt,
OffsetDateTime publishingAt,
OffsetDateTime publishedAt) {
public enum StateName {
CREATED,
PUBLISHING,
PUBLISHED
}
}
}
}
Loading
Loading