From 95056e6abeb1b9ccee545ab2e990b1b2ddf6f355 Mon Sep 17 00:00:00 2001 From: Orion Poplawski Date: Fri, 22 Dec 2023 21:26:44 -0700 Subject: [PATCH] [pfsense_nat_outbound] Allow for NET: addresses --- plugins/module_utils/nat_outbound.py | 81 ++++++++++++------- plugins/modules/pfsense_nat_outbound.py | 4 +- .../modules/test_pfsense_nat_outbound.py | 62 +++++++++----- 3 files changed, 95 insertions(+), 52 deletions(-) diff --git a/plugins/module_utils/nat_outbound.py b/plugins/module_utils/nat_outbound.py index 6e5c6abe..bb137d4c 100644 --- a/plugins/module_utils/nat_outbound.py +++ b/plugins/module_utils/nat_outbound.py @@ -76,9 +76,10 @@ def _params_to_obj(self): obj = dict() obj['descr'] = self.params['descr'] - if self.params['state'] == 'present': + params = self.params + if params['state'] == 'present': obj['sourceport'] = '' - obj['interface'] = self.pfsense.parse_interface(self.params['interface']) + obj['interface'] = self.pfsense.parse_interface(params['interface']) self._get_ansible_param(obj, 'ipprotocol') if obj['ipprotocol'] == 'inet46': del obj['ipprotocol'] @@ -93,15 +94,15 @@ def _params_to_obj(self): self._get_ansible_param_bool(obj, 'staticnatport', value='') self._get_ansible_param_bool(obj, 'nosync', value='') - if 'after' in self.params and self.params['after'] is not None: - self.after = self.params['after'] + if 'after' in params and params['after'] is not None: + self.after = params['after'] - if 'before' in self.params and self.params['before'] is not None: - self.before = self.params['before'] + if 'before' in params and params['before'] is not None: + self.before = params['before'] self._parse_address(obj, 'source', 'sourceport', True, 'network') - self._parse_address(obj, 'destination', 'dstport', False, 'address') - if self.params['invert']: + self._parse_address(obj, 'destination', 'dstport', False, 'network') + if params['invert']: obj['destination']['not'] = None self._parse_translated_address(obj) @@ -120,32 +121,49 @@ def _parse_address(self, obj, field, field_port, allow_self, target): param = self.params[field] addr = param.split(':') - if len(addr) > 2: + if len(addr) > 3: self.module.fail_json(msg='Cannot parse address %s' % (param)) address = addr[0] ret = dict() - ports = addr[1] if len(addr) > 1 else None - if address == 'any': - if field == 'source': - ret[target] = 'any' - else: - ret['any'] = '' - # rule with this firewall - elif allow_self and address == '(self)': - ret[target] = '(self)' - elif self.pfsense.is_ipv4_address(address): - ret[target] = address + '/32' - elif self.pfsense.is_ipv4_network(address, False): - (addr, bits) = self.pfsense.parse_ip_network(address, False, False) - ret[target] = addr + '/' + str(bits) - elif self.pfsense.find_alias(address, 'host') is not None or self.pfsense.find_alias(address, 'network') is not None: - ret[target] = address + + if address == 'NET': + interface = addr[1] if len(addr) > 1 else None + ports = addr[2] if len(addr) > 2 else None + if interface is None or interface == '': + self.module.fail_json(msg='Cannot parse address %s' % (param)) + + ret['network'] = self.pfsense.parse_interface(interface) else: - self.module.fail_json(msg='Cannot parse address %s, not IP or alias' % (address)) + ports = addr[1] if len(addr) > 1 else None + if address == 'any': + if field == 'source': + ret[target] = 'any' + else: + ret['any'] = '' + # rule with this firewall + elif allow_self and address == '(self)': + ret[target] = '(self)' + elif self.params['ipprotocol'] != 'inet6' and self.pfsense.is_ipv4_address(address): + ret[target] = address + '/32' + self.module.warn('Specifying an address without a CIDR prefix is depracated. Please add /32 if you want a single host address') + elif self.params['ipprotocol'] != 'inet4' and self.pfsense.is_ipv6_address(address): + ret[target] = address + '/128' + self.module.warn('Specifying an address without a CIDR prefix is depracated. Please add /128 if you want a single host address') + elif self.params['ipprotocol'] != 'inet6' and self.pfsense.is_ipv4_network(address, False): + (addr, bits) = self.pfsense.parse_ip_network(address, False, False) + ret[target] = addr + '/' + str(bits) + elif self.params['ipprotocol'] != 'inet4' and self.pfsense.is_ipv6_network(address, False): + (addr, bits) = self.pfsense.parse_ip_network(address, False, False) + ret[target] = addr + '/' + str(bits) + elif self.pfsense.find_alias(address, 'host') is not None or self.pfsense.find_alias(address, 'network') is not None: + ret[target] = address + else: + self.module.fail_json(msg='Cannot parse address %s, not %s network or alias' % (address, self.params['ipprotocol'])) - self._parse_ports(obj, ports, field_port, param) + if ports is not None: + self._parse_ports(obj, ports, field_port, param) obj[field] = ret @@ -427,13 +445,14 @@ def _log_fields(self, before=None): return values - @staticmethod - def _obj_address_to_log_field(rule, addr, target, port): + def _obj_address_to_log_field(self, rule, addr, target, port): """ return formated address from dict """ field = '' if addr in rule: if target in rule[addr]: - field = rule[addr][target] + if self.pfsense.interfaces.find(rule[addr][target]): + field = 'NET:' + field += rule[addr][target] elif addr == 'destination' and 'any' in rule[addr]: field = 'any' @@ -447,7 +466,7 @@ def _obj_to_log_fields(self, rule): """ return formated source and destination from dict """ res = {} res['source'] = self._obj_address_to_log_field(rule, 'source', 'network', 'sourceport') - res['destination'] = self._obj_address_to_log_field(rule, 'destination', 'address', 'dstport') + res['destination'] = self._obj_address_to_log_field(rule, 'destination', 'network', 'dstport') res['interface'] = self.pfsense.get_interface_display_name(rule['interface']) if rule['target'] == 'other-subnet': diff --git a/plugins/modules/pfsense_nat_outbound.py b/plugins/modules/pfsense_nat_outbound.py index 72232c0e..9586de96 100644 --- a/plugins/modules/pfsense_nat_outbound.py +++ b/plugins/modules/pfsense_nat_outbound.py @@ -49,12 +49,12 @@ choices: [ "any", "tcp", "udp", "tcp/udp", "icmp", "esp", "ah", "gre", "ipv6", "igmp", "carp", "pfsync" ] type: str source: - description: The matching source address, in {any,(self),ALIAS,NETWORK}[:port] format. + description: The matching source address, in {any,(self),ALIAS,NETWORK,NET:INTERFACE}[:port] format. required: false default: null type: str destination: - description: The matching destination address, in {any,ALIAS,NETWORK}[:port] format. + description: The matching destination address, in {any,ALIAS,NETWORK,NET:INTERFACE}[:port] format. required: false default: null type: str diff --git a/tests/unit/plugins/modules/test_pfsense_nat_outbound.py b/tests/unit/plugins/modules/test_pfsense_nat_outbound.py index 3af48110..7ac92dc8 100644 --- a/tests/unit/plugins/modules/test_pfsense_nat_outbound.py +++ b/tests/unit/plugins/modules/test_pfsense_nat_outbound.py @@ -35,28 +35,35 @@ def is_ipv4_address(address): pass return False - def parse_address(self, addr, field): + def parse_address(self, name, addr, field, invert=False): """ return address parsed in dict """ parts = addr.split(':') res = {} port = None - if parts[0] == 'any': - if field == 'network': - res[field] = 'any' - else: - res['any'] = None - elif parts[0] == '(self)': - res[field] = '(self)' - elif parts[0] in ['lan', 'vpn', 'vt1', 'lan_100']: - res[field] = self.unalias_interface(parts[0]) + if parts[0] == 'NET': + res[field] = parts[1] + if len(parts) > 2: + port = parts[2].replace('-', ':') else: - res[field] = parts[0] + if parts[0] == 'any': + if name == 'source': + res[field] = 'any' + else: + res['any'] = None + elif parts[0] == '(self)': + res[field] = '(self)' + elif parts[0] in ['lan', 'vpn', 'vt1', 'lan_100']: + res[field] = self.unalias_interface(parts[0]) + else: + res[field] = parts[0] - if field in res and self.is_ipv4_address(res[field]) and res[field].find('/') == -1: - res[field] += '/32' + if field in res and self.is_ipv4_address(res[field]) and res[field].find('/') == -1: + res[field] += '/32' - if len(parts) > 1: - port = parts[1].replace('-', ':') + if len(parts) > 1: + port = parts[1].replace('-', ':') + if invert: + res['not'] = None return (res, port) @@ -68,9 +75,9 @@ def reparse_network(value): return '2.3.4.0/24' return value - def check_addr(self, params, target_elt, addr, field, port): + def check_addr(self, params, target_elt, addr, field, port, invert=False): """ test the addresses definition """ - (addr_dict, port_value) = self.parse_address(params[addr], field) + (addr_dict, port_value) = self.parse_address(addr, params[addr], field, invert=invert) addr_elt = self.assert_find_xml_elt(target_elt, addr) for key, value in addr_dict.items(): self.check_value_equal(addr_elt, key, self.reparse_network(value)) @@ -107,12 +114,11 @@ def md5(value): def check_target_elt(self, obj, target_elt, target_idx=-1): """ test the xml definition """ self.check_addr(obj, target_elt, 'source', 'network', 'sourceport') - self.check_addr(obj, target_elt, 'destination', 'address', 'dstport') + self.check_addr(obj, target_elt, 'destination', 'network', 'dstport', invert=obj.get('invert')) self.check_target_addr(obj, target_elt) self.check_param_equal_or_not_find(obj, target_elt, 'disabled') self.check_param_equal_or_not_find(obj, target_elt, 'nonat') - self.check_param_equal_or_not_find(obj, target_elt, 'invert') self.check_param_equal_or_not_find(obj, target_elt, 'staticnatport') self.check_param_equal_or_not_find(obj, target_elt, 'nosync') self.check_param_equal_or_not_find(obj, target_elt, 'nonat') @@ -191,6 +197,24 @@ def test_nat_outbound_create_networks(self): command = "create nat_outbound 'https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='2.3.4.5/24:443'" self.do_module_test(obj, command=command, target_idx=3) + def test_nat_outbound_create_networks_invert(self): + """ test """ + obj = dict(descr='https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='2.3.4.5/24:443', invert=True) + command = "create nat_outbound 'https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='2.3.4.5/24:443', invert=True" + self.do_module_test(obj, command=command, target_idx=3) + + def test_nat_outbound_create_interface_destination_network(self): + """ test """ + obj = dict(descr='https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='NET:lan:443') + command = "create nat_outbound 'https-source-rewriting', interface='lan', source='1.2.3.4/24', destination='NET:lan:443'" + self.do_module_test(obj, command=command, target_idx=3) + + def test_nat_outbound_create_interface_source_network(self): + """ test """ + obj = dict(descr='https-source-rewriting', interface='lan', source='NET:lan', destination='2.3.4.5/24:443') + command = "create nat_outbound 'https-source-rewriting', interface='lan', source='NET:lan', destination='2.3.4.5/24:443'" + self.do_module_test(obj, command=command, target_idx=3) + def test_nat_outbound_create_top(self): """ test """ obj = dict(descr='https-source-rewriting', interface='lan', source='any', destination='1.2.3.4:443', after='top')