Skip to content

Commit

Permalink
Several updates to transform for Transform Builder (#332)
Browse files Browse the repository at this point in the history
-  APP-4601 - [pleb] add jmespath custom functions to pleb to centralize that functionality to be used across apps.
-  APP-4604 - [transform] Add Processing Functions class to include pre-defined functions that can be used in transform builder and across TIE apps.
-  APP-4605 - [transform] normalize the way null/empty values are handled in transforms, and include empty string ''.
-  APP-4620 - [transform] Add structured/contextualized exceptions to transform to be able to deliver detailed error messages to users.
  • Loading branch information
cblades-tc authored Oct 7, 2024
1 parent cfbe18e commit 58d1618
Show file tree
Hide file tree
Showing 11 changed files with 444 additions and 85 deletions.
1 change: 1 addition & 0 deletions .cspell/custom-dictionary-workspace.txt
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ threatassess
threatconnect
timedelta
tinydb
titlecase
toplevel
traceback
triggerservice
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ disable = [
"too-many-lines",
"too-many-locals",
"too-many-nested-blocks",
"too-many-positional-arguments",
"too-many-public-methods",
"too-many-statements",
"unspecified-encoding", # TODO: fix issues and remove this disable
Expand Down
7 changes: 7 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release Notes

## 4.0.7

- APP-4601 - [pleb] Added jmespath custom functions to pleb to centralize that functionality for use across apps.
- APP-4604 - [transform] Added Processing Functions class with pre-defined functions that can be used in Transform Builder and across TIE apps.
- APP-4605 - [transform] Normalized the way null/empty values are handled in transforms, including the empty string ''.
- APP-4620 - [transform] Added structured/contextualized exceptions to transform so that detailed error messages can be delivered to users.


## 4.0.6

- APP-4472 - [API] Added NAICS industry classification module
Expand Down
2 changes: 1 addition & 1 deletion tcex/__metadata__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""TcEx Framework Module"""

__license__ = 'Apache-2.0'
__version__ = '4.0.6'
__version__ = '4.0.7'
13 changes: 12 additions & 1 deletion tcex/api/tc/ti_transform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
"""TcEx Framework Module"""

# first-party
from tcex.api.tc.ti_transform.ti_predefined_functions import (
ProcessingFunctions,
transform_builder_to_model,
)
from tcex.api.tc.ti_transform.ti_transform import TiTransform, TiTransforms
from tcex.api.tc.ti_transform.transform_abc import TransformException

__all__ = ['TiTransform', 'TiTransforms']
__all__ = [
'ProcessingFunctions',
'TiTransform',
'TiTransforms',
'TransformException',
'transform_builder_to_model',
]
314 changes: 302 additions & 12 deletions tcex/api/tc/ti_transform/ti_predefined_functions.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,311 @@
"""TcEx Framework Module"""

# standard library
import hashlib
import json
import uuid
from collections.abc import Iterable
from inspect import _empty, signature
from typing import TypedDict

# first-party
from tcex.util import Util
# first-party
from tcex.api.tc.ti_transform.model.transform_model import (
GroupTransformModel,
IndicatorTransformModel,
)


class TransformBuilderExport(TypedDict):
    """Shape of a transform document as exported by Transform Builder."""

    # Kind of transform; matched case-insensitively against 'indicator' / 'group'.
    type: str
    # Raw transform definition that is expanded into the model's keyword arguments.
    transform: dict


def transform_builder_to_model(
    transform: TransformBuilderExport,
    processing_functions: 'ProcessingFunctions',
) -> IndicatorTransformModel | GroupTransformModel:
    """Build a tcex transform model from a Transform Builder export.

    Every processing step found under a nested ``transform`` key is updated in place with
    the result of ``processing_functions.translate_def_to_fn`` before the model is built,
    so the ``transform`` argument is modified by this call.

    Args:
        transform: The exported document, holding the ``type`` and the ``transform`` body.
        processing_functions: Resolver used to translate each processing step.

    Returns:
        An IndicatorTransformModel or GroupTransformModel, depending on ``transform['type']``.

    Raises:
        TypeError: If ``transform['type']`` is neither 'indicator' nor 'group'
            (case-insensitive).
    """

    def _walk(node, wanted) -> Iterable[dict]:
        """Yield the value of every ``wanted`` key, at any depth, in document order."""
        if isinstance(node, list):
            for element in node:
                yield from _walk(element, wanted)
            return

        if not isinstance(node, dict):
            return

        for name, child in node.items():
            if name == wanted:
                # a match is yielded as-is; it is not searched for further matches
                yield child
                continue
            if isinstance(child, (dict, list)):
                yield from _walk(child, wanted)

    for entry in _walk(transform['transform'], 'transform'):
        # a single step and a list of steps are both accepted
        steps = entry if isinstance(entry, list) else [entry]
        for step in steps:
            step.update(processing_functions.translate_def_to_fn(step))

    kind = transform['type'].lower()
    if kind == 'indicator':
        return IndicatorTransformModel(**transform['transform'])
    if kind == 'group':
        return GroupTransformModel(**transform['transform'])
    raise TypeError(f'Unknown transform type: {transform["type"]}')


class ParamDefinition(TypedDict):
"""Parameter definition for use in the transform builder UI."""

default: str | None
help: str
label: str
name: str
required: bool
type: str


class FunctionDefinition(TypedDict):
    """Description of a processing function for the transform builder UI."""

    # Name of the function (the attribute name on the processing-functions object).
    name: str
    # Human-readable label for the function.
    label: str
    # Help text for the function.
    help: str
    # One entry per parameter the function exposes.
    params: list[ParamDefinition]


def custom_function_definition(definition: FunctionDefinition):
    """Return a decorator that tags a function with an explicit UI definition.

    The definition is stored on the decorated function as ``_tcex_function_definition``
    and the function itself is returned unchanged.
    """

    def _attach(target):
        target._tcex_function_definition = definition
        return target

    return _attach


class ProcessingFunctions:
"""Predefined functions to use in transforms."""

def __init__(self, tcex) -> None:
    """Store the tcex instance that the processing functions delegate to."""
    self.tcex = tcex

def custom(self, value, description: str):
    """Placeholder for processing that is only described, not implemented.

    Always raises RuntimeError carrying the given description.
    """
    message = f'Custom function not implemented: {description}'
    raise RuntimeError(message)

def static_map(self, value, mapping: dict):
    """Translate a value through a static lookup table.

    The lookup key is ``str(value)``. A mapping that is not already a dict is decoded
    with ``json.loads`` first. When the key is absent the original value is returned
    unchanged.
    """
    lookup = mapping if isinstance(mapping, dict) else json.loads(mapping)
    key = str(value)
    return lookup.get(key, value)

def value_in(self, value, values: str, delimiter: str = ','):
    """Return the value if it is in the delimited list of values, else return None.

    Args:
        value: The candidate value to look up.
        values: The delimited list of allowed values, given either bare (``a,b,c``) or
            already encoded as a JSON string (``"a,b,c"``).
        delimiter: The separator between the entries of ``values``.

    Returns:
        ``value`` when it equals one of the whitespace-trimmed entries, otherwise None.

    Raises:
        json.JSONDecodeError: If ``values`` cannot be decoded as a JSON string.
    """
    # standard library
    import re

    if not values.startswith('"'):
        # str.replace/re.sub return a new string, so the escaped text has to be kept.
        # Quotes that are already preceded by a backslash are left alone so input that
        # was pre-escaped by the caller keeps decoding the way it did before.
        values = re.sub(r'(?<!\\)"', r'\\"', values)
        values = f'"{values}"'

    allowed = [entry.strip() for entry in json.loads(values).split(delimiter)]
    return value if value in allowed else None

@custom_function_definition(
    {
        'name': 'format_table',
        'label': 'Format Objects as Markdown Table',
        'help': 'Format a list of objects as a markdown table.',
        'params': [
            {
                'default': None,
                'name': 'column_order',
                'label': 'Column Order',
                'type': 'str',
                'help': 'The order of the columns.',
                'required': False,
            }
        ],
    }
)
def format_table(self, value, column_order: str = ''):
    """Format a list of objects as a markdown table.

    The table contains a header row, a separator row, and one row per object. A cell is
    left empty when an object lacks the column's key.

    Args:
        value: A list of dicts that are expected to share the same keys.
        column_order: Comma-separated column names giving the column order. When empty,
            the keys of the first object are used, in their own order.

    Returns:
        The markdown table, each row terminated by a newline. An empty string is returned
        when there are no objects and no column order to derive a header from.
    """
    rows = value or []

    if column_order:
        order = [column.strip() for column in column_order.split(',')]
    elif rows:
        order = list(rows[0].keys())
    else:
        # no rows and no explicit columns: there is nothing to build a header from
        return ''

    lines = ['|'.join(order), '|'.join('-' for _ in order)]
    lines.extend('|'.join(str(row.get(column, '')) for column in order) for row in rows)

    return ''.join(f'|{line}|\n' for line in lines)

def any_to_datetime(self, value):
    """Convert the value by delegating to ``self.tcex.util.any_to_datetime``."""
    util = self.tcex.util
    return util.any_to_datetime(value)

def append(self, value, suffix: str):
    """Return the input value with ``suffix`` added to the end, as a string."""
    return '{}{}'.format(value, suffix)

def prepend(self, value, prefix: str):
    """Return the input value with ``prefix`` added to the front, as a string."""
    return '{}{}'.format(prefix, value)

def replace(self, value, old_value: str, new_value: str = ''):
    """Return the input value with each ``old_value`` swapped for ``new_value``.

    With the default ``new_value`` the occurrences of ``old_value`` are removed.
    """
    replaced = value.replace(old_value, new_value)
    return replaced

def to_uppercase(self, value):
    """Return the string value converted to uppercase."""
    upper = str.upper
    return upper(value)

def to_lowercase(self, value):
    """Return the string value converted to lowercase."""
    lower = str.lower
    return lower(value)

def to_titlecase(self, value):
    """Return the string value converted to titlecase."""
    title = str.title
    return title(value)

def truncate(self, value, length: int, append_chars: str = '...'):
    """Truncate a string by delegating to ``self.tcex.util.truncate_string``.

    ``length`` and ``append_chars`` are forwarded as keyword arguments.
    """
    options = {'length': length, 'append_chars': append_chars}
    return self.tcex.util.truncate_string(value, **options)

def hash(self, value) -> str:
    """Return the hex SHA-256 digest of ``str(value)`` encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(str(value).encode('utf-8'))
    return digest.hexdigest()

def split(self, value, delimiter: str = ','):
    """Split a string on ``delimiter`` and return the whitespace-trimmed pieces."""
    pieces = []
    for piece in value.split(delimiter):
        pieces.append(piece.strip())
    return pieces

def remove_surrounding_whitespace(self, value):
    """Return the string without its leading and trailing whitespace."""
    trimmed = value.strip()
    return trimmed

@custom_function_definition({'name': 'uuid5', 'label': 'To UUID5', 'help': '', 'params': []})
def uuid5(self, value, namespace=None) -> str:
    """Return the UUID5 of the value as a string.

    ``uuid.NAMESPACE_DNS`` is used when no (or a falsy) namespace is given.
    """
    effective_namespace = namespace if namespace else uuid.NAMESPACE_DNS
    generated = uuid.uuid5(effective_namespace, value)
    return str(generated)

def convert_to_MITRE_tag(self, value) -> str | None:
"""Transform MITRE tags to TC format."""
return self.tcex.api.tc.v3.mitre_tags.get_by_id_regex(value, value)

def translate_def_to_fn(self, api_def: dict):
    """Translate a function definition in transform builder/API format to an actual function.

    The function name stored under ``method`` (preferred) or ``for_each`` is resolved to
    the attribute of the same name on this object, and each entry in ``kwargs`` is
    converted using the annotation of the matching parameter.

    Args:
        api_def: The step definition. Holds the function name under ``method`` or
            ``for_each`` and, optionally, its keyword arguments under ``kwargs``.

    Returns:
        A copy of ``api_def`` with the name replaced by the callable and ``kwargs``
        replaced by a new dict of converted values. ``api_def`` itself is returned,
        untouched, when it already holds a callable.

    Raises:
        ValueError: If neither key is present, the function is unknown, or a keyword
            argument does not match a parameter of the function.
    """
    # 'method' takes precedence over 'for_each'; None means neither key is present
    type_ = next((key for key in ('method', 'for_each') if key in api_def), None)
    if type_ is None:
        raise ValueError('No method or for_each key found in definition.')

    fn_name = api_def[type_]

    # the definition has already been translated
    if callable(fn_name):
        return api_def

    fn = getattr(self, fn_name, None) if isinstance(fn_name, str) else None
    if not callable(fn):
        raise ValueError(f'Unknown function: {fn_name}')

    translated = api_def.copy()
    translated[type_] = fn

    if 'kwargs' in api_def:
        parameters = signature(fn).parameters

        # converted values go into a fresh dict: the copy above is shallow, so writing
        # into translated['kwargs'] directly would modify the caller's dict as well
        converted = {}
        for kwarg, raw in api_def['kwargs'].items():
            if kwarg not in parameters:
                raise ValueError(f'Unknown argument {kwarg} for function {fn_name}')

            annotation = parameters[kwarg].annotation

            if raw is None:
                # keep "no value" as None instead of coercing it (e.g. to 'None')
                converted[kwarg] = None
            elif isinstance(annotation, type) and issubclass(annotation, dict):
                # dict parameters arrive JSON encoded unless already decoded
                converted[kwarg] = raw if isinstance(raw, dict) else json.loads(raw)
            elif annotation is _empty or not callable(annotation):
                # nothing to convert with, pass the value through as given
                converted[kwarg] = raw
            else:
                converted[kwarg] = annotation(raw)

        translated['kwargs'] = converted

    return translated

def get_function_definitions(self) -> list[FunctionDefinition]:
    """Get function definitions in JSON format, suitable for the transform builder UI.

    Every public bound method of this object is described, except this method and
    ``translate_def_to_fn``. A definition attached to a method as
    ``_tcex_function_definition`` is used as-is; otherwise one is derived from the
    method's name, signature and docstring.

    Returns:
        One definition per function, in the order the names are returned by ``dir``.
    """
    # standard library
    from inspect import ismethod

    excluded = ('get_function_definitions', 'translate_def_to_fn')

    specs = []
    for attr_name in dir(self):
        if attr_name.startswith('_') or attr_name in excluded:
            continue

        fn = getattr(self, attr_name)
        if not ismethod(fn):
            # plain attributes (e.g. the tcex instance) are not processing functions
            continue

        specs.append(
            getattr(fn, '_tcex_function_definition', None)
            or {
                'name': fn.__name__,
                'label': self._snake_to_titlecase(fn.__name__),
                'params': self._get_params_defs(fn),
                # __doc__ is None for a method without a docstring
                'help': getattr(fn, '__doc__', None) or '',
            }
        )

    return specs  # type: ignore

@staticmethod
def _snake_to_titlecase(name):
return name.replace('_', ' ').title()

def replace(value: str, find: str, replace_with: str) -> str:
    """Return ``value`` with every occurrence of ``find`` swapped for ``replace_with``."""
    replaced = value.replace(find, replace_with)
    return replaced
@staticmethod
def _get_params_defs(fn) -> list[ParamDefinition]:
    """Get the parameter definitions for a function.

    The ``self`` and ``value`` parameters are skipped; every other parameter of the
    signature is described.

    Args:
        fn (function): The function to get the arguments for.

    Returns:
        list: A list of dictionaries containing the argument name, label, type name and
            default. The type is 'str' for a parameter without an annotation.
    """
    definitions = []
    for name, param in signature(fn).parameters.items():
        if name in ('self', 'value'):
            continue

        annotation = param.annotation
        if annotation is _empty:
            # a missing annotation is the (truthy) _empty marker, so it has to be
            # detected by identity for the 'str' fallback to take effect
            type_name = 'str'
        else:
            # not every annotation object has a __name__ (e.g. ``str | None``)
            type_name = getattr(annotation, '__name__', str(annotation))

        definitions.append(
            {
                'default': None if param.default is _empty else param.default,
                'name': name,
                'label': ProcessingFunctions._snake_to_titlecase(name),
                'type': type_name,
                'help': '',
                # NOTE(review): reported as required even when the parameter has a
                # default - confirm whether the UI expects this.
                'required': True,
            }
        )

    return definitions
Loading

0 comments on commit 58d1618

Please sign in to comment.