From 82d601a41788aa6efe98a2b09e9c37a08afc5358 Mon Sep 17 00:00:00 2001 From: Bracey Summers Date: Tue, 7 Feb 2023 14:01:33 -0600 Subject: [PATCH] APP-3859 - updated magic variable lookup to handle core API change where role is no longer returned in API data. --- tcex/api/tc/utils/threat_intel_utils.py | 24 +++++++++---------- .../tc/v2/threat_intelligence/ti_helpers.py | 7 +++--- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/tcex/api/tc/utils/threat_intel_utils.py b/tcex/api/tc/utils/threat_intel_utils.py index 5a9750568..e33269bfb 100644 --- a/tcex/api/tc/utils/threat_intel_utils.py +++ b/tcex/api/tc/utils/threat_intel_utils.py @@ -205,19 +205,29 @@ def resolve_variables(self, inputs: List[str]) -> List[str]: """ resolved_inputs = [] for input_ in inputs: + # handle null inputs if not input_: resolved_inputs.append(None) continue - if input_.strip() not in self.resolvable_variables: + + # clean up input + input_ = input_.strip() + + # handle unknown input types + if input_ not in self.resolvable_variables: resolved_inputs.append(input_) continue - input_ = input_.strip() + + # special handling of group types (no API request required) if input_ == '${GROUP_TYPES}': for type_ in self.group_types: resolved_inputs.append(type_) continue + # get variable settings resolvable_variable_details = self.resolvable_variables[input_] + + # make API call to retrieve variable data r = self.session_tc.get( resolvable_variable_details.get('url'), params={'resultLimit': 10_000} ) @@ -226,16 +236,6 @@ def resolve_variables(self, inputs: List[str]) -> List[str]: raise RuntimeError(f'Could not retrieve {input_} from ThreatConnect API.') json_ = r.json() - # No TQL filter to filter out API users during REST call so have to do it manually here. - if input_ in ['${API_USERS}', '${USERS}']: - temp_data = [] - for item in json_.get('data', []): - if item.get('role') == 'Api User' and input_ == '${API_USERS}': - temp_data.append(item) - elif item.get('role') != 'Api User' and input_ == '${USERS}': - temp_data.append(item) - json_['data'] = temp_data - for item in jmespath.search(resolvable_variable_details.get('jmspath'), json_): resolved_inputs.append(str(item)) diff --git a/tests/api/tc/v2/threat_intelligence/ti_helpers.py b/tests/api/tc/v2/threat_intelligence/ti_helpers.py index 429dca937..e31f23391 100644 --- a/tests/api/tc/v2/threat_intelligence/ti_helpers.py +++ b/tests/api/tc/v2/threat_intelligence/ti_helpers.py @@ -13,6 +13,7 @@ # first-party from tcex import TcEx from tcex.api.tc.v2.threat_intelligence import ThreatIntelligence + from tcex.api.tc.v2.threat_intelligence.mappings.indicator.indicator import Indicator class TIHelper: @@ -238,7 +239,7 @@ def create_group(self, **kwargs): return ti - def create_indicator(self, indicator_type=None, **kwargs): + def create_indicator(self, indicator_type=None, **kwargs) -> 'Indicator': """Create an case. If a case_name is not provide a dynamic case name will be used. @@ -479,7 +480,7 @@ class TestThreatIntelligence: owner = None required_fields = {} ti = None - ti_helper = None + ti_helper: TIHelper def teardown_method(self): """Clean up resources""" @@ -766,7 +767,7 @@ def indicator_add_attribute(self, request): ti_data = response_data.get('data', {}).get('attribute') # assert response - assert r.status_code == 201 + assert r.status_code == 201, f'(status-code={r.status_code}, message={r.text})' assert response_data.get('status') == 'Success' # validate ti data