This repository has been archived by the owner on Jan 18, 2020. It is now read-only.

Commit

Merge pull request #318 from chop-dbhi/async-worker
Methods and jobs API to support async result retrieval
bruth committed Jul 1, 2015
2 parents c88562e + a8cdc22 commit 655c1a7
Showing 11 changed files with 516 additions and 7 deletions.
1 change: 1 addition & 0 deletions .coveragerc
@@ -3,6 +3,7 @@ branch = True
source = avocado
omit =
    */admin.py
    */south_migrations/*
    */migrations/*
    avocado/query/parsers/__init__.py

3 changes: 2 additions & 1 deletion .travis.yml
@@ -13,6 +13,7 @@ env:

services:
- memcached
- redis-server

addons:
- postgres
@@ -40,4 +41,4 @@ after_success:
matrix:
  exclude:
    - python: "2.6"
      env: DJANGO=1.7.6 POSTGRES_TEST_USER=postgres POSTGRES_TEST_NAME=avocado MYSQL_TEST_USER=root MYSQL_TEST_NAME=avocado
Empty file added avocado/async/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions avocado/async/utils.py
@@ -0,0 +1,92 @@
from django_rq import get_worker, get_queue

from avocado.conf import settings
from avocado.query import utils


def run_jobs():
    """
    Execute all the pending jobs.
    """
    get_worker(settings.ASYNC_QUEUE).work(burst=True)


def get_job(job_id):
    """
    Return the job for the specified ID or None if it cannot be found.

    Args:
        job_id (uuid): The ID of the job to retrieve.

    Returns:
        The job with the matching ID or None if no job with the supplied
        job ID could be found.
    """
    queue = get_queue(settings.ASYNC_QUEUE)
    return queue.fetch_job(job_id)


def get_job_count():
    """
    Returns the current number of jobs in the queue.
    """
    return get_queue(settings.ASYNC_QUEUE).count


def get_job_result(job_id):
    """
    Returns the result of the job with the supplied ID.

    If the job could not be found or the job is not finished yet, None will
    be returned as the job result.

    Args:
        job_id (uuid): The ID of the job to retrieve the result for.

    Returns:
        The result of the job with the matching ID or None if the job could
        not be found or is not finished.
    """
    job = get_job(job_id)

    # Guard against a missing job so the contract above holds; an
    # unfinished job already reports a result of None.
    if job is None:
        return None

    return job.result


def get_jobs():
    """
    Returns a collection of all the pending jobs.
    """
    return get_queue(settings.ASYNC_QUEUE).jobs


def cancel_job(job_id):
    """
    Cancel the job and its associated query if they exist.

    Args:
        job_id (uuid): The ID of the job to cancel.

    Returns:
        The cancellation result of the job's query if it had one. If the job
        could not be found or the job had no query, this method returns None.
    """
    job = get_job(job_id)

    if job is None:
        return None

    result = None
    query_name = job.meta.get('query_name')

    if query_name:
        canceled = utils.cancel_query(query_name)
        result = {
            'canceled': canceled
        }

    job.cancel()
    return result


def cancel_all_jobs():
    """
    Cancels all jobs.
    """
    get_queue(settings.ASYNC_QUEUE).empty()
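
For orientation, a minimal usage sketch of the module above, as it might appear in a test or a management command. The context and view objects and the 'report-rows' query name are hypothetical; the job itself is enqueued through avocado.query.utils.async_get_result_rows, added later in this commit.

from avocado.async import utils as async_utils
from avocado.query.utils import async_get_result_rows

# Enqueue a job that computes result rows for an existing DataContext and
# DataView pair. Storing the query name in job_options puts it on the job's
# meta so cancel_job() can also cancel the isolated query.
job_id = async_get_result_rows(context, view, {
    'page': 1,
    'limit': 20,
    'query_name': 'report-rows',
}, job_options={'query_name': 'report-rows'})

# In tests (or from a periodic task), drain the queue in burst mode.
async_utils.run_jobs()

job = async_utils.get_job(job_id)

if job is not None and job.is_finished:
    rows = async_utils.get_job_result(job_id)['rows']
else:
    # Cancel the job and, if it recorded a query name, its isolated query.
    async_utils.cancel_job(job_id)
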
3 changes: 3 additions & 0 deletions avocado/conf/global_settings.py
@@ -120,3 +120,6 @@
# the ad-hoc queries built from a context and view.
DATA_CACHE = 'default'
QUERY_CACHE = 'default'

# Name of the queue to use for scheduling and working on async jobs.
ASYNC_QUEUE = 'avocado'
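
The ASYNC_QUEUE name refers to a django_rq queue, so the project settings still need a matching RQ_QUEUES entry for both the enqueueing code and the worker to find it. A rough sketch with illustrative Redis connection values; the AVOCADO settings dict override is shown only as an assumption about how a project would rename the queue.

# Project settings.py (connection values are illustrative).
RQ_QUEUES = {
    'avocado': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    },
}

# Optionally point avocado at a differently named queue.
AVOCADO = {
    'ASYNC_QUEUE': 'avocado',
}

# A dedicated worker process for this queue can then be started with
# django_rq's management command: python manage.py rqworker avocado
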
190 changes: 188 additions & 2 deletions avocado/query/utils.py
@@ -1,12 +1,18 @@
import logging

import django
from django.db import connections, DEFAULT_DB_ALIAS, DatabaseError
from django.core.cache import get_cache
from django_rq import get_queue

from avocado.conf import settings
from avocado.export import HTMLExporter, registry as exporters
from avocado.query import pipeline

logger = logging.getLogger(__name__)

DEFAULT_LIMIT = 20
TEMP_DB_ALIAS_PREFIX = '_db:{0}'


@@ -184,3 +190,183 @@ def _cancel_query(name, db, pid):
return True

logger.warn('canceling queries for {0} is not supported'.format(engine))


def get_exporter_class(export_type):
    """
    Returns the exporter class for the supplied export type name.

    Args:
        export_type (string): The string name of the exporter.

    Returns:
        The exporter class for the supplied export_type as defined in the
        exporters registry. See avocado.export.registry for more info.
    """
    if export_type.lower() == 'html':
        return HTMLExporter

    return exporters[export_type]


def async_get_result_rows(context, view, query_options, job_options=None):
    """
    Creates a new job to asynchronously get result rows and returns the
    job ID.

    Args:
        See the get_result_rows argument list.

    Keyword Arguments:
        job_options (dict): Set as properties on the returned job's meta.

    Returns:
        The ID of the created job.
    """
    if not job_options:
        job_options = {}

    queue = get_queue(settings.ASYNC_QUEUE)
    job = queue.enqueue(get_result_rows,
                        context,
                        view,
                        query_options,
                        evaluate_rows=True)
    job.meta.update(job_options)
    job.save()

    return job.id


def get_result_rows(context, view, query_options, evaluate_rows=False):
    """
    Returns the result rows and options given the supplied arguments.

    The returned options include the queryset, offset, limit, page, and
    stop_page that were used when calculating the result rows. These can
    give some more context to callers of this method as far as the returned
    row set is concerned.

    Args:
        context (DataContext): Context for the query processor.
        view (DataView): View for the query processor.
        query_options (dict): Options for the query and result rows slice.
            These options include:

            * page: Start page of the result row slice.
            * limit: Upper bound on the number of result rows returned.
            * stop_page: Stop page of the result row slice.
            * query_name: Query name used when isolating the result row
              query.
            * processor: Processor to use to generate the queryset.
            * tree: Modeltree to pass to the QueryProcessor.
            * export_type: Export type to use for the result rows.
            * reader: Reader type to use when exporting, see
              export._base.BaseExporter.readers for available readers.

    Kwargs:
        evaluate_rows (default=False): When True, the generator returned
            from the exporter's read method is evaluated and all results are
            stored in a list. This is useful if the caller of this method
            needs an evaluated result set, for example when this method is
            called asynchronously and needs a pickleable return value
            (generators cannot be pickled).

    Returns:
        dict -- The result rows and the relevant options used to calculate
        them. The returned keys are:

            * context: The DataContext used.
            * export_type: The export type used.
            * limit: The limit on the number of result rows.
            * offset: The starting offset of the result rows.
            * page: The starting page number of the result rows.
            * processor: The query processor used.
            * queryset: The queryset used to gather results.
            * rows: The result rows themselves.
            * stop_page: The stop page of the result rows collection.
            * view: The DataView used.
    """
    offset = None

    page = query_options.get('page')
    limit = query_options.get('limit') or 0
    stop_page = query_options.get('stop_page')
    query_name = query_options.get('query_name')
    processor_name = query_options.get('processor') or 'default'
    tree = query_options.get('tree')
    export_type = query_options.get('export_type') or 'html'
    reader = query_options.get('reader')

    if page is not None:
        page = int(page)

        # Pages are 1-based.
        if page < 1:
            raise ValueError('Page must be greater than or equal to 1.')

        # Change to 0-based for calculating the offset.
        offset = limit * (page - 1)

        if stop_page:
            stop_page = int(stop_page)

            # Cannot have a lower index stop page than the start page.
            if stop_page < page:
                raise ValueError(
                    'Stop page must be greater than or equal to start page.')

            # 4...5 means 4 and 5, not everything up to 5 like with list
            # slices, so 4...4 is equivalent to just 4.
            if stop_page > page:
                limit = limit * stop_page
    else:
        # When no page or range is specified, the limit does not apply.
        limit = None

    QueryProcessor = pipeline.query_processors[processor_name]
    processor = QueryProcessor(context=context, view=view, tree=tree)
    queryset = processor.get_queryset()

    # Isolate this query to a named connection. This will cancel any
    # outstanding queries of the same name if one is present.
    cancel_query(query_name)
    queryset = isolate_queryset(query_name, queryset)

    # A limit of 0 means "all" for pagination, however the read method
    # requires an explicit limit of None.
    limit = limit or None

    # We use HTMLExporter in Serrano but Avocado has it disabled. Until it
    # is enabled in Avocado, we can reference the HTMLExporter directly here.
    exporter = processor.get_exporter(get_exporter_class(export_type))

    # This is an optimization for when concepts are selected for ordering
    # only. There is no guarantee as to how many rows are required to get
    # the desired `limit` of rows, so the query is unbounded. If all
    # ordering facets are visible, the limit and offset can be pushed
    # down to the query.
    order_only = lambda f: not f.get('visible', True)
    view_node = view.parse()

    if filter(order_only, view_node.facets):
        iterable = processor.get_iterable(queryset=queryset)
        rows = exporter.manual_read(iterable,
                                    offset=offset,
                                    limit=limit)
    else:
        iterable = processor.get_iterable(queryset=queryset,
                                          limit=limit,
                                          offset=offset)
        method = exporter.reader(reader)
        rows = method(iterable)

    if evaluate_rows:
        rows = list(rows)

    return {
        'context': context,
        'export_type': export_type,
        'limit': limit,
        'offset': offset,
        'page': page,
        'processor': processor,
        'queryset': queryset,
        'rows': rows,
        'stop_page': stop_page,
        'view': view,
    }
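
For reference, a synchronous call to the function above might look like the sketch below; the context and view objects and the 'report-rows' isolation name are assumed to exist. With limit=20 and page=2, the function computes offset = 20 * (2 - 1) = 20 before handing the slice to the exporter.

from avocado.query.utils import get_result_rows

data = get_result_rows(context, view, {
    'page': 2,
    'limit': 20,
    'query_name': 'report-rows',
    'processor': 'default',
})

# data['rows'] is a generator unless evaluate_rows=True was passed.
for row in data['rows']:
    print(row)  # serialize or stream each exported row
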
1 change: 1 addition & 0 deletions requirements.txt
@@ -11,3 +11,4 @@ ordereddict
sqlparse
mysql-python
psycopg2
django_rq
1 change: 1 addition & 0 deletions setup.py
@@ -7,6 +7,7 @@
    'modeltree>=1.1.9',
    'South==1.0.2',
    'jsonfield==1.0.0',
    'django_rq',
]

if sys.version_info < (2, 7):