Skip to content

Commit

Permalink
feat: introduce Event Hub and Django Q for asynchronous events / mess…
Browse files Browse the repository at this point in the history
…aging, improve LLM integration and UX of ProductTree creation. Update Makefile to automatically run workers for async processing.
  • Loading branch information
adrianmcphee committed Nov 20, 2024
1 parent 7ff50e8 commit 5a7d19f
Show file tree
Hide file tree
Showing 18 changed files with 802 additions and 130 deletions.
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ MANAGE = python manage.py

help:
@echo "help -- Print this help showing all commands. "
@echo "run -- run the django development server "
@echo "run -- run the django development server and qcluster"
@echo "test -- run all tests "
@echo "cov -- run all tests with coverage "
@echo "cov_html -- run all tests with html coverage "
Expand All @@ -13,6 +13,7 @@ help:
@echo "dumpdata -- Backup the data from the running django app "
@echo "tailwindcss -- Generate Tailwindcss "
@echo "docs -- Update schema.json and generate documentation "
@echo "qcluster -- run only the Django Q cluster "

.PHONY: docs
docs:
Expand All @@ -25,7 +26,7 @@ rmpyc:
find . | grep -E "__pycache__|\.pyc|\.pyo" | xargs sudo rm -rf

run:
$(MANAGE) runserver
./run.sh

migrate:
$(MANAGE) makemigrations
Expand Down Expand Up @@ -70,3 +71,6 @@ cov:

cov_html:
pytest --cov --cov-report html --cov-fail-under=50

qcluster:
$(MANAGE) qcluster
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 4.2.2 on 2024-11-20 13:44

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('product_management', '0060_consolidate_product_fields'),
]

operations = [
migrations.AlterModelOptions(
name='bounty',
options={'ordering': ('-created_at',), 'verbose_name_plural': 'Bounties'},
),
migrations.AlterModelOptions(
name='fileattachment',
options={'verbose_name_plural': 'File Attachments'},
),
migrations.AlterModelOptions(
name='productarea',
options={'verbose_name_plural': 'Product Areas'},
),
migrations.AlterModelOptions(
name='producttree',
options={'verbose_name_plural': 'Product Trees'},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 4.2.2 on 2024-11-20 13:44

from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
('security', '0013_alter_user_options'),
]

operations = [
migrations.AlterModelOptions(
name='blacklistedusernames',
options={'verbose_name': 'Blacklisted Username', 'verbose_name_plural': 'Blacklisted Usernames'},
),
migrations.AlterModelOptions(
name='organisationpersonroleassignment',
options={'verbose_name_plural': 'Org Role Assignments'},
),
migrations.AlterModelOptions(
name='productroleassignment',
options={'verbose_name_plural': 'Product Role Assignments'},
),
migrations.AlterModelOptions(
name='signinattempt',
options={'verbose_name_plural': 'Sign In Attempts'},
),
migrations.AlterModelOptions(
name='signuprequest',
options={'verbose_name_plural': 'Sign Up Requests'},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Generated by Django 4.2.2 on 2024-11-20 13:44

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('talent', '0016_fix_bountydeliveryattempt'),
]

operations = [
migrations.AlterModelOptions(
name='bountyclaim',
options={'ordering': ('-created_at',), 'verbose_name_plural': 'Bounty Claims'},
),
migrations.AlterModelOptions(
name='bountydeliveryattempt',
options={'ordering': ('-created_at',), 'verbose_name_plural': 'Product Delivery Attempts'},
),
migrations.AlterModelOptions(
name='personskill',
options={'verbose_name_plural': 'Person Skills'},
),
migrations.AlterField(
model_name='bountyclaim',
name='person',
field=models.ForeignKey(null=True, on_delete=django.db.models.deletion.CASCADE, related_name='bounty_claims', to='talent.person'),
),
migrations.AlterField(
model_name='bountydeliveryattempt',
name='kind',
field=models.CharField(choices=[('New', 'New'), ('Approved', 'Approved'), ('Rejected', 'Rejected')], default='New', max_length=20),
),
migrations.AlterField(
model_name='feedback',
name='provider',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='feedback_given', to='talent.person'),
),
migrations.AlterField(
model_name='feedback',
name='recipient',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='feedback_received', to='talent.person'),
),
]
3 changes: 2 additions & 1 deletion apps/capabilities/talent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ class Status(models.TextChoices):
person = models.ForeignKey(
'Person',
on_delete=models.CASCADE,
related_name='bounty_claims'
related_name='bounty_claims',
null=True
)
expected_finish_date = models.DateField(default=date.today)
status = models.CharField(
Expand Down
26 changes: 26 additions & 0 deletions apps/common/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"apps.canopy",
"apps.common",
"apps.flows.challenge_authoring",
"apps.event_hub",
]
THIRD_PARTIES = [
"django_htmx",
Expand All @@ -44,6 +45,7 @@
"tinymce",
"csp",
"widget_tweaks",
"django_q",
]
BUILTIN_APPS = [
"jazzmin",
Expand Down Expand Up @@ -408,3 +410,27 @@ class Media:
GROQ_API_KEY = os.getenv('GROQ_API_KEY')
if not GROQ_API_KEY:
raise ImproperlyConfigured("GROQ_API_KEY is required")

# Event Hub Configuration
EVENT_BUS = {
'BACKEND': 'apps.event_hub.services.backends.django_q.DjangoQBackend',
'LOGGING_ENABLED': True,
'TASK_TIMEOUT': 300, # 5 minutes
'TASK_RETRIES': 3,
'ERROR_CALLBACK': 'apps.common.utils.error_reporting.report_event_bus_error', # optional
}

# Django Q Configuration (using PostgreSQL as broker)
Q_CLUSTER = {
'name': 'OpenUnited',
'workers': 4,
'recycle': 500,
'timeout': 300, # 5 minutes
'retry': 600, # 10 minutes - must be greater than timeout
'compress': True,
'save_limit': 250,
'queue_limit': 500,
'cpu_affinity': 1,
'label': 'Django Q',
'orm': 'default'
}
Empty file added apps/event_hub/__init__.py
Empty file.
9 changes: 9 additions & 0 deletions apps/event_hub/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.apps import AppConfig

class EventHubConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.event_hub'

def ready(self):
# Import any signal handlers or event listeners here
pass
16 changes: 16 additions & 0 deletions apps/event_hub/services/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
import logging

logger = logging.getLogger(__name__)

class EventBusBackend(ABC):
@abstractmethod
def enqueue_task(self, task_path, *args, **kwargs):
pass

@abstractmethod
def execute_task_sync(self, task_path, *args, **kwargs):
pass

def report_error(self, error, task_info=None):
logger.error(f"Error in task {task_info}: {error}")
109 changes: 109 additions & 0 deletions apps/event_hub/services/backends/django_q.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import logging
from typing import Dict, Callable
from django_q.tasks import async_task
from ..event_bus import EventBusBackend
from django.conf import settings
from django.utils.module_loading import import_string

logger = logging.getLogger(__name__)

def execute_listener(listener_module: str, listener_name: str, payload: Dict) -> None:
"""
Execute a listener function by importing it dynamically.
This function needs to be at module level to be pickleable.
"""
logger.info(f"[execute_listener] Starting execution for {listener_module}.{listener_name}")
try:
import importlib
logger.info(f"[execute_listener] Importing module {listener_module}")
module = importlib.import_module(listener_module)

logger.info(f"[execute_listener] Getting function {listener_name}")
listener = getattr(module, listener_name)

logger.info(f"[execute_listener] Executing listener with payload: {payload}")
result = listener(payload)

logger.info(f"[execute_listener] Execution completed with result: {result}")
return result

except Exception as e:
logger.exception(f"[execute_listener] Failed to execute listener: {str(e)}")
raise


class DjangoQBackend(EventBusBackend):
def enqueue_task(self, listener: Callable, payload: Dict) -> None:
"""Enqueue a task to be executed asynchronously"""
try:
logger.info(f"[DjangoQBackend] Enqueueing task for {listener.__name__}")

# Get the module and function name for the listener
listener_module = listener.__module__
listener_name = listener.__name__

logger.info(f"[DjangoQBackend] Module: {listener_module}, Function: {listener_name}")

# Queue the task using the module-level function
task_id = async_task(
'apps.event_hub.services.backends.django_q.execute_listener',
listener_module,
listener_name,
payload,
task_name=f"event.{listener_name}",
hook='apps.event_hub.services.backends.django_q.task_hook',
timeout=getattr(settings, 'EVENT_BUS_TASK_TIMEOUT', 300), # 5 minutes default
retry=getattr(settings, 'EVENT_BUS_TASK_RETRIES', 3)
)

logger.info(f"[DjangoQBackend] Task {task_id} enqueued successfully")

except Exception as e:
logger.exception(f"[DjangoQBackend] Failed to enqueue task: {str(e)}")
raise

def execute_task_sync(self, listener: Callable, payload: Dict) -> None:
"""Execute the listener synchronously"""
try:
logger.info(f"[DjangoQBackend] Executing {listener.__name__} synchronously")
result = listener(payload)
logger.info(f"[DjangoQBackend] Sync execution completed: {result}")

except Exception as e:
logger.exception(f"[DjangoQBackend] Sync execution failed: {str(e)}")
raise

def report_error(self, error: Exception, context: Dict) -> None:
"""Report error to monitoring system"""
error_message = f"Error in Django-Q backend: {str(error)}"

# Add more context to error reporting
error_context = {
"error_type": error.__class__.__name__,
"error_message": str(error),
"context": context,
"backend": "django_q"
}

logger.error(error_message, extra=error_context, exc_info=True)

# Optional: Add custom error reporting (e.g., Sentry)
if hasattr(settings, 'EVENT_BUS_ERROR_CALLBACK'):
try:
error_callback = import_string(settings.EVENT_BUS_ERROR_CALLBACK)
error_callback(error_message, error_context)
except Exception as e:
logger.exception("Failed to execute error callback")


def task_hook(task):
"""Hook that runs after task completion"""
logger.info(f"[task_hook] Task completed: {task.id}")
logger.info(f"[task_hook] Function: {task.func}")
logger.info(f"[task_hook] Args: {task.args}")
logger.info(f"[task_hook] Result: {task.result}")

if task.success:
logger.info("[task_hook] Task succeeded")
else:
logger.error(f"[task_hook] Task failed: {task.result}")
43 changes: 43 additions & 0 deletions apps/event_hub/services/event_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Dict, List, Callable
import logging
from .backends.base import EventBusBackend

logger = logging.getLogger(__name__)

class EventBus:
_instance = None
_initialized = False
_listeners: Dict[str, List[Callable]] = {}

def __new__(cls, backend=None):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(self, backend: EventBusBackend = None):
if not self._initialized:
if backend is None:
raise ValueError("Backend must be provided for EventBus initialization")
self.backend = backend
self._initialized = True

def register_listener(self, event_name: str, listener: Callable) -> None:
if event_name not in self._listeners:
self._listeners[event_name] = []
self._listeners[event_name].append(listener)
logger.debug(f"Registered listener {listener.__name__} for event {event_name}")

def emit_event(self, event_name: str, payload: dict, is_async: bool = True) -> None:
if event_name not in self._listeners:
logger.warning(f"No listeners registered for event {event_name}")
return

for listener in self._listeners[event_name]:
try:
if is_async:
self.backend.enqueue_task(listener, payload)
else:
self.backend.execute_task_sync(listener, payload)
except Exception as e:
logger.error(f"Error processing event {event_name}: {str(e)}")
raise
13 changes: 13 additions & 0 deletions apps/event_hub/services/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from django.conf import settings
from django.utils.module_loading import import_string
from .event_bus import EventBus

def get_event_bus():
"""
Factory function to get or create an EventBus instance with the configured backend
"""
backend_path = settings.EVENT_BUS['BACKEND']
backend_class = import_string(backend_path)
backend_instance = backend_class()

return EventBus(backend=backend_instance)
Loading

0 comments on commit 5a7d19f

Please sign in to comment.