Skip to content

Commit

Permalink
irmin-pack: update gc to keep stats after finalise
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek committed Aug 30, 2022
1 parent a94723b commit af53b9b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 66 deletions.
138 changes: 75 additions & 63 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ module Make (Args : Args) = struct
contents : read Contents_store.t;
node : read Node_store.t;
commit : read Commit_store.t;
mutable stats : stats option;
}

let v ~root ~generation ~unlink ~offset ~dispatcher ~fm ~contents ~node
Expand Down Expand Up @@ -85,6 +86,7 @@ module Make (Args : Args) = struct
contents;
node;
commit;
stats = None;
}

let open_new_suffix ~end_offset { root; generation; _ } =
Expand Down Expand Up @@ -198,73 +200,83 @@ module Make (Args : Args) = struct
| `Running, _ -> assert false

let finalise ~wait t =
let go status =
let start = t.elapsed () in
let s =
ref
{
duration = 0.;
finalisation_duration = 0.;
read_gc_output_duration = 0.;
transfer_latest_newies_duration = 0.;
swap_duration = 0.;
unlink_duration = 0.;
}
in
let time on_end f =
let counter = Mtime_clock.counter () in
let res = f () in
on_end (Mtime_clock.count counter |> Mtime.Span.to_s);
res
in
match t.stats with
| Some stats -> Lwt.return_ok (`Finalised stats)
| None -> (
let go status =
let start = t.elapsed () in
let s =
ref
{
duration = 0.;
finalisation_duration = 0.;
read_gc_output_duration = 0.;
transfer_latest_newies_duration = 0.;
swap_duration = 0.;
unlink_duration = 0.;
}
in
let time on_end f =
let counter = Mtime_clock.counter () in
let res = f () in
on_end (Mtime_clock.count counter |> Mtime.Span.to_s);
res
in

let gc_output =
time (fun t -> s := { !s with read_gc_output_duration = t })
@@ fun () -> Fm.read_gc_output ~root:t.root ~generation:t.generation
in
let gc_output =
time (fun t -> s := { !s with read_gc_output_duration = t })
@@ fun () -> Fm.read_gc_output ~root:t.root ~generation:t.generation
in

let result =
let open Result_syntax in
match (status, gc_output) with
| `Success, Ok copy_end_offset ->
let* new_suffix_end_offset =
time (fun t ->
s := { !s with transfer_latest_newies_duration = t })
@@ fun () ->
transfer_latest_newies ~right_start_offset:t.offset
~copy_end_offset t
in
let* () =
time (fun t -> s := { !s with swap_duration = t }) @@ fun () ->
swap_and_purge ~right_start_offset:t.offset
~right_end_offset:new_suffix_end_offset t
in
(if t.unlink then
time (fun t -> s := { !s with unlink_duration = t }) @@ fun () ->
unlink_all t);
let result =
let open Result_syntax in
match (status, gc_output) with
| `Success, Ok copy_end_offset ->
let* new_suffix_end_offset =
time (fun t ->
s := { !s with transfer_latest_newies_duration = t })
@@ fun () ->
transfer_latest_newies ~right_start_offset:t.offset
~copy_end_offset t
in
let* () =
time (fun t -> s := { !s with swap_duration = t })
@@ fun () ->
swap_and_purge ~right_start_offset:t.offset
~right_end_offset:new_suffix_end_offset t
in
(if t.unlink then
time (fun t -> s := { !s with unlink_duration = t })
@@ fun () -> unlink_all t);

let duration = t.elapsed () in
s := { !s with duration; finalisation_duration = duration -. start };
let duration = t.elapsed () in
s :=
{
!s with
duration;
finalisation_duration = duration -. start;
};
t.stats <- Some !s;

[%log.debug
"Gc ended. %a, newies bytes:%a" pp_stats !s Int63.pp
(Int63.sub new_suffix_end_offset copy_end_offset)];
let () = Lwt.wakeup_later t.resolver (Ok !s) in
Ok (`Finalised !s)
| _ ->
let err = gc_errors status gc_output in
let () = Lwt.wakeup_later t.resolver err in
err
in
Lwt.return result
in
if wait then
let* status = Async.await t.task in
go status
else
match Async.status t.task with
| `Running -> Lwt.return_ok `Running
| status -> go status
[%log.debug
"Gc ended. %a, newies bytes:%a" pp_stats !s Int63.pp
(Int63.sub new_suffix_end_offset copy_end_offset)];
let () = Lwt.wakeup_later t.resolver (Ok !s) in
Ok (`Finalised !s)
| _ ->
let err = gc_errors status gc_output in
let () = Lwt.wakeup_later t.resolver err in
err
in
Lwt.return result
in
if wait then
let* status = Async.await t.task in
go status
else
match Async.status t.task with
| `Running -> Lwt.return_ok `Running
| status -> go status)

let on_finalise t f =
(* Ignore returned promise since the purpose of this
Expand Down
4 changes: 1 addition & 3 deletions src/irmin-pack/unix/gc_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ module type S = sig
([> `Running | `Finalised of stats ], Args.Errs.t) result Lwt.t
(** [finalise ~wait t] returns the state of the GC process.
If [wait = true], the call will block until GC finishes.
Calling [finalise] after a call that returns [`Finalised] is undefined. *)
If [wait = true], the call will block until GC finishes. *)

val on_finalise : t -> ((stats, Args.Errs.t) result -> unit Lwt.t) -> unit
(** Attaches a callback to the GC process, which will be called when the GC
Expand Down

0 comments on commit af53b9b

Please sign in to comment.