elasticsearch history support #1408

Open · wants to merge 1 commit into master
1 change: 1 addition & 0 deletions .pylintrc
@@ -11,6 +11,7 @@ disable=
    C0415, # import-outside-toplevel
    W0718, # broad-exception-caught
    R1735, # use-dict-literal
    R0917, # too-many-positional-arguments

[BASIC]
good-names=i,e,n,x,logger,tz,db,dt
45 changes: 45 additions & 0 deletions docs/config.rst
@@ -540,3 +540,48 @@
Sets the URI to which an OAuth 2.0 server redirects the user after successful authentication and authorization.

`oauth2_redirect_uri` option should be used with :ref:`auth`, :ref:`auth_provider`, :ref:`oauth2_key` and :ref:`oauth2_secret` options.

.. _elasticsearch:

elasticsearch
~~~~~~~~~~~~~

Enables Elasticsearch as the backend for task history.


.. _elasticsearch_index_bulk_size:

elasticsearch_index_bulk_size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The maximum number of documents the Elasticsearch indexer sends in a single bulk index API call.

.. _elasticsearch_index_timeout:

elasticsearch_index_timeout
~~~~~~~~~~~~~~~~~~~~~~~~~~~

How long the background thread waits for the queue to fill up to `elasticsearch_index_bulk_size` before indexing a partial batch.

.. _elasticsearch_day_retention:

elasticsearch_day_retention
~~~~~~~~~~~~~~~~~~~~~~~~~~~

For projects that require data retention management, this specifies how many days of indexes to keep at once.

For example, if the value is 21, any index older than 21 days is deleted. Deletion runs at startup and on day change.

.. _elasticsearch_url:

elasticsearch_url
~~~~~~~~~~~~~~~~~

The URL at which the Elasticsearch server can be reached.

.. _elasticsearch_dashboard:

elasticsearch_dashboard
~~~~~~~~~~~~~~~~~~~~~~~

Whether the dashboard loads its initial counter values from Elasticsearch.
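
Below is a minimal sketch of how these options might be combined in a `flowerconfig.py`; the values are illustrative assumptions, not defaults::

    # Hypothetical flowerconfig.py values -- adjust to your deployment.
    elasticsearch = True                         # use Elasticsearch for history
    elasticsearch_url = 'http://localhost:9200'  # where Elasticsearch listens
    elasticsearch_index_bulk_size = 100          # documents per bulk index call
    elasticsearch_index_timeout = 5              # wait before indexing a partial batch
    elasticsearch_day_retention = 21             # delete indexes older than 21 days
    elasticsearch_dashboard = True               # seed dashboard counters from Elasticsearch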
5 changes: 5 additions & 0 deletions examples/tasks.py
@@ -31,6 +31,11 @@ def echo(msg, timestamp=False):
def error(msg):
    raise Exception(msg)


@app.task
def add_chain(x, y, z):
    # Chain (x + y) into add(?, z); return the id, since an AsyncResult
    # object itself is not JSON-serializable as a task result.
    result = (add.s(x, y) | add.s(z)).apply_async()
    return result.id


if __name__ == "__main__":
    app.start()
14 changes: 14 additions & 0 deletions flower/__indexer__.py
@@ -0,0 +1,14 @@
from __future__ import absolute_import
import sys

from celery.bin.celery import main as _main, celery

from flower.command import indexer


def main():
    # Register the indexer subcommand with Celery's CLI, then hand
    # control to the regular `celery` entry point.
    celery.add_command(indexer)
    sys.exit(_main())


if __name__ == "__main__":
    main()
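
One plausible way to expose this entry point is a console-script hook in the packaging metadata; the snippet below is an assumption for illustration, since the packaging changes are not part of this diff.

# Hypothetical setup.py excerpt (not part of this diff): wires a
# `flower-indexer` command to the main() defined above.
from setuptools import setup

setup(
    name='flower',
    entry_points={
        'console_scripts': [
            'flower-indexer = flower.__indexer__:main',
        ],
    },
)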
2 changes: 1 addition & 1 deletion flower/__init__.py
@@ -1,2 +1,2 @@
VERSION = (2, 0, 0)
VERSION = (2, 0, 0, 1)
__version__ = '.'.join(map(str, VERSION)) + '-dev'
135 changes: 135 additions & 0 deletions flower/api/elasticsearch_history.py
@@ -0,0 +1,135 @@
from __future__ import absolute_import

import logging
import typing

from tornado import web
from tornado.web import HTTPError

# Elasticsearch support is optional; fall back to None so the module can
# still be imported when the dependencies are not installed.
try:
    from elasticsearch import Elasticsearch, TransportError
    from elasticsearch_dsl import Search
    from elasticsearch_dsl.query import Range, Term
except ImportError:
    Elasticsearch = None
    TransportError = None
    Search = None
    Term = None
    Range = None

from ..options import options
# need to be able to use satisfies_search_terms first
# from .search import parse_search_terms, satisfies_search_terms
from ..views import BaseHandler

logger = logging.getLogger(__name__)


sort_keys = {'name': str, 'state': str, 'received': float, 'started': float}
sort_key_alias = {
    'name': 'name',
    'state': 'state',
    'received': 'received_time',
    'started': 'started_time',
}


class ElasticSearchHistoryHandler(BaseHandler):
    def __init__(self, *args, **kwargs):
        elasticsearch_url = options.elasticsearch_url
        if elasticsearch_url:
            self.es = Elasticsearch([elasticsearch_url])
        else:
            self.es = None

        super().__init__(*args, **kwargs)

    @web.authenticated
    def post(self, index_name: typing.Optional[str] = None):
        index_name = index_name or 'task'
        if self.es is None:
            raise HTTPError(503, 'Elasticsearch is not configured')
        try:
            self.es.indices.refresh(index=index_name)
        except TransportError as e:
            raise HTTPError(400, f'Invalid option: {e}') from e
        response = f'Successful refresh on index: {index_name}'
        self.write(response)


class AlternativeBackendError(Exception):
    pass


def list_tasks_elastic_search(argument_getter):
    elasticsearch_url = options.elasticsearch_url
    es = Elasticsearch([elasticsearch_url])

    s = Search(using=es, index='task')
    result = []
    try:
        s = build_search_with_fields(argument_getter, s)
        hit_dicts = s.execute().hits.hits
        for hit_dict in hit_dicts:
            result.append((hit_dict['_id'], hit_dict['_source']))
    except TransportError as exc:
        logger.warning("Issue querying task API via Elasticsearch", exc_info=True)
        raise AlternativeBackendError() from exc
    return result

# pylint: disable=too-many-branches,too-many-locals,too-many-arguments
def build_search_with_fields(argument_getter, s):
    limit = argument_getter.get_argument('limit', None)
    worker = argument_getter.get_argument('workername', None)
    task_name = argument_getter.get_argument('taskname', None)
    state = argument_getter.get_argument('state', None)
    received_start = argument_getter.get_argument('received_start', None)
    received_end = argument_getter.get_argument('received_end', None)
    sort_by = argument_getter.get_argument('sort_by', None)
    # need to be able to use satisfies_search_terms first
    # search = argument_getter.get_argument('search', None)
    started_start = argument_getter.get_argument('started_start', None)
    started_end = argument_getter.get_argument('started_end', None)
    root_id = argument_getter.get_argument('root_id', None)
    parent_id = argument_getter.get_argument('parent_id', None)
    runtime_lt = argument_getter.get_argument('runtime_lt', None)
    runtime_gt = argument_getter.get_argument('runtime_gt', None)

    limit = limit and int(limit)
    worker = worker if worker != 'All' else None
    task_name = task_name if task_name != 'All' else None
    state = state if state != 'All' else None

    # need to be able to use satisfies_search_terms first
    # search_terms = parse_search_terms(search or {})
    if worker:
        s = s.filter(Term(hostname=worker))
    if task_name:
        s = s.filter(Term(name=task_name))
    if state:
        s = s.filter(Term(state=state))
    if root_id:
        s = s.filter(Term(root_id=root_id))
    if parent_id:
        s = s.filter(Term(parent_id=parent_id))

    time_based_filtering_tuples = [
        ("received_time", "gt", received_start),
        ("received_time", "lt", received_end),
        ("started_time", "gt", started_start),
        ("started_time", "lt", started_end),
    ]
    for key, comp_key, value in time_based_filtering_tuples:
        if value:
            s = s.filter(Range(**{key: {comp_key: value}}))

    if runtime_lt is not None:
        s = s.query(Range(runtime=dict(lt=float(runtime_lt))))
    if runtime_gt is not None:
        s = s.query(Range(runtime=dict(gt=float(runtime_gt))))
    # satisfies_search_terms would be ideal to use -- maybe take the `Hit` logic
    # in the task view and apply that here so it could do the attr lookup as is.
    # if not satisfies_search_terms(task, search_terms):
    #     continue
    if limit is not None:
        s = s.extra(size=limit)
    if sort_by is not None:
        reverse = False
        if sort_by.startswith('-'):
            sort_by = sort_by.lstrip('-')
            reverse = True

        if sort_by in sort_keys:
            order = "desc" if reverse else "asc"
            s = s.sort({sort_key_alias.get(sort_by, sort_by): {"order": order}})
    return s
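
For illustration, here is roughly the query that build_search_with_fields would produce for a request like ?state=SUCCESS&runtime_gt=1.5&sort_by=-received, written directly against elasticsearch_dsl — a minimal sketch, assuming a local Elasticsearch cluster with the 'task' index already populated by the indexer.

# Standalone sketch of the Search built for
# ?state=SUCCESS&runtime_gt=1.5&sort_by=-received (assumes a local cluster).
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Range, Term

es = Elasticsearch(['http://localhost:9200'])
s = Search(using=es, index='task')
s = s.filter(Term(state='SUCCESS'))               # exact-match filter on state
s = s.query(Range(runtime={'gt': 1.5}))           # runtime > 1.5 seconds
s = s.sort({'received_time': {'order': 'desc'}})  # '-received' maps to received_time desc

for hit in s.execute().hits.hits:
    print(hit['_id'], hit['_source'])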
52 changes: 39 additions & 13 deletions flower/api/tasks.py
@@ -12,6 +12,16 @@
from tornado.ioloop import IOLoop
from tornado.web import HTTPError

try:
    from flower.api.elasticsearch_history import AlternativeBackendError
except ImportError:
    AlternativeBackendError = None

try:
    from flower.api.elasticsearch_history import list_tasks_elastic_search
except ImportError:
    list_tasks_elastic_search = None

from ..utils import tasks
from ..utils.broker import Broker
from . import BaseApiHandler
@@ -405,6 +415,7 @@ async def get(self):


class ListTasks(BaseTaskHandler):
    # pylint: disable=too-many-locals
    @web.authenticated
    def get(self):
        """
@@ -497,36 +508,51 @@ def get(self):
        :statuscode 200: no error
        :statuscode 401: unauthorized request
        """
        use_es = self.application.options.elasticsearch

        app = self.application
        limit = self.get_argument('limit', None)
        offset = self.get_argument('offset', default=0, type=int)
        worker = self.get_argument('workername', None)
        type = self.get_argument('taskname', None)
        state = self.get_argument('state', None)
        use_es = self.get_argument('es', use_es)
        received_start = self.get_argument('received_start', None)
        received_end = self.get_argument('received_end', None)
        sort_by = self.get_argument('sort_by', None)
        search = self.get_argument('search', None)
        started_start = self.get_argument('started_start', None)
        started_end = self.get_argument('started_end', None)
        root_id = self.get_argument('root_id', None)
        parent_id = self.get_argument('parent_id', None)

        limit = limit and int(limit)
        offset = max(offset, 0)
        worker = worker if worker != 'All' else None
        type = type if type != 'All' else None
        state = state if state != 'All' else None

        result = []
        for task_id, task in tasks.iter_tasks(
                app.events, limit=limit, offset=offset, sort_by=sort_by, type=type,
                worker=worker, state=state,
                received_start=received_start,
                received_end=received_end,
                search=search
        ):
            task = tasks.as_dict(task)
            worker = task.pop('worker', None)
            if worker is not None:
                task['worker'] = worker.hostname
            result.append((task_id, task))

        if use_es:
            try:
                result = list_tasks_elastic_search(self)
            except AlternativeBackendError:
                use_es = False
        if not use_es:
            for task_id, task in tasks.iter_tasks(
                    app.events, limit=limit, offset=offset, sort_by=sort_by, type=type,
                    worker=worker, state=state,
                    received_start=received_start,
                    received_end=received_end,
                    search=search,
                    started_start=started_start, started_end=started_end,
                    root_id=root_id, parent_id=parent_id
            ):
                task = tasks.as_dict(task)
                worker = task.pop('worker', None)
                if worker is not None:
                    task['worker'] = worker.hostname
                result.append((task_id, task))
        self.write(OrderedDict(result))
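
To show the fallback in action, a client can toggle the Elasticsearch backend per request with the `es` query parameter — a minimal sketch, assuming a local Flower instance on localhost:5555 with authentication disabled.

# Sketch: list tasks through the Elasticsearch-backed history;
# 'es' toggles the backend per request, falling back to event state on error.
import requests

resp = requests.get(
    'http://localhost:5555/api/tasks',
    params={'state': 'SUCCESS', 'limit': 10, 'es': 1},
    timeout=10,
)
for task_id, task in resp.json().items():
    print(task_id, task.get('name'), task.get('state'))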


45 changes: 44 additions & 1 deletion flower/command.py
@@ -3,6 +3,7 @@
import atexit
import signal
import logging
import time

from pprint import pformat

@@ -14,6 +15,8 @@
from tornado.log import enable_pretty_logging
from celery.bin.base import CeleryCommand

from flower.indexer_app import IndexerApp
from . import __version__
from .app import Flower
from .urls import settings
from .utils import abs_path, prepend_url, strtobool
@@ -43,8 +46,16 @@ def flower(ctx, tornado_argv):

    extract_settings()
    setup_logging()

    app = ctx.obj.app
    custom_es_setup = True
    if custom_es_setup:
        app.loader.import_default_modules()
        if getattr(app.conf, 'timezone', None):
            os.environ['TZ'] = app.conf.timezone
            time.tzset()

    flower_app = Flower(capp=app, options=options, **settings)

    atexit.register(flower_app.stop)
@@ -108,6 +119,7 @@ def warn_about_celery_args_used_in_flower_command(ctx, flower_args):
        'Please specify them after celery command instead following this template: '
        'celery [celery args] flower [flower args].', incorrectly_used_args
    )
    logger.debug('Settings: %s', pformat(settings))


def setup_logging():
@@ -179,3 +191,34 @@ def print_banner(app, ssl):
        pformat(sorted(app.tasks.keys()))
    )
    logger.debug('Settings: %s', pformat(settings))



@click.command(cls=CeleryCommand,
               context_settings={
                   'ignore_unknown_options': True
               })
@click.argument("tornado_argv", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def indexer(ctx, tornado_argv):
    """Tool for alternative task indexing in a Celery cluster."""
    warn_about_celery_args_used_in_flower_command(ctx, tornado_argv)
    apply_env_options()
    apply_options(sys.argv[0], tornado_argv)

    extract_settings()
    setup_logging()
    app = ctx.obj.app

    indexer_app = IndexerApp(capp=app, options=options, **settings)

    atexit.register(indexer_app.stop)
    signal.signal(signal.SIGTERM, sigterm_handler)

    if not ctx.obj.quiet:
        print_banner(app, 'ssl_options' in settings)

    try:
        indexer_app.start()
    except (KeyboardInterrupt, SystemExit):
        pass