diff --git a/actions/tag_ulog/Dockerfile b/actions/tag_ulog/Dockerfile index d6d5b29..25b74db 100644 --- a/actions/tag_ulog/Dockerfile +++ b/actions/tag_ulog/Dockerfile @@ -1,5 +1,5 @@ ARG PYTHON_MAJOR=3 -ARG PYTHON_MINOR=10 +ARG PYTHON_MINOR=12 ARG OS_VARIANT=slim-bookworm FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:${PYTHON_MAJOR}.${PYTHON_MINOR}-${OS_VARIANT} diff --git a/actions/tag_ulog/action.json b/actions/tag_ulog/action.json index 72843da..9cefe56 100644 --- a/actions/tag_ulog/action.json +++ b/actions/tag_ulog/action.json @@ -1,7 +1,7 @@ { "name": "tag_ulog", - "short_description": "Add tags to a dataset for any error messages found in ULog files.", - "description": "This Action adds tags to a dataset by scanning ULog files (.ulg) for any ERROR messages. Under the hood, it uses the [pyulog](https://github.com/PX4/pyulog) library.", + "short_description": "Add error message tags to ULog files.", + "description": "This Action adds tags to ULog files (.ulg) for any ERROR messages. Under the hood, it uses the [pyulog](https://github.com/PX4/pyulog) library.", "compute_requirements": { "vCPU": 512, diff --git a/actions/tag_ulog/requirements.runtime.txt b/actions/tag_ulog/requirements.runtime.txt index b732b9e..32c2f8d 100644 --- a/actions/tag_ulog/requirements.runtime.txt +++ b/actions/tag_ulog/requirements.runtime.txt @@ -1,3 +1,3 @@ # Python packages to install within the Docker image associated with this Action. -roboto==0.2.16 +roboto==0.6.1 pyulog==1.0.2 diff --git a/actions/tag_ulog/scripts/deploy.sh b/actions/tag_ulog/scripts/deploy.sh index d8504f8..ae2f470 100755 --- a/actions/tag_ulog/scripts/deploy.sh +++ b/actions/tag_ulog/scripts/deploy.sh @@ -5,6 +5,10 @@ set -euo pipefail SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}") +# Build image and run unit tests +echo "Building image..." +$PACKAGE_ROOT/scripts/build.sh + # Early exit if virtual environment does not exist and/or roboto is not yet installed if [ ! -f "$PACKAGE_ROOT/.venv/bin/roboto" ]; then echo "Virtual environment with roboto CLI does not exist. Please run ./scripts/setup.sh first." diff --git a/actions/tag_ulog/scripts/integration_test.py b/actions/tag_ulog/scripts/integration_test.py new file mode 100644 index 0000000..5fb79bf --- /dev/null +++ b/actions/tag_ulog/scripts/integration_test.py @@ -0,0 +1,63 @@ +import subprocess +from pathlib import Path +# Run the build script +subprocess.run(["./scripts/deploy.sh", "roboto-public"], check=True) + +import logging +import pathlib +from roboto.domain import datasets, actions, topics +from roboto.waiters import wait_for +from roboto.association import Association, AssociationType + +# Configure logging +logging.basicConfig(level=logging.INFO) +log = logging.getLogger(__name__) + +def wait_for_terminal_status(invocation: actions.Invocation) -> None: + def _invocation_has_terminal_status(invoc: actions.Invocation) -> bool: + invoc.refresh() + return invoc.reached_terminal_status + + wait_for( + _invocation_has_terminal_status, + args=[invocation], + interval=10, + timeout_msg=f"Invocation {invocation.id} has not run to completion in allowed time", + ) + +caller_org_id = "roboto-public" + +# Create dataset +log.info("Creating dataset") +dataset = datasets.Dataset.create(caller_org_id=caller_org_id) +dataset.put_tags(tags=["integration_test", "ros_ingestion"]) +log.info(f"Dataset created: {dataset.dataset_id}") + +# Upload PX4 file +test_file_path_ulg = pathlib.Path("./test/input/test.ulg") +log.info(f"Uploading file {test_file_path_ulg}") +dataset.upload_file(test_file_path_ulg, file_destination_path="test.ulg") + +# Invoke action +action = actions.Action.from_name(name="tag_ulog", owner_org_id=caller_org_id) +log.info("Invoking action") +invocation = action.invoke( + input_data=["**.ulg"], + data_source_id=dataset.dataset_id, + data_source_type=actions.InvocationDataSourceType.Dataset, + invocation_source=actions.InvocationSource.Manual, + caller_org_id=caller_org_id, +) +wait_for_terminal_status(invocation) + +try: + assert invocation.current_status == actions.InvocationStatus.Completed +except AssertionError as e: + for entry in invocation.get_logs(): + print(entry.log) + raise e + +file_id = dataset.get_file_by_path("test.ulg") + +assert "ERROR_commander" in file_id.tags +log.info("Test completed successfully") diff --git a/actions/tag_ulog/src/tag_ulog/__main__.py b/actions/tag_ulog/src/tag_ulog/__main__.py index 22f6bd5..ea2e004 100644 --- a/actions/tag_ulog/src/tag_ulog/__main__.py +++ b/actions/tag_ulog/src/tag_ulog/__main__.py @@ -1,48 +1,26 @@ from pyulog.core import ULog -import argparse -import logging import os -import pathlib -import json -from typing import Union import re -import sys - -from roboto.domain import actions, datasets, files, http_delegates -from roboto.env import RobotoEnvKey -from roboto.http import ( - HttpClient, - SigV4AuthDecorator, -) -from roboto import updates +from pathlib import Path -from roboto.transactions.transaction_manager import TransactionManager +import logging +from roboto import ActionRuntime logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -def load_env_var(env_var: RobotoEnvKey, strict=True) -> Union[str, None]: - """ - Load an environment variable, and exit if it is not found. +def extract_text_between_brackets(text): + ''' + Extract text between brackets Args: - - env_var: The environment variable to load. + text (str): Text to extract from Returns: - - The value of the environment variable. - """ - value = os.getenv(env_var.value) - if not value: - if strict: - logger.error("Missing required ENV var: '%s'", env_var) - sys.exit(1) - else: - return None - return value - + str: Text between brackets + ''' -def extract_text_between_brackets(text): match = re.search(r'\[([^\]]+)\]', text) if match: return match.group(1) @@ -50,47 +28,29 @@ def extract_text_between_brackets(text): return None -def main(args): +if __name__ == "__main__": + runtime = ActionRuntime.from_env() - input_dir = args.input_dir - output_dir = args.output_dir - dataset_metadata_path = args.dataset_metadata_path - files_metadata_path = args.files_metadata_path + input_dir = runtime.input_dir + output_dir = runtime.output_dir + dataset = runtime.dataset - if not input_dir or not output_dir or not dataset_metadata_path or not files_metadata_path: - error_msg = "Set ROBOTO_INPUT_DIR, ROBOTO_OUTPUT_DIR and ROBOTO_DATASET_METADATA_CHANGESET_FILE, ROBOTO_FILE_METADATA_CHANGESET_FILE env variables." + if not input_dir: + error_msg = "Set ROBOTO_INPUT_DIR env variable." logger.error(error_msg) raise RuntimeError(error_msg) - # get dataset id - invocation_id = load_env_var(RobotoEnvKey.InvocationId, strict=False) - logger.info(f"{invocation_id=}") - - # If inside an invocation, get info for file-level tagging - if invocation_id: - # Setup and authorize HTTP client - client = HttpClient(default_auth=SigV4AuthDecorator("execute-api")) - service_url = load_env_var(RobotoEnvKey.RobotoServiceUrl) - - delegate = http_delegates.HttpDelegates.from_client(http=client, endpoint=service_url) - invocation = actions.invocation.Invocation.from_id(invocation_id, delegate.invocations) - dataset_id = invocation.data_source.data_source_id - logger.info(f"{dataset_id}=") - transaction_manager = TransactionManager(service_url, client) - - dataset = datasets.dataset.Dataset.from_id(dataset_id, dataset_delegate=delegate.datasets, - file_delegate=delegate.files, - transaction_manager=transaction_manager) - else: - dataset = None - for root, dirs, paths in os.walk(input_dir): for file in paths: # Check if the file ends with .ulg if file.endswith(".ulg"): _, ulg_file_name = os.path.split(file) + msg_filter = [] full_path = os.path.join(root, file) + relative_path = Path(full_path).relative_to(input_dir) + file_id = dataset.get_file_by_path(relative_path) + ulog = ULog(full_path, msg_filter, True) file_put_tags = list() for m in ulog.logged_messages: @@ -102,57 +62,4 @@ def main(args): if module_name not in file_put_tags: file_put_tags.append(module_name) continue - - if dataset and file_put_tags: - for tag in file_put_tags: - # build a MetadataChangeset with put_tag direction - changeset = updates.MetadataChangeset().Builder().put_tag(tag).build() - ulg_file = dataset.get_file_info(file) - file_update_request = files.file_requests.UpdateFileRecordRequest(metadata_changeset=changeset) - # pass request to add metadata - ulg_file = ulg_file.update(file_update_request) - logger.info(f"Tagging {file} with {tag}") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - - parser.add_argument( - "-i", - "--input-dir", - dest="input_dir", - type=pathlib.Path, - required=False, - help="Directory containing input files to process", - default=load_env_var(RobotoEnvKey.InputDir), - ) - parser.add_argument( - "-o", - "--output-dir", - dest="output_dir", - type=pathlib.Path, - required=False, - help="Directory to which to write any output files to be uploaded", - default=load_env_var(RobotoEnvKey.OutputDir) - ) - parser.add_argument( - "-d", - "--dataset-metadata-path", - dest="dataset_metadata_path", - type=pathlib.Path, - required=False, - help="Path at which to save dataset-level metadata", - default=load_env_var(RobotoEnvKey.DatasetMetadataChangesetFile) - ) - parser.add_argument( - "-f", - "--files-metadata-path", - dest="files_metadata_path", - type=pathlib.Path, - required=False, - help="Path at which to save file-level metadata", - default=load_env_var(RobotoEnvKey.FileMetadataChangesetFile) - ) - - args = parser.parse_args() - main(args) + file_id.put_tags(tags=file_put_tags) \ No newline at end of file