From 25c1eda6ad52b3b37ad118e7c53822cad28ea0d4 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 26 Jun 2014 16:58:06 -0400 Subject: [PATCH 01/15] crashes: Add safe/unsafe api for client/switch id lookup --- async/Async_OpenFlow.mli | 7 +++++-- async/Async_OpenFlow0x01.ml | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index 3bb89f0..ea57ffa 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -165,8 +165,11 @@ module OpenFlow0x01 : sig | `Message of Client_id.t * m ] - val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId - val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t + val switch_id_of_client_exn : t -> Client_id.t -> SDN_Types.switchId + val client_id_of_switch_exn : t -> SDN_Types.switchId -> Client_id.t + + val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId option + val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t option val features : (t, e, f) Stage.t end diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 861fe5b..60d9910 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -65,8 +65,11 @@ module Controller = struct let listening_port t = ChunkController.listening_port t.sub (* XXX(seliopou): Raises `Not_found` if the client is no longer connected. *) - let switch_id_of_client t c_id = ClientMap.find_exn t.switches c_id - let client_id_of_switch t sw_id = SwitchMap.find_exn t.clients sw_id + let switch_id_of_client_exn t c_id = ClientMap.find_exn t.switches c_id + let client_id_of_switch_exn t sw_id = SwitchMap.find_exn t.clients sw_id + + let switch_id_of_client t c_id = ClientMap.find t.switches c_id + let client_id_of_switch t sw_id = SwitchMap.find t.clients sw_id let create ?max_pending_connections ?verbose From b97eab27ecc3a0457a59ecbde8ee59acab6c1d49 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Wed, 2 Jul 2014 11:54:37 -0400 Subject: [PATCH 02/15] crashes: refactor code, add asserts --- async/Async_OpenFlow0x01.ml | 13 +++++-------- async/Async_OpenFlowChunk.ml | 6 ++++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 60d9910..c77fbbd 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -84,11 +84,6 @@ module Controller = struct ; clients = SwitchMap.empty } - let _send t c_id m = - send t c_id (0l, m) >>| function - | `Drop exn -> raise exn - | `Sent _ -> () - let openflow0x01 t evt = match evt with | `Connect (c_id, version) -> @@ -107,6 +102,7 @@ module Controller = struct let features t evt = match evt with | `Connect (c_id) -> + assert (not (ClientSet.mem t.shakes c_id)); t.shakes <- ClientSet.add t.shakes c_id; send t c_id (0l, M.SwitchFeaturesRequest) >>| ChunkController.ensure | `Message (c_id, (xid, msg)) when ClientSet.mem t.shakes c_id -> @@ -114,8 +110,8 @@ module Controller = struct | M.SwitchFeaturesReply fs -> let switch_id = fs.OpenFlow0x01.SwitchFeatures.switch_id in t.switches <- ClientMap.add t.switches c_id switch_id; - t.clients <- SwitchMap.add t.clients switch_id c_id; - t.shakes <- ClientSet.remove t.shakes c_id; + t.clients <- SwitchMap.add t.clients switch_id c_id; + t.shakes <- ClientSet.remove t.shakes c_id; return [`Connect(c_id, fs)] | _ -> Log.of_lazy ~tags ~level:`Debug (lazy @@ -129,11 +125,12 @@ module Controller = struct let m_sw_id = ClientMap.find t.switches c_id in match m_sw_id with | None -> (* features request did not complete *) + assert (ClientSet.mem t.shakes c_id); t.shakes <- ClientSet.remove t.shakes c_id; return [] | Some(sw_id) -> (* features request did complete *) - t.clients <- SwitchMap.remove t.clients sw_id; t.switches <- ClientMap.remove t.switches c_id; + t.clients <- SwitchMap.remove t.clients sw_id; return [`Disconnect(c_id, sw_id, exn)] let listen t = diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 253dcd5..d871eaa 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -55,9 +55,11 @@ module Controller = struct | `Connect s_id -> let header = { version = v; type_code = type_code_hello; length = size; xid = 0l; } in + t.handshakes <- SwitchSet.add t.handshakes s_id; Platform.send t.platform s_id (header, Cstruct.of_string "") - >>| ensure - >>| (fun e -> t.handshakes <- SwitchSet.add t.handshakes s_id; e) + >>| (function + | `Sent _ -> [] + | `Drop _ -> t.handshakes <- SwitchSet.remove t.handshakes s_id; []) | `Message (s_id, msg) when SwitchSet.mem t.handshakes s_id -> let hdr, bits = msg in begin From 34b4fc821a5b9dead7bd1473b74620d1d1a0ce59 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Wed, 2 Jul 2014 12:46:38 -0400 Subject: [PATCH 03/15] crashes: make send fail on client disconnects Writer.flushed w will not become determined if there is a write error, which will cause the controller to hang unless every send is checked to make sure the client is still alive first. --- async/Async_OpenFlow_Platform.ml | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/async/Async_OpenFlow_Platform.ml b/async/Async_OpenFlow_Platform.ml index e4723b1..8869768 100644 --- a/async/Async_OpenFlow_Platform.ml +++ b/async/Async_OpenFlow_Platform.ml @@ -4,6 +4,8 @@ open Core.Std module Header = OpenFlow_Header module type Message = Async_OpenFlow_Message.Message +exception Flush_closed_writer + module type S = sig type t @@ -69,7 +71,14 @@ module Make(Message : Message) = struct let close ((_, w) : t) = Writer.close w - let flushed_time ((_, w) : t) = Writer.flushed_time w + let flushed_time ((_, w) : t) = + let open Deferred in + choose [ choice (Writer.flushed_time w) (fun x -> `F x) + ; choice (Writer.consumer_left w) (fun () -> `C ()) + ] + >>| function + | `F x -> x + | `C () -> raise Flush_closed_writer let read ((r, _) : t) = Serialization.deserialize r @@ -109,7 +118,11 @@ module Make(Message : Message) = struct let has_client_id = Impl.has_client_id - let send = Impl.send + let send t c_id m = + Monitor.try_with (fun () -> Impl.send t c_id m) + >>| function + | Ok x -> x + | Error _exn -> `Drop _exn let send_to_all = Impl.send_to_all From fc359871b74d0f47a899ca6a71dce9b57afd18be Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 3 Jul 2014 17:42:04 -0400 Subject: [PATCH 04/15] crashes: reimplement Async_OpenFlowChunk to record livenes As written previously, the Async_OpenFlowChunk and all controllers based on it did not monitor the liveness of switch connections. As a result when a switch disconnects unexpectedly, the controller may not become aware of that for some time. The low-level chunk controller has been rewritten to track the various states of switch connections, as well as activity on that switch. Activity in this case includes both receiving a message from the switch, and successfully sending a message to the switch. As of this commit, switches are marked as idle but no probes are sent and no connection cleanup happens. --- async/Async_OpenFlow.mli | 2 +- async/Async_OpenFlowChunk.ml | 209 ++++++++++++++++++++++++++--------- 2 files changed, 160 insertions(+), 51 deletions(-) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index ea57ffa..e7cddf4 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -144,7 +144,7 @@ module Chunk : sig | `Message of Client_id.t * m ] - val echo : (t, e, e) Stage.t + val echo : (t, h, h) Stage.t val handshake : int -> (t, e, h) Stage.t end diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index d871eaa..fa8ddd9 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -27,16 +27,50 @@ module Controller = struct module Platform = Platform.Make(Message) module Client_id = Platform.Client_id - module SwitchSet = Set.Make(Client_id) + module ClientTbl = Hashtbl.Make(Client_id) exception Handshake of Client_id.t * string - type m = Platform.m + module Conn = struct + type t = { + state : [ `Handshake | `Active | `Idle ]; + version : int option; + state_entered : Time.t; + last_activity : Time.t + } + + let create () : t = + let now = Time.now () in + { state = `Handshake + ; version = None + ; state_entered = now + ; last_activity = now + } + + let activity (t:t) : t = + let t = { t with last_activity = Time.now () } in + if t.state = `Active then + t + else + { t with state = `Active; state_entered = t.last_activity } + + let complete_handshake (t:t) (version:int) : t = + activity { t with version = Some(version) } + + let idle (t:t) (expires : Time.Span.t) : t = + let right_now = Time.now () in + if t.state = `Idle || Time.(add t.last_activity expires > right_now) then + t + else + { t with state = `Idle; state_entered = right_now } + end + type t = { platform : Platform.t; - mutable handshakes : SwitchSet.t + clients : Conn.t ClientTbl.t } + type m = Platform.m type e = Platform.e type h = [ | `Connect of Client_id.t * int @@ -44,6 +78,78 @@ module Controller = struct | `Message of Client_id.t * m ] + module Handler = struct + let connect (t:t) (c_id:Client_id.t) = + ClientTbl.add_exn t.clients c_id (Conn.create ()) + + let handshake (t:t) (c_id:Client_id.t) (version:int) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> Some(Conn.complete_handshake conn version)) + + let activity (t:t) ?ver (c_id:Client_id.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> Some(Conn.activity conn)) + + let idle (t:t) (c_id:Client_id.t) (expires : Time.Span.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> + let conn' = Conn.idle conn expires in + begin if not (conn = conn') then + printf "client %s marked as idle" (Client_id.to_string c_id) + end; + Some(conn')) + + let disconnect (t:t) (c_id:Client_id.t) = + ClientTbl.remove t.clients c_id + end + + module Mon = struct + let rec mark_idle t wait expires = + after wait >>> fun () -> + ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> + Handler.idle t c_id expires); + mark_idle t wait expires + end + + let create ?max_pending_connections + ?verbose + ?log_disconnects + ?buffer_age_limit ~port () = + Platform.create ?max_pending_connections ?verbose ?log_disconnects + ?buffer_age_limit ~port () + >>| function t -> + let ctl = { + platform = t; + clients = ClientTbl.create (); + } in + Mon.mark_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 3.0); + ctl + + let listen t = Platform.listen t.platform + + let close t c_id = + Handler.disconnect t c_id; + Platform.close t.platform c_id + + let has_client_id t c_id = + match ClientTbl.find t.clients c_id with + | None + | Some({ Conn.state = `Handshake }) -> false + | _ -> true + + let send t c_id m = + Platform.send t.platform c_id m + >>| function + | `Sent x -> Handler.activity t c_id; `Sent x + | `Drop x -> `Drop x + + let send_to_all t = Platform.send_to_all t.platform + let client_addr_port t = Platform.client_addr_port t.platform + let listening_port t = Platform.listening_port t.platform + let ensure response = match response with | `Sent _ -> [] @@ -52,59 +158,62 @@ module Controller = struct let handshake v t evt = let open Header in match evt with - | `Connect s_id -> + | `Connect c_id -> + Handler.connect t c_id; let header = { version = v; type_code = type_code_hello; length = size; xid = 0l; } in - t.handshakes <- SwitchSet.add t.handshakes s_id; - Platform.send t.platform s_id (header, Cstruct.of_string "") + Platform.send t.platform c_id (header, Cstruct.of_string "") >>| (function - | `Sent _ -> [] - | `Drop _ -> t.handshakes <- SwitchSet.remove t.handshakes s_id; []) - | `Message (s_id, msg) when SwitchSet.mem t.handshakes s_id -> - let hdr, bits = msg in - begin - t.handshakes <- SwitchSet.remove t.handshakes s_id; - - if not (hdr.type_code = type_code_hello) then begin - Platform.close t.platform s_id; - raise (Handshake (s_id, Printf.sprintf - "Expected 0 code in header: %s%!" - (Header.to_string hdr))) - end - end; - return [`Connect (s_id, min hdr.version v)] - | `Message x -> return [`Message x] - | `Disconnect (s_id, _) when SwitchSet.mem t.handshakes s_id -> - t.handshakes <- SwitchSet.remove t.handshakes s_id; - return [] - | `Disconnect x -> return [`Disconnect x] + | `Sent _ -> [] + | `Drop exn -> raise exn) + | `Message (c_id, msg) -> + begin match ClientTbl.find t.clients c_id with + | None -> assert false + | Some({ Conn.state = `Handshake }) -> + let hdr, bits = msg in + begin + if not (hdr.type_code = type_code_hello) then begin + close t c_id; + raise (Handshake (c_id, Printf.sprintf + "Expected 0 code in header: %s%!" + (Header.to_string hdr))) + end + end; + print_endline "HANDSHAKE COMPLETE"; + Handler.handshake t c_id (min hdr.version v); + return [`Connect (c_id, min hdr.version v)] + | Some(_) -> + Handler.activity t c_id; + return [`Message (c_id, msg)] + end + | `Disconnect (c_id, exn) -> + begin match ClientTbl.find t.clients c_id with + | None -> assert false + | Some({ Conn.state = `Handshake }) -> + Handler.disconnect t c_id; + return [] + | Some(_) -> + Handler.disconnect t c_id; + return [`Disconnect (c_id, exn)] + end let echo t evt = let open Header in match evt with - | `Message (s_id, (hdr, bytes)) - when hdr.Header.type_code = type_code_echo_request -> - Platform.send t.platform s_id ({ hdr with type_code = type_code_echo_reply }, bytes) - >>| ensure + | `Message (c_id, (hdr, bytes)) -> + begin if hdr.Header.type_code = type_code_echo_request then + (* Echo requests get a reply *) + let hdr = { hdr with type_code = type_code_echo_reply } in + send t c_id (hdr , bytes) + >>| function + | `Sent _ -> [] + | `Drop exn -> raise exn + else if hdr.Header.type_code = type_code_echo_reply then + (* Echo replies get eaten *) + return [] + else + (* All other messages get forwarded *) + return [evt] + end | _ -> return [evt] - - let create ?max_pending_connections - ?verbose - ?log_disconnects - ?buffer_age_limit ~port () = - Platform.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () - >>| function t -> { - platform = t; - handshakes = SwitchSet.empty - } - - let listen t = Platform.listen t.platform - - let close t = Platform.close t.platform - let has_client_id t = Platform.has_client_id t.platform - let send t = Platform.send t.platform - let send_to_all t = Platform.send_to_all t.platform - let client_addr_port t = Platform.client_addr_port t.platform - let listening_port t = Platform.listening_port t.platform end From 915dfb862144700f1e424c12ea372bd01f2c1dfb Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 7 Jul 2014 10:16:01 -0400 Subject: [PATCH 05/15] crashes: remove println debugging --- async/Async_OpenFlowChunk.ml | 1 - 1 file changed, 1 deletion(-) diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index fa8ddd9..5a49e04 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -179,7 +179,6 @@ module Controller = struct (Header.to_string hdr))) end end; - print_endline "HANDSHAKE COMPLETE"; Handler.handshake t c_id (min hdr.version v); return [`Connect (c_id, min hdr.version v)] | Some(_) -> From a103e160cd0e62c569df043c18b4ba97af2e713c Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Mon, 7 Jul 2014 18:15:08 -0400 Subject: [PATCH 06/15] crashes: expose send_ignore_errors This is part of the typed_tcp interface that was previously hidden, but in fact needs to be exposed in order properly do connection liveness checking. --- async/Async_OpenFlow.mli | 2 ++ async/Async_OpenFlow0x01.ml | 1 + async/Async_OpenFlow0x04.ml | 1 + async/Async_OpenFlowChunk.ml | 2 ++ async/Async_OpenFlow_Platform.ml | 4 ++++ 5 files changed, 10 insertions(+) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index e7cddf4..5891faa 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -65,6 +65,8 @@ module Platform : sig -> m -> [ `Drop of exn | `Sent of Time.t ] Deferred.t + val send_ignore_errors : t -> Client_id.t -> m -> unit + val send_to_all : t -> m -> unit val client_addr_port diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index c77fbbd..90d8c96 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -60,6 +60,7 @@ module Controller = struct let close t = ChunkController.close t.sub let has_client_id t = ChunkController.has_client_id t.sub let send t s_id msg = ChunkController.send t.sub s_id (Message.marshal' msg) + let send_ignore_errors t s_id msg = ChunkController.send_ignore_errors t.sub s_id (Message.marshal' msg) let send_to_all t msg = ChunkController.send_to_all t.sub (Message.marshal' msg) let client_addr_port t = ChunkController.client_addr_port t.sub let listening_port t = ChunkController.listening_port t.sub diff --git a/async/Async_OpenFlow0x04.ml b/async/Async_OpenFlow0x04.ml index 462d411..6cc3372 100644 --- a/async/Async_OpenFlow0x04.ml +++ b/async/Async_OpenFlow0x04.ml @@ -77,6 +77,7 @@ module Controller = struct let close t = ChunkController.close t.sub let has_client_id t = ChunkController.has_client_id t.sub let send t s_id msg = ChunkController.send t.sub s_id (Message.marshal' msg) + let send_ignore_errors t s_id msg = ChunkController.send_ignore_errors t.sub s_id (Message.marshal' msg) let send_to_all t msg = ChunkController.send_to_all t.sub (Message.marshal' msg) let client_addr_port t = ChunkController.client_addr_port t.sub let listening_port t = ChunkController.listening_port t.sub diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 5a49e04..29bbea4 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -146,6 +146,8 @@ module Controller = struct | `Sent x -> Handler.activity t c_id; `Sent x | `Drop x -> `Drop x + let send_ignore_errors t = Platform.send_ignore_errors t.platform + let send_to_all t = Platform.send_to_all t.platform let client_addr_port t = Platform.client_addr_port t.platform let listening_port t = Platform.listening_port t.platform diff --git a/async/Async_OpenFlow_Platform.ml b/async/Async_OpenFlow_Platform.ml index 8869768..177354b 100644 --- a/async/Async_OpenFlow_Platform.ml +++ b/async/Async_OpenFlow_Platform.ml @@ -40,6 +40,8 @@ module type S = sig -> m -> [ `Drop of exn | `Sent of Time.t ] Deferred.t + val send_ignore_errors : t -> Client_id.t -> m -> unit + val send_to_all : t -> m -> unit val client_addr_port @@ -124,6 +126,8 @@ module Make(Message : Message) = struct | Ok x -> x | Error _exn -> `Drop _exn + let send_ignore_errors = Impl.send_ignore_errors + let send_to_all = Impl.send_to_all let client_addr_port = Impl.client_addr_port From 89ea984152c893209834593d086165075b14c123 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Tue, 8 Jul 2014 15:38:22 -0400 Subject: [PATCH 07/15] crashes: remove ensure function from ChunkController API This function encourages bad error handling, and isn't used in many places anyways. --- async/Async_OpenFlow0x01.ml | 5 ++++- async/Async_OpenFlowChunk.ml | 5 ----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 90d8c96..0676fea 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -105,7 +105,10 @@ module Controller = struct | `Connect (c_id) -> assert (not (ClientSet.mem t.shakes c_id)); t.shakes <- ClientSet.add t.shakes c_id; - send t c_id (0l, M.SwitchFeaturesRequest) >>| ChunkController.ensure + send t c_id (0l, M.SwitchFeaturesRequest) + >>| (function + | `Sent _ -> [] + | `Drop exn -> raise exn) | `Message (c_id, (xid, msg)) when ClientSet.mem t.shakes c_id -> begin match msg with | M.SwitchFeaturesReply fs -> diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 29bbea4..8bd7538 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -152,11 +152,6 @@ module Controller = struct let client_addr_port t = Platform.client_addr_port t.platform let listening_port t = Platform.listening_port t.platform - let ensure response = - match response with - | `Sent _ -> [] - | `Drop exn -> raise exn - let handshake v t evt = let open Header in match evt with From 45d3b5d0d673ec2cad605a6f8d4713c9b467d75c Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 10 Jul 2014 12:48:37 -0400 Subject: [PATCH 08/15] crashes: change exception to assertion The platform does not use the underlying authentication features of typed_tcp. --- async/Async_OpenFlow_Platform.ml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/async/Async_OpenFlow_Platform.ml b/async/Async_OpenFlow_Platform.ml index 177354b..4c7e46c 100644 --- a/async/Async_OpenFlow_Platform.ml +++ b/async/Async_OpenFlow_Platform.ml @@ -111,10 +111,10 @@ module Make(Message : Message) = struct let open Impl.Server_read_result in Pipe.map (Impl.listen t) ~f:(function - | Connect id -> `Connect id + | Connect id -> `Connect id | Disconnect (id, sexp) -> `Disconnect (id, sexp) - | Denied_access msg -> raise (Invalid_argument "Denied_access should not happen") - | Data (id, m) -> `Message (id, m)) + | Data (id, m) -> `Message (id, m) + | Denied_access msg -> assert false) let close = Impl.close From f6d1d490141cf69598c4f771fdaef61f80ff039d Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 10 Jul 2014 13:32:48 -0400 Subject: [PATCH 09/15] crashes: swallow exceptions in stages See code comments for rationale. --- async/Async_OpenFlow0x01.ml | 11 ++++++++--- async/Async_OpenFlowChunk.ml | 22 ++++++++++++++++------ 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 0676fea..ea07914 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -106,9 +106,14 @@ module Controller = struct assert (not (ClientSet.mem t.shakes c_id)); t.shakes <- ClientSet.add t.shakes c_id; send t c_id (0l, M.SwitchFeaturesRequest) - >>| (function - | `Sent _ -> [] - | `Drop exn -> raise exn) + (* XXX(seliopou): This swallows any errors that might have occurred + * while attemping the handshake. Any such error should not be raised, + * since as far as the user is concerned the connection never existed. + * At the very least, the exception should be logged, which it will be + * as long as the log_disconnects option is not disabled when creating + * the controller. + * *) + >>| (function _ -> []) | `Message (c_id, (xid, msg)) when ClientSet.mem t.shakes c_id -> begin match msg with | M.SwitchFeaturesReply fs -> diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 8bd7538..4fdb1c3 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -160,9 +160,14 @@ module Controller = struct let header = { version = v; type_code = type_code_hello; length = size; xid = 0l; } in Platform.send t.platform c_id (header, Cstruct.of_string "") - >>| (function - | `Sent _ -> [] - | `Drop exn -> raise exn) + (* XXX(seliopou): This swallows any errors that might have occurred + * while attemping the handshake. Any such error should not be raised, + * since as far as the user is concerned the connection never existed. + * At the very least, the exception should be logged, which it will be + * as long as the log_disconnects option is not disabled when creating + * the controller. + * *) + >>| (function _ -> []) | `Message (c_id, msg) -> begin match ClientTbl.find t.clients c_id with | None -> assert false @@ -201,9 +206,14 @@ module Controller = struct (* Echo requests get a reply *) let hdr = { hdr with type_code = type_code_echo_reply } in send t c_id (hdr , bytes) - >>| function - | `Sent _ -> [] - | `Drop exn -> raise exn + (* XXX(seliopou): This swallows any errors that might have occurred + * while attemping the handshake. Any such error should not be raised, + * since as far as the user is concerned the connection never existed. + * At the very least, the exception should be logged, which it will be + * as long as the log_disconnects option is not disabled when creating + * the controller. + * *) + >>| (function _ -> []) else if hdr.Header.type_code = type_code_echo_reply then (* Echo replies get eaten *) return [] From d3a2f80378847dbe6159cef5b2c6f57df5bed9af Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 10 Jul 2014 13:58:34 -0400 Subject: [PATCH 10/15] crashes: don't remove connections from table on close --- async/Async_OpenFlowChunk.ml | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 4fdb1c3..4d03a51 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -102,8 +102,6 @@ module Controller = struct end; Some(conn')) - let disconnect (t:t) (c_id:Client_id.t) = - ClientTbl.remove t.clients c_id end module Mon = struct @@ -131,14 +129,13 @@ module Controller = struct let listen t = Platform.listen t.platform let close t c_id = - Handler.disconnect t c_id; Platform.close t.platform c_id let has_client_id t c_id = - match ClientTbl.find t.clients c_id with - | None - | Some({ Conn.state = `Handshake }) -> false - | _ -> true + Platform.has_client_id t.platform c_id && + match ClientTbl.find t.clients c_id with + | Some(conn) -> not (conn.Conn.state = `Handshake) + | _ -> false let send t c_id m = Platform.send t.platform c_id m @@ -191,10 +188,10 @@ module Controller = struct begin match ClientTbl.find t.clients c_id with | None -> assert false | Some({ Conn.state = `Handshake }) -> - Handler.disconnect t c_id; + ClientTbl.remove t.clients c_id; return [] | Some(_) -> - Handler.disconnect t c_id; + ClientTbl.remove t.clients c_id; return [`Disconnect (c_id, exn)] end From 7d217a834d7637e671fe77549fe54c60b920a0fa Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 10 Jul 2014 14:31:57 -0400 Subject: [PATCH 11/15] crashes: probe and kill idle switches As part of the Chunk Controller, three separate logical threads will occasionally monitor switch connections. One will mark it as idle when no activity has been recorded for some time. The second, will probe switch connections with an echo request after some additional time of inactivity. The third will then kill switch connections that have not responded to the echo request in some period of time. --- async/Async_OpenFlowChunk.ml | 91 ++++++++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 14 deletions(-) diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 4d03a51..ebdb7e3 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -33,7 +33,7 @@ module Controller = struct module Conn = struct type t = { - state : [ `Handshake | `Active | `Idle ]; + state : [ `Handshake | `Active | `Idle | `Probe | `Kill ]; version : int option; state_entered : Time.t; last_activity : Time.t @@ -57,12 +57,27 @@ module Controller = struct let complete_handshake (t:t) (version:int) : t = activity { t with version = Some(version) } - let idle (t:t) (expires : Time.Span.t) : t = + let idle (t:t) (span : Time.Span.t) : t * bool = let right_now = Time.now () in - if t.state = `Idle || Time.(add t.last_activity expires > right_now) then - t + if t.state = `Active && Time.(add t.last_activity span <= right_now) then + { t with state = `Idle; state_entered = right_now }, true + else + t, false + + let probe (t:t) (span : Time.Span.t) : t * bool = + let right_now = Time.now () in + if t.state = `Idle && Time.(add t.state_entered span <= right_now) then + { t with state = `Probe; state_entered = right_now }, true + else + t, false + + let kill (t:t) (span : Time.Span.t) : t * bool = + let right_now = Time.now () in + if t.state = `Probe && Time.(add t.state_entered span <= right_now) then + { t with state = `Kill; state_entered = right_now }, true else - { t with state = `Idle; state_entered = right_now } + t, false + end type t = { @@ -78,6 +93,15 @@ module Controller = struct | `Message of Client_id.t * m ] + let echo_request : int option -> Message.t = + let body = Cstruct.create 0 in + fun v -> + match v with + | None -> assert false + | Some(v) -> + let open Header in + { version = v; type_code = type_code_echo_request; length = size; xid = 0l }, body + module Handler = struct let connect (t:t) (c_id:Client_id.t) = ClientTbl.add_exn t.clients c_id (Conn.create ()) @@ -92,24 +116,61 @@ module Controller = struct | None -> assert false | Some(conn) -> Some(Conn.activity conn)) - let idle (t:t) (c_id:Client_id.t) (expires : Time.Span.t) = + let idle (t:t) (c_id:Client_id.t) (span : Time.Span.t) = ClientTbl.change t.clients c_id (function | None -> assert false | Some(conn) -> - let conn' = Conn.idle conn expires in - begin if not (conn = conn') then - printf "client %s marked as idle" (Client_id.to_string c_id) + let conn', change = Conn.idle conn span in + if change then begin + printf "client %s marked as idle\n%!" (Client_id.to_string c_id) end; Some(conn')) + let probe (t:t) (c_id:Client_id.t) (span : Time.Span.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> + let conn', change = Conn.probe conn span in + if change then begin + printf "client %s probed\n%!" (Client_id.to_string c_id); + let echo_req = echo_request conn'.version in + let result = Result.try_with (fun () -> + Platform.send_ignore_errors t.platform c_id echo_req) in + match result with + | Error exn -> + printf "client %s write failed: %s\n%!" + (Client_id.to_string c_id) (Exn.to_string exn); + Platform.close t.platform c_id + | Ok () -> () + end; + Some(conn')) + + let kill (t:t) (c_id:Client_id.t) (span : Time.Span.t) = + ClientTbl.change t.clients c_id (function + | None -> assert false + | Some(conn) -> + let conn', change = Conn.kill conn span in + if change then begin + printf "client %s killed\n%!" (Client_id.to_string c_id); + Platform.close t.platform c_id; + end; + Some(conn')) end module Mon = struct - let rec mark_idle t wait expires = + let rec monitor t wait span f = after wait >>> fun () -> - ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> - Handler.idle t c_id expires); - mark_idle t wait expires + ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> f t c_id span); + monitor t wait span f + + let rec mark_idle t wait expires = + monitor t wait expires Handler.idle + + let rec probe_idle t wait expires = + monitor t wait expires Handler.probe + + let rec kill_idle t wait expires = + monitor t wait expires Handler.kill end let create ?max_pending_connections @@ -123,7 +184,9 @@ module Controller = struct platform = t; clients = ClientTbl.create (); } in - Mon.mark_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 3.0); + Mon.mark_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 4.0); + Mon.probe_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 2.0); + Mon.kill_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 3.0); ctl let listen t = Platform.listen t.platform From 0635d56208dd9b65c669e113bc8d8b71089f9d45 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 10 Jul 2014 14:39:53 -0400 Subject: [PATCH 12/15] crashes: add monitor wait time Chunk.Controller.t --- async/Async_OpenFlow.mli | 2 ++ async/Async_OpenFlowChunk.ml | 31 ++++++++++++++++++------------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index 5891faa..c32433e 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -146,6 +146,8 @@ module Chunk : sig | `Message of Client_id.t * m ] + val set_monitor_interval : t -> Time.Span.t -> unit + val echo : (t, h, h) Stage.t val handshake : int -> (t, e, h) Stage.t end diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index ebdb7e3..c5033ba 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -82,7 +82,8 @@ module Controller = struct type t = { platform : Platform.t; - clients : Conn.t ClientTbl.t + clients : Conn.t ClientTbl.t; + mutable monitor_interval : Time.Span.t } type m = Platform.m @@ -158,21 +159,24 @@ module Controller = struct end module Mon = struct - let rec monitor t wait span f = - after wait >>> fun () -> + let rec monitor t span f = + after t.monitor_interval >>> fun () -> ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> f t c_id span); - monitor t wait span f + monitor t span f - let rec mark_idle t wait expires = - monitor t wait expires Handler.idle + let rec mark_idle t expires = + monitor t expires Handler.idle - let rec probe_idle t wait expires = - monitor t wait expires Handler.probe + let rec probe_idle t expires = + monitor t expires Handler.probe - let rec kill_idle t wait expires = - monitor t wait expires Handler.kill + let rec kill_idle t expires = + monitor t expires Handler.kill end + let set_monitor_interval (t:t) (s:Time.Span.t) : unit = + t.monitor_interval <- s + let create ?max_pending_connections ?verbose ?log_disconnects @@ -183,10 +187,11 @@ module Controller = struct let ctl = { platform = t; clients = ClientTbl.create (); + monitor_interval = Time.Span.of_ms 500.0; } in - Mon.mark_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 4.0); - Mon.probe_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 2.0); - Mon.kill_idle ctl (Time.Span.of_sec 1.0) (Time.Span.of_sec 3.0); + Mon.mark_idle ctl (Time.Span.of_sec 4.0); + Mon.probe_idle ctl (Time.Span.of_sec 2.0); + Mon.kill_idle ctl (Time.Span.of_sec 3.0); ctl let listen t = Platform.listen t.platform From 7a24eda5f311770a5478b213b41afa1c84d4ec4e Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Thu, 10 Jul 2014 15:01:18 -0400 Subject: [PATCH 13/15] crashes: eliminiate probe state, make waits configurable The intermediate probe state served little purpose besides providing a way of indicating to the user that a probe was imminent. For now, remove the intermediate state and immediately probe the connection once it's identified as idle. --- async/Async_OpenFlow.mli | 2 ++ async/Async_OpenFlowChunk.ml | 59 +++++++++++++++--------------------- 2 files changed, 26 insertions(+), 35 deletions(-) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index c32433e..252620a 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -147,6 +147,8 @@ module Chunk : sig ] val set_monitor_interval : t -> Time.Span.t -> unit + val set_idle_wait : t -> Time.Span.t -> unit + val set_kill_wait : t -> Time.Span.t -> unit val echo : (t, h, h) Stage.t val handshake : int -> (t, e, h) Stage.t diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index c5033ba..0f794c7 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -33,7 +33,7 @@ module Controller = struct module Conn = struct type t = { - state : [ `Handshake | `Active | `Idle | `Probe | `Kill ]; + state : [ `Handshake | `Active | `Idle | `Kill ]; version : int option; state_entered : Time.t; last_activity : Time.t @@ -64,16 +64,9 @@ module Controller = struct else t, false - let probe (t:t) (span : Time.Span.t) : t * bool = - let right_now = Time.now () in - if t.state = `Idle && Time.(add t.state_entered span <= right_now) then - { t with state = `Probe; state_entered = right_now }, true - else - t, false - let kill (t:t) (span : Time.Span.t) : t * bool = let right_now = Time.now () in - if t.state = `Probe && Time.(add t.state_entered span <= right_now) then + if t.state = `Idle && Time.(add t.state_entered span <= right_now) then { t with state = `Kill; state_entered = right_now }, true else t, false @@ -83,7 +76,9 @@ module Controller = struct type t = { platform : Platform.t; clients : Conn.t ClientTbl.t; - mutable monitor_interval : Time.Span.t + mutable monitor_interval : Time.Span.t; + mutable idle_wait : Time.Span.t; + mutable kill_wait : Time.Span.t; } type m = Platform.m @@ -123,18 +118,8 @@ module Controller = struct | Some(conn) -> let conn', change = Conn.idle conn span in if change then begin - printf "client %s marked as idle\n%!" (Client_id.to_string c_id) - end; - Some(conn')) - - let probe (t:t) (c_id:Client_id.t) (span : Time.Span.t) = - ClientTbl.change t.clients c_id (function - | None -> assert false - | Some(conn) -> - let conn', change = Conn.probe conn span in - if change then begin - printf "client %s probed\n%!" (Client_id.to_string c_id); - let echo_req = echo_request conn'.version in + printf "client %s marked as idle... probing\n%!" (Client_id.to_string c_id); + let echo_req = echo_request conn'.Conn.version in let result = Result.try_with (fun () -> Platform.send_ignore_errors t.platform c_id echo_req) in match result with @@ -159,24 +144,27 @@ module Controller = struct end module Mon = struct - let rec monitor t span f = + let rec monitor t f = after t.monitor_interval >>> fun () -> - ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> f t c_id span); - monitor t span f - - let rec mark_idle t expires = - monitor t expires Handler.idle + ClientTbl.iter t.clients (fun ~key:c_id ~data:_ -> f t c_id); + monitor t f - let rec probe_idle t expires = - monitor t expires Handler.probe + let rec mark_idle t = + monitor t (fun t c_id -> Handler.idle t c_id t.idle_wait) - let rec kill_idle t expires = - monitor t expires Handler.kill + let rec kill_idle t = + monitor t (fun t c_id -> Handler.kill t c_id t.kill_wait) end let set_monitor_interval (t:t) (s:Time.Span.t) : unit = t.monitor_interval <- s + let set_idle_wait (t:t) (s:Time.Span.t) : unit = + t.idle_wait <- s + + let set_kill_wait (t:t) (s:Time.Span.t) : unit = + t.kill_wait <- s + let create ?max_pending_connections ?verbose ?log_disconnects @@ -188,10 +176,11 @@ module Controller = struct platform = t; clients = ClientTbl.create (); monitor_interval = Time.Span.of_ms 500.0; + idle_wait = Time.Span.of_sec 5.0; + kill_wait = Time.Span.of_sec 3.0; } in - Mon.mark_idle ctl (Time.Span.of_sec 4.0); - Mon.probe_idle ctl (Time.Span.of_sec 2.0); - Mon.kill_idle ctl (Time.Span.of_sec 3.0); + Mon.mark_idle ctl; + Mon.kill_idle ctl; ctl let listen t = Platform.listen t.platform From 7b45843f397d24693a24ac3d6422700f8916c849 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Fri, 11 Jul 2014 13:58:04 -0400 Subject: [PATCH 14/15] crashes: configure monitoring threads in OF0x01 controller --- async/Async_OpenFlow.mli | 4 ++++ async/Async_OpenFlow0x01.ml | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index 252620a..cd271d0 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -177,6 +177,10 @@ module OpenFlow0x01 : sig val switch_id_of_client : t -> Client_id.t -> SDN_Types.switchId option val client_id_of_switch : t -> SDN_Types.switchId -> Client_id.t option + val set_monitor_interval : t -> Time.Span.t -> unit + val set_idle_wait : t -> Time.Span.t -> unit + val set_kill_wait : t -> Time.Span.t -> unit + val features : (t, e, f) Stage.t end diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index ea07914..3365f23 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -72,6 +72,15 @@ module Controller = struct let switch_id_of_client t c_id = ClientMap.find t.switches c_id let client_id_of_switch t sw_id = SwitchMap.find t.clients sw_id + let set_monitor_interval (t:t) (s:Time.Span.t) : unit = + ChunkController.set_monitor_interval t.sub s + + let set_idle_wait (t:t) (s:Time.Span.t) : unit = + ChunkController.set_idle_wait t.sub s + + let set_kill_wait (t:t) (s:Time.Span.t) : unit = + ChunkController.set_kill_wait t.sub s + let create ?max_pending_connections ?verbose ?log_disconnects From 6a00de6b865e6ec6ad4228399b6979f3a23ae8b9 Mon Sep 17 00:00:00 2001 From: Spiros Eliopoulos Date: Fri, 11 Jul 2014 14:44:17 -0400 Subject: [PATCH 15/15] crashes: make connection monitoring configurable --- async/Async_OpenFlow.mli | 1 + async/Async_OpenFlow0x01.ml | 5 +++-- async/Async_OpenFlow0x04.ml | 5 +++-- async/Async_OpenFlowChunk.ml | 11 +++++++---- async/Async_OpenFlow_Platform.ml | 5 ++++- 5 files changed, 18 insertions(+), 9 deletions(-) diff --git a/async/Async_OpenFlow.mli b/async/Async_OpenFlow.mli index cd271d0..0191a4d 100644 --- a/async/Async_OpenFlow.mli +++ b/async/Async_OpenFlow.mli @@ -49,6 +49,7 @@ module Platform : sig -> ?verbose:bool -> ?log_disconnects:bool -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] + -> ?monitor_connections:bool -> port:int -> unit -> t Deferred.t diff --git a/async/Async_OpenFlow0x01.ml b/async/Async_OpenFlow0x01.ml index 3365f23..0efdb1a 100644 --- a/async/Async_OpenFlow0x01.ml +++ b/async/Async_OpenFlow0x01.ml @@ -84,9 +84,10 @@ module Controller = struct let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?monitor_connections ~port () = ChunkController.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () + ?buffer_age_limit ?monitor_connections ~port () >>| function t -> { sub = t ; shakes = ClientSet.empty diff --git a/async/Async_OpenFlow0x04.ml b/async/Async_OpenFlow0x04.ml index 6cc3372..fe01fda 100644 --- a/async/Async_OpenFlow0x04.ml +++ b/async/Async_OpenFlow0x04.ml @@ -60,9 +60,10 @@ module Controller = struct let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?monitor_connections ~port () = ChunkController.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () + ?buffer_age_limit ?monitor_connections ~port () >>| function t -> { sub = t } let listen t = diff --git a/async/Async_OpenFlowChunk.ml b/async/Async_OpenFlowChunk.ml index 0f794c7..57d04f8 100644 --- a/async/Async_OpenFlowChunk.ml +++ b/async/Async_OpenFlowChunk.ml @@ -168,9 +168,10 @@ module Controller = struct let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?(monitor_connections=false) ~port () = Platform.create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () + ?buffer_age_limit ~monitor_connections ~port () >>| function t -> let ctl = { platform = t; @@ -179,8 +180,10 @@ module Controller = struct idle_wait = Time.Span.of_sec 5.0; kill_wait = Time.Span.of_sec 3.0; } in - Mon.mark_idle ctl; - Mon.kill_idle ctl; + if monitor_connections then begin + Mon.mark_idle ctl; + Mon.kill_idle ctl + end; ctl let listen t = Platform.listen t.platform diff --git a/async/Async_OpenFlow_Platform.ml b/async/Async_OpenFlow_Platform.ml index 4c7e46c..9adaea8 100644 --- a/async/Async_OpenFlow_Platform.ml +++ b/async/Async_OpenFlow_Platform.ml @@ -24,6 +24,7 @@ module type S = sig -> ?verbose:bool -> ?log_disconnects:bool -> ?buffer_age_limit:[ `At_most of Time.Span.t | `Unlimited ] + -> ?monitor_connections:bool -> port:int -> unit -> t Deferred.t @@ -103,7 +104,9 @@ module Make(Message : Message) = struct let create ?max_pending_connections ?verbose ?log_disconnects - ?buffer_age_limit ~port () = + ?buffer_age_limit + ?monitor_connections + ~port () = Impl.create ?max_pending_connections ?verbose ?log_disconnects ?buffer_age_limit ~port ~auth:(fun _ _ _ -> return `Allow) ()