diff --git a/release_notes.md b/release_notes.md index 538d6544d..7bd6536d3 100644 --- a/release_notes.md +++ b/release_notes.md @@ -1,5 +1,9 @@ # Release Notes +### 3.0.7 + +- APP-3859 - [API] Enhancements for ThreatConnect 7.x + ### 3.0.6 - APP-3857 - [CLI] Updated spectool for general improvements and fixes diff --git a/tcex/__metadata__.py b/tcex/__metadata__.py index f14c55f1f..c8f922053 100644 --- a/tcex/__metadata__.py +++ b/tcex/__metadata__.py @@ -5,5 +5,5 @@ __license__ = 'Apache License, Version 2' __package_name__ = 'tcex' __url__ = 'https://github.com/ThreatConnect-Inc/tcex' -__version__ = '3.0.6' +__version__ = '3.0.7' __download_url__ = f'https://github.com/ThreatConnect-Inc/tcex/tarball/{__version__}' diff --git a/tcex/api/tc/ti_transform/model/transform_model.py b/tcex/api/tc/ti_transform/model/transform_model.py index 5404c4418..90f63a4b0 100644 --- a/tcex/api/tc/ti_transform/model/transform_model.py +++ b/tcex/api/tc/ti_transform/model/transform_model.py @@ -141,7 +141,7 @@ class GroupTransformModel(TiTransformModel, extra=Extra.forbid): name: MetadataTransformModel = Field(..., description='') # campaign - first_seen: Optional[MetadataTransformModel] = Field(None, description='') + first_seen: Optional[DatetimeTransformModel] = Field(None, description='') # document malware: Optional[MetadataTransformModel] = Field(None, description='') password: Optional[MetadataTransformModel] = Field(None, description='') @@ -150,7 +150,7 @@ class GroupTransformModel(TiTransformModel, extra=Extra.forbid): score: Optional[MetadataTransformModel] = Field(None, description='') to_addr: Optional[MetadataTransformModel] = Field(None, description='') # event, incident - event_date: Optional[MetadataTransformModel] = Field(None, description='') + event_date: Optional[DatetimeTransformModel] = Field(None, description='') status: Optional[MetadataTransformModel] = Field(None, description='') # report publish_date: Optional[DatetimeTransformModel] = Field(None, description='') diff --git a/tcex/api/tc/utils/threat_intel_utils.py b/tcex/api/tc/utils/threat_intel_utils.py index 5a9750568..e33269bfb 100644 --- a/tcex/api/tc/utils/threat_intel_utils.py +++ b/tcex/api/tc/utils/threat_intel_utils.py @@ -205,19 +205,29 @@ def resolve_variables(self, inputs: List[str]) -> List[str]: """ resolved_inputs = [] for input_ in inputs: + # handle null inputs if not input_: resolved_inputs.append(None) continue - if input_.strip() not in self.resolvable_variables: + + # clean up input + input_ = input_.strip() + + # handle unknown input types + if input_ not in self.resolvable_variables: resolved_inputs.append(input_) continue - input_ = input_.strip() + + # special handling of group types (no API request required) if input_ == '${GROUP_TYPES}': for type_ in self.group_types: resolved_inputs.append(type_) continue + # get variable settings resolvable_variable_details = self.resolvable_variables[input_] + + # make API call to retrieve variable data r = self.session_tc.get( resolvable_variable_details.get('url'), params={'resultLimit': 10_000} ) @@ -226,16 +236,6 @@ def resolve_variables(self, inputs: List[str]) -> List[str]: raise RuntimeError(f'Could not retrieve {input_} from ThreatConnect API.') json_ = r.json() - # No TQL filter to filter out API users during REST call so have to do it manually here. - if input_ in ['${API_USERS}', '${USERS}']: - temp_data = [] - for item in json_.get('data', []): - if item.get('role') == 'Api User' and input_ == '${API_USERS}': - temp_data.append(item) - elif item.get('role') != 'Api User' and input_ == '${USERS}': - temp_data.append(item) - json_['data'] = temp_data - for item in jmespath.search(resolvable_variable_details.get('jmspath'), json_): resolved_inputs.append(str(item)) diff --git a/tcex/api/tc/v3/_gen/_gen.py b/tcex/api/tc/v3/_gen/_gen.py index 218e99aae..83394bfc9 100644 --- a/tcex/api/tc/v3/_gen/_gen.py +++ b/tcex/api/tc/v3/_gen/_gen.py @@ -23,6 +23,12 @@ utils = Utils() +def log_server(): + """Log server.""" + _api_server = os.getenv('TC_API_PATH') + typer.secho(f'Using server {_api_server}', fg=typer.colors.BRIGHT_MAGENTA) + + class GenerateArgs(GenerateArgsABC): """Generate Models for TC API Types""" @@ -263,6 +269,7 @@ def all( # pylint: disable=redefined-builtin ), ): """Generate Args.""" + log_server() gen_type = gen_type.value.lower() for type_ in ObjectTypes: type_ = utils.snake_string(type_.value) @@ -285,6 +292,7 @@ def args( ), ): """Generate Args.""" + log_server() gen_args(utils.snake_string(type_.value), indent_blocks) @@ -295,6 +303,7 @@ def code( ), ): """Generate Args.""" + log_server() tv = utils.snake_string(type_.value) gen_filter(tv) gen_model(tv) @@ -308,6 +317,7 @@ def filter( # pylint: disable=redefined-builtin ), ): """Generate the filter code.""" + log_server() gen_filter(utils.snake_string(type_.value)) @@ -318,6 +328,7 @@ def model( ), ): """Generate the model""" + log_server() gen_model(utils.snake_string(type_.value)) @@ -328,6 +339,7 @@ def object( # pylint: disable=redefined-builtin ), ): """Generate the filter class""" + log_server() gen_object(utils.snake_string(type_.value)) diff --git a/tcex/api/tc/v3/_gen/_gen_abc.py b/tcex/api/tc/v3/_gen/_gen_abc.py index 1fdbb74fd..061c8b6f4 100644 --- a/tcex/api/tc/v3/_gen/_gen_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_abc.py @@ -172,6 +172,9 @@ def _prop_contents_updated(self) -> dict: # remove unused fields, if any self._prop_content_remove_unused(_properties) + # update "bad" data + self._prop_content_update(_properties) + # critical fix for breaking API change if self.type_ in [ 'case_attributes', @@ -315,6 +318,10 @@ def _prop_content_fix_types(self, _properties: dict): def _prop_content_remove_unused(self, properties: dict): """Remove unused fields from properties.""" + if self.type_ in ['attribute_types']: + if 'owner' in properties: + del properties['owner'] + if self.type_ in [ 'attribute_types', 'case_attributes', @@ -334,6 +341,53 @@ def _prop_content_remove_unused(self, properties: dict): if field in properties: del properties[field] + if self.type_ in ['users']: + unused_fields = [ + 'customTqlTimeout', + 'disabled', + 'firstName', + 'jobFunction', + 'jobRole', + 'lastLogin', + 'lastName', + 'lastPasswordChange', + 'locked', + 'logoutIntervalMinutes', + 'owner', + 'ownerRoles', + 'password', + 'passwordResetRequired', + 'pseudonym', + 'systemRole', + 'termsAccepted', + 'termsAcceptedDate', + 'tqlTimeout', + 'twoFactorResetRequired', + 'uiTheme', + ] + for field in unused_fields: + if field in properties: + del properties[field] + + if self.type_ in ['victims']: + unused_fields = [ + 'ownerId', # Core Issue: should not be listed for this type + ] + for field in unused_fields: + if field in properties: + del properties[field] + + def _prop_content_update(self, properties: dict): + """Update "bad" data in properties.""" + if self.type_ in ['groups']: + # fixed fields that are missing readOnly property + properties['downVoteCount']['readOnly'] = True + properties['upVoteCount']['readOnly'] = True + + if self.type_ in ['victims']: + # ownerName is readOnly, but readOnly is not defined in response from OPTIONS endpoint + properties['ownerName']['readOnly'] = True + @property def _prop_models(self) -> List[PropertyModel]: """Return a list of PropertyModel objects.""" diff --git a/tcex/api/tc/v3/_gen/_gen_object_abc.py b/tcex/api/tc/v3/_gen/_gen_object_abc.py index c27a9d15a..3373aa7e9 100644 --- a/tcex/api/tc/v3/_gen/_gen_object_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_object_abc.py @@ -191,8 +191,7 @@ def filter(self) -> 'ArtifactFilter': f'''{self.i2}self._request(''', f'''{self.i3}method='GET',''', f'''{self.i3}url=f\'\'\'{{self.url('GET')}}/download\'\'\',''', - f'''{self.i3}# headers={{'content-type': 'application/octet-stream'}},''', - f'''{self.i3}headers=None,''', + f'''{self.i3}headers={{'Accept': 'application/octet-stream'}},''', f'''{self.i3}params=params,''', f'''{self.i2})''', f'''{self.i2}return self.request.content''', @@ -203,7 +202,7 @@ def filter(self) -> 'ArtifactFilter': f'''{self.i3}method='GET',''', f'''{self.i3}body=None,''', f'''{self.i3}url=f\'\'\'{{self.url('GET')}}/pdf\'\'\',''', - f'''{self.i3}headers=None,''', + f'''{self.i3}headers={{'Accept': 'application/octet-stream'}},''', f'''{self.i3}params=params,''', f'''{self.i2})''', '', diff --git a/tcex/api/tc/v3/attribute_types/attribute_type.py b/tcex/api/tc/v3/attribute_types/attribute_type.py index 7bc218192..08f813e67 100644 --- a/tcex/api/tc/v3/attribute_types/attribute_type.py +++ b/tcex/api/tc/v3/attribute_types/attribute_type.py @@ -53,7 +53,17 @@ def filter(self) -> 'AttributeTypeFilter': class AttributeType(ObjectABC): - """AttributeTypes Object.""" + """AttributeTypes Object. + + Args: + allow_markdown (bool, kwargs): Flag that enables markdown feature in the attribute value + field. + description (str, kwargs): The description of the attribute type. + error_message (str, kwargs): The error message displayed. + max_size (int, kwargs): The maximum size of the attribute value. + name (str, kwargs): The name of the attribute type. + validation_rule (object, kwargs): The validation rule that governs the attribute value. + """ def __init__(self, **kwargs): """Initialize class properties.""" diff --git a/tcex/api/tc/v3/attribute_types/attribute_type_model.py b/tcex/api/tc/v3/attribute_types/attribute_type_model.py index a43d2636d..3ff0bc032 100644 --- a/tcex/api/tc/v3/attribute_types/attribute_type_model.py +++ b/tcex/api/tc/v3/attribute_types/attribute_type_model.py @@ -65,12 +65,54 @@ class AttributeTypeModel( _shared_type = PrivateAttr(False) _staged = PrivateAttr(False) + allow_markdown: bool = Field( + None, + description='Flag that enables markdown feature in the attribute value field.', + methods=['POST', 'PUT'], + read_only=False, + title='allowMarkdown', + ) + description: Optional[str] = Field( + None, + description='The description of the attribute type.', + methods=['POST', 'PUT'], + read_only=False, + title='description', + ) + error_message: Optional[str] = Field( + None, + description='The error message displayed.', + methods=['POST', 'PUT'], + read_only=False, + title='errorMessage', + ) id: Optional[int] = Field( None, description='The ID of the item.', read_only=True, title='id', ) + max_size: Optional[int] = Field( + None, + description='The maximum size of the attribute value.', + methods=['POST', 'PUT'], + read_only=False, + title='maxSize', + ) + name: Optional[str] = Field( + None, + description='The name of the attribute type.', + methods=['POST', 'PUT'], + read_only=False, + title='name', + ) + validation_rule: Optional[dict] = Field( + None, + description='The validation rule that governs the attribute value.', + methods=['POST', 'PUT'], + read_only=False, + title='validationRule', + ) # add forward references diff --git a/tcex/api/tc/v3/case_attributes/case_attribute.py b/tcex/api/tc/v3/case_attributes/case_attribute.py index 9f470aea9..8ff7ec123 100644 --- a/tcex/api/tc/v3/case_attributes/case_attribute.py +++ b/tcex/api/tc/v3/case_attributes/case_attribute.py @@ -1,6 +1,6 @@ """CaseAttribute / CaseAttributes Object""" # standard library -from typing import Union +from typing import TYPE_CHECKING, Iterator, Union # first-party from tcex.api.tc.v3.api_endpoints import ApiEndpoints @@ -11,6 +11,11 @@ ) from tcex.api.tc.v3.object_abc import ObjectABC from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC +from tcex.api.tc.v3.security_labels.security_label_model import SecurityLabelModel + +if TYPE_CHECKING: # pragma: no cover + # first-party + from tcex.api.tc.v3.security_labels.security_label import SecurityLabel class CaseAttributes(ObjectCollectionABC): @@ -59,9 +64,13 @@ class CaseAttribute(ObjectABC): case_id (int, kwargs): Case associated with attribute. default (bool, kwargs): A flag indicating that this is the default attribute of its type within the object. Only applies to certain attribute and data types. + pinned (bool, kwargs): A flag indicating that the attribute has been noted for importance. + security_labels (SecurityLabels, kwargs): A list of Security Labels corresponding to the + Intel item (NOTE: Setting this parameter will replace any existing tag(s) with + the one(s) specified). source (str, kwargs): The attribute source. type (str, kwargs): The attribute type. - value (str, kwargs): Attribute value. + value (str, kwargs): The attribute value. """ def __init__(self, **kwargs): @@ -95,3 +104,23 @@ def model(self, data: Union['CaseAttributeModel', dict]): self._model = type(self.model)(**data) else: raise RuntimeError(f'Invalid data type: {type(data)} provided.') + + @property + def security_labels(self) -> Iterator['SecurityLabel']: + """Yield SecurityLabel from SecurityLabels.""" + # first-party + from tcex.api.tc.v3.security_labels.security_label import SecurityLabels + + yield from self._iterate_over_sublist(SecurityLabels) + + def stage_security_label(self, data: Union[dict, 'ObjectABC', 'SecurityLabelModel']): + """Stage security_label on the object.""" + if isinstance(data, ObjectABC): + data = data.model + elif isinstance(data, dict): + data = SecurityLabelModel(**data) + + if not isinstance(data, SecurityLabelModel): + raise RuntimeError('Invalid type passed in to stage_security_label') + data._staged = True + self.model.security_labels.data.append(data) diff --git a/tcex/api/tc/v3/case_attributes/case_attribute_filter.py b/tcex/api/tc/v3/case_attributes/case_attribute_filter.py index 18f827c4a..ef124c6d0 100644 --- a/tcex/api/tc/v3/case_attributes/case_attribute_filter.py +++ b/tcex/api/tc/v3/case_attributes/case_attribute_filter.py @@ -121,6 +121,15 @@ def owner_name(self, operator: Enum, owner_name: str): """ self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) + def pinned(self, operator: Enum, pinned: bool): + """Filter Pinned based on **pinned** keyword. + + Args: + operator: The operator enum for the filter. + pinned: Whether or not the attribute is pinned with importance. + """ + self._tql.add_filter('pinned', operator, pinned, TqlType.BOOLEAN) + def source(self, operator: Enum, source: str): """Filter Source based on **source** keyword. diff --git a/tcex/api/tc/v3/case_attributes/case_attribute_model.py b/tcex/api/tc/v3/case_attributes/case_attribute_model.py index 5e9df958a..6dbef9ddc 100644 --- a/tcex/api/tc/v3/case_attributes/case_attribute_model.py +++ b/tcex/api/tc/v3/case_attributes/case_attribute_model.py @@ -83,7 +83,7 @@ class CaseAttributeModel( date_added: Optional[datetime] = Field( None, allow_mutation=False, - description='The date and time that the Attribute was first created.', + description='The date and time that the item was first created.', read_only=True, title='dateAdded', ) @@ -110,6 +110,23 @@ class CaseAttributeModel( read_only=True, title='lastModified', ) + pinned: bool = Field( + None, + description='A flag indicating that the attribute has been noted for importance.', + methods=['POST', 'PUT'], + read_only=False, + title='pinned', + ) + security_labels: Optional['SecurityLabelsModel'] = Field( + None, + description=( + 'A list of Security Labels corresponding to the Intel item (NOTE: Setting this ' + 'parameter will replace any existing tag(s) with the one(s) specified).' + ), + methods=['POST', 'PUT'], + read_only=False, + title='securityLabels', + ) source: Optional[str] = Field( None, description='The attribute source.', @@ -126,13 +143,19 @@ class CaseAttributeModel( ) value: Optional[str] = Field( None, - description='Attribute value.', + description='The attribute value.', methods=['POST', 'PUT'], min_length=1, read_only=False, title='value', ) + @validator('security_labels', always=True) + def _validate_security_labels(cls, v): + if not v: + return SecurityLabelsModel() + return v + @validator('created_by', always=True) def _validate_user(cls, v): if not v: @@ -142,6 +165,7 @@ def _validate_user(cls, v): # first-party from tcex.api.tc.v3.security.users.user_model import UserModel +from tcex.api.tc.v3.security_labels.security_label_model import SecurityLabelsModel # add forward references CaseAttributeDataModel.update_forward_refs() diff --git a/tcex/api/tc/v3/cases/case_filter.py b/tcex/api/tc/v3/cases/case_filter.py index 21db6d8b1..51e6f5289 100644 --- a/tcex/api/tc/v3/cases/case_filter.py +++ b/tcex/api/tc/v3/cases/case_filter.py @@ -56,6 +56,16 @@ def cal_score(self, operator: Enum, cal_score: int): """ self._tql.add_filter('calScore', operator, cal_score, TqlType.INTEGER) + def case_close_date(self, operator: Enum, case_close_date: str): + """Filter Cases Closed based on **caseCloseDate** keyword. + + Args: + operator: The operator enum for the filter. + case_close_date: The date/time the case was closed. + """ + case_close_date = self.utils.any_to_datetime(case_close_date).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('caseCloseDate', operator, case_close_date, TqlType.STRING) + def case_close_time(self, operator: Enum, case_close_time: str): """Filter Case Close Time based on **caseCloseTime** keyword. @@ -117,6 +127,16 @@ def case_occurrence_user(self, operator: Enum, case_occurrence_user: str): """ self._tql.add_filter('caseOccurrenceUser', operator, case_occurrence_user, TqlType.STRING) + def case_open_date(self, operator: Enum, case_open_date: str): + """Filter Cases Created based on **caseOpenDate** keyword. + + Args: + operator: The operator enum for the filter. + case_open_date: The date/time the case was opened. + """ + case_open_date = self.utils.any_to_datetime(case_open_date).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('caseOpenDate', operator, case_open_date, TqlType.STRING) + def case_open_time(self, operator: Enum, case_open_time: str): """Filter Case Open Time based on **caseOpenTime** keyword. diff --git a/tcex/api/tc/v3/group_attributes/group_attribute.py b/tcex/api/tc/v3/group_attributes/group_attribute.py index e87cebec4..2783144ec 100644 --- a/tcex/api/tc/v3/group_attributes/group_attribute.py +++ b/tcex/api/tc/v3/group_attributes/group_attribute.py @@ -64,12 +64,13 @@ class GroupAttribute(ObjectABC): default (bool, kwargs): A flag indicating that this is the default attribute of its type within the object. Only applies to certain attribute and data types. group_id (int, kwargs): Group associated with attribute. + pinned (bool, kwargs): A flag indicating that the attribute has been noted for importance. security_labels (SecurityLabels, kwargs): A list of Security Labels corresponding to the Intel item (NOTE: Setting this parameter will replace any existing tag(s) with the one(s) specified). source (str, kwargs): The attribute source. type (str, kwargs): The attribute type. - value (str, kwargs): Attribute value. + value (str, kwargs): The attribute value. """ def __init__(self, **kwargs): diff --git a/tcex/api/tc/v3/group_attributes/group_attribute_filter.py b/tcex/api/tc/v3/group_attributes/group_attribute_filter.py index 48bed2e8b..14b20845a 100644 --- a/tcex/api/tc/v3/group_attributes/group_attribute_filter.py +++ b/tcex/api/tc/v3/group_attributes/group_attribute_filter.py @@ -18,6 +18,15 @@ def _api_endpoint(self) -> str: """Return the API endpoint.""" return ApiEndpoints.GROUP_ATTRIBUTES.value + def associable(self, operator: Enum, associable: bool): + """Filter Associable based on **associable** keyword. + + Args: + operator: The operator enum for the filter. + associable: A flag to include an attribute in group associations. + """ + self._tql.add_filter('associable', operator, associable, TqlType.BOOLEAN) + def date_added(self, operator: Enum, date_added: str): """Filter Date Added based on **dateAdded** keyword. @@ -131,6 +140,15 @@ def owner_name(self, operator: Enum, owner_name: str): """ self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) + def pinned(self, operator: Enum, pinned: bool): + """Filter Pinned based on **pinned** keyword. + + Args: + operator: The operator enum for the filter. + pinned: Whether or not the attribute is pinned with importance. + """ + self._tql.add_filter('pinned', operator, pinned, TqlType.BOOLEAN) + def source(self, operator: Enum, source: str): """Filter Source based on **source** keyword. diff --git a/tcex/api/tc/v3/group_attributes/group_attribute_model.py b/tcex/api/tc/v3/group_attributes/group_attribute_model.py index 2bd32a228..c5046c778 100644 --- a/tcex/api/tc/v3/group_attributes/group_attribute_model.py +++ b/tcex/api/tc/v3/group_attributes/group_attribute_model.py @@ -76,7 +76,7 @@ class GroupAttributeModel( date_added: Optional[datetime] = Field( None, allow_mutation=False, - description='The date and time that the Attribute was first created.', + description='The date and time that the item was first created.', read_only=True, title='dateAdded', ) @@ -110,6 +110,13 @@ class GroupAttributeModel( read_only=True, title='lastModified', ) + pinned: bool = Field( + None, + description='A flag indicating that the attribute has been noted for importance.', + methods=['POST', 'PUT'], + read_only=False, + title='pinned', + ) security_labels: Optional['SecurityLabelsModel'] = Field( None, description=( @@ -136,7 +143,7 @@ class GroupAttributeModel( ) value: Optional[str] = Field( None, - description='Attribute value.', + description='The attribute value.', methods=['POST', 'PUT'], min_length=1, read_only=False, diff --git a/tcex/api/tc/v3/groups/group.py b/tcex/api/tc/v3/groups/group.py index bd93315e2..45941fd3f 100644 --- a/tcex/api/tc/v3/groups/group.py +++ b/tcex/api/tc/v3/groups/group.py @@ -95,6 +95,10 @@ class Group(ObjectABC): header (str, kwargs): The email Header field. malware (bool, kwargs): Is the document malware? name (str, kwargs): The name of the group. + owner_id (int, kwargs): The id of the Organization, Community, or Source that the item + belongs to. + owner_name (str, kwargs): The name of the Organization, Community, or Source that the item + belongs to. password (str, kwargs): The password associated with the document (Required if Malware is true). publish_date (str, kwargs): The date and time that the report was first created. @@ -108,6 +112,8 @@ class Group(ObjectABC): tags (Tags, kwargs): A list of Tags corresponding to the item (NOTE: Setting this parameter will replace any existing tag(s) with the one(s) specified). type (str, kwargs): The **type** for the Group. + up_vote (bool, kwargs): Is the intelligence valid and useful? (0 means downvote, 1 means + upvote, and NULL means no vote). xid (str, kwargs): The xid of the item. """ @@ -190,8 +196,7 @@ def download(self, params: Optional[dict] = None) -> bytes: self._request( method='GET', url=f'''{self.url('GET')}/download''', - # headers={'content-type': 'application/octet-stream'}, - headers=None, + headers={'Accept': 'application/octet-stream'}, params=params, ) return self.request.content @@ -202,7 +207,7 @@ def pdf(self, params: Optional[dict] = None) -> bytes: method='GET', body=None, url=f'''{self.url('GET')}/pdf''', - headers=None, + headers={'Accept': 'application/octet-stream'}, params=params, ) diff --git a/tcex/api/tc/v3/groups/group_filter.py b/tcex/api/tc/v3/groups/group_filter.py index f589b6853..12b0a7ccd 100644 --- a/tcex/api/tc/v3/groups/group_filter.py +++ b/tcex/api/tc/v3/groups/group_filter.py @@ -180,6 +180,15 @@ def event_date(self, operator: Enum, event_date: str): event_date = self.utils.any_to_datetime(event_date).strftime('%Y-%m-%d %H:%M:%S') self._tql.add_filter('eventDate', operator, event_date, TqlType.STRING) + def generated_report(self, operator: Enum, generated_report: bool): + """Filter Generated (Report) based on **generatedReport** keyword. + + Args: + operator: The operator enum for the filter. + generated_report: Boolean flag indicating if the Report was auto-generated. + """ + self._tql.add_filter('generatedReport', operator, generated_report, TqlType.BOOLEAN) + @property def has_artifact(self): """Return **ArtifactFilter** for further filtering.""" diff --git a/tcex/api/tc/v3/groups/group_model.py b/tcex/api/tc/v3/groups/group_model.py index 12b7686bf..46b9bae73 100644 --- a/tcex/api/tc/v3/groups/group_model.py +++ b/tcex/api/tc/v3/groups/group_model.py @@ -157,6 +157,13 @@ class GroupModel( read_only=True, title='documentType', ) + down_vote_count: Optional[int] = Field( + None, + allow_mutation=False, + description='The total number of users who find the intel not helpful.', + read_only=True, + title='downVoteCount', + ) due_date: Optional[datetime] = Field( None, applies_to=['Task'], @@ -251,6 +258,13 @@ class GroupModel( read_only=False, title='from', ) + generated_report: bool = Field( + None, + allow_mutation=False, + description='Is the report auto-generated?', + read_only=True, + title='generatedReport', + ) header: Optional[str] = Field( None, applies_to=['Email'], @@ -274,6 +288,13 @@ class GroupModel( read_only=True, title='lastModified', ) + legacy_link: Optional[str] = Field( + None, + allow_mutation=False, + description='A link to the legacy ThreatConnect details page for this entity.', + read_only=True, + title='legacyLink', + ) malware: bool = Field( None, applies_to=['Document'], @@ -299,11 +320,18 @@ class GroupModel( read_only=True, title='overdue', ) + owner_id: Optional[int] = Field( + None, + description='The id of the Organization, Community, or Source that the item belongs to.', + methods=['POST'], + read_only=False, + title='ownerId', + ) owner_name: Optional[str] = Field( None, - allow_mutation=False, description='The name of the Organization, Community, or Source that the item belongs to.', - read_only=True, + methods=['POST'], + read_only=False, title='ownerName', ) password: Optional[str] = Field( @@ -427,6 +455,23 @@ class GroupModel( read_only=False, title='type', ) + up_vote: bool = Field( + None, + description=( + 'Is the intelligence valid and useful? (0 means downvote, 1 means upvote, and NULL ' + 'means no vote).' + ), + methods=['POST', 'PUT'], + read_only=False, + title='upVote', + ) + up_vote_count: Optional[int] = Field( + None, + allow_mutation=False, + description='The total number of users who find the intel useful.', + read_only=True, + title='upVoteCount', + ) web_link: Optional[str] = Field( None, allow_mutation=False, diff --git a/tcex/api/tc/v3/indicator_attributes/indicator_attribute.py b/tcex/api/tc/v3/indicator_attributes/indicator_attribute.py index 709d86c26..548c53266 100644 --- a/tcex/api/tc/v3/indicator_attributes/indicator_attribute.py +++ b/tcex/api/tc/v3/indicator_attributes/indicator_attribute.py @@ -64,12 +64,13 @@ class IndicatorAttribute(ObjectABC): default (bool, kwargs): A flag indicating that this is the default attribute of its type within the object. Only applies to certain attribute and data types. indicator_id (int, kwargs): Indicator associated with attribute. + pinned (bool, kwargs): A flag indicating that the attribute has been noted for importance. security_labels (SecurityLabels, kwargs): A list of Security Labels corresponding to the Intel item (NOTE: Setting this parameter will replace any existing tag(s) with the one(s) specified). source (str, kwargs): The attribute source. type (str, kwargs): The attribute type. - value (str, kwargs): Attribute value. + value (str, kwargs): The attribute value. """ def __init__(self, **kwargs): diff --git a/tcex/api/tc/v3/indicator_attributes/indicator_attribute_filter.py b/tcex/api/tc/v3/indicator_attributes/indicator_attribute_filter.py index 7288720ac..4ff86cd29 100644 --- a/tcex/api/tc/v3/indicator_attributes/indicator_attribute_filter.py +++ b/tcex/api/tc/v3/indicator_attributes/indicator_attribute_filter.py @@ -18,6 +18,15 @@ def _api_endpoint(self) -> str: """Return the API endpoint.""" return ApiEndpoints.INDICATOR_ATTRIBUTES.value + def associable(self, operator: Enum, associable: bool): + """Filter Associable based on **associable** keyword. + + Args: + operator: The operator enum for the filter. + associable: A flag to include an attribute in indicator associations. + """ + self._tql.add_filter('associable', operator, associable, TqlType.BOOLEAN) + def date_added(self, operator: Enum, date_added: str): """Filter Date Added based on **dateAdded** keyword. @@ -131,6 +140,15 @@ def owner_name(self, operator: Enum, owner_name: str): """ self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) + def pinned(self, operator: Enum, pinned: bool): + """Filter Pinned based on **pinned** keyword. + + Args: + operator: The operator enum for the filter. + pinned: Whether or not the attribute is pinned with importance. + """ + self._tql.add_filter('pinned', operator, pinned, TqlType.BOOLEAN) + def source(self, operator: Enum, source: str): """Filter Source based on **source** keyword. diff --git a/tcex/api/tc/v3/indicator_attributes/indicator_attribute_model.py b/tcex/api/tc/v3/indicator_attributes/indicator_attribute_model.py index a6d1398bd..2698e2edd 100644 --- a/tcex/api/tc/v3/indicator_attributes/indicator_attribute_model.py +++ b/tcex/api/tc/v3/indicator_attributes/indicator_attribute_model.py @@ -76,7 +76,7 @@ class IndicatorAttributeModel( date_added: Optional[datetime] = Field( None, allow_mutation=False, - description='The date and time that the Attribute was first created.', + description='The date and time that the item was first created.', read_only=True, title='dateAdded', ) @@ -110,6 +110,13 @@ class IndicatorAttributeModel( read_only=True, title='lastModified', ) + pinned: bool = Field( + None, + description='A flag indicating that the attribute has been noted for importance.', + methods=['POST', 'PUT'], + read_only=False, + title='pinned', + ) security_labels: Optional['SecurityLabelsModel'] = Field( None, description=( @@ -136,7 +143,7 @@ class IndicatorAttributeModel( ) value: Optional[str] = Field( None, - description='Attribute value.', + description='The attribute value.', methods=['POST', 'PUT'], min_length=1, read_only=False, diff --git a/tcex/api/tc/v3/indicators/indicator.py b/tcex/api/tc/v3/indicators/indicator.py index 8f621f2a6..e337d9de9 100644 --- a/tcex/api/tc/v3/indicators/indicator.py +++ b/tcex/api/tc/v3/indicators/indicator.py @@ -116,6 +116,11 @@ class Indicator(ObjectABC): ip (str, kwargs): The ip address associated with this indicator (Address specific summary field). md5 (str, kwargs): The md5 associated with this indicator (File specific summary field). + mode (str, kwargs): The operation to perform on the file hashes (delete | merge). + owner_id (int, kwargs): The id of the Organization, Community, or Source that the item + belongs to. + owner_name (str, kwargs): The name of the Organization, Community, or Source that the item + belongs to. private_flag (bool, kwargs): Is this indicator private? rating (int, kwargs): The indicator threat rating. security_labels (SecurityLabels, kwargs): A list of Security Labels corresponding to the diff --git a/tcex/api/tc/v3/indicators/indicator_filter.py b/tcex/api/tc/v3/indicators/indicator_filter.py index 10d4fe51d..2a09752fd 100644 --- a/tcex/api/tc/v3/indicators/indicator_filter.py +++ b/tcex/api/tc/v3/indicators/indicator_filter.py @@ -484,3 +484,22 @@ def value3(self, operator: Enum, value3: str): value3: No description provided. """ self._tql.add_filter('value3', operator, value3, TqlType.STRING) + + def vt_last_updated(self, operator: Enum, vt_last_updated: str): + """Filter Virus Total Last Updated based on **vtLastUpdated** keyword. + + Args: + operator: The operator enum for the filter. + vt_last_updated: The date the indicator has been looked at with Virus Total. + """ + vt_last_updated = self.utils.any_to_datetime(vt_last_updated).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('vtLastUpdated', operator, vt_last_updated, TqlType.STRING) + + def vt_malicious_count(self, operator: Enum, vt_malicious_count: int): + """Filter Virus Total Malicious Count based on **vtMaliciousCount** keyword. + + Args: + operator: The operator enum for the filter. + vt_malicious_count: The number of malicious reports for an indicator from Virus Total. + """ + self._tql.add_filter('vtMaliciousCount', operator, vt_malicious_count, TqlType.INTEGER) diff --git a/tcex/api/tc/v3/indicators/indicator_model.py b/tcex/api/tc/v3/indicators/indicator_model.py index 86618485a..37cb7503d 100644 --- a/tcex/api/tc/v3/indicators/indicator_model.py +++ b/tcex/api/tc/v3/indicators/indicator_model.py @@ -158,6 +158,29 @@ class IndicatorModel( read_only=False, title='dnsActive', ) + dns_resolution: Optional[dict] = Field( + None, + allow_mutation=False, + applies_to=['Host', 'Address'], + conditional_required=['Host', 'Address'], + description='Dns resolution data for the Host or Address indicator.', + read_only=True, + title='dnsResolution', + ) + enrichment: Optional[dict] = Field( + None, + allow_mutation=False, + description='Enrichment data.', + read_only=True, + title='enrichment', + ) + false_positive_reported_by_user: bool = Field( + None, + allow_mutation=False, + description='Has a false positive been reported by this user for this indicator today?', + read_only=True, + title='falsePositiveReportedByUser', + ) false_positives: Optional[int] = Field( None, allow_mutation=False, @@ -179,6 +202,15 @@ class IndicatorModel( read_only=False, title='fileOccurrences', ) + geo_location: Optional[dict] = Field( + None, + allow_mutation=False, + applies_to=['Host', 'Address'], + conditional_required=['Host', 'Address'], + description='Geographical localization of the Host or Address indicator.', + read_only=True, + title='geoLocation', + ) host_name: Optional[str] = Field( None, applies_to=['Host'], @@ -194,6 +226,15 @@ class IndicatorModel( read_only=True, title='id', ) + investigation_links: Optional[dict] = Field( + None, + allow_mutation=False, + description=( + 'Resource links that provide additional information to assist in investigation.' + ), + read_only=True, + title='investigationLinks', + ) ip: Optional[str] = Field( None, applies_to=['Address'], @@ -226,14 +267,29 @@ class IndicatorModel( read_only=True, title='lastObserved', ) + legacy_link: Optional[str] = Field( + None, + allow_mutation=False, + description='A link to the legacy ThreatConnect details page for this entity.', + read_only=True, + title='legacyLink', + ) md5: Optional[str] = Field( None, applies_to=['File'], description='The md5 associated with this indicator (File specific summary field).', - methods=['POST'], + methods=['POST', 'PUT'], read_only=False, title='md5', ) + mode: Optional[str] = Field( + None, + applies_to=['File'], + description='The operation to perform on the file hashes (delete | merge).', + methods=['POST', 'PUT'], + read_only=False, + title='mode', + ) observations: Optional[int] = Field( None, allow_mutation=False, @@ -241,11 +297,18 @@ class IndicatorModel( read_only=True, title='observations', ) + owner_id: Optional[int] = Field( + None, + description='The id of the Organization, Community, or Source that the item belongs to.', + methods=['POST'], + read_only=False, + title='ownerId', + ) owner_name: Optional[str] = Field( None, - allow_mutation=False, description='The name of the Organization, Community, or Source that the item belongs to.', - read_only=True, + methods=['POST'], + read_only=False, title='ownerName', ) private_flag: bool = Field( @@ -278,7 +341,7 @@ class IndicatorModel( None, applies_to=['File'], description='The sha1 associated with this indicator (File specific summary field).', - methods=['POST'], + methods=['POST', 'PUT'], read_only=False, title='sha1', ) @@ -286,7 +349,7 @@ class IndicatorModel( None, applies_to=['File'], description='The sha256 associated with this indicator (File specific summary field).', - methods=['POST'], + methods=['POST', 'PUT'], read_only=False, title='sha256', ) @@ -352,6 +415,20 @@ class IndicatorModel( read_only=True, title='threatAssessScore', ) + threat_assess_score_false_positive: Optional[int] = Field( + None, + allow_mutation=False, + description='The Threat Assess score for false positives related to this indicator.', + read_only=True, + title='threatAssessScoreFalsePositive', + ) + threat_assess_score_observed: Optional[int] = Field( + None, + allow_mutation=False, + description='The Threat Assess score observed for this indicator.', + read_only=True, + title='threatAssessScoreObserved', + ) type: Optional[str] = Field( None, description='The **type** for the Indicator.', @@ -388,6 +465,15 @@ class IndicatorModel( read_only=True, title='webLink', ) + whois: Optional[dict] = Field( + None, + allow_mutation=False, + applies_to=['Host'], + conditional_required=['Host'], + description='The whois data for the indicator.', + read_only=True, + title='whois', + ) whois_active: bool = Field( None, applies_to=['Host'], diff --git a/tcex/api/tc/v3/security/users/user_filter.py b/tcex/api/tc/v3/security/users/user_filter.py index f51390c2a..11559b3de 100644 --- a/tcex/api/tc/v3/security/users/user_filter.py +++ b/tcex/api/tc/v3/security/users/user_filter.py @@ -16,6 +16,15 @@ def _api_endpoint(self) -> str: """Return the API endpoint.""" return ApiEndpoints.USERS.value + def disabled(self, operator: Enum, disabled: bool): + """Filter Disabled based on **disabled** keyword. + + Args: + operator: The operator enum for the filter. + disabled: A flag indicating whether or not the user's account has been disabled. + """ + self._tql.add_filter('disabled', operator, disabled, TqlType.BOOLEAN) + def first_name(self, operator: Enum, first_name: str): """Filter First Name based on **firstName** keyword. @@ -43,6 +52,34 @@ def id(self, operator: Enum, id: int): # pylint: disable=redefined-builtin """ self._tql.add_filter('id', operator, id, TqlType.INTEGER) + def job_function(self, operator: Enum, job_function: str): + """Filter Job Function based on **jobFunction** keyword. + + Args: + operator: The operator enum for the filter. + job_function: The user's job function. + """ + self._tql.add_filter('jobFunction', operator, job_function, TqlType.STRING) + + def job_role(self, operator: Enum, job_role: str): + """Filter Job Role based on **jobRole** keyword. + + Args: + operator: The operator enum for the filter. + job_role: The user's job role. + """ + self._tql.add_filter('jobRole', operator, job_role, TqlType.STRING) + + def last_login(self, operator: Enum, last_login: str): + """Filter Last Login based on **lastLogin** keyword. + + Args: + operator: The operator enum for the filter. + last_login: The last time the user logged in. + """ + last_login = self.utils.any_to_datetime(last_login).strftime('%Y-%m-%d %H:%M:%S') + self._tql.add_filter('lastLogin', operator, last_login, TqlType.STRING) + def last_name(self, operator: Enum, last_name: str): """Filter Last Name based on **lastName** keyword. @@ -52,6 +89,108 @@ def last_name(self, operator: Enum, last_name: str): """ self._tql.add_filter('lastName', operator, last_name, TqlType.STRING) + def last_password_change(self, operator: Enum, last_password_change: str): + """Filter Last Password Change based on **lastPasswordChange** keyword. + + Args: + operator: The operator enum for the filter. + last_password_change: The last time the user changed their password. + """ + last_password_change = self.utils.any_to_datetime(last_password_change).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('lastPasswordChange', operator, last_password_change, TqlType.STRING) + + def locked(self, operator: Enum, locked: bool): + """Filter Locked based on **locked** keyword. + + Args: + operator: The operator enum for the filter. + locked: A flag indicating whether or not the user's account has been locked. + """ + self._tql.add_filter('locked', operator, locked, TqlType.BOOLEAN) + + def logout_interval_minutes(self, operator: Enum, logout_interval_minutes: int): + """Filter Logout Interval based on **logoutIntervalMinutes** keyword. + + Args: + operator: The operator enum for the filter. + logout_interval_minutes: The configured period of time to wait for idle activity before + being logged out. + """ + self._tql.add_filter( + 'logoutIntervalMinutes', operator, logout_interval_minutes, TqlType.INTEGER + ) + + def password_reset_required(self, operator: Enum, password_reset_required: bool): + """Filter Password Reset Required based on **passwordResetRequired** keyword. + + Args: + operator: The operator enum for the filter. + password_reset_required: A flag indicating whether or not the user's password needs to + be reset upoin next login. + """ + self._tql.add_filter( + 'passwordResetRequired', operator, password_reset_required, TqlType.BOOLEAN + ) + + def pseudonym(self, operator: Enum, pseudonym: str): + """Filter Pseudonym based on **pseudonym** keyword. + + Args: + operator: The operator enum for the filter. + pseudonym: The user's pseudonym. + """ + self._tql.add_filter('pseudonym', operator, pseudonym, TqlType.STRING) + + def system_role(self, operator: Enum, system_role: str): + """Filter Role Name based on **systemRole** keyword. + + Args: + operator: The operator enum for the filter. + system_role: The system role name defined for the user. + """ + self._tql.add_filter('systemRole', operator, system_role, TqlType.STRING) + + def terms_accepted(self, operator: Enum, terms_accepted: bool): + """Filter Terms Accepted based on **termsAccepted** keyword. + + Args: + operator: The operator enum for the filter. + terms_accepted: Flag indicating whether user has accepted the current Terms of Service. + """ + self._tql.add_filter('termsAccepted', operator, terms_accepted, TqlType.BOOLEAN) + + def terms_accepted_date(self, operator: Enum, terms_accepted_date: str): + """Filter Terms Accepted Date based on **termsAcceptedDate** keyword. + + Args: + operator: The operator enum for the filter. + terms_accepted_date: Date and time the user accepted the current Terms of Service. + """ + terms_accepted_date = self.utils.any_to_datetime(terms_accepted_date).strftime( + '%Y-%m-%d %H:%M:%S' + ) + self._tql.add_filter('termsAcceptedDate', operator, terms_accepted_date, TqlType.STRING) + + def tql_timeout(self, operator: Enum, tql_timeout: int): + """Filter TQL Timeout based on **tqlTimeout** keyword. + + Args: + operator: The operator enum for the filter. + tql_timeout: The custom TQL timeout value (if defined). + """ + self._tql.add_filter('tqlTimeout', operator, tql_timeout, TqlType.INTEGER) + + def ui_theme(self, operator: Enum, ui_theme: str): + """Filter UI Theme based on **uiTheme** keyword. + + Args: + operator: The operator enum for the filter. + ui_theme: The user's configured theme (e.g. light/dark). + """ + self._tql.add_filter('uiTheme', operator, ui_theme, TqlType.STRING) + def user_name(self, operator: Enum, user_name: str): """Filter User Name based on **userName** keyword. diff --git a/tcex/api/tc/v3/security/users/user_model.py b/tcex/api/tc/v3/security/users/user_model.py index d06597a75..3f39803d3 100644 --- a/tcex/api/tc/v3/security/users/user_model.py +++ b/tcex/api/tc/v3/security/users/user_model.py @@ -65,40 +65,12 @@ class UserModel( _shared_type = PrivateAttr(False) _staged = PrivateAttr(False) - first_name: Optional[str] = Field( - None, - allow_mutation=False, - description='The **first name** for the User.', - read_only=True, - title='firstName', - ) id: Optional[int] = Field( None, description='The ID of the item.', read_only=True, title='id', ) - last_name: Optional[str] = Field( - None, - allow_mutation=False, - description='The **last name** for the User.', - read_only=True, - title='lastName', - ) - pseudonym: Optional[str] = Field( - None, - allow_mutation=False, - description='The **pseudonym** for the User.', - read_only=True, - title='pseudonym', - ) - role: Optional[str] = Field( - None, - allow_mutation=False, - description='The **role** for the User.', - read_only=True, - title='role', - ) user_name: Optional[str] = Field( None, allow_mutation=False, diff --git a/tcex/api/tc/v3/victim_attributes/victim_attribute.py b/tcex/api/tc/v3/victim_attributes/victim_attribute.py index d0ec1c865..d403b56ce 100644 --- a/tcex/api/tc/v3/victim_attributes/victim_attribute.py +++ b/tcex/api/tc/v3/victim_attributes/victim_attribute.py @@ -1,17 +1,22 @@ """VictimAttribute / VictimAttributes Object""" # standard library -from typing import Union +from typing import TYPE_CHECKING, Iterator, Union # first-party from tcex.api.tc.v3.api_endpoints import ApiEndpoints from tcex.api.tc.v3.object_abc import ObjectABC from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC +from tcex.api.tc.v3.security_labels.security_label_model import SecurityLabelModel from tcex.api.tc.v3.victim_attributes.victim_attribute_filter import VictimAttributeFilter from tcex.api.tc.v3.victim_attributes.victim_attribute_model import ( VictimAttributeModel, VictimAttributesModel, ) +if TYPE_CHECKING: # pragma: no cover + # first-party + from tcex.api.tc.v3.security_labels.security_label import SecurityLabel + class VictimAttributes(ObjectCollectionABC): """VictimAttributes Collection. @@ -58,9 +63,13 @@ class VictimAttribute(ObjectABC): Args: default (bool, kwargs): A flag indicating that this is the default attribute of its type within the object. Only applies to certain attribute and data types. + pinned (bool, kwargs): A flag indicating that the attribute has been noted for importance. + security_labels (SecurityLabels, kwargs): A list of Security Labels corresponding to the + Intel item (NOTE: Setting this parameter will replace any existing tag(s) with + the one(s) specified). source (str, kwargs): The attribute source. type (str, kwargs): The attribute type. - value (str, kwargs): Attribute value. + value (str, kwargs): The attribute value. victim_id (int, kwargs): Victim associated with attribute. """ @@ -95,3 +104,23 @@ def model(self, data: Union['VictimAttributeModel', dict]): self._model = type(self.model)(**data) else: raise RuntimeError(f'Invalid data type: {type(data)} provided.') + + @property + def security_labels(self) -> Iterator['SecurityLabel']: + """Yield SecurityLabel from SecurityLabels.""" + # first-party + from tcex.api.tc.v3.security_labels.security_label import SecurityLabels + + yield from self._iterate_over_sublist(SecurityLabels) + + def stage_security_label(self, data: Union[dict, 'ObjectABC', 'SecurityLabelModel']): + """Stage security_label on the object.""" + if isinstance(data, ObjectABC): + data = data.model + elif isinstance(data, dict): + data = SecurityLabelModel(**data) + + if not isinstance(data, SecurityLabelModel): + raise RuntimeError('Invalid type passed in to stage_security_label') + data._staged = True + self.model.security_labels.data.append(data) diff --git a/tcex/api/tc/v3/victim_attributes/victim_attribute_filter.py b/tcex/api/tc/v3/victim_attributes/victim_attribute_filter.py index fe3436e71..2891847de 100644 --- a/tcex/api/tc/v3/victim_attributes/victim_attribute_filter.py +++ b/tcex/api/tc/v3/victim_attributes/victim_attribute_filter.py @@ -122,6 +122,15 @@ def owner_name(self, operator: Enum, owner_name: str): """ self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) + def pinned(self, operator: Enum, pinned: bool): + """Filter Pinned based on **pinned** keyword. + + Args: + operator: The operator enum for the filter. + pinned: Whether or not the attribute is pinned with importance. + """ + self._tql.add_filter('pinned', operator, pinned, TqlType.BOOLEAN) + def source(self, operator: Enum, source: str): """Filter Source based on **source** keyword. diff --git a/tcex/api/tc/v3/victim_attributes/victim_attribute_model.py b/tcex/api/tc/v3/victim_attributes/victim_attribute_model.py index 48fa6eb87..59bac64e9 100644 --- a/tcex/api/tc/v3/victim_attributes/victim_attribute_model.py +++ b/tcex/api/tc/v3/victim_attributes/victim_attribute_model.py @@ -76,7 +76,7 @@ class VictimAttributeModel( date_added: Optional[datetime] = Field( None, allow_mutation=False, - description='The date and time that the Attribute was first created.', + description='The date and time that the item was first created.', read_only=True, title='dateAdded', ) @@ -103,6 +103,23 @@ class VictimAttributeModel( read_only=True, title='lastModified', ) + pinned: bool = Field( + None, + description='A flag indicating that the attribute has been noted for importance.', + methods=['POST', 'PUT'], + read_only=False, + title='pinned', + ) + security_labels: Optional['SecurityLabelsModel'] = Field( + None, + description=( + 'A list of Security Labels corresponding to the Intel item (NOTE: Setting this ' + 'parameter will replace any existing tag(s) with the one(s) specified).' + ), + methods=['POST', 'PUT'], + read_only=False, + title='securityLabels', + ) source: Optional[str] = Field( None, description='The attribute source.', @@ -119,7 +136,7 @@ class VictimAttributeModel( ) value: Optional[str] = Field( None, - description='Attribute value.', + description='The attribute value.', methods=['POST', 'PUT'], min_length=1, read_only=False, @@ -133,6 +150,12 @@ class VictimAttributeModel( title='victimId', ) + @validator('security_labels', always=True) + def _validate_security_labels(cls, v): + if not v: + return SecurityLabelsModel() + return v + @validator('created_by', always=True) def _validate_user(cls, v): if not v: @@ -142,6 +165,7 @@ def _validate_user(cls, v): # first-party from tcex.api.tc.v3.security.users.user_model import UserModel +from tcex.api.tc.v3.security_labels.security_label_model import SecurityLabelsModel # add forward references VictimAttributeDataModel.update_forward_refs() diff --git a/tcex/utils/string_operations.py b/tcex/utils/string_operations.py index 16c2aa1ee..cc6bdc3ef 100644 --- a/tcex/utils/string_operations.py +++ b/tcex/utils/string_operations.py @@ -126,7 +126,7 @@ def truncate_string( return f'{output.rstrip()}{append_chars}' @property - def _camel_pattern(self) -> re.Pattern: + def _camel_pattern(self) -> 're.Pattern': """Return compiled re pattern.""" return re.compile(r'(? 'Indicator': """Create an case. If a case_name is not provide a dynamic case name will be used. @@ -479,7 +480,7 @@ class TestThreatIntelligence: owner = None required_fields = {} ti = None - ti_helper = None + ti_helper: TIHelper def teardown_method(self): """Clean up resources""" @@ -766,7 +767,7 @@ def indicator_add_attribute(self, request): ti_data = response_data.get('data', {}).get('attribute') # assert response - assert r.status_code == 201 + assert r.status_code == 201, f'(status-code={r.status_code}, message={r.text})' assert response_data.get('status') == 'Success' # validate ti data diff --git a/tests/api/tc/v3/artifacts/test_artifact_interface.py b/tests/api/tc/v3/artifacts/test_artifact_interface.py index f50dea044..c3806ac40 100644 --- a/tests/api/tc/v3/artifacts/test_artifact_interface.py +++ b/tests/api/tc/v3/artifacts/test_artifact_interface.py @@ -142,7 +142,7 @@ def test_artifact_create_and_retrieve_nested_types(self, request: FixtureRequest artifact_data = { 'case_id': case.model.id, 'intel_type': 'indicator-ASN', - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } @@ -292,7 +292,7 @@ def test_artifact_all_filter_on_task(self, request: FixtureRequest): # [Create Testing] define object data artifact_data = { 'intel_type': 'indicator-ASN', - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'task_id': task.model.id, 'type': 'ASN', } @@ -330,7 +330,7 @@ def test_artifact_create_by_case_xid(self, request: FixtureRequest): artifact_data = { 'case_xid': case.model.xid, 'intel_type': 'indicator-ASN', - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } diff --git a/tests/api/tc/v3/case_attributes/test_case_attribute_snippets.py b/tests/api/tc/v3/case_attributes/test_case_attribute_snippets.py index 18cad2a9e..b70925590 100644 --- a/tests/api/tc/v3/case_attributes/test_case_attribute_snippets.py +++ b/tests/api/tc/v3/case_attributes/test_case_attribute_snippets.py @@ -34,6 +34,7 @@ def test_case_attributes_tql_filter(self): 'value': 'An example description attribute', } ], + params={'fields': 'attributes'}, ) # get attribute id attribute_id = case.model.attributes.data[0].id @@ -59,6 +60,7 @@ def test_case_attribute_get_by_id(self): 'value': 'An example description attribute', } ], + params={'fields': 'attributes'}, ) # get attribute id attribute_id = case.model.attributes.data[0].id diff --git a/tests/api/tc/v3/cases/test_case_interface.py b/tests/api/tc/v3/cases/test_case_interface.py index 19d60e706..d93a36f0b 100644 --- a/tests/api/tc/v3/cases/test_case_interface.py +++ b/tests/api/tc/v3/cases/test_case_interface.py @@ -143,7 +143,7 @@ def test_case_create_and_retrieve_nested_types(self, request: 'pytest.FixtureReq # [Create Testing] define artifact data artifact_data = { - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } @@ -258,7 +258,7 @@ def test_case_nested_objects(self, request: 'pytest.FixtureRequest'): """Test nested objects on a Case""" # [Pre-Requisite] - create case with artifact data artifact_data = { - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } case_data = { @@ -330,7 +330,7 @@ def test_case_nested_objects(self, request: 'pytest.FixtureRequest'): # [Stage Testing] Stage another artifact onto the case. artifact_data_2 = { - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } case.stage_artifact(artifact_data_2) @@ -392,7 +392,7 @@ def test_case_all_filters(self, request: 'pytest.FixtureRequest'): note_data = {'text': f'sample note for {request.node.name} test case.'} artifact_data = { 'intel_type': 'indicator-ASN', - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } # [Create Testing] define task data @@ -588,7 +588,7 @@ def test_case_delete_by_id(self): # artifact_data = {'data': []} # for _ in range(0, artifact_count): # # [Pre-Requisite] define artifact data - # summary = f'asn{randint(100, 999)}' + # summary = f'ASN{randint(100, 999)}' # artifact_data.get('data').append( # { # 'intel_type': 'indicator-ASN', diff --git a/tests/api/tc/v3/groups/test_group_interface.py b/tests/api/tc/v3/groups/test_group_interface.py index 6c11d8dac..3b48d178f 100644 --- a/tests/api/tc/v3/groups/test_group_interface.py +++ b/tests/api/tc/v3/groups/test_group_interface.py @@ -225,6 +225,7 @@ def test_group_create_and_retrieve_nested_types(self): """ groups = self.v3.groups() groups.filter.has_indicator.last_modified(TqlOperator.GT, '2021-11-09T00:00:00Z') + groups.filter.owner_name(TqlOperator.EQ, 'TCI') # groups.filter.has_indicator.last_modified(TqlOperator.GT, 'yesterday') for group in groups: diff --git a/tests/api/tc/v3/indicators/test_indicator_snippets.py b/tests/api/tc/v3/indicators/test_indicator_snippets.py index bfed44539..73b84cafa 100644 --- a/tests/api/tc/v3/indicators/test_indicator_snippets.py +++ b/tests/api/tc/v3/indicators/test_indicator_snippets.py @@ -515,7 +515,7 @@ def test_indicator_get_by_raw_tql(self): indicators = self.tcex.v3.indicators() indicators.filter.tql = ( 'typeName in ("Host", "Address", "EmailAddress", "File", "URL") and ' - '(summary like "%example%" or tag like "%example%")' + '(summary like "%example%" or tag like "%example%") and (ownerName EQ "TCI")' ) for indicator in indicators: diff --git a/tests/api/tc/v3/tasks/test_task_interface.py b/tests/api/tc/v3/tasks/test_task_interface.py index 52327301d..0ca477784 100644 --- a/tests/api/tc/v3/tasks/test_task_interface.py +++ b/tests/api/tc/v3/tasks/test_task_interface.py @@ -62,7 +62,7 @@ def test_task_create_and_retrieve_nested_types(self, request: 'pytest.FixtureReq artifact_data = { 'intel_type': 'indicator-ASN', - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } @@ -148,7 +148,7 @@ def test_task_all_filters(self, request: 'pytest.FixtureRequest'): note_data = {'text': f'sample note for {request.node.name} test case.'} artifact_data = { 'intel_type': 'indicator-ASN', - 'summary': f'asn{randint(100, 999)}', + 'summary': f'ASN{randint(100, 999)}', 'type': 'ASN', } @@ -357,7 +357,7 @@ def test_task_get_single_by_id_properties(self, request: 'pytest.FixtureRequest' artifact_data = {'data': []} for _ in range(0, artifact_count): # [Pre-Requisite] define artifact data - summary = f'asn{randint(100, 999)}' + summary = f'ASN{randint(100, 999)}' artifact_data.get('data').append( { 'intel_type': 'indicator-ASN', diff --git a/tests/api/tc/v3/v3_helpers.py b/tests/api/tc/v3/v3_helpers.py index cfe7735ee..aa06e19ee 100644 --- a/tests/api/tc/v3/v3_helpers.py +++ b/tests/api/tc/v3/v3_helpers.py @@ -30,6 +30,9 @@ class V3Helper: def __init__(self, v3_object: str): """Initialize Class Properties""" + self.v3_object = v3_object + + # properties self.app = MockApp(runtime_level='Playbook') self.tcex = self.app.tcex self.v3 = self.tcex.v3 @@ -346,9 +349,11 @@ def create_case(self, **kwargs): Args: assignee (str, kwargs): Optional assignee. + attributes (list, kwargs): Optional attributes. date_added (str, kwargs): Optional date_added. description (str, kwargs): Optional description. name (str, kwargs): Optional name. + params (dict, kwargs): Optional params to send to create request. resolution (str, kwargs): Optional resolution. severity (str, kwargs): Optional severity. status (str, kwargs): Optional status. @@ -404,7 +409,7 @@ def create_case(self, **kwargs): case.stage_task(self.v3.task(**task)) # create object - case.create() + case.create(kwargs.get('params', {})) # store case id for cleanup self._v3_objects.append(case) @@ -684,7 +689,7 @@ def cleanup(self): class TestV3: """Test TcEx V3 Base Class""" - v3_helper = None + v3_helper: V3Helper tcex = None utils = Utils() @@ -703,29 +708,42 @@ def obj_api_options(self): we can validate that they match the properties returned from the OPTIONS request. """ for f in self.v3_helper.v3_obj.fields: - # fields that are returned in OPT /v3//filters endpoint, but not OPT /v3/ - if f.get('name') in [ - # case ignore - 'analytics', # per MJ this is a special field that enables returning all analytics - # group ignore - 'victimAssets', - # indicator ignore - 'files', - 'fileAction', - # Returns custom indicator fields as value1/value2/value3 - # instead of their custom names - 'genericCustomIndicatorValues', - 'groups', - 'indicators', - 'owner', - 'threatAssess', - ]: - continue - - if f.get('name') not in self.v3_helper.v3_obj.properties: - assert ( - False - ), f'''{f.get('name')} not in {self.v3_helper.v3_obj.properties.keys()}''' + names = [f.get('name')] + + if self.v3_helper.v3_object == 'artifacts' and 'analytics' in names: + # fix discrepancy between /fields and + names = [ + 'analyticsPriority', + 'analyticsPriorityLevel', + 'analyticsScore', + 'analyticsStatus', + 'analyticsType', + ] + + if self.v3_helper.v3_object in ['cases', 'groups'] and 'userDetails' in names: + # fix discrepancy between /fields and + names = ['createdBy'] + + if self.v3_helper.v3_object == 'indicators': + if 'genericCustomIndicatorValues' in names: + # fix discrepancy between /fields and + names = ['value1', 'value2', 'value3'] + if 'whoIs' in names: + # fix discrepancy between /fields and + names = ['whois'] + if 'threatAssess' in names: + # fix discrepancy between /fields and + names = [ + 'threatAssessConfidence', + 'threatAssessRating', + 'threatAssessScore', + 'threatAssessScoreFalsePositive', + 'threatAssessScoreObserved', + ] + + for name in names: + if name not in self.v3_helper.v3_obj.properties: + assert False, f'''{name} not in {self.v3_helper.v3_obj.properties.keys()}''' def obj_filter_keywords(self): """Test filter keywords.