diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index 3bb89f0..0191a4d 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -49,6 +49,7 @@ module Platform : sig -> ?verbose:bool -> ?log_disconnects:bool -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] + -> ?monitor_connections:bool -> port:int -> unit -> t Deferred.t @@ -65,6 +66,8 @@ module Platform : sig -> m -> [ `Drop of exn | `Sent of Time.t ] Deferred.t + val send_ignore_errors : t -> Client_id.t -> m -> unit + val send_to_all : t -> m -> unit val client_addr_port @@ -144,7 +147,11 @@ module Chunk : sig | `Message of Client_id.t * m ] - val echo : (t, e, e) Stage.t + val set_monitor_interval : t -> Time.Span.t -> unit + val set_idle_wait : t -> Time.Span.t -> unit + val set_kill_wait : t -> Time.Span.t -> unit + + val echo : (t, h, h) Stage.t val handshake : int -> (t, e, h) Stage.t end @@ -165,8 +172,15 @@ module OpenFlow0x01 : sig | `Message of Client_id.t * m ] - val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId - val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t + val switch_id_of_client_exn : t -> Client_id.t -> SDN_Types.switchId + val client_id_of_switch_exn : t -> SDN_Types.switchId -> Client_id.t + + val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId option + val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t option + + val set_monitor_interval : t -> Time.Span.t -> unit + val set_idle_wait : t -> Time.Span.t -> unit + val set_kill_wait : t -> Time.Span.t -> unit val features : (t, e, f) Stage.t end diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 861fe5b..0efdb1a 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -60,20 +60,34 @@ module Controller = struct let close t = ChunkController.close t.sub let has_client_id t = ChunkController.has_client_id t.sub let send t s_id msg = ChunkController.send t.sub s_id (Message.marshal' msg) + let send_ignore_errors t s_id msg = ChunkController.send_ignore_errors t.sub s_id (Message.marshal' msg) let send_to_all t msg = ChunkController.send_to_all t.sub (Message.marshal' msg) let client_addr_port t = ChunkController.client_addr_port t.sub let listening_port t = ChunkController.listening_port t.sub (* XXX(seliopou): Raises `Not_found` if the client is no longer connected. *) - let switch_id_of_client t c_id = ClientMap.find_exn t.switches c_id - let client_id_of_switch t sw_id = SwitchMap.find_exn t.clients sw_id + let switch_id_of_client_exn t c_id = ClientMap.find_exn t.switches c_id + let client_id_of_switch_exn t sw_id = SwitchMap.find_exn t.clients sw_id + + let switch_id_of_client t c_id = ClientMap.find t.switches c_id + let client_id_of_switch t sw_id = SwitchMap.find t.clients sw_id + + let set_monitor_interval (t:t) (s:Time.Span.t) : unit = + ChunkController.set_monitor_interval t.sub s + + let set_idle_wait (t:t) (s:Time.Span.t) : unit = + ChunkController.set_idle_wait t.sub s + + let set_kill_wait (t:t) (s:Time.Span.t) : unit = + ChunkController.set_kill_wait t.sub s let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?monitor_connections ~port () = ChunkController.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () + ?buffer_age_limit ?monitor_connections ~port () >>| function t -> { sub = t ; shakes = ClientSet.empty @@ -81,11 +95,6 @@ module Controller = struct ; clients = SwitchMap.empty } - let _send t c_id m = - send t c_id (0l, m) >>| function - | `Drop exn -> raise exn - | `Sent _ -> () - let openflow0x01 t evt = match evt with | `Connect (c_id, version) -> @@ -104,15 +113,24 @@ module Controller = struct let features t evt = match evt with | `Connect (c_id) -> + assert (not (ClientSet.mem t.shakes c_id)); t.shakes <- ClientSet.add t.shakes c_id; - send t c_id (0l, M.SwitchFeaturesRequest) >>| ChunkController.ensure + send t c_id (0l, M.SwitchFeaturesRequest) + (* XXX(seliopou): This swallows any errors that might have occurred + * while attemping the handshake. Any such error should not be raised, + * since as far as the user is concerned the connection never existed. + * At the very least, the exception should be logged, which it will be + * as long as the log_disconnects option is not disabled when creating + * the controller. + * *) + >>| (function _ -> []) | `Message (c_id, (xid, msg)) when ClientSet.mem t.shakes c_id -> begin match msg with | M.SwitchFeaturesReply fs -> let switch_id = fs.OpenFlow0x01.SwitchFeatures.switch_id in t.switches <- ClientMap.add t.switches c_id switch_id; - t.clients <- SwitchMap.add t.clients switch_id c_id; - t.shakes <- ClientSet.remove t.shakes c_id; + t.clients <- SwitchMap.add t.clients switch_id c_id; + t.shakes <- ClientSet.remove t.shakes c_id; return [`Connect(c_id, fs)] | _ -> Log.of_lazy ~tags ~level:`Debug (lazy @@ -126,11 +144,12 @@ module Controller = struct let m_sw_id = ClientMap.find t.switches c_id in match m_sw_id with | None -> (* features request did not complete *) + assert (ClientSet.mem t.shakes c_id); t.shakes <- ClientSet.remove t.shakes c_id; return [] | Some(sw_id) -> (* features request did complete *) - t.clients <- SwitchMap.remove t.clients sw_id; t.switches <- ClientMap.remove t.switches c_id; + t.clients <- SwitchMap.remove t.clients sw_id; return [`Disconnect(c_id, sw_id, exn)] let listen t = diff --git a/async/Async_OpenFlow0x04.ml b/async/Async_OpenFlow0x04.ml index 462d411..fe01fda 100644 --- a/async/Async_OpenFlow0x04.ml +++ b/async/Async_OpenFlow0x04.ml @@ -60,9 +60,10 @@ module Controller = struct let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?monitor_connections ~port () = ChunkController.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () + ?buffer_age_limit ?monitor_connections ~port () >>| function t -> { sub = t } let listen t = @@ -77,6 +78,7 @@ module Controller = struct let close t = ChunkController.close t.sub let has_client_id t = ChunkController.has_client_id t.sub let send t s_id msg = ChunkController.send t.sub s_id (Message.marshal' msg) + let send_ignore_errors t s_id msg = ChunkController.send_ignore_errors t.sub s_id (Message.marshal' msg) let send_to_all t msg = ChunkController.send_to_all t.sub (Message.marshal' msg) let client_addr_port t = ChunkController.client_addr_port t.sub let listening_port t = ChunkController.listening_port t.sub diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 253dcd5..57d04f8 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -27,16 +27,61 @@ module Controller = struct module Platform = Platform.Make(Message) module Client_id = Platform.Client_id - module SwitchSet = Set.Make(Client_id) + module ClientTbl = Hashtbl.Make(Client_id) exception Handshake of Client_id.t * string - type m = Platform.m + module Conn = struct + type t = { + state : [ `Handshake | `Active | `Idle | `Kill ]; + version : int option; + state_entered : Time.t; + last_activity : Time.t + } + + let create () : t = + let now = Time.now () in + { state = `Handshake + ; version = None + ; state_entered = now + ; last_activity = now + } + + let activity (t:t) : t = + let t = { t with last_activity = Time.now () } in + if t.state = `Active then + t + else + { t with state = `Active; state_entered = t.last_activity } + + let complete_handshake (t:t) (version:int) : t = + activity { t with version = Some(version) } + + let idle (t:t) (span : Time.Span.t) : t * bool = + let right_now = Time.now () in + if t.state = `Active && Time.(add t.last_activity span <= right_now) then + { t with state = `Idle; state_entered = right_now }, true + else + t, false + + let kill (t:t) (span : Time.Span.t) : t * bool = + let right_now = Time.now () in + if t.state = `Idle && Time.(add t.state_entered span <= right_now) then + { t with state = `Kill; state_entered = right_now }, true + else + t, false + + end + type t = { platform : Platform.t; - mutable handshakes : SwitchSet.t + clients : Conn.t ClientTbl.t; + mutable monitor_interval : Time.Span.t; + mutable idle_wait : Time.Span.t; + mutable kill_wait : Time.Span.t; } + type m = Platform.m type e = Platform.e type h = [ | `Connect of Client_id.t * int @@ -44,65 +89,194 @@ module Controller = struct | `Message of Client_id.t * m ] - let ensure response = - match response with - | `Sent _ -> [] - | `Drop exn -> raise exn + let echo_request : int option -> Message.t = + let body = Cstruct.create 0 in + fun v -> + match v with + | None -> assert false + | Some(v) -> + let open Header in + { version = v; type_code = type_code_echo_request; length = size; xid = 0l }, body - let handshake v t evt = - let open Header in - match evt with - | `Connect s_id -> - let header = { version = v; type_code = type_code_hello; - length = size; xid = 0l; } in - Platform.send t.platform s_id (header, Cstruct.of_string "") - >>| ensure - >>| (fun e -> t.handshakes <- SwitchSet.add t.handshakes s_id; e) - | `Message (s_id, msg) when SwitchSet.mem t.handshakes s_id -> - let hdr, bits = msg in - begin - t.handshakes <- SwitchSet.remove t.handshakes s_id; - - if not (hdr.type_code = type_code_hello) then begin - Platform.close t.platform s_id; - raise (Handshake (s_id, Printf.sprintf - "Expected 0 code in header: %s%!" - (Header.to_string hdr))) - end - end; - return [`Connect (s_id, min hdr.version v)] - | `Message x -> return [`Message x] - | `Disconnect (s_id, _) when SwitchSet.mem t.handshakes s_id -> - t.handshakes <- SwitchSet.remove t.handshakes s_id; - return [] - | `Disconnect x -> return [`Disconnect x] + module Handler = struct + let connect (t:t) (c_id:Client_id.t) = + ClientTbl.add_exn t.clients c_id (Conn.create ()) - let echo t evt = - let open Header in - match evt with - | `Message (s_id, (hdr, bytes)) - when hdr.Header.type_code = type_code_echo_request -> - Platform.send t.platform s_id ({ hdr with type_code = type_code_echo_reply }, bytes) - >>| ensure - | _ -> return [evt] + let handshake (t:t) (c_id:Client_id.t) (version:int) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> Some(Conn.complete_handshake conn version)) + + let activity (t:t) ?ver (c_id:Client_id.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> Some(Conn.activity conn)) + + let idle (t:t) (c_id:Client_id.t) (span : Time.Span.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> + let conn', change = Conn.idle conn span in + if change then begin + printf "client %s marked as idle... probing\n%!" (Client_id.to_string c_id); + let echo_req = echo_request conn'.Conn.version in + let result = Result.try_with (fun () -> + Platform.send_ignore_errors t.platform c_id echo_req) in + match result with + | Error exn -> + printf "client %s write failed: %s\n%!" + (Client_id.to_string c_id) (Exn.to_string exn); + Platform.close t.platform c_id + | Ok () -> () + end; + Some(conn')) + + let kill (t:t) (c_id:Client_id.t) (span : Time.Span.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> + let conn', change = Conn.kill conn span in + if change then begin + printf "client %s killed\n%!" (Client_id.to_string c_id); + Platform.close t.platform c_id; + end; + Some(conn')) + end + + module Mon = struct + let rec monitor t f = + after t.monitor_interval >>> fun () -> + ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> f t c_id); + monitor t f + + let rec mark_idle t = + monitor t (fun t c_id -> Handler.idle t c_id t.idle_wait) + + let rec kill_idle t = + monitor t (fun t c_id -> Handler.kill t c_id t.kill_wait) + end + + let set_monitor_interval (t:t) (s:Time.Span.t) : unit = + t.monitor_interval <- s + + let set_idle_wait (t:t) (s:Time.Span.t) : unit = + t.idle_wait <- s + + let set_kill_wait (t:t) (s:Time.Span.t) : unit = + t.kill_wait <- s let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?(monitor_connections=false) ~port () = Platform.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () - >>| function t -> { - platform = t; - handshakes = SwitchSet.empty - } + ?buffer_age_limit ~monitor_connections ~port () + >>| function t -> + let ctl = { + platform = t; + clients = ClientTbl.create (); + monitor_interval = Time.Span.of_ms 500.0; + idle_wait = Time.Span.of_sec 5.0; + kill_wait = Time.Span.of_sec 3.0; + } in + if monitor_connections then begin + Mon.mark_idle ctl; + Mon.kill_idle ctl + end; + ctl let listen t = Platform.listen t.platform - let close t = Platform.close t.platform - let has_client_id t = Platform.has_client_id t.platform - let send t = Platform.send t.platform + let close t c_id = + Platform.close t.platform c_id + + let has_client_id t c_id = + Platform.has_client_id t.platform c_id && + match ClientTbl.find t.clients c_id with + | Some(conn) -> not (conn.Conn.state = `Handshake) + | _ -> false + + let send t c_id m = + Platform.send t.platform c_id m + >>| function + | `Sent x -> Handler.activity t c_id; `Sent x + | `Drop x -> `Drop x + + let send_ignore_errors t = Platform.send_ignore_errors t.platform + let send_to_all t = Platform.send_to_all t.platform let client_addr_port t = Platform.client_addr_port t.platform let listening_port t = Platform.listening_port t.platform + + let handshake v t evt = + let open Header in + match evt with + | `Connect c_id -> + Handler.connect t c_id; + let header = { version = v; type_code = type_code_hello; + length = size; xid = 0l; } in + Platform.send t.platform c_id (header, Cstruct.of_string "") + (* XXX(seliopou): This swallows any errors that might have occurred + * while attemping the handshake. Any such error should not be raised, + * since as far as the user is concerned the connection never existed. + * At the very least, the exception should be logged, which it will be + * as long as the log_disconnects option is not disabled when creating + * the controller. + * *) + >>| (function _ -> []) + | `Message (c_id, msg) -> + begin match ClientTbl.find t.clients c_id with + | None -> assert false + | Some({ Conn.state = `Handshake }) -> + let hdr, bits = msg in + begin + if not (hdr.type_code = type_code_hello) then begin + close t c_id; + raise (Handshake (c_id, Printf.sprintf + "Expected 0 code in header: %s%!" + (Header.to_string hdr))) + end + end; + Handler.handshake t c_id (min hdr.version v); + return [`Connect (c_id, min hdr.version v)] + | Some(_) -> + Handler.activity t c_id; + return [`Message (c_id, msg)] + end + | `Disconnect (c_id, exn) -> + begin match ClientTbl.find t.clients c_id with + | None -> assert false + | Some({ Conn.state = `Handshake }) -> + ClientTbl.remove t.clients c_id; + return [] + | Some(_) -> + ClientTbl.remove t.clients c_id; + return [`Disconnect (c_id, exn)] + end + + let echo t evt = + let open Header in + match evt with + | `Message (c_id, (hdr, bytes)) -> + begin if hdr.Header.type_code = type_code_echo_request then + (* Echo requests get a reply *) + let hdr = { hdr with type_code = type_code_echo_reply } in + send t c_id (hdr , bytes) + (* XXX(seliopou): This swallows any errors that might have occurred + * while attemping the handshake. Any such error should not be raised, + * since as far as the user is concerned the connection never existed. + * At the very least, the exception should be logged, which it will be + * as long as the log_disconnects option is not disabled when creating + * the controller. + * *) + >>| (function _ -> []) + else if hdr.Header.type_code = type_code_echo_reply then + (* Echo replies get eaten *) + return [] + else + (* All other messages get forwarded *) + return [evt] + end + | _ -> return [evt] end diff --git a/async/Async_OpenFlow_Platform.ml b/async/Async_OpenFlow_Platform.ml index e4723b1..9adaea8 100644 --- a/async/Async_OpenFlow_Platform.ml +++ b/async/Async_OpenFlow_Platform.ml @@ -4,6 +4,8 @@ open Core.Std module Header = OpenFlow_Header module type Message = Async_OpenFlow_Message.Message +exception Flush_closed_writer + module type S = sig type t @@ -22,6 +24,7 @@ module type S = sig -> ?verbose:bool -> ?log_disconnects:bool -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] + -> ?monitor_connections:bool -> port:int -> unit -> t Deferred.t @@ -38,6 +41,8 @@ module type S = sig -> m -> [ `Drop of exn | `Sent of Time.t ] Deferred.t + val send_ignore_errors : t -> Client_id.t -> m -> unit + val send_to_all : t -> m -> unit val client_addr_port @@ -69,7 +74,14 @@ module Make(Message : Message) = struct let close ((_, w) : t) = Writer.close w - let flushed_time ((_, w) : t) = Writer.flushed_time w + let flushed_time ((_, w) : t) = + let open Deferred in + choose [ choice (Writer.flushed_time w) (fun x -> `F x) + ; choice (Writer.consumer_left w) (fun () -> `C ()) + ] + >>| function + | `F x -> x + | `C () -> raise Flush_closed_writer let read ((r, _) : t) = Serialization.deserialize r @@ -92,7 +104,9 @@ module Make(Message : Message) = struct let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?monitor_connections + ~port () = Impl.create ?max_pending_connections ?verbose ?log_disconnects ?buffer_age_limit ~port ~auth:(fun _ _ _ -> return `Allow) () @@ -100,16 +114,22 @@ module Make(Message : Message) = struct let open Impl.Server_read_result in Pipe.map (Impl.listen t) ~f:(function - | Connect id -> `Connect id + | Connect id -> `Connect id | Disconnect (id, sexp) -> `Disconnect (id, sexp) - | Denied_access msg -> raise (Invalid_argument "Denied_access should not happen") - | Data (id, m) -> `Message (id, m)) + | Data (id, m) -> `Message (id, m) + | Denied_access msg -> assert false) let close = Impl.close let has_client_id = Impl.has_client_id - let send = Impl.send + let send t c_id m = + Monitor.try_with (fun () -> Impl.send t c_id m) + >>| function + | Ok x -> x + | Error _exn -> `Drop _exn + + let send_ignore_errors = Impl.send_ignore_errors let send_to_all = Impl.send_to_all