Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exception on extra fields #325

Merged
merged 21 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 101 additions & 66 deletions contentctl/actions/new_content.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


from dataclasses import dataclass
import questionary
from typing import Any
Expand All @@ -11,67 +9,108 @@
import pathlib
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import SecurityContentObject_Abstract
from contentctl.output.yml_writer import YmlWriter

from contentctl.objects.enums import AssetType
from contentctl.objects.constants import SES_OBSERVABLE_TYPE_MAPPING, SES_OBSERVABLE_ROLE_MAPPING
class NewContent:
UPDATE_PREFIX = "__UPDATE__"

DEFAULT_DRILLDOWN_DEF = [
{
"name": f'View the detection results for - "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" and "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"',
"search": f'%original_detection_search% | search "${UPDATE_PREFIX}FIRST_RISK_OBJECT = "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" second_observable_type_here = "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"',
"earliest_offset": '$info_min_time$',
"latest_offset": '$info_max_time$'
},
{
"name": f'View risk events for the last 7 days for - "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" and "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"',
"search": f'| from datamodel Risk.All_Risk | search normalized_risk_object IN ("${UPDATE_PREFIX}FIRST_RISK_OBJECT$", "${UPDATE_PREFIX}SECOND_RISK_OBJECT$") starthoursago=168 | stats count min(_time) as firstTime max(_time) as lastTime values(search_name) as "Search Name" values(risk_message) as "Risk Message" values(analyticstories) as "Analytic Stories" values(annotations._all) as "Annotations" values(annotations.mitre_attack.mitre_tactic) as "ATT&CK Tactics" by normalized_risk_object | `security_content_ctime(firstTime)` | `security_content_ctime(lastTime)`',
"earliest_offset": '$info_min_time$',
"latest_offset": '$info_max_time$'
}
]


def buildDetection(self)->dict[str,Any]:
def buildDetection(self) -> tuple[dict[str, Any], str]:
questions = NewContentQuestions.get_questions_detection()
answers: dict[str,str] = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...")
answers: dict[str, str] = questionary.prompt(
questions,
kbi_msg="User did not answer all of the prompt questions. Exiting...",
)
if not answers:
raise ValueError("User didn't answer one or more questions!")
answers.update(answers)
answers['name'] = answers['detection_name']
del answers['detection_name']
answers['id'] = str(uuid.uuid4())
answers['version'] = 1
answers['date'] = datetime.today().strftime('%Y-%m-%d')
answers['author'] = answers['detection_author']
del answers['detection_author']
answers['data_source'] = answers['data_source']
answers['type'] = answers['detection_type']
del answers['detection_type']
answers['status'] = "production" #start everything as production since that's what we INTEND the content to become
answers['description'] = 'UPDATE_DESCRIPTION'
file_name = answers['name'].replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower()
answers['search'] = answers['detection_search'] + ' | `' + file_name + '_filter`'
del answers['detection_search']
answers['how_to_implement'] = 'UPDATE_HOW_TO_IMPLEMENT'
answers['known_false_positives'] = 'UPDATE_KNOWN_FALSE_POSITIVES'
answers['references'] = ['REFERENCE']
answers['tags'] = dict()
answers['tags']['analytic_story'] = ['UPDATE_STORY_NAME']
answers['tags']['asset_type'] = 'UPDATE asset_type'
answers['tags']['confidence'] = 'UPDATE value between 1-100'
answers['tags']['impact'] = 'UPDATE value between 1-100'
answers['tags']['message'] = 'UPDATE message'
answers['tags']['mitre_attack_id'] = [x.strip() for x in answers['mitre_attack_ids'].split(',')]
answers['tags']['observable'] = [{'name': 'UPDATE', 'type': 'UPDATE', 'role': ['UPDATE']}]
answers['tags']['product'] = ['Splunk Enterprise','Splunk Enterprise Security','Splunk Cloud']
answers['tags']['required_fields'] = ['UPDATE']
answers['tags']['risk_score'] = 'UPDATE (impact * confidence)/100'
answers['tags']['security_domain'] = answers['security_domain']
del answers["security_domain"]
answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']

#generate the tests section
answers['tests'] = [
{
'name': "True Positive Test",
'attack_data': [
{
'data': "https://github.com/splunk/contentctl/wiki",
"sourcetype": "UPDATE SOURCETYPE",
"source": "UPDATE SOURCE"
}
]
}
]
del answers["mitre_attack_ids"]
return answers

def buildStory(self)->dict[str,Any]:
data_source_field = (
answers["data_source"] if len(answers["data_source"]) > 0 else [f"{NewContent.UPDATE_PREFIX} zero or more data_sources"]
)
file_name = (
answers["detection_name"]
.replace(" ", "_")
.replace("-", "_")
.replace(".", "_")
.replace("/", "_")
.lower()
)

#Minimum lenght for a mitre tactic is 5 characters: T1000
if len(answers["mitre_attack_ids"]) >= 5:
mitre_attack_ids = [x.strip() for x in answers["mitre_attack_ids"].split(",")]
else:
#string was too short, so just put a placeholder
mitre_attack_ids = [f"{NewContent.UPDATE_PREFIX} zero or more mitre_attack_ids"]

output_file_answers: dict[str, Any] = {
"name": answers["detection_name"],
"id": str(uuid.uuid4()),
"version": 1,
"date": datetime.today().strftime("%Y-%m-%d"),
"author": answers["detection_author"],
"status": "production", # start everything as production since that's what we INTEND the content to become
"type": answers["detection_type"],
"description": f"{NewContent.UPDATE_PREFIX} by providing a description of your search",
"data_source": data_source_field,
"search": f"{answers['detection_search']} | `{file_name}_filter`",
"how_to_implement": f"{NewContent.UPDATE_PREFIX} how to implement your search",
"known_false_positives": f"{NewContent.UPDATE_PREFIX} known false positives for your search",
"references": [f"{NewContent.UPDATE_PREFIX} zero or more http references to provide more information about your search"],
"drilldown_searches": NewContent.DEFAULT_DRILLDOWN_DEF,
"tags": {
"analytic_story": [f"{NewContent.UPDATE_PREFIX} by providing zero or more analytic stories"],
"asset_type": f"{NewContent.UPDATE_PREFIX} by providing and asset type from {list(AssetType._value2member_map_)}",
"confidence": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100",
"impact": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100",
"message": f"{NewContent.UPDATE_PREFIX} by providing a risk message. Fields in your search results can be referenced using $fieldName$",
"mitre_attack_id": mitre_attack_ids,
"observable": [
{"name": f"{NewContent.UPDATE_PREFIX} the field name of the observable. This is a field that exists in your search results.", "type": f"{NewContent.UPDATE_PREFIX} the type of your observable from the list {list(SES_OBSERVABLE_TYPE_MAPPING.keys())}.", "role": [f"{NewContent.UPDATE_PREFIX} the role from the list {list(SES_OBSERVABLE_ROLE_MAPPING.keys())}"]}
],
"product": [
"Splunk Enterprise",
"Splunk Enterprise Security",
"Splunk Cloud",
],
"security_domain": answers["security_domain"],
"cve": [f"{NewContent.UPDATE_PREFIX} with CVE(s) if applicable"],
},
"tests": [
{
"name": "True Positive Test",
"attack_data": [
{
"data": f"{NewContent.UPDATE_PREFIX} the data file to replay. Go to https://github.com/splunk/contentctl/wiki for information about the format of this field",
"sourcetype": f"{NewContent.UPDATE_PREFIX} the sourcetype of your data file.",
"source": f"{NewContent.UPDATE_PREFIX} the source of your datafile",
}
],
}
],
}

if answers["detection_type"] not in ["TTP", "Anomaly", "Correlation"]:
del output_file_answers["drilldown_searches"]

return output_file_answers, answers['detection_kind']

def buildStory(self) -> dict[str, Any]:
questions = NewContentQuestions.get_questions_story()
answers = questionary.prompt(
questions,
Expand All @@ -96,12 +135,11 @@ def buildStory(self)->dict[str,Any]:
del answers['usecase']
answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']
return answers


def execute(self, input_dto: new) -> None:
if input_dto.type == NewContentType.detection:
content_dict = self.buildDetection()
subdirectory = pathlib.Path('detections') / content_dict.pop('detection_kind')
content_dict, detection_kind = self.buildDetection()
subdirectory = pathlib.Path('detections') / detection_kind
elif input_dto.type == NewContentType.story:
content_dict = self.buildStory()
subdirectory = pathlib.Path('stories')
Expand All @@ -111,23 +149,20 @@ def execute(self, input_dto: new) -> None:
full_output_path = input_dto.path / subdirectory / SecurityContentObject_Abstract.contentNameToFileName(content_dict.get('name'))
YmlWriter.writeYmlFile(str(full_output_path), content_dict)



def writeObjectNewContent(self, object: dict, subdirectory_name: str, type: NewContentType) -> None:
if type == NewContentType.detection:
file_path = os.path.join(self.output_path, 'detections', subdirectory_name, self.convertNameToFileName(object['name'], object['tags']['product']))
output_folder = pathlib.Path(self.output_path)/'detections'/subdirectory_name
#make sure the output folder exists for this detection
# make sure the output folder exists for this detection
output_folder.mkdir(exist_ok=True)

YmlWriter.writeDetection(file_path, object)
print("Successfully created detection " + file_path)

elif type == NewContentType.story:
file_path = os.path.join(self.output_path, 'stories', self.convertNameToFileName(object['name'], object['tags']['product']))
YmlWriter.writeStory(file_path, object)
print("Successfully created story " + file_path)

else:
raise(Exception(f"Object Must be Story or Detection, but is not: {object}"))

2 changes: 1 addition & 1 deletion contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def main():

else:
#The file exists, so load it up!
config_obj = YmlReader().load_file(configFile)
config_obj = YmlReader().load_file(configFile,add_fields=False)
t = test.model_validate(config_obj)
except Exception as e:
print(f"Error validating 'contentctl.yml':\n{str(e)}")
Expand Down
14 changes: 0 additions & 14 deletions contentctl/helper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,6 @@ def validate_git_pull_request(repo_path: str, pr_number: int) -> str:

return hash

# @staticmethod
# def check_required_fields(
# thisField: str, definedFields: dict, requiredFields: list[str]
# ):
# missing_fields = [
# field for field in requiredFields if field not in definedFields
# ]
# if len(missing_fields) > 0:
# raise (
# ValueError(
# f"Could not validate - please resolve other errors resulting in missing fields {missing_fields}"
# )
# )

@staticmethod
def verify_file_exists(
file_path: str, verbose_print=False, timeout_seconds: int = 10
Expand Down
2 changes: 1 addition & 1 deletion contentctl/input/new_content_questions.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_questions_detection(cls) -> list[dict[str,Any]]:
"type": "text",
"message": "enter search (spl)",
"name": "detection_search",
"default": "| UPDATE_SPL",
"default": "| __UPDATE__ SPL",
},
{
"type": "text",
Expand Down
17 changes: 11 additions & 6 deletions contentctl/input/yml_reader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from typing import Dict, Any

import yaml


import sys
import pathlib

class YmlReader():

@staticmethod
def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=False) -> Dict[str,Any]:
def load_file(file_path: pathlib.Path, add_fields:bool=True, STRICT_YML_CHECKING:bool=False) -> Dict[str,Any]:
try:
file_handler = open(file_path, 'r', encoding="utf-8")

Expand All @@ -27,8 +24,16 @@ def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=Fals
print(f"Error loading YML file {file_path}: {str(e)}")
sys.exit(1)
try:
#yml_obj = list(yaml.safe_load_all(file_handler))[0]
yml_obj = yaml.load(file_handler, Loader=yaml.CSafeLoader)
#Ideally we should use
# from contentctl.actions.new_content import NewContent
# and use NewContent.UPDATE_PREFIX,
# but there is a circular dependency right now which makes that difficult.
# We have instead hardcoded UPDATE_PREFIX
UPDATE_PREFIX = "__UPDATE__"
data = file_handler.read()
if UPDATE_PREFIX in data:
raise Exception(f"The file {file_path} contains the value '{UPDATE_PREFIX}'. Please fill out any unpopulated fields as required.")
yml_obj = yaml.load(data, Loader=yaml.CSafeLoader)
except yaml.YAMLError as exc:
print(exc)
sys.exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@

# TODO (#266): disable the use_enum_values configuration
class SecurityContentObject_Abstract(BaseModel, abc.ABC):
model_config = ConfigDict(use_enum_values=True,validate_default=True)

model_config = ConfigDict(use_enum_values=True,validate_default=True,extra="forbid")
name: str = Field(...,max_length=99)
author: str = Field(...,max_length=255)
date: datetime.date = Field(...)
Expand Down
3 changes: 2 additions & 1 deletion contentctl/objects/alert_action.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from pydantic import BaseModel, model_serializer
from pydantic import BaseModel, model_serializer, ConfigDict
from typing import Optional

from contentctl.objects.deployment_email import DeploymentEmail
Expand All @@ -9,6 +9,7 @@
from contentctl.objects.deployment_phantom import DeploymentPhantom

class AlertAction(BaseModel):
model_config = ConfigDict(extra="forbid")
email: Optional[DeploymentEmail] = None
notable: Optional[DeploymentNotable] = None
rba: Optional[DeploymentRBA] = DeploymentRBA()
Expand Down
1 change: 1 addition & 0 deletions contentctl/objects/atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class InputArgumentType(StrEnum):
Url = "Url"

class AtomicExecutor(BaseModel):
model_config = ConfigDict(extra="forbid")
name: str
elevation_required: Optional[bool] = False #Appears to be optional
command: Optional[str] = None
Expand Down
3 changes: 2 additions & 1 deletion contentctl/objects/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Union
from abc import ABC, abstractmethod

from pydantic import BaseModel
from pydantic import BaseModel,ConfigDict

from contentctl.objects.base_test_result import BaseTestResult

Expand All @@ -21,6 +21,7 @@ def __str__(self) -> str:

# TODO (#224): enforce distinct test names w/in detections
class BaseTest(BaseModel, ABC):
model_config = ConfigDict(extra="forbid")
"""
A test case for a detection
"""
Expand Down
9 changes: 6 additions & 3 deletions contentctl/objects/baseline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

from __future__ import annotations
from typing import Annotated, Optional, List,Any
from pydantic import field_validator, ValidationInfo, Field, model_serializer
from typing import Annotated, List,Any
from pydantic import field_validator, ValidationInfo, Field, model_serializer, computed_field
from contentctl.objects.deployment import Deployment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import DataModel
Expand All @@ -15,7 +15,6 @@
class Baseline(SecurityContentObject):
name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
type: Annotated[str,Field(pattern="^Baseline$")] = Field(...)
datamodel: Optional[List[DataModel]] = None
search: str = Field(..., min_length=4)
how_to_implement: str = Field(..., min_length=4)
known_false_positives: str = Field(..., min_length=4)
Expand All @@ -34,6 +33,10 @@ def get_conf_stanza_name(self, app:CustomApp)->str:
def getDeployment(cls, v:Any, info:ValidationInfo)->Deployment:
return Deployment.getDeployment(v,info)

@computed_field
@property
def datamodel(self) -> List[DataModel]:
return [dm for dm in DataModel if dm.value in self.search]

@model_serializer
def serialize_model(self):
Expand Down
5 changes: 2 additions & 3 deletions contentctl/objects/baseline_tags.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer
from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer, ConfigDict
from typing import List, Any, Union

from contentctl.objects.story import Story
Expand All @@ -12,12 +12,12 @@


class BaselineTags(BaseModel):
model_config = ConfigDict(extra="forbid")
analytic_story: list[Story] = Field(...)
#deployment: Deployment = Field('SET_IN_GET_DEPLOYMENT_FUNCTION')
# TODO (#223): can we remove str from the possible types here?
detections: List[Union[Detection,str]] = Field(...)
product: List[SecurityContentProductName] = Field(...,min_length=1)
required_fields: List[str] = Field(...,min_length=1)
security_domain: SecurityDomain = Field(...)


Expand All @@ -33,7 +33,6 @@ def serialize_model(self):
"analytic_story": [story.name for story in self.analytic_story],
"detections": [detection.name for detection in self.detections if isinstance(detection,Detection)],
"product": self.product,
"required_fields":self.required_fields,
"security_domain":self.security_domain,
"deployments": None
}
Expand Down
Loading
Loading