From 3940a5aeb1b718da3e272bdbce8cc20eebcb7074 Mon Sep 17 00:00:00 2001 From: kevgliss Date: Mon, 2 Oct 2023 14:05:55 -0700 Subject: [PATCH] Enhancement/clearer signal consume msg (#3828) * Adds a message to the signal consume cli * Adding multi process --- src/dispatch/cli.py | 50 ++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/src/dispatch/cli.py b/src/dispatch/cli.py index 0675e157cbec..1b11c602b563 100644 --- a/src/dispatch/cli.py +++ b/src/dispatch/cli.py @@ -773,38 +773,42 @@ def signals_group(): @signals_group.command("consume") -@click.argument("organization") -@click.argument("project") -@click.argument("plugin_name") -def consume_signals(organization, project, plugin_name): # TODO support multiple from one command +def consume_signals(): """Runs a continuous process that consumes signals from the specified plugin.""" + import concurrent.futures + from dispatch.common.utils.cli import install_plugins - from dispatch.database.core import refetch_db_session from dispatch.project import service as project_service - from dispatch.project.models import ProjectRead from dispatch.plugin import service as plugin_service - install_plugins() - - session = refetch_db_session(organization) + from dispatch.organization.service import get_all as get_all_organizations + from dispatch.database.core import SessionLocal, engine, sessionmaker - project = project_service.get_by_name_or_raise( - db_session=session, project_in=ProjectRead(name=project) - ) + install_plugins() + organizations = get_all_organizations(db_session=SessionLocal()) + for organization in organizations: + schema_engine = engine.execution_options( + schema_translate_map={ + None: f"dispatch_organization_{organization.slug}", + } + ) + session = sessionmaker(bind=schema_engine)() - plugins = plugin_service.get_active_instances( - db_session=session, plugin_type="signal-consumer", project_id=project.id - ) + projects = project_service.get_all(db_session=session) + for project in projects: + plugins = plugin_service.get_active_instances( + db_session=session, plugin_type="signal-consumer", project_id=project.id + ) - if not plugins: - log.debug( - "No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" - ) - return + if not plugins: + log.warning( + f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}" + ) - for plugin in plugins: - if plugin.plugin.slug == plugin_name: - plugin.instance.consume(db_session=session, project=project) + for plugin in plugins: + log.debug(f"Consuming signals for plugin {plugin.plugin.slug}") + with concurrent.futures.ProcessPoolExecutor() as executor: + executor.submit(plugin.instance.consume, session, project) @signals_group.command("process")