diff --git a/actions/ulog_ingestion/.gitignore b/actions/ulog_ingestion/.gitignore
deleted file mode 100644
index a81be65..0000000
--- a/actions/ulog_ingestion/.gitignore
+++ /dev/null
@@ -1,11 +0,0 @@
-.venv
-.mypy_cache
-**/*.egg-info
-**/.mypy_cache/
-**/.venv/
-**/__pycache__/
-**/.pytest_cache
-**/dist/
-*.swp
-*.pyc
-output
diff --git a/actions/ulog_ingestion/.python-version b/actions/ulog_ingestion/.python-version
deleted file mode 100644
index 05f8ee1..0000000
--- a/actions/ulog_ingestion/.python-version
+++ /dev/null
@@ -1,4 +0,0 @@
-# This file is a directive to pyenv (https://github.com/pyenv/pyenv) to set the matching version of Python in this directory.
-# If you don't use pyenv, you can safely delete this file.
-# The roboto CLI requires Python 3.9 or higher.
-3.10
\ No newline at end of file
diff --git a/actions/ulog_ingestion/Dockerfile b/actions/ulog_ingestion/Dockerfile
deleted file mode 100644
index 0588ec3..0000000
--- a/actions/ulog_ingestion/Dockerfile
+++ /dev/null
@@ -1,11 +0,0 @@
-ARG PYTHON_MAJOR=3
-ARG PYTHON_MINOR=10
-ARG OS_VARIANT=slim-bookworm
-FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:${PYTHON_MAJOR}.${PYTHON_MINOR}-${OS_VARIANT}
-
-COPY requirements.runtime.txt ./
-RUN python -m pip install --upgrade pip setuptools && python -m pip install -r requirements.runtime.txt
-
-COPY src/ulog_ingestion/ ./ulog_ingestion
-
-ENTRYPOINT [ "python", "-m", "ulog_ingestion" ]
diff --git a/actions/ulog_ingestion/README.md b/actions/ulog_ingestion/README.md
deleted file mode 100644
index 09e4f87..0000000
--- a/actions/ulog_ingestion/README.md
+++ /dev/null
@@ -1,14 +0,0 @@
-# ulog_ingestion
-
-This Action processes ULog files for visualization within the Roboto platform.
-
-## Getting started
-
-1. Set up a virtual environment specific to this project and install development dependencies, including the `roboto` CLI: `./scripts/setup.sh`
-2. Build the Docker image: `./scripts/build.sh`
-3. Run the Action image locally: `./scripts/run.sh <input_dir>`
-4. Deploy to the Roboto platform: `./scripts/deploy.sh`
-
-## Action configuration file
-
-This Roboto Action is configured in `action.json`. Refer to Roboto's latest documentation for the expected structure.
diff --git a/actions/ulog_ingestion/action.json b/actions/ulog_ingestion/action.json
deleted file mode 100644
index 00ebd6b..0000000
--- a/actions/ulog_ingestion/action.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-    "name": "ulog_ingestion",
-    "description": "Ingest data from ULog files into topics for visualization and search.",
-    "short_description": "Ingest data from ULog files into topics for visualization and search.",
-    "parameters": [
-        {
-            "name": "TOPICS",
-            "required": false,
-            "description": "Comma-separated list of topics to extract. For example: battery_status,actuator_armed"
-        }
-    ],
-    "compute_requirements": {
-        "vCPU": 4096,
-        "memory": 8192,
-        "storage": 21
-    },
-    "tags": [
-        "px4"
-    ],
-    "metadata": {
-        "github_url": "https://github.com/roboto-ai/robologs-px4-actions/tree/main/actions/ulog_ingestion"
-    }
-}
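For context on the "TOPICS" parameter above: the platform surfaces action parameters to the running container as environment variables, and this action reads the value back via ROBOTO_PARAM_TOPICS (see the argparse default in __main__.py further down). A minimal sketch of the parsing, using the sample value from the parameter description:

    import os

    # Value the platform would set from the TOPICS parameter (sample from action.json).
    os.environ.setdefault("ROBOTO_PARAM_TOPICS", "battery_status,actuator_armed")

    raw = os.environ.get("ROBOTO_PARAM_TOPICS")
    msg_filter = raw.split(",") if raw else None  # None means "ingest every topic"
    print(msg_filter)  # ['battery_status', 'actuator_armed']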
diff --git a/actions/ulog_ingestion/requirements.dev.txt b/actions/ulog_ingestion/requirements.dev.txt
deleted file mode 100644
index 23a2e3f..0000000
--- a/actions/ulog_ingestion/requirements.dev.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-# Python packages to install into this directory's virtual environment
-# for the purpose of development, testing, and deployment.
-
-# Install all required runtime dependencies in the local virtual environment.
--r requirements.runtime.txt
-
-# Add additional Python packages to install here.
diff --git a/actions/ulog_ingestion/requirements.runtime.txt b/actions/ulog_ingestion/requirements.runtime.txt
deleted file mode 100644
index 90e55da..0000000
--- a/actions/ulog_ingestion/requirements.runtime.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-# Python packages to install within the Docker image associated with this Action.
-roboto==0.2.13
-pyulog==1.0.2
-mcap==1.1.1
-jsonschema>=4.21.1
diff --git a/actions/ulog_ingestion/scripts/build.sh b/actions/ulog_ingestion/scripts/build.sh
deleted file mode 100755
index 2d384cb..0000000
--- a/actions/ulog_ingestion/scripts/build.sh
+++ /dev/null
@@ -1,14 +0,0 @@
-#!/usr/bin/env bash
-
-set -euo pipefail
-
-SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
-PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}")
-
-build_subcommand=(build)
-# If buildx is installed, use it
-if docker buildx version &> /dev/null; then
-    build_subcommand=(buildx build --platform linux/amd64 --output type=image)
-fi
-
-docker "${build_subcommand[@]}" -f "$PACKAGE_ROOT/Dockerfile" -t ulog_ingestion:latest "$PACKAGE_ROOT"
diff --git a/actions/ulog_ingestion/scripts/deploy.sh b/actions/ulog_ingestion/scripts/deploy.sh
deleted file mode 100755
index 036cbea..0000000
--- a/actions/ulog_ingestion/scripts/deploy.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env bash
-
-set -euo pipefail
-
-SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
-PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}")
-
-# Early exit if the virtual environment does not exist and/or roboto is not yet installed
-if [ ! -f "$PACKAGE_ROOT/.venv/bin/roboto" ]; then
-    echo "Virtual environment with roboto CLI does not exist. Please run ./scripts/setup.sh first."
-    exit 1
-fi
-
-# Use the first argument passed to this script as org_id, falling back to $ROBOTO_ORG_ID
-org_id=${ROBOTO_ORG_ID:-}
-if [ $# -gt 0 ]; then
-    org_id=$1
-fi
-
-roboto_exe="$PACKAGE_ROOT/.venv/bin/roboto"
-
-echo "Pushing ulog_ingestion:latest to Roboto's private registry"
-image_push_args=(
-    --suppress-upgrade-check
-    images push
-    --quiet
-)
-if [[ -n $org_id ]]; then
-    image_push_args+=(--org "$org_id")
-fi
-image_push_args+=(ulog_ingestion:latest)
-image_push_ret_code=0
-# Capture the exit code without tripping `set -e` if the push fails
-image_uri=$($roboto_exe "${image_push_args[@]}") || image_push_ret_code=$?
-
-if [ $image_push_ret_code -ne 0 ]; then
-    echo "Failed to push ulog_ingestion:latest to Roboto's private registry"
-    exit 1
-fi
-
-echo "Creating ulog_ingestion action"
-create_args=(
-    --from-file "$PACKAGE_ROOT/action.json"
-    --image "$image_uri"
-    --yes
-)
-if [[ -n $org_id ]]; then
-    create_args+=(--org "$org_id")
-fi
-$roboto_exe actions create "${create_args[@]}"
diff --git a/actions/ulog_ingestion/scripts/platform_test.sh b/actions/ulog_ingestion/scripts/platform_test.sh
deleted file mode 100755
index 2803cc1..0000000
--- a/actions/ulog_ingestion/scripts/platform_test.sh
+++ /dev/null
@@ -1,83 +0,0 @@
-#!/bin/bash
-set -e
-
-ACTION_NAME=ulog_ingestion
-INPUT_FILE_PATH="./test/input/test.ulg"
-EXPECTED_OUTPUT_FILE_NAME="battery_status.mcap"
-INPUT_DATA="*ulg"
-
-# Use the first command line argument for ORG, falling back to roboto-public if not provided
-ORG=${1:-roboto-public}
-
-# Step 1: Build and deploy docker image
-./scripts/build.sh
-./scripts/setup.sh
-./scripts/deploy.sh "$ORG"
-
-# Step 2: Create dataset on platform and parse its ID
-create_output=$(roboto datasets create --org "$ORG")
-dataset_id=$(echo "$create_output" | jq -r '.dataset_id')
-
-if [ -z "$dataset_id" ]; then
-    echo "Failed to create dataset"
-    exit 1
-fi
-
-echo "Dataset created with ID: $dataset_id"
-
-# Step 3: Upload a test file to the dataset
-roboto datasets upload-files -d "$dataset_id" -p "$INPUT_FILE_PATH"
-
-# Step 4: Invoke the Action and parse the invocation ID
-invoke_output=$(roboto actions invoke "$ACTION_NAME" --dataset-id "$dataset_id" --input-data "$INPUT_DATA" --org "$ORG")
-invocation_id=$(echo "$invoke_output" | grep -oP "Invocation ID: '\K[^']+")
-
-if [ -z "$invocation_id" ]; then
-    echo "Failed to invoke action"
-    exit 1
-fi
-
-echo "Action invoked with ID: $invocation_id"
-
-# Step 5: Monitor the invocation status
-end_time=$(date -ud "10 minutes" +%s)
-while true; do
-    now=$(date +%s)
-    if (( now > end_time )); then
-        echo "Invocation monitoring timed out for $ACTION_NAME"
-        exit 1
-    fi
-
-    status_output=$(roboto invocations status "$invocation_id")
-    status=$(echo "$status_output" | jq -r '.[-1].status')
-
-    case $status in
-        "Completed")
-            echo "Action completed"
-            break
-            ;;
-        "Failed")
-            echo "Action failed for $ACTION_NAME"
-            roboto invocations logs "$invocation_id"
-            exit 1
-            ;;
-        *)
-            echo "Current status: $status. Checking again in 20 seconds..."
-            sleep 20
-            ;;
-    esac
-done
-
-# Step 6: Verify the output files
-list_output=$(roboto datasets list-files -d "$dataset_id")
-if echo "$list_output" | grep -q "$EXPECTED_OUTPUT_FILE_NAME"; then
-    echo "Test succeeded for $ACTION_NAME"
-else
-    echo "Test failed for $ACTION_NAME"
-    exit 1
-fi
-d "$input_dir" ]; then - echo "Specify an existing input directory as the first argument to this script, or set the ROBOTO_INPUT_DIR environment variable" - exit 1 -fi - -# Set output_dir variable to $ROBOTO_OUTPUT_DIR if defined, else set it to "output/" in the package root (creating if necessary) -output_dir=${ROBOTO_OUTPUT_DIR:-$PACKAGE_ROOT/output} -mkdir -p $output_dir - -# Assert both directories are absolute paths -if [[ ! "$input_dir" = /* ]]; then - echo "Input directory '$input_dir' must be specified as an absolute path" - exit 1 -fi - -if [[ ! "$output_dir" = /* ]]; then - echo "Output directory '$output_dir' must be specified as an absolute path" - exit 1 -fi - -docker run --rm -it \ - -v $input_dir:/input \ - -v $output_dir:/output \ - -e ROBOTO_INPUT_DIR=/input \ - -e ROBOTO_OUTPUT_DIR=/output \ - ulog_ingestion:latest diff --git a/actions/ulog_ingestion/scripts/setup.sh b/actions/ulog_ingestion/scripts/setup.sh deleted file mode 100755 index 20cb740..0000000 --- a/actions/ulog_ingestion/scripts/setup.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - -SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) -PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}") - -venv_dir="$PACKAGE_ROOT/.venv" - -# Create a virtual environment -python -m venv --upgrade-deps $venv_dir - -# Install roboto -pip_exe="$venv_dir/bin/pip" -$pip_exe install --upgrade -r $PACKAGE_ROOT/requirements.dev.txt diff --git a/actions/ulog_ingestion/scripts/test.sh b/actions/ulog_ingestion/scripts/test.sh deleted file mode 100755 index 6820f66..0000000 --- a/actions/ulog_ingestion/scripts/test.sh +++ /dev/null @@ -1,80 +0,0 @@ -#!/bin/bash - -SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) -PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}") - -# Define constants for directories and file paths - -INPUT_DIR=${PACKAGE_ROOT}/test/input -ACTUAL_OUTPUT_DIR=${PACKAGE_ROOT}/test/actual_output -EXPECTED_OUTPUT_DIR=${PACKAGE_ROOT}/test/expected_output - -if [ ! -d "$ACTUAL_OUTPUT_DIR" ]; then - mkdir -p "$ACTUAL_OUTPUT_DIR" -fi - -# Remove previous outputs -clean_actual_output() { - rm -rf $ACTUAL_OUTPUT_DIR/ -} - -# Check if file exists -file_exists_or_error() { - local file_path="$1" - - if [ ! -f "$file_path" ]; then - echo "Error: File '$file_path' does not exist." - exit 1 - fi - echo "Test passed!" - -} - -# Run the docker command with the given parameters -run_docker_test() { - local additional_args="$1" - - docker run \ - -v $INPUT_DIR:/input \ - -v $ACTUAL_OUTPUT_DIR:/output \ - -e ROBOTO_INPUT_DIR=/input \ - -e ROBOTO_OUTPUT_DIR=/output \ - $additional_args \ - ulog_ingestion:latest -} - -function check_file_does_not_exist() { - local file_path="$1" - if [[ ! -e "$file_path" ]]; then - echo "Test passed!" - else - echo "Test failed: $1 exists!" - exit 1 - fi -} - -# Compare the actual output to the expected output -compare_outputs() { - local actual_file="$1" - local expected_file="$2" - - diff $ACTUAL_OUTPUT_DIR/$actual_file $EXPECTED_OUTPUT_DIR/$expected_file - - if [ $? -eq 0 ]; then - echo "Test passed!" - else - echo "Test failed!" - exit 1 - fi -} - -# Main test execution -main() { - - # Test 1 - echo "No-op test passed!" 
-}
-
-# Run the main test execution
-main
-clean_actual_output
diff --git a/actions/ulog_ingestion/src/tests/test.ulg b/actions/ulog_ingestion/src/tests/test.ulg
deleted file mode 100644
index 6196e98..0000000
Binary files a/actions/ulog_ingestion/src/tests/test.ulg and /dev/null differ
diff --git a/actions/ulog_ingestion/src/tests/test_utils.py b/actions/ulog_ingestion/src/tests/test_utils.py
deleted file mode 100644
index a263dc9..0000000
--- a/actions/ulog_ingestion/src/tests/test_utils.py
+++ /dev/null
@@ -1,59 +0,0 @@
-import os.path
-import shutil
-
-import ulog_ingestion.utils as utils
-from pyulog.core import ULog
-
-
-def test_create_mcap_file_from_ulog(tmp_path):
-    ulog_file_path = "./tests/test.ulg"
-
-    test_topic_name = "vehicle_acceleration"
-
-    output_path_per_topic_mcap = tmp_path / f"{test_topic_name}.mcap"
-
-    ulog = ULog(ulog_file_path, [test_topic_name], True)
-
-    schema_registry_dict = {}
-
-    for key in ulog.message_formats:
-        json_schema_topic = utils.create_json_schema(ulog.message_formats[key].fields)
-        schema_registry_dict[key] = json_schema_topic
-
-    for data_object in sorted(ulog.data_list, key=lambda obj: obj.name):
-        print(data_object.name)
-        utils.create_per_topic_mcap_from_ulog(
-            output_path_per_topic_mcap,
-            data_object,
-            schema_registry_dict[data_object.name],
-        )
-
-    assert output_path_per_topic_mcap.exists()
-
-
-def test_setup_output_folder_structure():
-    ulog_file_path = "/workspace/abc/test.ulg"
-    input_dir = "/workspace/"
-
-    output_folder_path, temp_dir = utils.setup_output_folder_structure(ulog_file_path, input_dir)
-
-    assert output_folder_path == f"{temp_dir}/.VISUALIZATION_ASSETS/abc/test"
-    assert os.path.exists(output_folder_path)
-    shutil.rmtree(output_folder_path)
-
-
-def test_is_valid_ulog():
-    ulog_file_path = "./tests/test.ulg"
-    is_valid = utils.is_valid_ulog(ulog_file_path)
-    assert is_valid is True
-
-
-# def test_add_metadata():
-#     ulog_file_path = "./tests/test.ulg"
-#     # topics = ["vehicle_acceleration"]
-#
-#     utils.add_metadata(ulog_file_path, topics=None)
-
-    # assert os.path.exists(ulog_file_path)
-    # assert os.path.exists(f"{ulog_file_path}.metadata.json")
-    # os.remove(f"{ulog_file_path}.metadata.json")
-    # assert not os.path.exists(f"{ulog_file_path}.metadata.json")
diff --git a/actions/ulog_ingestion/src/ulog_ingestion/__init__.py b/actions/ulog_ingestion/src/ulog_ingestion/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/actions/ulog_ingestion/src/ulog_ingestion/__main__.py b/actions/ulog_ingestion/src/ulog_ingestion/__main__.py
deleted file mode 100644
index e01add2..0000000
--- a/actions/ulog_ingestion/src/ulog_ingestion/__main__.py
+++ /dev/null
@@ -1,308 +0,0 @@
-import argparse
-import logging
-import os
-import pathlib
-import sys
-import time
-from concurrent.futures import ProcessPoolExecutor, as_completed
-from typing import Optional
-
-from pyulog.core import ULog
-
-from roboto.association import (
-    Association,
-    AssociationType,
-)
-from roboto.domain import actions, datasets, files, topics
-from roboto.env import RobotoEnvKey
-from roboto.http import (
-    HttpClient,
-    SigV4AuthDecorator,
-)
-from roboto.transactions import TransactionManager
-
-import ulog_ingestion.utils as utils
-
-log = logging.getLogger("Ingesting ULog files to Roboto")
-
-
-def load_env_var(env_var: RobotoEnvKey) -> str:
-    """
-    Load an environment variable, and exit if it is not found.
-
-    Args:
-    - env_var: The environment variable to load.
-
-    Returns:
-    - The value of the environment variable.
-    """
-    value = os.getenv(env_var.value)
-    if not value:
-        log.error("Missing required ENV var: '%s'", env_var)
-        sys.exit(1)
-    return value
-
-
-def setup_env():
-    """
-    Set up the environment for the action.
-
-    Returns:
-    - A tuple containing the organization ID, input directory, output directory, topic delegate, and dataset.
-    """
-    roboto_service_url = load_env_var(RobotoEnvKey.RobotoServiceUrl)
-    org_id = load_env_var(RobotoEnvKey.OrgId)
-    invocation_id = load_env_var(RobotoEnvKey.InvocationId)
-    input_dir = load_env_var(RobotoEnvKey.InputDir)
-    output_dir = load_env_var(RobotoEnvKey.OutputDir)
-
-    http_client = HttpClient(default_auth=SigV4AuthDecorator("execute-api"))
-
-    topic_delegate = topics.TopicHttpDelegate(
-        roboto_service_base_url=roboto_service_url, http_client=http_client
-    )
-
-    invocation = actions.Invocation.from_id(
-        invocation_id,
-        invocation_delegate=actions.InvocationHttpDelegate(
-            roboto_service_base_url=roboto_service_url, http_client=http_client
-        ),
-    )
-    dataset = datasets.Dataset.from_id(
-        invocation.data_source.data_source_id,
-        datasets.DatasetHttpDelegate(
-            roboto_service_base_url=roboto_service_url, http_client=http_client
-        ),
-        files.FileClientDelegate(
-            roboto_service_base_url=roboto_service_url, http_client=http_client
-        ),
-        transaction_manager=TransactionManager(
-            roboto_service_base_url=roboto_service_url, http_client=http_client
-        ),
-    )
-
-    return org_id, input_dir, output_dir, topic_delegate, dataset
-
-
-def process_data(
-    ulog_file_path,
-    ulog_data_object,
-    ulog_message_format_object,
-    message_names_with_multi_id_list,
-    output_dir_path_mcap,
-    output_dir_temp,
-):
-    """
-    Process the data from a ULog file.
-
-    Args:
-    - ulog_file_path: Path to the .ulg file.
-    - ulog_data_object: The ULog data object to process.
-    - ulog_message_format_object: Mapping of message name to ULog message format.
-    - message_names_with_multi_id_list: A list of message names with multi IDs.
-    - output_dir_path_mcap: The path to the output directory for the MCAP files.
-    - output_dir_temp: The path to the temporary output directory.
-
-    Returns:
-    - None
-    """
-    org_id, input_dir, output_dir, topic_delegate, dataset = setup_env()
-
-    dataset_relative_path = pathlib.Path(ulog_file_path).relative_to(input_dir)
-    file_record = dataset.get_file_info(dataset_relative_path)
-
-    topic_association = Association(
-        association_id=file_record.file_id,
-        association_type=AssociationType.File,
-    )
-
-    topic_name = schema_name = topic_name_roboto = ulog_data_object.name
-
-    if topic_name in message_names_with_multi_id_list:
-        topic_name_roboto += "_" + str(ulog_data_object.multi_id).zfill(2)
-
-    # See link below for more context about how pyulog organizes data
-    # https://github.com/PX4/pyulog/blob/7819181f2c9cb0ecfbcd73d46571b0afa44f0d97/pyulog/core.py#L499-L503
-    message_count = len(ulog_data_object.data["timestamp"])
-
-    json_schema_topic = utils.create_json_schema(ulog_message_format_object[topic_name].fields)
-
-    schema_checksum = utils.compute_checksum(json_schema_topic)
-
-    # Create the topic record. ULog timestamps are microseconds; multiply by 1000 for nanoseconds.
-    start_time_ns = min(ulog_data_object.data["timestamp"]) * 1000
-    end_time_ns = max(ulog_data_object.data["timestamp"]) * 1000
-
-    topic = topics.Topic.create(
-        request=topics.CreateTopicRequest(
-            association=topic_association,
-            org_id=org_id,
-            schema_name=schema_name,
-            schema_checksum=schema_checksum,
-            topic_name=topic_name_roboto,
-            message_count=message_count,
-            start_time=start_time_ns,
-            end_time=end_time_ns,
-        ),
-        topic_delegate=topic_delegate,
-    )
-    print(f"Topic created: {topic_name_roboto}")
-
-    # Create message path records
-    utils.create_message_path_records(topic, ulog_data_object.field_data)
-
-    # Create the per-topic MCAP file
-    output_path_per_topic_mcap = os.path.join(
-        output_dir_path_mcap, f"{topic_name_roboto}.mcap"
-    )
-    print(f"MCAP file path: {output_path_per_topic_mcap}")
-    utils.create_per_topic_mcap_from_ulog(
-        output_path_per_topic_mcap, ulog_data_object, json_schema_topic
-    )
-
-    relative_file_name = output_path_per_topic_mcap.split(output_dir_temp)[1][1:]
-
-    # Upload the MCAP file
-    dataset.upload_file(
-        pathlib.Path(output_path_per_topic_mcap), relative_file_name
-    )
-
-    file_id = dataset.get_file_info(relative_file_name).file_id
-
-    print(
-        f"Setting default representation for topic: {topic_name_roboto}, file_id: {file_id}"
-    )
-
-    # Set the default topic representation
-    topic.set_default_representation(
-        request=topics.SetDefaultRepresentationRequest(
-            association=Association(
-                association_type=AssociationType.File,
-                association_id=file_id,
-            ),
-            org_id=org_id,
-            storage_format=topics.RepresentationStorageFormat.MCAP,
-            version=1,
-        )
-    )
-
-    relative_file_name = ulog_file_path.split(input_dir)[1][1:]
-    print(f"https://app-beta.roboto.ai/visualize/{utils.generate_config(file_record.file_id, relative_file_name)}")
-    return
-
-
-def ingest_ulog(ulog_file_path: str, topics: Optional[str] = None):
-    """
-    Create topic entries, message path records, and MCAP files from a ULog file.
-
-    Args:
-    - ulog_file_path: Path to the .ulg file.
-    - topics: Comma-separated list of topic names to process. Processes all topics if None.
-
-    Returns:
-    - None
-    """
-    msg_filter = topics.split(",") if topics else None
-    print(ulog_file_path)
-
-    ulog = ULog(ulog_file_path, msg_filter, True)
-
-    _, input_dir, _, _, _ = setup_env()
-
-    output_dir_path_mcap, output_dir_temp = utils.setup_output_folder_structure(
-        ulog_file_path, input_dir
-    )
-    start_time = time.time()
-
-    # This is a temporary fix for Multi Information messages with the same name
-    # https://docs.px4.io/main/en/dev_log/ulog_file_format.html#m-multi-information-message
-    # Similar to PlotJuggler's approach
-    # TODO: handle this in a better way
-    message_names_with_multi_id_list = list()
-    for d in sorted(ulog.data_list, key=lambda obj: obj.name):
-        print(f"topic name: {d.name}")
-        if d.multi_id > 0:
-            message_names_with_multi_id_list.append(d.name)
-
-    message_names_with_multi_id_list = list(set(message_names_with_multi_id_list))
-
-    print(f"message_names_with_multi_id_list: {message_names_with_multi_id_list}")
-
-    # Prepare arguments for parallel processing
-    args_list = [
-        (
-            ulog_file_path,
-            ulog_data_object,
-            ulog.message_formats,
-            message_names_with_multi_id_list,
-            output_dir_path_mcap,
-            output_dir_temp,
-        )
-        for ulog_data_object in sorted(ulog.data_list, key=lambda obj: obj.name)
-    ]
-
-    with ProcessPoolExecutor() as executor:
-        # Submit all tasks and collect Future objects
-        futures = [executor.submit(process_data, *args) for args in args_list]
-
-        for future in as_completed(futures):
-            try:
-                # Surface any exception raised in the worker process
-                future.result()
-                print("Task completed successfully")
-            except Exception as exc:
-                print(f"Task generated an exception: {exc}")
-
-    end_time = time.time()
-    elapsed_time = end_time - start_time
-    print(f"Topic creation took {elapsed_time} seconds for {ulog_file_path}.")
-
-
-parser = argparse.ArgumentParser()
-parser.add_argument(
-    "-i",
-    "--input-dir",
-    dest="input_dir",
-    type=pathlib.Path,
-    required=False,
-    help="Directory containing input files to process",
-    default=os.environ.get(RobotoEnvKey.InputDir.value),
-)
-
-parser.add_argument(
-    "-o",
-    "--output-dir",
-    dest="output_dir",
-    type=pathlib.Path,
-    required=False,
-    help="Directory to which to write any output files to be uploaded",
-    default=os.environ.get(RobotoEnvKey.OutputDir.value),
-)
-
-parser.add_argument(
-    "-t",
-    "--topic-names",
-    dest="topic_names",
-    type=str,
-    required=False,
-    help="List of topic names to process, separated by commas",
-    default=os.environ.get("ROBOTO_PARAM_TOPICS", None),
-)
-
-args = parser.parse_args()
-
-for root, _dirs, filenames in os.walk(args.input_dir):
-    for filename in filenames:
-        full_path = os.path.join(root, filename)
-        if utils.is_valid_ulog(full_path):
-            utils.add_metadata_to_file(full_path, topics=None)
-            ingest_ulog(
-                ulog_file_path=full_path,
-                topics=args.topic_names,
-            )
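A quick illustration of the multi-ID handling above: ULog "multi information" messages share a name and are distinguished by multi_id, so any message name that occurs with multi_id > 0 gets a zero-padded instance suffix. A minimal sketch of the naming rule (values illustrative):

    # Mirrors the suffixing in process_data; only names that appear with
    # multi_id > 0 anywhere in the log are suffixed.
    def roboto_topic_name(name: str, multi_id: int, has_multiple_instances: bool) -> str:
        return f"{name}_{str(multi_id).zfill(2)}" if has_multiple_instances else name

    print(roboto_topic_name("battery_status", 0, True))         # battery_status_00
    print(roboto_topic_name("battery_status", 1, True))         # battery_status_01
    print(roboto_topic_name("vehicle_acceleration", 0, False))  # vehicle_acceleration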
"integer", - "float": "number", - "double": "number", - "bool": "boolean", - "char": "string", -} - - -# Mapping from message definition types to Roboto canonical data types -TYPE_MAPPING_CANONICAL = { - "int8_t": topics.CanonicalDataType.Number, - "uint8_t": topics.CanonicalDataType.Number, - "int16_t": topics.CanonicalDataType.Number, - "uint16_t": topics.CanonicalDataType.Number, - "int32_t": topics.CanonicalDataType.Number, - "uint32_t": topics.CanonicalDataType.Number, - "int64_t": topics.CanonicalDataType.Number, - "uint64_t": topics.CanonicalDataType.Number, - "float": topics.CanonicalDataType.Number, - "double": topics.CanonicalDataType.Number, - "bool": topics.CanonicalDataType.Boolean, - "char": topics.CanonicalDataType.String, -} - - -def compute_checksum(json_schema: Dict[str, Any]) -> str: - """ - Computes the SHA-256 checksum of a given JSON schema. - - This function serializes the JSON schema, sorts its keys for consistency, - and then computes the SHA-256 checksum of the serialized data. - - Args: - - json_schema: A dictionary representing the JSON schema. - - Returns: - - The SHA-256 checksum of the serialized JSON schema as a hexadecimal string. - """ - serialized_schema = json.dumps(json_schema, sort_keys=True).encode("utf-8") - return hashlib.sha256(serialized_schema).hexdigest() - - -def create_json_schema( - message_definition: List[Tuple[str, int, str]] -) -> Dict[str, Any]: - """ - Creates a JSON schema based on a message definition. - - This function iterates over each field in the message definition and constructs - a JSON schema. Fields starting with '_padding' are ignored. The function supports - handling both array and non-array types. - - Args: - - message_definition: A list of tuples, each representing a field in the message. - Each tuple contains the field type, array size, and field name. - - Returns: - - A dictionary representing the constructed JSON schema. - """ - schema = {"type": "object", "properties": {}, "required": []} - - for field_type, array_size, field_name in message_definition: - if field_name.startswith("_padding"): - continue - - json_field_type = TYPE_MAPPING.get(field_type, "string") - - if array_size > 1: - schema_property = { - "type": "array", - "items": {"type": json_field_type}, - "minItems": array_size, - "maxItems": array_size, - } - else: - schema_property = {"type": json_field_type} - - schema["properties"][field_name] = schema_property - schema["required"].append(field_name) - - return schema - - -def parse_values_to_json(values: List[Tuple[str, str, Any]]) -> Dict[str, Any]: - """ - Parses a list of field values into a JSON-compatible format. - - This function iterates over each tuple in the provided list, handling both - array and non-array types. It constructs a JSON object based on field names, - types, and values. Array fields are accumulated before being added to the JSON object. - - Args: - - values: A list of tuples, each containing a field name, field type, and field value. - - Returns: - - A dictionary representing the JSON object constructed from the input values. 
- """ - json_instance = {} - array_values = {} - - for field_name, field_type, field_value in values: - - if "[" in field_name: - array_name = field_name.split("[")[0] - array_values.setdefault(array_name, []).append( - convert_value(field_type, field_value) - ) - else: - json_instance[field_name] = convert_value(field_type, field_value) - - json_instance.update(array_values) - return json_instance - - -def convert_value(field_type: str, value: Any) -> Any: - """ - Converts a field value to its corresponding JSON type. - - Args: - - field_type: The type of the field as a string. - - value: The value to be converted. - - Returns: - - The converted value in its appropriate JSON type. - """ - if math.isnan(value): - return None - - json_type = TYPE_MAPPING.get(field_type, "string") - if json_type == "integer": - return int(value) - elif json_type == "number": - return float(value) - elif json_type == "boolean": - return bool(int(value)) - elif json_type == "string": - return str(value) - else: - return value - - -def create_message_path_records(topic: Any, field_data: Any) -> None: - """ - Creates message path records for a given topic. - - Args: - - topic: The topic object to which the message paths will be added. - - field_data: A list of field data objects containing the message definition. - """ - array_list = list() - for field in field_data: - # For now only allow these types. TODO: Add support for nested types - - if field.type_str not in TYPE_MAPPING_CANONICAL.keys(): - canonical_data_type = topics.CanonicalDataType.Unknown - else: - canonical_data_type = TYPE_MAPPING_CANONICAL[field.type_str] - - if "[" in field.field_name: - array_name = field.field_name.split("[")[0] - array_field_type = f"{field.type_str}[]" - - if array_name not in array_list: - topic.add_message_path( - request=topics.AddMessagePathRequest( - message_path=array_name, - data_type=array_field_type, # TBD - canonical_data_type=topics.CanonicalDataType.Array, - ) - ) - - print( - f"Adding array: {array_name}, type: {array_field_type}, canonical: {topics.CanonicalDataType.Array}" - ) - - # Add another message path for the array elements - sub_array_name = f"{array_name}.[*]" - topic.add_message_path( - request=topics.AddMessagePathRequest( - message_path=sub_array_name, - data_type=field.type_str, # TBD - canonical_data_type=canonical_data_type, - ) - ) - - print( - f"Adding sub-field for array: {sub_array_name}, type: {field.type_str}, canonical: {canonical_data_type}" - ) - - array_list.append(array_name) - else: - - topic.add_message_path( - request=topics.AddMessagePathRequest( - message_path=field.field_name, - data_type=field.type_str, - canonical_data_type=canonical_data_type, - ) - ) - - print( - f"Adding field: {field.field_name}, type: {field.type_str}, canonical: {canonical_data_type}" - ) - return - - -def generate_config(file_id, relative_path): - viz_config = { - "version": "v1", - "files": [{"fileId": file_id, "relativePath": relative_path}], - } - return base64.urlsafe_b64encode(json.dumps(viz_config).encode("utf-8")).decode( - "utf-8" - ) - - -def create_per_topic_mcap_from_ulog( - output_path_per_topic_mcap: Any, d: str, json_schema_topic: Any -) -> None: - """ - Creates a per-topic MCAP file from a ULog object. - - Args: - - output_path_per_topic_mcap: The path to the output MCAP file. - - d: The ULog object containing the data to be converted. - - json_schema_topic: The json schema dict for the topic. 
-def create_per_topic_mcap_from_ulog(
-    output_path_per_topic_mcap: Any, d: Any, json_schema_topic: Dict[str, Any]
-) -> None:
-    """
-    Creates a per-topic MCAP file from a ULog data object.
-
-    Args:
-    - output_path_per_topic_mcap: The path to the output MCAP file.
-    - d: The ULog data object containing the data to be converted.
-    - json_schema_topic: The JSON schema dict for the topic.
-
-    Returns:
-    - None
-    """
-    with open(output_path_per_topic_mcap, "wb") as stream:
-        writer = Writer(stream)
-        writer.start()
-        schema_id = writer.register_schema(
-            name=d.name,
-            encoding="jsonschema",
-            data=json.dumps(json_schema_topic).encode(),
-        )
-
-        channel_id = writer.register_channel(
-            schema_id=schema_id,
-            topic=d.name,
-            message_encoding="json",
-        )
-
-        for i in range(len(d.data["timestamp"])):
-            values = list()
-            for f in d.field_data:
-                values.append((f.field_name, f.type_str, d.data[f.field_name][i]))
-
-            json_msg_instance = parse_values_to_json(values)
-
-            # ULog timestamps are microseconds; MCAP log/publish times are nanoseconds
-            timestamp_ns = int(d.data["timestamp"][i] * 1000)
-
-            writer.add_message(
-                channel_id=channel_id,
-                log_time=timestamp_ns,
-                sequence=i,
-                data=json.dumps(json_msg_instance).encode("utf-8"),
-                publish_time=timestamp_ns,
-            )
-
-        writer.finish()
-    return
-
-
-def setup_output_folder_structure(
-    ulog_file_path: str, input_dir: str
-) -> Tuple[str, str]:
-    """
-    Set up the output folder structure for the .mcap files.
-
-    Args:
-    - ulog_file_path: Path to the .ulg file.
-    - input_dir: Path to the input directory.
-
-    Returns:
-    - A tuple of (output folder path, temporary directory root).
-    """
-    relative_folder_path_of_file = os.path.split(ulog_file_path.split(input_dir)[1])[0]
-
-    ulog_file_name = os.path.split(ulog_file_path)[1]
-
-    output_folder_name_ulog = ulog_file_name.replace(".ulg", "")
-    relative_folder_path_of_file = relative_folder_path_of_file.lstrip("/")
-    # mkdtemp creates a directory that outlives this call; a TemporaryDirectory object
-    # would be cleaned up as soon as it is garbage collected.
-    temp_dir = tempfile.mkdtemp()
-
-    output_folder_path = os.path.join(
-        temp_dir,
-        ".VISUALIZATION_ASSETS",
-        relative_folder_path_of_file,
-        output_folder_name_ulog,
-    )
-
-    print(f"Output folder path: {output_folder_path}")
-    os.makedirs(output_folder_path, exist_ok=True)
-
-    return output_folder_path, temp_dir
-
-
-def is_valid_ulog(ulog_file_path: str) -> bool:
-    """
-    Check if the given file is a valid .ulg file.
-
-    Args:
-    - ulog_file_path: Path to the .ulg file.
-
-    Returns:
-    - True if the file is a valid .ulg file, False otherwise.
-    """
-    # ULog magic: "ULog" followed by 0x01 0x12 0x35
-    header_bytes = b"\x55\x4c\x6f\x67\x01\x12\x35"
-
-    with open(ulog_file_path, "rb") as file_handle:
-        header_data = file_handle.read(16)
-
-    if len(header_data) != 16:
-        return False
-    if header_data[:7] != header_bytes:
-        return False
-
-    return True
-
-
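To spot-check an MCAP produced by create_per_topic_mcap_from_ulog, a read-back along these lines works (a sketch using the mcap reader API; the file name is illustrative):

    from mcap.reader import make_reader

    with open("battery_status.mcap", "rb") as f:
        reader = make_reader(f)
        for schema, channel, message in reader.iter_messages():
            # log_time is in nanoseconds, matching the writer above
            print(channel.topic, message.log_time, message.data[:60])
            break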
-def add_metadata_to_file(ulog_file_path: str, topics: Optional[str] = None):
-    """
-    Append a metadata changeset record for the given ULog file to the
-    changeset file provided by the platform.
-    """
-    msg_filter = topics.split(",") if topics else None
-    ulog = ULog(ulog_file_path, msg_filter, True)
-
-    input_dir = os.environ[RobotoEnvKey.InputDir.value]
-    relative_file_name = ulog_file_path.split(input_dir)[1][1:]
-
-    file_metadata_changeset_file_path = os.environ[RobotoEnvKey.FileMetadataChangesetFile.value]
-    file_metadata_changeset_file = pathlib.Path(file_metadata_changeset_file_path)
-
-    json_line = json.dumps(
-        {
-            "relative_path": relative_file_name,
-            "update": {
-                "metadata_changeset": {
-                    "put_fields": ulog.msg_info_dict,
-                },
-                "description": "",
-            },
-        }
-    )
-
-    with file_metadata_changeset_file.open("a") as file:
-        if file.tell() > 0:
-            file.write("\n")
-        file.write(json_line)
diff --git a/actions/ulog_ingestion/test/input/test.ulg b/actions/ulog_ingestion/test/input/test.ulg
deleted file mode 100644
index 6196e98..0000000
Binary files a/actions/ulog_ingestion/test/input/test.ulg and /dev/null differ
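For reference, each record that add_metadata_to_file appends to the changeset file is a single JSON line shaped like the following (the keys under put_fields come from ulog.msg_info_dict; the values shown are illustrative):

    import json

    line = json.dumps({
        "relative_path": "abc/test.ulg",
        "update": {
            "metadata_changeset": {
                # put_fields carries ulog.msg_info_dict, e.g. PX4 build info
                "put_fields": {"ver_sw": "v1.14.0", "sys_name": "PX4"},
            },
            "description": "",
        },
    })
    print(line)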