Skip to content

Commit

Permalink
[Integration][Gitlab] Made event processing sequential to temporarily…
Browse files Browse the repository at this point in the history
… resolve race condition issues (#1217)

# Description

**What**  
- Updated the event handling logic in both `EventHandler` and
`SystemEventHandler` to ensure sequential processing of observers and
hook handlers.
- Removed concurrent execution (`asyncio.gather` and
`asyncio.create_task`) and replaced it with sequential execution.

**Why**  
- The previous implementation processed events concurrently across
multiple observers and hook handlers, which led to race conditions when
shared entities were updated.
- This caused data inconsistency, as concurrent updates overwrote each
other's changes.
- Sequential processing ensures that each observer and handler processes
the event one at a time, maintaining data integrity.

**How**  
1. `EventHandler` Changes:
- Replaced `asyncio.create_task` for observer notifications with a
direct `await` call.
- Ensured observers for a specific event are processed in a sequential
manner.
   
2. `SystemEventHandler` Changes:
- Removed `asyncio.gather` for hook handlers and replaced it with nested
`for` loops.
- Handlers are now invoked sequentially for each client, ensuring
predictable and ordered execution.

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Matan <[email protected]>
  • Loading branch information
oiadebayo and matan84 authored Dec 12, 2024
1 parent 32bf474 commit 07e5fc1
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 18 deletions.
9 changes: 9 additions & 0 deletions integrations/gitlab/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

0.2.1 (2024-12-12)
====================

### Bug Fixes

- Updated integration to process hook events sequentially to temporarily resolve race condition issues experienced when multiple processes attempts to update the same entity



0.2.0 (2024-12-11)
====================

Expand Down
42 changes: 25 additions & 17 deletions integrations/gitlab/gitlab_integration/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,20 @@ async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
)
return
for observer in observers_list:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event_id}",
event_id=event_id,
handler=handler,
)
asyncio.create_task(observer(event_id, body)) # type: ignore
try:
if asyncio.iscoroutinefunction(observer):
if inspect.ismethod(observer):
handler = observer.__self__.__class__.__name__
logger.debug(
f"Notifying observer: {handler}, for event: {event_id}",
event_id=event_id,
handler=handler,
)
await observer(event_id, body) # Sequentially call each observer
except Exception as e:
logger.error(
f"Error processing event {event_id} with observer {observer}: {str(e)}"
)


class SystemEventHandler(BaseEventHandler):
Expand All @@ -112,11 +117,14 @@ def add_client(self, client: GitlabService) -> None:
async def _notify(self, event_id: str, body: dict[str, Any]) -> None:
# best effort to notify using all clients, as we don't know which one of the clients have the permission to
# access the project
await asyncio.gather(
*(
hook_handler(client).on_hook(event_id, body)
for client in self._clients
for hook_handler in self._hook_handlers.get(event_id, [])
),
return_exceptions=True,
)
for client in self._clients:
for hook_handler_class in self._hook_handlers.get(event_id, []):
try:
hook_handler_instance = hook_handler_class(client)
await hook_handler_instance.on_hook(
event_id, body
) # Sequentially process handlers
except Exception as e:
logger.error(
f"Error processing event {event_id} with handler {hook_handler_class.__name__} for client {client}: {str(e)}"
)
2 changes: 1 addition & 1 deletion integrations/gitlab/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gitlab"
version = "0.2.0"
version = "0.2.1"
description = "Gitlab integration for Port using Port-Ocean Framework"
authors = ["Yair Siman-Tov <[email protected]>"]

Expand Down

0 comments on commit 07e5fc1

Please sign in to comment.