Skip to content

Commit

Permalink
Merge branch 'big-db-change-2' (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivukotic authored Jan 11, 2024
1 parent de0e8cd commit dca0ef9
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 162 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ jobs:
test:
strategy:
matrix:
python-version: [3.7, 3.8, 3.9]
python-version: [3.8, 3.9]
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@master
- uses: actions/checkout@v4.0.0
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4.7.0
with:
python-version: ${{ matrix.python-version }}
- name: Install poetry
uses: Gr1N/setup-poetry@v7
uses: Gr1N/setup-poetry@v8
- name: Install dependencies
run: |
poetry install
poetry install --with dev
pip list
- name: Lint with Flake8
run: |
Expand All @@ -36,7 +36,7 @@ jobs:
poetry run coverage xml
- name: Report coverage using codecov
if: github.event_name == 'push' && matrix.python-version == 3.8
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v3.1.4
with:
file: ./coverage.xml # optional
flags: unittests # optional
8 changes: 4 additions & 4 deletions .github/workflows/pypi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4.0.0
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4.7.0
with:
python-version: 3.8
- name: Get the version
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF#refs/tags/}
- name: Install poetry
uses: Gr1N/setup-poetry@v7
uses: Gr1N/setup-poetry@v8
- name: Build
run: |
poetry version ${{ steps.get_version.outputs.VERSION }}
- name: Build and publish to pypi
uses: JRubics/poetry-publish@v1.6
with:
pypi_token: ${{ secrets.pypi_password_sx_did_lib }}
pypi_token: ${{ secrets.pypi_password_sx_did_lib }}
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ Create an async callback method that `yield`s file info dictionaries. For exampl
The arguments to the method are straightforward:

* `did_name`: the name of the DID that you should look up. It has the schema stripped off (e.g. if the user sent ServiceX `rucio://dataset_name_in_rucio`, then `did_name` will be `dataset_name_in_rucio`)
* `info` contains a dict of various info about the request that asked for this DID:
* `request-id` The request id that has this DID associated. For logging.
* `info` contains a dict of various info about the request that asked for this DID.

Yield the results as you find them - ServiceX will actually start processing the files before your DID lookup is finished if you do this. The fields you need to pass back to the library are as follows:

Expand Down Expand Up @@ -114,7 +113,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
__log = logger.getLogger(__name__)
async def my_callback(did_name: str, info: Dict[str, Any]):
__log.info(f'Looking up dataset {did_name}.',
extra={'requestId': info['request-id']})
extra={'something': info['something']})

for i in range(0, 10):
yield {
Expand All @@ -125,9 +124,7 @@ In the end, all DID finders for ServiceX will run under Kubernetes. ServiceX com
}
```

Note the parameter `request-id`: this marks the log messages with the request id that triggered this DID request. This will enable the system to track all log messages across all containers connected with this particular request id - making debugging a lot easier.

The `start_did_finder` will configure the python root logger properly to dump messages with a request ID in them.
The `start_did_finder` will configure the python root logger properly.

## URI Format

Expand Down
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ description = "ServiceX DID Library Routines"
authors = ["Gordon Watts <gwatts@uw.edu>"]

[tool.poetry.dependencies]
python = "^3.6"
python = "^3.8"
pika = "1.1.0"
make-it-sync = "^1.0.0"
requests = "^2.25.0"

[tool.poetry.dev-dependencies]
pytest = "^5.2"
[tool.poetry.group.dev]
optional = true

[tool.poetry.group.dev.dependencies]
pytest = "^7.4"
flake8 = "^3.9.1"
pytest-mock = "^3.5.1"
coverage = "^5.5"
Expand Down
42 changes: 14 additions & 28 deletions src/servicex_did_finder_lib/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
__logging = logging.getLogger(__name__)
__logging.addHandler(logging.NullHandler())

# TODO: get rid of different modes. It should always be batch.


class _accumulator:
"Track or cache files depending on the mode we are operating in"
Expand All @@ -42,8 +44,6 @@ def add(self, file_info: Dict[str, Any]):
self._file_cache.append(file_info)
else:
self._summary.add_file(file_info)
if self._summary.file_count == 1:
self._servicex.post_transform_start()
self._servicex.put_file_add(file_info)

def send_on(self, count):
Expand All @@ -59,8 +59,6 @@ def send_bulk(self, file_list: List[Dict[str, Any]]):
for f in file_list:
self.add(f)
else:
if self._summary.file_count == 0:
self._servicex.post_transform_start()
for ifl in file_list:
self._summary.add_file(ifl)
self._servicex.put_file_add_bulk(file_list)
Expand Down Expand Up @@ -93,12 +91,6 @@ async def run_file_fetch_loop(
# If we've been holding onto any files, we need to send them now.
acc.send_on(did_info.file_count)

# Simple error checking and reporting
if summary.file_count == 0:
servicex.post_status_update(
f"DID Finder found zero files for dataset {did}", severity="fatal"
)

elapsed_time = int((datetime.now() - start_time).total_seconds())
servicex.put_fileset_complete(
{
Expand All @@ -110,11 +102,9 @@ async def run_file_fetch_loop(
}
)

servicex.post_status_update(f"Completed load of files in {elapsed_time} seconds")


def rabbit_mq_callback(
user_callback: UserDIDHandler, channel, method, properties, body, file_prefix=None
user_callback: UserDIDHandler, channel, method, properties, body
):
"""rabbit_mq_callback Respond to RabbitMQ Message
Expand All @@ -128,22 +118,23 @@ def rabbit_mq_callback(
method ([type]): Delivery method
properties ([type]): Properties of the message
body ([type]): The body (json for us) of the message
file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
"""
request_id = None # set this in case we get an exception while loading request
dataset_id = None # set this in case we get an exception while loading request
try:
# Unpack the message. Really bad if we fail up here!
did_request = json.loads(body)
did = did_request["did"]
request_id = did_request["request_id"]
dataset_id = did_request["dataset_id"]
endpoint = did_request["endpoint"]
__logging.info(
f"Received DID request {did_request}", extra={"requestId": request_id}
f"Received DID request {did_request}",
extra={"dataset_id": dataset_id}
)
servicex = ServiceXAdapter(did_request["service-endpoint"], file_prefix)
servicex.post_status_update("DID Request received")
servicex = ServiceXAdapter(dataset_id=dataset_id,
endpoint=endpoint)

info = {
"request-id": request_id,
"dataset-id": dataset_id,
}

# Process the request and resolve the DID
Expand All @@ -152,16 +143,12 @@ def rabbit_mq_callback(

except Exception as e:
_, exec_value, _ = sys.exc_info()
__logging.exception("DID Request Failed", extra={"requestId": request_id})
servicex.post_status_update(
f"DID Request Failed for id {request_id}: " f"{str(e)} - {exec_value}",
severity="fatal",
)
__logging.exception(f"DID Request Failed {str(e)}", extra={"dataset_id": dataset_id})
raise

except Exception as e:
__logging.exception(
f"DID request failed {str(e)}", extra={"requestId": request_id}
f"DID request failed {str(e)}", extra={"dataset_id": dataset_id}
)

finally:
Expand All @@ -174,7 +161,6 @@ def init_rabbit_mq(
queue_name: str,
retries: int,
retry_interval: float,
file_prefix: str = None,
): # type: ignore
rabbitmq = None
retry_count = 0
Expand All @@ -191,7 +177,7 @@ def init_rabbit_mq(
queue=queue_name,
auto_ack=False,
on_message_callback=lambda c, m, p, b: rabbit_mq_callback(
user_callback, c, m, p, b, file_prefix
user_callback, c, m, p, b
),
)
_channel.start_consuming()
Expand Down
14 changes: 7 additions & 7 deletions src/servicex_did_finder_lib/did_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

class DIDFormatter(logging.Formatter):
"""
Need a customer formatter to allow for logging with request ids that vary.
Normally log messages are "level instance component request_id msg" and
request_id gets set by initialize_logging but we need a handler that'll let
us pass in the request id and have that embedded in the log message
Need a custom formatter to allow for logging with dataset ids that vary.
Normally log messages are "level instance component dataset_id msg" and
dataset_id gets set by initialize_logging but we need a handler that'll let
us pass in the dataset id and have that embedded in the log message
"""

def format(self, record: logging.LogRecord) -> str:
Expand All @@ -18,10 +18,10 @@ def format(self, record: logging.LogRecord) -> str:
:return: formatted log message
"""

if hasattr(record, "requestId"):
if hasattr(record, "datasetId"):
return super().format(record)
else:
setattr(record, "requestId", None)
setattr(record, "datasetId", None)
return super().format(record)


Expand All @@ -36,7 +36,7 @@ def initialize_root_logger(did_scheme: str):
instance = os.environ.get('INSTANCE_NAME', 'Unknown')
formatter = DIDFormatter('%(levelname)s ' +
f"{instance} {did_scheme}_did_finder " +
'%(requestId)s %(message)s')
'%(datasetId)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
handler.setLevel(logging.INFO)
Expand Down
52 changes: 7 additions & 45 deletions src/servicex_did_finder_lib/servicex_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,17 @@


class ServiceXAdapter:
def __init__(self, endpoint, file_prefix=None):
def __init__(self, endpoint, dataset_id):
self.endpoint = endpoint
self.file_prefix = file_prefix
self.dataset_id = dataset_id

self.logger = logging.getLogger(__name__)
self.logger.addHandler(logging.NullHandler())

def post_status_update(self, status_msg, severity="info"):
success = False
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.post(self.endpoint + "/status", data={
"timestamp": datetime.now().isoformat(),
"source": "DID Finder",
"severity": severity,
"info": status_msg
})
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
f'(try {attempts} out of {MAX_RETRIES}')
attempts += 1
if not success:
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status '
f'message: {str(status_msg)} - Ignoring error.')

def _prefix_file(self, file_path):
return file_path if not self.file_prefix else self.file_prefix+file_path

def _create_json(self, file_info):
return {
"timestamp": datetime.now().isoformat(),
"paths": [self._prefix_file(fp) for fp in file_info['paths']],
"paths": file_info['paths'],
'adler32': file_info['adler32'],
'file_size': file_info['file_size'],
'file_events': file_info['file_events']
Expand All @@ -80,8 +57,8 @@ def put_file_add(self, file_info):
while not success and attempts < MAX_RETRIES:
try:
mesg = self._create_json(file_info)
requests.put(self.endpoint + "/files", json=mesg)
self.logger.info(f"Metric: {json.dumps(mesg)}")
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
self.logger.info("adding file:", extra=file_info)
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
Expand All @@ -103,7 +80,7 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
mesg.append(self._create_json(fi))
while not success and attempts < MAX_RETRIES:
try:
requests.put(self.endpoint + "/files", json=mesg)
requests.put(f"{self.endpoint}{self.dataset_id}/files", json=mesg)
self.logger.info(f"Metric: {json.dumps(mesg)}")
success = True
except requests.exceptions.ConnectionError:
Expand All @@ -114,27 +91,12 @@ def put_file_add_bulk(self, file_list, chunk_length=30):
self.logger.error(f'After {attempts} tries, failed to send ServiceX App '
f'a put_file_bulk message: {mesg} - Ignoring error.')

def post_transform_start(self):
success = False
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.post(self.endpoint + "/start")
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
f'(try {attempts} out of {MAX_RETRIES}')
attempts += 1
if not success:
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a '
f'transform start message - Ignoring error.')

def put_fileset_complete(self, summary):
success = False
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
requests.put(self.endpoint + "/complete", json=summary)
requests.put(f"{self.endpoint}{self.dataset_id}/complete", json=summary)
success = True
except requests.exceptions.ConnectionError:
self.logger.exception(f'Connection error to ServiceX App. Will retry '
Expand Down
Loading

0 comments on commit dca0ef9

Please sign in to comment.