Skip to content

Commit

Permalink
Migrate DID Finders to use Celery
Browse files Browse the repository at this point in the history
  • Loading branch information
BenGalewsky committed Jul 3, 2024
1 parent 7c671fa commit 31ba467
Show file tree
Hide file tree
Showing 17 changed files with 1,167 additions and 1,255 deletions.
153 changes: 83 additions & 70 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,105 +4,118 @@

## Introduction

ServiceX DID finders take a dataset name and turn them into files to be transformed. They interact with ServiceX via the Rabbit MQ message broker. As such, the integration into ServiceX and the API must be the same for all DID finders. This library abstracts away some of that interaction so that the code that interacts with ServiceX can be separated from the code that translates a dataset identifier into a list of files.
ServiceX DID finders take a dataset name and turn them into files to be transformed. They are
implemented as a Celery application with a task called `do_lookup`. Developers of a specific
DID Finder implementation need to write a generator function which yields a dictionary for each
file in the dataset.

The DID Finder author need write only a line of initialization code and then a small routine that translates a DID into a list of files.
The Task interacts with ServiceX through the App's REST endpoint to add files to the dataset and
a separate REST endpoint to signal that the dataset is complete.

## Usage
The app caches DID lookups. The `dataset_id` is the primary key for the cache table.

A very simple [demo](https://github.com/ssl-hep/ServiceX_DID_Finder_Demo) has been created to show how to make a basic DID finder for ServiceX. You can use that as a starting point if authoring one.
Invocations of the `do_lookup` task accepts the following arguments:
* `did`: The dataset identifier to look up
* `dataset_id`: The ID of the dataset in the database
* `endpoint`: The ServiceX endpoint to send the results to
* `user_did_finder`: The user callback that is a generator function that yields file information dictionaries.

Create an async callback method that `yield`s file info dictionaries. For example:
## Creating a DID Finder
You start with a new Python project. You will need to add this library as a dependency to the project
by adding the following to your `pyproject.tom` file:

```
servicex-did-finder-lib = "^3.0"
```

Create a script that will run your DID. It needs to contain your generator function that adheres
to the UserDIDHandler signature:
```python
async def my_callback(did_name: str, info: Dict[str, Any]):
for i in range(0, 10):
yield {
'paths': [f"root://atlas-experiment.cern.ch/dataset1/file{i}.root"],
'adler32': b183712731,
'file_size': 0,
'file_events': 0,
}
UserDIDHandler = Callable[
[str, Dict[str, Any], Dict[str, Any]],
Generator[Dict[str, Any], None, None]
]
```

The arguments to the method are straight forward:

* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
* `info` contains a dict of various info about the request that asked for this DID.
* `info` contains a dict of various info about the database ID for this dataset.
* `did_finder_args` contains the arguments that were passed to the DID finder at startup. This is a way to pass command line arguments to your file finder

Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows:
Yield the results as you find them. The fields you need to pass back to the library are as follows:

* `paths`: An ordered list of URIs that a transformer in ServiceX can access to get at the file. Often these are either `root://` or `http://` schema URI's. When accessing the file, URIs will be tried in ordered listed.
* `adler32`: A CRC number for the file. This CRC is calculated in a special way by rucio and is not used. Leave as 0 if you do not know it.
* `file_size`: Number of bytes of the file. Used to calculate statistics. Leave as zero if you do not know it (or it is expensive to look up).
* `file_events`: Number of events in the file. Used to calculate statistics. Leave as zero if you do not know it (or it is expensive to look up).

Once the callback is working privately, it is time to build a container. ServiceX will start the container and pass, one way or the other, a few arguments to it. Several arguments are required for the DID finder to work (like `rabbitUrl`). The library will automatically parse these during initialization.

To initialize the library and start listening for and processing messages from RabbitMQ, you must initialize the library. In all cases, the call to `start_did_finder` will not return.

If you do not have to process any command line arguments to configure the service, then the following is good enough:
Here's a simple example of a did handler generator:

```python
start_did_finder('my_finder', my_callback)
def find_files(did_name: str,
info: Dict[str, Any],
did_finder_args: Dict[str, Any]
) -> Generator[Dict[str, Any], None]:
__log.info('DID Lookup request received.', extra={
'requestId': info['request-id'], 'dataset': did_name})

urls = xrd.glob(cache_prefix + did_name)
if len(urls) == 0:
raise RuntimeError(f"No files found matching {did_name} for request "
f"{info['request-id']} - are you sure it is correct?")

for url in urls:
yield {
'paths': [url],
'adler32': 0, # No clue
'file_size': 0, # We could look up the size but that would be slow
'file_events': 0, # And this we do not know
}
```

The first argument is the name of the schema. The user will use `my_finder://dataset_name` to access this DID lookup (and `my_callback` is called with `dataset_name` as `did_name`). Please make sure everything is lower case: schema in URI's are not case sensitive.
There is a small amount of additional boilerplate code that is required to create a DID Finder. This
is the code that will create the Celery app and register your function as a task. Here is an
example (which assumes that `find_files` is your DID handler):
```python
from servicex_did_finder_lib import DIDFinderApp

app = DIDFinderApp('cernopendata')

The second argument is the call back.
@app.did_lookup_task(name="did_finder_cern_opendata.lookup_dataset")
def lookup_dataset(self, did: str, dataset_id: int, endpoint: str) -> None:
self.do_lookup(did=did, dataset_id=dataset_id,
endpoint=endpoint, user_did_finder=find_files)

app.start()
```

If you do need to configure your DID finder from command line arguments, then use the python `argparse` library, being sure to hook in the did finder library as follows (this code is used to start the rucio finder):
## Extra Command Line Arguments
Sometimes you need to pass additional information to your DID Finder from the command line. You do
this by creating your own `ArgParser` and then calling the `add_did_finder_cnd_arguments` method
which inserts the arguments that the library needs to pass to the finder. Here is an example:

```python
# Parse the command line arguments
parser = argparse.ArgumentParser()
parser.add_argument('--site', dest='site', action='store',
default=None,
help='XCache Site)')
parser.add_argument('--prefix', dest='prefix', action='store',
default='',
help='Prefix to add to Xrootd URLs')
parser.add_argument('--threads', dest='threads', action='store',
default=10, type=int, help="Number of threads to spawn")
add_did_finder_cnd_arguments(parser)

args = parser.parse_args()

site = args.site
prefix = args.prefix
threads = args.threads
logger.info("ServiceX DID Finder starting up: "
f"Threads: {threads} Site: {site} Prefix: {prefix}")

# Initialize the finder
did_client = DIDClient()
replica_client = ReplicaClient()
rucio_adapter = RucioAdapter(did_client, replica_client)

# Run the DID Finder
try:
logger.info('Starting rucio DID finder')

async def callback(did_name, info):
async for f in find_files(rucio_adapter, site, prefix, threads,
did_name, info):
yield f

start_did_finder('rucio',
callback,
parsed_args=args)

finally:
logger.info('Done running rucio DID finder')
import argparse
from servicex_did_finder_lib import DIDFinderApp

parser = argparse.ArgumentParser()
parser.add_argument('--foo', dest='foo', action='store',
default='',
help='Prefix to add to Xrootd URLs')

DIDFinderApp.add_did_finder_cnd_arguments(parser)

```

In particular note:
You then just pass the dictionary of parsed args to your app constructor:
```python
app = DIDFinderApp('cernopendata', parsed_args=parser.parse_args())
```

1. The call to `add_did_finder_cnd_arguments` to setup the arguments required by the finder library.
2. Parsing of the arguments using the usual `parse_args` method
3. Passing the parsed arguments to `start_did_finder`.
These parsed args will be passed to your `find_files` function as a dictionary in
the `did_finder_args` parameter.

Another pattern in the above code that one might find useful - a thread-safe way of passing global arguments into the callback. Given Python's Global Interpreter Lock, this is probably not necessary.

### Proper Logging

Expand All @@ -124,7 +137,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
}
```

The `start_did_finder` will configure the python root logger properly.
The `DIDFinderApp` will configure the python root logger properly.

## URI Format

Expand Down
Loading

0 comments on commit 31ba467

Please sign in to comment.