Skip to content

Commit

Permalink
irmin-pack: refine append only flush callback
Browse files Browse the repository at this point in the history
Add support for two types of flush behavior when the flush threshold is
reached: automatic flushes and manually controlled flushes.
  • Loading branch information
metanivek committed Sep 16, 2022
1 parent b72a868 commit fef11e7
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 52 deletions.
34 changes: 20 additions & 14 deletions src/irmin-pack/unix/append_only_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ module Make (Io : Io.S) = struct
rw_perm : rw_perm option;
}

and flush_callback =
[ `Auto of (unit, Io.write_error) result -> unit | `Manual of t -> unit ]

and rw_perm = {
buf : Buffer.t;
auto_flush_threshold : int;
auto_flush_callback : t -> unit;
flush_threshold : int;
flush_callback : flush_callback;
}
(** [rw_perm] contains the data necessary to operate in readwrite mode. *)

let create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_callback =
let create_rw ~path ~overwrite ~flush_threshold ~flush_callback =
let open Result_syntax in
let+ io = Io.create ~path ~overwrite in
let persisted_end_offset = Int63.zero in
Expand All @@ -43,7 +46,7 @@ module Make (Io : Io.S) = struct
io;
persisted_end_offset;
dead_header_size = Int63.zero;
rw_perm = Some { buf; auto_flush_threshold; auto_flush_callback };
rw_perm = Some { buf; flush_threshold; flush_callback };
}

(** A store is consistent if the real offset of the suffix/dict files is the
Expand Down Expand Up @@ -72,8 +75,8 @@ module Make (Io : Io.S) = struct
Int63.pp end_offset Int63.pp real_offset_without_header (Io.path io)];
Ok ())

let open_rw ~path ~end_offset ~dead_header_size ~auto_flush_threshold
~auto_flush_callback =
let open_rw ~path ~end_offset ~dead_header_size ~flush_threshold
~flush_callback =
let open Result_syntax in
let* io = Io.open_ ~path ~readonly:false in
let+ () = check_consistent_store ~end_offset ~dead_header_size io in
Expand All @@ -84,7 +87,7 @@ module Make (Io : Io.S) = struct
io;
persisted_end_offset;
dead_header_size;
rw_perm = Some { buf; auto_flush_threshold; auto_flush_callback };
rw_perm = Some { buf; flush_threshold; flush_callback };
}

let open_ro ~path ~end_offset ~dead_header_size =
Expand All @@ -105,9 +108,9 @@ module Make (Io : Io.S) = struct
let readonly t = Io.readonly t.io
let path t = Io.path t.io

let auto_flush_threshold = function
let flush_threshold = function
| { rw_perm = None; _ } -> None
| { rw_perm = Some rw_perm; _ } -> Some rw_perm.auto_flush_threshold
| { rw_perm = Some rw_perm; _ } -> Some rw_perm.flush_threshold

let end_offset t =
match t.rw_perm with
Expand Down Expand Up @@ -160,10 +163,13 @@ module Make (Io : Io.S) = struct
let append_exn t s =
match t.rw_perm with
| None -> raise Errors.RO_not_allowed
| Some rw_perm ->
assert (Buffer.length rw_perm.buf < rw_perm.auto_flush_threshold);
| Some rw_perm -> (
assert (Buffer.length rw_perm.buf < rw_perm.flush_threshold);
Buffer.add_string rw_perm.buf s;
if Buffer.length rw_perm.buf >= rw_perm.auto_flush_threshold then (
rw_perm.auto_flush_callback t;
assert (empty_buffer t))
if Buffer.length rw_perm.buf >= rw_perm.flush_threshold then
match rw_perm.flush_callback with
| `Auto cb -> flush t |> cb
| `Manual cb ->
cb t;
assert (empty_buffer t))
end
29 changes: 20 additions & 9 deletions src/irmin-pack/unix/append_only_file_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,32 @@ module type S = sig

type t

type flush_callback =
[ `Auto of (unit, Io.write_error) result -> unit | `Manual of t -> unit ]
(** [flush_callback] is the callback when a flush threshold has been reached.
- Use [`Auto] callbacks if you only care about the result of a flush. The
buffer is flushed automatically before calling the callback with the
result.
- Use [`Manual] callbacks if you need more control over when the flush
occurs, for instance if you want to coordinate with flushing other
files. The buffer is not automatically flushed before calling the
callback but flush is expected to be called by the callback. *)

val create_rw :
path:string ->
overwrite:bool ->
auto_flush_threshold:int ->
auto_flush_callback:(t -> unit) ->
flush_threshold:int ->
flush_callback:flush_callback ->
(t, [> Io.create_error ]) result
(** Create a rw instance of [t] by creating the file at [path]. *)

val open_rw :
path:string ->
end_offset:int63 ->
dead_header_size:int ->
auto_flush_threshold:int ->
auto_flush_callback:(t -> unit) ->
flush_threshold:int ->
flush_callback:flush_callback ->
( t,
[> Io.open_error
| `Closed
Expand Down Expand Up @@ -74,10 +86,9 @@ module type S = sig
{3 Auto Flushes}
One of the goals of the [Append_only_file] abstraction is to provide
buffered appends. [auto_flush_threshold] is the soft cap after which the
buffer should be flushed. If a call to [append_exn] fills the buffer,
[auto_flush_callback] will be called so that the parent abstraction takes
care of the flush procedure, which is expected to call [flush]. *)
buffered appends. [flush_threshold] is the soft cap after which the buffer
should be flushed. If a call to [append_exn] fills the buffer,
[flush_callback] will be called. *)

val open_ro :
path:string ->
Expand Down Expand Up @@ -168,7 +179,7 @@ module type S = sig
Always returns [Error `Rw_not_allowed]. *)

val readonly : t -> bool
val auto_flush_threshold : t -> int option
val flush_threshold : t -> int option
val empty_buffer : t -> bool
val path : t -> string
end
Expand Down
22 changes: 10 additions & 12 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ struct
(** Is expected to be called by the index when its append buffer is full so
that the dependendies of index are flushes. When the function returns,
index will flush itself. *)
let index_is_about_to_auto_flush_exn t =
let index_is_about_to_flush_exn t =
Stats.incr_fm_field Auto_index;
flush_suffix_and_its_deps t |> Errs.raise_if_error

Expand Down Expand Up @@ -248,14 +248,14 @@ struct
[%log.debug "reload: generation changed, opening %s" path];
if readonly then Suffix.open_ro ~path ~end_offset ~dead_header_size
else
let auto_flush_threshold =
match Suffix.auto_flush_threshold t.suffix with
let flush_threshold =
match Suffix.flush_threshold t.suffix with
| None -> assert false
| Some x -> x
in
let cb _ = suffix_requires_a_flush_exn t in
Suffix.open_rw ~path ~end_offset ~dead_header_size ~auto_flush_threshold
~auto_flush_callback:cb
Suffix.open_rw ~path ~end_offset ~dead_header_size ~flush_threshold
~flush_callback:(`Manual cb)
in
let suffix0 = t.suffix in
t.suffix <- suffix1;
Expand Down Expand Up @@ -306,11 +306,11 @@ struct
(* 2. Open the other files *)
let* suffix =
let path = Irmin_pack.Layout.V3.suffix ~root ~generation in
let auto_flush_threshold =
let flush_threshold =
Irmin_pack.Conf.suffix_auto_flush_threshold config
in
let cb _ = suffix_requires_a_flush_exn (get_instance ()) in
make_suffix ~path ~auto_flush_threshold ~auto_flush_callback:cb
make_suffix ~path ~flush_threshold ~flush_callback:(`Manual cb)
in
let* prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
Expand All @@ -319,11 +319,9 @@ struct
let* mapping = open_mapping ~root ~generation in
let* dict =
let path = Irmin_pack.Layout.V3.dict ~root in
let auto_flush_threshold =
Irmin_pack.Conf.dict_auto_flush_threshold config
in
let flush_threshold = Irmin_pack.Conf.dict_auto_flush_threshold config in
let cb _ = dict_requires_a_flush_exn (get_instance ()) in
make_dict ~path ~auto_flush_threshold ~auto_flush_callback:cb
make_dict ~path ~flush_threshold ~flush_callback:(`Manual cb)
in
let* index =
let log_size = Conf.index_log_size config in
Expand All @@ -335,7 +333,7 @@ struct
the callback if the instance is None *)
match !instance with
| None -> ()
| Some _ -> index_is_about_to_auto_flush_exn (get_instance ())
| Some _ -> index_is_about_to_flush_exn (get_instance ())
in
(* [cb] will not be called during calls to [index.flush] because we will
use [~no_callback:()] *)
Expand Down
22 changes: 11 additions & 11 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ module Worker = struct
module Ao = struct
include Append_only_file.Make (Fm.Io)

let create_rw_exn ~path ~auto_flush_callback =
create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
let create_rw_exn ~path ~flush_callback =
create_rw ~path ~overwrite:true ~flush_threshold:1_000_000
~flush_callback
|> Errs.raise_if_error
end

Expand Down Expand Up @@ -148,8 +148,8 @@ module Worker = struct

let create_new_suffix ~root ~generation =
let path = Irmin_pack.Layout.V3.suffix ~root ~generation in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
Ao.create_rw_exn ~path ~auto_flush_callback
let flush_callback = `Auto Errs.raise_if_error in
Ao.create_rw_exn ~path ~flush_callback

let run ~generation root commit_key =
let open Result_syntax in
Expand Down Expand Up @@ -233,8 +233,8 @@ module Worker = struct
(* Step 4. Create the new prefix. *)
let prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
Ao.create_rw_exn ~path ~auto_flush_callback
let flush_callback = `Auto Errs.raise_if_error in
Ao.create_rw_exn ~path ~flush_callback
in
let () =
Errors.finalise_exn (fun _outcome ->
Expand Down Expand Up @@ -417,11 +417,11 @@ module Make (Args : Args) = struct
(* As the new suffix is necessarily in V3, the dead_header_size is
0. *)
let dead_header_size = 0 in
let auto_flush_threshold = 1_000_000 in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let flush_threshold = 1_000_000 in
let flush_callback = `Auto Errs.raise_if_error in
let* suffix =
Ao.open_rw ~path ~end_offset ~dead_header_size ~auto_flush_callback
~auto_flush_threshold
Ao.open_rw ~path ~end_offset ~dead_header_size ~flush_callback
~flush_threshold
in
Ok suffix

Expand Down
12 changes: 6 additions & 6 deletions src/irmin-pack/unix/mapping_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ module Make (Io : Io.S) = struct
Io.unlink path2 |> ignore;

(* Create [file0] *)
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let flush_callback = `Auto Errs.raise_if_error in
let* file0 =
Ao.create_rw ~path:path0 ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
Ao.create_rw ~path:path0 ~overwrite:true ~flush_threshold:1_000_000
~flush_callback
in

(* Fill and close [file0] *)
Expand Down Expand Up @@ -404,10 +404,10 @@ module Make (Io : Io.S) = struct
Io.unlink path0 |> ignore;

(* Create [file2] *)
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let flush_callback = `Auto Errs.raise_if_error in
let* file2 =
Ao.create_rw ~path:path2 ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
Ao.create_rw ~path:path2 ~overwrite:true ~flush_threshold:1_000_000
~flush_callback
in

(* Fill and close [file2] *)
Expand Down

0 comments on commit fef11e7

Please sign in to comment.