From f71e6572b0f561e173ebb5655c804b6030acf49c Mon Sep 17 00:00:00 2001 From: Ben Purdy Date: Fri, 14 Jun 2024 10:24:04 -0400 Subject: [PATCH 1/5] APP-4521 --- release_notes.md | 8 +- .../intel_requirement_filter.py | 708 +----------------- .../test_intel_requirements_interface.py | 34 +- tests/api/tc/v3/v3_helpers.py | 8 +- 4 files changed, 67 insertions(+), 691 deletions(-) diff --git a/release_notes.md b/release_notes.md index 00e8a229e..a94ba8621 100644 --- a/release_notes.md +++ b/release_notes.md @@ -2,11 +2,11 @@ ## 4.0.6 -- APP-4522 - [API] Updated batch module to support external date fields -- APP-4521 - [API] Updated support for IR endpoints +- APP-4472 - [API] Added NAICS industry classification module +- APP-4482 - [API] Updated transforms to support new static methods - APP-4520 - [Util] Fixed TCEX string operations trim method -- APP-4482 - [API] Updated transforms to support new static methods. -- APP-4472 - [NAICS] Added NAICS module for lookup by id or name. +- APP-4521 - [API] Updated support for IR endpoints +- APP-4522 - [API] Updated batch module to support external date fields ## 4.0.5 diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py b/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py index d8ac09ece..3974d51c3 100644 --- a/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement_filter.py @@ -23,330 +23,31 @@ def _api_endpoint(self) -> str: """Return the API endpoint.""" return ApiEndpoints.INTEL_REQUIREMENTS.value - def associated_indicator(self, operator: Enum, associated_indicator: int | list): - """Filter associatedIndicator based on **associatedIndicator** keyword. + def category(self, operator: Enum, category: list | str): + """Filter Category based on **category** keyword. Args: operator: The operator enum for the filter. - associated_indicator: No description provided. + category: The subtype of the intel requirement. """ - if isinstance(associated_indicator, list) and operator not in self.list_types: + if isinstance(category, list) and operator not in self.list_types: raise RuntimeError( 'Operator must be CONTAINS, NOT_CONTAINS, IN' 'or NOT_IN when filtering on a list of values.' ) - self._tql.add_filter('associatedIndicator', operator, associated_indicator, TqlType.INTEGER) - - def attribute(self, operator: Enum, attribute: list | str): - """Filter attribute based on **attribute** keyword. - - Args: - operator: The operator enum for the filter. - attribute: No description provided. - """ - if isinstance(attribute, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('attribute', operator, attribute, TqlType.STRING) - - def child_group(self, operator: Enum, child_group: int | list): - """Filter childGroup based on **childGroup** keyword. - - Args: - operator: The operator enum for the filter. - child_group: No description provided. - """ - if isinstance(child_group, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('childGroup', operator, child_group, TqlType.INTEGER) - - def created_by(self, operator: Enum, created_by: list | str): - """Filter Created By based on **createdBy** keyword. - - Args: - operator: The operator enum for the filter. - created_by: The user who created the group. - """ - if isinstance(created_by, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('createdBy', operator, created_by, TqlType.STRING) + self._tql.add_filter('category', operator, category, TqlType.STRING) def date_added(self, operator: Enum, date_added: Arrow | datetime | int | str): """Filter Date Added based on **dateAdded** keyword. Args: operator: The operator enum for the filter. - date_added: The date the group was added to the system. + date_added: The date the requirement was added to the system. """ date_added = self.util.any_to_datetime(date_added).strftime('%Y-%m-%d %H:%M:%S') self._tql.add_filter('dateAdded', operator, date_added, TqlType.STRING) - def document_date_added( - self, operator: Enum, document_date_added: Arrow | datetime | int | str - ): - """Filter Date Added (Document) based on **documentDateAdded** keyword. - - Args: - operator: The operator enum for the filter. - document_date_added: The date the document was added. - """ - document_date_added = self.util.any_to_datetime(document_date_added).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('documentDateAdded', operator, document_date_added, TqlType.STRING) - - def document_filename(self, operator: Enum, document_filename: list | str): - """Filter Filename (Document) based on **documentFilename** keyword. - - Args: - operator: The operator enum for the filter. - document_filename: The file name of the document. - """ - if isinstance(document_filename, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('documentFilename', operator, document_filename, TqlType.STRING) - - def document_filesize(self, operator: Enum, document_filesize: int | list): - """Filter File Size (Document) based on **documentFilesize** keyword. - - Args: - operator: The operator enum for the filter. - document_filesize: The filesize of the document. - """ - if isinstance(document_filesize, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('documentFilesize', operator, document_filesize, TqlType.INTEGER) - - def document_status(self, operator: Enum, document_status: list | str): - """Filter Status (Document) based on **documentStatus** keyword. - - Args: - operator: The operator enum for the filter. - document_status: The status of the document. - """ - if isinstance(document_status, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('documentStatus', operator, document_status, TqlType.STRING) - - def document_type(self, operator: Enum, document_type: list | str): - """Filter Type (Document) based on **documentType** keyword. - - Args: - operator: The operator enum for the filter. - document_type: The type of document. - """ - if isinstance(document_type, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('documentType', operator, document_type, TqlType.STRING) - - def downvote_count(self, operator: Enum, downvote_count: int | list): - """Filter Downvote Count based on **downvoteCount** keyword. - - Args: - operator: The operator enum for the filter. - downvote_count: The number of downvotes the group has received. - """ - if isinstance(downvote_count, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('downvoteCount', operator, downvote_count, TqlType.INTEGER) - - def email_date(self, operator: Enum, email_date: Arrow | datetime | int | str): - """Filter Date (Email) based on **emailDate** keyword. - - Args: - operator: The operator enum for the filter. - email_date: The date of the email. - """ - email_date = self.util.any_to_datetime(email_date).strftime('%Y-%m-%d %H:%M:%S') - self._tql.add_filter('emailDate', operator, email_date, TqlType.STRING) - - def email_from(self, operator: Enum, email_from: list | str): - """Filter From (Email) based on **emailFrom** keyword. - - Args: - operator: The operator enum for the filter. - email_from: The 'from' field of the email. - """ - if isinstance(email_from, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('emailFrom', operator, email_from, TqlType.STRING) - - def email_score(self, operator: Enum, email_score: int | list): - """Filter Score (Email) based on **emailScore** keyword. - - Args: - operator: The operator enum for the filter. - email_score: The score of the email. - """ - if isinstance(email_score, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('emailScore', operator, email_score, TqlType.INTEGER) - - def email_score_includes_body(self, operator: Enum, email_score_includes_body: bool): - """Filter Score Includes Body (Email) based on **emailScoreIncludesBody** keyword. - - Args: - operator: The operator enum for the filter. - email_score_includes_body: A true/false indicating if the body was included in the - scoring of the email. - """ - self._tql.add_filter( - 'emailScoreIncludesBody', operator, email_score_includes_body, TqlType.BOOLEAN - ) - - def email_subject(self, operator: Enum, email_subject: list | str): - """Filter Subject (Email) based on **emailSubject** keyword. - - Args: - operator: The operator enum for the filter. - email_subject: The subject of the email. - """ - if isinstance(email_subject, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('emailSubject', operator, email_subject, TqlType.STRING) - - def event_date(self, operator: Enum, event_date: Arrow | datetime | int | str): - """Filter Event Date based on **eventDate** keyword. - - Args: - operator: The operator enum for the filter. - event_date: The event date of the group. - """ - event_date = self.util.any_to_datetime(event_date).strftime('%Y-%m-%d %H:%M:%S') - self._tql.add_filter('eventDate', operator, event_date, TqlType.STRING) - - def event_type(self, operator: Enum, event_type: list | str): - """Filter Event Type based on **eventType** keyword. - - Args: - operator: The operator enum for the filter. - event_type: The event type of the group. - """ - if isinstance(event_type, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('eventType', operator, event_type, TqlType.STRING) - - def external_date_added( - self, operator: Enum, external_date_added: Arrow | datetime | int | str - ): - """Filter External Date Added based on **externalDateAdded** keyword. - - Args: - operator: The operator enum for the filter. - external_date_added: The date and time that the group was first created externally. - """ - external_date_added = self.util.any_to_datetime(external_date_added).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('externalDateAdded', operator, external_date_added, TqlType.STRING) - - def external_date_expires( - self, operator: Enum, external_date_expires: Arrow | datetime | int | str - ): - """Filter External Date Expires based on **externalDateExpires** keyword. - - Args: - operator: The operator enum for the filter. - external_date_expires: The date and time the group expires externally. - """ - external_date_expires = self.util.any_to_datetime(external_date_expires).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('externalDateExpires', operator, external_date_expires, TqlType.STRING) - - def external_last_modified( - self, operator: Enum, external_last_modified: Arrow | datetime | int | str - ): - """Filter External Last Modified based on **externalLastModified** keyword. - - Args: - operator: The operator enum for the filter. - external_last_modified: The date and time the group was modified externally. - """ - external_last_modified = self.util.any_to_datetime(external_last_modified).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter( - 'externalLastModified', operator, external_last_modified, TqlType.STRING - ) - - def first_seen(self, operator: Enum, first_seen: Arrow | datetime | int | str): - """Filter First Seen based on **firstSeen** keyword. - - Args: - operator: The operator enum for the filter. - first_seen: The date and time that the group was first seen. - """ - first_seen = self.util.any_to_datetime(first_seen).strftime('%Y-%m-%d %H:%M:%S') - self._tql.add_filter('firstSeen', operator, first_seen, TqlType.STRING) - - def generated_report(self, operator: Enum, generated_report: bool): - """Filter Generated (Report) based on **generatedReport** keyword. - - Args: - operator: The operator enum for the filter. - generated_report: Boolean flag indicating if the Report was auto-generated. - """ - self._tql.add_filter('generatedReport', operator, generated_report, TqlType.BOOLEAN) - - @property - def has_all_tags(self): - """Return **TagFilter** for further filtering.""" - # first-party - from tcex.api.tc.v3.tags.tag_filter import TagFilter - - tags = TagFilter(Tql()) - self._tql.add_filter('hasAllTags', TqlOperator.EQ, tags, TqlType.SUB_QUERY) - return tags - @property def has_artifact(self): """Return **ArtifactFilter** for further filtering.""" @@ -357,16 +58,6 @@ def has_artifact(self): self._tql.add_filter('hasArtifact', TqlOperator.EQ, artifacts, TqlType.SUB_QUERY) return artifacts - @property - def has_attribute(self): - """Return **GroupAttributeFilter** for further filtering.""" - # first-party - from tcex.api.tc.v3.group_attributes.group_attribute_filter import GroupAttributeFilter - - attributes = GroupAttributeFilter(Tql()) - self._tql.add_filter('hasAttribute', TqlOperator.EQ, attributes, TqlType.SUB_QUERY) - return attributes - @property def has_case(self): """Return **CaseFilter** for further filtering.""" @@ -397,40 +88,6 @@ def has_indicator(self): self._tql.add_filter('hasIndicator', TqlOperator.EQ, indicators, TqlType.SUB_QUERY) return indicators - def has_intel_query(self, operator: Enum, has_intel_query: int | list): - """Filter Associated User Queries based on **hasIntelQuery** keyword. - - Args: - operator: The operator enum for the filter. - has_intel_query: A nested query for association to User Queries. - """ - if isinstance(has_intel_query, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('hasIntelQuery', operator, has_intel_query, TqlType.INTEGER) - - @property - def has_intel_requirement(self): - """Return **IntelRequirementFilter** for further filtering.""" - intel_requirements = IntelRequirementFilter(Tql()) - self._tql.add_filter( - 'hasIntelRequirement', TqlOperator.EQ, intel_requirements, TqlType.SUB_QUERY - ) - return intel_requirements - - @property - def has_security_label(self): - """Return **SecurityLabel** for further filtering.""" - # first-party - from tcex.api.tc.v3.security_labels.security_label_filter import SecurityLabelFilter - - security_labels = SecurityLabelFilter(Tql()) - self._tql.add_filter('hasSecurityLabel', TqlOperator.EQ, security_labels, TqlType.SUB_QUERY) - return security_labels - @property def has_tag(self): """Return **TagFilter** for further filtering.""" @@ -462,11 +119,11 @@ def has_victim_asset(self): return victim_assets def id(self, operator: Enum, id: int | list): # pylint: disable=redefined-builtin - """Filter ID based on **id** keyword. + """Filter Bucket ID based on **id** keyword. Args: operator: The operator enum for the filter. - id: The ID of the group. + id: The bucket ID of the intel requirement. """ if isinstance(id, list) and operator not in self.list_types: raise RuntimeError( @@ -476,56 +133,52 @@ def id(self, operator: Enum, id: int | list): # pylint: disable=redefined-built self._tql.add_filter('id', operator, id, TqlType.INTEGER) - def insights(self, operator: Enum, insights: list | str): - """Filter Insights (Report) based on **insights** keyword. + def id_bucket(self, operator: Enum, id_bucket: int | list): + """Filter Bucket ID based on **idBucket** keyword. Args: operator: The operator enum for the filter. - insights: The AI generated synopsis of the report. + id_bucket: The bucket ID of the intel requirement. """ - if isinstance(insights, list) and operator not in self.list_types: + if isinstance(id_bucket, list) and operator not in self.list_types: raise RuntimeError( 'Operator must be CONTAINS, NOT_CONTAINS, IN' 'or NOT_IN when filtering on a list of values.' ) - self._tql.add_filter('insights', operator, insights, TqlType.STRING) + self._tql.add_filter('idBucket', operator, id_bucket, TqlType.INTEGER) - def is_group(self, operator: Enum, is_group: bool): - """Filter isGroup based on **isGroup** keyword. + def id_parent(self, operator: Enum, id_parent: int | list): + """Filter ID Bucket Parent based on **idParent** keyword. Args: operator: The operator enum for the filter. - is_group: No description provided. + id_parent: The ID of the parent intel requirement. """ - self._tql.add_filter('isGroup', operator, is_group, TqlType.BOOLEAN) + if isinstance(id_parent, list) and operator not in self.list_types: + raise RuntimeError( + 'Operator must be CONTAINS, NOT_CONTAINS, IN' + 'or NOT_IN when filtering on a list of values.' + ) + + self._tql.add_filter('idParent', operator, id_parent, TqlType.INTEGER) def last_modified(self, operator: Enum, last_modified: Arrow | datetime | int | str): """Filter Last Modified based on **lastModified** keyword. Args: operator: The operator enum for the filter. - last_modified: The date the group was last modified. + last_modified: The date the requirement was last modified. """ last_modified = self.util.any_to_datetime(last_modified).strftime('%Y-%m-%d %H:%M:%S') self._tql.add_filter('lastModified', operator, last_modified, TqlType.STRING) - def last_seen(self, operator: Enum, last_seen: Arrow | datetime | int | str): - """Filter Last Seen based on **lastSeen** keyword. - - Args: - operator: The operator enum for the filter. - last_seen: The date and time that the group was last seen. - """ - last_seen = self.util.any_to_datetime(last_seen).strftime('%Y-%m-%d %H:%M:%S') - self._tql.add_filter('lastSeen', operator, last_seen, TqlType.STRING) - def owner(self, operator: Enum, owner: int | list): """Filter Owner ID based on **owner** keyword. Args: operator: The operator enum for the filter. - owner: The Owner ID for the group. + owner: The Owner ID for the intel requirement. """ if isinstance(owner, list) and operator not in self.list_types: raise RuntimeError( @@ -540,7 +193,7 @@ def owner_name(self, operator: Enum, owner_name: list | str): Args: operator: The operator enum for the filter. - owner_name: The owner name for the group. + owner_name: The owner name for the intel requirement. """ if isinstance(owner_name, list) and operator not in self.list_types: raise RuntimeError( @@ -550,109 +203,35 @@ def owner_name(self, operator: Enum, owner_name: list | str): self._tql.add_filter('ownerName', operator, owner_name, TqlType.STRING) - def parent_group(self, operator: Enum, parent_group: int | list): - """Filter parentGroup based on **parentGroup** keyword. - - Args: - operator: The operator enum for the filter. - parent_group: No description provided. - """ - if isinstance(parent_group, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('parentGroup', operator, parent_group, TqlType.INTEGER) - - def security_label(self, operator: Enum, security_label: list | str): - """Filter Security Label based on **securityLabel** keyword. - - Args: - operator: The operator enum for the filter. - security_label: The name of a security label applied to the group. - """ - if isinstance(security_label, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('securityLabel', operator, security_label, TqlType.STRING) - - def signature_date_added( - self, operator: Enum, signature_date_added: Arrow | datetime | int | str - ): - """Filter Date Added (Signature) based on **signatureDateAdded** keyword. - - Args: - operator: The operator enum for the filter. - signature_date_added: The date the signature was added. - """ - signature_date_added = self.util.any_to_datetime(signature_date_added).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('signatureDateAdded', operator, signature_date_added, TqlType.STRING) - - def signature_filename(self, operator: Enum, signature_filename: list | str): - """Filter Filename (Signature) based on **signatureFilename** keyword. - - Args: - operator: The operator enum for the filter. - signature_filename: The file name of the signature. - """ - if isinstance(signature_filename, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('signatureFilename', operator, signature_filename, TqlType.STRING) - - def signature_type(self, operator: Enum, signature_type: list | str): - """Filter Type (Signature) based on **signatureType** keyword. + def requirement(self, operator: Enum, requirement: list | str): + """Filter Requirement based on **requirement** keyword. Args: operator: The operator enum for the filter. - signature_type: The type of signature. + requirement: The Requirement of the intel requirement. """ - if isinstance(signature_type, list) and operator not in self.list_types: + if isinstance(requirement, list) and operator not in self.list_types: raise RuntimeError( 'Operator must be CONTAINS, NOT_CONTAINS, IN' 'or NOT_IN when filtering on a list of values.' ) - self._tql.add_filter('signatureType', operator, signature_type, TqlType.STRING) + self._tql.add_filter('requirement', operator, requirement, TqlType.STRING) - def status(self, operator: Enum, status: list | str): - """Filter Status based on **status** keyword. + def subtype(self, operator: Enum, subtype: list | str): + """Filter Subtype based on **subtype** keyword. Args: operator: The operator enum for the filter. - status: Status of the group. + subtype: The subtype of the intel requirement. """ - if isinstance(status, list) and operator not in self.list_types: + if isinstance(subtype, list) and operator not in self.list_types: raise RuntimeError( 'Operator must be CONTAINS, NOT_CONTAINS, IN' 'or NOT_IN when filtering on a list of values.' ) - self._tql.add_filter('status', operator, status, TqlType.STRING) - - def summary(self, operator: Enum, summary: list | str): - """Filter Summary based on **summary** keyword. - - Args: - operator: The operator enum for the filter. - summary: The summary (name) of the group. - """ - if isinstance(summary, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('summary', operator, summary, TqlType.STRING) + self._tql.add_filter('subtype', operator, subtype, TqlType.STRING) def tag(self, operator: Enum, tag: list | str): """Filter Tag based on **tag** keyword. @@ -669,222 +248,17 @@ def tag(self, operator: Enum, tag: list | str): self._tql.add_filter('tag', operator, tag, TqlType.STRING) - def tag_owner(self, operator: Enum, tag_owner: int | list): - """Filter Tag Owner ID based on **tagOwner** keyword. - - Args: - operator: The operator enum for the filter. - tag_owner: The ID of the owner of a tag. - """ - if isinstance(tag_owner, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('tagOwner', operator, tag_owner, TqlType.INTEGER) - - def tag_owner_name(self, operator: Enum, tag_owner_name: list | str): - """Filter Tag Owner Name based on **tagOwnerName** keyword. - - Args: - operator: The operator enum for the filter. - tag_owner_name: The name of the owner of a tag. - """ - if isinstance(tag_owner_name, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('tagOwnerName', operator, tag_owner_name, TqlType.STRING) - - def task_assignee(self, operator: Enum, task_assignee: list | str): - """Filter Assignee (Task) based on **taskAssignee** keyword. - - Args: - operator: The operator enum for the filter. - task_assignee: The assignee of the task. - """ - if isinstance(task_assignee, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('taskAssignee', operator, task_assignee, TqlType.STRING) - - def task_assignee_pseudo(self, operator: Enum, task_assignee_pseudo: list | str): - """Filter Assignee Pseudonym (Task) based on **taskAssigneePseudo** keyword. - - Args: - operator: The operator enum for the filter. - task_assignee_pseudo: The pseudonym of the assignee of the task. - """ - if isinstance(task_assignee_pseudo, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('taskAssigneePseudo', operator, task_assignee_pseudo, TqlType.STRING) - - def task_date_added(self, operator: Enum, task_date_added: Arrow | datetime | int | str): - """Filter Date Added (Task) based on **taskDateAdded** keyword. - - Args: - operator: The operator enum for the filter. - task_date_added: The date the task was added. - """ - task_date_added = self.util.any_to_datetime(task_date_added).strftime('%Y-%m-%d %H:%M:%S') - self._tql.add_filter('taskDateAdded', operator, task_date_added, TqlType.STRING) - - def task_due_date(self, operator: Enum, task_due_date: Arrow | datetime | int | str): - """Filter Due Date (Task) based on **taskDueDate** keyword. - - Args: - operator: The operator enum for the filter. - task_due_date: The due date of a task. - """ - task_due_date = self.util.any_to_datetime(task_due_date).strftime('%Y-%m-%d %H:%M:%S') - self._tql.add_filter('taskDueDate', operator, task_due_date, TqlType.STRING) - - def task_escalated(self, operator: Enum, task_escalated: bool): - """Filter Escalated (Task) based on **taskEscalated** keyword. - - Args: - operator: The operator enum for the filter. - task_escalated: A flag indicating if a task has been escalated. - """ - self._tql.add_filter('taskEscalated', operator, task_escalated, TqlType.BOOLEAN) - - def task_escalation_date( - self, operator: Enum, task_escalation_date: Arrow | datetime | int | str - ): - """Filter Escalation Date (Task) based on **taskEscalationDate** keyword. - - Args: - operator: The operator enum for the filter. - task_escalation_date: The escalation date of a task. - """ - task_escalation_date = self.util.any_to_datetime(task_escalation_date).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('taskEscalationDate', operator, task_escalation_date, TqlType.STRING) - - def task_last_modified(self, operator: Enum, task_last_modified: Arrow | datetime | int | str): - """Filter Last Modified based on **taskLastModified** keyword. - - Args: - operator: The operator enum for the filter. - task_last_modified: The date the group was last modified. - """ - task_last_modified = self.util.any_to_datetime(task_last_modified).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('taskLastModified', operator, task_last_modified, TqlType.STRING) - - def task_overdue(self, operator: Enum, task_overdue: bool): - """Filter Overdue (Task) based on **taskOverdue** keyword. - - Args: - operator: The operator enum for the filter. - task_overdue: A flag indicating if a task has become overdue. - """ - self._tql.add_filter('taskOverdue', operator, task_overdue, TqlType.BOOLEAN) - - def task_reminded(self, operator: Enum, task_reminded: bool): - """Filter Reminded (Task) based on **taskReminded** keyword. - - Args: - operator: The operator enum for the filter. - task_reminded: A flag indicating if a task has been reminded. - """ - self._tql.add_filter('taskReminded', operator, task_reminded, TqlType.BOOLEAN) - - def task_reminder_date(self, operator: Enum, task_reminder_date: Arrow | datetime | int | str): - """Filter Reminder Date (Task) based on **taskReminderDate** keyword. - - Args: - operator: The operator enum for the filter. - task_reminder_date: The reminder date of a task. - """ - task_reminder_date = self.util.any_to_datetime(task_reminder_date).strftime( - '%Y-%m-%d %H:%M:%S' - ) - self._tql.add_filter('taskReminderDate', operator, task_reminder_date, TqlType.STRING) - - def task_status(self, operator: Enum, task_status: list | str): - """Filter Status (Task) based on **taskStatus** keyword. - - Args: - operator: The operator enum for the filter. - task_status: The status of the task. - """ - if isinstance(task_status, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('taskStatus', operator, task_status, TqlType.STRING) - - def type(self, operator: Enum, type: int | list): # pylint: disable=redefined-builtin - """Filter Type based on **type** keyword. - - Args: - operator: The operator enum for the filter. - type: The ID of the group type. - """ - if isinstance(type, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('type', operator, type, TqlType.INTEGER) - - def type_name(self, operator: Enum, type_name: list | str): - """Filter Type Name based on **typeName** keyword. - - Args: - operator: The operator enum for the filter. - type_name: The name of the group type. - """ - if isinstance(type_name, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('typeName', operator, type_name, TqlType.STRING) - - def upvote_count(self, operator: Enum, upvote_count: int | list): - """Filter Upvote Count based on **upvoteCount** keyword. - - Args: - operator: The operator enum for the filter. - upvote_count: The number of upvotes the group has received. - """ - if isinstance(upvote_count, list) and operator not in self.list_types: - raise RuntimeError( - 'Operator must be CONTAINS, NOT_CONTAINS, IN' - 'or NOT_IN when filtering on a list of values.' - ) - - self._tql.add_filter('upvoteCount', operator, upvote_count, TqlType.INTEGER) - - def victim_asset(self, operator: Enum, victim_asset: list | str): - """Filter victimAsset based on **victimAsset** keyword. + def unique_id(self, operator: Enum, unique_id: list | str): + """Filter Unique ID based on **uniqueId** keyword. Args: operator: The operator enum for the filter. - victim_asset: No description provided. + unique_id: The unique ID of the intel requirement. """ - if isinstance(victim_asset, list) and operator not in self.list_types: + if isinstance(unique_id, list) and operator not in self.list_types: raise RuntimeError( 'Operator must be CONTAINS, NOT_CONTAINS, IN' 'or NOT_IN when filtering on a list of values.' ) - self._tql.add_filter('victimAsset', operator, victim_asset, TqlType.STRING) + self._tql.add_filter('uniqueId', operator, unique_id, TqlType.STRING) diff --git a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py index 8088358a9..2b42f1d22 100644 --- a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py +++ b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py @@ -54,12 +54,13 @@ def test_create_and_retrieve(self): ir.update() # ESUP-2521 - # assert ir.model.description == 'testing2' + assert ir.model.description == 'testing2' def test_associations(self): """Test associations.""" ir = self.v3_helper.create_ir() - # THIS DOES NOT WORK BECAUSE IT SENDS THE WRONG FIELDS VIA PUT! + # ESUP-2533 : Fields returned on get/create/put are not consistent + # indicator = self.v3_helper.create_indicator() # group = self.v3_helper.create_group() # case = self.v3_helper.create_case() @@ -78,10 +79,19 @@ def test_associations(self): # ir.stage_associated_case(case) # ir.stage_associated_artifact(artifact) # ir.update() + ir = self.v3.intel_requirement(id=ir.model.id) indicator = self.v3_helper.create_indicator() group = self.v3_helper.create_group() case = self.v3_helper.create_case() + victim = self.v3_helper.create_victim() + asset = self.tcex.api.tc.v3.victim_asset( + type='EmailAddress', address='malware@example.com', address_type='Trojan' + ) + victim.stage_victim_asset(asset) + + victim.update(params={'owner': 'TCI', 'fields': ['_all_']}) + asset.model.id = victim.model.assets.data[0].id artifact = self.v3.artifact( **{ 'case_id': case.model.id, @@ -96,25 +106,17 @@ def test_associations(self): ir.stage_associated_group(group) ir.stage_associated_case(case) ir.stage_associated_artifact(artifact) + ir.stage_associated_victim_asset(asset) ir.update() ir = self.v3.intel_requirement(id=ir.model.id) - ir.get( - params={ - 'fields': [ - 'associatedArtifacts', - 'associatedCases', - 'associatedGroups', - 'associatedIndicators', - 'associatedVictimAssets', - ] - } - ) + ir.get(params={'fields': ['_all_']}) assert ir.model.associated_indicators.data[0].id == indicator.model.id assert ir.model.associated_groups.data[0].id == group.model.id assert ir.model.associated_cases.data[0].id == case.model.id assert ir.model.associated_artifacts.data[0].id == artifact.model.id + assert ir.model.associated_victim_assets.data[0].id == asset.model.id for associated_indicator in ir.associated_indicators: assert indicator.model.id == associated_indicator.model.id @@ -122,14 +124,16 @@ def test_associations(self): for associated_group in ir.associated_groups: assert group.model.id == associated_group.model.id - # Associations not bi-directional + # ESUP-2532 : Associations not bi-directional # for associated_case in ir.associated_cases: # assert case.model.id == associated_case.model.id - # Associations not bi-directional # for associated_artifact in ir.associated_artifacts: # assert artifact.model.id == associated_artifact.model.id + # for associated_asset in ir.associated_victim_assets: + # assert associated_asset.model.id == asset.model.id + def test_intel_requirement_get_many(self, count=10): """Test Intel Requirement Get Many.""" ids = set() diff --git a/tests/api/tc/v3/v3_helpers.py b/tests/api/tc/v3/v3_helpers.py index fafb0ce4d..cf4d9ad1c 100644 --- a/tests/api/tc/v3/v3_helpers.py +++ b/tests/api/tc/v3/v3_helpers.py @@ -392,16 +392,14 @@ def create_ir(self, **kwargs): # create object ir = self.v3.intel_requirement(**ir_data) - ir.create() - # Ticket ESUP-2519 - Not being able to create IR with tags + ir.stage_tag(self.v3.tag(name=test_case_name)) - ir = self.v3.intel_requirement(id=ir.model.id) tags = self._to_list(kwargs.get('tags', [])) - ir.stage_tag(self.v3.tag(name=test_case_name)) for tag in tags: ir.stage_tag(self.v3.tag(**tag)) - ir.update() + + ir.create() # store case id for cleanup self._v3_objects.append(ir) From 48a4b8637bcf31326c7e135ecb2aba720cde9fb0 Mon Sep 17 00:00:00 2001 From: Ben Purdy Date: Thu, 27 Jun 2024 10:51:05 -0400 Subject: [PATCH 2/5] APP-4522 --- tcex/api/tc/v3/_gen/_gen_abc.py | 1 + tcex/api/tc/v3/_gen/_gen_object_abc.py | 71 +++++++++++++- tcex/api/tc/v3/_gen/model/_property_model.py | 2 +- .../intel_requirements/intel_requirement.py | 51 +++++----- .../intel_requirement_model.py | 4 +- .../keyword_sections/__init__.py | 0 .../keyword_section_model.py | 30 ++---- tcex/api/tc/v3/v3_model_abc.py | 10 +- .../test_intel_requirements_interface.py | 93 +++++++------------ 9 files changed, 152 insertions(+), 110 deletions(-) create mode 100644 tcex/api/tc/v3/intel_requirements/keyword_sections/__init__.py rename tcex/api/tc/v3/intel_requirements/{ => keyword_sections}/keyword_section_model.py (69%) diff --git a/tcex/api/tc/v3/_gen/_gen_abc.py b/tcex/api/tc/v3/_gen/_gen_abc.py index 1a3587f8a..7749893d6 100644 --- a/tcex/api/tc/v3/_gen/_gen_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_abc.py @@ -492,6 +492,7 @@ def tap(self, type_: str): 'categories', 'results', 'subtypes', + 'keyword_sections' ]: return 'tcex.api.tc.v3.intel_requirements' diff --git a/tcex/api/tc/v3/_gen/_gen_object_abc.py b/tcex/api/tc/v3/_gen/_gen_object_abc.py index 47d3ac886..401a86347 100644 --- a/tcex/api/tc/v3/_gen/_gen_object_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_object_abc.py @@ -445,6 +445,67 @@ def as_entity(self) -> dict: ) return '\n'.join(as_entity_property_method) + def _gen_code_object_replace_type_method(self, type_: str, model_type: str | None = None) -> str: + """Return the method code. + + def replace_artifact(self, **kwargs): + '''Replace an Artifact on the object. (mark as staged) + """ + type_ = self.util.snake_string(type_) + model_type = self.util.snake_string(model_type or type_) + model_reference = model_type + + # get model from map and update requirements + model_import_data = self._module_import_data(type_) + model_class = model_import_data.get('model_class') + self.requirements['first-party'].append( + f'''from {model_import_data.get('model_module')} ''' + f'''import {model_import_data.get('model_class')}''' + ) + stage_method = [ + ( + f'''{self.i1}def replace_{model_type.singular()}(self, ''' + f'''data: dict | list | ObjectABC | {model_class}''' + f'''):''' + ), + f'''{self.i2}"""Replace {type_.singular()} on the object."""''', + f'''{self.i2}if isinstance(data, ObjectABC):''', + f'''{self.i3}transformed_data = data''', + ( + f'''{self.i2}elif isinstance(data, list) and ''' + f'''all(isinstance(item, {model_class}) for item in data):''' + ), + f'''{self.i3}transformed_data = data''', + ( + f'''{self.i2}elif isinstance(data, list) and ''' + f'''all(isinstance(item, ObjectABC) for item in data):''' + ), + f'''{self.i3}transformed_data = data''', + ( + f'''{self.i2}elif isinstance(data, list) and ''' + f'''all(isinstance(item, dict) for item in data):''' + ), + f'''{self.i3}transformed_data = [{model_class}(**d) for d in data]''', + f'''{self.i2}elif isinstance(data, dict):''', + f'''{self.i3}transformed_data = {model_class}(**data)''', + f'''{self.i2}elif isinstance(data, {model_class}):''', + f'''{self.i3}transformed_data = data''', + f'''{self.i2}else:''' + f'''{self.i3}raise ValueError("Invalid data to replace_{model_type.singular()}")''', + '', + '', + f'''{self.i2}if isinstance(transformed_data, list):''', + f'''{self.i3}for item in transformed_data:''', + f'''{self.i4}item._staged = True''', + f'''{self.i2}elif isinstance(transformed_data, {model_class}):''', + f'''{self.i3}transformed_data._staged = True''', + f'''{self.i2}self.model.{model_reference} = transformed_data # type: ignore''', + '', + '', + ] + + return '\n'.join(stage_method) + def _gen_code_object_stage_type_method(self, type_: str, model_type: str | None = None) -> str: """Return the method code. @@ -978,10 +1039,12 @@ def filter ... _code += self._gen_code_object_stage_type_method('victim_assets') # generate stage_associated_group method - if 'associatedCases' in add_properties: + # ESUP-2532 - Associations are not Bi-Directional for IRs + if 'associatedCases' in add_properties and self.type_ != 'intel_requirements': _code += self._gen_code_object_stage_type_method('cases', 'associated_cases') - if 'associatedArtifacts' in add_properties: + # ESUP-2532 - Associations are not Bi-Directional for IRs + if 'associatedArtifacts' in add_properties and self.type_ != 'intel_requirements': _code += self._gen_code_object_stage_type_method('artifacts', 'associated_artifacts') # victims have associatedGroups but groups must be @@ -1015,6 +1078,10 @@ def filter ... if 'fileOccurrences' in add_properties: _code += self._gen_code_object_stage_type_method('file_occurrences') + # generate stage_keyword_section method + if 'keywordSections' in add_properties and self.type_ == 'intel_requirements': + _code += self._gen_code_object_replace_type_method('keyword_sections') + # generate stage_note method if 'notes' in add_properties: _code += self._gen_code_object_stage_type_method('notes') diff --git a/tcex/api/tc/v3/_gen/model/_property_model.py b/tcex/api/tc/v3/_gen/model/_property_model.py index ab8d4aaea..e2044aeed 100644 --- a/tcex/api/tc/v3/_gen/model/_property_model.py +++ b/tcex/api/tc/v3/_gen/model/_property_model.py @@ -307,7 +307,7 @@ def __process_special_types(cls, pm: 'PropertyModel', extra: dict[str, str]): } ) elif pm.type == 'KeywordSection': - bi += 'intel_requirements.keyword_section_model' + bi += 'intel_requirements.keyword_sections.keyword_section_model' extra.update( { 'import_data': f'{bi} import KeywordSectionModel', diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement.py b/tcex/api/tc/v3/intel_requirements/intel_requirement.py index ffaa87f4c..576c7e225 100644 --- a/tcex/api/tc/v3/intel_requirements/intel_requirement.py +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement.py @@ -6,8 +6,6 @@ # first-party from tcex.api.tc.v3.api_endpoints import ApiEndpoints -from tcex.api.tc.v3.artifacts.artifact_model import ArtifactModel -from tcex.api.tc.v3.cases.case_model import CaseModel from tcex.api.tc.v3.groups.group_model import GroupModel from tcex.api.tc.v3.indicators.indicator_model import IndicatorModel from tcex.api.tc.v3.intel_requirements.intel_requirement_filter import IntelRequirementFilter @@ -15,6 +13,9 @@ IntelRequirementModel, IntelRequirementsModel, ) +from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import ( + KeywordSectionModel, +) from tcex.api.tc.v3.object_abc import ObjectABC from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC from tcex.api.tc.v3.tags.tag_model import TagModel @@ -140,30 +141,6 @@ def tags(self) -> Generator['Tag', None, None]: yield from self._iterate_over_sublist(Tags) # type: ignore - def stage_associated_case(self, data: dict | ObjectABC | CaseModel): - """Stage case on the object.""" - if isinstance(data, ObjectABC): - data = data.model # type: ignore - elif isinstance(data, dict): - data = CaseModel(**data) - - if not isinstance(data, CaseModel): - raise RuntimeError('Invalid type passed in to stage_associated_case') - data._staged = True - self.model.associated_cases.data.append(data) # type: ignore - - def stage_associated_artifact(self, data: dict | ObjectABC | ArtifactModel): - """Stage artifact on the object.""" - if isinstance(data, ObjectABC): - data = data.model # type: ignore - elif isinstance(data, dict): - data = ArtifactModel(**data) - - if not isinstance(data, ArtifactModel): - raise RuntimeError('Invalid type passed in to stage_associated_artifact') - data._staged = True - self.model.associated_artifacts.data.append(data) # type: ignore - def stage_associated_group(self, data: dict | ObjectABC | GroupModel): """Stage group on the object.""" if isinstance(data, ObjectABC): @@ -200,6 +177,28 @@ def stage_associated_indicator(self, data: dict | ObjectABC | IndicatorModel): data._staged = True self.model.associated_indicators.data.append(data) # type: ignore + def replace_keyword_section(self, data: dict | list | ObjectABC | KeywordSectionModel): + """Replace keyword_section on the object.""" + if not isinstance(data, list): + data = [data] + + if ( + isinstance(data, list) and + all(isinstance(item, (ObjectABC, KeywordSectionModel)) for item in data) + ): + transformed_data = data + elif isinstance(data, list) and all(isinstance(item, dict) for item in data): + transformed_data = [KeywordSectionModel(**d) for d in data] + elif isinstance(data, dict): + transformed_data = KeywordSectionModel(**data) + else: + raise ValueError("Invalid data to replace_keyword_section") + + if isinstance(transformed_data, list): + for item in transformed_data: + item._staged = True + self.model.keyword_sections = transformed_data # type: ignore + def stage_tag(self, data: dict | ObjectABC | TagModel): """Stage tag on the object.""" if isinstance(data, ObjectABC): diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py b/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py index 857327c54..d2f0e3566 100644 --- a/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement_model.py @@ -277,7 +277,9 @@ class IntelRequirementsModel( from tcex.api.tc.v3.groups.group_model import GroupsModel from tcex.api.tc.v3.indicators.indicator_model import IndicatorsModel from tcex.api.tc.v3.intel_requirements.intel_req_type_model import IntelReqTypeModel -from tcex.api.tc.v3.intel_requirements.keyword_section_model import KeywordSectionModel +from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import ( + KeywordSectionModel, +) from tcex.api.tc.v3.security.users.user_model import UserModel from tcex.api.tc.v3.tags.tag_model import TagsModel from tcex.api.tc.v3.victim_assets.victim_asset_model import VictimAssetsModel diff --git a/tcex/api/tc/v3/intel_requirements/keyword_sections/__init__.py b/tcex/api/tc/v3/intel_requirements/keyword_sections/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tcex/api/tc/v3/intel_requirements/keyword_section_model.py b/tcex/api/tc/v3/intel_requirements/keyword_sections/keyword_section_model.py similarity index 69% rename from tcex/api/tc/v3/intel_requirements/keyword_section_model.py rename to tcex/api/tc/v3/intel_requirements/keyword_sections/keyword_section_model.py index 3a99fe87a..f8c5f52e2 100644 --- a/tcex/api/tc/v3/intel_requirements/keyword_section_model.py +++ b/tcex/api/tc/v3/intel_requirements/keyword_sections/keyword_section_model.py @@ -2,30 +2,13 @@ # pylint: disable=no-member,no-self-argument,wrong-import-position # third-party -from pydantic import Field +from pydantic import Field, PrivateAttr # first-party from tcex.api.tc.v3.v3_model_abc import V3ModelABC from tcex.util import Util -class KeywordModel( - V3ModelABC, - title='Keyword Model', - alias_generator=Util().snake_to_camel, - validate_assignment=True, -): - """Model Definition""" - - value: str | None = Field( - None, - description='The value of the keyword.', - methods=['POST', 'PUT'], - read_only=False, - title='value', - ) - - class KeywordSectionModel( V3ModelABC, title='Keyword Section Model', @@ -44,6 +27,11 @@ class KeywordSectionModel( } """ + _associated_type = PrivateAttr(False) + _cm_type = PrivateAttr(False) + _shared_type = PrivateAttr(False) + _staged = PrivateAttr(False) + section_number: int | None = Field( None, description='The section number of the keyword section.', @@ -51,15 +39,15 @@ class KeywordSectionModel( read_only=False, title='sectionNumber', ) - compareValue: str | None = Field( + compare_value: str | None = Field( None, description='The compare value for the keyword section.', methods=['POST', 'PUT'], read_only=False, title='compareValue', ) - keywords: list[KeywordModel] | None = Field( - None, + keywords: list[dict] | None = Field( + [], description='A list of keywords for the keyword section.', methods=['POST', 'PUT'], read_only=False, diff --git a/tcex/api/tc/v3/v3_model_abc.py b/tcex/api/tc/v3/v3_model_abc.py index 675d52d63..a1c088467 100644 --- a/tcex/api/tc/v3/v3_model_abc.py +++ b/tcex/api/tc/v3/v3_model_abc.py @@ -51,8 +51,14 @@ def __init__(self, **kwargs): # when "id" field is present it indicates that the data was returned from the # API, otherwise the assumption is that the developer staged the data during - # instantiation of the object. - if kwargs and hasattr(self, 'id') and self.id is None: # pylint: disable=no-member + # instantiation of the object. Keyword Section Model is the only exception to + # this rule. + # pylint: disable=no-member + if ( + kwargs and hasattr(self, 'id') and + self.id is None and + self.__config__.title != 'Keyword Section Model' + ): self._staged = True # store initial dict hash of model diff --git a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py index 2b42f1d22..34b808e07 100644 --- a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py +++ b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py @@ -5,6 +5,8 @@ # third-party import pytest +from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import \ + KeywordSectionModel # first-party from tcex.api.tc.v3.tql.tql_operator import TqlOperator from tests.api.tc.v3.v3_helpers import TestV3, V3Helper @@ -59,64 +61,23 @@ def test_create_and_retrieve(self): def test_associations(self): """Test associations.""" ir = self.v3_helper.create_ir() - # ESUP-2533 : Fields returned on get/create/put are not consistent - - # indicator = self.v3_helper.create_indicator() - # group = self.v3_helper.create_group() - # case = self.v3_helper.create_case() - # artifact = self.v3.artifact( - # **{ - # 'case_id': case.model.id, - # 'intel_type': 'indicator-ASN', - # 'summary': 'asn111', - # 'type': 'ASN', - # } - # ) - # artifact.create() - # - # ir.stage_associated_indicator(indicator) - # ir.stage_associated_group(group) - # ir.stage_associated_case(case) - # ir.stage_associated_artifact(artifact) - # ir.update() - - ir = self.v3.intel_requirement(id=ir.model.id) indicator = self.v3_helper.create_indicator() group = self.v3_helper.create_group() - case = self.v3_helper.create_case() - victim = self.v3_helper.create_victim() - asset = self.tcex.api.tc.v3.victim_asset( - type='EmailAddress', address='malware@example.com', address_type='Trojan' - ) - victim.stage_victim_asset(asset) - - victim.update(params={'owner': 'TCI', 'fields': ['_all_']}) - asset.model.id = victim.model.assets.data[0].id - artifact = self.v3.artifact( - **{ - 'case_id': case.model.id, - 'intel_type': 'indicator-ASN', - 'summary': 'asn111', - 'type': 'ASN', - } - ) - artifact.create() - ir.stage_associated_indicator(indicator) ir.stage_associated_group(group) - ir.stage_associated_case(case) - ir.stage_associated_artifact(artifact) - ir.stage_associated_victim_asset(asset) + + # ESUP-2532 : Associations not bi-directional + # ir.stage_associated_case(case) + # ir.stage_associated_artifact(artifact) + # ir.stage_associated_victim_asset(asset) + ir.update() - ir = self.v3.intel_requirement(id=ir.model.id) + # ir = self.v3.intel_requirement(id=ir.model.id) ir.get(params={'fields': ['_all_']}) assert ir.model.associated_indicators.data[0].id == indicator.model.id assert ir.model.associated_groups.data[0].id == group.model.id - assert ir.model.associated_cases.data[0].id == case.model.id - assert ir.model.associated_artifacts.data[0].id == artifact.model.id - assert ir.model.associated_victim_assets.data[0].id == asset.model.id for associated_indicator in ir.associated_indicators: assert indicator.model.id == associated_indicator.model.id @@ -124,15 +85,33 @@ def test_associations(self): for associated_group in ir.associated_groups: assert group.model.id == associated_group.model.id - # ESUP-2532 : Associations not bi-directional - # for associated_case in ir.associated_cases: - # assert case.model.id == associated_case.model.id - - # for associated_artifact in ir.associated_artifacts: - # assert artifact.model.id == associated_artifact.model.id - - # for associated_asset in ir.associated_victim_assets: - # assert associated_asset.model.id == asset.model.id + def test_keywords_section(self): + """Test keywords section.""" + ir = self.v3_helper.create_ir() + keyword_sections = [ + KeywordSectionModel( + section_number=0, + compare_value='includes', + keywords=[{'value': 'keyword1'}] + ), + KeywordSectionModel( + section_number=1, + compare_value='includes', + keywords=[{'value': 'keyword2'}] + ) + ] + ir.replace_keyword_section(keyword_sections) + ir.update() + ir = self.v3.intel_requirement(id=ir.model.id) + ir.get() + + assert len(ir.model.keyword_sections) == 2 + # assert ir.model.keyword_sections[0].section_number == 0 # This doesnt come back + assert ir.model.keyword_sections[0].compare_value == 'includes' + assert ir.model.keyword_sections[0].keywords[0].get('value') == 'keyword1' + # assert ir.model.keyword_sections[1].section_number == 1 # This doesnt come back + assert ir.model.keyword_sections[1].compare_value == 'includes' + assert ir.model.keyword_sections[1].keywords[0].get('value') == 'keyword2' def test_intel_requirement_get_many(self, count=10): """Test Intel Requirement Get Many.""" From 821e83d66c5097eb995c2d436d63a2763d49cfb3 Mon Sep 17 00:00:00 2001 From: Ben Purdy Date: Thu, 27 Jun 2024 10:53:23 -0400 Subject: [PATCH 3/5] APP-4522 --- tcex/api/tc/v3/_gen/_gen_abc.py | 7 +------ tcex/api/tc/v3/_gen/_gen_object_abc.py | 4 +++- .../tc/v3/intel_requirements/intel_requirement.py | 7 +++---- tcex/api/tc/v3/v3_model_abc.py | 7 ++++--- .../test_intel_requirements_interface.py | 15 ++++++--------- 5 files changed, 17 insertions(+), 23 deletions(-) diff --git a/tcex/api/tc/v3/_gen/_gen_abc.py b/tcex/api/tc/v3/_gen/_gen_abc.py index 7749893d6..a1bf80ab5 100644 --- a/tcex/api/tc/v3/_gen/_gen_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_abc.py @@ -488,12 +488,7 @@ def tap(self, type_: str): ]: return 'tcex.api.tc.v3.security' - if type_.plural().lower() in [ - 'categories', - 'results', - 'subtypes', - 'keyword_sections' - ]: + if type_.plural().lower() in ['categories', 'results', 'subtypes', 'keyword_sections']: return 'tcex.api.tc.v3.intel_requirements' return 'tcex.api.tc.v3' diff --git a/tcex/api/tc/v3/_gen/_gen_object_abc.py b/tcex/api/tc/v3/_gen/_gen_object_abc.py index 401a86347..161727f9b 100644 --- a/tcex/api/tc/v3/_gen/_gen_object_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_object_abc.py @@ -445,7 +445,9 @@ def as_entity(self) -> dict: ) return '\n'.join(as_entity_property_method) - def _gen_code_object_replace_type_method(self, type_: str, model_type: str | None = None) -> str: + def _gen_code_object_replace_type_method( + self, type_: str, model_type: str | None = None + ) -> str: """Return the method code. def replace_artifact(self, **kwargs): diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement.py b/tcex/api/tc/v3/intel_requirements/intel_requirement.py index 576c7e225..4b2e5e0fe 100644 --- a/tcex/api/tc/v3/intel_requirements/intel_requirement.py +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement.py @@ -182,9 +182,8 @@ def replace_keyword_section(self, data: dict | list | ObjectABC | KeywordSection if not isinstance(data, list): data = [data] - if ( - isinstance(data, list) and - all(isinstance(item, (ObjectABC, KeywordSectionModel)) for item in data) + if isinstance(data, list) and all( + isinstance(item, (ObjectABC, KeywordSectionModel)) for item in data ): transformed_data = data elif isinstance(data, list) and all(isinstance(item, dict) for item in data): @@ -192,7 +191,7 @@ def replace_keyword_section(self, data: dict | list | ObjectABC | KeywordSection elif isinstance(data, dict): transformed_data = KeywordSectionModel(**data) else: - raise ValueError("Invalid data to replace_keyword_section") + raise ValueError('Invalid data to replace_keyword_section') if isinstance(transformed_data, list): for item in transformed_data: diff --git a/tcex/api/tc/v3/v3_model_abc.py b/tcex/api/tc/v3/v3_model_abc.py index a1c088467..92188f1dd 100644 --- a/tcex/api/tc/v3/v3_model_abc.py +++ b/tcex/api/tc/v3/v3_model_abc.py @@ -55,9 +55,10 @@ def __init__(self, **kwargs): # this rule. # pylint: disable=no-member if ( - kwargs and hasattr(self, 'id') and - self.id is None and - self.__config__.title != 'Keyword Section Model' + kwargs + and hasattr(self, 'id') + and self.id is None + and self.__config__.title != 'Keyword Section Model' ): self._staged = True diff --git a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py index 34b808e07..c32ac17c2 100644 --- a/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py +++ b/tests/api/tc/v3/intel_requirements/test_intel_requirements_interface.py @@ -5,9 +5,10 @@ # third-party import pytest -from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import \ - KeywordSectionModel # first-party +from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import ( + KeywordSectionModel, +) from tcex.api.tc.v3.tql.tql_operator import TqlOperator from tests.api.tc.v3.v3_helpers import TestV3, V3Helper @@ -90,15 +91,11 @@ def test_keywords_section(self): ir = self.v3_helper.create_ir() keyword_sections = [ KeywordSectionModel( - section_number=0, - compare_value='includes', - keywords=[{'value': 'keyword1'}] + section_number=0, compare_value='includes', keywords=[{'value': 'keyword1'}] ), KeywordSectionModel( - section_number=1, - compare_value='includes', - keywords=[{'value': 'keyword2'}] - ) + section_number=1, compare_value='includes', keywords=[{'value': 'keyword2'}] + ), ] ir.replace_keyword_section(keyword_sections) ir.update() From 22b62df7696d239b543dd35fb4eba096f930bb38 Mon Sep 17 00:00:00 2001 From: Ben Purdy Date: Thu, 27 Jun 2024 11:25:35 -0400 Subject: [PATCH 4/5] APP-4522 --- tcex/api/tc/v3/_gen/_gen_object_abc.py | 32 +++++-------------- .../intel_requirements/intel_requirement.py | 16 ++++------ 2 files changed, 14 insertions(+), 34 deletions(-) diff --git a/tcex/api/tc/v3/_gen/_gen_object_abc.py b/tcex/api/tc/v3/_gen/_gen_object_abc.py index 161727f9b..64f69ebfe 100644 --- a/tcex/api/tc/v3/_gen/_gen_object_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_object_abc.py @@ -471,36 +471,20 @@ def replace_artifact(self, **kwargs): f'''):''' ), f'''{self.i2}"""Replace {type_.singular()} on the object."""''', - f'''{self.i2}if isinstance(data, ObjectABC):''', - f'''{self.i3}transformed_data = data''', - ( - f'''{self.i2}elif isinstance(data, list) and ''' - f'''all(isinstance(item, {model_class}) for item in data):''' - ), - f'''{self.i3}transformed_data = data''', - ( - f'''{self.i2}elif isinstance(data, list) and ''' - f'''all(isinstance(item, ObjectABC) for item in data):''' - ), + f'''{self.i2}if not isinstance(data, list):''', + f'''{self.i3}data = [data]''', + '', + f'''{self.i2}if all(isinstance(item, ({model_class}, ObjectABC)) for item in data):''' f'''{self.i3}transformed_data = data''', - ( - f'''{self.i2}elif isinstance(data, list) and ''' - f'''all(isinstance(item, dict) for item in data):''' - ), + f'''{self.i2}elif all(isinstance(item, dict) for item in data):''' f'''{self.i3}transformed_data = [{model_class}(**d) for d in data]''', - f'''{self.i2}elif isinstance(data, dict):''', - f'''{self.i3}transformed_data = {model_class}(**data)''', - f'''{self.i2}elif isinstance(data, {model_class}):''', - f'''{self.i3}transformed_data = data''', f'''{self.i2}else:''' f'''{self.i3}raise ValueError("Invalid data to replace_{model_type.singular()}")''', '', '', - f'''{self.i2}if isinstance(transformed_data, list):''', - f'''{self.i3}for item in transformed_data:''', - f'''{self.i4}item._staged = True''', - f'''{self.i2}elif isinstance(transformed_data, {model_class}):''', - f'''{self.i3}transformed_data._staged = True''', + f'''{self.i2}for item in transformed_data:''', + f'''{self.i3}item._staged = True''', + '', f'''{self.i2}self.model.{model_reference} = transformed_data # type: ignore''', '', '', diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement.py b/tcex/api/tc/v3/intel_requirements/intel_requirement.py index 4b2e5e0fe..c0ff7592a 100644 --- a/tcex/api/tc/v3/intel_requirements/intel_requirement.py +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement.py @@ -182,20 +182,16 @@ def replace_keyword_section(self, data: dict | list | ObjectABC | KeywordSection if not isinstance(data, list): data = [data] - if isinstance(data, list) and all( - isinstance(item, (ObjectABC, KeywordSectionModel)) for item in data - ): + if all(isinstance(item, (KeywordSectionModel, ObjectABC)) for item in data): transformed_data = data - elif isinstance(data, list) and all(isinstance(item, dict) for item in data): + elif all(isinstance(item, dict) for item in data): transformed_data = [KeywordSectionModel(**d) for d in data] - elif isinstance(data, dict): - transformed_data = KeywordSectionModel(**data) else: - raise ValueError('Invalid data to replace_keyword_section') + raise ValueError("Invalid data to replace_keyword_section") + + for item in transformed_data: + item._staged = True - if isinstance(transformed_data, list): - for item in transformed_data: - item._staged = True self.model.keyword_sections = transformed_data # type: ignore def stage_tag(self, data: dict | ObjectABC | TagModel): From 1c9d8b38acf4f256062a78775183911cb4a92542 Mon Sep 17 00:00:00 2001 From: Ben Purdy Date: Thu, 27 Jun 2024 15:24:19 -0400 Subject: [PATCH 5/5] APP-4522 --- tcex/api/tc/v3/_gen/_gen_object_abc.py | 2 +- tcex/api/tc/v3/intel_requirements/intel_requirement.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tcex/api/tc/v3/_gen/_gen_object_abc.py b/tcex/api/tc/v3/_gen/_gen_object_abc.py index 64f69ebfe..83d860375 100644 --- a/tcex/api/tc/v3/_gen/_gen_object_abc.py +++ b/tcex/api/tc/v3/_gen/_gen_object_abc.py @@ -479,7 +479,7 @@ def replace_artifact(self, **kwargs): f'''{self.i2}elif all(isinstance(item, dict) for item in data):''' f'''{self.i3}transformed_data = [{model_class}(**d) for d in data]''', f'''{self.i2}else:''' - f'''{self.i3}raise ValueError("Invalid data to replace_{model_type.singular()}")''', + f'''{self.i3}raise ValueError('Invalid data to replace_{model_type.singular()}')''', '', '', f'''{self.i2}for item in transformed_data:''', diff --git a/tcex/api/tc/v3/intel_requirements/intel_requirement.py b/tcex/api/tc/v3/intel_requirements/intel_requirement.py index c0ff7592a..8f79f5442 100644 --- a/tcex/api/tc/v3/intel_requirements/intel_requirement.py +++ b/tcex/api/tc/v3/intel_requirements/intel_requirement.py @@ -187,7 +187,7 @@ def replace_keyword_section(self, data: dict | list | ObjectABC | KeywordSection elif all(isinstance(item, dict) for item in data): transformed_data = [KeywordSectionModel(**d) for d in data] else: - raise ValueError("Invalid data to replace_keyword_section") + raise ValueError('Invalid data to replace_keyword_section') for item in transformed_data: item._staged = True