From c67f52bda725ede07749c95bd30b8093ca487696 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edwin=20T=C3=B6r=C3=B6k?= Date: Thu, 5 Dec 2024 17:01:25 +0000 Subject: [PATCH] CP-52821: xapi_periodic_scheduler: use Mtime.span instead of Mtime.t MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids dealing with overflow Signed-off-by: Edwin Török --- .../xapi-stdext/lib/xapi-stdext-threads/ipq.ml | 14 +++++++------- .../xapi-stdext/lib/xapi-stdext-threads/ipq.mli | 2 +- .../lib/xapi-stdext-threads/ipq_test.ml | 14 +++++++------- .../lib/xapi-stdext-threads/scheduler.ml | 14 +++++--------- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml index 4cf29ed3d9b..7293ae625e1 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.ml @@ -13,7 +13,7 @@ *) (* Imperative priority queue *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t = {default: 'a event; mutable size: int; mutable data: 'a event array} @@ -23,7 +23,7 @@ let create n default = if n <= 0 then invalid_arg "create" else - let default = {ev= default; time= Mtime_clock.now ()} in + let default = {ev= default; time= Mtime_clock.elapsed ()} in {default; size= 0; data= Array.make n default} let is_empty h = h.size <= 0 @@ -45,7 +45,7 @@ let add h x = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -69,7 +69,7 @@ let remove h s = (* moving [x] up in the heap *) let rec moveup i = let fi = (i - 1) / 2 in - if i > 0 && Mtime.is_later d.(fi).time ~than:x.time then ( + if i > 0 && Mtime.Span.is_longer d.(fi).time ~than:x.time then ( d.(i) <- d.(fi) ; moveup fi ) else @@ -83,7 +83,7 @@ let remove h s = let j' = j + 1 in if j' < n && d.(j').time < d.(j).time then j' else j in - if Mtime.is_earlier d.(j).time ~than:x.time then ( + if Mtime.Span.is_shorter d.(j).time ~than:x.time then ( d.(i) <- d.(j) ; movedown j ) else @@ -93,7 +93,7 @@ let remove h s = in if s = n then () - else if Mtime.is_later d.(s).time ~than:x.time then + else if Mtime.Span.is_longer d.(s).time ~than:x.time then moveup s else movedown s ; @@ -129,7 +129,7 @@ let check h = let d = h.data in for i = 1 to h.size - 1 do let fi = (i - 1) / 2 in - let ordered = Mtime.is_later d.(i).time ~than:d.(fi).time in + let ordered = Mtime.Span.is_longer d.(i).time ~than:d.(fi).time in assert ordered done diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli index b7c4974e642..19f8bf1e33f 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq.mli @@ -12,7 +12,7 @@ * GNU Lesser General Public License for more details. *) -type 'a event = {ev: 'a; time: Mtime.t} +type 'a event = {ev: 'a; time: Mtime.span} type 'a t diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml index e8e64093e16..a9cc2611da8 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/ipq_test.ml @@ -17,7 +17,7 @@ module Ipq = Xapi_stdext_threads_scheduler.Ipq (* test we get "out of bound" exception calling Ipq.remove *) let test_out_of_index () = let q = Ipq.create 10 0 in - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; let is_oob = function | Invalid_argument s when String.ends_with ~suffix:" out of bounds" s -> true @@ -43,18 +43,18 @@ let test_leak () = let use_array () = array.(0) <- 'a' in let allocated = Atomic.make true in Gc.finalise (fun _ -> Atomic.set allocated false) array ; - Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= use_array; Ipq.time= Mtime_clock.elapsed ()} ; Ipq.remove q 0 ; Gc.full_major () ; Gc.full_major () ; Alcotest.(check bool) "allocated" false (Atomic.get allocated) ; - Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.now ()} + Ipq.add q {Ipq.ev= default; Ipq.time= Mtime_clock.elapsed ()} (* test Ipq.is_empty call *) let test_empty () = let q = Ipq.create 10 0 in Alcotest.(check bool) "same value" true (Ipq.is_empty q) ; - Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.now ()} ; + Ipq.add q {Ipq.ev= 123; Ipq.time= Mtime_clock.elapsed ()} ; Alcotest.(check bool) "same value" false (Ipq.is_empty q) ; Ipq.remove q 0 ; Alcotest.(check bool) "same value" true (Ipq.is_empty q) @@ -75,7 +75,7 @@ let set queue = Ipq.iter (fun d -> let t = d.time in - let t = Mtime.to_uint64_ns t in + let t = Mtime.Span.to_uint64_ns t in s := Int64Set.add t !s ) queue ; @@ -86,7 +86,7 @@ let test_old () = let s = ref Int64Set.empty in let add i = let ti = Random.int64 1000000L in - let t = Mtime.of_uint64_ns ti in + let t = Mtime.Span.of_uint64_ns ti in let e = {Ipq.time= t; Ipq.ev= i} in Ipq.add test e ; s := Int64Set.add ti !s @@ -123,7 +123,7 @@ let test_old () = let prev = ref 0L in for _ = 0 to 49 do let e = Ipq.pop_maximum test in - let t = Mtime.to_uint64_ns e.time in + let t = Mtime.Span.to_uint64_ns e.time in Alcotest.(check bool) (Printf.sprintf "%Ld bigger than %Ld" t !prev) true (t >= !prev) ; diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 582809d3da2..f46ec1715e5 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -31,17 +31,13 @@ let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () -let add_span clock span = - (* return max value if the add overflows: spans are unsigned integers *) - match Mtime.add_span clock span with Some t -> t | None -> Mtime.max_stamp - let add_to_queue_span ?(signal = true) name ty start_span newfunc = with_lock lock (fun () -> - let ( ++ ) = add_span in + let ( ++ ) = Mtime.Span.add in Ipq.add queue { Ipq.ev= {func= newfunc; ty; name} - ; Ipq.time= Mtime_clock.now () ++ start_span + ; Ipq.time= Mtime_clock.elapsed () ++ start_span } ) ; if signal then Delay.signal delay @@ -83,8 +79,8 @@ let loop () = (* Doesn't happen often - the queue isn't usually empty *) else let next = with_lock lock (fun () -> Ipq.maximum queue) in - let now = Mtime_clock.now () in - if next.Ipq.time < now then ( + let now = Mtime_clock.elapsed () in + if Mtime.Span.is_shorter next.Ipq.time ~than:now then ( let todo = (with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev in @@ -96,7 +92,7 @@ let loop () = add_to_queue ~signal:false todo.name todo.ty timer todo.func ) else (* Sleep until next event. *) let sleep = - Mtime.(span next.Ipq.time now) + Mtime.(Span.abs_diff next.Ipq.time now) |> Mtime.Span.(add ms) |> Clock.Timer.span_to_s in