From 0e072be7a8be40037fd810b8df25b360b4a01609 Mon Sep 17 00:00:00 2001 From: Ilija Vukotic Date: Wed, 4 Oct 2023 11:44:46 -0500 Subject: [PATCH] big cleanup. no request_did, no prefix --- README.md | 9 ++--- src/servicex_did_finder_lib/communication.py | 29 ++++------------ src/servicex_did_finder_lib/did_logging.py | 14 ++++---- .../servicex_adaptor.py | 34 ++----------------- .../test_communication.py | 22 +----------- .../test_servicex_did.py | 20 ----------- 6 files changed, 20 insertions(+), 108 deletions(-) diff --git a/README.md b/README.md index bbc0017..52abd62 100644 --- a/README.md +++ b/README.md @@ -28,8 +28,7 @@ Create an async callback method that `yield`s file info dictionaries. For exampl The arguments to the method are straight forward: * `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`) -* `info` contains a dict of various info about the request that asked for this DID: - * `request-id` The request id that has this DID associated. For logging. +* `info` contains a dict of various info about the request that asked for this DID. Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows: @@ -114,7 +113,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com __log = logger.getLogger(__name__) async def my_callback(did_name: str, info: Dict[str, Any]): __log.info(f'Looking up dataset {did_name}.', - extra={'requestId': info['request-id']}) + extra={'somethign': info['something']}) for i in range(0, 10): yield { @@ -125,9 +124,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com } ``` -Note the parameter `request-id`: this marks the log messages with the request id that triggered this DID request. 
This will enable the system to track all log messages across all containers connected with this particular request id - making debugging a lot easier. - -The `start_did_finder` will configure the python root logger properly to dump messages with a request ID in them. +The `start_did_finder` will configure the python root logger properly. ## URI Format diff --git a/src/servicex_did_finder_lib/communication.py b/src/servicex_did_finder_lib/communication.py index 2278ad5..b664cb4 100644 --- a/src/servicex_did_finder_lib/communication.py +++ b/src/servicex_did_finder_lib/communication.py @@ -91,12 +91,6 @@ async def run_file_fetch_loop( # If we've been holding onto any files, we need to send them now. acc.send_on(did_info.file_count) - # Simple error checking and reporting - if summary.file_count == 0: - servicex.post_status_update( - f"DID Finder found zero files for dataset {did}", severity="fatal" - ) - elapsed_time = int((datetime.now() - start_time).total_seconds()) servicex.put_fileset_complete( { @@ -107,11 +101,10 @@ async def run_file_fetch_loop( "elapsed-time": elapsed_time, } ) - servicex.post_status_update(f"Completed load of files in {elapsed_time} seconds") def rabbit_mq_callback( - user_callback: UserDIDHandler, channel, method, properties, body, file_prefix=None + user_callback: UserDIDHandler, channel, method, properties, body ): """rabbit_mq_callback Respond to RabbitMQ Message @@ -125,25 +118,20 @@ def rabbit_mq_callback( method ([type]): Delivery method properties ([type]): Properties of the message body ([type]): The body (json for us) of the message - file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service """ dataset_id = None # set this in case we get an exception while loading request - request_id = None try: # Unpack the message. Really bad if we fail up here! 
did_request = json.loads(body) did = did_request["did"] dataset_id = did_request["dataset_id"] - request_id = did_request["request_id"] endpoint = did_request["endpoint"] __logging.info( f"Received DID request {did_request}", - extra={"request_id": request_id, "dataset_id": dataset_id} + extra={"dataset_id": dataset_id} ) - servicex = ServiceXAdapter(request_id=request_id, - dataset_id=dataset_id, - endpoint=endpoint, - file_prefix=file_prefix) + servicex = ServiceXAdapter(dataset_id=dataset_id, + endpoint=endpoint) info = { "dataset-id": dataset_id, @@ -155,11 +143,7 @@ def rabbit_mq_callback( except Exception as e: _, exec_value, _ = sys.exc_info() - __logging.exception("DID Request Failed", extra={"dataset_id": dataset_id}) - servicex.post_status_update( - f"DID Request Failed for id {dataset_id}: " f"{str(e)} - {exec_value}", - severity="fatal", - ) + __logging.exception(f"DID Request Failed {str(e)}", extra={"dataset_id": dataset_id}) raise except Exception as e: @@ -177,7 +161,6 @@ def init_rabbit_mq( queue_name: str, retries: int, retry_interval: float, - file_prefix: str = None, ): # type: ignore rabbitmq = None retry_count = 0 @@ -194,7 +177,7 @@ def init_rabbit_mq( queue=queue_name, auto_ack=False, on_message_callback=lambda c, m, p, b: rabbit_mq_callback( - user_callback, c, m, p, b, file_prefix + user_callback, c, m, p, b ), ) _channel.start_consuming() diff --git a/src/servicex_did_finder_lib/did_logging.py b/src/servicex_did_finder_lib/did_logging.py index 4884beb..553ddb7 100644 --- a/src/servicex_did_finder_lib/did_logging.py +++ b/src/servicex_did_finder_lib/did_logging.py @@ -4,10 +4,10 @@ class DIDFormatter(logging.Formatter): """ - Need a customer formatter to allow for logging with request ids that vary. 
- Normally log messages are "level instance component request_id msg" and - request_id gets set by initialize_logging but we need a handler that'll let - us pass in the request id and have that embedded in the log message + Need a custom formatter to allow for logging with dataset ids that vary. + Normally log messages are "level instance component dataset_id msg" and + dataset_id gets set by initialize_logging but we need a handler that'll let + us pass in the dataset id and have that embedded in the log message """ def format(self, record: logging.LogRecord) -> str: @@ -18,10 +18,10 @@ def format(self, record: logging.LogRecord) -> str: :return: formatted log message """ - if hasattr(record, "requestId"): + if hasattr(record, "datasetId"): return super().format(record) else: - setattr(record, "requestId", None) + setattr(record, "datasetId", None) return super().format(record) @@ -36,7 +36,7 @@ def initialize_root_logger(did_scheme: str): instance = os.environ.get('INSTANCE_NAME', 'Unknown') formatter = DIDFormatter('%(levelname)s ' + f"{instance} {did_scheme}_did_finder " + - '%(requestId)s %(message)s') + '%(datasetId)s %(message)s') handler = logging.StreamHandler() handler.setFormatter(formatter) handler.setLevel(logging.INFO) diff --git a/src/servicex_did_finder_lib/servicex_adaptor.py b/src/servicex_did_finder_lib/servicex_adaptor.py index 33d67ed..478b319 100644 --- a/src/servicex_did_finder_lib/servicex_adaptor.py +++ b/src/servicex_did_finder_lib/servicex_adaptor.py @@ -35,45 +35,17 @@ class ServiceXAdapter: - def __init__(self, endpoint, dataset_id, request_id=None, file_prefix=None): + def __init__(self, endpoint, dataset_id): self.endpoint = endpoint - self.request_id = request_id self.dataset_id = dataset_id - self.file_prefix = file_prefix self.logger = logging.getLogger(__name__) self.logger.addHandler(logging.NullHandler()) - # TODO - remove. did finder should not know about request_id and call any - # endpoint with request_id.
- def post_status_update(self, status_msg, severity="info"): - success = False - attempts = 0 - while not success and attempts < MAX_RETRIES: - try: - requests.post(f"{self.endpoint}{self.request_id}/status", data={ - "timestamp": datetime.now().isoformat(), - "source": "DID Finder", - "severity": severity, - "info": status_msg - }) - success = True - except requests.exceptions.ConnectionError: - self.logger.exception(f'Connection error to ServiceX App. Will retry ' - f'(try {attempts} out of {MAX_RETRIES}') - attempts += 1 - if not success: - self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status ' - f'message: {str(status_msg)} - Ignoring error.') - - # TODO - remove - def _prefix_file(self, file_path): - return file_path if not self.file_prefix else self.file_prefix+file_path - def _create_json(self, file_info): return { "timestamp": datetime.now().isoformat(), - "paths": [self._prefix_file(fp) for fp in file_info['paths']], + "paths": file_info['paths'], 'adler32': file_info['adler32'], 'file_size': file_info['file_size'], 'file_events': file_info['file_events'] @@ -86,7 +58,7 @@ def put_file_add(self, file_info): try: mesg = self._create_json(file_info) requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg) - self.logger.info(f"Metric: {json.dumps(mesg)}") + self.logger.info("adding file:", extra=file_info) success = True except requests.exceptions.ConnectionError: self.logger.exception(f'Connection error to ServiceX App. 
Will retry ' diff --git a/tests/servicex_did_finder_lib/test_communication.py b/tests/servicex_did_finder_lib/test_communication.py index e35dd66..13f8bc9 100644 --- a/tests/servicex_did_finder_lib/test_communication.py +++ b/tests/servicex_did_finder_lib/test_communication.py @@ -1,7 +1,7 @@ import argparse import json from typing import Any, AsyncGenerator, Dict, Optional -from unittest.mock import ANY, MagicMock, patch +from unittest.mock import MagicMock, patch import pika import pytest @@ -27,7 +27,6 @@ def send_did_request(self, did_name: str): properties = MagicMock() body = json.dumps( { - "request_id": '123-456', "did": did_name, "dataset_id": "000-111-222-444", "endpoint": "http://localhost:2334/", @@ -315,7 +314,6 @@ async def my_callback( # Make sure the file was sent along, along with the completion SXAdaptor.put_file_add.assert_not_called() SXAdaptor.put_fileset_complete.assert_not_called() - SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal") def test_failed_file_after_good(rabbitmq, SXAdaptor): @@ -344,7 +342,6 @@ async def my_callback( # Make sure the file was sent along, along with the completion SXAdaptor.put_file_add.assert_called_once() SXAdaptor.put_fileset_complete.assert_not_called() - SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal") def test_failed_file_after_good_with_avail(rabbitmq, SXAdaptor): @@ -373,7 +370,6 @@ async def my_callback( # Make sure the file was sent along, along with the completion SXAdaptor.put_file_add.assert_called_once() SXAdaptor.put_fileset_complete.assert_called_once() - SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds") def test_failed_file_after_good_with_avail_limited_number(rabbitmq, SXAdaptor): @@ -408,7 +404,6 @@ async def my_callback( # Make sure the file was sent along, along with the completion SXAdaptor.put_file_add_bulk.assert_called_once() SXAdaptor.put_fileset_complete.assert_called_once() - 
SXAdaptor.post_status_update.assert_any_call("Completed load of files in 0 seconds") def test_no_files_returned(rabbitmq, SXAdaptor): @@ -433,7 +428,6 @@ async def my_callback( SXAdaptor.put_file_add.assert_not_called() SXAdaptor.put_fileset_complete.assert_called_once() assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 0 - SXAdaptor.post_status_update.assert_any_call(ANY, severity="fatal") def test_rabbitmq_connection_failure(rabbitmq_fail_once, SXAdaptor): @@ -532,8 +526,6 @@ async def my_user_callback(did, info): assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-events"] == 192 assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-bytes"] == 3070 - assert SXAdaptor.post_status_update.called_once() - @pytest.mark.asyncio async def test_run_file_bulk_fetch_loop(SXAdaptor, mocker): @@ -570,8 +562,6 @@ async def my_user_callback(did, info): assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-events"] == 192 assert SXAdaptor.put_fileset_complete.call_args[0][0]["total-bytes"] == 3070 - assert SXAdaptor.post_status_update.called_once() - @pytest.mark.asyncio async def test_run_file_fetch_one(SXAdaptor, mocker): @@ -609,7 +599,6 @@ async def my_user_callback(did, info): SXAdaptor.put_fileset_complete.assert_called_once assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1 - assert SXAdaptor.post_status_update.called_once() @pytest.mark.asyncio @@ -650,7 +639,6 @@ async def my_user_callback(did, info): SXAdaptor.put_fileset_complete.assert_called_once assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1 - assert SXAdaptor.post_status_update.called_once() @pytest.mark.asyncio @@ -689,7 +677,6 @@ async def my_user_callback(did, info): SXAdaptor.put_fileset_complete.assert_called_once assert SXAdaptor.put_fileset_complete.call_args[0][0]["files"] == 1 - assert SXAdaptor.post_status_update.called_once() @pytest.mark.asyncio @@ -703,10 +690,3 @@ async def my_user_callback(did, info): assert 
SXAdaptor.put_file_add.assert_not_called SXAdaptor.put_fileset_complete.assert_called_once() - - assert ( - SXAdaptor.post_status_update.call_args_list[0][0][0] - == "DID Finder found zero files for dataset 123-456" - ) - - assert SXAdaptor.post_status_update.call_args_list[0][1]["severity"] == "fatal" diff --git a/tests/servicex_did_finder_lib/test_servicex_did.py b/tests/servicex_did_finder_lib/test_servicex_did.py index bf968bf..6ce949b 100644 --- a/tests/servicex_did_finder_lib/test_servicex_did.py +++ b/tests/servicex_did_finder_lib/test_servicex_did.py @@ -68,23 +68,3 @@ def test_put_file_add_bulk_large(): 'file_events': 3141 }] * 32) assert len(responses.calls) == 2 - - -@responses.activate -def test_put_file_add_with_prefix(): - - responses.add(responses.PUT, 'http://servicex.org/12345/files', status=206) - sx = ServiceXAdapter("http://servicex.org/", '12345', file_prefix="xcache123:") - sx.put_file_add({ - 'paths': ['root://foo.bar.ROOT'], - 'adler32': '32', - 'file_size': 1024, - 'file_events': 3141 - }) - - assert len(responses.calls) == 1 - submitted = json.loads(responses.calls[0].request.body) - assert submitted['paths'][0] == 'xcache123:root://foo.bar.ROOT' - assert submitted['adler32'] == '32' - assert submitted['file_events'] == 3141 - assert submitted['file_size'] == 1024