diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 29b2147c..0b0315e0 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -147,3 +147,15 @@ jobs: - name: Build with Maven run: mvn verify -PcheckFormat -B working-directory: element-template-generation + build-event-processing: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up JDK + uses: actions/setup-java@v4 + with: + java-version: '21' + distribution: 'temurin' + - name: Build with Maven + run: mvn verify -PcheckFormat -B + working-directory: event-processing diff --git a/.gitignore b/.gitignore index a27d3c77..0795d691 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ target **/.project **/.settings **/.factorypath +.DS_Store \ No newline at end of file diff --git a/event-processing/.gitignore b/event-processing/.gitignore new file mode 100644 index 00000000..d78abf23 --- /dev/null +++ b/event-processing/.gitignore @@ -0,0 +1,2 @@ +events.mv.db +events.trace.db diff --git a/event-processing/README.md b/event-processing/README.md new file mode 100644 index 00000000..34bd2897 --- /dev/null +++ b/event-processing/README.md @@ -0,0 +1,108 @@ +# Event processing + +## Scenario + +This example shows how you can achieve consistency using this architecture: + +![img.png](docs/img.png) + +An event is received via event system (RabbitMQ, Kafka, ...) and should first be saved to the business systems' database (a relational database in this case). + +Then, the processing should start. + +### The easy way + +![img.png](docs/img2.png) + +These 3 steps cannot happen in a transactional way (as Camunda does not support transaction contexts, and maybe some event system does not as well). + +Now you can of course implement it like this and accept the risk that an event is read from the event system, saved to the database, but not processed. + +### The correct way + +![img.png](docs/img3.png) + +While the likelihood that errors happen here is really low (most runtimes prevent shutdown while threads are active), it can still occur (SIGTERM or power outage). + +The same architecture can have an event handled in 3 independent steps: + +1. The event is read from the event system and saved to the database in the initial state (`CREATED`) +2. An event in the initial state is read from the database and published to the process engine, then its state is updated to `PUBLISHING`. The same step can happen to events being in this state for too long (coordinated with the message timeout) +3. The published event triggers a callback hook that sets the events' state to `PUBLISHED`, confirming that the event has been captured by the process engine and is now being processed. + +All of these steps are idempotent, so they can be implemented with an at-least-once logic. This means that: + +1. The read from the event system can happen multiple times without changing the event that has been captured once +2. The publishing of the event can be repeated as it relies on zeebe messaging pattern +3. The acknowledgement of the publishing can be repeated as this is the last state change happening to the event + +### Implementation + +This implementation simulates the event system with a rest api that allows for submitting events: + +``` +POST /events + +{ + "id": "123", + "name": "random", + "content": { + "foo": "baz" + } +} +>>> +{ + "id": "123" +} +``` + +* `id`: Each event consists of an id. This is the unique identifier for a specific event. An event with an `id` can only be saved once, allowing for idempotency. This id will be used as correlation key and message id when publishing a message to zeebe. On top, it is set as variable `eventId` +* `name`: Each event has a name that will be used as message name when publishing a message to zeebe. +* `content`: Finally, each event has a content that is a JSON object without fixed schema. This is set as variable `content` + +A message event that should be able to work with the event processor should: + +* define a message name that matches the events' name +* if not a start event, define a correlation key that matches the events' id +* have an end listener or a following service task with the job type `correlated` + +Now, an event can be published and its state can be tracked with the rest api: + +``` +GET /events/{event-id} + +>>> +{ + "id": "123", + "name": "random", + "content": { + "foo": "baz" + }, + "state": { + "name": "PUBLISHED", + "createdAt": "2024-12-12T14:51:28.945495+01:00", + "publishingAt": "2024-12-12T14:51:29.040717+01:00", + "publishedAt": "2024-12-12T14:51:29.140717+01:00" + } +} +``` + +### Setup + +Prerequisities: + +Only a running zeebe engine is required. By default, the zeebe client points to the gateway running on port 26500, so you use either [C8Run](https://docs.camunda.io/docs/self-managed/setup/deploy/local/c8run/) or the [docker-compose](https://docs.camunda.io/docs/self-managed/setup/deploy/local/docker-compose/) setup. + +To run the example, just execute: + +```shell +mvn spring-boot:run +``` + +To connect against another cluster (saas for example), you can adjust the application.yaml according to [this guide](https://docs.camunda.io/docs/apis-tools/spring-zeebe-sdk/getting-started/). + +There is an example [process](./src/test/resources/eventHandler.bpmn) and [request](./src/test/resources/createEventRequest.json) available in the tests. + +Just deploy the process and send a request to publish an event as documented above. + +![img.png](docs/img4.png) diff --git a/event-processing/docs/img.png b/event-processing/docs/img.png new file mode 100644 index 00000000..803cc680 Binary files /dev/null and b/event-processing/docs/img.png differ diff --git a/event-processing/docs/img2.png b/event-processing/docs/img2.png new file mode 100644 index 00000000..d3152ef4 Binary files /dev/null and b/event-processing/docs/img2.png differ diff --git a/event-processing/docs/img3.png b/event-processing/docs/img3.png new file mode 100644 index 00000000..c3d3623f Binary files /dev/null and b/event-processing/docs/img3.png differ diff --git a/event-processing/docs/img4.png b/event-processing/docs/img4.png new file mode 100644 index 00000000..ef69244f Binary files /dev/null and b/event-processing/docs/img4.png differ diff --git a/event-processing/pom.xml b/event-processing/pom.xml new file mode 100644 index 00000000..be00085c --- /dev/null +++ b/event-processing/pom.xml @@ -0,0 +1,188 @@ + + + 4.0.0 + + io.camunda + event-processing + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + 8.6.6 + 3.4.0 + + + + + org.springframework.boot + spring-boot-dependencies + ${version.spring-boot} + pom + import + + + io.camunda + zeebe-bom + ${version.camunda} + pom + import + + + io.camunda + spring-boot-starter-camunda-sdk + ${version.camunda} + + + io.camunda + camunda-process-test-spring + ${version.camunda} + + + + + + + io.camunda + spring-boot-starter-camunda-sdk + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-data-jpa + + + org.springframework.boot + spring-boot-starter-jdbc + + + com.h2database + h2 + + + io.camunda + camunda-process-test-spring + test + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.13.0 + + + -parameters + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.43.0 + + + + + *.md + .gitignore + + + + + true + 2 + + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.5.2 + + + org.springframework.boot + spring-boot-maven-plugin + ${version.spring-boot} + + + + repackage + + + + + + + + + + autoFormat + + true + + + + + com.diffplug.spotless + spotless-maven-plugin + + + spotless-format + + apply + + process-sources + + + + + + + + + + checkFormat + + + + com.diffplug.spotless + spotless-maven-plugin + + + spotless-check + + check + + validate + + + + + + + + \ No newline at end of file diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/App.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/App.java new file mode 100644 index 00000000..b3611355 --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/App.java @@ -0,0 +1,11 @@ +package com.camunda.consulting.eventprocessing; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventController.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventController.java new file mode 100644 index 00000000..da5f3dcf --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventController.java @@ -0,0 +1,67 @@ +package com.camunda.consulting.eventprocessing; + +import static com.camunda.consulting.eventprocessing.Mapper.*; + +import com.camunda.consulting.eventprocessing.EventService.Event; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.OffsetDateTime; +import java.util.Optional; +import org.jetbrains.annotations.NotNull; +import org.springframework.data.domain.Pageable; +import org.springframework.data.web.PagedModel; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/events") +public class EventController { + private final EventService eventService; + + public EventController(EventService eventService) { + this.eventService = eventService; + } + + @PostMapping + public ResponseEntity createEvent(@RequestBody CreateEventRequest request) { + return ResponseEntity.ok(map(eventService.saveEvent(map(request)))); + } + + @GetMapping + public ResponseEntity> getEvents(Pageable pageable) { + return ResponseEntity.ok(new PagedModel<>(eventService.getEvents(pageable).map(Mapper::map))); + } + + @GetMapping("/{eventId}") + public ResponseEntity getEvent(@PathVariable String eventId) { + Optional event = eventService.getEvent(eventId); + if (event.isPresent()) { + GetEventResponse response = map(event.get()); + return ResponseEntity.ok(response); + } + return ResponseEntity.notFound().build(); + } + + public record CreateEventRequest(@NotNull String id, String name, ObjectNode content) {} + + public record CreateEventResponse(String id) {} + + public record GetEventResponse(String id, String name, JsonNode content, State state) { + public record State( + StateName name, + OffsetDateTime createdAt, + OffsetDateTime publishingAt, + OffsetDateTime publishedAt) { + public enum StateName { + CREATED, + PUBLISHING, + PUBLISHED + } + } + } +} diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventProcessor.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventProcessor.java new file mode 100644 index 00000000..9e1a4c03 --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventProcessor.java @@ -0,0 +1,79 @@ +package com.camunda.consulting.eventprocessing; + +import com.camunda.consulting.eventprocessing.EventService.Event; +import com.camunda.consulting.eventprocessing.EventService.Event.State.StateName; +import io.camunda.zeebe.client.ZeebeClient; +import io.camunda.zeebe.client.api.command.ClientStatusException; +import io.camunda.zeebe.spring.client.annotation.JobWorker; +import io.camunda.zeebe.spring.client.annotation.Variable; +import io.grpc.Status.Code; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.domain.Pageable; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +@Component +@EnableScheduling +public class EventProcessor { + private static final Logger LOG = LoggerFactory.getLogger(EventProcessor.class); + private final EventService eventService; + private final ZeebeClient zeebeClient; + + public EventProcessor(EventService eventService, ZeebeClient zeebeClient) { + this.eventService = eventService; + this.zeebeClient = zeebeClient; + } + + @Scheduled(fixedDelay = 100L) + public void processCreatedEvents() { + eventService + .getEventsWithState(Pageable.ofSize(100), StateName.CREATED) + .forEach(this::processEvent); + } + + @Scheduled(fixedDelay = 60000L) + public void processPublishingEvents() { + eventService + .getEventsWithState(Pageable.ofSize(100), StateName.PUBLISHING) + .filter( + e -> + e.state().publishingAt() == null + || e.state() + .publishingAt() + .isBefore(OffsetDateTime.now().minus(Duration.ofMinutes(10)))) + .forEach(this::processEvent); + } + + void processEvent(Event event) { + event = eventService.updateEventStateToPublishing(event.id()); + LOG.info("Processing event: {}", event); + try { + zeebeClient + .newPublishMessageCommand() + .messageName(event.name()) + .correlationKey(event.id()) + .messageId(event.id()) + .variables(Map.of("eventId", event.id(), "content", event.content())) + .timeToLive(Duration.ofMinutes(8)) + .send() + .join(); + } catch (ClientStatusException exception) { + if (!exception.getStatusCode().equals(Code.ALREADY_EXISTS)) { + throw exception; + } else { + LOG.warn("Event {} already published", event.name()); + } + } + } + + @JobWorker(type = "correlated") + public void handleCorrelatedEvent(@Variable String eventId) { + Event event = eventService.updateEventStateToPublished(eventId); + LOG.info("Event is published: {}", event); + } +} diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventRepository.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventRepository.java new file mode 100644 index 00000000..7fe750e7 --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventRepository.java @@ -0,0 +1,85 @@ +package com.camunda.consulting.eventprocessing; + +import com.camunda.consulting.eventprocessing.EventRepository.EventEntity; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import java.time.OffsetDateTime; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.stereotype.Repository; + +@Repository +public interface EventRepository extends JpaRepository { + + @Entity(name = "EVENT") + class EventEntity { + @Id private String id; + private String name; + private String content; + private State state; + private OffsetDateTime createdAt; + private OffsetDateTime publishingAt; + private OffsetDateTime publishedAt; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getContent() { + return content; + } + + public void setContent(String content) { + this.content = content; + } + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } + + public OffsetDateTime getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(OffsetDateTime createdAt) { + this.createdAt = createdAt; + } + + public OffsetDateTime getPublishingAt() { + return publishingAt; + } + + public void setPublishingAt(OffsetDateTime publishingAt) { + this.publishingAt = publishingAt; + } + + public OffsetDateTime getPublishedAt() { + return publishedAt; + } + + public void setPublishedAt(OffsetDateTime publishedAt) { + this.publishedAt = publishedAt; + } + + public enum State { + CREATED, + PUBLISHING, + PUBLISHED + } + } +} diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventService.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventService.java new file mode 100644 index 00000000..f14ad4c1 --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/EventService.java @@ -0,0 +1,80 @@ +package com.camunda.consulting.eventprocessing; + +import static com.camunda.consulting.eventprocessing.Mapper.*; + +import com.camunda.consulting.eventprocessing.EventRepository.EventEntity; +import com.camunda.consulting.eventprocessing.EventRepository.EventEntity.State; +import com.camunda.consulting.eventprocessing.EventService.Event.State.StateName; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.time.OffsetDateTime; +import java.util.Optional; +import org.springframework.data.domain.Example; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.stereotype.Service; + +@Service +public class EventService { + private final EventRepository eventRepository; + + public EventService(EventRepository eventRepository) { + this.eventRepository = eventRepository; + } + + public Event saveEvent(UnsavedEvent event) { + return map( + eventRepository.findById(event.id()).orElseGet(() -> eventRepository.save(map(event)))); + } + + public Page getEvents(Pageable pageable) { + return eventRepository.findAll(pageable).map(Mapper::map); + } + + public Page getEventsWithState(Pageable pageable, StateName state) { + return eventRepository.findAll(Example.of(entity(state)), pageable).map(Mapper::map); + } + + public Optional getEvent(String id) { + return eventRepository.findById(id).map(Mapper::map); + } + + private static EventEntity entity(StateName state) { + EventEntity entity = new EventEntity(); + entity.setState(map(state)); + return entity; + } + + public Event updateEventStateToPublishing(String id) { + EventEntity eventEntity = eventRepository.findById(id).orElseThrow(); + eventEntity.setPublishingAt(OffsetDateTime.now()); + + eventEntity.setState(State.PUBLISHING); + eventRepository.save(eventEntity); + return map(eventEntity); + } + + public Event updateEventStateToPublished(String id) { + EventEntity eventEntity = eventRepository.findById(id).orElseThrow(); + eventEntity.setPublishedAt(OffsetDateTime.now()); + + eventEntity.setState(State.PUBLISHED); + eventRepository.save(eventEntity); + return map(eventEntity); + } + + public record UnsavedEvent(String id, String name, ObjectNode content) {} + + public record Event(String id, String name, ObjectNode content, State state) { + public record State( + StateName name, + OffsetDateTime createdAt, + OffsetDateTime publishingAt, + OffsetDateTime publishedAt) { + public enum StateName { + CREATED, + PUBLISHING, + PUBLISHED + } + } + } +} diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/ExampleServiceCalls.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/ExampleServiceCalls.java new file mode 100644 index 00000000..b442fa9b --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/ExampleServiceCalls.java @@ -0,0 +1,26 @@ +package com.camunda.consulting.eventprocessing; + +import io.camunda.zeebe.spring.client.annotation.JobWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class ExampleServiceCalls { + private static final Logger LOG = LoggerFactory.getLogger(ExampleServiceCalls.class); + + @JobWorker + public void service1() { + LOG.info("Service 1 called"); + } + + @JobWorker + public void service2() { + LOG.info("Service 2 called"); + } + + @JobWorker + public void service3() { + LOG.info("Service 3 called"); + } +} diff --git a/event-processing/src/main/java/com/camunda/consulting/eventprocessing/Mapper.java b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/Mapper.java new file mode 100644 index 00000000..53e1df7a --- /dev/null +++ b/event-processing/src/main/java/com/camunda/consulting/eventprocessing/Mapper.java @@ -0,0 +1,188 @@ +package com.camunda.consulting.eventprocessing; + +import com.camunda.consulting.eventprocessing.EventController.CreateEventRequest; +import com.camunda.consulting.eventprocessing.EventController.CreateEventResponse; +import com.camunda.consulting.eventprocessing.EventController.GetEventResponse; +import com.camunda.consulting.eventprocessing.EventRepository.EventEntity; +import com.camunda.consulting.eventprocessing.EventRepository.EventEntity.State; +import com.camunda.consulting.eventprocessing.EventService.Event; +import com.camunda.consulting.eventprocessing.EventService.UnsavedEvent; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.time.OffsetDateTime; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +/** + * Please ignore this class for the example. + * + *

This is just a generic mapper to map dto - pojo - entity as they are separated + */ +public class Mapper { + private static final Map, Map, Function>> MAPPER_REGISTRY = + new HashMap<>(); + private static final ObjectMapper objectMapper = new ObjectMapper(); + + static { + Arrays.stream(Mapper.class.getDeclaredMethods()) + .filter(m -> Modifier.isStatic(m.getModifiers())) + .filter(m -> Modifier.isPrivate(m.getModifiers())) + .filter(m -> Objects.nonNull(m.getAnnotation(IsMapper.class))) + .forEach(Mapper::process); + } + + private static void process(Method method) { + // only 1 parameter allowed + if (method.getParameterCount() != 1) { + throw new RuntimeException("Method must have exactly one parameter"); + } + Class sourceType = method.getParameterTypes()[0]; + Class targetType = method.getReturnType(); + Map, Function> targetMap = + MAPPER_REGISTRY.computeIfAbsent(sourceType, (c) -> new HashMap<>()); + if (targetMap.containsKey(targetType)) { + throw new RuntimeException( + "There are 2 functions defined for mapping " + sourceType + " to " + targetType); + } + targetMap.put( + targetType, + (s) -> { + try { + return method.invoke(null, s); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException("Error while invoking mapping method", e); + } + }); + } + + @SafeVarargs + public static T map(S event, T... omit) { + Class sourceType = event.getClass(); + Class targetType = omit.getClass().getComponentType(); + if (MAPPER_REGISTRY.containsKey(sourceType)) { + Map, Function> targetMap = MAPPER_REGISTRY.get(sourceType); + if (targetMap.containsKey(targetType)) { + Function function = (Function) targetMap.get(targetType); + return function.apply(event); + } else { + throw new RuntimeException( + "There are no functions defined for mapping " + sourceType + " to " + targetType); + } + } else { + throw new RuntimeException( + "There are no functions defined for mapping " + sourceType + " to " + targetType); + } + } + + @IsMapper + private static EventEntity mapToEventEntity(UnsavedEvent event) { + EventEntity eventEntity = new EventEntity(); + eventEntity.setId(event.id()); + eventEntity.setName(event.name()); + eventEntity.setContent(mapToString(event.content())); + eventEntity.setState(State.CREATED); + eventEntity.setCreatedAt(OffsetDateTime.now()); + return eventEntity; + } + + @IsMapper + private static String mapToString(JsonNode node) { + try { + return objectMapper.writeValueAsString(node); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error while writing json node to string", e); + } + } + + @IsMapper + private static ObjectNode mapToJsonNode(String json) { + try { + return (ObjectNode) objectMapper.readTree(json); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error while reading string to json node", e); + } + } + + @IsMapper + private static Event mapToEvent(EventEntity eventEntity) { + return new Event( + eventEntity.getId(), + eventEntity.getName(), + mapToJsonNode(eventEntity.getContent()), + mapToState(eventEntity)); + } + + @IsMapper + private static Event.State mapToState(EventEntity eventEntity) { + return new Event.State( + mapToStateName(eventEntity.getState()), + eventEntity.getCreatedAt(), + eventEntity.getPublishingAt(), + eventEntity.getPublishedAt()); + } + + @IsMapper + private static Event.State.StateName mapToStateName(EventEntity.State state) { + return switch (state) { + case CREATED -> Event.State.StateName.CREATED; + case PUBLISHED -> Event.State.StateName.PUBLISHED; + case PUBLISHING -> Event.State.StateName.PUBLISHING; + case null -> null; + }; + } + + @IsMapper + private static EventEntity.State mapToState(Event.State.StateName stateName) { + return switch (stateName) { + case CREATED -> EventEntity.State.CREATED; + case PUBLISHED -> EventEntity.State.PUBLISHED; + case PUBLISHING -> EventEntity.State.PUBLISHING; + case null -> null; + }; + } + + @IsMapper + private static UnsavedEvent mapToUnsavedEvent(CreateEventRequest event) { + return new UnsavedEvent(event.id(), event.name(), event.content()); + } + + @IsMapper + private static CreateEventResponse mapToCreatedEventResponse(Event event) { + return new CreateEventResponse(event.id()); + } + + @IsMapper + private static GetEventResponse mapToGetEventResponse(Event event) { + return new GetEventResponse( + event.id(), event.name(), event.content(), mapToState(event.state())); + } + + @IsMapper + private static GetEventResponse.State mapToState(Event.State event) { + return new GetEventResponse.State( + mapToStateName(event.name()), event.createdAt(), event.publishingAt(), event.publishedAt()); + } + + @IsMapper + private static GetEventResponse.State.StateName mapToStateName(Event.State.StateName state) { + return switch (state) { + case CREATED -> GetEventResponse.State.StateName.CREATED; + case PUBLISHED -> GetEventResponse.State.StateName.PUBLISHED; + case PUBLISHING -> GetEventResponse.State.StateName.PUBLISHING; + case null -> null; + }; + } + + @Retention(RetentionPolicy.RUNTIME) + private @interface IsMapper {} +} diff --git a/event-processing/src/main/resources/application.yaml b/event-processing/src/main/resources/application.yaml new file mode 100644 index 00000000..8b7232a0 --- /dev/null +++ b/event-processing/src/main/resources/application.yaml @@ -0,0 +1,8 @@ +spring: + datasource: + url: jdbc:h2:file:./events + jpa: + generate-ddl: true +camunda: + client: + mode: selfmanaged \ No newline at end of file diff --git a/event-processing/src/test/java/com/camunda/consulting/eventprocessing/ProcessTest.java b/event-processing/src/test/java/com/camunda/consulting/eventprocessing/ProcessTest.java new file mode 100644 index 00000000..7247f529 --- /dev/null +++ b/event-processing/src/test/java/com/camunda/consulting/eventprocessing/ProcessTest.java @@ -0,0 +1,60 @@ +package com.camunda.consulting.eventprocessing; + +import static org.assertj.core.api.Assertions.*; + +import com.camunda.consulting.eventprocessing.EventController.CreateEventRequest; +import com.camunda.consulting.eventprocessing.EventController.CreateEventResponse; +import com.camunda.consulting.eventprocessing.EventService.Event.State.StateName; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.camunda.process.test.api.CamundaSpringProcessTest; +import io.camunda.zeebe.client.ZeebeClient; +import java.io.IOException; +import java.io.InputStream; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +@CamundaSpringProcessTest +public class ProcessTest { + @Autowired ZeebeClient zeebeClient; + + @Autowired EventService eventService; + + @Autowired EventController eventController; + + private static CreateEventRequest loadCreateEventRequest() { + try (InputStream stream = + ProcessTest.class.getClassLoader().getResourceAsStream("createEventRequest.json")) { + + return new ObjectMapper().readValue(stream, CreateEventRequest.class); + } catch (IOException e) { + throw new RuntimeException("Error while loading unsaved event", e); + } + } + + @BeforeEach + void deployProcess() { + zeebeClient + .newDeployResourceCommand() + .addResourceFromClasspath("eventHandler.bpmn") + .send() + .join(); + } + + @Test + void shouldPublishEvent() { + // when + CreateEventResponse event = eventController.createEvent(loadCreateEventRequest()).getBody(); + // then + Awaitility.await() + .untilAsserted( + () -> + assertThat(eventService.getEvent(event.id())) + .isPresent() + .get() + .matches(e -> e.state().name().equals(StateName.PUBLISHED))); + } +} diff --git a/event-processing/src/test/resources/application.yaml b/event-processing/src/test/resources/application.yaml new file mode 100644 index 00000000..bbe10c53 --- /dev/null +++ b/event-processing/src/test/resources/application.yaml @@ -0,0 +1,5 @@ +spring: + datasource: + url: jdbc:h2:mem:test + jpa: + generate-ddl: true \ No newline at end of file diff --git a/event-processing/src/test/resources/createEventRequest.json b/event-processing/src/test/resources/createEventRequest.json new file mode 100644 index 00000000..410b9ba3 --- /dev/null +++ b/event-processing/src/test/resources/createEventRequest.json @@ -0,0 +1,7 @@ +{ + "id": "123", + "name": "random", + "content": { + "foo": "baz" + } +} \ No newline at end of file diff --git a/event-processing/src/test/resources/eventHandler.bpmn b/event-processing/src/test/resources/eventHandler.bpmn new file mode 100644 index 00000000..b8c9695c --- /dev/null +++ b/event-processing/src/test/resources/eventHandler.bpmn @@ -0,0 +1,98 @@ + + + + + + + + + + Flow_07icx7o + + + + Flow_0zg3kkg + + + + + + + + Flow_07icx7o + Flow_0xz040e + + + + + + Flow_0xz040e + Flow_01qhw0g + + + + + + + Flow_01qhw0g + Flow_0zg3kkg + + + + The published ack happens as end listener, can also be a separate service task + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +