diff --git a/.pylintrc b/.pylintrc index b40d987d..4d9d6248 100644 --- a/.pylintrc +++ b/.pylintrc @@ -11,6 +11,7 @@ disable= C0415, # import-outside-toplevel W0718, # broad-exception-caught R1735, # use-dict-literal + R0917, # too-many-positional-arguments [BASIC] good-names=i,e,n,x,logger,tz,db,dt diff --git a/docs/config.rst b/docs/config.rst index f613bb94..66017472 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -540,3 +540,48 @@ Default: None Sets the URI to which an OAuth 2.0 server redirects the user after successful authentication and authorization. `oauth2_redirect_uri` option should be used with :ref:`auth`, :ref:`auth_provider`, :ref:`oauth2_key` and :ref:`oauth2_secret` options. + +.. _elasticsearch: + +elasticsearch +~~~~~~~~~~~~~ + +Enables Elasticsearch as the backend for task history lookups. + +.. _elasticsearch_index_bulk_size: + +elasticsearch_index_bulk_size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The maximum number of documents the Elasticsearch indexer sends in a single bulk index API call. + +.. _elasticsearch_index_timeout: + +elasticsearch_index_timeout +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +How long, in seconds, the indexer's background thread waits for the queue to fill up to `elasticsearch_index_bulk_size` before flushing a partial batch. + +.. _elasticsearch_day_retention: + +elasticsearch_day_retention +~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For projects that require data retention management, this specifies how many daily indices to keep. + +For example, if the value is 21, any indices older than 21 days are deleted. This happens at startup and on day change. + +.. _elasticsearch_url: + +elasticsearch_url +~~~~~~~~~~~~~~~~~ + +The URL of the Elasticsearch server. + +.. _elasticsearch_dashboard: + +elasticsearch_dashboard +~~~~~~~~~~~~~~~~~~~~~~~ + +Whether the dashboard initially loads its counter values from Elasticsearch. 
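Putting the new options together, a configuration might look like the sketch below. This is only an illustration: the values shown mirror the defaults declared in flower/options.py, and it assumes these options can be set through the usual Flower mechanisms (a flowerconfig.py or --option=value flags) just like the existing ones::

    # flowerconfig.py -- illustrative values only, matching the defaults in flower/options.py
    elasticsearch = True                          # serve task history from Elasticsearch
    elasticsearch_url = "http://localhost:9200/"  # where the Elasticsearch cluster lives
    elasticsearch_index_bulk_size = 200           # documents per bulk index call
    elasticsearch_index_timeout = 10              # seconds to wait for a full batch before flushing
    elasticsearch_day_retention = 21              # keep only the last 21 daily task-* indices
    elasticsearch_dashboard = False               # dashboard counters stay in memory by default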
diff --git a/examples/tasks.py b/examples/tasks.py index 94f3af52..277082d9 100644 --- a/examples/tasks.py +++ b/examples/tasks.py @@ -31,6 +31,11 @@ def echo(msg, timestamp=False): def error(msg): raise Exception(msg) +@app.task +def add_chain(x, y, z): + result = (add.s(x, y) | add.s(z)).apply_async() + return result + if __name__ == "__main__": app.start() diff --git a/flower/__indexer__.py b/flower/__indexer__.py new file mode 100644 index 00000000..27af62d0 --- /dev/null +++ b/flower/__indexer__.py @@ -0,0 +1,14 @@ +from __future__ import absolute_import +from __future__ import print_function +import sys +from celery.bin.celery import main as _main, celery +from flower.command import indexer + + +def main(): + celery.add_command(indexer) + sys.exit(_main()) + + +if __name__ == "__main__": + main() diff --git a/flower/__init__.py b/flower/__init__.py index 24c8e6f7..b85e5fb1 100644 --- a/flower/__init__.py +++ b/flower/__init__.py @@ -1,2 +1,2 @@ -VERSION = (2, 0, 0) +VERSION = (2, 0, 0, 1) __version__ = '.'.join(map(str, VERSION)) + '-dev' diff --git a/flower/api/elasticsearch_history.py b/flower/api/elasticsearch_history.py new file mode 100644 index 00000000..932bc46c --- /dev/null +++ b/flower/api/elasticsearch_history.py @@ -0,0 +1,135 @@ +from __future__ import absolute_import + +import logging +import typing + +from tornado import web +from tornado.web import HTTPError + +try: + from elasticsearch import Elasticsearch, TransportError + from elasticsearch_dsl import Search + from elasticsearch_dsl.query import Range, Term +except ImportError: + Elasticsearch = None + TransportError = None + Search = None + Term = None + Range = None + + + +from ..options import options +# need to be able to use satisfies_search_terms first +# from .search import parse_search_terms, satisfies_search_terms +from ..views import BaseHandler + +logger = logging.getLogger(__name__) + + +sort_keys = {'name': str, 'state': str, 'received': float, 'started': float} +sort_key_alias = {'name': 'name', 'state': 'state', 'received': 'received_time', 'started': 'started_time'} + + +class ElasticSearchHistoryHandler(BaseHandler): + def __init__(self, *args, **kwargs): + elasticsearch_url = options.elasticsearch_url + if elasticsearch_url: + self.es = Elasticsearch([elasticsearch_url, ]) + else: + self.es = None + + super().__init__(*args, **kwargs) + + @web.authenticated + def post(self, index_name: typing.Optional[str]=None): + index_name = index_name or 'task' + try: + self.es.indices.refresh(index_name) + except TransportError as e: + raise HTTPError(400, f'Invalid option: {e}') from e + response = f'Successful refresh on index: {index_name}' + self.write(response) + + +class AlternativeBackendError(Exception): + pass + + +def list_tasks_elastic_search(argument_getter): + elasticsearch_url = options.elasticsearch_url + + es = Elasticsearch([elasticsearch_url, ]) + + + s = Search(using=es, index='task') + result = [] + try: + s = build_search_with_fields(argument_getter, s) + hit_dicts = s.execute().hits.hits + for hit_dict in hit_dicts: + result.append((hit_dict['_id'], hit_dict['_source'])) + except TransportError as exc: + logger.warning("Issue querying task API via Elasticsearch", exc_info=True) + raise AlternativeBackendError() from exc + return result + +# pylint: disable=too-many-branches,too-many-locals,too-many-arguments +def build_search_with_fields(argument_getter, s): + limit = argument_getter.get_argument('limit', None) + worker = argument_getter.get_argument('workername', None) + task_name = 
argument_getter.get_argument('taskname', None) + state = argument_getter.get_argument('state', None) + received_start = argument_getter.get_argument('received_start', None) + received_end = argument_getter.get_argument('received_end', None) + sort_by = argument_getter.get_argument('sort_by', None) + # need to be able to use satisfies_search_terms first + # search = argument_getter.get_argument('search', None) + started_start = argument_getter.get_argument('started_start', None) + started_end = argument_getter.get_argument('started_end', None) + root_id = argument_getter.get_argument('root_id', None) + parent_id = argument_getter.get_argument('parent_id', None) + runtime_lt = argument_getter.get_argument('runtime_lt', None) + runtime_gt = argument_getter.get_argument('runtime_gt', None) + + limit = limit and int(limit) + worker = worker if worker != 'All' else None + task_name = task_name if task_name != 'All' else None + state = state if state != 'All' else None + + # need to be able to use satisfies_search_terms first + # search_terms = parse_search_terms(search or {}) + if worker: + s = s.filter(Term(hostname=worker)) + if task_name: + s = s.filter(Term(name=task_name)) + if state: + s = s.filter(Term(state=state)) + if root_id: + s = s.filter(Term(root_id=root_id)) + if parent_id: + s = s.filter(Term(parent_id=parent_id)) + time_based_filtering_tuples = [("received_time", "gt", received_start), ("received_time", "lt", received_end), ("started_time", "gt", started_start), ("started_time", "lt", started_end)] + for key, comp_key, value in time_based_filtering_tuples: + if value: + s = s.filter(Range(**{key: {comp_key: value}})) + + if runtime_lt is not None: + s = s.query(Range(runtime=dict(lt=float(runtime_lt)))) + if runtime_gt is not None: + s = s.query(Range(runtime=dict(gt=float(runtime_gt)))) + # satisfies_search_terms would be ideal to use -- maybe take the `Hit` logic in task view + # and apply that here so it could do the attr lookup as is. + # if not satisfies_search_terms(task, search_terms): + # continue + if limit is not None: + s = s.extra(size=limit) + if sort_by is not None: + reverse = False + if sort_by.startswith('-'): + sort_by = sort_by.lstrip('-') + reverse = True + + if sort_by in sort_keys: + s = s.sort({sort_key_alias.get(sort_by, sort_by): {"order": "desc" if reverse else "asc"}}) + return s diff --git a/flower/api/tasks.py b/flower/api/tasks.py index 730c290e..6f58ca40 100644 --- a/flower/api/tasks.py +++ b/flower/api/tasks.py @@ -12,6 +12,16 @@ from tornado.ioloop import IOLoop from tornado.web import HTTPError +try: + from flower.api.elasticsearch_history import AlternativeBackendError +except ImportError: + AlternativeBackendError = None + +try: + from flower.api.elasticsearch_history import list_tasks_elastic_search +except ImportError: + list_tasks_elastic_search = None + from ..utils import tasks from ..utils.broker import Broker from . 
import BaseApiHandler @@ -405,6 +415,7 @@ async def get(self): class ListTasks(BaseTaskHandler): + # pylint: disable=too-many-locals @web.authenticated def get(self): """ @@ -497,36 +508,51 @@ def get(self): :statuscode 200: no error :statuscode 401: unauthorized request """ + use_es = self.application.options.elasticsearch + app = self.application limit = self.get_argument('limit', None) offset = self.get_argument('offset', default=0, type=int) worker = self.get_argument('workername', None) type = self.get_argument('taskname', None) state = self.get_argument('state', None) + use_es = self.get_argument('es', use_es) received_start = self.get_argument('received_start', None) received_end = self.get_argument('received_end', None) sort_by = self.get_argument('sort_by', None) search = self.get_argument('search', None) + started_start = self.get_argument('started_start', None) + started_end = self.get_argument('started_end', None) + root_id = self.get_argument('root_id', None) + parent_id = self.get_argument('parent_id', None) limit = limit and int(limit) offset = max(offset, 0) worker = worker if worker != 'All' else None type = type if type != 'All' else None state = state if state != 'All' else None - result = [] - for task_id, task in tasks.iter_tasks( - app.events, limit=limit, offset=offset, sort_by=sort_by, type=type, - worker=worker, state=state, - received_start=received_start, - received_end=received_end, - search=search - ): - task = tasks.as_dict(task) - worker = task.pop('worker', None) - if worker is not None: - task['worker'] = worker.hostname - result.append((task_id, task)) + + result = [] + if use_es: + try: + result = list_tasks_elastic_search(self) + except AlternativeBackendError: + use_es = False + if not use_es: + for task_id, task in tasks.iter_tasks( + app.events, limit=limit, offset=offset, sort_by=sort_by, type=type, + worker=worker, state=state, + received_start=received_start, + received_end=received_end, + search=search, + started_start=started_start, started_end=started_end, + root_id=root_id, parent_id=parent_id + ): + task = tasks.as_dict(task) + worker = task.pop('worker', None) + if worker is not None: + task['worker'] = worker.hostname + result.append((task_id, task)) self.write(OrderedDict(result)) diff --git a/flower/command.py b/flower/command.py index 94ed6c7b..d0013919 100644 --- a/flower/command.py +++ b/flower/command.py @@ -3,6 +3,7 @@ import atexit import signal import logging +import time from pprint import pformat @@ -14,6 +15,8 @@ from tornado.log import enable_pretty_logging from celery.bin.base import CeleryCommand +from flower.indexer_app import IndexerApp +from . 
import __version__ from .app import Flower from .urls import settings from .utils import abs_path, prepend_url, strtobool @@ -43,8 +46,16 @@ def flower(ctx, tornado_argv): extract_settings() setup_logging() app = ctx.obj.app + custom_es_setup = True + if custom_es_setup: + app.loader.import_default_modules() + if getattr(app.conf, 'timezone', None): + os.environ['TZ'] = app.conf.timezone + time.tzset() + flower_app = Flower(capp=app, options=options, **settings) atexit.register(flower_app.stop) @@ -108,6 +119,7 @@ def warn_about_celery_args_used_in_flower_command(ctx, flower_args): 'Please specify them after celery command instead following this template: ' 'celery [celery args] flower [flower args].', incorrectly_used_args ) + logger.debug('Settings: %s', pformat(settings)) def setup_logging(): @@ -179,3 +191,34 @@ def print_banner(app, ssl): pformat(sorted(app.tasks.keys())) ) logger.debug('Settings: %s', pformat(settings)) + + + +@click.command(cls=CeleryCommand, + context_settings={ + 'ignore_unknown_options': True + }) +@click.argument("tornado_argv", nargs=-1, type=click.UNPROCESSED) +@click.pass_context +def indexer(ctx, tornado_argv): + """Tool for alternative task indexing in a Celery cluster.""" + warn_about_celery_args_used_in_flower_command(ctx, tornado_argv) + apply_env_options() + apply_options(sys.argv[0], tornado_argv) + + extract_settings() + setup_logging() + app = ctx.obj.app + + indexer_app = IndexerApp(capp=app, options=options, **settings) + + atexit.register(indexer_app.stop) + signal.signal(signal.SIGTERM, sigterm_handler) + + if not ctx.obj.quiet: + print_banner(app, 'ssl_options' in settings) + + try: + indexer_app.start() + except (KeyboardInterrupt, SystemExit): + pass diff --git a/flower/elasticsearch_events.py b/flower/elasticsearch_events.py new file mode 100644 index 00000000..69836541 --- /dev/null +++ b/flower/elasticsearch_events.py @@ -0,0 +1,363 @@ +from __future__ import absolute_import, with_statement + +import collections +import json +import logging +import threading +import time +import traceback +from datetime import date, datetime, timedelta +from logging import config + +import pytz + +try: + import elasticsearch + from elasticsearch import (Elasticsearch, ElasticsearchException, + RequestsHttpConnection, TransportError) + from elasticsearch.helpers import bulk +except ImportError: + elasticsearch = None + Elasticsearch = None + RequestsHttpConnection = None + TransportError = None + ElasticsearchException = None + bulk = None + +from celery.events.state import State + +from flower.events import Events + +from .options import options + +try: + from collections import Counter +except ImportError: + from .utils.backports.collections import Counter + + +logger = logging.getLogger(__name__) +try: + import queue +except ImportError: + import Queue as queue + + +LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'verbose': { + 'class': 'flower.logging_utils.CeleryOneLineExceptionFormatter', + 'format': '%(levelname)s %(asctime)s %(funcName)s %(module)s %(lineno)d %(message)s' + }, + }, + 'handlers': { + 'task_logger_file': { + 'level': 'DEBUG', + 'class': 'logging.handlers.TimedRotatingFileHandler', + 'filename': 'task_logger.log', + 'formatter': 'verbose', + 'when': 'midnight', + 'interval': 1, + 'backupCount': 30, + 'utc': True, + }, + 'stream': { + 'level': 'DEBUG', + 'class': 'logging.StreamHandler', + 'formatter': 'verbose', + }, + }, + 'loggers': { 
+ 'task_logger': { + 'handlers': ['task_logger_file', 'stream', ], + 'level': 'DEBUG', + 'propagate': False, + }, + }, +} +config.dictConfig(LOGGING) + +logger = logging.getLogger('task_logger') +ELASTICSEARCH_URL = options.elasticsearch_url +ES_INDEX_TIMEOUT = options.elasticsearch_index_timeout +ES_INDEX_BULK_SIZE = options.elasticsearch_index_bulk_size +ES_DAY_RETENTION = options.elasticsearch_day_retention + + +ES_CLIENT = Elasticsearch( + [ELASTICSEARCH_URL], + connection_class=RequestsHttpConnection +) + +INDICES_CLIENT = ES_CLIENT.indices + +def get_index_name(current_date: date) -> str: + return f'task-{current_date.isoformat()}' + +body = { + 'properties': { + 'hostname': {'type': 'keyword', }, + 'worker': {'type': 'keyword', }, + 'clock': {'type': 'integer', }, + 'args': {'type': 'keyword', }, + 'kwargs': {'type': 'keyword', }, + 'timestamp_time': {'type': 'date', }, + 'timestamp': {'type': 'float', }, + 'root_id': {'type': 'keyword', }, + 'root': {'type': 'keyword', }, + 'parent_id': {'type': 'keyword', }, + 'parent': {'type': 'keyword', }, + 'name': {'type': 'keyword', }, + 'result': {'type': 'keyword', }, + 'state': {'type': 'keyword', }, + 'eta': {'type': 'date', }, + 'received': {'type': 'float', }, + 'retries': {'type': 'integer', }, + 'received_time': { + "type": "date", + }, + 'expires': {'type': 'date', }, + 'revoked': {'type': 'float', }, + 'revoked_time': { + "type": "date", + }, + 'retried': {'type': 'float', }, + 'retried_time': { + "type": "date", + }, + 'started': {'type': 'float', }, + 'started_time': { + "type": "date", + }, + 'failed': {'type': 'float', }, + 'failed_time': { + "type": "date", + }, + 'succeeded': {'type': 'float', }, + 'succeeded_time': { + "type": "date", + }, + 'runtime': {'type': 'float', }, + 'info': {'type': 'text', }, + 'traceback': {'type': 'text', }, + 'exception': {'type': 'text', }, + '_fields': {'type': 'keyword', }, + 'children': {'type': 'keyword', }, + } +} + +es_queue = queue.Queue() + + +def es_consumer(): + es_buffer = [] + t = threading.current_thread() + while getattr(t, "do_run", True): + start_time = int(time.time()) + try: + while len(es_buffer) < ES_INDEX_BULK_SIZE: + es_buffer.append(es_queue.get(timeout=ES_INDEX_TIMEOUT)) + es_queue.task_done() + got_task_time = int(time.time()) + if got_task_time - start_time >= ES_INDEX_TIMEOUT: + raise queue.Empty + except queue.Empty: + pass + if es_buffer: + for try_idx in range(5): + # should consider implementing retry logic (outside of what the ES library uses) + try: + bulk(actions=es_buffer, client=ES_CLIENT, stats_only=True) + except (elasticsearch.ConnectionError, elasticsearch.ConnectionTimeout, ): + time.sleep(pow(2, try_idx)) + logger.warning(traceback.format_exc()) + except elasticsearch.helpers.BulkIndexError: + time.sleep(pow(2, try_idx)) + logger.warning(traceback.format_exc()) + break + except Exception: + es_buffer[:] = [] + logger.warning(traceback.format_exc()) + break + else: + es_buffer[:] = [] + break + # Can enable the sleep in case it seems like we're writing into ES too frequently + # time.sleep(0.5) + + +es_thread = threading.Thread(target=es_consumer) +es_thread.daemon = True + + +def send_to_elastic_search(state, event): + # task name is sent only with -received event, and state + # will keep track of this for us. 
+ if not event['type'].startswith('task-'): + return + task = state.tasks.get(event['uuid']) + received_time = task.received + succeeded_time = task.succeeded + start_time = task.started + + # potentially use the sched module to change it via native python logic + current_date = datetime.now(tz=pytz.utc).date() + active_index_name = get_index_name(current_date) + # only create the day's index (and prune old ones) when the day rolls over + if active_index_name != getattr(send_to_elastic_search, '_last_index_name', None): + send_to_elastic_search._last_index_name = active_index_name + try: + INDICES_CLIENT.create(index=active_index_name) + INDICES_CLIENT.put_alias('task-*', 'task') + INDICES_CLIENT.put_mapping( + doc_type='task', + body=body, + index=active_index_name + ) + except TransportError: + logger.warning("Issue creating or putting alias or mapping: %s", traceback.format_exc()) + try: + deleted = delete_old_elasticsearch_indices(current_date, ES_DAY_RETENTION) + logger.info("Deleted the following older indices from day retention: " + "%s, " + "indices: %s", ES_DAY_RETENTION, deleted) + except ElasticsearchException: + logger.warning("Issue deleting older indices", exc_info=True) + + doc_body = { + 'hostname': task.hostname, + 'worker': task.hostname if task.worker else None, + 'exchange': task.exchange, + 'retries': task.retries, + 'routing_key': task.routing_key, + 'args': task.args, + 'kwargs': task.kwargs, + 'name': task.name, + 'clock': task.clock, + 'children': str(list(task.children)) if task.children is not None else None, + 'expires': task.expires, + 'eta': task.eta, + 'state': task.state, + 'received': received_time, + 'received_time': datetime.utcfromtimestamp(received_time).replace(tzinfo=pytz.utc) if received_time else None, + 'retried': task.retried, + 'retried_time': datetime.utcfromtimestamp(task.retried).replace(tzinfo=pytz.utc) if task.retried else None, + 'started': start_time, + 'started_time': datetime.utcfromtimestamp(start_time).replace(tzinfo=pytz.utc) if start_time else None, + 'succeeded': succeeded_time, + 'succeeded_time': datetime.utcfromtimestamp(succeeded_time).replace( + tzinfo=pytz.utc) if succeeded_time else None, + 'revoked': task.revoked, + 'revoked_time': datetime.utcfromtimestamp(task.revoked).replace(tzinfo=pytz.utc) if task.revoked else None, + 'failed': task.failed, + 'failed_time': datetime.utcfromtimestamp(task.failed).replace(tzinfo=pytz.utc) if task.failed else None, + 'info': json.dumps(task.info()), + 'result': task.result, + 'root_id': task.root_id, + 'root': str(task.root) if task.root else None, + 'runtime': task.runtime, + 'timestamp': task.timestamp, + 'timestamp_time': datetime.utcfromtimestamp(task.timestamp).replace(tzinfo=pytz.utc) if task.timestamp else None, + 'exception': task.exception, + 'traceback': task.traceback, + 'parent_id': task.parent_id, + 'parent': str(task.parent) if task.parent else None, + '_fields': task._fields, + } + try: + doc_body['_type'] = 'task' + doc_body['_op_type'] = 'index' + doc_body['_index'] = get_index_name(current_date) + doc_body['_id'] = task.uuid + es_queue.put(doc_body) + except Exception: + # fall back to plain logging if the document cannot be queued + logger.info( + "%s[%s] worker: %s, received: %s, started: %s, succeeded: %s, info=%s", + task.name, + task.uuid, + task.hostname, + received_time, + start_time, + succeeded_time, + task.info(), + ) + + + +def delete_old_elasticsearch_indices(current_date, day_cut_off): + if day_cut_off is None: + return None + date_cut_off = current_date - timedelta(days=day_cut_off) + indices_return = ES_CLIENT.cat.indices(index="task", format="json", h="index") # pylint: 
disable=unexpected-keyword-arg + indices_return = [ + item["index"] for item in indices_return if + date( + year=int(item["index"].split("-")[1]), + month=int(item["index"].split("-")[2]), + day=int(item["index"].split("-")[3])) + < date_cut_off + ] + if indices_return: + return {"indices": indices_return, "response": ES_CLIENT.indices.delete(index=",".join(indices_return))} + return None + + +class EventsState(State): + # EventsState object is created and accessed only from ioloop thread + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.counter = collections.defaultdict(Counter) + + def event(self, event): + if not event['type'].startswith('task-'): + return + + # Save the event + super().event(event) + send_to_elastic_search(self, event) + + +class IndexerEvents(Events): + events_enable_interval = 5000 + + # pylint: disable=too-many-arguments + def __init__(self, capp, db=None, persistent=False, + enable_events=True, io_loop=None, **kwargs): + + super().__init__(capp=capp, db=db, persistent=persistent, + enable_events=enable_events, io_loop=io_loop, + **kwargs) + try: + INDICES_CLIENT.create(index=get_index_name(datetime.now(tz=pytz.utc).date())) + except TransportError as te: + if te.error in ['index_already_exists_exception', 'resource_already_exists_exception']: + pass + else: + logger.warning("Elastic search occurred, " + "may be bad: %s", traceback.format_exc()) + try: + INDICES_CLIENT.put_mapping( + doc_type='task', + body=body, + index=get_index_name(datetime.now(tz=pytz.utc).date()) + ) + except TransportError: + logger.warning("Elastic search put mapping error (may be bad)", exc_info=True) + try: + if INDICES_CLIENT.exists(index=get_index_name(datetime.now(tz=pytz.utc).date())): + INDICES_CLIENT.put_alias('task-*', 'task') + except TransportError: + logger.warning("Elastic search exists/alias put error", exc_info=True) + + try: + current_date = datetime.now(tz=pytz.utc).date() + deleted = delete_old_elasticsearch_indices(current_date, ES_DAY_RETENTION) + logger.info("Deleted the following older indices from day retention: " + "%s, " + "indices: %s", ES_DAY_RETENTION, deleted) + except ElasticsearchException: + logger.warning("Issue deleting older indices.", exc_info=True) + + self.state = EventsState(**kwargs) diff --git a/flower/events.py b/flower/events.py index cd15d7a2..117a804f 100644 --- a/flower/events.py +++ b/flower/events.py @@ -3,7 +3,7 @@ import shelve import threading import time -from collections import Counter +from collections import defaultdict, Counter from functools import partial from celery.events import EventReceiver @@ -13,6 +13,17 @@ from tornado.ioloop import PeriodicCallback from tornado.options import options + +try: + from elasticsearch import Elasticsearch + from elasticsearch_dsl import Search, MultiSearch + from elasticsearch_dsl.query import Term +except ImportError: + Search = None + Elasticsearch = None + MultiSearch = None + Term = None + logger = logging.getLogger(__name__) PROMETHEUS_METRICS = None @@ -69,10 +80,13 @@ def event(self, event): worker_name = event['hostname'] event_type = event['type'] - self.counter[worker_name][event_type] += 1 + if not self.counter[worker_name] and options.elasticsearch_dashboard is True: + event_type = self.elasticsearch_dashboard_data(worker_name, event_type) + else: + self.counter[worker_name][event_type] += 1 if event_type.startswith('task-'): - task_id = event['uuid'] + task_id = event.get('uuid') task = self.tasks.get(task_id) task_name = event.get('name', '') if not 
task_name and task_id in self.tasks: @@ -109,6 +123,43 @@ def event(self, event): if event_type == 'worker-offline': self.metrics.worker_online.labels(worker_name).set(0) + # pylint: disable=too-many-locals + def elasticsearch_dashboard_data(self, worker_name, event_type): + elasticsearch_url = options.elasticsearch_url + es = Elasticsearch([elasticsearch_url, ]) + + ms = MultiSearch(using=es, index="task") + s = Search(using=es, index='task') + # counts are grouped by the Celery task states stored in the index (see celery.states) + ms = ms.add(s.filter(Term(state='RECEIVED') & Term(hostname=worker_name)).extra(size=0)) # pylint: disable=no-member + ms = ms.add(s.filter(Term(state='STARTED') & Term(hostname=worker_name)).extra(size=0)) # pylint: disable=no-member + ms = ms.add(s.filter(Term(state='SUCCESS') & Term(hostname=worker_name)).extra(size=0)) # pylint: disable=no-member + ms = ms.add(s.filter(Term(state='FAILURE') & Term(hostname=worker_name)).extra(size=0)) # pylint: disable=no-member + ms = ms.add(s.filter(Term(state='RETRY') & Term(hostname=worker_name)).extra(size=0)) # pylint: disable=no-member + responses = ms.execute() + task_event_keys = ["task-received", "task-started", "task-succeeded", "task-failed", "task-retried"] + tasks_info = defaultdict(int) + for event_type_item, resp in zip(task_event_keys, responses): + tasks_info[event_type_item] += resp.hits.total + processed = tasks_info["task-received"] + started = tasks_info["task-started"] + succeeded = tasks_info["task-succeeded"] + failed = tasks_info["task-failed"] + retried = tasks_info["task-retried"] + self.counter[worker_name]['task-received'] = processed + started + succeeded + failed + retried + self.counter[worker_name]['task-started'] = started + self.counter[worker_name]['task-succeeded'] = succeeded + self.counter[worker_name]['task-retried'] = retried + self.counter[worker_name]['task-failed'] = failed + if not event_type.startswith('task-'): + self.counter[worker_name][event_type] += 1 + return event_type + + # from .elasticsearch_history import send_to_elastic_search + # try: + # send_to_elastic_search(self, event) + # except Exception as e: + # print(e) + class Events(threading.Thread): events_enable_interval = 5000 diff --git a/flower/indexer_app.py b/flower/indexer_app.py new file mode 100644 index 00000000..b965be07 --- /dev/null +++ b/flower/indexer_app.py @@ -0,0 +1,42 @@ +import celery +from tornado import ioloop + +from flower.options import default_options + + +class IndexerApp: + def __init__(self, options=None, capp=None, events=None, io_loop=None, **kwargs): + kwargs.clear() + super().__init__() + self.options = options or default_options + self.io_loop = ioloop.IOLoop.instance() + + self.capp = capp or celery.Celery() + from flower.elasticsearch_events import IndexerEvents, es_thread + self.es_thread = es_thread + self.events = events or IndexerEvents( + self.capp, db=self.options.db, + persistent=self.options.persistent, + enable_events=self.options.enable_events, + io_loop=io_loop or self.io_loop, + max_workers_in_memory=self.options.max_workers, + max_tasks_in_memory=self.options.max_tasks) + self.started = False + + def start(self): + self.events.start() + self.es_thread.start() + self.started = True + self.io_loop.start() + + def stop(self): + if self.started: + self.events.stop() + self.es_thread.do_run = False + self.es_thread.join(timeout=10) + self.started = False + + @property + def transport(self): + return getattr(self.capp.connection().transport, + 'driver_type', None) diff --git a/flower/logging_utils.py b/flower/logging_utils.py new file mode 100644 index 
00000000..228dab31 --- /dev/null +++ b/flower/logging_utils.py @@ -0,0 +1,19 @@ +import logging + + +class CeleryOneLineExceptionFormatter(logging.Formatter): + """ + Special logging formatter mostly for Celery, to make exceptions on 1 line. + """ + def formatException(self, ei): + """ + Format an exception so that it prints on a single line. + """ + result = super().formatException(ei) + return repr(result) # or format into one line however you want to + + def format(self, record): + s = super().format(record) + if record.exc_text or record.levelno >= logging.ERROR: + s = s.replace('\n', '') + '|' + return s diff --git a/flower/options.py b/flower/options.py index 083d4b5b..bf5cb981 100644 --- a/flower/options.py +++ b/flower/options.py @@ -68,6 +68,12 @@ define("url_prefix", type=str, help="base url prefix") define("task_runtime_metric_buckets", type=float, default=Histogram.DEFAULT_BUCKETS, multiple=True, help="histogram latency bucket value") +define("elasticsearch", type=bool, default=False, help="Whether to support history lookups with elasticsearch (at least by default)") +define("elasticsearch_index_bulk_size", type=int, default=200, help="Elasticsearch indexer size for bulk indexing") +define("elasticsearch_index_timeout", type=int, default=10, help="Elasticsearch indexer timeout on indexing") +define("elasticsearch_day_retention", type=int, default=21, help="Elasticsearch indexer number of days of tasks to retain in the index") +define("elasticsearch_url", type=str, default="http://localhost:9200/", help="Elasticsearch URL for indexer and standard flower for history lookup") +define("elasticsearch_dashboard", type=bool, default=False, help="Whether to lookup the dashboard information from Elasticsearch") default_options = options diff --git a/flower/urls.py b/flower/urls.py index d3ea5df8..638ae4df 100644 --- a/flower/urls.py +++ b/flower/urls.py @@ -2,7 +2,7 @@ from tornado.web import StaticFileHandler, url -from .api import control, tasks, workers +from .api import control, tasks, workers, elasticsearch_history from .utils import gen_cookie_secret from .views import auth, monitor from .views.broker import BrokerView @@ -54,6 +54,8 @@ # Metrics (r"/metrics", monitor.Metrics), (r"/healthcheck", monitor.Healthcheck), + # Elastic search + (r"/api/es/refresh/(.*)", elasticsearch_history.ElasticSearchHistoryHandler), # Static (r"/static/(.*)", StaticFileHandler, {"path": settings['static_path']}), diff --git a/flower/utils/search.py b/flower/utils/search.py index fba842b6..21dd278c 100644 --- a/flower/utils/search.py +++ b/flower/utils/search.py @@ -1,9 +1,11 @@ +import datetime import re +import time from kombu.utils.encoding import safe_str - -def parse_search_terms(raw_search_value): +# pylint: disable=too-many-branches,too-many-statements +def parse_search_terms(raw_search_value, find_time_keys=False): search_regexp = r'(?:[^\s,"]|"(?:\\.|[^"])*")+' # splits by space, ignores space in quotes if not raw_search_value: return {} @@ -11,12 +13,17 @@ def parse_search_terms(raw_search_value): for query_part in re.findall(search_regexp, raw_search_value): if not query_part: continue + find_any = True if query_part.startswith('result:'): parsed_search['result'] = preprocess_search_value(query_part[len('result:'):]) elif query_part.startswith('args:'): if 'args' not in parsed_search: parsed_search['args'] = [] parsed_search['args'].append(preprocess_search_value(query_part[len('args:'):])) + elif query_part.startswith('taskname:'): + if 'taskname' not in parsed_search: + 
parsed_search['taskname'] = [] + parsed_search['taskname'].append(preprocess_search_value(query_part[len('taskname:'):])) elif query_part.startswith('kwargs:'): if 'kwargs'not in parsed_search: parsed_search['kwargs'] = {} @@ -29,7 +36,60 @@ def parse_search_terms(raw_search_value): if 'state' not in parsed_search: parsed_search['state'] = [] parsed_search['state'].append(preprocess_search_value(query_part[len('state:'):])) - else: + elif query_part.startswith('es:'): + parsed_search['es'] = preprocess_search_value(query_part[len('es:'):]) + elif query_part.startswith('uuid:'): + parsed_search['uuid'] = preprocess_search_value(query_part[len('uuid:'):]) + elif query_part.startswith('runtime_lt:'): + parsed_search['runtime_lt'] = preprocess_search_value(query_part[len('runtime_lt:'):]) + elif query_part.startswith('runtime_gt:'): + parsed_search['runtime_gt'] = preprocess_search_value(query_part[len('runtime_gt:'):]) + elif query_part.startswith('root_id:'): + parsed_search['root_id'] = preprocess_search_value(query_part[len('root_id:'):]) + elif query_part.startswith('parent_id:'): + parsed_search['parent_id'] = preprocess_search_value(query_part[len('parent_id:'):]) + if parsed_search: + find_any = False + if find_time_keys: + def convert(x): + try: + if x.count(":") == 2: + if x.count("."): + # does not return fractional second information, but at least + # we can "support" it being passed in, as opposed to ignoring it + return time.mktime( + datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f').timetuple() + ) + + return time.mktime( + datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S').timetuple() + ) + return time.mktime( + datetime.datetime.strptime(x, '%Y-%m-%d %H:%M').timetuple() + ) + except ValueError: + return "" + if query_part.startswith('received_start'): + received_start = preprocess_search_value(query_part[len('received_start:'):]) + if received_start: + parsed_search['received_start'] = convert(received_start) + find_any = False + if query_part.startswith('received_end'): + received_end = preprocess_search_value(query_part[len('received_end:'):]) + if received_end: + parsed_search['received_end'] = convert(received_end) + find_any = False + if query_part.startswith('started_start'): + started_start = preprocess_search_value(query_part[len('started_start:'):]) + if started_start: + parsed_search['started_start'] = convert(started_start) + find_any = False + if query_part.startswith('started_end'): + started_end = preprocess_search_value(query_part[len('started_end:'):]) + if started_end: + parsed_search['started_end'] = convert(started_end) + find_any = False + if find_any: parsed_search['any'] = preprocess_search_value(query_part) return parsed_search @@ -37,11 +97,20 @@ def parse_search_terms(raw_search_value): def satisfies_search_terms(task, search_terms): any_value_search_term = search_terms.get('any') result_search_term = search_terms.get('result') + task_name_search_term = search_terms.get('taskname') args_search_terms = search_terms.get('args') kwargs_search_terms = search_terms.get('kwargs') state_search_terms = search_terms.get('state') + runtime_lt = search_terms.get("runtime_lt") + runtime_gt = search_terms.get("runtime_gt") - if not any([any_value_search_term, result_search_term, args_search_terms, kwargs_search_terms, state_search_terms]): + activated_terms = [ + any_value_search_term, result_search_term, + task_name_search_term, args_search_terms, + kwargs_search_terms, state_search_terms, + runtime_lt, runtime_gt, + ] + if not any(activated_terms): return 
True terms = [ @@ -54,7 +123,9 @@ def satisfies_search_terms(task, search_terms): kwargs_search_terms and all( stringified_dict_contains_value(k, v, task.kwargs) for k, v in kwargs_search_terms.items() ), - args_search_terms and task_args_contains_search_args(task.args, args_search_terms) + args_search_terms and task_args_contains_search_args(task.args, args_search_terms), + runtime_lt is not None and task.runtime is not None and float(runtime_lt) > task.runtime, + runtime_gt is not None and task.runtime is not None and float(runtime_gt) < task.runtime, ] return any(terms) diff --git a/flower/utils/tasks.py b/flower/utils/tasks.py index 4abcd82f..f754549c 100644 --- a/flower/utils/tasks.py +++ b/flower/utils/tasks.py @@ -1,13 +1,16 @@ import datetime import time +from celery.events.state import Task + from .search import parse_search_terms, satisfies_search_terms # pylint: disable=too-many-branches,too-many-locals,too-many-arguments def iter_tasks(events, limit=None, offset=0, type=None, worker=None, state=None, sort_by=None, received_start=None, received_end=None, - started_start=None, started_end=None, search=None): + started_start=None, started_end=None, search=None, + root_id=None, parent_id=None, runtime_lt=None, runtime_gt=None): i = 0 tasks = events.state.tasks_by_timestamp() if sort_by is not None: @@ -25,6 +28,14 @@ def convert(x): continue if state and task.state != state: continue + if root_id and task.root_id != root_id: + continue + if parent_id and task.parent_id != parent_id: + continue + if runtime_lt is not None and task.runtime is not None and runtime_lt < task.runtime: + continue + if runtime_gt is not None and task.runtime is not None and runtime_gt > task.runtime: + continue if received_start and task.received and\ task.received < convert(received_start): continue @@ -62,7 +73,7 @@ def sort_tasks(tasks, sort_by): reverse=reverse) -def get_task_by_id(events, task_id): +def get_task_by_id(events, task_id) -> Task: return events.state.tasks.get(task_id) diff --git a/flower/views/tasks.py b/flower/views/tasks.py index 189c0828..9e0431d1 100644 --- a/flower/views/tasks.py +++ b/flower/views/tasks.py @@ -1,19 +1,82 @@ import copy +import datetime import logging +import re +import time +import typing from functools import total_ordering +from celery.events.state import Task +from kombu.utils.functional import LRUCache from tornado import web +from ..events import Events +from ..options import options as runtime_options +from ..utils.search import parse_search_terms from ..utils.tasks import as_dict, get_task_by_id, iter_tasks from ..views import BaseHandler +try: + from elasticsearch.client import Elasticsearch + from elasticsearch.exceptions import TransportError + from elasticsearch_dsl import Search + from elasticsearch_dsl.query import Match, Range, Term, Terms, Wildcard + from elasticsearch_dsl.search import Hit + +except ImportError: + Elasticsearch = None + Match = None + TransportError = None + Search = None + Wildcard = None + Terms = None + Term = None + Range = None + Hit = None + logger = logging.getLogger(__name__) class TaskView(BaseHandler): + def __init__(self, *args, **kwargs): + self.use_es = runtime_options.elasticsearch + self.elasticsearch_url = runtime_options.elasticsearch_url + if self.use_es: + self.es_client = Elasticsearch([self.elasticsearch_url, ]) + else: + self.es_client = None + super().__init__(*args, **kwargs) + @web.authenticated - def get(self, task_id): - task = get_task_by_id(self.application.events, task_id) + def get(self, task_id: 
str): + use_es = self.get_argument('es', type=bool, default=self.use_es) + task: typing.Union[Task, Hit] + if use_es: + + try: + es_client = self.es_client + if es_client.indices.exists('task'): + es_s = Search(using=es_client, index='task') + for hit in es_s.query(Match(_id=task_id)): + task = hit + task.uuid = task_id + task.worker = type('worker', (), {})() + task.children_raw = task.children or '' + task.children = [] + for re_match in re.finditer(r"\((?P<task_uuid>\w+(?:-\w+)+)\) \w+ clock:\d+>", task.children_raw): + task.children.append(Task(uuid=re_match.group("task_uuid"))) + task.worker.hostname = task.hostname + break + else: + use_es = False + else: + use_es = False + except TransportError: + logger.exception('Issue getting elastic search task data; falling back to in memory') + use_es = False + if not use_es: + events = typing.cast(Events, self.application.events) + task = get_task_by_id(events, task_id) if task is None: raise web.HTTPError(404, f"Unknown task '{task_id}'") @@ -41,42 +104,221 @@ def __lt__(self, other): return self.value is None +class DoNotUseElasticSearchHistoryError(Exception): + pass + + class TasksDataTable(BaseHandler): + def __init__(self, *args, **kwargs): + if runtime_options.elasticsearch: + self.query_cache = LRUCache(limit=1000) + self.use_es = True + self.elasticsearch_url = runtime_options.elasticsearch_url + + self.es_client = Elasticsearch([self.elasticsearch_url, ]) + else: + self.query_cache = None + self.elasticsearch_url = None + self.use_es = False + super().__init__(*args, **kwargs) + + # pylint: disable=too-many-locals @web.authenticated def get(self): - app = self.application draw = self.get_argument('draw', type=int) start = self.get_argument('start', type=int) length = self.get_argument('length', type=int) search = self.get_argument('search[value]', type=str) - + use_es = bool(self.use_es) column = self.get_argument('order[0][column]', type=int) sort_by = self.get_argument(f'columns[{column}][data]', type=str) sort_order = self.get_argument('order[0][dir]', type=str) == 'desc' + total_records = 0 + records_filtered = 0 + if use_es: + use_es, sort_by, filtered_tasks, records_filtered, total_records = self.sort_by_with_elastic_search(search, sort_by, sort_order, start, length) + if not use_es: + def key(item): + return Comparable(getattr(item[1], sort_by)) - def key(item): - return Comparable(getattr(item[1], sort_by)) + self.maybe_normalize_for_sort(self.application.events.state.tasks_by_timestamp(), sort_by) + sorted_tasks = sorted( + iter_tasks(self.application.events, search=search), + key=key, + reverse=sort_order ) + total_records = len(sorted_tasks) - self.maybe_normalize_for_sort(app.events.state.tasks_by_timestamp(), sort_by) + filtered_tasks = [] + records_filtered = len(sorted_tasks) - sorted_tasks = sorted( - iter_tasks(app.events, search=search), - key=key, - reverse=sort_order - ) + for task in sorted_tasks[start:start + length]: + task_dict = as_dict(self.format_task(task)[1]) + if task_dict.get('worker'): + task_dict['worker'] = task_dict['worker'].hostname - filtered_tasks = [] + filtered_tasks.append(task_dict) - for task in sorted_tasks[start:start + length]: - task_dict = as_dict(self.format_task(task)[1]) - if task_dict.get('worker'): - task_dict['worker'] = task_dict['worker'].hostname + self.write(dict(draw=draw, data=filtered_tasks, + recordsTotal=total_records, + recordsFiltered=records_filtered)) # bug? 
- filtered_tasks.append(task_dict) + # pylint: disable=too-many-branches,too-many-locals,too-many-arguments,too-many-statements,too-many-nested-blocks + def sort_by_with_elastic_search(self, search, sort_by, sort_order, start, length): + es_client = self.es_client + filtered_tasks = [] + records_filtered = 0 + use_es = True + total_records = 0 + try: + es_s = Search(using=es_client, index='task') + if search: + search_terms = parse_search_terms(search or {}, find_time_keys=True) + if search_terms: + if 'es' in search_terms: + if search_terms.get('es') == '0': + raise DoNotUseElasticSearchHistoryError() + if 'args' in search_terms: + s_args = search_terms['args'] + arg_queries = None + for s_arg in s_args: + if arg_queries is None: + arg_queries = Wildcard(args='*'+s_arg+'*') + else: + arg_queries &= Wildcard(args='*'+s_arg+'*') + es_s = es_s.query(arg_queries) + if 'kwargs' in search_terms: + s_args = search_terms['kwargs'] + arg_queries = None + for s_arg, s_v_arg in s_args.items(): + if arg_queries is None: + arg_queries = Wildcard(kwargs='*' + s_arg + ': ' + s_v_arg + '*') + else: + arg_queries &= Wildcard(kwargs='*' + s_arg + ': ' + s_v_arg + '*') + es_s = es_s.query(arg_queries) + if 'result' in search_terms: + es_s = es_s.query(Match(result=search_terms['result'])) + if 'taskname' in search_terms: + es_s = es_s.query(Terms(name=search_terms['taskname'])) + if 'runtime_lt' in search_terms and search_terms['runtime_lt']: + es_s = es_s.query(Range(runtime=dict(lt=float(search_terms['runtime_lt'])))) + if 'runtime_gt' in search_terms and search_terms['runtime_gt']: + es_s = es_s.query(Range(runtime=dict(gt=float(search_terms['runtime_gt'])))) + if 'parent_id' in search_terms: + es_s = es_s.filter(Term(parent_id=search_terms['parent_id'])) + if 'root_id' in search_terms: + es_s = es_s.filter(Term(root_id=search_terms['root_id'])) + if 'received_start' in search_terms: + es_s = es_s.filter(Range(received_time=dict(gt=datetime.datetime.utcfromtimestamp(search_terms['received_start']).isoformat()))) + if 'received_end' in search_terms: + es_s = es_s.filter(Range(received_time=dict(lt=datetime.datetime.utcfromtimestamp(search_terms['received_end']).isoformat()))) + if 'started_start' in search_terms: + es_s = es_s.filter(Range(started_time=dict(gt=datetime.datetime.utcfromtimestamp(search_terms['started_start']).isoformat()))) + if 'started_end' in search_terms: + es_s = es_s.filter(Range(started_time=dict(lt=datetime.datetime.utcfromtimestamp(search_terms['started_end']).isoformat()))) + if 'state' in search_terms: + es_s = es_s.query(Terms(state=search_terms['state'])) + if search_terms.get('uuid'): + es_s = es_s.query(Term(**{'_id': search_terms['uuid']})) + # if searching by `uuid`, then no need to search by `any`. 
+ search_terms.pop('any', None) + if 'any' in search_terms: + # this is a simple form of the `any` search that flower constructs + es_id_query = es_s.filter(Term(**{'_id': search})) + id_hits = es_id_query.execute().hits.hits + if id_hits: + es_s = es_id_query + else: + es_s = es_s.query(Wildcard(name='*' + search + '*') | + Wildcard(hostname='*' + search + '*')) - self.write(dict(draw=draw, data=filtered_tasks, - recordsTotal=len(sorted_tasks), - recordsFiltered=len(sorted_tasks))) + # total_records = es_s.count() + if sort_by in ('started', 'received', 'succeeded', 'failed', 'revoked', 'timestamp', ): + sort_by += '_time' + sorted_tasks = es_s.sort({sort_by: dict(order='asc' if sort_order else 'desc')}) + filtered_tasks = [] + # elastic search window for normal pagination is default at 10000 + # so if we're over 10000, then we need to hand-find the appropriate next value + # and to do this we have 1 major way: + # using `search_after` (from this value or a previous search_after) + # we can efficiently search deeply into elasticsearch + # + # If it's our first time and we're going way beyond 10000, then we'll use `search_after` + # to efficiently get to the proper spot + # And if we have the previous start already cached, we'll use that to find the next start + if start + length > 10000: + total_tries = 5 + cache_value = None + cache_start = None + if self.query_cache: + for start_offset in range(1, 201): + for _ in range(total_tries): + cache_value = self.query_cache.get((start - (length * start_offset), + length, sort_by, sort_order)) + if cache_value: + cache_start = start - (length * start_offset) + break + time.sleep(0.001) + if cache_value: + break + # WIP: handle the case where we grab an older cache key and we need to forward to + # the current search context appropriately (people spamming the `Next` button faster than we + # can compute the next one. We can get old ones if they miss the next one) + # We already retry on the current one, but if we miss out and go earlier, then there could be a bug. 
+ if cache_value: + if cache_start < start - length: + # WIP: validate that this will forward us to the correct current start + for _ in range(cache_start + length, start - length, length): + sorted_tasks = es_s.extra(from_=0, + size=length, search_after=cache_value).sort( + {sort_by: 'asc' if sort_order else 'desc'}, {'_uid': 'desc', } + ) + cache_value = sorted_tasks.execute().hits.hits[-1]['sort'] + sorted_tasks = es_s.extra(from_=0, size=length, search_after=cache_value).sort( + {sort_by: 'asc' if not sort_order else 'desc'}, {'_uid': 'desc', }).execute().hits + else: + last_normal_hits = sorted_tasks.extra(from_=9999, size=1).execute().hits.hits + if last_normal_hits: + last_hit = last_normal_hits[0] + sorted_tasks = es_s.extra(from_=0, size=length, + search_after=[last_hit['sort'][0], + 'task#' + last_hit['_id']]).sort( + {sort_by: 'asc' if sort_order else 'desc'}, {'_uid': 'desc', }) + hits = sorted_tasks.execute().hits.hits + if len(hits) + 10000 < start: + # may be a bug in here where we have no hits because of a logic error in here + # we could get more efficient by forwarding with a higher `length` until we get + # to where we need to be + for _ in range(9999+length, start + length, length): + hits = sorted_tasks.execute().hits.hits + sorted_tasks = es_s.extra(from_=0, + size=length, search_after=hits[-1]['sort']).sort( + {sort_by: 'asc' if sort_order else 'desc'}, {'_uid': 'desc', } + ) + sorted_tasks = sorted_tasks.execute().hits + else: + sorted_tasks = sorted_tasks.extra(from_=start, size=length).execute().hits + last_task = None + total_records = sorted_tasks.total + for task in sorted_tasks.hits: + task_dict = task.get('_source') + task_dict['uuid'] = task['_id'] + if task_dict.get('worker'): + task_dict['worker'] = task_dict['hostname'] + else: + task_dict['worker'] = task_dict.get('hostname') + filtered_tasks.append(task_dict) + last_task = task + records_filtered = sorted_tasks.total + if start + length > 10000: + # may be a bug in here --> last_task may be `None` by mistake + self.query_cache[(start, length, sort_by, sort_order)] = last_task.get('sort') + except TransportError: + logger.exception('Issue getting elastic search task data; falling back to in memory') + use_es = False + except DoNotUseElasticSearchHistoryError: + use_es = False + return use_es, sort_by, filtered_tasks, records_filtered, total_records @classmethod def maybe_normalize_for_sort(cls, tasks, sort_by): @@ -112,13 +354,13 @@ def get(self): app = self.application capp = self.application.capp - time = 'natural-time' if app.options.natural_time else 'time' + time_setting = 'natural-time' if app.options.natural_time else 'time' if capp.conf.timezone: - time += '-' + str(capp.conf.timezone) + time_setting += '-' + str(capp.conf.timezone) self.render( "tasks.html", tasks=[], columns=app.options.tasks_columns, - time=time, + time=time_setting, ) diff --git a/setup.py b/setup.py index 7e028ec9..f0bf1a32 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,13 @@ def get_requirements(filename): classifiers = [s.strip() for s in classes.split('\n') if s] +EXTRAS_REQUIRE = { + "elasticsearch": ["elasticsearch>=5.4,<6.4", "elasticsearch_dsl>=5.4,<6.4", "requests>=2.13,<3", ], +} + + +EXTRAS_REQUIRE.update({':python_version == "2.7"': ['futures']}) + setup( name='flower', version=get_package_version(), @@ -58,6 +65,7 @@ def get_requirements(filename): python_requires=">=3.7", packages=find_packages(exclude=['tests', 'tests.*']), install_requires=get_requirements('default.txt'), + extras_require=EXTRAS_REQUIRE, 
test_suite="tests", tests_require=get_requirements('test.txt'), package_data={'flower': ['templates/*', 'static/*.*', @@ -65,6 +73,11 @@ def get_requirements(filename): entry_points={ 'celery.commands': [ 'flower = flower.command:flower', + 'flower-indexer = flower.command:indexer', + ], + 'console_scripts': [ + 'flower = flower.__main__:main', + 'flower-indexer = flower.__indexer__:main', ], }, )