diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 897f769..56492c2 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -12,20 +12,20 @@ jobs:
   test:
     strategy:
       matrix:
-        python-version: [3.7, 3.8, 3.9]
+        python-version: [3.8, 3.9]
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@master
+      - uses: actions/checkout@v4.0.0
       - name: Set up Python ${{ matrix.python-version }}
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v4.7.0
         with:
           python-version: ${{ matrix.python-version }}
       - name: Install poetry
-        uses: Gr1N/setup-poetry@v7
+        uses: Gr1N/setup-poetry@v8
      - name: Install dependencies
        run: |
-          poetry install
+          poetry install --with dev
          pip list
      - name: Lint with Flake8
        run: |
@@ -36,7 +36,7 @@ jobs:
          poetry run coverage xml
      - name: Report coverage using codecov
        if: github.event_name == 'push' && matrix.python-version == 3.8
-        uses: codecov/codecov-action@v1
+        uses: codecov/codecov-action@v3.1.4
        with:
          file: ./coverage.xml # optional
          flags: unittests # optional
diff --git a/.github/workflows/pypi.yaml b/.github/workflows/pypi.yaml
index 7a8fce4..e252303 100644
--- a/.github/workflows/pypi.yaml
+++ b/.github/workflows/pypi.yaml
@@ -9,20 +9,20 @@ jobs:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v4.0.0
      - name: Set up Python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v4.7.0
        with:
          python-version: 3.8
      - name: Get the version
        id: get_version
        run: echo ::set-output name=VERSION::${GITHUB_REF#refs/tags/}
      - name: Install poetry
-        uses: Gr1N/setup-poetry@v7
+        uses: Gr1N/setup-poetry@v8
      - name: Build
        run: |
          poetry version ${{ steps.get_version.outputs.VERSION }}
      - name: Build and publish to pypi
        uses: JRubics/poetry-publish@v1.6
        with:
-          pypi_token: ${{ secrets.pypi_password_sx_did_lib }}
+          pypi_token: ${{ secrets.pypi_password_sx_did_lib }}
\ No newline at end of file
diff --git a/README.md b/README.md
index bbc0017..52abd62 100644
--- a/README.md
+++ b/README.md
@@ -28,8 +28,7 @@ Create an async callback method that `yield`s file info dictionaries. For exampl
 The arguments to the method are straight forward:
 
 * `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
-* `info` contains a dict of various info about the request that asked for this DID:
-  * `request-id` The request id that has this DID associated. For logging.
+* `info` contains a dict of various info about the request that asked for this DID (currently just `dataset-id`, the identifier ServiceX assigned to the dataset).
 
 Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows:
@@ -114,7 +113,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
 __log = logger.getLogger(__name__)
 
 async def my_callback(did_name: str, info: Dict[str, Any]):
     __log.info(f'Looking up dataset {did_name}.',
-               extra={'requestId': info['request-id']})
+               extra={'datasetId': info['dataset-id']})
 
     for i in range(0, 10):
         yield {
@@ -125,9 +124,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
         }
 ```
 
-Note the parameter `request-id`: this marks the log messages with the request id that triggered this DID request. This will enable the system to track all log messages across all containers connected with this particular request id - making debugging a lot easier. 
-
-The `start_did_finder` will configure the python root logger properly to dump messages with a request ID in them.
+The `start_did_finder` will configure the python root logger properly to dump messages with a dataset ID in them.
 
 ## URI Format
diff --git a/pyproject.toml b/pyproject.toml
index 0a5f83f..05e21c1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,13 +5,16 @@ description = "ServiceX DID Library Routines"
 authors = ["Gordon Watts "]
 
 [tool.poetry.dependencies]
-python = "^3.6"
+python = "^3.8"
 pika = "1.1.0"
 make-it-sync = "^1.0.0"
 requests = "^2.25.0"
 
-[tool.poetry.dev-dependencies]
-pytest = "^5.2"
+[tool.poetry.group.dev]
+optional = true
+
+[tool.poetry.group.dev.dependencies]
+pytest = "^7.4"
 flake8 = "^3.9.1"
 pytest-mock = "^3.5.1"
 coverage = "^5.5"
diff --git a/src/servicex_did_finder_lib/communication.py b/src/servicex_did_finder_lib/communication.py
index 8127783..b664cb4 100644
--- a/src/servicex_did_finder_lib/communication.py
+++ b/src/servicex_did_finder_lib/communication.py
@@ -26,6 +26,8 @@
 __logging = logging.getLogger(__name__)
 __logging.addHandler(logging.NullHandler())
 
+# TODO: get rid of different modes. It should always be batch.
+
 
 class _accumulator:
     "Track or cache files depending on the mode we are operating in"
@@ -42,8 +44,6 @@ def add(self, file_info: Dict[str, Any]):
             self._file_cache.append(file_info)
         else:
             self._summary.add_file(file_info)
-            if self._summary.file_count == 1:
-                self._servicex.post_transform_start()
             self._servicex.put_file_add(file_info)
 
     def send_on(self, count):
@@ -59,8 +59,6 @@ def send_bulk(self, file_list: List[Dict[str, Any]]):
             for f in file_list:
                 self.add(f)
         else:
-            if self._summary.file_count == 0:
-                self._servicex.post_transform_start()
             for ifl in file_list:
                 self._summary.add_file(ifl)
             self._servicex.put_file_add_bulk(file_list)
@@ -93,12 +91,6 @@ async def run_file_fetch_loop(
     # If we've been holding onto any files, we need to send them now.
     acc.send_on(did_info.file_count)
 
-    # Simple error checking and reporting
-    if summary.file_count == 0:
-        servicex.post_status_update(
-            f"DID Finder found zero files for dataset {did}", severity="fatal"
-        )
-
     elapsed_time = int((datetime.now() - start_time).total_seconds())
     servicex.put_fileset_complete(
         {
@@ -110,11 +102,9 @@
         }
     )
 
-    servicex.post_status_update(f"Completed load of files in {elapsed_time} seconds")
-
 
 def rabbit_mq_callback(
-    user_callback: UserDIDHandler, channel, method, properties, body, file_prefix=None
+    user_callback: UserDIDHandler, channel, method, properties, body
 ):
     """rabbit_mq_callback Respond to RabbitMQ Message
 
@@ -128,22 +118,23 @@
         method ([type]): Delivery method
         properties ([type]): Properties of the message
         body ([type]): The body (json for us) of the message
-        file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
     """
-    request_id = None  # set this in case we get an exception while loading request
+    dataset_id = None  # set this in case we get an exception while loading request
     try:
         # Unpack the message. Really bad if we fail up here!
         did_request = json.loads(body)
         did = did_request["did"]
-        request_id = did_request["request_id"]
+        dataset_id = did_request["dataset_id"]
+        endpoint = did_request["endpoint"]
         __logging.info(
-            f"Received DID request {did_request}", extra={"requestId": request_id}
+            f"Received DID request {did_request}",
+            extra={"datasetId": dataset_id}
         )
-        servicex = ServiceXAdapter(did_request["service-endpoint"], file_prefix)
-        servicex.post_status_update("DID Request received")
+        servicex = ServiceXAdapter(dataset_id=dataset_id,
+                                   endpoint=endpoint)
 
         info = {
-            "request-id": request_id,
+            "dataset-id": dataset_id,
         }
 
         # Process the request and resolve the DID
@@ -152,16 +143,12 @@
 
         except Exception as e:
             _, exec_value, _ = sys.exc_info()
-            __logging.exception("DID Request Failed", extra={"requestId": request_id})
-            servicex.post_status_update(
-                f"DID Request Failed for id {request_id}: " f"{str(e)} - {exec_value}",
-                severity="fatal",
-            )
+            __logging.exception(f"DID Request Failed {str(e)}", extra={"datasetId": dataset_id})
             raise
 
     except Exception as e:
         __logging.exception(
-            f"DID request failed {str(e)}", extra={"requestId": request_id}
+            f"DID request failed {str(e)}", extra={"datasetId": dataset_id}
         )
 
     finally:
@@ -174,7 +161,6 @@ def init_rabbit_mq(
     queue_name: str,
     retries: int,
     retry_interval: float,
-    file_prefix: str = None,
 ):  # type: ignore
     rabbitmq = None
     retry_count = 0
@@ -191,7 +177,7 @@
                 queue=queue_name,
                 auto_ack=False,
                 on_message_callback=lambda c, m, p, b: rabbit_mq_callback(
-                    user_callback, c, m, p, b, file_prefix
+                    user_callback, c, m, p, b
                 ),
             )
             _channel.start_consuming()
diff --git a/src/servicex_did_finder_lib/did_logging.py b/src/servicex_did_finder_lib/did_logging.py
index 4884beb..553ddb7 100644
--- a/src/servicex_did_finder_lib/did_logging.py
+++ b/src/servicex_did_finder_lib/did_logging.py
@@ -4,10 +4,10 @@
 
 class DIDFormatter(logging.Formatter):
     """
-    Need a customer formatter to allow for logging with request ids that vary.
-    Normally log messages are "level instance component request_id msg" and
-    request_id gets set by initialize_logging but we need a handler that'll let
-    us pass in the request id and have that embedded in the log message
+    Need a custom formatter to allow for logging with dataset ids that vary.
+    Normally log messages are "level instance component dataset_id msg" and
+    dataset_id gets set by initialize_logging but we need a handler that'll let
+    us pass in the dataset id and have that embedded in the log message
     """
 
     def format(self, record: logging.LogRecord) -> str:
@@ -18,10 +18,10 @@ def format(self, record: logging.LogRecord) -> str:
         :return: formatted log message
         """
 
-        if hasattr(record, "requestId"):
+        if hasattr(record, "datasetId"):
             return super().format(record)
         else:
-            setattr(record, "requestId", None)
+            setattr(record, "datasetId", None)
             return super().format(record)
 
 
@@ -36,7 +36,7 @@ def initialize_root_logger(did_scheme: str):
     instance = os.environ.get('INSTANCE_NAME', 'Unknown')
     formatter = DIDFormatter('%(levelname)s ' +
                              f"{instance} {did_scheme}_did_finder " +
-                             '%(requestId)s %(message)s')
+                             '%(datasetId)s %(message)s')
     handler = logging.StreamHandler()
     handler.setFormatter(formatter)
     handler.setLevel(logging.INFO)
diff --git a/src/servicex_did_finder_lib/servicex_adaptor.py b/src/servicex_did_finder_lib/servicex_adaptor.py
index 7d8e992..478b319 100644
--- a/src/servicex_did_finder_lib/servicex_adaptor.py
+++ b/src/servicex_did_finder_lib/servicex_adaptor.py
@@ -35,40 +35,17 @@
 
 
 class ServiceXAdapter:
-    def __init__(self, endpoint, file_prefix=None):
+    def __init__(self, endpoint, dataset_id):
         self.endpoint = endpoint
-        self.file_prefix = file_prefix
+        self.dataset_id = dataset_id
 
         self.logger = logging.getLogger(__name__)
         self.logger.addHandler(logging.NullHandler())
 
-    def post_status_update(self, status_msg, severity="info"):
-        success = False
-        attempts = 0
-        while not success and attempts < MAX_RETRIES:
-            try:
-                requests.post(self.endpoint + "/status", data={
-                    "timestamp": datetime.now().isoformat(),
-                    "source": "DID Finder",
-                    "severity": severity,
-                    "info": status_msg
-                })
-                success = True
-            except requests.exceptions.ConnectionError:
-                self.logger.exception(f'Connection error to ServiceX App. Will retry '
-                                      f'(try {attempts} out of {MAX_RETRIES}')
-                attempts += 1
-        if not success:
-            self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status '
-                              f'message: {str(status_msg)} - Ignoring error.')
-
-    def _prefix_file(self, file_path):
-        return file_path if not self.file_prefix else self.file_prefix+file_path
-
     def _create_json(self, file_info):
         return {
             "timestamp": datetime.now().isoformat(),
-            "paths": [self._prefix_file(fp) for fp in file_info['paths']],
+            "paths": file_info['paths'],
            'adler32': file_info['adler32'],
            'file_size': file_info['file_size'],
            'file_events': file_info['file_events']
        }
@@ -80,8 +57,8 @@ def put_file_add(self, file_info):
         while not success and attempts < MAX_RETRIES:
             try:
                 mesg = self._create_json(file_info)
-                requests.put(self.endpoint + "/files", json=mesg)
-                self.logger.info(f"Metric: {json.dumps(mesg)}")
+                requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
+                self.logger.info("adding file:", extra=file_info)
                 success = True
             except requests.exceptions.ConnectionError:
                 self.logger.exception(f'Connection error to ServiceX App. Will retry '
@@ -103,7 +80,7 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
                 mesg.append(self._create_json(fi))
             while not success and attempts < MAX_RETRIES:
                 try:
-                    requests.put(self.endpoint + "/files", json=mesg)
+                    requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
                     self.logger.info(f"Metric: {json.dumps(mesg)}")
                     success = True
                 except requests.exceptions.ConnectionError:
@@ -114,27 +91,12 @@
             self.logger.error(f'After {attempts} tries, failed to send ServiceX App '
                               f'a put_file_bulk message: {mesg} - Ignoring error.')
 
-    def post_transform_start(self):
-        success = False
-        attempts = 0
-        while not success and attempts < MAX_RETRIES:
-            try:
-                requests.post(self.endpoint + "/start")
-                success = True
-            except requests.exceptions.ConnectionError:
-                self.logger.exception(f'Connection error to ServiceX App. Will retry '
-                                      f'(try {attempts} out of {MAX_RETRIES}')
-                attempts += 1
-        if not success:
-            self.logger.error(f'After {attempts} tries, failed to send ServiceX App a '
-                              f'transform start message - Ignoring error.')
-
     def put_fileset_complete(self, summary):
         success = False
         attempts = 0
         while not success and attempts < MAX_RETRIES:
             try:
-                requests.put(self.endpoint + "/complete", json=summary)
+                requests.put(f"{self.endpoint}{self.dataset_id}/complete", json=summary)
                 success = True
             except requests.exceptions.ConnectionError:
                 self.logger.exception(f'Connection error to ServiceX App. Will retry '
diff --git a/tests/servicex_did_finder_lib/test_communication.py b/tests/servicex_did_finder_lib/test_communication.py
index 010250e..13f8bc9 100644
--- a/tests/servicex_did_finder_lib/test_communication.py
+++ b/tests/servicex_did_finder_lib/test_communication.py
@@ -1,7 +1,7 @@
 import argparse
 import json
 from typing import Any, AsyncGenerator, Dict, Optional
-from unittest.mock import ANY, MagicMock, patch
+from unittest.mock import MagicMock, patch
 
 import pika
 import pytest
@@ -28,8 +28,8 @@ def send_did_request(self, did_name: str):
         body = json.dumps(
             {
                 "did": did_name,
-                "request_id": "000-111-222-444",
-                "service-endpoint": "http://localhost:2334",
+                "dataset_id": "000-111-222-444",
+                "endpoint": "http://localhost:2334/",
             }
         )
 
@@ -314,7 +314,6 @@ async def my_callback(
     # Make sure the file was sent along, along with the completion
     SXAdaptor.put_file_add.assert_not_called()
     SXAdaptor.put_fileset_complete.assert_not_called()
-    SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal")
 
 
 def test_failed_file_after_good(rabbitmq, SXAdaptor):
@@ -343,7 +342,6 @@ async def my_callback(
     # Make sure the file was sent along, along with the completion
     SXAdaptor.put_file_add.assert_called_once()
     SXAdaptor.put_fileset_complete.assert_not_called()
-    SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal")
 
 
 def test_failed_file_after_good_with_avail(rabbitmq, SXAdaptor):
@@ -372,7 +370,6 @@ async def my_callback(
     # Make sure the file was sent along, along with the completion
     SXAdaptor.put_file_add.assert_called_once()
     SXAdaptor.put_fileset_complete.assert_called_once()
-    SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")
 
 
 def test_failed_file_after_good_with_avail_limited_number(rabbitmq, SXAdaptor):
@@ -407,7 +404,6 @@ async def my_callback(
     # Make sure the file was sent along, along with the completion
     SXAdaptor.put_file_add_bulk.assert_called_once()
     SXAdaptor.put_fileset_complete.assert_called_once()
-    SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds")
seconds") def test_no_files_returned(rabbitmq, SXAdaptor): @@ -432,7 +428,6 @@ async def my_callback( SXAdaptor.put_file_add.assert_not_called() SXAdaptor.put_fileset_complete.assert_called_once() assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 0 - SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal") def test_rabbitmq_connection_failure(rabbitmq_fail_once, SXAdaptor): @@ -520,7 +515,6 @@ async def my_user_callback(did, info): yield v await run_file_fetch_loop("123-456", SXAdaptor, {}, my_user_callback) - SXAdaptor.post_transform_start.assert_called_once() assert SXAdaptor.put_file_add.call_count == 2 assert SXAdaptor.put_file_add.call_args_list[0][0][0]["paths"][0] == "/tmp/foo" @@ -532,8 +526,6 @@ async def my_user_callback(did, info): assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-events"] == 192 assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-bytes"] == 3070 - assert SXAdaptor.post_status_update.called_once() - @pytest.mark.asyncio async def test_run_file_bulk_fetch_loop(SXAdaptor, mocker): @@ -555,7 +547,6 @@ async def my_user_callback(did, info): yield return_values await run_file_fetch_loop("123-456", SXAdaptor, {}, my_user_callback) - SXAdaptor.post_transform_start.assert_called_once() assert SXAdaptor.put_file_add_bulk.call_count == 1 assert ( @@ -571,8 +562,6 @@ async def my_user_callback(did, info): assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-events"] == 192 assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-bytes"] == 3070 - assert SXAdaptor.post_status_update.called_once() - @pytest.mark.asyncio async def test_run_file_fetch_one(SXAdaptor, mocker): @@ -595,7 +584,6 @@ async def my_user_callback(did, info): yield v await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback) - SXAdaptor.post_transform_start.assert_called_once() assert SXAdaptor.put_file_add_bulk.call_count == 1 SXAdaptor.put_file_add_bulk.assert_called_with( @@ -611,7 +599,6 @@ async def my_user_callback(did, info): SXAdaptor.put_fileset_complete.assert_called_once assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1 - assert SXAdaptor.post_status_update.called_once() @pytest.mark.asyncio @@ -637,7 +624,6 @@ async def my_user_callback(did, info): yield v await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback) - SXAdaptor.post_transform_start.assert_called_once() assert SXAdaptor.put_file_add_bulk.call_count == 1 SXAdaptor.put_file_add_bulk.assert_called_with( @@ -653,7 +639,6 @@ async def my_user_callback(did, info): SXAdaptor.put_fileset_complete.assert_called_once assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1 - assert SXAdaptor.post_status_update.called_once() @pytest.mark.asyncio @@ -677,7 +662,6 @@ async def my_user_callback(did, info): yield v await run_file_fetch_loop("123-456?files=1", SXAdaptor, {}, my_user_callback) - SXAdaptor.post_transform_start.assert_called_once() assert SXAdaptor.put_file_add_bulk.call_count == 1 SXAdaptor.put_file_add_bulk.assert_called_with( @@ -693,7 +677,6 @@ async def my_user_callback(did, info): SXAdaptor.put_fileset_complete.assert_called_once assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1 - assert SXAdaptor.post_status_update.called_once() @pytest.mark.asyncio @@ -707,10 +690,3 @@ async def my_user_callback(did, info): assert SXAdaptor.put_file_add.assert_not_called SXAdaptor.put_fileset_complete.assert_called_once() - - assert ( - SXAdaptor.post_status_update.call_args_list[0][0][0] - == 
"DID Finder found zero files for dataset 123-456" - ) - - assert SXAdaptor.post_status_update.call_args_list[0][1]["severity"] == "fatal" diff --git a/tests/servicex_did_finder_lib/test_servicex_did.py b/tests/servicex_did_finder_lib/test_servicex_did.py index a272c6c..6ce949b 100644 --- a/tests/servicex_did_finder_lib/test_servicex_did.py +++ b/tests/servicex_did_finder_lib/test_servicex_did.py @@ -11,8 +11,8 @@ def test_version(): @responses.activate def test_put_file_add(): - responses.add(responses.PUT, 'http://servicex.org/files', status=206) - sx = ServiceXAdapter("http://servicex.org") + responses.add(responses.PUT, 'http://servicex.org/12345/files', status=206) + sx = ServiceXAdapter("http://servicex.org/", '12345') sx.put_file_add({ 'paths': ['root://foo.bar.ROOT'], 'adler32': '32', @@ -30,8 +30,8 @@ def test_put_file_add(): @responses.activate def test_put_file_add_bulk(): - responses.add(responses.PUT, 'http://servicex.org/files', status=206) - sx = ServiceXAdapter("http://servicex.org") + responses.add(responses.PUT, 'http://servicex.org/12345/files', status=206) + sx = ServiceXAdapter("http://servicex.org/", '12345') sx.put_file_add_bulk([{ 'paths': ['root://foo.bar.ROOT'], 'adler32': '32', @@ -59,8 +59,8 @@ def test_put_file_add_bulk(): @responses.activate def test_put_file_add_bulk_large(): - responses.add(responses.PUT, 'http://servicex.org/files', status=206) - sx = ServiceXAdapter("http://servicex.org") + responses.add(responses.PUT, 'http://servicex.org/12345/files', status=206) + sx = ServiceXAdapter("http://servicex.org/", '12345') sx.put_file_add_bulk([{ 'paths': ['root://foo.bar.ROOT'], 'adler32': '32', @@ -68,33 +68,3 @@ def test_put_file_add_bulk_large(): 'file_events': 3141 }] * 32) assert len(responses.calls) == 2 - - -@responses.activate -def test_put_file_add_with_prefix(): - responses.add(responses.PUT, 'http://servicex.org/files', status=206) - sx = ServiceXAdapter("http://servicex.org", "xcache123:") - sx.put_file_add({ - 'paths': ['root://foo.bar.ROOT'], - 'adler32': '32', - 'file_size': 1024, - 'file_events': 3141 - }) - - assert len(responses.calls) == 1 - submitted = json.loads(responses.calls[0].request.body) - assert submitted['paths'][0] == 'xcache123:root://foo.bar.ROOT' - assert submitted['adler32'] == '32' - assert submitted['file_events'] == 3141 - assert submitted['file_size'] == 1024 - - -@responses.activate -def test_post_transform_start(): - responses.add(responses.POST, - 'http://servicex.org/servicex/internal/transformation/123-456/start', - status=206) - - sx = ServiceXAdapter("http://servicex.org/servicex/internal/transformation/123-456") - sx.post_transform_start() - assert len(responses.calls) == 1