# This repository has been archived by the owner on Dec 22, 2024. It is now read-only.
# -
# Notifications
# You must be signed in to change notification settings - Fork 5
# /
# dna.py
# 138 lines (116 loc) · 5.41 KB
# /
# dna.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
This module implements a northbound API client manager for DNA Center.

Basic usage:

    dnac = dna.Dnac('https://10.0.0.1/')
    dnac.login('admin', 'password')
    print(dnac.get('network-device/count'))
    dnac.close()

Or as a context manager:

    with dna.Dnac('https://10.0.0.1/') as dnac:
        dnac.login('admin', 'password')
        print(dnac.get('network-device/count'))
"""
# Author: Tim Dorssers
import json
import time
import logging
import requests
from requests import HTTPError
# Module-level side effect: turns off urllib3 warnings for the whole process
# as soon as this module is imported.
# NOTE(review): presumably meant to hide the certificate warnings caused by
# Dnac setting verify = False -- confirm that silencing all of them is wanted.
requests.packages.urllib3.disable_warnings()  # Disable warnings
class Dnac(requests.Session):
    """ Implements a REST API session manager for DNA Center

    Extends requests.Session so that the request methods (get, post, ...)
    take an API path relative to the controller instead of a full URL and
    return the decoded JSON body as JsonObj objects.
    """

    def __init__(self, url):
        """ Creates a session for the DNA Center at the given URL or host

        Only the part of url between an optional scheme and the first '/' is
        kept; the scheme is always forced to https.
        """
        super(Dnac, self).__init__()
        self.base_url = 'https://' + url.rsplit('://')[-1].split('/')[0]
        self.headers.update({'Content-Type': 'application/json'})
        self.verify = False  # Ignore verifying the SSL certificate

    def login(self, username, passwd):
        """ Opens session to DNA Center

        Raises HTTPError (via request) when the token request fails.
        """
        # Request token using HTTP basic authorization
        response = self.post('auth/token', ver='api/system/v1',
                             auth=(username, passwd))
        # Persist authorization token for further REST requests
        self.headers.update({'X-Auth-Token': response['Token']})

    def request(self, method, api, ver='api/v1', data=None, **kwargs):
        """ Extends base class method to handle DNA Center JSON data

        method -- HTTP method name
        api    -- API path below the version prefix, e.g. 'task/<id>'
        ver    -- version prefix of the API (default 'api/v1')
        data   -- object to send as JSON encoded request body, or None
        kwargs -- passed through to requests.Session.request

        Returns the decoded body (JSON objects become JsonObj), or the
        requests.Response itself when the body is not JSON encoded.
        Raises HTTPError when the response has an error status code.
        """
        # Construct URL, serialize data and send request
        url = self.base_url + '/' + ver.strip('/') + '/' + api.strip('/')
        data = json.dumps(data).encode('utf-8') if data is not None else None
        response = super(Dnac, self).request(method, url, data=data, **kwargs)
        # Deserialize response and return JsonObj object
        try:
            json_obj = response.json(object_hook=JsonObj)
        except ValueError:
            logging.debug('Response is not JSON encoded')
            json_obj = response  # Return requests.Response object instead
        else:
            # The body may be any JSON value and its 'response' member is not
            # necessarily an object; only an object can carry the error
            # fields, anything else must not prevent HTTPError being raised
            error = (json_obj.get('response')
                     if isinstance(json_obj, dict) else None)
            if 400 <= response.status_code < 600 and isinstance(error, dict):
                # Use DNA Center returned error message in case of HTTP error
                reason = _flatten(': ', error,
                                  ['errorCode', 'message', 'detail'])
                if reason:  # Keep the HTTP reason if no error field is set
                    response.reason = reason
        response.raise_for_status()  # Raise HTTPError, if one occurred
        return json_obj

    def wait_on_task(self, task_id, timeout=125, interval=2, backoff=1.15):
        """ Repeatedly requests DNA Center task status until completed

        task_id  -- id of the task to poll
        timeout  -- seconds after which polling is given up (default 125)
        interval -- seconds to sleep before the second poll (default 2)
        backoff  -- factor applied to interval after every sleep

        Returns the task status response once it contains an 'endTime'.
        Raises TaskError when the finished task has isError set and this
        module's TimeoutError when the task does not finish in time.
        """
        start_time = time.time()
        while True:
            # Get task status by id
            response = self.get('task/' + task_id)
            if 'endTime' in response.response:  # Task has completed
                msg = _flatten(': ', response.response,
                               ['errorCode', 'failureReason', 'progress'])
                # Raise exception when isError is true else log completion
                if response.response.get('isError', False):
                    raise TaskError(msg, response=response)
                logging.info('TASK %s has completed and returned: %s',
                             task_id, msg)
                return response
            if start_time + timeout < time.time():  # Task has timed out
                raise TimeoutError('TASK %s did not complete within the '
                                   'specified time-out (%s seconds)'
                                   % (task_id, timeout))
            logging.info('TASK %s has not completed yet. Sleeping %d seconds',
                         task_id, interval)
            time.sleep(int(interval))  # Sleeps whole seconds only
            interval *= backoff
# NOTE: on Python 3 this name shadows the builtin TimeoutError inside this
# module; it derives directly from Exception, not from OSError.
class TimeoutError(Exception):
    """ Raised by Dnac.wait_on_task when a task does not finish in time """
class TaskError(Exception):
    """ Raised when a task has failed; keeps the task status response

    The optional keyword argument 'response' is stored on the instance as
    the attribute of the same name (None when not given).
    """

    def __init__(self, *args, **kwargs):
        # Take out our own keyword before delegating; whatever remains is
        # handed to Exception unchanged
        task_response = kwargs.pop('response', None)
        super(TaskError, self).__init__(*args, **kwargs)
        self.response = task_response
class JsonObj(dict):
    """ dict whose items can also be read, set and deleted as attributes """

    def __getattr__(self, name):
        """ Called for attributes not found normally: x.name <==> x['name'] """
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        return value

    def __setattr__(self, name, value):
        """ x.name = value <==> x['name'] = value """
        dict.__setitem__(self, name, value)

    def __delattr__(self, name):
        """ del x.name <==> del x['name'] (KeyError if the key is missing) """
        dict.__delitem__(self, name)

    def __str__(self):
        """ Return the object as JSON text indented by four spaces """
        text = json.dumps(self, indent=4)
        return text
def _flatten(string, dct, keys):
""" Helper function to join values of given keys existing in dict """
return string.join(str(dct[k]) for k in set(keys) & set(dct.keys()))
def find(obj, val, key='id'):
    """ Recursively search JSON object for a value of a key/attribute

    obj -- decoded JSON: a list (array), a dict such as JsonObj (object) or
           any scalar
    val -- value to look for
    key -- name of the key/attribute to compare (default 'id')

    Returns the first object, in depth-first order, whose key equals val,
    or None when there is no such object.
    """
    if isinstance(obj, list):  # JSON array
        children = obj
    elif isinstance(obj, dict):  # JSON object (JsonObj is a dict)
        if obj.get(key) == val:
            return obj
        # Only arrays and objects can hold further objects
        children = [v for v in obj.values() if isinstance(v, (list, dict))]
    else:  # A scalar can neither match nor contain a match
        return None
    for child in children:
        match = find(child, val, key)
        # Move on to the next child when this one has no match, rather than
        # giving up after the first container searched
        if match is not None:
            return match
    return None
def ctime(val):
    """ Format a time given in milliseconds since the epoch as a string """
    seconds = int(val) // 1000  # Whole seconds; the remainder is dropped
    return time.ctime(seconds)