diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py index ca2a9609d9ee..ce00226ededd 100644 --- a/src/dispatch/cli.py +++ b/src/dispatch/cli.py @@ -786,7 +786,7 @@ def _run_consume(plugin_slug: str, organization_slug: str, project_id: int, runn ) project = project_service.get(db_session=db_session, project_id=project_id) while True: - if not running["is_running"]: + if not running: break plugin.instance.consume(db_session=db_session, project=project) @@ -795,8 +795,8 @@ def _run_consume(plugin_slug: str, organization_slug: str, project_id: int, runn def consume_signals(): """Runs a continuous process that consumes signals from the specified plugin.""" import time - from multiprocessing import Manager - import multiprocessing + from threading import Thread, Event + import logging import signal @@ -810,55 +810,59 @@ def consume_signals(): install_plugins() organizations = get_all_organizations(db_session=SessionLocal()) - with Manager() as manager: - running = manager.dict() - running["is_running"] = True - workers = [] + log = logging.getLogger(__name__) - for organization in organizations: - schema_engine = engine.execution_options( - schema_translate_map={ - None: f"dispatch_organization_{organization.slug}", - } + # Replace manager dictionary with an Event + running = Event() + running.set() + + workers = [] + + for organization in organizations: + schema_engine = engine.execution_options( + schema_translate_map={ + None: f"dispatch_organization_{organization.slug}", + } + ) + session = sessionmaker(bind=schema_engine)() + + projects = project_service.get_all(db_session=session) + for project in projects: + plugins = plugin_service.get_active_instances( + db_session=session, plugin_type="signal-consumer", project_id=project.id ) - session = sessionmaker(bind=schema_engine)() - projects = project_service.get_all(db_session=session) - for project in projects: - plugins = plugin_service.get_active_instances( - db_session=session, plugin_type="signal-consumer", project_id=project.id + if not plugins: + log.warning( + f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" ) - if not plugins: - log.warning( - f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" + for plugin in plugins: + log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}") + for _ in range(5): # TODO add plugin.instance.concurrency + t = Thread( + target=_run_consume, + args=(plugin.plugin.slug, organization.slug, project.id, running), + daemon=True, # Set thread to daemon ) + t.start() + workers.append(t) + + def terminate_processes(signum, frame): + print("Terminating main process...") + running.clear() # stop all threads + for worker in workers: + worker.join() - for plugin in plugins: - log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}") - for _ in range(5): # TODO add plugin.instance.concurrency: - p = multiprocessing.Process( - target=_run_consume, - args=(plugin.plugin.slug, organization.slug, project.id, running), - ) - p.start() - workers.append(p) - - def terminate_processes(signum, frame): - print("Terminating main process...") - running["is_running"] = False # noqa - for worker in workers: - worker.join() - - signal.signal(signal.SIGINT, terminate_processes) - signal.signal(signal.SIGTERM, terminate_processes) - - # Keep the main thread running - while True: - if not running["is_running"]: - print("Main process terminating.") - break - time.sleep(1) + signal.signal(signal.SIGINT, terminate_processes) + signal.signal(signal.SIGTERM, terminate_processes) + + # Keep the main thread running + while True: + if not running.is_set(): + print("Main process terminating.") + break + time.sleep(1) @signals_group.command("process")