diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt
index 9848cc1f8..d606d6efa 100644
--- a/.cspell/custom-dictionary-workspace.txt
+++ b/.cspell/custom-dictionary-workspace.txt
@@ -186,6 +186,7 @@ stringarray
 Stringdummy
 strptime
 suborg
+subtechniques
 suricata
 targetable
 taskid
diff --git a/release_notes.md b/release_notes.md
index c105b6e39..dfa44169f 100644
--- a/release_notes.md
+++ b/release_notes.md
@@ -1,10 +1,12 @@
 # Release Notes
 
-### 4.0.2
+## 4.0.2
 
-- APP-4155 [API] Added Mitre Attack module for lookup by id or name
+- APP-4155 - [API] Added Mitre Attack module for lookup by id or name
+- APP-4175 - [API] Updated API to support TC 7.3 API changes
+- APP-4187 - [Playbook] Updated inputs to support TC variable nested in a Playbook variable (Key Value List)
 
-### 4.0.1
+## 4.0.1
 
 - APP-4055 - [API] Updated v3 gen body to allow 0 and false in body
 - APP-4056 - [API] Updated Transforms to Support Email Group Type
@@ -13,7 +15,7 @@
 - APP-4107 - [Config] Updated config submodule (tcex.json model) to support legacy App Builder Apps
 - APP-4108 - [API] Removed max_length and min_length from TI models
 
-### 4.0.0
+## 4.0.0
 
 - APP-3910 - [TcEx] Updated typing hint to Python 3.11 standards
 - APP-3911 - [TcEx] General code enhancement
diff --git a/tcex/__metadata__.py b/tcex/__metadata__.py
index 0913704ee..1a4990914 100644
--- a/tcex/__metadata__.py
+++ b/tcex/__metadata__.py
@@ -1,3 +1,3 @@
 """TcEx Framework Module"""
 __license__ = 'Apache-2.0'
-__version__ = '4.0.1'
+__version__ = '4.0.2'
diff --git a/tcex/api/tc/v3/_gen/_gen.py b/tcex/api/tc/v3/_gen/_gen.py
index 2d6d39a9f..ef5569dd4 100644
--- a/tcex/api/tc/v3/_gen/_gen.py
+++ b/tcex/api/tc/v3/_gen/_gen.py
@@ -89,7 +89,7 @@ def gen_filter(type_: SnakeString):
     out_file = Path(os.path.join(*out_path))
 
     if not out_file.is_file():
-        Render.panel.failure(f'\nCould not find file {out_file}.')
+        out_file.write_text('')
 
     # generate class methods first so requirements can be updated
     class_methods = gen.gen_class_methods()
@@ -119,7 +119,7 @@ def gen_model(type_: SnakeString):
     out_file = Path(os.path.join(*out_path))
 
     if not out_file.is_file():
-        Render.panel.failure(f'\nCould not find file {out_file}.')
+        out_file.write_text('')
 
     # generate model fields code first so that requirements can be determined
     container_private_attrs = gen.gen_container_private_attrs()
@@ -166,7 +166,7 @@ def gen_object(type_: SnakeString):
     out_file = Path(os.path.join(*out_path))
 
     if not out_file.is_file():
-        Render.panel.failure(f'\nCould not find file {out_file}.')
+        out_file.write_text('')
 
     # generate class method code first so that requirements can be determined
     container_methods = gen.gen_container_methods()
@@ -198,30 +198,42 @@
 class ObjectTypes(str, Enum):
     """Object Types"""
 
-    # adversary_assets = 'adversary_assets'
+    # shared
+    tags = 'tags'
+
+    # case management
     artifacts = 'artifacts'
     artifact_types = 'artifact_types'
-    attribute_types = 'attribute_types'
     cases = 'cases'
     case_attributes = 'case_attributes'
-    group_attributes = 'group_attributes'
-    groups = 'groups'
-    indicator_attributes = 'indicator_attributes'
-    indicators = 'indicators'
     notes = 'notes'
+    tasks = 'tasks'
+    workflow_events = 'workflow_events'
+    workflow_templates = 'workflow_templates'
+
+    # intel requirements
+    intel_requirements = 'intel_requirements'
+    categories = 'categories'
+    results = 'results'
+    subtypes = 'subtypes'
+
+    # security
    owner_roles = 'owner_roles'
     owners = 'owners'
-    security_labels = 'security_labels'
- system_roles = 'system_roles' - tags = 'tags' - tasks = 'tasks' users = 'users' user_groups = 'user_groups' + system_roles = 'system_roles' + + # threat intelligence + attribute_types = 'attribute_types' + group_attributes = 'group_attributes' + groups = 'groups' + indicator_attributes = 'indicator_attributes' + indicators = 'indicators' + security_labels = 'security_labels' victims = 'victims' victim_assets = 'victim_assets' victim_attributes = 'victim_attributes' - workflow_events = 'workflow_events' - workflow_templates = 'workflow_templates' class GenTypes(str, Enum): @@ -248,10 +260,10 @@ def all( # pylint: disable=redefined-builtin for type_ in ObjectTypes: type_ = util.snake_string(type_.value) - if gen_type_ in ['all', 'filter']: - gen_filter(type_) if gen_type_ in ['all', 'model']: gen_model(type_) + if gen_type_ in ['all', 'filter']: + gen_filter(type_) if gen_type_ in ['all', 'object']: gen_object(type_) diff --git a/tcex/api/tc/v3/_gen/_gen_abc.py b/tcex/api/tc/v3/_gen/_gen_abc.py index b9d51f9b5..cd7f6f2f8 100644 --- a/tcex/api/tc/v3/_gen/_gen_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_abc.py @@ -473,4 +473,12 @@ def tap(self, type_: str): 'user_groups', ]: return 'tcex.api.tc.v3.security' + + if type_.plural().lower() in [ + 'categories', + 'results', + 'subtypes', + ]: + return 'tcex.api.tc.v3.intel_requirements' + return 'tcex.api.tc.v3' diff --git a/tcex/api/tc/v3/_gen/_gen_args_abc.py b/tcex/api/tc/v3/_gen/_gen_args_abc.py index 68570a38b..4ece12e22 100644 --- a/tcex/api/tc/v3/_gen/_gen_args_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_args_abc.py @@ -61,14 +61,14 @@ def gen_args( _doc_string = [f'{i1}Args:'] # get properties from schema - schema = model().schema(by_alias=False) + schema = model.schema(by_alias=False) if '$ref' in schema: model_name = schema.get('$ref').split('/')[-1] properties = schema.get('definitions').get(model_name).get('properties') elif 'properties' in schema: properties = schema.get('properties') else: - Render.panel.failure(model().schema_json(by_alias=False)) + Render.panel.failure(model.schema_json(by_alias=False)) # iterate over properties to build docstring for arg, prop_data in properties.items(): diff --git a/tcex/api/tc/v3/_gen/_gen_model_abc.py b/tcex/api/tc/v3/_gen/_gen_model_abc.py index 20316d26f..143587543 100644 --- a/tcex/api/tc/v3/_gen/_gen_model_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_model_abc.py @@ -44,7 +44,7 @@ def _add_pydantic_private_attr(self): """Add pydantic validator only when required.""" self._add_module_class('third-party', 'pydantic', 'PrivateAttr') - def _gen_code_validator_method(self, type_: str, fields: list[str]) -> str: + def _gen_code_validator_method(self, type_: str, data: dict) -> str: """Return the validator code @validator('artifact_type', always=True, pre=True) @@ -54,6 +54,8 @@ def _validate_artifact_type(cls, v): return v """ type_ = self.util.camel_string(type_) + fields = data['fields'] + typing_type = data['typing_type'].strip('\'') fields_string = ', '.join(f'\'{field}\'' for field in fields) return '\n'.join( @@ -61,7 +63,7 @@ def _validate_artifact_type(cls, v): f'''{self.i1}@validator({fields_string}, always=True, pre=True)''', f'''{self.i1}def _validate_{type_.snake_case()}(cls, v):''', f'''{self.i2}if not v:''', - f'''{self.i3}return {type_}Model() # type: ignore''', + f'''{self.i3}return {typing_type}() # type: ignore''', f'''{self.i2}return v''', '', ] @@ -314,7 +316,10 @@ def gen_model_fields(self) -> str: # add validator if prop.extra.model is not None: - self.validators.setdefault(prop.type, 
[]).append(prop.name.snake_case()) + self.validators.setdefault(prop.type, {}) + self.validators[prop.type].setdefault('fields', []) + self.validators[prop.type]['fields'].append(prop.name.snake_case()) + self.validators[prop.type]['typing_type'] = prop.extra.typing_type # update model _model.append( @@ -366,16 +371,16 @@ def gen_model_fields(self) -> str: # _model.append(f'''{self.i2}max_items={field_max_size},''') # max_value - if prop.max_value is not None: - _model.append(f'''{self.i2}maximum={prop.max_value},''') + # if prop.max_value is not None: + # _model.append(f'''{self.i2}maximum={prop.max_value},''') # min_length # if prop.min_length is not None: # _model.append(f'''{self.i2}min_length={prop.min_length},''') # min_value - if prop.min_value is not None: - _model.append(f'''{self.i2}minimum={prop.min_value},''') + # if prop.min_value is not None: + # _model.append(f'''{self.i2}minimum={prop.min_value},''') # readOnly/allow_mutation setting _model.append(f'''{self.i2}read_only={prop.read_only},''') @@ -465,13 +470,11 @@ def gen_validator_methods(self) -> str: """Generate model validator.""" _v = [] - validators = dict( - sorted({k: v for k, v in self.validators.items() if v is not None}.items()) - ) - for key, fields in validators.items(): - if not fields: + validators = dict(sorted({k: v for k, v in self.validators.items() if v['fields']}.items())) + for key, data in validators.items(): + if not data['fields']: continue - _v.append(self._gen_code_validator_method(key, fields)) + _v.append(self._gen_code_validator_method(key, data)) # add blank line above validators only if validators exists if _v: diff --git a/tcex/api/tc/v3/_gen/_gen_object_abc.py b/tcex/api/tc/v3/_gen/_gen_object_abc.py index bb8e8cb98..7eda65621 100644 --- a/tcex/api/tc/v3/_gen/_gen_object_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_object_abc.py @@ -373,6 +373,9 @@ def as_entity(self) -> dict: name_entities = [ 'artifact_types', 'cases', + 'categories', + 'results', + 'subtypes', 'tags', 'tasks', 'workflow_templates', @@ -383,6 +386,8 @@ def as_entity(self) -> dict: value = 'self.model.summary' if self.type_.lower() in name_entities: value = 'self.model.name' + elif self.type_.lower() == 'intel_requirements': + value = 'self.model.requirement_text' as_entity_property_method = [ f'''{self.i1}@property''', diff --git a/tcex/api/tc/v3/_gen/model/_filter_model.py b/tcex/api/tc/v3/_gen/model/_filter_model.py index 9ca1c0678..20315619d 100644 --- a/tcex/api/tc/v3/_gen/model/_filter_model.py +++ b/tcex/api/tc/v3/_gen/model/_filter_model.py @@ -89,6 +89,7 @@ def __calculate_typing_type(cls, type_: str) -> dict[str, str]: 'Boolean': {'base_type': 'bool', 'typing_type': 'bool'}, 'Date': {'base_type': 'str', 'typing_type': 'Arrow | datetime | int | str'}, 'DateTime': {'base_type': 'str', 'typing_type': 'Arrow | datetime | int | str'}, + 'Double': {'base_type': 'float', 'typing_type': 'float | list'}, 'Enum': {'base_type': 'str', 'typing_type': 'list | str'}, 'EnumToInteger': {'base_type': 'str', 'typing_type': 'list | str'}, 'Integer': {'base_type': 'int', 'typing_type': 'int | list'}, @@ -114,6 +115,7 @@ def __calculate_tql_type(cls, typing_type: str): # tql types for the add_filter method call tql_type_map = { 'bool': 'TqlType.BOOLEAN', + 'float': 'TqlType.FLOAT', 'int': 'TqlType.INTEGER', 'str': 'TqlType.STRING', } diff --git a/tcex/api/tc/v3/_gen/model/_property_model.py b/tcex/api/tc/v3/_gen/model/_property_model.py index 403ae1af1..edc1a1bea 100644 --- a/tcex/api/tc/v3/_gen/model/_property_model.py +++ 
b/tcex/api/tc/v3/_gen/model/_property_model.py @@ -168,6 +168,9 @@ def __process_dict_types(cls, pm: 'PropertyModel', extra: dict[str, str]): 'Enrichments', 'GeoLocation', 'InvestigationLinks', + 'IntelReqType', + 'IntelRequirement', + 'KeywordSection', 'Links', 'Map', 'ValidationRule', @@ -268,6 +271,16 @@ def __process_special_types(cls, pm: 'PropertyModel', extra: dict[str, str]): 'typing_type': cls.__extra_format_type_model(pm.type), } ) + elif pm.type == 'IntelReqType': + bi += 'intel_requirements.intel_req_type_model' + extra.update( + { + 'import_data': f'{bi} import IntelReqTypeModel', + 'import_source': 'first-party-forward-reference', + 'model': 'IntelReqTypeModel', + 'typing_type': cls.__extra_format_type_model(pm.type), + } + ) elif pm.type == 'IndicatorAttributes': bi += 'indicator_attributes.indicator_attribute_model' extra.update( @@ -285,6 +298,16 @@ def __process_special_types(cls, pm: 'PropertyModel', extra: dict[str, str]): 'typing_type': '''dict | list[dict] | None''', } ) + elif pm.type == 'KeywordSection': + bi += 'intel_requirements.keyword_section_model' + extra.update( + { + 'import_data': f'{bi} import KeywordSectionModel', + 'import_source': 'first-party-forward-reference', + 'model': 'KeywordSectionModel', + 'typing_type': f'list[{cls.__extra_format_type_model(pm.type)}]', + } + ) elif pm.type == 'TaskAssignees': extra.update( { @@ -393,4 +416,12 @@ def __extra_tap(cls, type_: CamelString) -> str: 'user_groups', ]: return 'tcex.api.tc.v3.security' + + if type_.snake_case().plural().lower() in [ + 'categories', + 'results', + 'subtypes', + ]: + return 'tcex.api.tc.v3.intel_requirements' + return 'tcex.api.tc.v3' diff --git a/tcex/api/tc/v3/api_endpoints.py b/tcex/api/tc/v3/api_endpoints.py index 24832454b..96b8cac45 100644 --- a/tcex/api/tc/v3/api_endpoints.py +++ b/tcex/api/tc/v3/api_endpoints.py @@ -14,6 +14,10 @@ class ApiEndpoints(Enum): GROUP_ATTRIBUTES = '/v3/groupAttributes' GROUPS = '/v3/groups' INDICATOR_ATTRIBUTES = '/v3/indicatorAttributes' + INTEL_REQUIREMENTS = '/v3/intelRequirements' + CATEGORIES = '/v3/intelRequirements/categories' + RESULTS = '/v3/intelRequirements/results' + SUBTYPES = '/v3/intelRequirements/subtypes' INDICATORS = '/v3/indicators' NOTES = '/v3/notes' OWNERS = '/v3/security/owners' diff --git a/tcex/api/tc/v3/groups/group.py b/tcex/api/tc/v3/groups/group.py index 30654f12a..0234347d5 100644 --- a/tcex/api/tc/v3/groups/group.py +++ b/tcex/api/tc/v3/groups/group.py @@ -49,12 +49,17 @@ class Group(ObjectABC): due_date (str, kwargs): The date and time that the Task is due. escalation_date (str, kwargs): The escalation date and time. event_date (str, kwargs): The date and time that the incident or event was first created. + external_date_added (str, kwargs): The date and time that the item was first created + externally. + external_date_expires (str, kwargs): The date and time the item expires externally. + external_last_modified (str, kwargs): The date and time the item was modified externally. file_name (str, kwargs): The document or signature file name. file_text (str, kwargs): The signature file text. file_type (str, kwargs): The signature file type. - first_seen (str, kwargs): The date and time that the campaign was first created. + first_seen (str, kwargs): The date and time that the item was first seen. from_ (str, kwargs): The email From field. header (str, kwargs): The email Header field. + last_seen (str, kwargs): The date and time that the item was last seen. malware (bool, kwargs): Is the document malware? 
name (str, kwargs): The name of the group. owner_id (int, kwargs): The id of the Organization, Community, or Source that the item diff --git a/tcex/api/tc/v3/groups/group_filter.py b/tcex/api/tc/v3/groups/group_filter.py index 4fe5d1324..3e456eb91 100644 --- a/tcex/api/tc/v3/groups/group_filter.py +++ b/tcex/api/tc/v3/groups/group_filter.py @@ -258,6 +258,60 @@ def event_date(self, operator: Enum, event_date: Arrow | datetime | int | str): event_date = self.util.any_to_datetime(event_date).strftime('%Y-%m-%d %H:%M:%S') self._tql.add_filter('eventDate', operator, event_date, TqlType.STRING) + def external_date_added( + self, operator: Enum, external_date_added: Arrow | datetime | int | str + ): + """Filter External Date Added based on **externalDateAdded** keyword. + + Args: + operator: The operator enum for the filter. + external_date_added: The date and time that the group was first created externally. + """ + external_date_added = self.util.any_to_datetime(external_date_added).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('externalDateAdded', operator, external_date_added, TqlType.STRING) + + def external_date_expires( + self, operator: Enum, external_date_expires: Arrow | datetime | int | str + ): + """Filter External Date Expires based on **externalDateExpires** keyword. + + Args: + operator: The operator enum for the filter. + external_date_expires: The date and time the group expires externally. + """ + external_date_expires = self.util.any_to_datetime(external_date_expires).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('externalDateExpires', operator, external_date_expires, TqlType.STRING) + + def external_last_modified( + self, operator: Enum, external_last_modified: Arrow | datetime | int | str + ): + """Filter External Last Modified based on **externalLastModified** keyword. + + Args: + operator: The operator enum for the filter. + external_last_modified: The date and time the group was modified externally. + """ + external_last_modified = self.util.any_to_datetime(external_last_modified).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter( + 'externalLastModified', operator, external_last_modified, TqlType.STRING + ) + + def first_seen(self, operator: Enum, first_seen: Arrow | datetime | int | str): + """Filter First Seen based on **firstSeen** keyword. + + Args: + operator: The operator enum for the filter. + first_seen: The date and time that the group was first seen. + """ + first_seen = self.util.any_to_datetime(first_seen).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('firstSeen', operator, first_seen, TqlType.STRING) + def generated_report(self, operator: Enum, generated_report: bool): """Filter Generated (Report) based on **generatedReport** keyword. @@ -329,6 +383,23 @@ def has_intel_query(self, operator: Enum, has_intel_query: int | list): self._tql.add_filter('hasIntelQuery', operator, has_intel_query, TqlType.INTEGER) + def has_intel_requirement(self, operator: Enum, has_intel_requirement: int | list): + """Filter Associated Intel Requirement based on **hasIntelRequirement** keyword. + + Args: + operator: The operator enum for the filter. + has_intel_requirement: A nested query for association to intel requirements. + """ + if isinstance(has_intel_requirement, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter( + 'hasIntelRequirement', operator, has_intel_requirement, TqlType.INTEGER + ) + @property def has_security_label(self): """Return **SecurityLabel** for further filtering.""" @@ -403,6 +474,16 @@ def last_modified(self, operator: Enum, last_modified: Arrow | datetime | int | last_modified = self.util.any_to_datetime(last_modified).strftime('%Y-%m-%d %H:%M:%S') self._tql.add_filter('lastModified', operator, last_modified, TqlType.STRING) + def last_seen(self, operator: Enum, last_seen: Arrow | datetime | int | str): + """Filter Last Seen based on **lastSeen** keyword. + + Args: + operator: The operator enum for the filter. + last_seen: The date and time that the group was last seen. + """ + last_seen = self.util.any_to_datetime(last_seen).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('lastSeen', operator, last_seen, TqlType.STRING) + def owner(self, operator: Enum, owner: int | list): """Filter Owner ID based on **owner** keyword. diff --git a/tcex/api/tc/v3/groups/group_model.py b/tcex/api/tc/v3/groups/group_model.py index 45dba0086..c2a9ba2c4 100644 --- a/tcex/api/tc/v3/groups/group_model.py +++ b/tcex/api/tc/v3/groups/group_model.py @@ -161,6 +161,27 @@ class GroupModel( read_only=False, title='eventDate', ) + external_date_added: datetime | None = Field( + None, + description='The date and time that the item was first created externally.', + methods=['POST', 'PUT'], + read_only=False, + title='externalDateAdded', + ) + external_date_expires: datetime | None = Field( + None, + description='The date and time the item expires externally.', + methods=['POST', 'PUT'], + read_only=False, + title='externalDateExpires', + ) + external_last_modified: datetime | None = Field( + None, + description='The date and time the item was modified externally.', + methods=['POST', 'PUT'], + read_only=False, + title='externalLastModified', + ) file_name: str | None = Field( None, applies_to=['Document', 'Report', 'Signature'], @@ -196,8 +217,7 @@ class GroupModel( ) first_seen: datetime | None = Field( None, - applies_to=['Campaign'], - description='The date and time that the campaign was first created.', + description='The date and time that the item was first seen.', methods=['POST', 'PUT'], read_only=False, title='firstSeen', @@ -239,6 +259,13 @@ class GroupModel( read_only=True, title='lastModified', ) + last_seen: datetime | None = Field( + None, + description='The date and time that the item was last seen.', + methods=['POST', 'PUT'], + read_only=False, + title='lastSeen', + ) legacy_link: str | None = Field( None, allow_mutation=False, diff --git a/tcex/api/tc/v3/indicators/indicator.py b/tcex/api/tc/v3/indicators/indicator.py index eb356c687..273efe7e2 100644 --- a/tcex/api/tc/v3/indicators/indicator.py +++ b/tcex/api/tc/v3/indicators/indicator.py @@ -49,13 +49,23 @@ class Indicator(ObjectABC): attributes (IndicatorAttributes, kwargs): A list of Attributes corresponding to the Indicator. confidence (int, kwargs): The indicator threat confidence. + custom_association_name (str, kwargs): The custom association name if assigned to this + indicator. + custom_associations (Indicators, kwargs): A list of indicators with custom associations to + this indicator. dns_active (bool, kwargs): Is dns active for the indicator? + external_date_added (str, kwargs): The date and time that the item was first created + externally. + external_date_expires (str, kwargs): The date and time the item expires externally. 
+ external_last_modified (str, kwargs): The date and time the item was modified externally. file_actions (FileActions, kwargs): The type of file action associated with this indicator. file_occurrences (FileOccurrences, kwargs): A list of file occurrences associated with this indicator. + first_seen (str, kwargs): The date and time that the item was first seen. host_name (str, kwargs): The host name of the indicator (Host specific summary field). ip (str, kwargs): The ip address associated with this indicator (Address specific summary field). + last_seen (str, kwargs): The date and time that the item was last seen. md5 (str, kwargs): The md5 associated with this indicator (File specific summary field). mode (str, kwargs): The operation to perform on the file hashes (delete | merge). owner_id (int, kwargs): The id of the Organization, Community, or Source that the item diff --git a/tcex/api/tc/v3/indicators/indicator_filter.py b/tcex/api/tc/v3/indicators/indicator_filter.py index 9b9554d8d..8fbe6557b 100644 --- a/tcex/api/tc/v3/indicators/indicator_filter.py +++ b/tcex/api/tc/v3/indicators/indicator_filter.py @@ -247,6 +247,129 @@ def description(self, operator: Enum, description: list | str): self._tql.add_filter('description', operator, description, TqlType.STRING) + def dt_last_updated(self, operator: Enum, dt_last_updated: Arrow | datetime | int | str): + """Filter DomainTools Last Updated based on **dtLastUpdated** keyword. + + Args: + operator: The operator enum for the filter. + dt_last_updated: The date the indicator has been looked at with DomainTools. + """ + dt_last_updated = self.util.any_to_datetime(dt_last_updated).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('dtLastUpdated', operator, dt_last_updated, TqlType.STRING) + + def dt_malware_score(self, operator: Enum, dt_malware_score: int | list): + """Filter DomainTools Malware Score based on **dtMalwareScore** keyword. + + Args: + operator: The operator enum for the filter. + dt_malware_score: The malware risk score from the DomainTools enrichment data. + """ + if isinstance(dt_malware_score, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('dtMalwareScore', operator, dt_malware_score, TqlType.INTEGER) + + def dt_overall_score(self, operator: Enum, dt_overall_score: int | list): + """Filter DomainTools Overall Score based on **dtOverallScore** keyword. + + Args: + operator: The operator enum for the filter. + dt_overall_score: The overall risk score from the DomainTools enrichment data. + """ + if isinstance(dt_overall_score, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('dtOverallScore', operator, dt_overall_score, TqlType.INTEGER) + + def dt_phishing_score(self, operator: Enum, dt_phishing_score: int | list): + """Filter DomainTools Phishing Score based on **dtPhishingScore** keyword. + + Args: + operator: The operator enum for the filter. + dt_phishing_score: The phishing risk score from the DomainTools enrichment data. + """ + if isinstance(dt_phishing_score, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('dtPhishingScore', operator, dt_phishing_score, TqlType.INTEGER) + + def dt_spam_score(self, operator: Enum, dt_spam_score: int | list): + """Filter DomainTools Spam Score based on **dtSpamScore** keyword. + + Args: + operator: The operator enum for the filter. + dt_spam_score: The spam risk score from the DomainTools enrichment data. + """ + if isinstance(dt_spam_score, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('dtSpamScore', operator, dt_spam_score, TqlType.INTEGER) + + def dt_status(self, operator: Enum, dt_status: bool): + """Filter DomainTools Status based on **dtStatus** keyword. + + Args: + operator: The operator enum for the filter. + dt_status: The domain status (active/inactive) from the DomainTools enrichment data. + """ + self._tql.add_filter('dtStatus', operator, dt_status, TqlType.BOOLEAN) + + def external_date_added( + self, operator: Enum, external_date_added: Arrow | datetime | int | str + ): + """Filter External Date Added based on **externalDateAdded** keyword. + + Args: + operator: The operator enum for the filter. + external_date_added: The date and time that the indicator was first created externally. + """ + external_date_added = self.util.any_to_datetime(external_date_added).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('externalDateAdded', operator, external_date_added, TqlType.STRING) + + def external_date_expires( + self, operator: Enum, external_date_expires: Arrow | datetime | int | str + ): + """Filter External Date Expires based on **externalDateExpires** keyword. + + Args: + operator: The operator enum for the filter. + external_date_expires: The date and time the indicator expires externally. + """ + external_date_expires = self.util.any_to_datetime(external_date_expires).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('externalDateExpires', operator, external_date_expires, TqlType.STRING) + + def external_last_modified( + self, operator: Enum, external_last_modified: Arrow | datetime | int | str + ): + """Filter External Last Modified based on **externalLastModified** keyword. + + Args: + operator: The operator enum for the filter. + external_last_modified: The date and time the indicator was modified externally. + """ + external_last_modified = self.util.any_to_datetime(external_last_modified).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter( + 'externalLastModified', operator, external_last_modified, TqlType.STRING + ) + def false_positive_count(self, operator: Enum, false_positive_count: int | list): """Filter False Positive Count based on **falsePositiveCount** keyword. @@ -278,6 +401,16 @@ def file_size(self, operator: Enum, file_size: int | list): self._tql.add_filter('fileSize', operator, file_size, TqlType.INTEGER) + def first_seen(self, operator: Enum, first_seen: Arrow | datetime | int | str): + """Filter First Seen based on **firstSeen** keyword. + + Args: + operator: The operator enum for the filter. + first_seen: The date and time that the indicator was first seen. 
+ """ + first_seen = self.util.any_to_datetime(first_seen).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('firstSeen', operator, first_seen, TqlType.STRING) + @property def has_artifact(self): """Return **ArtifactFilter** for further filtering.""" @@ -327,6 +460,23 @@ def has_indicator(self): self._tql.add_filter('hasIndicator', TqlOperator.EQ, indicators, TqlType.SUB_QUERY) return indicators + def has_intel_requirement(self, operator: Enum, has_intel_requirement: int | list): + """Filter Associated Intel Requirement based on **hasIntelRequirement** keyword. + + Args: + operator: The operator enum for the filter. + has_intel_requirement: A nested query for association to intel requirements. + """ + if isinstance(has_intel_requirement, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter( + 'hasIntelRequirement', operator, has_intel_requirement, TqlType.INTEGER + ) + @property def has_security_label(self): """Return **SecurityLabel** for further filtering.""" @@ -443,6 +593,16 @@ def last_observed(self, operator: Enum, last_observed: Arrow | datetime | int | last_observed = self.util.any_to_datetime(last_observed).strftime('%Y-%m-%d %H:%M:%S') self._tql.add_filter('lastObserved', operator, last_observed, TqlType.STRING) + def last_seen(self, operator: Enum, last_seen: Arrow | datetime | int | str): + """Filter Last Seen based on **lastSeen** keyword. + + Args: + operator: The operator enum for the filter. + last_seen: The date and time that the indicator was last seen. + """ + last_seen = self.util.any_to_datetime(last_seen).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('lastSeen', operator, last_seen, TqlType.STRING) + def observation_count(self, operator: Enum, observation_count: int | list): """Filter Observation Count based on **observationCount** keyword. 
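Reviewer note: the group and indicator filter methods added above (externalDateAdded, externalDateExpires, externalLastModified, firstSeen, lastSeen, hasIntelRequirement) follow the existing generated TQL filter pattern. A minimal usage sketch, not part of this diff, assuming a pre-configured session with TC API auth and that TqlOperator.GT is valid for date keywords; all filter values are illustrative only:

# first-party
from tcex.api.tc.v3.indicators.indicator import Indicators
from tcex.api.tc.v3.tql.tql_operator import TqlOperator

# `session` is assumed to already be configured with TC API auth
indicators = Indicators(session=session)

# exercise the new TQL keywords added in this change
indicators.filter.first_seen(TqlOperator.GT, '2023-01-01')
indicators.filter.external_last_modified(TqlOperator.GT, '2023-06-01')
indicators.filter.has_intel_requirement(TqlOperator.EQ, 1234)

for indicator in indicators:
    print(indicator.as_entity)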
diff --git a/tcex/api/tc/v3/indicators/indicator_model.py b/tcex/api/tc/v3/indicators/indicator_model.py index b09cc9c69..1bb410b1b 100644 --- a/tcex/api/tc/v3/indicators/indicator_model.py +++ b/tcex/api/tc/v3/indicators/indicator_model.py @@ -90,11 +90,23 @@ class IndicatorModel( None, description='The indicator threat confidence.', methods=['POST', 'PUT'], - maximum=100, - minimum=0, read_only=False, title='confidence', ) + custom_association_name: str | None = Field( + None, + description='The custom association name if assigned to this indicator.', + methods=['POST', 'PUT'], + read_only=False, + title='customAssociationName', + ) + custom_associations: 'IndicatorsModel' = Field( + None, + description='A list of indicators with custom associations to this indicator.', + methods=['POST', 'PUT'], + read_only=False, + title='customAssociations', + ) date_added: datetime | None = Field( None, allow_mutation=False, @@ -133,6 +145,27 @@ class IndicatorModel( read_only=True, title='enrichment', ) + external_date_added: datetime | None = Field( + None, + description='The date and time that the item was first created externally.', + methods=['POST', 'PUT'], + read_only=False, + title='externalDateAdded', + ) + external_date_expires: datetime | None = Field( + None, + description='The date and time the item expires externally.', + methods=['POST', 'PUT'], + read_only=False, + title='externalDateExpires', + ) + external_last_modified: datetime | None = Field( + None, + description='The date and time the item was modified externally.', + methods=['POST', 'PUT'], + read_only=False, + title='externalLastModified', + ) false_positive_reported_by_user: bool = Field( None, allow_mutation=False, @@ -161,6 +194,13 @@ class IndicatorModel( read_only=False, title='fileOccurrences', ) + first_seen: datetime | None = Field( + None, + description='The date and time that the item was first seen.', + methods=['POST', 'PUT'], + read_only=False, + title='firstSeen', + ) geo_location: dict | None = Field( None, allow_mutation=False, @@ -226,6 +266,13 @@ class IndicatorModel( read_only=True, title='lastObserved', ) + last_seen: datetime | None = Field( + None, + description='The date and time that the item was last seen.', + methods=['POST', 'PUT'], + read_only=False, + title='lastSeen', + ) legacy_link: str | None = Field( None, allow_mutation=False, @@ -281,8 +328,6 @@ class IndicatorModel( None, description='The indicator threat rating.', methods=['POST', 'PUT'], - maximum=5, - minimum=0, read_only=False, title='rating', ) @@ -484,7 +529,7 @@ def _validate_indicator_attributes(cls, v): return IndicatorAttributesModel() # type: ignore return v - @validator('associated_indicators', always=True, pre=True) + @validator('associated_indicators', 'custom_associations', always=True, pre=True) def _validate_indicators(cls, v): if not v: return IndicatorsModel() # type: ignore diff --git a/tcex/api/tc/v3/intel_requirement/__init__.py b/tcex/api/tc/v3/intel_requirement/__init__.py new file mode 100644 index 000000000..558af896b --- /dev/null +++ b/tcex/api/tc/v3/intel_requirement/__init__.py @@ -0,0 +1 @@ +"""TcEx Framework Module""" diff --git a/tcex/api/tc/v3/intel_requirement/ir.py b/tcex/api/tc/v3/intel_requirement/ir.py new file mode 100644 index 000000000..86ec22f8d --- /dev/null +++ b/tcex/api/tc/v3/intel_requirement/ir.py @@ -0,0 +1,53 @@ +"""TcEx Framework Module""" +# third-party +from requests import Session + +# first-party +from tcex.api.tc.v3.intel_requirements.categories.category import Categories, Category 
+from tcex.api.tc.v3.intel_requirements.intel_requirement import IntelRequirement, IntelRequirements +from tcex.api.tc.v3.intel_requirements.results.result import Result, Results +from tcex.api.tc.v3.intel_requirements.subtypes.subtype import Subtype, Subtypes + + +class IR: + """Intel Requirement + + Args: + session: An configured instance of request.Session with TC API Auth. + """ + + def __init__(self, session: Session): + """Initialize instance properties.""" + self.session = session + + def categories(self, **kwargs) -> Categories: + """Return a instance of Categories object.""" + return Categories(session=self.session, **kwargs) + + def category(self, **kwargs) -> Category: + """Return a instance of Category object.""" + return Category(session=self.session, **kwargs) + + def intel_requirement(self, **kwargs) -> IntelRequirement: + """Return a instance of Intel Requirement object.""" + return IntelRequirement(session=self.session, **kwargs) + + def intel_requirements(self, **kwargs) -> IntelRequirements: + """Return a instance of Intel Requirements object.""" + return IntelRequirements(session=self.session, **kwargs) + + def result(self, **kwargs) -> Result: + """Return a instance of Result object.""" + return Result(session=self.session, **kwargs) + + def results(self, **kwargs) -> Results: + """Return a instance of Results object.""" + return Results(session=self.session, **kwargs) + + def subtype(self, **kwargs) -> Subtype: + """Return a instance of Subtype object.""" + return Subtype(session=self.session, **kwargs) + + def subtypes(self, **kwargs) -> Subtypes: + """Return a instance of Subtypes object.""" + return Subtypes(session=self.session, **kwargs) diff --git a/tcex/api/tc/v3/intel_requirements/__init__.py b/tcex/api/tc/v3/intel_requirements/__init__.py new file mode 100644 index 000000000..558af896b --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/__init__.py @@ -0,0 +1 @@ +"""TcEx Framework Module""" diff --git a/tcex/api/tc/v3/intel_requirements/categories/category.py b/tcex/api/tc/v3/intel_requirements/categories/category.py new file mode 100644 index 000000000..fe74e1de3 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/categories/category.py @@ -0,0 +1,100 @@ +"""TcEx Framework Module""" +# standard library +from collections.abc import Iterator + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.intel_requirements.categories.category_filter import CategoryFilter +from tcex.api.tc.v3.intel_requirements.categories.category_model import ( + CategoriesModel, + CategoryModel, +) +from tcex.api.tc.v3.object_abc import ObjectABC +from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC + + +class Category(ObjectABC): + """Categories Object. + + Args: + description (str, kwargs): The description of the subtype/category. + name (str, kwargs): The details of the subtype/category. 
+ """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__(kwargs.pop('session', None)) + + # properties + self._model: CategoryModel = CategoryModel(**kwargs) + self._nested_field_name = 'categories' + self._nested_filter = 'has_category' + self.type_ = 'Category' + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.CATEGORIES.value + + @property + def model(self) -> CategoryModel: + """Return the model data.""" + return self._model + + @model.setter + def model(self, data: dict | CategoryModel): + """Create model using the provided data.""" + if isinstance(data, type(self.model)): + # provided data is already a model, nothing required to change + self._model = data + elif isinstance(data, dict): + # provided data is raw response, load the model + self._model = type(self.model)(**data) + else: + raise RuntimeError(f'Invalid data type: {type(data)} provided.') + + @property + def as_entity(self) -> dict: + """Return the entity representation of the object.""" + type_ = self.type_ + + return {'type': type_, 'id': self.model.id, 'value': self.model.name} + + +class Categories(ObjectCollectionABC): + """Categories Collection. + + # Example of params input + { + 'result_limit': 100, # Limit the retrieved results. + 'result_start': 10, # Starting count used for pagination. + 'fields': ['caseId', 'summary'] # Select additional return fields. + } + + Args: + session (Session): Session object configured with TC API Auth. + tql_filters (list): List of TQL filters. + params (dict): Additional query params (see example above). + """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__( + kwargs.pop('session', None), kwargs.pop('tql_filter', None), kwargs.pop('params', None) + ) + self._model = CategoriesModel(**kwargs) + self.type_ = 'categories' + + def __iter__(self) -> Iterator[Category]: + """Return CM objects.""" + return self.iterate(base_class=Category) # type: ignore + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.CATEGORIES.value + + @property + def filter(self) -> CategoryFilter: + """Return the type specific filter object.""" + return CategoryFilter(self.tql) diff --git a/tcex/api/tc/v3/intel_requirements/categories/category_filter.py b/tcex/api/tc/v3/intel_requirements/categories/category_filter.py new file mode 100644 index 000000000..946ab1982 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/categories/category_filter.py @@ -0,0 +1,62 @@ +"""TcEx Framework Module""" +# standard library +from enum import Enum + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.filter_abc import FilterABC +from tcex.api.tc.v3.tql.tql_type import TqlType + + +class CategoryFilter(FilterABC): + """Filter Object for Categories""" + + @property + def _api_endpoint(self) -> str: + """Return the API endpoint.""" + return ApiEndpoints.CATEGORIES.value + + def description(self, operator: Enum, description: list | str): + """Filter Description based on **description** keyword. + + Args: + operator: The operator enum for the filter. + description: The description of the category. + """ + if isinstance(description, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('description', operator, description, TqlType.STRING) + + def id(self, operator: Enum, id: int | list): # pylint: disable=redefined-builtin + """Filter ID based on **id** keyword. + + Args: + operator: The operator enum for the filter. + id: The ID of the category. + """ + if isinstance(id, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('id', operator, id, TqlType.INTEGER) + + def name(self, operator: Enum, name: list | str): + """Filter Name based on **name** keyword. + + Args: + operator: The operator enum for the filter. + name: The name of the category. + """ + if isinstance(name, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('name', operator, name, TqlType.STRING) diff --git a/tcex/api/tc/v3/intel_requirements/categories/category_model.py b/tcex/api/tc/v3/intel_requirements/categories/category_model.py new file mode 100644 index 000000000..b9e4b7adb --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/categories/category_model.py @@ -0,0 +1,90 @@ +"""TcEx Framework Module""" +# pylint: disable=no-member,no-self-argument,wrong-import-position +# third-party +from pydantic import BaseModel, Extra, Field, PrivateAttr + +# first-party +from tcex.api.tc.v3.v3_model_abc import V3ModelABC +from tcex.util import Util + + +class CategoryModel( + V3ModelABC, + alias_generator=Util().snake_to_camel, + extra=Extra.allow, + title='Category Model', + validate_assignment=True, +): + """Category Model""" + + _associated_type = PrivateAttr(False) + _cm_type = PrivateAttr(False) + _shared_type = PrivateAttr(False) + _staged = PrivateAttr(False) + + description: str | None = Field( + None, + description='The description of the subtype/category.', + methods=['POST', 'PUT'], + read_only=False, + title='description', + ) + id: int | None = Field( + None, + description='The ID of the item.', + read_only=True, + title='id', + ) + name: str | None = Field( + None, + description='The details of the subtype/category.', + methods=['POST', 'PUT'], + read_only=False, + title='name', + ) + + +class CategoryDataModel( + BaseModel, + title='Category Data Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Categories Data Model""" + + data: list[CategoryModel] | None = Field( + [], + description='The data for the Categories.', + methods=['POST', 'PUT'], + title='data', + ) + + +class CategoriesModel( + BaseModel, + title='Categories Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Categories Model""" + + _mode_support = PrivateAttr(False) + + data: list[CategoryModel] | None = Field( + [], + description='The data for the Categories.', + methods=['POST', 'PUT'], + title='data', + ) + mode: str = Field( + 'append', + description='The PUT mode for nested objects (append, delete, replace). 
Default: append', + methods=['POST', 'PUT'], + title='append', + ) + + +# add forward references +CategoryDataModel.update_forward_refs() +CategoryModel.update_forward_refs() +CategoriesModel.update_forward_refs() diff --git a/tcex/api/tc/v3/intel_requirements/intel_req_type_model.py b/tcex/api/tc/v3/intel_requirements/intel_req_type_model.py new file mode 100644 index 000000000..1678f89af --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/intel_req_type_model.py @@ -0,0 +1,12 @@ +"""TcEx Framework Module""" +# pylint: disable=no-member,no-self-argument,wrong-import-position + +# third-party +from pydantic import BaseModel + + +class IntelReqTypeModel(BaseModel): + """Model Definition""" + + name: str | None + description: str | None diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement.py b/tcex/api/tc/v3/intel_requirements/intel_requirement.py new file mode 100644 index 000000000..19eac4eea --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement.py @@ -0,0 +1,251 @@ +"""TcEx Framework Module""" +# standard library +from collections.abc import Generator, Iterator +from typing import TYPE_CHECKING + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.artifacts.artifact_model import ArtifactModel +from tcex.api.tc.v3.cases.case_model import CaseModel +from tcex.api.tc.v3.groups.group_model import GroupModel +from tcex.api.tc.v3.indicators.indicator_model import IndicatorModel +from tcex.api.tc.v3.intel_requirements.intel_requirement_filter import IntelRequirementFilter +from tcex.api.tc.v3.intel_requirements.intel_requirement_model import ( + IntelRequirementModel, + IntelRequirementsModel, +) +from tcex.api.tc.v3.object_abc import ObjectABC +from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC +from tcex.api.tc.v3.tags.tag_model import TagModel +from tcex.api.tc.v3.victim_assets.victim_asset_model import VictimAssetModel + +if TYPE_CHECKING: # pragma: no cover + # first-party + from tcex.api.tc.v3.artifacts.artifact import Artifact # CIRCULAR-IMPORT + from tcex.api.tc.v3.cases.case import Case # CIRCULAR-IMPORT + from tcex.api.tc.v3.groups.group import Group # CIRCULAR-IMPORT + from tcex.api.tc.v3.indicators.indicator import Indicator # CIRCULAR-IMPORT + from tcex.api.tc.v3.tags.tag import Tag # CIRCULAR-IMPORT + from tcex.api.tc.v3.victim_assets.victim_asset import VictimAsset # CIRCULAR-IMPORT + + +class IntelRequirement(ObjectABC): + """IntelRequirements Object. + + Args: + associated_artifacts (Artifacts, kwargs): A list of Artifacts associated with this Group. + associated_cases (Cases, kwargs): A list of Cases associated with this Group. + associated_groups (Groups, kwargs): A list of groups associated with this group. + associated_indicators (Indicators, kwargs): A list of indicators associated with this group. + associated_victim_assets (VictimAssets, kwargs): A list of victim assets associated with + this group. + category (IntelReqType, kwargs): The category of the intel requirement. + description (str, kwargs): The description of the intel requirement. + keyword_sections (array, kwargs): The section of the intel requirement that contains the + keywords. + requirement_text (str, kwargs): The detailed text of the intel requirement. + reset_results (bool, kwargs): Flag to reset results when updating keywords. + subtype (IntelReqType, kwargs): The subtype of the intel requirement. 
+ tags (Tags, kwargs): A list of Tags corresponding to the item (NOTE: Setting this parameter + will replace any existing tag(s) with the one(s) specified). + unique_id (str, kwargs): The unique id of the intel requirement. + xid (str, kwargs): The xid of the item. + """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__(kwargs.pop('session', None)) + + # properties + self._model: IntelRequirementModel = IntelRequirementModel(**kwargs) + self._nested_field_name = 'intelRequirements' + self._nested_filter = 'has_intel_requirement' + self.type_ = 'Intel Requirement' + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.INTEL_REQUIREMENTS.value + + @property + def model(self) -> IntelRequirementModel: + """Return the model data.""" + return self._model + + @model.setter + def model(self, data: dict | IntelRequirementModel): + """Create model using the provided data.""" + if isinstance(data, type(self.model)): + # provided data is already a model, nothing required to change + self._model = data + elif isinstance(data, dict): + # provided data is raw response, load the model + self._model = type(self.model)(**data) + else: + raise RuntimeError(f'Invalid data type: {type(data)} provided.') + + @property + def as_entity(self) -> dict: + """Return the entity representation of the object.""" + type_ = self.type_ + + return {'type': type_, 'id': self.model.id, 'value': self.model.requirement_text} + + @property + def associated_artifacts(self) -> Generator['Artifact', None, None]: + """Yield Artifact from Artifacts.""" + # first-party + from tcex.api.tc.v3.artifacts.artifact import Artifacts + + yield from self._iterate_over_sublist(Artifacts) # type: ignore + + @property + def associated_cases(self) -> Generator['Case', None, None]: + """Yield Case from Cases.""" + # first-party + from tcex.api.tc.v3.cases.case import Cases + + yield from self._iterate_over_sublist(Cases) # type: ignore + + @property + def associated_groups(self) -> Generator['Group', None, None]: + """Yield Group from Groups.""" + # first-party + from tcex.api.tc.v3.groups.group import Groups + + yield from self._iterate_over_sublist(Groups) # type: ignore + + @property + def associated_indicators(self) -> Generator['Indicator', None, None]: + """Yield Indicator from Indicators.""" + # first-party + from tcex.api.tc.v3.indicators.indicator import Indicators + + yield from self._iterate_over_sublist(Indicators) # type: ignore + + @property + def associated_victim_assets(self) -> Generator['VictimAsset', None, None]: + """Yield VictimAsset from VictimAssets.""" + # first-party + from tcex.api.tc.v3.victim_assets.victim_asset import VictimAssets + + yield from self._iterate_over_sublist(VictimAssets) # type: ignore + + @property + def tags(self) -> Generator['Tag', None, None]: + """Yield Tag from Tags.""" + # first-party + from tcex.api.tc.v3.tags.tag import Tags + + yield from self._iterate_over_sublist(Tags) # type: ignore + + def stage_associated_case(self, data: dict | ObjectABC | CaseModel): + """Stage case on the object.""" + if isinstance(data, ObjectABC): + data = data.model # type: ignore + elif isinstance(data, dict): + data = CaseModel(**data) + + if not isinstance(data, CaseModel): + raise RuntimeError('Invalid type passed in to stage_associated_case') + data._staged = True + self.model.associated_cases.data.append(data) # type: ignore + + def stage_associated_artifact(self, data: dict | ObjectABC | ArtifactModel): + 
"""Stage artifact on the object.""" + if isinstance(data, ObjectABC): + data = data.model # type: ignore + elif isinstance(data, dict): + data = ArtifactModel(**data) + + if not isinstance(data, ArtifactModel): + raise RuntimeError('Invalid type passed in to stage_associated_artifact') + data._staged = True + self.model.associated_artifacts.data.append(data) # type: ignore + + def stage_associated_group(self, data: dict | ObjectABC | GroupModel): + """Stage group on the object.""" + if isinstance(data, ObjectABC): + data = data.model # type: ignore + elif isinstance(data, dict): + data = GroupModel(**data) + + if not isinstance(data, GroupModel): + raise RuntimeError('Invalid type passed in to stage_associated_group') + data._staged = True + self.model.associated_groups.data.append(data) # type: ignore + + def stage_associated_victim_asset(self, data: dict | ObjectABC | VictimAssetModel): + """Stage victim_asset on the object.""" + if isinstance(data, ObjectABC): + data = data.model # type: ignore + elif isinstance(data, dict): + data = VictimAssetModel(**data) + + if not isinstance(data, VictimAssetModel): + raise RuntimeError('Invalid type passed in to stage_associated_victim_asset') + data._staged = True + self.model.associated_victim_assets.data.append(data) # type: ignore + + def stage_associated_indicator(self, data: dict | ObjectABC | IndicatorModel): + """Stage indicator on the object.""" + if isinstance(data, ObjectABC): + data = data.model # type: ignore + elif isinstance(data, dict): + data = IndicatorModel(**data) + + if not isinstance(data, IndicatorModel): + raise RuntimeError('Invalid type passed in to stage_associated_indicator') + data._staged = True + self.model.associated_indicators.data.append(data) # type: ignore + + def stage_tag(self, data: dict | ObjectABC | TagModel): + """Stage tag on the object.""" + if isinstance(data, ObjectABC): + data = data.model # type: ignore + elif isinstance(data, dict): + data = TagModel(**data) + + if not isinstance(data, TagModel): + raise RuntimeError('Invalid type passed in to stage_tag') + data._staged = True + self.model.tags.data.append(data) # type: ignore + + +class IntelRequirements(ObjectCollectionABC): + """IntelRequirements Collection. + + # Example of params input + { + 'result_limit': 100, # Limit the retrieved results. + 'result_start': 10, # Starting count used for pagination. + 'fields': ['caseId', 'summary'] # Select additional return fields. + } + + Args: + session (Session): Session object configured with TC API Auth. + tql_filters (list): List of TQL filters. + params (dict): Additional query params (see example above). 
+ """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__( + kwargs.pop('session', None), kwargs.pop('tql_filter', None), kwargs.pop('params', None) + ) + self._model = IntelRequirementsModel(**kwargs) + self.type_ = 'intel_requirements' + + def __iter__(self) -> Iterator[IntelRequirement]: + """Return CM objects.""" + return self.iterate(base_class=IntelRequirement) # type: ignore + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.INTEL_REQUIREMENTS.value + + @property + def filter(self) -> IntelRequirementFilter: + """Return the type specific filter object.""" + return IntelRequirementFilter(self.tql) diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py b/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py new file mode 100644 index 000000000..8551c8015 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py @@ -0,0 +1,857 @@ +"""TcEx Framework Module""" +# standard library +from datetime import datetime +from enum import Enum + +# third-party +from arrow import Arrow + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.filter_abc import FilterABC +from tcex.api.tc.v3.tql.tql import Tql +from tcex.api.tc.v3.tql.tql_operator import TqlOperator +from tcex.api.tc.v3.tql.tql_type import TqlType + + +class IntelRequirementFilter(FilterABC): + """Filter Object for IntelRequirements""" + + @property + def _api_endpoint(self) -> str: + """Return the API endpoint.""" + return ApiEndpoints.INTEL_REQUIREMENTS.value + + def associated_indicator(self, operator: Enum, associated_indicator: int | list): + """Filter associatedIndicator based on **associatedIndicator** keyword. + + Args: + operator: The operator enum for the filter. + associated_indicator: No description provided. + """ + if isinstance(associated_indicator, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('associatedIndicator', operator, associated_indicator, TqlType.INTEGER) + + def attribute(self, operator: Enum, attribute: list | str): + """Filter attribute based on **attribute** keyword. + + Args: + operator: The operator enum for the filter. + attribute: No description provided. + """ + if isinstance(attribute, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('attribute', operator, attribute, TqlType.STRING) + + def child_group(self, operator: Enum, child_group: int | list): + """Filter childGroup based on **childGroup** keyword. + + Args: + operator: The operator enum for the filter. + child_group: No description provided. + """ + if isinstance(child_group, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('childGroup', operator, child_group, TqlType.INTEGER) + + def created_by(self, operator: Enum, created_by: list | str): + """Filter Created By based on **createdBy** keyword. + + Args: + operator: The operator enum for the filter. + created_by: The user who created the group. 
+ """ + if isinstance(created_by, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('createdBy', operator, created_by, TqlType.STRING) + + def date_added(self, operator: Enum, date_added: Arrow | datetime | int | str): + """Filter Date Added based on **dateAdded** keyword. + + Args: + operator: The operator enum for the filter. + date_added: The date the group was added to the system. + """ + date_added = self.util.any_to_datetime(date_added).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('dateAdded', operator, date_added, TqlType.STRING) + + def document_date_added( + self, operator: Enum, document_date_added: Arrow | datetime | int | str + ): + """Filter Date Added (Document) based on **documentDateAdded** keyword. + + Args: + operator: The operator enum for the filter. + document_date_added: The date the document was added. + """ + document_date_added = self.util.any_to_datetime(document_date_added).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('documentDateAdded', operator, document_date_added, TqlType.STRING) + + def document_filename(self, operator: Enum, document_filename: list | str): + """Filter Filename (Document) based on **documentFilename** keyword. + + Args: + operator: The operator enum for the filter. + document_filename: The file name of the document. + """ + if isinstance(document_filename, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('documentFilename', operator, document_filename, TqlType.STRING) + + def document_filesize(self, operator: Enum, document_filesize: int | list): + """Filter File Size (Document) based on **documentFilesize** keyword. + + Args: + operator: The operator enum for the filter. + document_filesize: The filesize of the document. + """ + if isinstance(document_filesize, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('documentFilesize', operator, document_filesize, TqlType.INTEGER) + + def document_status(self, operator: Enum, document_status: list | str): + """Filter Status (Document) based on **documentStatus** keyword. + + Args: + operator: The operator enum for the filter. + document_status: The status of the document. + """ + if isinstance(document_status, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('documentStatus', operator, document_status, TqlType.STRING) + + def document_type(self, operator: Enum, document_type: list | str): + """Filter Type (Document) based on **documentType** keyword. + + Args: + operator: The operator enum for the filter. + document_type: The type of document. + """ + if isinstance(document_type, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('documentType', operator, document_type, TqlType.STRING) + + def downvote_count(self, operator: Enum, downvote_count: int | list): + """Filter Downvote Count based on **downvoteCount** keyword. + + Args: + operator: The operator enum for the filter. 
+ downvote_count: The number of downvotes the group has received. + """ + if isinstance(downvote_count, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('downvoteCount', operator, downvote_count, TqlType.INTEGER) + + def email_date(self, operator: Enum, email_date: Arrow | datetime | int | str): + """Filter Date (Email) based on **emailDate** keyword. + + Args: + operator: The operator enum for the filter. + email_date: The date of the email. + """ + email_date = self.util.any_to_datetime(email_date).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('emailDate', operator, email_date, TqlType.STRING) + + def email_from(self, operator: Enum, email_from: list | str): + """Filter From (Email) based on **emailFrom** keyword. + + Args: + operator: The operator enum for the filter. + email_from: The 'from' field of the email. + """ + if isinstance(email_from, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('emailFrom', operator, email_from, TqlType.STRING) + + def email_score(self, operator: Enum, email_score: int | list): + """Filter Score (Email) based on **emailScore** keyword. + + Args: + operator: The operator enum for the filter. + email_score: The score of the email. + """ + if isinstance(email_score, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('emailScore', operator, email_score, TqlType.INTEGER) + + def email_score_includes_body(self, operator: Enum, email_score_includes_body: bool): + """Filter Score Includes Body (Email) based on **emailScoreIncludesBody** keyword. + + Args: + operator: The operator enum for the filter. + email_score_includes_body: A true/false indicating if the body was included in the + scoring of the email. + """ + self._tql.add_filter( + 'emailScoreIncludesBody', operator, email_score_includes_body, TqlType.BOOLEAN + ) + + def email_subject(self, operator: Enum, email_subject: list | str): + """Filter Subject (Email) based on **emailSubject** keyword. + + Args: + operator: The operator enum for the filter. + email_subject: The subject of the email. + """ + if isinstance(email_subject, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('emailSubject', operator, email_subject, TqlType.STRING) + + def event_date(self, operator: Enum, event_date: Arrow | datetime | int | str): + """Filter Event Date based on **eventDate** keyword. + + Args: + operator: The operator enum for the filter. + event_date: The event date of the group. + """ + event_date = self.util.any_to_datetime(event_date).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('eventDate', operator, event_date, TqlType.STRING) + + def external_date_added( + self, operator: Enum, external_date_added: Arrow | datetime | int | str + ): + """Filter External Date Added based on **externalDateAdded** keyword. + + Args: + operator: The operator enum for the filter. + external_date_added: The date and time that the group was first created externally. 
+ """ + external_date_added = self.util.any_to_datetime(external_date_added).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('externalDateAdded', operator, external_date_added, TqlType.STRING) + + def external_date_expires( + self, operator: Enum, external_date_expires: Arrow | datetime | int | str + ): + """Filter External Date Expires based on **externalDateExpires** keyword. + + Args: + operator: The operator enum for the filter. + external_date_expires: The date and time the group expires externally. + """ + external_date_expires = self.util.any_to_datetime(external_date_expires).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('externalDateExpires', operator, external_date_expires, TqlType.STRING) + + def external_last_modified( + self, operator: Enum, external_last_modified: Arrow | datetime | int | str + ): + """Filter External Last Modified based on **externalLastModified** keyword. + + Args: + operator: The operator enum for the filter. + external_last_modified: The date and time the group was modified externally. + """ + external_last_modified = self.util.any_to_datetime(external_last_modified).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter( + 'externalLastModified', operator, external_last_modified, TqlType.STRING + ) + + def first_seen(self, operator: Enum, first_seen: Arrow | datetime | int | str): + """Filter First Seen based on **firstSeen** keyword. + + Args: + operator: The operator enum for the filter. + first_seen: The date and time that the group was first seen. + """ + first_seen = self.util.any_to_datetime(first_seen).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('firstSeen', operator, first_seen, TqlType.STRING) + + def generated_report(self, operator: Enum, generated_report: bool): + """Filter Generated (Report) based on **generatedReport** keyword. + + Args: + operator: The operator enum for the filter. + generated_report: Boolean flag indicating if the Report was auto-generated. 
+ """ + self._tql.add_filter('generatedReport', operator, generated_report, TqlType.BOOLEAN) + + @property + def has_artifact(self): + """Return **ArtifactFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.artifacts.artifact_filter import ArtifactFilter + + artifacts = ArtifactFilter(Tql()) + self._tql.add_filter('hasArtifact', TqlOperator.EQ, artifacts, TqlType.SUB_QUERY) + return artifacts + + @property + def has_attribute(self): + """Return **GroupAttributeFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.group_attributes.group_attribute_filter import GroupAttributeFilter + + attributes = GroupAttributeFilter(Tql()) + self._tql.add_filter('hasAttribute', TqlOperator.EQ, attributes, TqlType.SUB_QUERY) + return attributes + + @property + def has_case(self): + """Return **CaseFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.cases.case_filter import CaseFilter + + cases = CaseFilter(Tql()) + self._tql.add_filter('hasCase', TqlOperator.EQ, cases, TqlType.SUB_QUERY) + return cases + + @property + def has_group(self): + """Return **GroupFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.groups.group_filter import GroupFilter + + groups = GroupFilter(Tql()) + self._tql.add_filter('hasGroup', TqlOperator.EQ, groups, TqlType.SUB_QUERY) + return groups + + @property + def has_indicator(self): + """Return **IndicatorFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.indicators.indicator_filter import IndicatorFilter + + indicators = IndicatorFilter(Tql()) + self._tql.add_filter('hasIndicator', TqlOperator.EQ, indicators, TqlType.SUB_QUERY) + return indicators + + def has_intel_query(self, operator: Enum, has_intel_query: int | list): + """Filter Associated User Queries based on **hasIntelQuery** keyword. + + Args: + operator: The operator enum for the filter. + has_intel_query: A nested query for association to User Queries. + """ + if isinstance(has_intel_query, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('hasIntelQuery', operator, has_intel_query, TqlType.INTEGER) + + def has_intel_requirement(self, operator: Enum, has_intel_requirement: int | list): + """Filter Associated Intel Requirement based on **hasIntelRequirement** keyword. + + Args: + operator: The operator enum for the filter. + has_intel_requirement: A nested query for association to intel requirements. + """ + if isinstance(has_intel_requirement, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter( + 'hasIntelRequirement', operator, has_intel_requirement, TqlType.INTEGER + ) + + @property + def has_security_label(self): + """Return **SecurityLabel** for further filtering.""" + # first-party + from tcex.api.tc.v3.security_labels.security_label_filter import SecurityLabelFilter + + security_labels = SecurityLabelFilter(Tql()) + self._tql.add_filter('hasSecurityLabel', TqlOperator.EQ, security_labels, TqlType.SUB_QUERY) + return security_labels + + @property + def has_tag(self): + """Return **TagFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.tags.tag_filter import TagFilter + + tags = TagFilter(Tql()) + self._tql.add_filter('hasTag', TqlOperator.EQ, tags, TqlType.SUB_QUERY) + return tags + + @property + def has_victim(self): + """Return **VictimFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.victims.victim_filter import VictimFilter + + victims = VictimFilter(Tql()) + self._tql.add_filter('hasVictim', TqlOperator.EQ, victims, TqlType.SUB_QUERY) + return victims + + @property + def has_victim_asset(self): + """Return **VictimAssetFilter** for further filtering.""" + # first-party + from tcex.api.tc.v3.victim_assets.victim_asset_filter import VictimAssetFilter + + victim_assets = VictimAssetFilter(Tql()) + self._tql.add_filter('hasVictimAsset', TqlOperator.EQ, victim_assets, TqlType.SUB_QUERY) + return victim_assets + + def id(self, operator: Enum, id: int | list): # pylint: disable=redefined-builtin + """Filter ID based on **id** keyword. + + Args: + operator: The operator enum for the filter. + id: The ID of the group. + """ + if isinstance(id, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('id', operator, id, TqlType.INTEGER) + + def is_group(self, operator: Enum, is_group: bool): + """Filter isGroup based on **isGroup** keyword. + + Args: + operator: The operator enum for the filter. + is_group: No description provided. + """ + self._tql.add_filter('isGroup', operator, is_group, TqlType.BOOLEAN) + + def last_modified(self, operator: Enum, last_modified: Arrow | datetime | int | str): + """Filter Last Modified based on **lastModified** keyword. + + Args: + operator: The operator enum for the filter. + last_modified: The date the group was last modified. + """ + last_modified = self.util.any_to_datetime(last_modified).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('lastModified', operator, last_modified, TqlType.STRING) + + def last_seen(self, operator: Enum, last_seen: Arrow | datetime | int | str): + """Filter Last Seen based on **lastSeen** keyword. + + Args: + operator: The operator enum for the filter. + last_seen: The date and time that the group was last seen. + """ + last_seen = self.util.any_to_datetime(last_seen).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('lastSeen', operator, last_seen, TqlType.STRING) + + def owner(self, operator: Enum, owner: int | list): + """Filter Owner ID based on **owner** keyword. + + Args: + operator: The operator enum for the filter. + owner: The Owner ID for the group. + """ + if isinstance(owner, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('owner', operator, owner, TqlType.INTEGER) + + def owner_name(self, operator: Enum, owner_name: list | str): + """Filter Owner Name based on **ownerName** keyword. + + Args: + operator: The operator enum for the filter. + owner_name: The owner name for the group. + """ + if isinstance(owner_name, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) + + def parent_group(self, operator: Enum, parent_group: int | list): + """Filter parentGroup based on **parentGroup** keyword. + + Args: + operator: The operator enum for the filter. + parent_group: No description provided. + """ + if isinstance(parent_group, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('parentGroup', operator, parent_group, TqlType.INTEGER) + + def security_label(self, operator: Enum, security_label: list | str): + """Filter Security Label based on **securityLabel** keyword. + + Args: + operator: The operator enum for the filter. + security_label: The name of a security label applied to the group. + """ + if isinstance(security_label, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('securityLabel', operator, security_label, TqlType.STRING) + + def signature_date_added( + self, operator: Enum, signature_date_added: Arrow | datetime | int | str + ): + """Filter Date Added (Signature) based on **signatureDateAdded** keyword. + + Args: + operator: The operator enum for the filter. + signature_date_added: The date the signature was added. + """ + signature_date_added = self.util.any_to_datetime(signature_date_added).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('signatureDateAdded', operator, signature_date_added, TqlType.STRING) + + def signature_filename(self, operator: Enum, signature_filename: list | str): + """Filter Filename (Signature) based on **signatureFilename** keyword. + + Args: + operator: The operator enum for the filter. + signature_filename: The file name of the signature. + """ + if isinstance(signature_filename, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('signatureFilename', operator, signature_filename, TqlType.STRING) + + def signature_type(self, operator: Enum, signature_type: list | str): + """Filter Type (Signature) based on **signatureType** keyword. + + Args: + operator: The operator enum for the filter. + signature_type: The type of signature. + """ + if isinstance(signature_type, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('signatureType', operator, signature_type, TqlType.STRING) + + def status(self, operator: Enum, status: list | str): + """Filter Status based on **status** keyword. + + Args: + operator: The operator enum for the filter. + status: Status of the group. 
+ """ + if isinstance(status, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('status', operator, status, TqlType.STRING) + + def summary(self, operator: Enum, summary: list | str): + """Filter Summary based on **summary** keyword. + + Args: + operator: The operator enum for the filter. + summary: The summary (name) of the group. + """ + if isinstance(summary, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('summary', operator, summary, TqlType.STRING) + + def tag(self, operator: Enum, tag: list | str): + """Filter Tag based on **tag** keyword. + + Args: + operator: The operator enum for the filter. + tag: The name of a tag applied to the group. + """ + if isinstance(tag, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('tag', operator, tag, TqlType.STRING) + + def tag_owner(self, operator: Enum, tag_owner: int | list): + """Filter Tag Owner ID based on **tagOwner** keyword. + + Args: + operator: The operator enum for the filter. + tag_owner: The ID of the owner of a tag. + """ + if isinstance(tag_owner, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('tagOwner', operator, tag_owner, TqlType.INTEGER) + + def tag_owner_name(self, operator: Enum, tag_owner_name: list | str): + """Filter Tag Owner Name based on **tagOwnerName** keyword. + + Args: + operator: The operator enum for the filter. + tag_owner_name: The name of the owner of a tag. + """ + if isinstance(tag_owner_name, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('tagOwnerName', operator, tag_owner_name, TqlType.STRING) + + def task_assignee(self, operator: Enum, task_assignee: list | str): + """Filter Assignee (Task) based on **taskAssignee** keyword. + + Args: + operator: The operator enum for the filter. + task_assignee: The assignee of the task. + """ + if isinstance(task_assignee, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('taskAssignee', operator, task_assignee, TqlType.STRING) + + def task_assignee_pseudo(self, operator: Enum, task_assignee_pseudo: list | str): + """Filter Assignee Pseudonym (Task) based on **taskAssigneePseudo** keyword. + + Args: + operator: The operator enum for the filter. + task_assignee_pseudo: The pseudonym of the assignee of the task. + """ + if isinstance(task_assignee_pseudo, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('taskAssigneePseudo', operator, task_assignee_pseudo, TqlType.STRING) + + def task_date_added(self, operator: Enum, task_date_added: Arrow | datetime | int | str): + """Filter Date Added (Task) based on **taskDateAdded** keyword. + + Args: + operator: The operator enum for the filter. 
+ task_date_added: The date the task was added. + """ + task_date_added = self.util.any_to_datetime(task_date_added).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('taskDateAdded', operator, task_date_added, TqlType.STRING) + + def task_due_date(self, operator: Enum, task_due_date: Arrow | datetime | int | str): + """Filter Due Date (Task) based on **taskDueDate** keyword. + + Args: + operator: The operator enum for the filter. + task_due_date: The due date of a task. + """ + task_due_date = self.util.any_to_datetime(task_due_date).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('taskDueDate', operator, task_due_date, TqlType.STRING) + + def task_escalated(self, operator: Enum, task_escalated: bool): + """Filter Escalated (Task) based on **taskEscalated** keyword. + + Args: + operator: The operator enum for the filter. + task_escalated: A flag indicating if a task has been escalated. + """ + self._tql.add_filter('taskEscalated', operator, task_escalated, TqlType.BOOLEAN) + + def task_escalation_date( + self, operator: Enum, task_escalation_date: Arrow | datetime | int | str + ): + """Filter Escalation Date (Task) based on **taskEscalationDate** keyword. + + Args: + operator: The operator enum for the filter. + task_escalation_date: The escalation date of a task. + """ + task_escalation_date = self.util.any_to_datetime(task_escalation_date).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('taskEscalationDate', operator, task_escalation_date, TqlType.STRING) + + def task_last_modified(self, operator: Enum, task_last_modified: Arrow | datetime | int | str): + """Filter Last Modified based on **taskLastModified** keyword. + + Args: + operator: The operator enum for the filter. + task_last_modified: The date the group was last modified. + """ + task_last_modified = self.util.any_to_datetime(task_last_modified).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('taskLastModified', operator, task_last_modified, TqlType.STRING) + + def task_overdue(self, operator: Enum, task_overdue: bool): + """Filter Overdue (Task) based on **taskOverdue** keyword. + + Args: + operator: The operator enum for the filter. + task_overdue: A flag indicating if a task has become overdue. + """ + self._tql.add_filter('taskOverdue', operator, task_overdue, TqlType.BOOLEAN) + + def task_reminded(self, operator: Enum, task_reminded: bool): + """Filter Reminded (Task) based on **taskReminded** keyword. + + Args: + operator: The operator enum for the filter. + task_reminded: A flag indicating if a task has been reminded. + """ + self._tql.add_filter('taskReminded', operator, task_reminded, TqlType.BOOLEAN) + + def task_reminder_date(self, operator: Enum, task_reminder_date: Arrow | datetime | int | str): + """Filter Reminder Date (Task) based on **taskReminderDate** keyword. + + Args: + operator: The operator enum for the filter. + task_reminder_date: The reminder date of a task. + """ + task_reminder_date = self.util.any_to_datetime(task_reminder_date).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('taskReminderDate', operator, task_reminder_date, TqlType.STRING) + + def task_status(self, operator: Enum, task_status: list | str): + """Filter Status (Task) based on **taskStatus** keyword. + + Args: + operator: The operator enum for the filter. + task_status: The status of the task. + """ + if isinstance(task_status, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('taskStatus', operator, task_status, TqlType.STRING) + + def type(self, operator: Enum, type: int | list): # pylint: disable=redefined-builtin + """Filter Type based on **type** keyword. + + Args: + operator: The operator enum for the filter. + type: The ID of the group type. + """ + if isinstance(type, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('type', operator, type, TqlType.INTEGER) + + def type_name(self, operator: Enum, type_name: list | str): + """Filter Type Name based on **typeName** keyword. + + Args: + operator: The operator enum for the filter. + type_name: The name of the group type. + """ + if isinstance(type_name, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('typeName', operator, type_name, TqlType.STRING) + + def upvote_count(self, operator: Enum, upvote_count: int | list): + """Filter Upvote Count based on **upvoteCount** keyword. + + Args: + operator: The operator enum for the filter. + upvote_count: The number of upvotes the group has received. + """ + if isinstance(upvote_count, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('upvoteCount', operator, upvote_count, TqlType.INTEGER) + + def victim_asset(self, operator: Enum, victim_asset: list | str): + """Filter victimAsset based on **victimAsset** keyword. + + Args: + operator: The operator enum for the filter. + victim_asset: No description provided. + """ + if isinstance(victim_asset, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('victimAsset', operator, victim_asset, TqlType.STRING) diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py b/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py new file mode 100644 index 000000000..5087b8b2c --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py @@ -0,0 +1,287 @@ +"""TcEx Framework Module""" +# pylint: disable=no-member,no-self-argument,wrong-import-position +# standard library +from datetime import datetime + +# third-party +from pydantic import BaseModel, Extra, Field, PrivateAttr, validator + +# first-party +from tcex.api.tc.v3.v3_model_abc import V3ModelABC +from tcex.util import Util + + +class IntelRequirementModel( + V3ModelABC, + alias_generator=Util().snake_to_camel, + extra=Extra.allow, + title='IntelRequirement Model', + validate_assignment=True, +): + """Intel_Requirement Model""" + + _associated_type = PrivateAttr(False) + _cm_type = PrivateAttr(False) + _shared_type = PrivateAttr(False) + _staged = PrivateAttr(False) + + associated_artifacts: 'ArtifactsModel' = Field( + None, + description='A list of Artifacts associated with this Group.', + methods=['POST', 'PUT'], + read_only=False, + title='associatedArtifacts', + ) + associated_cases: 'CasesModel' = Field( + None, + description='A list of Cases associated with this Group.', + methods=['POST', 'PUT'], + read_only=False, + title='associatedCases', + ) + associated_groups: 'GroupsModel' = Field( + None, + description='A list of groups associated with this group.', + methods=['POST', 'PUT'], + read_only=False, + title='associatedGroups', + ) + associated_indicators: 'IndicatorsModel' = Field( + None, + description='A list of indicators associated with this group.', + methods=['POST', 'PUT'], + read_only=False, + title='associatedIndicators', + ) + associated_victim_assets: 'VictimAssetsModel' = Field( + None, + description='A list of victim assets associated with this group.', + methods=['POST', 'PUT'], + read_only=False, + title='associatedVictimAssets', + ) + category: 'IntelReqTypeModel' = Field( + None, + description='The category of the intel requirement.', + methods=['POST', 'PUT'], + read_only=False, + title='category', + ) + created_by: 'UserModel' = Field( + None, + allow_mutation=False, + description='The user who created the intel requirement.', + read_only=True, + title='createdBy', + ) + date_added: datetime | None = Field( + None, + allow_mutation=False, + description='The date and time that the item was first created.', + read_only=True, + title='dateAdded', + ) + description: str | None = Field( + None, + description='The description of the intel requirement.', + methods=['POST', 'PUT'], + read_only=False, + title='description', + ) + id: int | None = Field( + None, + description='The ID of the item.', + read_only=True, + title='id', + ) + keyword_sections: list['KeywordSectionModel'] = Field( + None, + description='The section of the intel requirement that contains the keywords.', + methods=['POST', 'PUT'], + read_only=False, + title='keywordSections', + ) + last_modified: datetime | None = Field( + None, + allow_mutation=False, + description='The date and time that the Entity was last modified.', + read_only=True, + title='lastModified', + ) + last_retrieved_date: datetime | None = Field( + None, + allow_mutation=False, + description='The last date the results were retrieved for the intel requirement.', + read_only=True, + title='lastRetrievedDate', + ) + requirement_text: str | None = Field( + None, + 
description='The detailed text of the intel requirement.', + methods=['POST', 'PUT'], + read_only=False, + title='requirementText', + ) + reset_results: bool = Field( + None, + description='Flag to reset results when updating keywords.', + methods=['POST', 'PUT'], + read_only=False, + title='resetResults', + ) + results_link: str | None = Field( + None, + allow_mutation=False, + description='A link to the results for the intel requirement.', + read_only=True, + title='resultsLink', + ) + subtype: 'IntelReqTypeModel' = Field( + None, + description='The subtype of the intel requirement.', + methods=['POST', 'PUT'], + read_only=False, + title='subtype', + ) + tags: 'TagsModel' = Field( + None, + description=( + 'A list of Tags corresponding to the item (NOTE: Setting this parameter will replace ' + 'any existing tag(s) with the one(s) specified).' + ), + methods=['POST', 'PUT'], + read_only=False, + title='tags', + ) + unique_id: str | None = Field( + None, + description='The unique id of the intel requirement.', + methods=['POST', 'PUT'], + read_only=False, + title='uniqueId', + ) + web_link: str | None = Field( + None, + allow_mutation=False, + description='A link to the ThreatConnect details page for this entity.', + read_only=True, + title='webLink', + ) + xid: str | None = Field( + None, + description='The xid of the item.', + methods=['POST', 'PUT'], + read_only=False, + title='xid', + ) + + @validator('associated_artifacts', always=True, pre=True) + def _validate_artifacts(cls, v): + if not v: + return ArtifactsModel() # type: ignore + return v + + @validator('associated_cases', always=True, pre=True) + def _validate_cases(cls, v): + if not v: + return CasesModel() # type: ignore + return v + + @validator('associated_groups', always=True, pre=True) + def _validate_groups(cls, v): + if not v: + return GroupsModel() # type: ignore + return v + + @validator('associated_indicators', always=True, pre=True) + def _validate_indicators(cls, v): + if not v: + return IndicatorsModel() # type: ignore + return v + + @validator('category', 'subtype', always=True, pre=True) + def _validate_intel_req_type(cls, v): + if not v: + return IntelReqTypeModel() # type: ignore + return v + + @validator('keyword_sections', always=True, pre=True) + def _validate_keyword_section(cls, v): + if not v: + return list['KeywordSectionModel']() # type: ignore + return v + + @validator('tags', always=True, pre=True) + def _validate_tags(cls, v): + if not v: + return TagsModel() # type: ignore + return v + + @validator('created_by', always=True, pre=True) + def _validate_user(cls, v): + if not v: + return UserModel() # type: ignore + return v + + @validator('associated_victim_assets', always=True, pre=True) + def _validate_victim_assets(cls, v): + if not v: + return VictimAssetsModel() # type: ignore + return v + + +class IntelRequirementDataModel( + BaseModel, + title='IntelRequirement Data Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Intel_Requirements Data Model""" + + data: list[IntelRequirementModel] | None = Field( + [], + description='The data for the IntelRequirements.', + methods=['POST', 'PUT'], + title='data', + ) + + +class IntelRequirementsModel( + BaseModel, + title='IntelRequirements Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Intel_Requirements Model""" + + _mode_support = PrivateAttr(False) + + data: list[IntelRequirementModel] | None = Field( + [], + description='The data for the IntelRequirements.', + methods=['POST', 
'PUT'], + title='data', + ) + mode: str = Field( + 'append', + description='The PUT mode for nested objects (append, delete, replace). Default: append', + methods=['POST', 'PUT'], + title='append', + ) + + +# first-party +from tcex.api.tc.v3.artifacts.artifact_model import ArtifactsModel +from tcex.api.tc.v3.cases.case_model import CasesModel +from tcex.api.tc.v3.groups.group_model import GroupsModel +from tcex.api.tc.v3.indicators.indicator_model import IndicatorsModel +from tcex.api.tc.v3.intel_requirements.intel_req_type_model import IntelReqTypeModel +from tcex.api.tc.v3.intel_requirements.keyword_section_model import KeywordSectionModel +from tcex.api.tc.v3.security.users.user_model import UserModel +from tcex.api.tc.v3.tags.tag_model import TagsModel +from tcex.api.tc.v3.victim_assets.victim_asset_model import VictimAssetsModel + +# add forward references +IntelRequirementDataModel.update_forward_refs() +IntelRequirementModel.update_forward_refs() +IntelRequirementsModel.update_forward_refs() diff --git a/tcex/api/tc/v3/intel_requirements/keyword_section_model.py b/tcex/api/tc/v3/intel_requirements/keyword_section_model.py new file mode 100644 index 000000000..90905042c --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/keyword_section_model.py @@ -0,0 +1,37 @@ +"""TcEx Framework Module""" +# pylint: disable=no-member,no-self-argument,wrong-import-position + +# third-party +from pydantic import BaseModel + +# first-party +from tcex.api.tc.v3.v3_model_abc import V3ModelABC +from tcex.util import Util + + +class KeywordModel(BaseModel): + """Model Definition""" + + value: str | None + + +class KeywordSectionModel( + V3ModelABC, + title='Keyword Section Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Model Definition + + { + "compareValue": "includes", + "keywords": [ + { + "value": "test" + } + ] + } + """ + + compareValue: str | None + keywords: list[KeywordModel] | None diff --git a/tcex/api/tc/v3/intel_requirements/results/result.py b/tcex/api/tc/v3/intel_requirements/results/result.py new file mode 100644 index 000000000..1a95c5114 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/results/result.py @@ -0,0 +1,98 @@ +"""TcEx Framework Module""" +# standard library +from collections.abc import Iterator + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.intel_requirements.results.result_filter import ResultFilter +from tcex.api.tc.v3.intel_requirements.results.result_model import ResultModel, ResultsModel +from tcex.api.tc.v3.object_abc import ObjectABC +from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC + + +class Result(ObjectABC): + """Results Object. + + Args: + archived (bool, kwargs): Has the result been archived? + associated (bool, kwargs): Has the result been associated to an entity within Threatconnect? + false_positive (bool, kwargs): Is the result declared false positive? 
+ """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__(kwargs.pop('session', None)) + + # properties + self._model: ResultModel = ResultModel(**kwargs) + self._nested_field_name = 'results' + self._nested_filter = 'has_result' + self.type_ = 'Result' + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.RESULTS.value + + @property + def model(self) -> ResultModel: + """Return the model data.""" + return self._model + + @model.setter + def model(self, data: dict | ResultModel): + """Create model using the provided data.""" + if isinstance(data, type(self.model)): + # provided data is already a model, nothing required to change + self._model = data + elif isinstance(data, dict): + # provided data is raw response, load the model + self._model = type(self.model)(**data) + else: + raise RuntimeError(f'Invalid data type: {type(data)} provided.') + + @property + def as_entity(self) -> dict: + """Return the entity representation of the object.""" + type_ = self.type_ + + return {'type': type_, 'id': self.model.id, 'value': self.model.name} + + +class Results(ObjectCollectionABC): + """Results Collection. + + # Example of params input + { + 'result_limit': 100, # Limit the retrieved results. + 'result_start': 10, # Starting count used for pagination. + 'fields': ['caseId', 'summary'] # Select additional return fields. + } + + Args: + session (Session): Session object configured with TC API Auth. + tql_filters (list): List of TQL filters. + params (dict): Additional query params (see example above). + """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__( + kwargs.pop('session', None), kwargs.pop('tql_filter', None), kwargs.pop('params', None) + ) + self._model = ResultsModel(**kwargs) + self.type_ = 'results' + + def __iter__(self) -> Iterator[Result]: + """Return CM objects.""" + return self.iterate(base_class=Result) # type: ignore + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.RESULTS.value + + @property + def filter(self) -> ResultFilter: + """Return the type specific filter object.""" + return ResultFilter(self.tql) diff --git a/tcex/api/tc/v3/intel_requirements/results/result_filter.py b/tcex/api/tc/v3/intel_requirements/results/result_filter.py new file mode 100644 index 000000000..a512ae1a4 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/results/result_filter.py @@ -0,0 +1,217 @@ +"""TcEx Framework Module""" +# standard library +from datetime import datetime +from enum import Enum + +# third-party +from arrow import Arrow + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.filter_abc import FilterABC +from tcex.api.tc.v3.tql.tql_type import TqlType + + +class ResultFilter(FilterABC): + """Filter Object for Results""" + + @property + def _api_endpoint(self) -> str: + """Return the API endpoint.""" + return ApiEndpoints.RESULTS.value + + def archived_date(self, operator: Enum, archived_date: Arrow | datetime | int | str): + """Filter Archived Date based on **archivedDate** keyword. + + Args: + operator: The operator enum for the filter. + archived_date: The date the result was archived. 
+ """ + archived_date = self.util.any_to_datetime(archived_date).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('archivedDate', operator, archived_date, TqlType.STRING) + + def has_intel_requirement(self, operator: Enum, has_intel_requirement: int | list): + """Filter Parent Intel Requirement based on **hasIntelRequirement** keyword. + + Args: + operator: The operator enum for the filter. + has_intel_requirement: A nested query to identify results to intel requirements. + """ + if isinstance(has_intel_requirement, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter( + 'hasIntelRequirement', operator, has_intel_requirement, TqlType.INTEGER + ) + + def id(self, operator: Enum, id: int | list): # pylint: disable=redefined-builtin + """Filter ID based on **id** keyword. + + Args: + operator: The operator enum for the filter. + id: The ID of the intel query result. + """ + if isinstance(id, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('id', operator, id, TqlType.INTEGER) + + def intel_id(self, operator: Enum, intel_id: int | list): + """Filter Intel ID based on **intelId** keyword. + + Args: + operator: The operator enum for the filter. + intel_id: The ID of the entity related to the result. + """ + if isinstance(intel_id, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('intelId', operator, intel_id, TqlType.INTEGER) + + def intel_req_id(self, operator: Enum, intel_req_id: int | list): + """Filter ID based on **intelReqId** keyword. + + Args: + operator: The operator enum for the filter. + intel_req_id: The ID of the intel requirement. + """ + if isinstance(intel_req_id, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('intelReqId', operator, intel_req_id, TqlType.INTEGER) + + def intel_type(self, operator: Enum, intel_type: list | str): + """Filter Intel Type based on **intelType** keyword. + + Args: + operator: The operator enum for the filter. + intel_type: The intel type of the result. + """ + if isinstance(intel_type, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('intelType', operator, intel_type, TqlType.STRING) + + def is_archived(self, operator: Enum, is_archived: bool): + """Filter Archived Flag based on **isArchived** keyword. + + Args: + operator: The operator enum for the filter. + is_archived: A true/false indicating if the result has been archived. + """ + self._tql.add_filter('isArchived', operator, is_archived, TqlType.BOOLEAN) + + def is_associated(self, operator: Enum, is_associated: bool): + """Filter Associated based on **isAssociated** keyword. + + Args: + operator: The operator enum for the filter. + is_associated: A true/false indicating if the result has been associated. 
+ """ + self._tql.add_filter('isAssociated', operator, is_associated, TqlType.BOOLEAN) + + def is_false_positive(self, operator: Enum, is_false_positive: bool): + """Filter False Positive based on **isFalsePositive** keyword. + + Args: + operator: The operator enum for the filter. + is_false_positive: A true/false indicating if the result has been flagged as false + positive. + """ + self._tql.add_filter('isFalsePositive', operator, is_false_positive, TqlType.BOOLEAN) + + def is_local(self, operator: Enum, is_local: bool): + """Filter Sourced Internally based on **isLocal** keyword. + + Args: + operator: The operator enum for the filter. + is_local: A true/false indicating if the result came from ThreatConnect repository. + """ + self._tql.add_filter('isLocal', operator, is_local, TqlType.BOOLEAN) + + def last_matched_date(self, operator: Enum, last_matched_date: Arrow | datetime | int | str): + """Filter Last Matched Date based on **lastMatchedDate** keyword. + + Args: + operator: The operator enum for the filter. + last_matched_date: The date the result last matched the keyword query. + """ + last_matched_date = self.util.any_to_datetime(last_matched_date).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('lastMatchedDate', operator, last_matched_date, TqlType.STRING) + + def owner(self, operator: Enum, owner: int | list): + """Filter Owner ID based on **owner** keyword. + + Args: + operator: The operator enum for the filter. + owner: The Owner ID for the result. + """ + if isinstance(owner, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('owner', operator, owner, TqlType.INTEGER) + + def owner_name(self, operator: Enum, owner_name: list | str): + """Filter Owner Name based on **ownerName** keyword. + + Args: + operator: The operator enum for the filter. + owner_name: The owner name for the result. + """ + if isinstance(owner_name, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) + + def score(self, operator: Enum, score: float | list): + """Filter Score based on **score** keyword. + + Args: + operator: The operator enum for the filter. + score: The weighted score in the relevancy of the result. + """ + if isinstance(score, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('score', operator, score, TqlType.FLOAT) + + def summary(self, operator: Enum, summary: list | str): + """Filter Name based on **summary** keyword. + + Args: + operator: The operator enum for the filter. + summary: The summary of the result. + """ + if isinstance(summary, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('summary', operator, summary, TqlType.STRING) diff --git a/tcex/api/tc/v3/intel_requirements/results/result_model.py b/tcex/api/tc/v3/intel_requirements/results/result_model.py new file mode 100644 index 000000000..f43566b53 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/results/result_model.py @@ -0,0 +1,184 @@ +"""TcEx Framework Module""" +# pylint: disable=no-member,no-self-argument,wrong-import-position +# standard library +from datetime import datetime + +# third-party +from pydantic import BaseModel, Extra, Field, PrivateAttr + +# first-party +from tcex.api.tc.v3.v3_model_abc import V3ModelABC +from tcex.util import Util + + +class ResultModel( + V3ModelABC, + alias_generator=Util().snake_to_camel, + extra=Extra.allow, + title='Result Model', + validate_assignment=True, +): + """Result Model""" + + _associated_type = PrivateAttr(False) + _cm_type = PrivateAttr(False) + _shared_type = PrivateAttr(False) + _staged = PrivateAttr(False) + + archived: bool = Field( + None, + description='Has the result been archived?', + methods=['POST', 'PUT'], + read_only=False, + title='archived', + ) + archived_date: datetime | None = Field( + None, + allow_mutation=False, + description='The date and time that the Entity was archived.', + read_only=True, + title='archivedDate', + ) + associated: bool = Field( + None, + description='Has the result been associated to an entity within Threatconnect?', + methods=['POST', 'PUT'], + read_only=False, + title='associated', + ) + false_positive: bool = Field( + None, + description='Is the result declared false positive?', + methods=['POST', 'PUT'], + read_only=False, + title='falsePositive', + ) + id: int | None = Field( + None, + description='The ID of the item.', + read_only=True, + title='id', + ) + intel_req_id: int | None = Field( + None, + allow_mutation=False, + description='The id of the intel requirement that the result is associated.', + read_only=True, + title='intelReqId', + ) + intel_requirement: dict | None = Field( + None, + allow_mutation=False, + description='The intel requirement associated to the result.', + read_only=True, + title='intelRequirement', + ) + internal: bool = Field( + None, + allow_mutation=False, + description='Is the result sourced internally from Threatconnect.', + read_only=True, + title='internal', + ) + item_id: int | None = Field( + None, + allow_mutation=False, + description='The id of the entity that matched the result.', + read_only=True, + title='itemId', + ) + item_type: str | None = Field( + None, + allow_mutation=False, + description='The type of the entity that matched the result.', + read_only=True, + title='itemType', + ) + matched_date: datetime | None = Field( + None, + allow_mutation=False, + description='The date and time that the result last matched with the intel requirement.', + read_only=True, + title='matchedDate', + ) + name: str | None = Field( + None, + allow_mutation=False, + description='The name of the result.', + read_only=True, + title='name', + ) + origin: str | None = Field( + None, + allow_mutation=False, + description='The origin of the result if derived from an internal or external source.', + read_only=True, + title='origin', + ) + owner_id: int | None = Field( + None, + allow_mutation=False, + description='The organization id that the result belongs.', + read_only=True, + title='ownerId', + ) + owner_name: str | None = Field( + None, + allow_mutation=False, + description='The organization name that the result belongs.', + read_only=True, + 
title='ownerName', + ) + score: int | None = Field( + None, + allow_mutation=False, + description='The relevancy score.', + read_only=True, + title='score', + ) + + +class ResultDataModel( + BaseModel, + title='Result Data Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Results Data Model""" + + data: list[ResultModel] | None = Field( + [], + description='The data for the Results.', + methods=['POST', 'PUT'], + title='data', + ) + + +class ResultsModel( + BaseModel, + title='Results Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Results Model""" + + _mode_support = PrivateAttr(False) + + data: list[ResultModel] | None = Field( + [], + description='The data for the Results.', + methods=['POST', 'PUT'], + title='data', + ) + mode: str = Field( + 'append', + description='The PUT mode for nested objects (append, delete, replace). Default: append', + methods=['POST', 'PUT'], + title='append', + ) + + +# add forward references +ResultDataModel.update_forward_refs() +ResultModel.update_forward_refs() +ResultsModel.update_forward_refs() diff --git a/tcex/api/tc/v3/intel_requirements/subtypes/subtype.py b/tcex/api/tc/v3/intel_requirements/subtypes/subtype.py new file mode 100644 index 000000000..e91670aa3 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/subtypes/subtype.py @@ -0,0 +1,97 @@ +"""TcEx Framework Module""" +# standard library +from collections.abc import Iterator + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.intel_requirements.subtypes.subtype_filter import SubtypeFilter +from tcex.api.tc.v3.intel_requirements.subtypes.subtype_model import SubtypeModel, SubtypesModel +from tcex.api.tc.v3.object_abc import ObjectABC +from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC + + +class Subtype(ObjectABC): + """Subtypes Object. + + Args: + description (str, kwargs): The description of the subtype/category. + name (str, kwargs): The details of the subtype/category. + """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__(kwargs.pop('session', None)) + + # properties + self._model: SubtypeModel = SubtypeModel(**kwargs) + self._nested_field_name = 'subtypes' + self._nested_filter = 'has_subtype' + self.type_ = 'Subtype' + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.SUBTYPES.value + + @property + def model(self) -> SubtypeModel: + """Return the model data.""" + return self._model + + @model.setter + def model(self, data: dict | SubtypeModel): + """Create model using the provided data.""" + if isinstance(data, type(self.model)): + # provided data is already a model, nothing required to change + self._model = data + elif isinstance(data, dict): + # provided data is raw response, load the model + self._model = type(self.model)(**data) + else: + raise RuntimeError(f'Invalid data type: {type(data)} provided.') + + @property + def as_entity(self) -> dict: + """Return the entity representation of the object.""" + type_ = self.type_ + + return {'type': type_, 'id': self.model.id, 'value': self.model.name} + + +class Subtypes(ObjectCollectionABC): + """Subtypes Collection. + + # Example of params input + { + 'result_limit': 100, # Limit the retrieved results. + 'result_start': 10, # Starting count used for pagination. + 'fields': ['caseId', 'summary'] # Select additional return fields. 
+ } + + Args: + session (Session): Session object configured with TC API Auth. + tql_filters (list): List of TQL filters. + params (dict): Additional query params (see example above). + """ + + def __init__(self, **kwargs): + """Initialize instance properties.""" + super().__init__( + kwargs.pop('session', None), kwargs.pop('tql_filter', None), kwargs.pop('params', None) + ) + self._model = SubtypesModel(**kwargs) + self.type_ = 'subtypes' + + def __iter__(self) -> Iterator[Subtype]: + """Return CM objects.""" + return self.iterate(base_class=Subtype) # type: ignore + + @property + def _api_endpoint(self) -> str: + """Return the type specific API endpoint.""" + return ApiEndpoints.SUBTYPES.value + + @property + def filter(self) -> SubtypeFilter: + """Return the type specific filter object.""" + return SubtypeFilter(self.tql) diff --git a/tcex/api/tc/v3/intel_requirements/subtypes/subtype_filter.py b/tcex/api/tc/v3/intel_requirements/subtypes/subtype_filter.py new file mode 100644 index 000000000..6011f9d87 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/subtypes/subtype_filter.py @@ -0,0 +1,62 @@ +"""TcEx Framework Module""" +# standard library +from enum import Enum + +# first-party +from tcex.api.tc.v3.api_endpoints import ApiEndpoints +from tcex.api.tc.v3.filter_abc import FilterABC +from tcex.api.tc.v3.tql.tql_type import TqlType + + +class SubtypeFilter(FilterABC): + """Filter Object for Subtypes""" + + @property + def _api_endpoint(self) -> str: + """Return the API endpoint.""" + return ApiEndpoints.SUBTYPES.value + + def description(self, operator: Enum, description: list | str): + """Filter Description based on **description** keyword. + + Args: + operator: The operator enum for the filter. + description: The description of the subtype. + """ + if isinstance(description, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('description', operator, description, TqlType.STRING) + + def id(self, operator: Enum, id: int | list): # pylint: disable=redefined-builtin + """Filter ID based on **id** keyword. + + Args: + operator: The operator enum for the filter. + id: The ID of the subtype. + """ + if isinstance(id, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('id', operator, id, TqlType.INTEGER) + + def name(self, operator: Enum, name: list | str): + """Filter Name based on **name** keyword. + + Args: + operator: The operator enum for the filter. + name: The name of the subtype. + """ + if isinstance(name, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' 
+ ) + + self._tql.add_filter('name', operator, name, TqlType.STRING) diff --git a/tcex/api/tc/v3/intel_requirements/subtypes/subtype_model.py b/tcex/api/tc/v3/intel_requirements/subtypes/subtype_model.py new file mode 100644 index 000000000..53276b4b1 --- /dev/null +++ b/tcex/api/tc/v3/intel_requirements/subtypes/subtype_model.py @@ -0,0 +1,90 @@ +"""TcEx Framework Module""" +# pylint: disable=no-member,no-self-argument,wrong-import-position +# third-party +from pydantic import BaseModel, Extra, Field, PrivateAttr + +# first-party +from tcex.api.tc.v3.v3_model_abc import V3ModelABC +from tcex.util import Util + + +class SubtypeModel( + V3ModelABC, + alias_generator=Util().snake_to_camel, + extra=Extra.allow, + title='Subtype Model', + validate_assignment=True, +): + """Subtype Model""" + + _associated_type = PrivateAttr(False) + _cm_type = PrivateAttr(False) + _shared_type = PrivateAttr(False) + _staged = PrivateAttr(False) + + description: str | None = Field( + None, + description='The description of the subtype/category.', + methods=['POST', 'PUT'], + read_only=False, + title='description', + ) + id: int | None = Field( + None, + description='The ID of the item.', + read_only=True, + title='id', + ) + name: str | None = Field( + None, + description='The details of the subtype/category.', + methods=['POST', 'PUT'], + read_only=False, + title='name', + ) + + +class SubtypeDataModel( + BaseModel, + title='Subtype Data Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Subtypes Data Model""" + + data: list[SubtypeModel] | None = Field( + [], + description='The data for the Subtypes.', + methods=['POST', 'PUT'], + title='data', + ) + + +class SubtypesModel( + BaseModel, + title='Subtypes Model', + alias_generator=Util().snake_to_camel, + validate_assignment=True, +): + """Subtypes Model""" + + _mode_support = PrivateAttr(False) + + data: list[SubtypeModel] | None = Field( + [], + description='The data for the Subtypes.', + methods=['POST', 'PUT'], + title='data', + ) + mode: str = Field( + 'append', + description='The PUT mode for nested objects (append, delete, replace). Default: append', + methods=['POST', 'PUT'], + title='append', + ) + + +# add forward references +SubtypeDataModel.update_forward_refs() +SubtypeModel.update_forward_refs() +SubtypesModel.update_forward_refs() diff --git a/tcex/api/tc/v3/security/users/user_filter.py b/tcex/api/tc/v3/security/users/user_filter.py index 7837fa657..5df2af656 100644 --- a/tcex/api/tc/v3/security/users/user_filter.py +++ b/tcex/api/tc/v3/security/users/user_filter.py @@ -157,8 +157,8 @@ def password_reset_required(self, operator: Enum, password_reset_required: bool) Args: operator: The operator enum for the filter. - password_reset_required: A flag indicating whether or not the user's password needs to - be reset upon next login. + password_reset_required: A flag indicating whether or not + the user's password needs to be reset upon next login. """ self._tql.add_filter( 'passwordResetRequired', operator, password_reset_required, TqlType.BOOLEAN diff --git a/tcex/api/tc/v3/tags/tag_filter.py b/tcex/api/tc/v3/tags/tag_filter.py index ac438909b..cde0ede03 100644 --- a/tcex/api/tc/v3/tags/tag_filter.py +++ b/tcex/api/tc/v3/tags/tag_filter.py @@ -242,7 +242,7 @@ def technique_id(self, operator: Enum, technique_id: list | str): Args: operator: The operator enum for the filter. - technique_id: The standard ID for specific MITRE ATT&CK techniques and sub-techniques. 
+            technique_id: The standard ID for specific MITRE ATT&CK techniques and subtechniques.
         """
         if isinstance(technique_id, list) and operator not in self.list_types:
             raise RuntimeError(
diff --git a/tcex/api/tc/v3/tasks/task_model.py b/tcex/api/tc/v3/tasks/task_model.py
index 8eca69710..f7e53dfbc 100644
--- a/tcex/api/tc/v3/tasks/task_model.py
+++ b/tcex/api/tc/v3/tasks/task_model.py
@@ -170,8 +170,6 @@ class TaskModel(
         None,
         description='The phase of the workflow.',
         methods=['POST'],
-        maximum=127,
-        minimum=0,
         read_only=False,
         title='workflowPhase',
     )
@@ -179,8 +177,6 @@
         None,
         description='The step of the workflow.',
         methods=['POST'],
-        maximum=127,
-        minimum=1,
         read_only=False,
         title='workflowStep',
     )
diff --git a/tcex/api/tc/v3/tql/tql.py b/tcex/api/tc/v3/tql/tql.py
index 9631deac5..1531d6b5a 100644
--- a/tcex/api/tc/v3/tql/tql.py
+++ b/tcex/api/tc/v3/tql/tql.py
@@ -27,7 +27,7 @@ def as_str(self):
                 filters.append(f'''{keyword}({value._tql.as_str})''')
             except Exception:
                 if isinstance(value, list):
-                    if tql_filter.get('type') == TqlType.INTEGER:
+                    if tql_filter.get('type') in (TqlType.FLOAT, TqlType.INTEGER):
                         value = [str(int_) for int_ in value]
                     elif tql_filter.get('type') == TqlType.STRING:
                         value = [f'"{str(str_)}"' for str_ in value]
@@ -53,7 +53,7 @@ def add_filter(
         self,
         keyword: str,
         operator: Enum | str,
-        value: int | list | str | FilterABC,
+        value: int | float | list | str | FilterABC,
         type_: TqlType | None = TqlType.STRING,
     ):
         """Add a filter to the current obj
diff --git a/tcex/api/tc/v3/tql/tql_type.py b/tcex/api/tc/v3/tql/tql_type.py
index 5a937234e..81d73e563 100644
--- a/tcex/api/tc/v3/tql/tql_type.py
+++ b/tcex/api/tc/v3/tql/tql_type.py
@@ -7,7 +7,8 @@ class TqlType(Enum):
     """ThreatConnect API TQL Types"""
 
-    STRING = 'String'
-    INTEGER = 'Integer'
     BOOLEAN = 'Boolean'
+    FLOAT = 'Float'
+    INTEGER = 'Integer'
+    STRING = 'String'
     SUB_QUERY = 'Sub Query'
diff --git a/tcex/api/tc/v3/v3.py b/tcex/api/tc/v3/v3.py
index aa671a629..a80cce654 100644
--- a/tcex/api/tc/v3/v3.py
+++ b/tcex/api/tc/v3/v3.py
@@ -3,6 +3,7 @@
 # first-party
 from tcex.api.tc.v3.attribute_types.attribute_type import AttributeType, AttributeTypes
 from tcex.api.tc.v3.case_management.case_management import CaseManagement
+from tcex.api.tc.v3.intel_requirement.ir import IR
 from tcex.api.tc.v3.security.security import Security
 from tcex.api.tc.v3.threat_intelligence.threat_intelligence import ThreatIntelligence
 
@@ -27,6 +28,11 @@ def cm(self) -> CaseManagement:
         """Return Case Management API collection."""
         return CaseManagement(self.session)
 
+    @property
+    def ir(self) -> IR:
+        """Return Intel Requirement API collection."""
+        return IR(self.session)
+
     @property
     def security(self) -> Security:
         """Return Security API collection."""
diff --git a/tcex/api/tc/v3/workflow_templates/workflow_template_model.py b/tcex/api/tc/v3/workflow_templates/workflow_template_model.py
index 4f101af4a..74c77fdcc 100644
--- a/tcex/api/tc/v3/workflow_templates/workflow_template_model.py
+++ b/tcex/api/tc/v3/workflow_templates/workflow_template_model.py
@@ -109,7 +109,6 @@ class WorkflowTemplateModel(
         None,
         description='The **version** for the Workflow_Template.',
         methods=['POST', 'PUT'],
-        minimum=1,
         read_only=False,
         title='version',
     )
diff --git a/tcex/app/config b/tcex/app/config
index fa3f5da8b..8229f3cc2 160000
--- a/tcex/app/config
+++ b/tcex/app/config
@@ -1 +1 @@
-Subproject commit fa3f5da8bf7197144594fe9bc0390398f6cabedf
+Subproject commit 8229f3cc21ce6d50979c8dc8b961c7c2ee857585
diff --git a/tcex/app/playbook b/tcex/app/playbook
index fcdb73526..04ba65c86 160000
--- a/tcex/app/playbook
+++ b/tcex/app/playbook
@@ -1 +1 @@
-Subproject commit fcdb73526e94613a36bace9f1e958fbefa7ea381
+Subproject commit 04ba65c8609024b05dea7ee1b41924fefbe65584
diff --git a/tcex/app/service/common_service.py b/tcex/app/service/common_service.py
index fb5d72ce8..1cb5389fe 100644
--- a/tcex/app/service/common_service.py
+++ b/tcex/app/service/common_service.py
@@ -224,7 +224,7 @@ def on_message_handler(self, client, userdata, message):  # pylint: disable=unus
         try:
             # messages on server topic must be json objects
             m = json.loads(message.payload)
-            if m.get('triggerId') is not None:
+            if m.get('triggerId') not in [None, '']:
                 m['triggerId'] = int(m['triggerId'])
         except ValueError:
             self.log.warning(
diff --git a/tcex/input/field_type/binary.py b/tcex/input/field_type/binary.py
index a79cb6a39..75584cd8f 100644
--- a/tcex/input/field_type/binary.py
+++ b/tcex/input/field_type/binary.py
@@ -63,7 +63,7 @@ def validate_type(cls, value: bytes, field: ModelField) -> bytes:
         """Raise exception if value is not a Binary type."""
         if not isinstance(value, bytes):
             raise InvalidType(
-                field_name=field.name, expected_types='(bytes)', provided_type=type(value)
+                field_name=field.name, expected_types='(bytes)', provided_type=str(type(value))
             )
         return value
 
diff --git a/tcex/input/field_type/key_value.py b/tcex/input/field_type/key_value.py
index f9baf01d5..74af4b792 100644
--- a/tcex/input/field_type/key_value.py
+++ b/tcex/input/field_type/key_value.py
@@ -9,6 +9,7 @@
 # first-party
 from tcex.input.field_type.binary import Binary
 from tcex.input.field_type.exception import InvalidEmptyValue
+from tcex.input.field_type.sensitive import Sensitive
 from tcex.input.field_type.string import String
 from tcex.input.field_type.tc_entity import TCEntity
 
@@ -30,6 +31,7 @@ class KeyValue(BaseModel):
         | String
         | list[Binary]
         | Binary
+        | Sensitive
     )
 
     @validator('key')
diff --git a/tcex/input/field_type/sensitive.py b/tcex/input/field_type/sensitive.py
index f15ebfdbd..4ca1a5c22 100644
--- a/tcex/input/field_type/sensitive.py
+++ b/tcex/input/field_type/sensitive.py
@@ -120,7 +120,7 @@ def validate_type(cls, value: bytes | str | Self, field: ModelField) -> bytes |
         """Raise exception if value is not a String type."""
         if not isinstance(value, bytes | str | Sensitive):
             raise InvalidType(
-                field_name=field.name, expected_types='(bytes, str)', provided_type=type(value)
+                field_name=field.name, expected_types='(bytes, str)', provided_type=str(type(value))
             )
         return value
 
diff --git a/tcex/util b/tcex/util
index 4c017aea3..18d22b8c6 160000
--- a/tcex/util
+++ b/tcex/util
@@ -1 +1 @@
-Subproject commit 4c017aea3f8a429364e408d98ddf4ceed1d00069
+Subproject commit 18d22b8c695bc87cb8b21b2872b92e6cabee0f03
diff --git a/tests/api/tc/v3/intel_requirements/__init__.py b/tests/api/tc/v3/intel_requirements/__init__.py
new file mode 100644
index 000000000..558af896b
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/__init__.py
@@ -0,0 +1 @@
+"""TcEx Framework Module"""
diff --git a/tests/api/tc/v3/intel_requirements/categories/__init__.py b/tests/api/tc/v3/intel_requirements/categories/__init__.py
new file mode 100644
index 000000000..558af896b
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/categories/__init__.py
@@ -0,0 +1 @@
+"""TcEx Framework Module"""
diff --git a/tests/api/tc/v3/intel_requirements/categories/test_categories_interface.py b/tests/api/tc/v3/intel_requirements/categories/test_categories_interface.py
new file mode 100644
index 000000000..9cc96629b
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/categories/test_categories_interface.py
@@ -0,0 +1,31 @@
+"""TcEx Framework Module"""
+
+# third-party
+import pytest
+
+# first-party
+from tests.api.tc.v3.v3_helpers import TestV3, V3Helper
+
+
+class TestCategories(TestV3):
+    """Test TcEx API Interface."""
+
+    v3_helper = V3Helper('categories')
+
+    def test_categories_api_options(self):
+        """Test filter keywords."""
+        super().obj_api_options()
+
+    def test_categories_filter_keywords(self):
+        """Test filter keywords."""
+        super().obj_filter_keywords()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_categories_object_properties(self):
+        """Test properties."""
+        super().obj_properties()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_categories_object_properties_extra(self):
+        """Test properties."""
+        super().obj_properties_extra()
diff --git a/tests/api/tc/v3/intel_requirements/results/__init__.py b/tests/api/tc/v3/intel_requirements/results/__init__.py
new file mode 100644
index 000000000..558af896b
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/results/__init__.py
@@ -0,0 +1 @@
+"""TcEx Framework Module"""
diff --git a/tests/api/tc/v3/intel_requirements/results/test_results_interface.py b/tests/api/tc/v3/intel_requirements/results/test_results_interface.py
new file mode 100644
index 000000000..6418d9d36
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/results/test_results_interface.py
@@ -0,0 +1,31 @@
+"""TcEx Framework Module"""
+
+# third-party
+import pytest
+
+# first-party
+from tests.api.tc.v3.v3_helpers import TestV3, V3Helper
+
+
+class TestResults(TestV3):
+    """Test TcEx API Interface."""
+
+    v3_helper = V3Helper('results')
+
+    def test_results_api_options(self):
+        """Test filter keywords."""
+        super().obj_api_options()
+
+    def test_results_filter_keywords(self):
+        """Test filter keywords."""
+        super().obj_filter_keywords()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_results_object_properties(self):
+        """Test properties."""
+        super().obj_properties()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_results_object_properties_extra(self):
+        """Test properties."""
+        super().obj_properties_extra()
diff --git a/tests/api/tc/v3/intel_requirements/subtypes/__init__.py b/tests/api/tc/v3/intel_requirements/subtypes/__init__.py
new file mode 100644
index 000000000..558af896b
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/subtypes/__init__.py
@@ -0,0 +1 @@
+"""TcEx Framework Module"""
diff --git a/tests/api/tc/v3/intel_requirements/subtypes/test_subtypes_interface.py b/tests/api/tc/v3/intel_requirements/subtypes/test_subtypes_interface.py
new file mode 100644
index 000000000..665d96650
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/subtypes/test_subtypes_interface.py
@@ -0,0 +1,31 @@
+"""TcEx Framework Module"""
+
+# third-party
+import pytest
+
+# first-party
+from tests.api.tc.v3.v3_helpers import TestV3, V3Helper
+
+
+class TestSubtypes(TestV3):
+    """Test TcEx API Interface."""
+
+    v3_helper = V3Helper('subtypes')
+
+    def test_subtypes_api_options(self):
+        """Test filter keywords."""
+        super().obj_api_options()
+
+    def test_subtypes_filter_keywords(self):
+        """Test filter keywords."""
+        super().obj_filter_keywords()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_subtypes_object_properties(self):
+        """Test properties."""
+        super().obj_properties()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_subtypes_object_properties_extra(self):
+        """Test properties."""
+        super().obj_properties_extra()
diff --git a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py
new file mode 100644
index 000000000..4af6391a8
--- /dev/null
+++ b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py
@@ -0,0 +1,31 @@
+"""TcEx Framework Module"""
+
+# third-party
+import pytest
+
+# first-party
+from tests.api.tc.v3.v3_helpers import TestV3, V3Helper
+
+
+class TestIntelRequirements(TestV3):
+    """Test TcEx API Interface."""
+
+    v3_helper = V3Helper('intel_requirements')
+
+    def test_intel_requirements_api_options(self):
+        """Test filter keywords."""
+        super().obj_api_options()
+
+    def test_intel_requirements_filter_keywords(self):
+        """Test filter keywords."""
+        super().obj_filter_keywords()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_intel_requirements_object_properties(self):
+        """Test properties."""
+        super().obj_properties()
+
+    @pytest.mark.xfail(reason='Verify TC Version running against.')
+    def test_intel_requirements_object_properties_extra(self):
+        """Test properties."""
+        super().obj_properties_extra()
diff --git a/tests/api/tc/v3/v3_helpers.py b/tests/api/tc/v3/v3_helpers.py
index 60824c445..d6d950596 100644
--- a/tests/api/tc/v3/v3_helpers.py
+++ b/tests/api/tc/v3/v3_helpers.py
@@ -160,6 +160,11 @@ def _module_map(module: str) -> dict:
         'class_name': 'Case',
         'collection_class_name': 'Cases',
     },
+    'categories': {
+        'module': 'tcex.api.tc.v3.intel_requirements.categories.category',
+        'class_name': 'Category',
+        'collection_class_name': 'Categories',
+    },
     'group_attributes': {
         'module': 'tcex.api.tc.v3.group_attributes.group_attribute',
         'class_name': 'GroupAttribute',
@@ -180,6 +185,11 @@
         'class_name': 'IndicatorAttribute',
         'collection_class_name': 'IndicatorAttributes',
     },
+    'intel_requirements': {
+        'module': 'tcex.api.tc.v3.intel_requirements.intel_requirement',
+        'class_name': 'IntelRequirement',
+        'collection_class_name': 'IntelRequirements',
+    },
     'notes': {
         'module': 'tcex.api.tc.v3.notes.note',
         'class_name': 'Note',
@@ -195,6 +205,11 @@
         'class_name': 'Owner',
         'collection_class_name': 'Owners',
     },
+    'results': {
+        'module': 'tcex.api.tc.v3.intel_requirements.results.result',
+        'class_name': 'Result',
+        'collection_class_name': 'Results',
+    },
     'security_labels': {
         'module': 'tcex.api.tc.v3.security_labels.security_label',
         'class_name': 'SecurityLabel',
@@ -205,6 +220,11 @@
         'class_name': 'SystemRole',
         'collection_class_name': 'SystemRoles',
     },
+    'subtypes': {
+        'module': 'tcex.api.tc.v3.intel_requirements.subtypes.subtype',
+        'class_name': 'Subtype',
+        'collection_class_name': 'Subtypes',
+    },
     'tags': {
         'module': 'tcex.api.tc.v3.tags.tag',
         'class_name': 'Tag',
@@ -716,7 +736,10 @@ def obj_api_options(self):
             'analyticsType',
         ]
 
-        if self.v3_helper.v3_object in ['cases', 'groups'] and 'userDetails' in names:
+        if (
+            self.v3_helper.v3_object in ['cases', 'groups', 'intel_requirements']
+            and 'userDetails' in names
+        ):
             # fix discrepancy between /fields and
             names = ['createdBy']
 
@@ -737,6 +760,13 @@
             'threatAssessScoreObserved',
         ]
 
+        if self.v3_helper.v3_object == 'results':
+            if 'intelRequirementDetails' in names:
+                # fix discrepancy between /fields and
+                names = ['intelRequirement']
+            if 'intelRequirementId' in names:
+                names = ['intelReqId']
+
         for name in names:
             if name not in self.v3_helper.v3_obj.properties:
                 assert False, f'''{name} not in {self.v3_helper.v3_obj.properties.keys()}'''
diff --git a/tests/app/config/test_install_json_model.py b/tests/app/config/test_install_json_model.py
index 58b621c4c..a693a563a 100644
--- a/tests/app/config/test_install_json_model.py
+++ b/tests/app/config/test_install_json_model.py
@@ -354,7 +354,7 @@ def test_update(self):
         for app_type in ['tc', 'tcpb', 'tcva']:
             ij = self.ij_bad(app_type=app_type)
             try:
-                ij.update.multiple(migrate=True)
+                ij.update.multiple()
                 assert True
             except Exception as ex:
                 assert False, f'Failed to update install.json file ({ex}).'
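
Reviewer note: a minimal usage sketch for the new intel-requirement Subtypes surface added in this patch. It is illustrative only and not part of the change: the `Subtypes` collection, its `filter.name(...)` keyword, and the model fields come from the files above, while the `requests.Session` with TC API auth, the `TqlOperator` import path, the `resultLimit` param, the `.model` accessor on iterated objects, and the `print_subtypes` helper name are assumptions based on the existing v3 modules.

    # Sketch only: list intel-requirement subtypes whose name contains a value.
    from requests import Session  # session must already carry TC API authentication

    from tcex.api.tc.v3.intel_requirements.subtypes.subtype import Subtypes
    from tcex.api.tc.v3.tql.tql_operator import TqlOperator  # assumed operator enum location


    def print_subtypes(session: Session) -> None:
        """Print id/name/description for each matching subtype."""
        subtypes = Subtypes(session=session, params={'resultLimit': 100})
        # list values require CONTAINS/NOT_CONTAINS/IN/NOT_IN; a single str works with any operator
        subtypes.filter.name(TqlOperator.CONTAINS, 'Malware')
        for subtype in subtypes:
            print(subtype.model.id, subtype.model.name, subtype.model.description)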