Skip to content

Commit

Permalink
Enhancement/clearer signal consume msg (#3828)
Browse files Browse the repository at this point in the history
* Adds a message to the signal consume cli

* Adding multi process
  • Loading branch information
kevgliss authored Oct 2, 2023
1 parent beec5de commit 3940a5a
Showing 1 changed file with 27 additions and 23 deletions.
50 changes: 27 additions & 23 deletions src/dispatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,38 +773,42 @@ def signals_group():


@signals_group.command("consume")
@click.argument("organization")
@click.argument("project")
@click.argument("plugin_name")
def consume_signals(organization, project, plugin_name): # TODO support multiple from one command
def consume_signals():
"""Runs a continuous process that consumes signals from the specified plugin."""
import concurrent.futures

from dispatch.common.utils.cli import install_plugins
from dispatch.database.core import refetch_db_session
from dispatch.project import service as project_service
from dispatch.project.models import ProjectRead
from dispatch.plugin import service as plugin_service

install_plugins()

session = refetch_db_session(organization)
from dispatch.organization.service import get_all as get_all_organizations
from dispatch.database.core import SessionLocal, engine, sessionmaker

project = project_service.get_by_name_or_raise(
db_session=session, project_in=ProjectRead(name=project)
)
install_plugins()
organizations = get_all_organizations(db_session=SessionLocal())
for organization in organizations:
schema_engine = engine.execution_options(
schema_translate_map={
None: f"dispatch_organization_{organization.slug}",
}
)
session = sessionmaker(bind=schema_engine)()

plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
)
projects = project_service.get_all(db_session=session)
for project in projects:
plugins = plugin_service.get_active_instances(
db_session=session, plugin_type="signal-consumer", project_id=project.id
)

if not plugins:
log.debug(
"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
)
return
if not plugins:
log.warning(
f"No signals consumed. No signal-consumer plugins enabled. Project: {project.name}. Organization: {project.organization.name}"
)

for plugin in plugins:
if plugin.plugin.slug == plugin_name:
plugin.instance.consume(db_session=session, project=project)
for plugin in plugins:
log.debug(f"Consuming signals for plugin {plugin.plugin.slug}")
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.submit(plugin.instance.consume, session, project)


@signals_group.command("process")
Expand Down

0 comments on commit 3940a5a

Please sign in to comment.