Switch to file-based tagging
YvesSchoenberg committed Apr 23, 2024
1 parent e8d9fd9 commit 67a289a
Showing 2 changed files with 148 additions and 58 deletions.
4 changes: 2 additions & 2 deletions actions/tag_ulog/requirements.runtime.txt
@@ -1,3 +1,3 @@
# Python packages to install within the Docker image associated with this Action.
roboto
pyulog
roboto==0.2.14
pyulog==1.0.2
202 changes: 146 additions & 56 deletions actions/tag_ulog/src/tag_ulog/__main__.py
@@ -1,16 +1,47 @@
from pyulog.core import ULog
from roboto.domain import actions

import argparse
import logging
import os
import pathlib
import json
from typing import Union
import re
import sys

from roboto.domain import actions, datasets, files, http_delegates
from roboto.env import RobotoEnvKey
from roboto.http import (
HttpClient,
SigV4AuthDecorator,
)
from roboto import updates

from roboto.transactions.transaction_manager import TransactionManager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def load_env_var(env_var: RobotoEnvKey, strict: bool = True) -> Union[str, None]:
    """
    Load an environment variable.

    Args:
    - env_var: The environment variable to load.
    - strict: If True, log an error and exit when the variable is not set;
      if False, return None instead.

    Returns:
    - The value of the environment variable, or None if it is unset and strict is False.
    """
value = os.getenv(env_var.value)
if not value:
if strict:
logger.error("Missing required ENV var: '%s'", env_var)
sys.exit(1)
else:
return None
return value


def extract_text_between_brackets(text):
match = re.search(r'\[([^\]]+)\]', text)
if match:
@@ -19,60 +50,119 @@ def extract_text_between_brackets(text):
return None


parser = argparse.ArgumentParser()
parser.add_argument(
"-i",
"--input-dir",
dest="input_dir",
type=pathlib.Path,
required=False,
help="Directory containing input files to process",
default=os.environ.get(actions.InvocationEnvVar.InputDir.value),
)
def main(args):

parser.add_argument(
"-o",
"--output-dir",
dest="output_dir",
type=pathlib.Path,
required=False,
help="Directory to which to write any output files to be uploaded",
default=os.environ.get(actions.InvocationEnvVar.OutputDir.value),
)
input_dir = args.input_dir
output_dir = args.output_dir
dataset_metadata_path = args.dataset_metadata_path
files_metadata_path = args.files_metadata_path

if not input_dir or not output_dir or not dataset_metadata_path or not files_metadata_path:
        error_msg = "Set the ROBOTO_INPUT_DIR, ROBOTO_OUTPUT_DIR, ROBOTO_DATASET_METADATA_CHANGESET_FILE, and ROBOTO_FILE_METADATA_CHANGESET_FILE environment variables."
logger.error(error_msg)
raise RuntimeError(error_msg)

    # Get the invocation ID; it is only set when running inside an action invocation
invocation_id = load_env_var(RobotoEnvKey.InvocationId, strict=False)
logger.info(f"{invocation_id=}")

# If inside an invocation, get info for file-level tagging
if invocation_id:
# Setup and authorize HTTP client
client = HttpClient(default_auth=SigV4AuthDecorator("execute-api"))
service_url = load_env_var(RobotoEnvKey.RobotoServiceUrl)

delegate = http_delegates.HttpDelegates.from_client(http=client, endpoint=service_url)
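        # Look up the invocation to find the dataset it was launched against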
invocation = actions.invocation.Invocation.from_id(invocation_id, delegate.invocations)
dataset_id = invocation.data_source.data_source_id
        logger.info(f"{dataset_id=}")
transaction_manager = TransactionManager(service_url, client)

dataset = datasets.dataset.Dataset.from_id(dataset_id, dataset_delegate=delegate.datasets,
file_delegate=delegate.files,
transaction_manager=transaction_manager)
else:
dataset = None

for root, dirs, paths in os.walk(input_dir):
for file in paths:
# Check if the file ends with .ulg
if file.endswith(".ulg"):
_, ulg_file_name = os.path.split(file)
msg_filter = []
full_path = os.path.join(root, file)
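                # Parse the ULog file to access its logged (string) messages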
ulog = ULog(full_path, msg_filter, True)
file_put_tags = list()
for m in ulog.logged_messages:
print(m.log_level_str() + " " + m.message)
if m.log_level_str() == "ERROR":
module_name = extract_text_between_brackets(m.message)
if module_name is not None:
module_name = "ERROR_" + module_name
if module_name not in file_put_tags:
file_put_tags.append(module_name)
continue

if dataset and file_put_tags:
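                    # File-based tagging: write the collected tags onto the file record via the Roboto API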
for tag in file_put_tags:
# build a MetadataChangeset with put_tag direction
changeset = updates.MetadataChangeset().Builder().put_tag(tag).build()
ulg_file = dataset.get_file_info(file)
file_update_request = files.file_requests.UpdateFileRecordRequest(metadata_changeset=changeset)
# pass request to add metadata
ulg_file = ulg_file.update(file_update_request)
logger.info(f"Tagging {file} with {tag}")

# with open(dataset_metadata_path, "w") as json_file:
# metadata_dict = {
# "put_tags": list(set(put_tags)),
# "remove_tags": [],
# "put_fields": put_fields,
# "remove_fields": [],
# }
# logger.info(f"Writing {dataset_metadata_path}...")
# json.dump(metadata_dict, json_file, indent=4)


if __name__ == "__main__":
parser = argparse.ArgumentParser()

parser.add_argument(
"-i",
"--input-dir",
dest="input_dir",
type=pathlib.Path,
required=False,
help="Directory containing input files to process",
default=load_env_var(RobotoEnvKey.InputDir),
)
parser.add_argument(
"-o",
"--output-dir",
dest="output_dir",
type=pathlib.Path,
required=False,
help="Directory to which to write any output files to be uploaded",
default=load_env_var(RobotoEnvKey.OutputDir)
)
parser.add_argument(
"-d",
"--dataset-metadata-path",
dest="dataset_metadata_path",
type=pathlib.Path,
required=False,
help="Path at which to save dataset-level metadata",
default=load_env_var(RobotoEnvKey.DatasetMetadataChangesetFile)
)
parser.add_argument(
"-f",
"--files-metadata-path",
dest="files_metadata_path",
type=pathlib.Path,
required=False,
help="Path at which to save file-level metadata",
default=load_env_var(RobotoEnvKey.FileMetadataChangesetFile)
)

args = parser.parse_args()

output_path_metadata = os.getenv("ROBOTO_DATASET_METADATA_CHANGESET_FILE", "/output/changeset.json")

put_tags = list()

put_fields = dict()

for root, dirs, files in os.walk(args.input_dir):
for file in files:
# Check if the file ends with .ulg
if file.endswith(".ulg"):
_, ulg_file_name = os.path.split(file)
msg_filter = []
full_path = os.path.join(root, file)
ulog = ULog(full_path, msg_filter, True)
for m in ulog.logged_messages:
print(m.log_level_str() + " " + m.message)
if m.log_level_str() == "ERROR":
module_name = extract_text_between_brackets(m.message)
if module_name is not None:
module_name = "ERROR_" + module_name
if module_name not in put_tags:
put_tags.append(module_name)
continue

with open(output_path_metadata, "w") as json_file:
metadata_dict = {
"put_tags": list(set(put_tags)),
"remove_tags": [],
"put_fields": put_fields,
"remove_fields": [],
}
logger.info(f"Writing {output_path_metadata}...")
json.dump(metadata_dict, json_file, indent=4)
args = parser.parse_args()
main(args)
