Skip to content

Commit

Permalink
[multicore][tasks] polymorphic child_id type
Browse files Browse the repository at this point in the history
Summary: TaskGenerator is a bit too specialised for multiprocessing: it expects a Unix pid passed to the `next` function. This will not make sense in a domain-based implementation (all the pids will be the same), so we make that type polymorphic. ProcessPool instantiates that type to a `Pid.t`.

Reviewed By: jvillard

Differential Revision:
D68446335

Privacy Context Container: L1208441

fbshipit-source-id: 9977ffd4265da122f6efb85dd9ae0b9b6ed3d22a
  • Loading branch information
ngorogiannis authored and facebook-github-bot committed Jan 22, 2025
1 parent c8df77d commit 4c9d427
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 27 deletions.
2 changes: 1 addition & 1 deletion infer/src/backend/CallGraphScheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@

open! IStd

val bottom_up : CallGraph.t -> (TaskSchedulerTypes.target, 'a) TaskGenerator.t
val bottom_up : CallGraph.t -> (TaskSchedulerTypes.target, 'a, _) TaskGenerator.t
2 changes: 1 addition & 1 deletion infer/src/backend/FileScheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ open! IStd
val make :
finish:('a option -> TaskSchedulerTypes.target -> TaskSchedulerTypes.target option)
-> SourceFile.t list
-> (TaskSchedulerTypes.target, 'a) TaskGenerator.t
-> (TaskSchedulerTypes.target, 'a, _) TaskGenerator.t
2 changes: 1 addition & 1 deletion infer/src/backend/ReplayScheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ val make :
finish:('a option -> TaskSchedulerTypes.target -> TaskSchedulerTypes.target option)
-> CallGraph.t
-> SourceFile.t list
-> (TaskSchedulerTypes.target, 'a) TaskGenerator.t
-> (TaskSchedulerTypes.target, 'a, _) TaskGenerator.t
6 changes: 3 additions & 3 deletions infer/src/backend/RestartScheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type target_with_dependency = {target: TaskSchedulerTypes.target; dependency_fil

let restart_count = ref 0

let of_queue ready : ('a, TaskSchedulerTypes.analysis_result) TaskGenerator.t =
let of_queue ready : ('a, TaskSchedulerTypes.analysis_result, Pid.t) TaskGenerator.t =
let remaining = ref (Queue.length ready) in
let remaining_tasks () = !remaining in
let is_empty () = Int.equal !remaining 0 in
Expand Down Expand Up @@ -60,11 +60,11 @@ let of_queue ready : ('a, TaskSchedulerTypes.analysis_result) TaskGenerator.t =
| _ ->
None
in
let next {TaskGenerator.child_pid; is_first_update} =
let next {TaskGenerator.child_id; is_first_update} =
if is_first_update then
(* new update cycle, worth checking if the first job in the queue is still blocked again *)
waiting_for_blocked_target := false ;
match dequeue_from_blocked child_pid with
match dequeue_from_blocked child_id with
| Some _ as some_result ->
some_result
| None ->
Expand Down
2 changes: 1 addition & 1 deletion infer/src/backend/RestartScheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ val setup : unit -> unit

val make :
SourceFile.t list
-> (TaskSchedulerTypes.target, TaskSchedulerTypes.analysis_result) TaskGenerator.t
-> (TaskSchedulerTypes.target, TaskSchedulerTypes.analysis_result, Pid.t) TaskGenerator.t

val with_lock :
get_actives:(unit -> SpecializedProcname.t list) -> f:(unit -> 'a) -> Procname.t -> 'a
Expand Down
2 changes: 1 addition & 1 deletion infer/src/backend/SyntacticCallGraph.ml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ let build_from_sources sources =

let to_dotty g = CallGraph.to_dotty g SyntacticDependencyGraphDot

let bottom_up sources : (TaskSchedulerTypes.target, 'a) TaskGenerator.t =
let bottom_up sources : (TaskSchedulerTypes.target, 'a, _) TaskGenerator.t =
let syntactic_call_graph = build_from_sources sources in
if Config.debug_level_analysis > 0 then to_dotty syntactic_call_graph ;
CallGraphScheduler.bottom_up syntactic_call_graph
Expand Down
2 changes: 1 addition & 1 deletion infer/src/backend/SyntacticCallGraph.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ val iter_captured_procs_and_callees : (Procname.t -> Procname.t list -> unit) ->
val make :
finish:('a option -> TaskSchedulerTypes.target -> TaskSchedulerTypes.target option)
-> SourceFile.t list
-> (TaskSchedulerTypes.target, 'a) TaskGenerator.t
-> (TaskSchedulerTypes.target, 'a, _) TaskGenerator.t
(** task generator that works by
- loading the syntactic call graph from the capture DB
Expand Down
2 changes: 1 addition & 1 deletion infer/src/backend/Tasks.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ let run_sequentially ~finish ~(f : ('a, 'b) doer) (tasks : 'a list) : unit =
TaskBar.set_tasks_total task_bar (task_generator.remaining_tasks ()) ;
TaskBar.tasks_done_reset task_bar ;
let for_child_info =
{TaskGenerator.child_slot= -1; child_pid= Unix.getpid (); is_first_update= true}
{TaskGenerator.child_slot= -1; child_id= Unix.getpid (); is_first_update= true}
in
let rec run_tasks () =
if not (task_generator.is_empty ()) then (
Expand Down
2 changes: 1 addition & 1 deletion infer/src/backend/Tasks.mli
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module Runner : sig
-> child_prologue:(ProcessPool.Worker.id -> unit)
-> f:('work, 'result) doer
-> child_epilogue:(ProcessPool.Worker.id -> 'final)
-> (unit -> ('work, 'result) TaskGenerator.t)
-> (unit -> ('work, 'result, Pid.t) TaskGenerator.t)
-> ('work, 'final, 'result) t
(** Create a runner running [jobs] jobs in parallel *)

Expand Down
9 changes: 5 additions & 4 deletions infer/src/base/ProcessPool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ type ('work, 'final, 'result) t =
; children_updates: Unix.File_descr.t list
(** each child has it's own pipe to send updates to the pool *)
; task_bar: TaskBar.t
; tasks: ('work, 'result) TaskGenerator.t (** generator for work remaining to be done *) }
; tasks: ('work, 'result, Pid.t) TaskGenerator.t (** generator for work remaining to be done *)
}

(** {2 Constants} *)

Expand Down Expand Up @@ -204,10 +205,10 @@ let send_work_to_idle_children pool =
let exception NoMoreWork in
let is_first_update_ref = ref true in
let send_work_to_child pool slot =
let child_pid = pool.slots.(slot).pid in
let child_id = pool.slots.(slot).pid in
let is_first_update = !is_first_update_ref in
is_first_update_ref := false ;
match pool.tasks.next {child_slot= slot; child_pid; is_first_update} with
match pool.tasks.next {child_slot= slot; child_id; is_first_update} with
| None ->
raise_notrace NoMoreWork
| Some (x, finish) ->
Expand Down Expand Up @@ -495,7 +496,7 @@ let create :
-> child_prologue:(Worker.id -> unit)
-> f:('work -> 'result option)
-> child_epilogue:(Worker.id -> 'final)
-> tasks:(unit -> ('work, 'result) TaskGenerator.t)
-> tasks:(unit -> ('work, 'result, Pid.t) TaskGenerator.t)
-> ('work, 'final, 'result) t =
fun ~jobs ~child_prologue ~f ~child_epilogue ~tasks ->
let task_bar = TaskBar.create ~jobs in
Expand Down
2 changes: 1 addition & 1 deletion infer/src/base/ProcessPool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ val create :
-> child_prologue:(Worker.id -> unit)
-> f:('work -> 'result option)
-> child_epilogue:(Worker.id -> 'final)
-> tasks:(unit -> ('work, 'result) TaskGenerator.t)
-> tasks:(unit -> ('work, 'result, Pid.t) TaskGenerator.t)
-> ('work, 'final, 'result) t
(** Create a new pool of processes running [jobs] jobs in parallel *)

Expand Down
10 changes: 5 additions & 5 deletions infer/src/base/TaskGenerator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

open! IStd

type for_child_info = {child_slot: int; child_pid: Pid.t; is_first_update: bool}
type 'c for_child_info = {child_slot: int; child_id: 'c; is_first_update: bool}

type ('a, 'b) t =
type ('a, 'b, 'c) t =
{ remaining_tasks: unit -> int
; is_empty: unit -> bool
; finished: result:'b option -> 'a -> unit
; next: for_child_info -> ('a * (unit -> unit)) option }
; next: 'c for_child_info -> ('a * (unit -> unit)) option }

let chain (gen1 : ('a, 'b) t) (gen2 : ('a, 'b) t) : ('a, 'b) t =
let chain (gen1 : ('a, 'b, 'c) t) (gen2 : ('a, 'b, 'c) t) : ('a, 'b, 'c) t =
let remaining_tasks () = gen1.remaining_tasks () + gen2.remaining_tasks () in
let gen1_returned_empty = ref false in
let gen1_is_empty () =
Expand All @@ -32,7 +32,7 @@ let chain (gen1 : ('a, 'b) t) (gen2 : ('a, 'b) t) : ('a, 'b) t =
{remaining_tasks; is_empty; finished; next}


let of_list ~finish (lst : 'a list) : ('a, _) t =
let of_list ~finish (lst : 'a list) : ('a, _, _) t =
let content = ref lst in
let length = ref (List.length lst) in
let remaining_tasks () = !length in
Expand Down
14 changes: 8 additions & 6 deletions infer/src/base/TaskGenerator.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,34 @@

open! IStd

type for_child_info = {child_slot: int; child_pid: Pid.t; is_first_update: bool}
type 'child_id for_child_info = {child_slot: int; child_id: 'child_id; is_first_update: bool}

(** abstraction for generating jobs *)
type ('a, 'b) t =
type ('work, 'result, 'child_id) t =
{ remaining_tasks: unit -> int
(** number of tasks remaining to complete -- only used for reporting, so imprecision is not
a bug *)
; is_empty: unit -> bool
(** when should the main loop of the task manager stop expecting new tasks *)
; finished: result:'b option -> 'a -> unit
; finished: result:'result option -> 'work -> unit
(** Process pool calls [finished result:r x] when a worker finishes item [x]. [result] is
[None] when the item was completed successfully and [Some pname] when it failed because
it could not lock [pname]. This is only called if [next ()] has previously returned
[Some x] and [x] was sent to a worker. *)
; next: for_child_info -> ('a * (unit -> unit)) option
; next: 'child_id for_child_info -> ('work * (unit -> unit)) option
(** [next ()] generates the next work item together with a "finalizer" for that work item
that will be run once the work has completed (just before calling [finished]). If
[is_empty ()] is true then [next ()] must return [None]. However, it is OK to for
[next ()] to return [None] when [is_empty] is false. This corresponds to the case where
there is more work to be done, but it is not schedulable until some already scheduled
work is finished. *) }

val chain : ('a, 'b) t -> ('a, 'b) t -> ('a, 'b) t
val chain :
('work, 'result, 'child_id) t -> ('work, 'result, 'child_id) t -> ('work, 'result, 'child_id) t
(** chain two generators in order *)

val of_list : finish:('b option -> 'a -> 'a option) -> 'a list -> ('a, 'b) t
val of_list :
finish:('result option -> 'work -> 'work option) -> 'work list -> ('work, 'result, _) t
(** schedule tasks out of a concrete list *)

val finish_always_none : _ option -> _ -> _ option

0 comments on commit 4c9d427

Please sign in to comment.