Skip to content

Commit

Permalink
dev: security mode and nextHopIpv6 (#5)
Browse files Browse the repository at this point in the history
* allow empty string as security policy mode

* fix: policy mode default to security

* fix: data policy nextHopIpv6 action set entry

* bump sdk dev version
  • Loading branch information
sbasan authored Nov 21, 2024
1 parent c3c514c commit feb1515
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 21 deletions.
13 changes: 12 additions & 1 deletion catalystwan/models/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,18 @@ def str_as_interface_list(val: Union[str, Sequence[InterfaceStr]]) -> Sequence[I
"umbrella",
]

PolicyModeType = Literal["security", "unified"]
_PolicyModeType = Literal["security", "unified"]

def parse_policy_mode(val: Optional[str]) -> _PolicyModeType:
if isinstance(val, str) and val == "unified":
return "unified"
return "security"

PolicyModeType = Annotated[
_PolicyModeType,
BeforeValidator(parse_policy_mode)
]


CoreRegion = Literal[
"core",
Expand Down
10 changes: 7 additions & 3 deletions catalystwan/models/policy/definition/traffic_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2023 Cisco Systems, Inc. and its affiliates

from ipaddress import IPv4Address, IPv4Network, IPv6Network
from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
from typing import List, Literal, Optional, Set, Tuple, Union, overload
from uuid import UUID

Expand Down Expand Up @@ -49,6 +49,7 @@
Match,
NATAction,
NextHopActionEntry,
NextHopIpv6ActionEntry,
NextHopLooseEntry,
PacketLengthEntry,
PLPEntry,
Expand Down Expand Up @@ -274,8 +275,11 @@ def associate_nat_action(
self._insert_action(nat_action)

@accept_action
def associate_next_hop_action(self, next_hop: IPv4Address, loose: bool = False) -> None:
self._insert_action_in_set(NextHopActionEntry(value=next_hop))
def associate_next_hop_action(self, next_hop: Union[IPv4Address, IPv6Address], loose: bool = False) -> None:
if isinstance(next_hop, IPv6Address):
self._insert_action_in_set(NextHopIpv6ActionEntry(value=next_hop))
else:
self._insert_action_in_set(NextHopActionEntry(value=next_hop))
self._insert_action_in_set(NextHopLooseEntry(value=loose))

@accept_action
Expand Down
12 changes: 4 additions & 8 deletions catalystwan/models/policy/definition/zone_based_firewall.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Annotated

from catalystwan.models.common import PolicyModeType
from catalystwan.models.misc.application_protocols import ApplicationProtocol
from catalystwan.models.policy.policy_definition import (
AdvancedInspectionProfileAction,
Expand Down Expand Up @@ -220,12 +221,6 @@ class ZoneBasedFWPolicyEntry(BaseModel):
model_config = ConfigDict(populate_by_name=True)


class ZoneBasedFWPolicyHeader(PolicyDefinitionBase):
type: Literal["zoneBasedFW"] = "zoneBasedFW"
mode: str = Field(default="security")
model_config = ConfigDict(populate_by_name=True)


class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase):
default_action: ZoneBasedFirewallDefaultAction = Field(
default=ZoneBasedFirewallDefaultAction(type="drop"),
Expand All @@ -236,9 +231,10 @@ class ZoneBasedFWPolicyDefinition(DefinitionWithSequencesCommonBase):
entries: List[ZoneBasedFWPolicyEntry] = []


class ZoneBasedFWPolicy(ZoneBasedFWPolicyHeader):
class ZoneBasedFWPolicy(PolicyDefinitionBase):
model_config = ConfigDict(populate_by_name=True)
type: Literal["zoneBasedFW"] = "zoneBasedFW"
mode: Literal["security", "unified"] = "security"
mode: PolicyModeType = "security"
definition: ZoneBasedFWPolicyDefinition = ZoneBasedFWPolicyDefinition()

def add_ipv4_rule(
Expand Down
6 changes: 6 additions & 0 deletions catalystwan/models/policy/policy_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ class NextHopActionEntry(BaseModel):
value: Union[IPv4Address, IPv6Address]


class NextHopIpv6ActionEntry(BaseModel):
field: Literal["nextHopIpv6"] = "nextHopIpv6"
value: IPv6Address


class NextHopMatchEntry(BaseModel):
field: Literal["nextHop"] = "nextHop"
ref: UUID
Expand Down Expand Up @@ -1205,6 +1210,7 @@ class CloudSaaSAction(BaseModel):
MetricEntry,
MetricTypeEntry,
NextHopActionEntry,
NextHopIpv6ActionEntry,
NextHopLooseEntry,
OMPTagEntry,
OriginatorEntry,
Expand Down
16 changes: 8 additions & 8 deletions catalystwan/models/policy/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class UnifiedSecurityPolicyDefinition(PolicyDefinition):


class SecurityPolicy(PolicyCreationPayload):
policy_mode: Literal[None, "security"] = Field(
policy_mode: Literal[None, "", "security"] = Field(
default="security", serialization_alias="policyMode", validation_alias="policyMode"
)
policy_type: str = Field(default="feature", serialization_alias="policyType", validation_alias="policyType")
Expand All @@ -138,26 +138,26 @@ class SecurityPolicy(PolicyCreationPayload):
def get_assemby_item_uuids(self) -> Set[UUID]:
return set((item.definition_id for item in self.policy_definition.assembly))

def add_item(self, item: SecurityPolicyAssemblyItem) -> None:
def _add_item(self, item: SecurityPolicyAssemblyItem) -> None:
self.policy_definition.assembly.append(item)

def add_zone_based_fw(self, definition_id: UUID) -> None:
self.add_item(ZoneBasedFWAssemblyItem(definition_id=definition_id))
self._add_item(ZoneBasedFWAssemblyItem(definition_id=definition_id))

def add_dns_security(self, definition_id: UUID) -> None:
self.add_item(DNSSecurityAssemblyItem(definition_id=definition_id))
self._add_item(DNSSecurityAssemblyItem(definition_id=definition_id))

def add_intrusion_prevention(self, definition_id: UUID) -> None:
self.add_item(IntrusionPreventionAssemblyItem(definition_id=definition_id))
self._add_item(IntrusionPreventionAssemblyItem(definition_id=definition_id))

def add_url_filtering(self, definition_id: UUID) -> None:
self.add_item(URLFilteringAssemblyItem(definition_id=definition_id))
self._add_item(URLFilteringAssemblyItem(definition_id=definition_id))

def add_advanced_malware_protection(self, definition_id: UUID) -> None:
self.add_item(AdvancedMalwareProtectionAssemblyItem(definition_id=definition_id))
self._add_item(AdvancedMalwareProtectionAssemblyItem(definition_id=definition_id))

def add_ssl_decryption(self, definition_id: UUID) -> None:
self.add_item(SSLDecryptionAssemblyItem(definition_id=definition_id))
self._add_item(SSLDecryptionAssemblyItem(definition_id=definition_id))

@field_validator("policy_definition", mode="before")
@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "catalystwan"
version = "0.40.0dev0"
version = "0.40.0dev1"
description = "Cisco Catalyst WAN SDK for Python"
authors = ["kagorski <kagorski@cisco.com>"]
readme = "README.md"
Expand Down

0 comments on commit feb1515

Please sign in to comment.