diff --git a/README.md b/README.md index 46ace90..61000fa 100644 --- a/README.md +++ b/README.md @@ -28,15 +28,32 @@ by adding the following to your `pyproject.tom` file: servicex-did-finder-lib = "^3.0" ``` -Create a script that will run your DID. It needs to contain your generator function that adheres -to the UserDIDHandler signature: +Create a Celery app that will run your DID finder. This app will be responsible for starting the +Celery worker and registering your DID finder function as a task. Celery prefers that the app +lives in a file called `celery.py` in a module in your project. Here is an example of how to do +this: + +## celery.py: ```python -UserDIDHandler = Callable[ - [str, Dict[str, Any], Dict[str, Any]], - Generator[Dict[str, Any], None, None] -] + +from servicex_did_finder_lib import DIDFinderApp +rucio_adaptor = RucioAdaptor() +app = DIDFinderApp('rucio', did_finder_args={"rucio_adapter": rucio_adaptor}) ``` +Attach the DID finder to the app by using the `did_lookup_task` decorator. This decorator will +register the function as a Celery task. Here is an example of how to do this: + +```python +@app.did_lookup_task(name="did_finder_rucio.lookup_dataset") +def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None: + self.do_lookup(did=did, dataset_id=dataset_id, + endpoint=endpoint, user_did_finder=find_files) +``` + +You will need to implement the `find_files` function. This function is a generator that yields +file information dictionaries. + The arguments to the method are straight forward: * `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`) @@ -74,45 +91,22 @@ def find_files(did_name: str, } ``` -There is a small amount of additional boilerplate code that is required to create a DID Finder.
This -is the code that will create the Celery app and register your function as a task. Here is an -example (which assumes that `find_files` is your DID handler): -```python -from servicex_did_finder_lib import DIDFinderApp - -app = DIDFinderApp('cernopendata') - -@app.did_lookup_task(name="did_finder_cern_opendata.lookup_dataset") -def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None: - self.do_lookup(did=did, dataset_id=dataset_id, - endpoint=endpoint, user_did_finder=find_files) - -app.start() -``` ## Extra Command Line Arguments Sometimes you need to pass additional information to your DID Finder from the command line. You do -this by creating your own `ArgParser` and then calling the `add_did_finder_cnd_arguments` method -which inserts the arguments that the library needs to pass to the finder. Here is an example: - +this by creating your own `argparse.ArgumentParser`. Here is an example: ```python import argparse -from servicex_did_finder_lib import DIDFinderApp - -parser = argparse.ArgumentParser() -parser.add_argument('--foo', dest='foo', action='store', - default='', - help='Prefix to add to Xrootd URLs') +# Parse command-line arguments +parser = argparse.ArgumentParser(description='DIDFinderApp') +parser.add_argument('--custom-arg', help='Custom argument for DIDFinderApp') +args, unknown = parser.parse_known_args() -DIDFinderApp.add_did_finder_cnd_arguments(parser) +# Create the app instance +app = DIDFinderApp('myApp', did_finder_args={"custom-arg": args.custom_arg}) ``` -You then just pass the dictionary of parsed args to your app constructor: -```python -app = DIDFinderApp('cernopendata', parsed_args=parser.parse_args()) -``` - These parsed args will be passed to your `find_files` function as a dictionary in the `did_finder_args` parameter.
diff --git a/src/servicex_did_finder_lib/did_finder_app.py b/src/servicex_did_finder_lib/did_finder_app.py index 6add239..87407e5 100644 --- a/src/servicex_did_finder_lib/did_finder_app.py +++ b/src/servicex_did_finder_lib/did_finder_app.py @@ -25,7 +25,6 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import argparse import logging from datetime import datetime from typing import Any, Generator, Callable, Dict, Optional @@ -120,37 +119,31 @@ def do_lookup(self, did: str, dataset_id: int, endpoint: str, user_did_finder: U ) -class DIDFinderApp: +class DIDFinderApp(Celery): """ The main application for a DID finder. This will setup the Celery application and start the worker to process the DID requests. """ def __init__(self, did_finder_name: str, - parsed_args: Optional[argparse.Namespace] = None): + did_finder_args: Optional[Dict[str, Any]] = None, + *args, **kwargs): """ Initialize the DID finder application Args: - did_finder_name: The name of the DID finder - parsed_args: The parsed command line arguments. Leave as None to use the default parser + did_finder_name: The name of the DID finder. 
+ did_finder_args: The parsed command line arguments and other objects you want + to make available to the tasks """ self.name = did_finder_name - self.parsed_args = vars(parsed_args) if parsed_args else None - - # Setup command line parsing - if self.parsed_args is None: - parser = argparse.ArgumentParser() - self.add_did_finder_cnd_arguments(parser) - self.parsed_args = vars(parser.parse_args()) - initialize_root_logger(self.name) - self.app = Celery(f"did_finder_{self.name}", - broker_url=self.parsed_args['rabbit_uri'], - broker_connection_retry_on_startup=True) + super().__init__(f"did_finder_{self.name}", *args, + broker_connection_retry_on_startup=True, + **kwargs) - # Cache the args in the App so they are accessible to the tasks - self.app.did_finder_args = self.parsed_args + # Cache the args in the App, so they are accessible to the tasks + self.did_finder_args = did_finder_args def did_lookup_task(self, name): """ @@ -166,41 +159,8 @@ def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None: name: The name of the task """ def decorator(func): - @self.app.task(base=DIDFinderTask, bind=True, name=name) + @self.task(base=DIDFinderTask, bind=True, name=name) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return decorator - - def start(self): - self.app.worker_main(argv=['worker', - '--loglevel=INFO', - '-Q', f'did_finder_{self.name}', - '-n', f'{self.name}@%h' - ]) - - @classmethod - def add_did_finder_cnd_arguments(cls, parser: argparse.ArgumentParser): - """add_did_finder_cnd_arguments Add required arguments to a parser - - If you need to parse command line arguments for some special configuration, create your - own argument parser, and call this function to make sure the arguments needed - for running the back-end communication are filled in properly. - - Then pass the results of the parsing to the DID Finder App's constructor method. - - Args: - parser (argparse.ArgumentParser): The argument parser. 
Arguments needed for the - did finder/servicex communication will be added. - """ - parser.add_argument( - "--rabbit-uri", dest="rabbit_uri", action="store", required=True - ) - parser.add_argument( - "--prefix", - dest="prefix", - action="store", - required=False, - default="", - help="Prefix to add to use a caching proxy for URIs", - ) diff --git a/tests/servicex_did_finder_lib_tests/test_did_finder_app.py b/tests/servicex_did_finder_lib_tests/test_did_finder_app.py index ed297be..83f0d34 100644 --- a/tests/servicex_did_finder_lib_tests/test_did_finder_app.py +++ b/tests/servicex_did_finder_lib_tests/test_did_finder_app.py @@ -25,7 +25,6 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import sys from unittest.mock import patch import pytest from celery import Celery @@ -104,25 +103,14 @@ def test_did_finder_task_exception(mocker, servicex, single_file_info): ) -def test_did_finder_app(mocker, monkeypatch): - # Temporarily replace sys.argv with mock_args - monkeypatch.setattr(sys, 'argv', [ - "did_finder.py", - "--rabbit-uri", "my-rabbit" - ]) +def test_celery_app(): + app = DIDFinderApp('foo') + assert isinstance(app, Celery) + assert app.name == 'foo' - mock_celery_app = mocker.MagicMock(Celery) + @app.did_lookup_task(name="did_finder_rucio.lookup_dataset") + def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None: + self.do_lookup(did=did, dataset_id=dataset_id, + endpoint=endpoint, user_did_finder=lambda x, y, z: None) - with patch( - "servicex_did_finder_lib.did_finder_app.Celery", autospec=True - ) as celery: - celery.return_value = mock_celery_app - app = DIDFinderApp(did_finder_name="pytest", parsed_args=None) - app.start() - celery.assert_called_with("did_finder_pytest", - broker_connection_retry_on_startup=True, - broker_url="my-rabbit") - 
mock_celery_app.worker_main.assert_called_with(argv=['worker', - '--loglevel=INFO', - '-Q', 'did_finder_pytest', - '-n', 'pytest@%h']) + assert lookup_dataset.__name__ == 'wrapper'