5e7c2d6cef
GitOrigin-RevId: f99e5f03cc0aa231ab5950a15ed02afec45ed51a
205 lines
9 KiB
Diff
205 lines
9 KiB
Diff
diff --git a/lib/engine/scheduler.ml b/lib/engine/scheduler.ml
|
|
index e32bd0f..93b566b 100644
|
|
--- a/lib/engine/scheduler.ml
|
|
+++ b/lib/engine/scheduler.ml
|
|
@@ -601,7 +601,7 @@ module Make(Backend : Backend) = struct
|
|
)
|
|
)
|
|
| Trywith tw -> (
|
|
- match Table.find sched.traces (Workflow.id tw.w) with
|
|
+ match Hashtbl.find sched.traces (Workflow.id tw.w) with
|
|
| Some eventual_trace -> (
|
|
eventual_trace >>= function
|
|
| Ok (Run r) ->
|
|
@@ -667,10 +667,10 @@ module Make(Backend : Backend) = struct
|
|
let register_build sched ~id ~build_trace =
|
|
let open Eval_thread.Infix in
|
|
(
|
|
- match Table.find sched.traces id with
|
|
+ match Hashtbl.find sched.traces id with
|
|
| None ->
|
|
let trace = build_trace () in
|
|
- Table.set sched.traces ~key:id ~data:trace ;
|
|
+ Hashtbl.set sched.traces ~key:id ~data:trace ;
|
|
trace
|
|
| Some trace -> trace
|
|
) >>= fun trace ->
|
|
@@ -854,7 +854,7 @@ module Make(Backend : Backend) = struct
|
|
Eval_thread.join l.elts ~f:(build ?target sched)
|
|
| Trywith tw -> (
|
|
build sched ?target tw.w >> fun w_result ->
|
|
- match Table.find sched.traces (Workflow.id tw.w) with
|
|
+ match Hashtbl.find sched.traces (Workflow.id tw.w) with
|
|
| Some eventual_trace -> (
|
|
eventual_trace >> function
|
|
| Ok (Run r) when run_trywith_recovery r.details ->
|
|
diff --git a/lib/multinode/bistro_multinode.ml b/lib/multinode/bistro_multinode.ml
|
|
index 01dc5ac..3fc6b0e 100644
|
|
--- a/lib/multinode/bistro_multinode.ml
|
|
+++ b/lib/multinode/bistro_multinode.ml
|
|
@@ -130,7 +130,7 @@ module Server = struct
|
|
let search (type s) (table : s String.Table.t) ~f =
|
|
let module M = struct exception Found of string * s end in
|
|
try
|
|
- String.Table.fold table ~init:() ~f:(fun ~key ~data () -> if f ~key ~data then raise (M.Found (key, data))) ;
|
|
+ Hashtbl.fold table ~init:() ~f:(fun ~key ~data () -> if f ~key ~data then raise (M.Found (key, data))) ;
|
|
None
|
|
with M.Found (k, v) -> Some (k, v)
|
|
|
|
@@ -145,7 +145,7 @@ module Server = struct
|
|
match allocation_attempt with
|
|
| None -> Some elt
|
|
| Some (worker_id, (Resource curr)) ->
|
|
- String.Table.set pool.available ~key:worker_id ~data:(Resource { np = curr.np - np ; mem = curr.mem - mem }) ;
|
|
+ Hashtbl.set pool.available ~key:worker_id ~data:(Resource { np = curr.np - np ; mem = curr.mem - mem }) ;
|
|
Lwt.wakeup u (worker_id, Resource { np ; mem }) ;
|
|
None
|
|
)
|
|
@@ -163,12 +163,12 @@ module Server = struct
|
|
t
|
|
|
|
let add_worker pool (Worker { id ; np ; mem ; _ }) =
|
|
- match String.Table.add pool.available ~key:id ~data:(Allocator.Resource { np ; mem }) with
|
|
+ match Hashtbl.add pool.available ~key:id ~data:(Allocator.Resource { np ; mem }) with
|
|
| `Ok -> allocation_pass pool
|
|
| `Duplicate -> failwith "A worker has been added twice"
|
|
|
|
let release pool worker_id (Allocator.Resource { np ; mem }) =
|
|
- String.Table.update pool.available worker_id ~f:(function
|
|
+ Hashtbl.update pool.available worker_id ~f:(function
|
|
| None -> failwith "Tried to release resources of inexistent worker"
|
|
| Some (Resource r) -> Resource { np = r.np + np ; mem = r.mem + mem }
|
|
)
|
|
@@ -235,13 +235,13 @@ module Server = struct
|
|
| Subscript { np ; mem } ->
|
|
let id = new_id () in
|
|
let w = create_worker ~np ~mem id in
|
|
- String.Table.set state.workers ~key:id ~data:w ;
|
|
+ Hashtbl.set state.workers ~key:id ~data:w ;
|
|
Worker_allocator.add_worker state.alloc w ;
|
|
log (Logger.Debug (sprintf "new worker %s" id)) ;
|
|
Lwt.return (Client_id id)
|
|
|
|
| Get_job { client_id } -> (
|
|
- match String.Table.find state.workers client_id with
|
|
+ match Hashtbl.find state.workers client_id with
|
|
| None -> Lwt.return None
|
|
| Some (Worker worker) ->
|
|
Lwt.choose [
|
|
@@ -250,22 +250,22 @@ module Server = struct
|
|
] >>= function
|
|
| `Job wp ->
|
|
let workflow_id = workflow_id_of_job_waiter wp in
|
|
- String.Table.set worker.running_jobs ~key:workflow_id ~data:wp ;
|
|
+ Hashtbl.set worker.running_jobs ~key:workflow_id ~data:wp ;
|
|
Lwt.return (Some (job_of_job_waiter wp))
|
|
| `Stop -> Lwt.return None
|
|
)
|
|
|
|
| Plugin_result r ->
|
|
- let Worker worker = String.Table.find_exn state.workers r.client_id in
|
|
+ let Worker worker = Hashtbl.find_exn state.workers r.client_id in
|
|
Lwt.return (
|
|
- match String.Table.find_exn worker.running_jobs r.workflow_id with
|
|
+ match Hashtbl.find_exn worker.running_jobs r.workflow_id with
|
|
| Waiting_plugin wp -> Lwt.wakeup wp.waiter r.result
|
|
| Waiting_shell_command _ -> assert false (* should never happen *)
|
|
)
|
|
| Shell_command_result r ->
|
|
- let Worker worker = String.Table.find_exn state.workers r.client_id in
|
|
+ let Worker worker = Hashtbl.find_exn state.workers r.client_id in
|
|
Lwt.return (
|
|
- match String.Table.find_exn worker.running_jobs r.workflow_id with
|
|
+ match Hashtbl.find_exn worker.running_jobs r.workflow_id with
|
|
| Waiting_plugin _ -> assert false (* should never happen *)
|
|
| Waiting_shell_command wp -> Lwt.wakeup wp.waiter r.result
|
|
)
|
|
@@ -307,7 +307,7 @@ module Server = struct
|
|
|
|
let request_resource backend req =
|
|
Worker_allocator.request backend.state.alloc req >|= fun (worker_id, resource) ->
|
|
- String.Table.find_exn backend.state.workers worker_id, resource
|
|
+ Hashtbl.find_exn backend.state.workers worker_id, resource
|
|
|
|
let release_resource backend worker_id res =
|
|
Worker_allocator.release backend.state.alloc worker_id res
|
|
@@ -334,7 +334,7 @@ module Server = struct
|
|
* loop () *)
|
|
|
|
let eval backend { worker_id ; workflow_id } f x =
|
|
- let Worker worker = String.Table.find_exn backend.state.workers worker_id in
|
|
+ let Worker worker = Hashtbl.find_exn backend.state.workers worker_id in
|
|
let f () = f x in
|
|
let t, u = Lwt.wait () in
|
|
let job_waiter = Waiting_plugin { waiter = u ; f ; workflow_id } in
|
|
@@ -342,7 +342,7 @@ module Server = struct
|
|
t
|
|
|
|
let run_shell_command backend { worker_id ; workflow_id } cmd =
|
|
- let Worker worker = String.Table.find_exn backend.state.workers worker_id in
|
|
+ let Worker worker = Hashtbl.find_exn backend.state.workers worker_id in
|
|
let t, u = Lwt.wait () in
|
|
let job = Waiting_shell_command { waiter = u ; cmd ; workflow_id } in
|
|
Lwt_queue.push worker.pending_jobs job ;
|
|
diff --git a/lib/utils/dot_output.ml b/lib/utils/dot_output.ml
|
|
index 90c299f..d13fceb 100644
|
|
--- a/lib/utils/dot_output.ml
|
|
+++ b/lib/utils/dot_output.ml
|
|
@@ -24,7 +24,7 @@ module G = struct
|
|
(* let successors g u = fold_succ (fun h t -> h :: t) g u [] *)
|
|
|
|
let rec of_workflow_aux seen acc u =
|
|
- if S.mem seen u then (seen, acc)
|
|
+ if Set.mem seen u then (seen, acc)
|
|
else (
|
|
let deps = W.Any.deps u in
|
|
let seen, acc =
|
|
@@ -34,7 +34,7 @@ module G = struct
|
|
in
|
|
let acc = add_vertex acc u in
|
|
let acc = List.fold deps ~init:acc ~f:(fun acc v -> add_edge acc u v) in
|
|
- let seen = S.add seen u in
|
|
+ let seen = Set.add seen u in
|
|
seen, acc
|
|
)
|
|
|
|
@@ -109,7 +109,7 @@ let dot_output ?db oc g ~needed =
|
|
]
|
|
in
|
|
let vertex_attributes u =
|
|
- let needed = (match db with None -> true | Some _ -> false) || S.mem needed u in
|
|
+ let needed = (match db with None -> true | Some _ -> false) || Set.mem needed u in
|
|
let color = if needed then black else light_gray in
|
|
let shape = `Shape (shape u) in
|
|
let W.Any w = u in
|
|
@@ -141,7 +141,7 @@ let dot_output ?db oc g ~needed =
|
|
| _ -> []
|
|
in
|
|
let color =
|
|
- if (match db with None -> true | Some _ -> false) || (S.mem needed u && not (already_done u))
|
|
+ if (match db with None -> true | Some _ -> false) || (Set.mem needed u && not (already_done u))
|
|
then black else light_gray in
|
|
style @ [ `Color color ]
|
|
in
|
|
diff --git a/lib/utils/repo.ml b/lib/utils/repo.ml
|
|
index 06abcd5..206a99e 100644
|
|
--- a/lib/utils/repo.ml
|
|
+++ b/lib/utils/repo.ml
|
|
@@ -160,7 +160,7 @@ let protected_set repo =
|
|
| Select s -> fold_path_workflow acc (W.Any s.dir)
|
|
| Input _ -> acc
|
|
| Shell _
|
|
- | Plugin _ -> String.Set.add acc (W.id w)
|
|
+ | Plugin _ -> Set.add acc (W.id w)
|
|
| Trywith tw ->
|
|
fold_path_workflow (fold_path_workflow acc (W.Any tw.w)) (W.Any tw.failsafe)
|
|
| Ifelse ie ->
|
|
@@ -187,7 +187,7 @@ let cache_clip_fold ~bistro_dir repo ~f ~init =
|
|
let protected = protected_set repo in
|
|
let db = Db.init_exn bistro_dir in
|
|
Db.fold_cache db ~init ~f:(fun acc id ->
|
|
- f db acc (if String.Set.mem protected id then `Protected id else `Unprotected id)
|
|
+ f db acc (if Set.mem protected id then `Protected id else `Unprotected id)
|
|
)
|
|
|
|
let cache_clip_dry_run ~bistro_dir repo =
|