From 3476a22562171cbb79de61359f11d627abe2586c Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Thu, 28 Nov 2024 14:47:51 +0000 Subject: [PATCH 1/9] Simple test for periodic scheduler Test that the event is correctly executed. Signed-off-by: Frediano Ziglio --- .../xapi-stdext/lib/xapi-stdext-threads/dune | 6 +-- .../lib/xapi-stdext-threads/scheduler_test.ml | 37 +++++++++++++++++++ .../xapi-stdext-threads/scheduler_test.mli | 0 3 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml create mode 100644 ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune index 7fcff9e08c2..d8036380cd7 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/dune @@ -1,7 +1,7 @@ (library (public_name xapi-stdext-threads) (name xapi_stdext_threads) - (modules :standard \ ipq scheduler threadext_test ipq_test) + (modules :standard \ ipq scheduler threadext_test ipq_test scheduler_test) (libraries mtime mtime.clock.os @@ -22,8 +22,8 @@ ) (tests - (names threadext_test ipq_test) + (names threadext_test ipq_test scheduler_test) (package xapi-stdext-threads) - (modules threadext_test ipq_test) + (modules threadext_test ipq_test scheduler_test) (libraries xapi_stdext_threads alcotest mtime.clock.os mtime fmt threads.posix xapi_stdext_threads_scheduler) ) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml new file mode 100644 index 00000000000..272b0572943 --- /dev/null +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml @@ -0,0 +1,37 @@ +(* + * Copyright (C) 2024 Cloud Software Group + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation; version 2.1 only. with the special + * exception on linking described in file LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + *) + +module Scheduler = Xapi_stdext_threads_scheduler.Scheduler + +let started = Atomic.make false + +let start_schedule () = + if not (Atomic.exchange started true) then + Thread.create Scheduler.loop () |> ignore + +let send event data = Event.(send event data |> sync) + +let receive event = Event.(receive event |> sync) + +let test_single () = + let finished = Event.new_channel () in + Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> + send finished true + ) ; + start_schedule () ; + Alcotest.(check bool) "result" true (receive finished) + +let tests = [("test_single", `Quick, test_single)] + +let () = Alcotest.run "Scheduler" [("generic", tests)] diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.mli new file mode 100644 index 00000000000..e69de29bb2d From 624926133b9b821391c2f5cfcbde72ed35afdb87 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Fri, 6 Dec 2024 09:56:53 +0000 Subject: [PATCH 2/9] Limit mutex contention in add_to_queue Signed-off-by: Frediano Ziglio --- .../lib/xapi-stdext-threads/scheduler.ml | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 3e8543ec04d..50c4c17d4b9 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -49,14 +49,11 @@ module Clock = struct end let add_to_queue ?(signal = true) name ty start newfunc = - with_lock lock (fun () -> - let ( ++ ) = Clock.add_span in - Ipq.add queue - { - Ipq.ev= {func= newfunc; ty; name} - ; Ipq.time= Mtime_clock.now () ++ start - } - ) ; + let ( ++ ) = Clock.add_span in + let item = + {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} + in + with_lock lock (fun () -> Ipq.add queue item) ; if signal then Delay.signal delay let remove_from_queue name = From f86c07666fd098b471511cd742c6bb08b8e21514 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Fri, 6 Dec 2024 10:05:22 +0000 Subject: [PATCH 3/9] Compare correctly Mtime.t Do not use ">" or other operators to compare Mtime.t, the value is intended to be unsigned and should be compared with Int64.unsigned_compare as Mtime functions do. Signed-off-by: Frediano Ziglio --- ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 50c4c17d4b9..2e0f28f8800 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -72,7 +72,7 @@ let loop () = else let next = with_lock lock (fun () -> Ipq.maximum queue) in let now = Mtime_clock.now () in - if next.Ipq.time < now then ( + if Mtime.is_earlier next.Ipq.time ~than:now then ( let todo = (with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev in From 2950dd91f171b1be1297e446a8585fa1a1e10555 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Fri, 6 Dec 2024 10:10:19 +0000 Subject: [PATCH 4/9] Protect queue with mutex in remove_from_queue Signed-off-by: Frediano Ziglio --- ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 2e0f28f8800..03ee8ef976e 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -57,6 +57,7 @@ let add_to_queue ?(signal = true) name ty start newfunc = if signal then Delay.signal delay let remove_from_queue name = + with_lock lock @@ fun () -> let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in if index > -1 then Ipq.remove queue index From 529eeaa98c9d225ac9a49c70094acfde4b6f62c7 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Fri, 6 Dec 2024 10:17:45 +0000 Subject: [PATCH 5/9] Remove signal parameter from add_to_queue The parameter was false only to support an internal usage, external users should always alert the thread loop. Signed-off-by: Frediano Ziglio --- .../libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml | 8 ++++++-- .../xapi-stdext/lib/xapi-stdext-threads/scheduler.mli | 3 +-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index 03ee8ef976e..a8c56dc47e8 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -48,7 +48,7 @@ module Clock = struct Mtime.min_stamp end -let add_to_queue ?(signal = true) name ty start newfunc = +let add_to_queue_internal ?(signal = true) name ty start newfunc = let ( ++ ) = Clock.add_span in let item = {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} @@ -56,6 +56,9 @@ let add_to_queue ?(signal = true) name ty start newfunc = with_lock lock (fun () -> Ipq.add queue item) ; if signal then Delay.signal delay +let add_to_queue name ty start newfunc = + add_to_queue_internal name ty start newfunc + let remove_from_queue name = with_lock lock @@ fun () -> let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in @@ -82,7 +85,8 @@ let loop () = | OneShot -> () | Periodic timer -> - add_to_queue ~signal:false todo.name todo.ty timer todo.func + add_to_queue_internal ~signal:false todo.name todo.ty timer + todo.func ) else (* Sleep until next event. *) let sleep = Mtime.(span next.Ipq.time now) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli index b087a35c5cf..d4d19b1f790 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.mli @@ -18,8 +18,7 @@ type func_ty = | OneShot (** Fire just once *) | Periodic of float (** Fire periodically with a given period in seconds *) -val add_to_queue : - ?signal:bool -> string -> func_ty -> float -> (unit -> unit) -> unit +val add_to_queue : string -> func_ty -> float -> (unit -> unit) -> unit (** Start a new timer. *) val remove_from_queue : string -> unit From 2c192c955825b7f8833c2e2565cc02fc887f44a6 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Tue, 26 Nov 2024 22:30:31 +0000 Subject: [PATCH 6/9] Fix multiple issues in periodic scheduler - Do not wait huge amount of time if the queue is empty but use Delay.wait if possible; - Fix delete of periodic events. In case the event is processed it's removed from the queue. Previously remove_from_queue was not able to mark this event as removed; - Do not race between checking the first event and removing it. These 2 actions were done in 2 separate critical section, now they are done in a single one. Signed-off-by: Frediano Ziglio --- .../lib/xapi-stdext-threads/scheduler.ml | 79 ++++++++++++------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml index a8c56dc47e8..a544ed79bbb 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler.ml @@ -27,6 +27,8 @@ let delay = Delay.make () let queue_default = {func= (fun () -> ()); ty= OneShot; name= ""} +let (pending_event : t option ref) = ref None + let (queue : t Ipq.t) = Ipq.create 50 queue_default let lock = Mutex.create () @@ -48,50 +50,68 @@ module Clock = struct Mtime.min_stamp end -let add_to_queue_internal ?(signal = true) name ty start newfunc = +let add_to_queue name ty start newfunc = let ( ++ ) = Clock.add_span in let item = {Ipq.ev= {func= newfunc; ty; name}; Ipq.time= Mtime_clock.now () ++ start} in with_lock lock (fun () -> Ipq.add queue item) ; - if signal then Delay.signal delay - -let add_to_queue name ty start newfunc = - add_to_queue_internal name ty start newfunc + Delay.signal delay let remove_from_queue name = with_lock lock @@ fun () -> - let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in - if index > -1 then - Ipq.remove queue index + match !pending_event with + | Some ev when ev.name = name -> + pending_event := None + | Some _ | None -> + let index = Ipq.find_p queue (fun {name= n; _} -> name = n) in + if index > -1 then + Ipq.remove queue index + +let add_periodic_pending () = + with_lock lock @@ fun () -> + match !pending_event with + | Some ({ty= Periodic timer; _} as ev) -> + let ( ++ ) = Clock.add_span in + let item = {Ipq.ev; Ipq.time= Mtime_clock.now () ++ timer} in + Ipq.add queue item ; + pending_event := None + | Some {ty= OneShot; _} -> + pending_event := None + | None -> + () let loop () = debug "%s started" __MODULE__ ; try while true do - let empty = with_lock lock (fun () -> Ipq.is_empty queue) in - if empty then - Thread.delay 10.0 - (* Doesn't happen often - the queue isn't usually empty *) - else - let next = with_lock lock (fun () -> Ipq.maximum queue) in - let now = Mtime_clock.now () in - if Mtime.is_earlier next.Ipq.time ~than:now then ( - let todo = - (with_lock lock (fun () -> Ipq.pop_maximum queue)).Ipq.ev - in + let now = Mtime_clock.now () in + let deadline, item = + with_lock lock @@ fun () -> + (* empty: wait till we get something *) + if Ipq.is_empty queue then + (Clock.add_span now 10.0, None) + else + let next = Ipq.maximum queue in + if Mtime.is_later next.Ipq.time ~than:now then + (* not expired: wait till time or interrupted *) + (next.Ipq.time, None) + else ( + (* remove expired item *) + Ipq.pop_maximum queue |> ignore ; + (* save periodic to be scheduled again *) + if next.Ipq.ev.ty <> OneShot then pending_event := Some next.Ipq.ev ; + (now, Some next.Ipq.ev) + ) + in + match item with + | Some todo -> (try todo.func () with _ -> ()) ; - match todo.ty with - | OneShot -> - () - | Periodic timer -> - add_to_queue_internal ~signal:false todo.name todo.ty timer - todo.func - ) else (* Sleep until next event. *) + add_periodic_pending () + | None -> ( + (* Sleep until next event. *) let sleep = - Mtime.(span next.Ipq.time now) - |> Mtime.Span.(add ms) - |> Clock.span_to_s + Mtime.(span deadline now) |> Mtime.Span.(add ms) |> Clock.span_to_s in try ignore (Delay.wait delay sleep) with e -> @@ -107,6 +127,7 @@ let loop () = normal delay. New events may be missed." detailed_msg ; Thread.delay sleep + ) done with _ -> error From 935c84f865bcdafac73fe203d6c5c1f058a4f22d Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Thu, 28 Nov 2024 15:19:12 +0000 Subject: [PATCH 7/9] Add test for removing periodic event in periodic scheduler Potentially a periodic event can be cancelled while this is processed. Simulate this behavior removing the event inside the handler. This was fixed by previous commit. Signed-off-by: Frediano Ziglio --- .../lib/xapi-stdext-threads/scheduler_test.ml | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml index 272b0572943..8b0baeb74b1 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml @@ -24,6 +24,12 @@ let send event data = Event.(send event data |> sync) let receive event = Event.(receive event |> sync) +let elapsed_ms cnt = + let elapsed_ns = Mtime_clock.count cnt |> Mtime.Span.to_uint64_ns in + Int64.(div elapsed_ns 1000000L |> to_int) + +let is_less = Alcotest.(testable (pp int)) Stdlib.( > ) + let test_single () = let finished = Event.new_channel () in Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> @@ -32,6 +38,28 @@ let test_single () = start_schedule () ; Alcotest.(check bool) "result" true (receive finished) -let tests = [("test_single", `Quick, test_single)] +let test_remove_self () = + let which = Event.new_channel () in + Scheduler.add_to_queue "self" (Scheduler.Periodic 0.001) 0.001 (fun () -> + (* this should remove the periodic scheduling *) + Scheduler.remove_from_queue "self" ; + (* add an operation to stop the test *) + Scheduler.add_to_queue "stop" Scheduler.OneShot 0.1 (fun () -> + send which "stop" + ) ; + send which "self" + ) ; + start_schedule () ; + let cnt = Mtime_clock.counter () in + Alcotest.(check string) "same event name" "self" (receive which) ; + Alcotest.(check string) "same event name" "stop" (receive which) ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 300 elapsed_ms + +let tests = + [ + ("test_single", `Quick, test_single) + ; ("test_remove_self", `Quick, test_remove_self) + ] let () = Alcotest.run "Scheduler" [("generic", tests)] From 60e12576ac08f6db431b1ddb251cba0b54c8d30e Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Thu, 28 Nov 2024 16:32:38 +0000 Subject: [PATCH 8/9] Add test for handling event if queue was empty in periodic scheduler Previously if the queue was empty and the loop thread was active the scheduler took quite some time to pick up the new event. Check that this is done in a timely fashion to avoid regressions in code. Signed-off-by: Frediano Ziglio --- .../lib/xapi-stdext-threads/scheduler_test.ml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml index 8b0baeb74b1..2828b3a10a3 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml @@ -56,10 +56,28 @@ let test_remove_self () = let elapsed_ms = elapsed_ms cnt in Alcotest.check is_less "small time" 300 elapsed_ms +let test_empty () = + let finished = Event.new_channel () in + Scheduler.add_to_queue "one" Scheduler.OneShot 0.001 (fun () -> + send finished true + ) ; + start_schedule () ; + Alcotest.(check bool) "finished" true (receive finished) ; + (* wait loop to go to wait with no work to do *) + Thread.delay 0.1 ; + Scheduler.add_to_queue "two" Scheduler.OneShot 0.001 (fun () -> + send finished true + ) ; + let cnt = Mtime_clock.counter () in + Alcotest.(check bool) "finished" true (receive finished) ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 100 elapsed_ms + let tests = [ ("test_single", `Quick, test_single) ; ("test_remove_self", `Quick, test_remove_self) + ; ("test_empty", `Quick, test_empty) ] let () = Alcotest.run "Scheduler" [("generic", tests)] From 88dd4d9f5f2969532c43fce16177b21d9d7ac6e8 Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Mon, 9 Dec 2024 14:50:27 +0000 Subject: [PATCH 9/9] Add a test to check the loop is woken up adding a new event Similar to test_empty test however the state of the loop should be different. Signed-off-by: Frediano Ziglio --- .../lib/xapi-stdext-threads/scheduler_test.ml | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml index 2828b3a10a3..0a4a847403f 100644 --- a/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml +++ b/ocaml/libs/xapi-stdext/lib/xapi-stdext-threads/scheduler_test.ml @@ -73,11 +73,31 @@ let test_empty () = let elapsed_ms = elapsed_ms cnt in Alcotest.check is_less "small time" 100 elapsed_ms +let test_wakeup () = + let which = Event.new_channel () in + (* schedule a long event *) + Scheduler.add_to_queue "long" Scheduler.OneShot 2.0 (fun () -> + send which "long" + ) ; + start_schedule () ; + (* wait loop to go to wait with no work to do *) + Thread.delay 0.1 ; + let cnt = Mtime_clock.counter () in + (* schedule a quick event, should wake up the loop *) + Scheduler.add_to_queue "quick" Scheduler.OneShot 0.1 (fun () -> + send which "quick" + ) ; + Alcotest.(check string) "same event name" "quick" (receive which) ; + Scheduler.remove_from_queue "long" ; + let elapsed_ms = elapsed_ms cnt in + Alcotest.check is_less "small time" 150 elapsed_ms + let tests = [ ("test_single", `Quick, test_single) ; ("test_remove_self", `Quick, test_remove_self) ; ("test_empty", `Quick, test_empty) + ; ("test_wakeup", `Quick, test_wakeup) ] let () = Alcotest.run "Scheduler" [("generic", tests)]