Skip to content

Commit

Permalink
Add better error handling when parsing transform from transform build…
Browse files Browse the repository at this point in the history
…er (#334)

 -  APP-4632 - [transform] Add attribute.pinned field
  • Loading branch information
cblades-tc authored Oct 17, 2024
1 parent 0d25ce3 commit 1554516
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 37 deletions.
1 change: 1 addition & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- APP-4604 - [transform] Add Processing Functions class to include pre-defined functions that can be used in transform builder and across TIE apps.
- APP-4605 - [transform] normalize the way null/empty values are handled in transforms, and include empty string ''.
- APP-4620 - [transform] Add structured/contextualized exceptions to transform to be able to deliver detailed error messages to users.
- APP-4632 - [transform] Add attribute.pinned field


## 4.0.6
Expand Down
1 change: 1 addition & 0 deletions tcex/api/tc/ti_transform/model/transform_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class AttributeTransformModel(ValueTransformModel, extra=Extra.forbid):
displayed: bool | MetadataTransformModel = Field(False, description='')
source: str | MetadataTransformModel | None = Field(None, description='')
type: str | MetadataTransformModel = Field(..., description='')
pinned: bool | MetadataTransformModel = Field(False, description='')


class SecurityLabelTransformModel(ValueTransformModel, extra=Extra.forbid):
Expand Down
83 changes: 48 additions & 35 deletions tcex/api/tc/ti_transform/ti_predefined_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,31 @@ def transform_builder_to_model(
) -> IndicatorTransformModel | GroupTransformModel:
"""Convert a transform from Transform Builder to one of the tcex transform models."""

def find_entries(data, key) -> Iterable[dict]:
def find_entries(data, key, context='') -> Iterable[tuple[str, dict]]:
"""Find entries in a dict with a given name, regardless of depth."""
if isinstance(data, dict):
for k, v in data.items():
if k == key:
yield v
yield (context, v)
elif isinstance(v, (dict, list)):
yield from find_entries(v, key)
yield from find_entries(v, key, context=f'{context}.{k}' if context else k)
elif isinstance(data, list):
for item in data:
yield from find_entries(item, key)
for i, item in enumerate(data):
yield from find_entries(item, key, context=f'{context}[{i}]')

for processing in find_entries(transform['transform'], 'transform'):
for context, processing in find_entries(transform['transform'], 'transform'):
if not isinstance(processing, list):
processing = [processing]

for step in processing:
step.update(processing_functions.translate_def_to_fn(step))
fn_name = step.get('for_each') or step.get('method')
step.update(
processing_functions.translate_def_to_fn(
step,
f'{context}, Processing Function: '
f'{processing_functions._snake_to_titlecase(fn_name)}',
)
)

match transform['type'].lower():
case 'indicator':
Expand Down Expand Up @@ -156,7 +163,7 @@ def format_table(self, value, column_order: str):

def any_to_datetime(self, value):
"""Convert any value to a datetime object."""
return self.tcex.util.any_to_datetime(value)
return self.tcex.util.any_to_datetime(value).isoformat()

def append(self, value, suffix: str):
"""Append a value to the input value."""
Expand Down Expand Up @@ -207,46 +214,52 @@ def convert_to_MITRE_tag(self, value) -> str | None:
"""Transform MITRE tags to TC format."""
return self.tcex.api.tc.v3.mitre_tags.get_by_id_regex(value, value)

def translate_def_to_fn(self, api_def: dict):
def translate_def_to_fn(self, api_def: dict, context: str):
"""Translate a function definition in transform builder/API format to an actual function."""
additional_context = ''
try:
translated = api_def.copy()

translated = api_def.copy()

type_ = 'method' if 'method' in api_def else 'for_each'
type_ = 'method' if 'method' in api_def else 'for_each'

if not type_:
raise ValueError('No method or for_each key found in definition.')
if not type_:
raise ValueError('No method or for_each key found in definition.')

fn_name = api_def[type_]
fn_name = api_def[type_]

if callable(fn_name):
return api_def
if callable(fn_name):
return api_def

fn = getattr(self, fn_name)
fn = getattr(self, fn_name)

if not fn:
raise ValueError(f'Unknown function: {fn_name}')
if not fn:
raise ValueError(f'Unknown function: {fn_name}')

translated[type_] = fn
translated[type_] = fn

if 'kwargs' in api_def:
sig = signature(fn)
if 'kwargs' in api_def:
sig = signature(fn)

for kwarg in api_def['kwargs']:
if kwarg not in sig.parameters:
raise ValueError(f'Unknown argument {kwarg} for function {fn_name}')
for kwarg in api_def['kwargs']:
if kwarg not in sig.parameters:
raise ValueError(f'Unknown argument {kwarg} for function {fn_name}')

annotation = sig.parameters[kwarg].annotation
annotation = sig.parameters[kwarg].annotation
additional_context = f', Argument: {kwarg}'
match annotation():
case dict():
translated['kwargs'][kwarg] = json.loads(api_def['kwargs'][kwarg])
case _:
translated['kwargs'][kwarg] = sig.parameters[kwarg].annotation(
api_def['kwargs'][kwarg]
)

match annotation():
case dict():
translated['kwargs'][kwarg] = json.loads(api_def['kwargs'][kwarg])
case _:
translated['kwargs'][kwarg] = sig.parameters[kwarg].annotation(
api_def['kwargs'][kwarg]
)
return translated
except Exception as e:
# first-party
from tcex.api.tc.ti_transform import TransformException

return translated
raise TransformException(f'{context}{additional_context}', e, context=api_def)

def get_function_definitions(self) -> list[FunctionDefinition]:
"""Get function definitions in JSON format, suitable for the transform builder UI."""
Expand Down
5 changes: 5 additions & 0 deletions tcex/api/tc/ti_transform/ti_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def add_attribute(
type_: str,
value: str,
displayed: bool = False,
pinned: bool = False,
source: str | None = None,
):
"""Add an attribute to the transformed item."""
Expand All @@ -104,6 +105,10 @@ def add_attribute(
if displayed is True:
attribute_data['displayed'] = displayed

# pinned is a special case, it only needs to be added if True
if pinned is True:
attribute_data['pinned'] = displayed

# source is a special case, it only needs to be added if not None
if source is not None:
attribute_data['source'] = source
Expand Down
8 changes: 6 additions & 2 deletions tcex/api/tc/ti_transform/transform_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,12 @@ def _process_attributes(self, attributes: list[AttributeTransformModel]):
types = self._process_metadata_transform_model(attribute.type, len(values))
source = self._process_metadata_transform_model(attribute.source, len(values))
displayed = self._process_metadata_transform_model(attribute.displayed, len(values))
pinned = self._process_metadata_transform_model(attribute.pinned, len(values))

param_keys = ['type_', 'value', 'displayed', 'source']
params = [dict(zip(param_keys, p)) for p in zip(types, values, displayed, source)]
param_keys = ['type_', 'value', 'displayed', 'pinned', 'source']
params = [
dict(zip(param_keys, p)) for p in zip(types, values, displayed, pinned, source)
]

for param in params:
param = self.util.remove_none(param)
Expand Down Expand Up @@ -654,6 +657,7 @@ def add_attribute(
type_: str,
value: str,
displayed: bool = False,
pinned: bool = False,
source: str | None = None,
):
"""Abstract method"""
Expand Down

0 comments on commit 1554516

Please sign in to comment.