Skip to content

Commit

Permalink
Merge pull request #4 from gjeusel/1-lazy-backend-result-implementation
Browse files Browse the repository at this point in the history
Solve: 1 lazy backend result implementation
  • Loading branch information
epandurski authored Feb 7, 2020
2 parents 1db8b50 + f0ae464 commit 8efe7a8
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
17 changes: 16 additions & 1 deletion flask_melodramatiq/lazy_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ def __init__(self, app=None, config_prefix=DEFAULT_CONFIG_PREFIX, **options):
# stub broker, until our broker is ready.
self.__stub = dramatiq.brokers.stub.StubBroker(middleware=options.pop('middleware', None))

# We want to be be capabale of registering actors that might store
# results. In that end, we add a stub backend results proxy.
self.__empty_backend = dramatiq.results.Results()
self.__stub.add_middleware(self.__empty_backend)

self._unregistered_lazy_actors = []
if app is not None:
self.init_app(app)
Expand All @@ -133,18 +138,28 @@ def init_app(self, app):
"""

if self.__stub:
self.__options['middleware'] = self.__stub.middleware
self.__options['middleware'] = [
m for m in self.__stub.middleware
if m is not self.__empty_backend
]
configuration = self.__get_configuration(app)
self.__stub.close()
self.__stub = None
self.__app = app
self.__configuration = configuration
options = configuration.copy()
self.__class__ = options.pop('class')

# Instanciate dramatiq Broker
broker = self._dramatiq_broker_factory(**options)

# Add Flask App Context Middleware
broker.add_middleware(AppContextMiddleware(app))

# Register actors on broker
for actor in self._unregistered_lazy_actors:
actor._register_proxied_instance(broker=broker)

self._unregistered_lazy_actors = None
self._proxied_instance = broker # `self` is sealed from now on.
else:
Expand Down
52 changes: 52 additions & 0 deletions tests/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from flask_melodramatiq import Broker, StubBroker
from flask_melodramatiq.lazy_broker import LazyActor, MultipleAppsWarningMiddleware, missing
from dramatiq.middleware import Middleware
from dramatiq.results.backends.redis import RedisBackend
from dramatiq.results.backends.stub import StubBackend
from dramatiq.results import Results
from mock import Mock


def test_actor_attr_access(app, broker, run_mock):
Expand Down Expand Up @@ -282,3 +286,51 @@ def task():
worker.start()
worker.join()
assert run_mock.call_count == 2


class TestBackendResultMiddleware:
def test_it_remove_the_dummy_backend_at_init_app(self, app, broker):
mocked_client = Mock()
backend = RedisBackend(client=mocked_client)
result_middleware = Results(backend=backend)
broker.add_middleware(result_middleware)
broker.init_app(app)

only_results = list(filter(lambda x: isinstance(x, Results), broker.middleware))
assert len(only_results) == 1
assert only_results[0].backend is backend

def test_it_can_lazy_register_actors_with_store_results(self, app, broker, run_mock):
expected = "HelloYou"
run_mock.return_value = expected
@broker.actor(store_results=True)
def task():
return run_mock()

backend = StubBackend()
result_middleware = Results(backend=backend)
broker.add_middleware(result_middleware)
broker.init_app(app)

job = task.send()
worker = dramatiq.Worker(broker)
worker.start()
worker.join()
assert job.get_result() == expected

def test_it_raises_when_at_runtime_when_no_backend_for_results(self, app, broker):
@broker.actor(store_results=True)
def task():
pass

broker.init_app(app)
broker.emit_after("process_boot") # set Prometheus

job = task.send()
worker = dramatiq.Worker(broker)
worker.start()
worker.join()

match = "default broker doesn't have a results backend"
with pytest.raises(RuntimeError, match=match):
job.get_result()

0 comments on commit 8efe7a8

Please sign in to comment.