Skip to content

Commit

Permalink
Specify queue name to make sure we only receive tasks destined for th…
Browse files Browse the repository at this point in the history
…is did finder
  • Loading branch information
BenGalewsky committed Jul 8, 2024
1 parent 8a072f3 commit c1e3372
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
11 changes: 9 additions & 2 deletions src/servicex_did_finder_lib/did_finder_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U

acc.send_on(did_info.file_count)
except Exception:
# noinspection PyTypeChecker
self.logger.error(
f"Error processing DID {did}",
extra={"dataset_id": dataset_id},
Expand Down Expand Up @@ -144,7 +145,9 @@ def __init__(self, did_finder_name: str,

initialize_root_logger(self.name)

self.app = Celery(f"did_finder_{self.name}", broker_url=self.parsed_args['rabbit_uri'])
self.app = Celery(f"did_finder_{self.name}",
broker_url=self.parsed_args['rabbit_uri'],
broker_connection_retry_on_startup=True)

# Cache the args in the App so they are accessible to the tasks
self.app.did_finder_args = self.parsed_args
Expand All @@ -170,7 +173,11 @@ def wrapper(*args, **kwargs):
return decorator

def start(self):
self.app.worker_main(argv=['worker', '--loglevel=INFO'])
self.app.worker_main(argv=['worker',
'--loglevel=INFO',
'-Q', f'did_finder_{self.name}',
'-n', f'{self.name}@%h'
])

@classmethod
def add_did_finder_cnd_arguments(cls, parser: argparse.ArgumentParser):
Expand Down
9 changes: 7 additions & 2 deletions tests/servicex_did_finder_lib_tests/test_did_finder_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,10 @@ def test_did_finder_app(mocker, monkeypatch):
celery.return_value = mock_celery_app
app = DIDFinderApp(did_finder_name="pytest", parsed_args=None)
app.start()
celery.assert_called_with("did_finder_pytest", broker_url="my-rabbit")
mock_celery_app.worker_main.assert_called_with(argv=['worker', '--loglevel=INFO'])
celery.assert_called_with("did_finder_pytest",
broker_connection_retry_on_startup=True,
broker_url="my-rabbit")
mock_celery_app.worker_main.assert_called_with(argv=['worker',
'--loglevel=INFO',
'-Q', 'did_finder_pytest',
'-n', 'pytest@%h'])

0 comments on commit c1e3372

Please sign in to comment.