Skip to content

Commit

Permalink
Use threads instead of processes for better sqlalchemy support
Browse files Browse the repository at this point in the history
  • Loading branch information
kevgliss committed Oct 18, 2023
1 parent 150a624 commit 1ebc86e
Showing 1 changed file with 49 additions and 45 deletions.
94 changes: 49 additions & 45 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ def _run_consume(plugin_slug: str, organization_slug: str, project_id: int, runn
)
project = project_service.get(db_session=db_session, project_id=project_id)
while True:
if not running["is_running"]:
if not running:
break
plugin.instance.consume(db_session=db_session, project=project)

Expand All @@ -795,8 +795,8 @@ def _run_consume(plugin_slug: str, organization_slug: str, project_id: int, runn
def consume_signals():
"""Runs a continuous process that consumes signals from the specified plugin."""
import time
from multiprocessing import Manager
import multiprocessing
from threading import Thread, Event
import logging

import signal

Expand All @@ -810,55 +810,59 @@ def consume_signals():
install_plugins()
organizations = get_all_organizations(db_session=SessionLocal())

with Manager() as manager:
running = manager.dict()
running["is_running"] = True
workers = []
log = logging.getLogger(__name__)

for organization in organizations:
schema_engine = engine.execution_options(
schema_translate_map={
None: f"dispatch_organization_{organization.slug}",
}
# Replace manager dictionary with an Event
running = Event()
running.set()

workers = []

for organization in organizations:
schema_engine = engine.execution_options(
schema_translate_map={
None: f"dispatch_organization_{organization.slug}",
}
)
session = sessionmaker(bind=schema_engine)()

projects = project_service.get_all(db_session=session)
for project in projects:
plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
)
session = sessionmaker(bind=schema_engine)()

projects = project_service.get_all(db_session=session)
for project in projects:
plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
if not plugins:
log.warning(
f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
)

if not plugins:
log.warning(
f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
for plugin in plugins:
log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}")
for _ in range(5): # TODO add plugin.instance.concurrency
t = Thread(
target=_run_consume,
args=(plugin.plugin.slug, organization.slug, project.id, running),
daemon=True, # Set thread to daemon
)
t.start()
workers.append(t)

def terminate_processes(signum, frame):
print("Terminating main process...")
running.clear() # stop all threads
for worker in workers:
worker.join()

for plugin in plugins:
log.debug(f"Consuming signals for plugin: {plugin.plugin.slug}")
for _ in range(5): # TODO add plugin.instance.concurrency:
p = multiprocessing.Process(
target=_run_consume,
args=(plugin.plugin.slug, organization.slug, project.id, running),
)
p.start()
workers.append(p)

def terminate_processes(signum, frame):
print("Terminating main process...")
running["is_running"] = False # noqa
for worker in workers:
worker.join()

signal.signal(signal.SIGINT, terminate_processes)
signal.signal(signal.SIGTERM, terminate_processes)

# Keep the main thread running
while True:
if not running["is_running"]:
print("Main process terminating.")
break
time.sleep(1)
signal.signal(signal.SIGINT, terminate_processes)
signal.signal(signal.SIGTERM, terminate_processes)

# Keep the main thread running
while True:
if not running.is_set():
print("Main process terminating.")
break
time.sleep(1)


@signals_group.command("process")
Expand Down

0 comments on commit 1ebc86e

Please sign in to comment.