-
Notifications
You must be signed in to change notification settings - Fork 4
/
application_switch_3.py
192 lines (142 loc) · 8.21 KB
/
application_switch_3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
""" Main Switch module, monitors interface bandwidth usage and applies the QoS policies specified by the SDN Controller """
import FeedbackMessage
import FlowMonitor_3
import SwitchProperties
import json
import subprocess
import socket
from threading import Timer
from threading import Thread
from threading import Lock
class SwitchSocket(Thread):
    """ Class that listens for SDN controller messages.

    Runs as a thread: it binds a TCP socket to the address reported by
    ifconfig for the 10.1.4 network, accepts connections, reads one message
    per connection and hands it to a HandleMessage worker thread.
    """

    def __init__(self, report_object, application_port):
        """ Store the collaborators and create the (still unbound) socket.

        report_object -- object handed to every HandleMessage worker
        application_port -- TCP port the listener binds to in run()
        """
        Thread.__init__(self)
        self.application_port = application_port
        self.report_object = report_object
        self.sock = socket.socket()  # Create a socket object

    @staticmethod
    def _read_message(client):
        """ Read one message from client and always close the connection.

        The worker thread only needs the received data and the peer address,
        so the connection is released here instead of being leaked.
        """
        # NOTE(review): a single recv() may return only part of a message
        # larger than one segment -- confirm the controller's message sizes.
        try:
            return client.recv(14336)  # Get data from the client
        finally:
            client.close()

    def run(self):
        """ Bind to the switch address and serve controller connections.

        Raises IndexError when the ifconfig output holds no 'label:address'
        entry for the 10.1.4 network.
        """
        # The parsing expects the 'addr:10.1.4.x' layout in the second column
        # of the matching ifconfig line.
        ifconfig_output = subprocess.check_output("ifconfig | grep 10.1.4 | awk '{print $2;}'", shell=True)
        host = ifconfig_output.split(':')[1].split('\n')[0]

        print("Binding to " + str(host) + "in port " + str(self.application_port))
        self.sock.bind((host, self.application_port))  # Bind to the port
        self.sock.listen(5)

        try:
            while True:
                client, addr = self.sock.accept()  # Establish connection with client
                data = self._read_message(client)
                # Call the thread to work with the data received
                data_treatment = HandleMessage(self.report_object, data, addr)
                data_treatment.daemon = True  # Do not keep the process alive for workers
                data_treatment.start()
        except KeyboardInterrupt:
            # No client socket is touched here: an interrupt may arrive
            # before any connection was ever accepted.
            print("\nCtrl+C was hitten, stopping server")
        finally:
            # Release the listening socket however the loop ends.
            self.sock.close()
class HandleMessage(Thread):
    """ Worker thread that passes one received message on to the report object """

    def __init__(self, report_object, received, addr):
        """ Keep the message, the sender's address and the object to notify.

        report_object -- object whose message_from_controller() is called
        received -- raw data read from the connection
        addr -- peer address; only its first element is stored
        """
        super(HandleMessage, self).__init__()
        self.received = received
        self.src_address = addr[0]
        self.report_object = report_object
        # Fixed values kept on the instance; run() does not read them.
        self.response_port = 23456
        self.bw_for_new_flows = 0.1

    def run(self):
        """ Deliver the stored message to the report object """
        deliver = self.report_object.message_from_controller
        deliver(self.received)
class ApplicationSwitch:
    """ Main class that runs in OpenFlow switches for FlowFence.

    Holds the monitoring parameters, builds the notifications sent to the
    SDN controller and forwards the controller's orders to the link monitor
    stored in link_state.
    """

    # One lock shared by every call, so queues_ready() notifications are
    # really serialized; a lock created inside the method would never block.
    _send_lock = Lock()

    def __init__(self):
        """ Load the switch interfaces and prepare the feedback sender """
        # Controller message parameters
        self.controller_ip = '10.1.4.1'
        # todo: Handle this!
        self.flowfence_port = 12345

        # Link monitoring parameters
        self.control_in_process = 0
        self.samples = 1
        self.period = 3
        self.interval_time = 1.0
        self.upper_limit = 100
        self.lower_limit = 50

        self.switch_properties = SwitchProperties.SwitchProperties()
        self.interfaces_list = self.switch_properties.get_interfaces()
        self.application_port = 23456

        # toDo: change for the formula:
        # min(((Interface Capacity)/(Minimum Bandwidth SLA));(Maximum SO queue number))
        self.max_queue_limit = 100

        # One entry per interface, keeping only its name and datapath id
        self.complete_flow_list = [
            {'interfaceName': interface['name'], 'dpid': interface['dpid']}
            for interface in self.interfaces_list
        ]

        self.msg_sender = FeedbackMessage.FeedbackMessage(self.controller_ip, self.flowfence_port)

        # Attached by the start-up code after construction; declared here so
        # the attributes always exist.
        self.listen_socket = None
        self.link_state = None

    @staticmethod
    def _describe_interface(interface_dict):
        """ Return the capacity/dpid/name subset of an interface dictionary """
        return {
            'capacity': interface_dict['capacity'],
            'dpid': interface_dict['dpid'],
            'name': interface_dict['name'],
        }

    def _notify_controller(self, feedback_dict):
        """ Serialize a feedback dictionary and send it to the controller """
        # Wire format: the JSON encoding of the dictionary's str() form.
        notification_message = json.dumps(str(feedback_dict))
        self.msg_sender.send_message(notification_message, self.controller_ip, self.flowfence_port)

    def congestion_detected(self, interface_dict):
        """ This method prepares and sends the congestion notification message to the controller.

        interface_dict -- must provide the 'capacity', 'dpid' and 'name' keys
        """
        # Control variable to avoid sending multiple notifications
        if self.control_in_process != 0:
            return

        feedback_dict = {
            'Notification': "Congestion",
            'Interface': self._describe_interface(interface_dict),
        }
        self._notify_controller(feedback_dict)
        self.control_in_process = 1

    def congestion_ceased(self):
        """ Clear the in-process flag so a later congestion is notified again """
        if self.control_in_process == 1:
            self.control_in_process = 0

    def queues_ready(self, interface_dict, bw_list, queue_list):
        """ After receiving the order to create a queue for each flow, notifies the controller that the queues are ready.

        interface_dict -- must provide the 'capacity', 'dpid' and 'name' keys
        bw_list -- sent unchanged under the 'bw_list' key
        queue_list -- sent unchanged under the 'queue_list' key
        """
        feedback_dict = {
            'Notification': "QueuesDone",
            'queue_list': queue_list,
            'Interface': self._describe_interface(interface_dict),
            'bw_list': bw_list,
        }
        # 'with' releases the lock even when printing or sending fails.
        with self._send_lock:
            # Same output as the two-item print statement: label, a space, the list.
            print("%s %s" % ("Switch created queues: ", feedback_dict['queue_list']))
            self._notify_controller(feedback_dict)

    def message_from_controller(self, message):
        """ Handles the message from the controller, this orders the switch to create 1 queue by each flow.

        message -- JSON text whose decoded value is the textual form of a
        dictionary holding at least the 'Response' key.
        Raises KeyError when the decoded dictionary has no 'Response' key.
        """
        # SECURITY NOTE(review): eval() executes whatever expression arrives
        # over the network. Consider ast.literal_eval() if the controller only
        # ever sends plain literals -- confirm against the controller code.
        message_dict = eval(json.loads(message))

        # If the message is a congestion indication, we must prepare the
        # queues and report that they were initialized successfully.
        # Afterwards we will receive a flowmod sending the flows to their
        # respective queues.
        response = message_dict['Response']
        if response == "Decrement":
            self.link_state.update_queues(message_dict)
        elif response == "Clear":
            print("Warning clearing all queues!")
            self.link_state.clear_queues(message_dict)
        elif response == "Delete_queue":
            self.link_state.delete_a_queue(message_dict)

    @classmethod
    def get_instance(cls):
        """ Returns an instance of application switch """
        return cls()
if __name__ == "__main__":
    # Build the switch application, then attach its two collaborators:
    # the controller listener and the link monitor.
    CODE = ApplicationSwitch()
    CODE.listen_socket = SwitchSocket(CODE, CODE.application_port)
    CODE.link_state = FlowMonitor_3.FlowMonitor_3(
        CODE.samples,
        CODE.interval_time,
        CODE.upper_limit,
        CODE.lower_limit)
    print("Init Finished")

    # The listener runs as a daemon thread, so it does not keep the process
    # alive by itself; monitoring is started from this thread afterwards.
    CODE.listen_socket.daemon = True
    CODE.listen_socket.start()
    CODE.link_state.start_monitoring()