-
Notifications
You must be signed in to change notification settings - Fork 198
/
import_mx_l3.py
131 lines (107 loc) · 4.5 KB
/
import_mx_l3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/python3
READ_ME = '''
=== PREREQUISITES ===
Run in Python 3 with Meraki dashboard API Python library @
https://github.com/meraki/dashboard-api-python/
pip[3] install --upgrade meraki
=== DESCRIPTION ===
Imports MX L3 outbound firewall rules from CSV file.
Note that if there is a final "default rule" with logging enabled, then a
syslog server needs to be configured on the Network-wide > General page.
=== USAGE ===
python[3] import_mx_l3.py [-k <api_key>] -n <net_id> -f <file> [-m <mode>]
The -f parameter is the path to the CSV file with the new MX L3 firewall rules.
The optional -m parameter is either "simulate" (default) to only print changes,
or "commit" to also apply those changes to the dashboard network.
API key can also be exported as an environment variable named
MERAKI_DASHBOARD_API_KEY
'''
import csv
from datetime import datetime
import getopt
import os
import sys
import ipaddress
import meraki
# Prints READ_ME help message for user to read
def print_help():
lines = READ_ME.split('\n')
for line in lines:
print('# {0}'.format(line))
def main(argv):
# Set default values for command line arguments
api_key = net_id = arg_file = arg_mode = None
# Get command line arguments
try:
opts, args = getopt.getopt(argv, 'hk:n:f:m:')
except getopt.GetoptError:
print_help()
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print_help()
sys.exit()
elif opt == '-k':
api_key = arg
elif opt == '-n':
net_id = arg
elif opt == '-f':
arg_file = arg
elif opt == '-m':
arg_mode = arg
# Check if all required parameters have been input
if (api_key == None and os.getenv('MERAKI_DASHBOARD_API_KEY') == None) or net_id == None or arg_file == None:
print_help()
sys.exit(2)
# Assign default mode to "simulate" unless "commit" specified
if arg_mode != 'commit':
arg_mode = 'simulate'
# Read CSV input file, and skip header row
input_file = open(arg_file)
csv_reader = csv.reader(input_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_ALL)
next(csv_reader, None)
print(f'Reading file {arg_file}')
# Loop through each firewall rule from CSV file and build PUT data
fw_rules = []
for row in csv_reader:
rule = dict({'policy': row[0], 'protocol': row[1], 'srcCidr': row[2], 'srcPort': row[3], 'destCidr': row[4], 'destPort': row[5], 'comment': row[6], 'syslogEnabled': (row[7] == True or row[7] == 'True' or row[7] == 'true')})
# Append implied "/32" for IP addresses for just one host
try:
ip = ipaddress.ip_address(rule['srcCidr'])
if not '/' in rule['srcCidr']:
rule['srcCidr'] += '/32'
except:
pass
try:
ip = ipaddress.ip_address(rule['destCidr'])
if not '/' in rule['destCidr']:
rule['destCidr'] += '/32'
except:
pass
print(rule)
fw_rules.append(rule)
old_rules = list(fw_rules)
print(f'Processed all {len(fw_rules)} rules of file {arg_file}')
# Check if last (default) rule exists, and if so, remove and check for default logging
default_rule_exists = False
default_logging = False
last_rule = {'comment': 'Default rule', 'policy': 'allow', 'protocol': 'Any', 'srcPort': 'Any', 'srcCidr': 'Any', 'destPort': 'Any', 'destCidr': 'Any'}
if all(item in fw_rules[-1].items() for item in last_rule.items()):
default_rule_exists = True
default_logging = (fw_rules.pop()['syslogEnabled'] == True)
# Dashboard API library class
m = meraki.DashboardAPI(api_key=api_key, log_file_prefix=__file__[:-3], simulate=(arg_mode == 'simulate'))
# Update MX L3 firewall rules
print(f'Attempting update/simulation of firewall rules to network {net_id}')
m.appliance.updateNetworkApplianceFirewallL3FirewallRules(net_id, rules=fw_rules, syslogDefaultRule=default_logging)
# Confirm whether changes were successfully made
if arg_mode == 'commit':
new_rules = m.appliance.getNetworkApplianceFirewallL3FirewallRules(net_id)['rules']
if default_rule_exists and new_rules[:-1] == old_rules[:-1]:
print('Update successful!')
elif not(default_rule_exists) and new_rules[:-1] == old_rules:
print('Update successful!')
else:
print('Uh oh, something went wrong...')
if __name__ == '__main__':
main(sys.argv[1:])