diff --git a/actions/ulog_ingestion/.gitignore b/actions/ulog_ingestion/.gitignore
new file mode 100644
index 0000000..a81be65
--- /dev/null
+++ b/actions/ulog_ingestion/.gitignore
@@ -0,0 +1,11 @@
+.venv
+.mypy_cache
+**/*.egg-info
+**/.mypy_cache/
+**/.venv/
+**/__pycache__/
+**/.pytest_cache
+**/dist/
+*.swp
+*.pyc
+output
diff --git a/actions/ulog_ingestion/.python-version b/actions/ulog_ingestion/.python-version
new file mode 100644
index 0000000..05f8ee1
--- /dev/null
+++ b/actions/ulog_ingestion/.python-version
@@ -0,0 +1,4 @@
+# This file is a directive to pyenv (https://github.com/pyenv/pyenv) to set the matching version of Python in this directory.
+# If you don't use pyenv, you can safely delete this file.
+# The roboto CLI requires Python 3.9 or higher.
+3.10
\ No newline at end of file
diff --git a/actions/ulog_ingestion/Dockerfile b/actions/ulog_ingestion/Dockerfile
new file mode 100644
index 0000000..0588ec3
--- /dev/null
+++ b/actions/ulog_ingestion/Dockerfile
@@ -0,0 +1,11 @@
+ARG PYTHON_MAJOR=3
+ARG PYTHON_MINOR=10
+ARG OS_VARIANT=slim-bookworm
+FROM --platform=linux/amd64 public.ecr.aws/docker/library/python:${PYTHON_MAJOR}.${PYTHON_MINOR}-${OS_VARIANT}
+
+COPY requirements.runtime.txt ./
+RUN python -m pip install --upgrade pip setuptools && python -m pip install -r requirements.runtime.txt
+
+COPY src/ulog_ingestion/ ./ulog_ingestion
+
+ENTRYPOINT [ "python", "-m", "ulog_ingestion" ]
diff --git a/actions/ulog_ingestion/README.md b/actions/ulog_ingestion/README.md
new file mode 100644
index 0000000..09e4f87
--- /dev/null
+++ b/actions/ulog_ingestion/README.md
@@ -0,0 +1,14 @@
+# ulog_ingestion
+
+This Action processes ULog files for visualization within the Roboto platform.
+
+## Getting started
+
+1. Set up a virtual environment specific to this project and install development dependencies, including the `roboto` CLI: `./scripts/setup.sh`
+2. Build the Docker image: `./scripts/build.sh`
+3. Run the Action image locally: `./scripts/run.sh <input-dir>` (see the example below)
+4. Deploy to the Roboto Platform: `./scripts/deploy.sh`
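+
+For example, assuming your ULog files live in `~/ulog_data` (an illustrative path; any directory of `.ulg` files works), an end-to-end local run looks like:
+
+```bash
+./scripts/setup.sh
+./scripts/build.sh
+./scripts/run.sh ~/ulog_data
+```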
+
+## Action configuration file
+
+This Roboto Action is configured in `action.json`. Refer to Roboto's latest documentation for the expected structure.
diff --git a/actions/ulog_ingestion/action.json b/actions/ulog_ingestion/action.json
new file mode 100644
index 0000000..f712fe5
--- /dev/null
+++ b/actions/ulog_ingestion/action.json
@@ -0,0 +1,16 @@
+{
+    "name": "ulog_ingestion",
+    "description": "This Action processes ULog files for visualization within the Roboto platform.",
+    "parameters": [
+        {
+            "name": "TOPICS",
+            "required": false,
+            "description": "Comma-separated list of topics to extract. For example: battery_status,actuator_armed"
+        }
+    ],
+    "compute_requirements": {
+        "vCPU": 4096,
+        "memory": 8192,
+        "storage": 21
+    }
+}
diff --git a/actions/ulog_ingestion/requirements.dev.txt b/actions/ulog_ingestion/requirements.dev.txt
new file mode 100644
index 0000000..23a2e3f
--- /dev/null
+++ b/actions/ulog_ingestion/requirements.dev.txt
@@ -0,0 +1,7 @@
+# Python packages to install into this directory's virtual environment
+# for the purpose of development, testing, and deployment.

+# Install all required runtime dependencies in the local virtual environment.
+-r requirements.runtime.txt
+
+# Add additional Python packages to install here.
diff --git a/actions/ulog_ingestion/requirements.runtime.txt b/actions/ulog_ingestion/requirements.runtime.txt
new file mode 100644
index 0000000..c621319
--- /dev/null
+++ b/actions/ulog_ingestion/requirements.runtime.txt
@@ -0,0 +1,5 @@
+# Python packages to install within the Docker image associated with this Action.
+roboto==0.2.11
+pyulog==1.0.2
+mcap==1.1.1
+jsonschema>=4.21.1
diff --git a/actions/ulog_ingestion/scripts/build.sh b/actions/ulog_ingestion/scripts/build.sh
new file mode 100755
index 0000000..2d384cb
--- /dev/null
+++ b/actions/ulog_ingestion/scripts/build.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
+PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}")
+
+build_subcommand=(build)
+# If buildx is installed, use it
+if docker buildx version &> /dev/null; then
+    build_subcommand=(buildx build --platform linux/amd64 --output type=image)
+fi
+
+docker "${build_subcommand[@]}" -f "$PACKAGE_ROOT/Dockerfile" -t ulog_ingestion:latest "$PACKAGE_ROOT"
diff --git a/actions/ulog_ingestion/scripts/deploy.sh b/actions/ulog_ingestion/scripts/deploy.sh
new file mode 100755
index 0000000..036cbea
--- /dev/null
+++ b/actions/ulog_ingestion/scripts/deploy.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
+PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}")
+
+# Early exit if the virtual environment does not exist and/or roboto is not yet installed
+if [ ! -f "$PACKAGE_ROOT/.venv/bin/roboto" ]; then
+    echo "Virtual environment with roboto CLI does not exist. Please run ./scripts/setup.sh first."
+    exit 1
+fi
+
+# Set org_id to $ROBOTO_ORG_ID if defined, else the first argument passed to this script
+org_id=${ROBOTO_ORG_ID:-}
+if [ $# -gt 0 ]; then
+    org_id=$1
+fi
+
+roboto_exe="$PACKAGE_ROOT/.venv/bin/roboto"
+
+echo "Pushing ulog_ingestion:latest to Roboto's private registry"
+image_push_args=(
+    --suppress-upgrade-check
+    images push
+    --quiet
+)
+if [[ -n $org_id ]]; then
+    image_push_args+=(--org "$org_id")
+fi
+image_push_args+=(ulog_ingestion:latest)
+
+# Under `set -e`, a failed command substitution exits the script before a
+# follow-up `$?` check can ever run, so test the push directly.
+if ! image_uri=$($roboto_exe "${image_push_args[@]}"); then
+    echo "Failed to push ulog_ingestion:latest to Roboto's private registry"
+    exit 1
+fi
+
+echo "Creating ulog_ingestion action"
+create_args=(
+    --from-file "$PACKAGE_ROOT/action.json"
+    --image "$image_uri"
+    --yes
+)
+if [[ -n $org_id ]]; then
+    create_args+=(--org "$org_id")
+fi
+$roboto_exe actions create "${create_args[@]}"
diff --git a/actions/ulog_ingestion/scripts/run.sh b/actions/ulog_ingestion/scripts/run.sh
new file mode 100755
index 0000000..a50c34e
--- /dev/null
+++ b/actions/ulog_ingestion/scripts/run.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
+PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}")
+
+# Set input_dir to $ROBOTO_INPUT_DIR if defined, else the first argument passed to this script
+input_dir=${ROBOTO_INPUT_DIR:-}
+if [ $# -gt 0 ]; then
+    input_dir=$1
+fi
+
+# Fail if input_dir is not an existing directory
+if [ ! -d "$input_dir" ]; then
+    echo "Specify an existing input directory as the first argument to this script, or set the ROBOTO_INPUT_DIR environment variable"
+    exit 1
+fi
+
+# Set output_dir to $ROBOTO_OUTPUT_DIR if defined, else to "output/" in the package root (creating it if necessary)
+output_dir=${ROBOTO_OUTPUT_DIR:-$PACKAGE_ROOT/output}
+mkdir -p "$output_dir"
+
+# Assert both directories are absolute paths
+if [[ ! "$input_dir" = /* ]]; then
+    echo "Input directory '$input_dir' must be specified as an absolute path"
+    exit 1
+fi
+
+if [[ ! "$output_dir" = /* ]]; then
+    echo "Output directory '$output_dir' must be specified as an absolute path"
+    exit 1
+fi
+
+docker run --rm -it \
+    -v "$input_dir":/input \
+    -v "$output_dir":/output \
+    -e ROBOTO_INPUT_DIR=/input \
+    -e ROBOTO_OUTPUT_DIR=/output \
+    ulog_ingestion:latest
diff --git a/actions/ulog_ingestion/scripts/setup.sh b/actions/ulog_ingestion/scripts/setup.sh
new file mode 100755
index 0000000..20cb740
--- /dev/null
+++ b/actions/ulog_ingestion/scripts/setup.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
+PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}")
+
+venv_dir="$PACKAGE_ROOT/.venv"
+
+# Create a virtual environment
+python -m venv --upgrade-deps "$venv_dir"
+
+# Install development dependencies, including the roboto CLI
+pip_exe="$venv_dir/bin/pip"
+$pip_exe install --upgrade -r "$PACKAGE_ROOT/requirements.dev.txt"
diff --git a/actions/ulog_ingestion/src/tests/test.ulg b/actions/ulog_ingestion/src/tests/test.ulg
new file mode 100644
index 0000000..6196e98
Binary files /dev/null and b/actions/ulog_ingestion/src/tests/test.ulg differ
diff --git a/actions/ulog_ingestion/src/tests/test_utils.py b/actions/ulog_ingestion/src/tests/test_utils.py
new file mode 100644
index 0000000..76877f9
--- /dev/null
+++ b/actions/ulog_ingestion/src/tests/test_utils.py
@@ -0,0 +1,47 @@
+import os.path
+import shutil
+
+import ulog_ingestion.utils as utils
+from pyulog.core import ULog
+
+
+def test_create_mcap_file_from_ulog(tmp_path):
+    ulog_file_path = "./tests/test.ulg"
+    test_topic_name = "vehicle_acceleration"
+    output_path_per_topic_mcap = tmp_path / f"{test_topic_name}.mcap"
+
+    ulog = ULog(ulog_file_path, [test_topic_name], True)
+
+    schema_registry_dict = {}
+    for key in ulog.message_formats:
+        json_schema_topic = utils.create_json_schema(ulog.message_formats[key].fields)
+        schema_registry_dict[key] = json_schema_topic
+
+    for data_object in sorted(ulog.data_list, key=lambda obj: obj.name):
+        print(data_object.name)
+        # Pass the schema for this topic, not the whole registry dict.
+        utils.create_per_topic_mcap_from_ulog(
+            output_path_per_topic_mcap,
+            data_object,
+            schema_registry_dict[data_object.name],
+        )
+
+    assert output_path_per_topic_mcap.exists()
+
+
+def test_setup_output_folder_structure():
+    ulog_file_path = "/workspace/abc/test.ulg"
+    input_dir = "/workspace/"
+
+    output_folder_path, temp_dir = utils.setup_output_folder_structure(
+        ulog_file_path, input_dir
+    )
+
+    assert output_folder_path == f"{temp_dir}/.VISUALIZATION_ASSETS/abc/test"
+    assert os.path.exists(output_folder_path)
+    shutil.rmtree(output_folder_path)
+
+
+def test_is_valid_ulog():
+    ulog_file_path = "./tests/test.ulg"
+    assert utils.is_valid_ulog(ulog_file_path) is True
diff --git a/actions/ulog_ingestion/src/ulog_ingestion/__init__.py b/actions/ulog_ingestion/src/ulog_ingestion/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/actions/ulog_ingestion/src/ulog_ingestion/__main__.py b/actions/ulog_ingestion/src/ulog_ingestion/__main__.py
new file mode 100644
index 0000000..02aebc4
--- /dev/null
+++ b/actions/ulog_ingestion/src/ulog_ingestion/__main__.py
@@ -0,0 +1,296 @@
+import argparse
+import logging
+import os
+import pathlib
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from typing import Optional
+
+from pyulog.core import ULog
+
+from roboto.association import (
+    Association,
+    AssociationType,
+)
+from roboto.domain import actions, datasets, files, topics
+from roboto.http import (
+    HttpClient,
+    SigV4AuthDecorator,
+)
+from roboto.transactions import TransactionManager
+
+import ulog_ingestion.utils as utils
+
+log = logging.getLogger("ulog_ingestion")
+
+
+def load_env_var(env_var: actions.InvocationEnvVar) -> str:
+    """
+    Load an environment variable, and exit if it is not found.
+
+    Args:
+    - env_var: The environment variable to load.
+
+    Returns:
+    - The value of the environment variable.
+    """
+    value = os.getenv(env_var.value)
+    if not value:
+        log.error("Missing required ENV var: '%s'", env_var)
+        sys.exit(1)
+    return value
+
+
+def setup_env():
+    """
+    Set up the environment for the action.
+
+    Returns:
+    - A tuple containing the organization ID, input directory, output directory, topic delegate, and dataset.
+    """
+    roboto_service_url = load_env_var(actions.InvocationEnvVar.RobotoServiceUrl)
+    org_id = load_env_var(actions.InvocationEnvVar.OrgId)
+    invocation_id = load_env_var(actions.InvocationEnvVar.InvocationId)
+    input_dir = load_env_var(actions.InvocationEnvVar.InputDir)
+    output_dir = load_env_var(actions.InvocationEnvVar.OutputDir)
+
+    http_client = HttpClient(default_auth=SigV4AuthDecorator("execute-api"))
+
+    topic_delegate = topics.TopicHttpDelegate(
+        roboto_service_base_url=roboto_service_url, http_client=http_client
+    )
+
+    invocation = actions.Invocation.from_id(
+        invocation_id,
+        invocation_delegate=actions.InvocationHttpDelegate(
+            roboto_service_base_url=roboto_service_url, http_client=http_client
+        ),
+        org_id=org_id,
+    )
+    dataset = datasets.Dataset.from_id(
+        invocation.data_source.data_source_id,
+        datasets.DatasetHttpDelegate(
+            roboto_service_base_url=roboto_service_url, http_client=http_client
+        ),
+        files.FileClientDelegate(
+            roboto_service_base_url=roboto_service_url, http_client=http_client
+        ),
+        transaction_manager=TransactionManager(
+            roboto_service_base_url=roboto_service_url, http_client=http_client
+        ),
+    )
+
+    return org_id, input_dir, output_dir, topic_delegate, dataset
+
+
+def process_data(
+    ulog_file_path,
+    ulog_data_object,
+    ulog_message_format_object,
+    message_names_with_multi_id_list,
+    output_dir_path_mcap,
+    output_dir_temp,
+):
+    """
+    Process the data from a ULog file.
+
+    Args:
+    - ulog_file_path: Path to the .ulg file.
+    - ulog_data_object: The ULog data object to process.
+    - ulog_message_format_object: Mapping of message names to ULog message format objects.
+    - message_names_with_multi_id_list: A list of message names with multi IDs.
+    - output_dir_path_mcap: The path to the output directory for the MCAP files.
+    - output_dir_temp: The path to the temporary output directory.
+
+    Returns:
+    - None
+    """
+    org_id, input_dir, output_dir, topic_delegate, dataset = setup_env()
+
+    dataset_relative_path = pathlib.Path(ulog_file_path).relative_to(input_dir)
+    file_record = dataset.get_file_info(dataset_relative_path)
+
+    topic_association = Association(
+        association_id=file_record.file_id,
+        association_type=AssociationType.File,
+    )
+
+    topic_name = schema_name = topic_name_roboto = ulog_data_object.name
+
+    if topic_name in message_names_with_multi_id_list:
+        topic_name_roboto += "_" + str(ulog_data_object.multi_id).zfill(2)
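+        # For example, the second instance of a multi-instance `battery_status`
+        # message (multi_id == 1) is ingested as the topic `battery_status_01`.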
-d "$input_dir" ]; then + echo "Specify an existing input directory as the first argument to this script, or set the ROBOTO_INPUT_DIR environment variable" + exit 1 +fi + +# Set output_dir variable to $ROBOTO_OUTPUT_DIR if defined, else set it to "output/" in the package root (creating if necessary) +output_dir=${ROBOTO_OUTPUT_DIR:-$PACKAGE_ROOT/output} +mkdir -p $output_dir + +# Assert both directories are absolute paths +if [[ ! "$input_dir" = /* ]]; then + echo "Input directory '$input_dir' must be specified as an absolute path" + exit 1 +fi + +if [[ ! "$output_dir" = /* ]]; then + echo "Output directory '$output_dir' must be specified as an absolute path" + exit 1 +fi + +docker run --rm -it \ + -v $input_dir:/input \ + -v $output_dir:/output \ + -e ROBOTO_INPUT_DIR=/input \ + -e ROBOTO_OUTPUT_DIR=/output \ + ulog_ingestion:latest diff --git a/actions/ulog_ingestion/scripts/setup.sh b/actions/ulog_ingestion/scripts/setup.sh new file mode 100755 index 0000000..20cb740 --- /dev/null +++ b/actions/ulog_ingestion/scripts/setup.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +set -euo pipefail + +SCRIPTS_ROOT=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) +PACKAGE_ROOT=$(dirname "${SCRIPTS_ROOT}") + +venv_dir="$PACKAGE_ROOT/.venv" + +# Create a virtual environment +python -m venv --upgrade-deps $venv_dir + +# Install roboto +pip_exe="$venv_dir/bin/pip" +$pip_exe install --upgrade -r $PACKAGE_ROOT/requirements.dev.txt diff --git a/actions/ulog_ingestion/src/tests/test.ulg b/actions/ulog_ingestion/src/tests/test.ulg new file mode 100644 index 0000000..6196e98 Binary files /dev/null and b/actions/ulog_ingestion/src/tests/test.ulg differ diff --git a/actions/ulog_ingestion/src/tests/test_utils.py b/actions/ulog_ingestion/src/tests/test_utils.py new file mode 100644 index 0000000..76877f9 --- /dev/null +++ b/actions/ulog_ingestion/src/tests/test_utils.py @@ -0,0 +1,47 @@ +import os.path +import shutil +import ulog_ingestion.utils as utils +from pyulog.core import ULog + + +def test_create_mcap_file_from_ulog(tmp_path): + ulog_file_path = "./tests/test.ulg" + + test_topic_name = "vehicle_acceleration" + + output_path_per_topic_mcap = tmp_path / f"{test_topic_name}.mcap" + + ulog = ULog(ulog_file_path, [test_topic_name], True) + + schema_registry_dict = {} + + for key in ulog.message_formats: + json_schema_topic = utils.create_json_schema(ulog.message_formats[key].fields) + schema_registry_dict[key] = json_schema_topic + + for data_object in sorted(ulog.data_list, key=lambda obj: obj.name): + print(data_object.name) + utils.create_per_topic_mcap_from_ulog(output_path_per_topic_mcap, + data_object, + schema_registry_dict) + + assert output_path_per_topic_mcap.exists() + + +def test_setup_output_folder_structure(): + + ulog_file_path = "/workspace/abc/test.ulg" + input_dir = "/workspace/" + + output_folder_path, temp_dir = utils.setup_output_folder_structure(ulog_file_path, input_dir) + + assert output_folder_path == f"{temp_dir}/.VISUALIZATION_ASSETS/abc/test" + assert os.path.exists(output_folder_path) + shutil.rmtree(output_folder_path) + + +def test_is_valid_ulog(): + + ulog_file_path = "./tests/test.ulg" + is_valid = utils.is_valid_ulog(ulog_file_path) + assert is_valid is True diff --git a/actions/ulog_ingestion/src/ulog_ingestion/__init__.py b/actions/ulog_ingestion/src/ulog_ingestion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/actions/ulog_ingestion/src/ulog_ingestion/__main__.py b/actions/ulog_ingestion/src/ulog_ingestion/__main__.py new file mode 
+    start_time_ns = min(ulog_data_object.data["timestamp"]) * 1000
+    end_time_ns = max(ulog_data_object.data["timestamp"]) * 1000
+
+    topic = topics.Topic.create(
+        request=topics.CreateTopicRequest(
+            association=topic_association,
+            org_id=org_id,
+            schema_name=schema_name,
+            schema_checksum=schema_checksum,
+            topic_name=topic_name_roboto,
+            message_count=message_count,
+            start_time=start_time_ns,
+            end_time=end_time_ns,
+        ),
+        topic_delegate=topic_delegate,
+    )
+    print(f"Topic created: {topic_name_roboto}")
+
+    # Create Message Path Records
+    utils.create_message_path_records(topic, ulog_data_object.field_data)
+
+    # Create MCAP File
+    output_path_per_topic_mcap = os.path.join(
+        output_dir_path_mcap, f"{topic_name_roboto}.mcap"
+    )
+    print(f"MCAP file path: {output_path_per_topic_mcap}")
+    utils.create_per_topic_mcap_from_ulog(
+        output_path_per_topic_mcap, ulog_data_object, json_schema_topic
+    )
+
+    relative_file_name = output_path_per_topic_mcap.split(output_dir_temp)[1][1:]
+
+    # Upload MCAP File
+    dataset.upload_file(
+        pathlib.Path(output_path_per_topic_mcap), relative_file_name
+    )
+
+    file_id = dataset.get_file_info(relative_file_name).file_id
+
+    print(
+        f"Setting default representation for topic: {topic_name_roboto}, file_id: {file_id}"
+    )
+
+    # Set Default Topic Representation
+    topic.set_default_representation(
+        request=topics.SetDefaultRepresentationRequest(
+            association=Association(
+                association_type=AssociationType.File,
+                association_id=file_id,
+            ),
+            org_id=org_id,
+            storage_format=topics.RepresentationStorageFormat.MCAP,
+            version=1,
+        )
+    )
+    return
+
+
+def ingest_ulog(ulog_file_path: str, topic_names: Optional[str] = None):
+    """
+    Create topic entries, message path records, and MCAP files from a ULog file.
+
+    Args:
+    - ulog_file_path: Path to the .ulg file.
+    - topic_names: Comma-separated list of topic names to process.
+
+    Returns:
+    - None
+    """
+    msg_filter = topic_names.split(",") if topic_names else None
+    print(f"Processing: {ulog_file_path}")
+
+    ulog = ULog(ulog_file_path, msg_filter, True)
+
+    _, input_dir, _, _, _ = setup_env()
+
+    output_dir_path_mcap, output_dir_temp = utils.setup_output_folder_structure(
+        ulog_file_path, input_dir
+    )
+    start_time = time.time()
+
+    # This is a temporary fix for Multi Information messages with the same name
+    # https://docs.px4.io/main/en/dev_log/ulog_file_format.html#m-multi-information-message
+    # Similar to the PlotJuggler approach
+    # TODO: handle this in a better way
+    message_names_with_multi_id_list = list()
+    for d in sorted(ulog.data_list, key=lambda obj: obj.name):
+        print(f"topic name: {d.name}")
+        if d.multi_id > 0:
+            message_names_with_multi_id_list.append(d.name)
+
+    message_names_with_multi_id_list = list(set(message_names_with_multi_id_list))
+
+    print(f"message_names_with_multi_id_list: {message_names_with_multi_id_list}")
+
+    # Prepare Arguments for Parallel Processing
+    args_list = [
+        (
+            ulog_file_path,
+            ulog_data_object,
+            ulog.message_formats,
+            message_names_with_multi_id_list,
+            output_dir_path_mcap,
+            output_dir_temp,
+        )
+        for ulog_data_object in sorted(ulog.data_list, key=lambda obj: obj.name)
+    ]
+
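+    # Topics are serialized to MCAP in separate worker processes: the work is
+    # CPU-bound (per-message JSON encoding), so a process pool can spread it
+    # across cores where a thread pool could not.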
+    with ProcessPoolExecutor() as executor:
+        # Submit all tasks and collect Future objects
+        futures = [executor.submit(process_data, *args) for args in args_list]
+
+        for future in as_completed(futures):
+            try:
+                # result() re-raises any exception from the worker process;
+                # without it, failures would be reported as successes.
+                future.result()
+                print("Task completed successfully")
+            except Exception as exc:
+                print(f"Task generated an exception: {exc}")
+
+    end_time = time.time()
+    elapsed_time = end_time - start_time
+    print(f"Ingestion took {elapsed_time} seconds to process {ulog_file_path}.")
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+    "-i",
+    "--input-dir",
+    dest="input_dir",
+    type=pathlib.Path,
+    required=False,
+    help="Directory containing input files to process",
+    default=os.environ.get(actions.InvocationEnvVar.InputDir.value),
+)
+
+parser.add_argument(
+    "-o",
+    "--output-dir",
+    dest="output_dir",
+    type=pathlib.Path,
+    required=False,
+    help="Directory to which to write any output files to be uploaded",
+    default=os.environ.get(actions.InvocationEnvVar.OutputDir.value),
+)
+
+parser.add_argument(
+    "-t",
+    "--topic-names",
+    dest="topic_names",
+    type=str,
+    required=False,
+    help="List of topic names to process, separated by commas",
+    default=os.environ.get("ROBOTO_PARAM_TOPICS", None),
+)
+
+args = parser.parse_args()
+
+for root, _dirs, file_names in os.walk(args.input_dir):
+    for file_name in file_names:
+        full_path = os.path.join(root, file_name)
+        if utils.is_valid_ulog(full_path):
+            ingest_ulog(
+                ulog_file_path=full_path,
+                topic_names=args.topic_names,
+            )
diff --git a/actions/ulog_ingestion/src/ulog_ingestion/utils.py b/actions/ulog_ingestion/src/ulog_ingestion/utils.py
new file mode 100644
index 0000000..e768411
--- /dev/null
+++ b/actions/ulog_ingestion/src/ulog_ingestion/utils.py
@@ -0,0 +1,323 @@
+import hashlib
+import json
+import os
+import tempfile
+from typing import Any, Dict, List, Tuple
+
+from mcap.writer import Writer
+
+from roboto.domain import topics
+
+# Mapping from message definition types to JSON schema types
+TYPE_MAPPING = {
+    "int8_t": "integer",
+    "uint8_t": "integer",
+    "int16_t": "integer",
+    "uint16_t": "integer",
+    "int32_t": "integer",
+    "uint32_t": "integer",
+    "int64_t": "integer",
+    "uint64_t": "integer",
+    "float": "number",
+    "double": "number",
+    "bool": "boolean",
+    "char": "string",
+}
+
+
+# Mapping from message definition types to Roboto canonical data types
+TYPE_MAPPING_CANONICAL = {
+    "int8_t": topics.CanonicalDataType.Number,
+    "uint8_t": topics.CanonicalDataType.Number,
+    "int16_t": topics.CanonicalDataType.Number,
+    "uint16_t": topics.CanonicalDataType.Number,
+    "int32_t": topics.CanonicalDataType.Number,
+    "uint32_t": topics.CanonicalDataType.Number,
+    "int64_t": topics.CanonicalDataType.Number,
+    "uint64_t": topics.CanonicalDataType.Number,
+    "float": topics.CanonicalDataType.Number,
+    "double": topics.CanonicalDataType.Number,
+    "bool": topics.CanonicalDataType.Boolean,
+    "char": topics.CanonicalDataType.String,
+}
+
+
+def compute_checksum(json_schema: Dict[str, Any]) -> str:
+    """
+    Computes the SHA-256 checksum of a given JSON schema.
+
+    This function serializes the JSON schema, sorts its keys for consistency,
+    and then computes the SHA-256 checksum of the serialized data.
+
+    Args:
+    - json_schema: A dictionary representing the JSON schema.
+
+    Returns:
+    - The SHA-256 checksum of the serialized JSON schema as a hexadecimal string.
+    """
+    serialized_schema = json.dumps(json_schema, sort_keys=True).encode("utf-8")
+    return hashlib.sha256(serialized_schema).hexdigest()
+
+
+def create_json_schema(
+    message_definition: List[Tuple[str, int, str]]
+) -> Dict[str, Any]:
+    """
+    Creates a JSON schema based on a message definition.
+
+    This function iterates over each field in the message definition and constructs
+    a JSON schema. Fields starting with '_padding' are ignored. The function supports
+    handling both array and non-array types.
+
+    Args:
+    - message_definition: A list of tuples, each representing a field in the message.
+      Each tuple contains the field type, array size, and field name.
+
+    Returns:
+    - A dictionary representing the constructed JSON schema.
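+
+    Example (illustrative field tuples):
+        create_json_schema([("float", 1, "x"), ("float", 3, "xyz")]) returns
+        {
+            "type": "object",
+            "properties": {
+                "x": {"type": "number"},
+                "xyz": {
+                    "type": "array",
+                    "items": {"type": "number"},
+                    "minItems": 3,
+                    "maxItems": 3,
+                },
+            },
+            "required": ["x", "xyz"],
+        }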
+    """
+    schema = {"type": "object", "properties": {}, "required": []}
+
+    for field_type, array_size, field_name in message_definition:
+        if field_name.startswith("_padding"):
+            continue
+
+        json_field_type = TYPE_MAPPING.get(field_type, "string")
+
+        if array_size > 1:
+            schema_property = {
+                "type": "array",
+                "items": {"type": json_field_type},
+                "minItems": array_size,
+                "maxItems": array_size,
+            }
+        else:
+            schema_property = {"type": json_field_type}
+
+        schema["properties"][field_name] = schema_property
+        schema["required"].append(field_name)
+
+    return schema
+
+
+def parse_values_to_json(values: List[Tuple[str, str, Any]]) -> Dict[str, Any]:
+    """
+    Parses a list of field values into a JSON-compatible format.
+
+    This function iterates over each tuple in the provided list, handling both
+    array and non-array types. It constructs a JSON object based on field names,
+    types, and values. Array fields are accumulated before being added to the JSON object.
+
+    Args:
+    - values: A list of tuples, each containing a field name, field type, and field value.
+
+    Returns:
+    - A dictionary representing the JSON object constructed from the input values.
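+
+    Example (illustrative):
+        parse_values_to_json([
+            ("timestamp", "uint64_t", 1),
+            ("xyz[0]", "float", 0.1),
+            ("xyz[1]", "float", 0.2),
+        ]) returns {"timestamp": 1, "xyz": [0.1, 0.2]}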
+    """
+    json_instance = {}
+    array_values = {}
+
+    for field_name, field_type, field_value in values:
+        if "[" in field_name:
+            array_name = field_name.split("[")[0]
+            array_values.setdefault(array_name, []).append(
+                convert_value(field_type, field_value)
+            )
+        else:
+            json_instance[field_name] = convert_value(field_type, field_value)
+
+    json_instance.update(array_values)
+    return json_instance
+
+
+def convert_value(field_type: str, value: Any) -> Any:
+    """
+    Converts a field value to its corresponding JSON type.
+
+    Args:
+    - field_type: The type of the field as a string.
+    - value: The value to be converted.
+
+    Returns:
+    - The converted value in its appropriate JSON type.
+    """
+    json_type = TYPE_MAPPING.get(field_type, "string")
+    if json_type == "integer":
+        return int(value)
+    elif json_type == "number":
+        return float(value)
+    elif json_type == "boolean":
+        return bool(int(value))
+    elif json_type == "string":
+        return str(value)
+    else:
+        return value
+
+
+def create_message_path_records(topic: Any, field_data: Any) -> None:
+    """
+    Creates message path records for a given topic.
+
+    Args:
+    - topic: The topic object to which the message paths will be added.
+    - field_data: A list of field data objects containing the message definition.
+    """
+    array_list = list()
+    for field in field_data:
+        # For now, only allow the types in TYPE_MAPPING_CANONICAL.
+        # TODO: Add support for nested types
+        if field.type_str not in TYPE_MAPPING_CANONICAL:
+            canonical_data_type = topics.CanonicalDataType.Unknown
+        else:
+            canonical_data_type = TYPE_MAPPING_CANONICAL[field.type_str]
+
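+        # Array fields are registered twice: once as the array itself (e.g.
+        # `xyz`) and once as a wildcard path for its elements (`xyz.[*]`), so
+        # that both the container and its members are addressable.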
+        if "[" in field.field_name:
+            array_name = field.field_name.split("[")[0]
+            array_field_type = f"{field.type_str}[]"
+
+            if array_name not in array_list:
+                topic.add_message_path(
+                    request=topics.AddMessagePathRequest(
+                        message_path=array_name,
+                        data_type=array_field_type,  # TBD
+                        canonical_data_type=topics.CanonicalDataType.Array,
+                    )
+                )
+                print(
+                    f"Adding array: {array_name}, type: {array_field_type}, canonical: {topics.CanonicalDataType.Array}"
+                )
+
+                # Add another message path for the array elements
+                sub_array_name = f"{array_name}.[*]"
+                topic.add_message_path(
+                    request=topics.AddMessagePathRequest(
+                        message_path=sub_array_name,
+                        data_type=field.type_str,  # TBD
+                        canonical_data_type=canonical_data_type,
+                    )
+                )
+                print(
+                    f"Adding sub-field for array: {sub_array_name}, type: {field.type_str}, canonical: {canonical_data_type}"
+                )
+
+                array_list.append(array_name)
+        else:
+            topic.add_message_path(
+                request=topics.AddMessagePathRequest(
+                    message_path=field.field_name,
+                    data_type=field.type_str,
+                    canonical_data_type=canonical_data_type,
+                )
+            )
+            print(
+                f"Adding field: {field.field_name}, type: {field.type_str}, canonical: {canonical_data_type}"
+            )
+
+    return
+
+
+def create_per_topic_mcap_from_ulog(
+    output_path_per_topic_mcap: Any, d: Any, json_schema_topic: Dict[str, Any]
+) -> None:
+    """
+    Creates a per-topic MCAP file from a ULog data object.
+
+    Args:
+    - output_path_per_topic_mcap: The path to the output MCAP file.
+    - d: The ULog data object containing the data to be converted.
+    - json_schema_topic: The JSON schema dict for the topic.
+
+    Returns:
+    - None
+    """
+    with open(output_path_per_topic_mcap, "wb") as stream:
+        writer = Writer(stream)
+        writer.start()
+
+        schema_id = writer.register_schema(
+            name=d.name,
+            encoding="jsonschema",
+            data=json.dumps(json_schema_topic).encode(),
+        )
+
+        channel_id = writer.register_channel(
+            schema_id=schema_id,
+            topic=d.name,
+            message_encoding="json",
+        )
+
+        for i in range(len(d.data["timestamp"])):
+            values = list()
+            for f in d.field_data:
+                values.append((f.field_name, f.type_str, d.data[f.field_name][i]))
+            json_msg_instance = parse_values_to_json(values)
+
+            # ULog timestamps are microseconds; MCAP log/publish times are nanoseconds
+            timestamp_ns = int(d.data["timestamp"][i] * 1000)
+
+            writer.add_message(
+                channel_id=channel_id,
+                log_time=timestamp_ns,
+                data=json.dumps(json_msg_instance).encode("utf-8"),
+                publish_time=timestamp_ns,
+            )
+
+        writer.finish()
+    return
+
+
+def setup_output_folder_structure(
+    ulog_file_path: str, input_dir: str
+) -> Tuple[str, str]:
+    """
+    Set up the output folder structure for the .mcap files.
+
+    Args:
+    - ulog_file_path: Path to the .ulg file.
+    - input_dir: Path to the input directory.
+    """
+    relative_folder_path_of_file = os.path.split(ulog_file_path.split(input_dir)[1])[0]
+
+    ulog_file_name = os.path.split(ulog_file_path)[1]
+
+    output_folder_name_ulog = ulog_file_name.replace(".ulg", "")
+    relative_folder_path_of_file = relative_folder_path_of_file.lstrip("/")
+
+    # mkdtemp creates a directory that persists until explicitly removed.
+    # tempfile.TemporaryDirectory().name is not safe here: the directory is
+    # deleted as soon as the TemporaryDirectory object is garbage collected.
+    temp_dir = tempfile.mkdtemp()
+
+    output_folder_path = os.path.join(
+        temp_dir,
+        ".VISUALIZATION_ASSETS",
+        relative_folder_path_of_file,
+        output_folder_name_ulog,
+    )
+
+    print(f"Output folder path: {output_folder_path}")
+    os.makedirs(output_folder_path, exist_ok=True)
+
+    return output_folder_path, temp_dir
+
+
+def is_valid_ulog(ulog_file_path: str) -> bool:
+    """
+    Check if the given file is a valid .ulg file.
+
+    Args:
+    - ulog_file_path: Path to the .ulg file.
+
+    Returns:
+    - True if the file is a valid .ulg file, False otherwise.
+    """
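+    # 7-byte ULog file magic: the ASCII characters "ULog" followed by
+    # 0x01, 0x12, 0x35. A full ULog header is 16 bytes: the magic, a 1-byte
+    # version, and an 8-byte timestamp.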
+    header_bytes = b"\x55\x4c\x6f\x67\x01\x12\x35"
+
+    with open(ulog_file_path, "rb") as file_handle:
+        header_data = file_handle.read(16)
+        # Return False rather than raising, so callers can use this function as
+        # a predicate over arbitrary files in the input directory.
+        if len(header_data) != 16:
+            return False
+        if header_data[:7] != header_bytes:
+            return False
+
+    return True