Skip to content

Commit

Permalink
CP-32622: replace select in proxy with polly
Browse files Browse the repository at this point in the history
Signed-off-by: Edwin Török <[email protected]>
  • Loading branch information
edwintorok committed Nov 28, 2023
1 parent 1ac6224 commit 95197c9
Showing 1 changed file with 40 additions and 18 deletions.
58 changes: 40 additions & 18 deletions lib/xapi-stdext-unix/unixext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -409,26 +409,52 @@ let string_of_signal x =
then List.assoc x table
else (Printf.sprintf "(ocaml signal %d with an unknown name)" x)

let with_polly f =
let polly = Polly.create () in
let finally () = Polly.close polly in
Xapi_stdext_pervasives.Pervasiveext.finally
(fun () -> f polly) finally


let proxy (a: Unix.file_descr) (b: Unix.file_descr) =
let size = 64 * 1024 in
(* [a'] is read from [a] and will be written to [b] *)
(* [b'] is read from [b] and will be written to [a] *)
let a' = CBuf.empty size and b' = CBuf.empty size in
Unix.set_nonblock a;
Unix.set_nonblock b;
with_polly @@ fun polly ->

try
while true do
let r = (if CBuf.should_read a' then [ a ] else []) @ (if CBuf.should_read b' then [ b ] else []) in
let w = (if CBuf.should_write a' then [ b ] else []) @ (if CBuf.should_write b' then [ a ] else []) in

let any = ref false in
if CBuf.should_read a' then begin
Polly.add polly a Polly.Events.(inp lor oneshot);
any := true
end;
if CBuf.should_read b' then begin
Polly.add polly b Polly.Events.(inp lor oneshot);
any := true
end;
if CBuf.should_write a' then begin
Polly.add polly b Polly.Events.(out lor oneshot);
any := true
end;
if CBuf.should_write b' then begin
Polly.add polly a Polly.Events.(out lor oneshot);
any := true
end;

(* If we can't make any progress (because fds have been closed), then stop *)
if r = [] && w = [] then raise End_of_file;

let r, w, _ = Unix.select r w [] (-1.0) in
(* Do the writing before the reading *)
List.iter (fun fd -> if a = fd then CBuf.write b' a else CBuf.write a' b) w;
List.iter (fun fd -> if a = fd then CBuf.read a' a else CBuf.read b' b) r;
if not !any then raise End_of_file;

Polly.wait_fold polly 4 (-1) () (fun _polly fd events () ->
(* Do the writing before the reading *)
if Polly.Events.(test out events) then
if a = fd then CBuf.write b' a else CBuf.write a' b;
if Polly.Events.(test inp events) then
if a = fd then CBuf.read a' a else CBuf.read b' b;
);
(* If there's nothing else to read or write then signal the other end *)
List.iter
(fun (buf, fd) ->
Expand Down Expand Up @@ -509,11 +535,8 @@ let to_milliseconds ms = ms *. 1000. |> int_of_float
[setsockopt_float] to set the timeout
[clear_nonblock] to ensure the socket is non-blocking
*)
let with_polly kind fd f =
let polly = Polly.create () in
let finally () = Polly.close polly in
Xapi_stdext_pervasives.Pervasiveext.finally
(fun () ->
let with_polly_wait kind fd f =
with_polly @@ fun polly ->
Polly.add polly fd kind;
let wait remaining_time =
if remaining_time < 0. then raise Timeout;
Expand All @@ -524,13 +547,12 @@ let with_polly kind fd f =
if ready = 0 then raise Timeout
in
f wait fd
) finally

(* Write as many bytes to a file descriptor as possible from data before a given clock time. *)
(* Raises Timeout exception if the number of bytes written is less than the specified length. *)
(* Writes into the file descriptor at the current cursor position. *)
let time_limited_write_internal (write : Unix.file_descr -> 'a -> int -> int -> int) filedesc length data target_response_time =
with_polly Polly.Events.out filedesc @@ fun wait filedesc ->
with_polly_wait Polly.Events.out filedesc @@ fun wait filedesc ->
let total_bytes_to_write = length in
let bytes_written = ref 0 in
let now = ref (Unix.gettimeofday()) in
Expand All @@ -555,7 +577,7 @@ let time_limited_write_substring filedesc length data target_response_time =
(* Raises Timeout exception if the number of bytes read is less than the desired number. *)
(* Reads from the file descriptor at the current cursor position. *)
let time_limited_read filedesc length target_response_time =
with_polly Polly.Events.inp filedesc @@ fun wait filedesc ->
with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc ->
let total_bytes_to_read = length in
let bytes_read = ref 0 in
let buf = Bytes.make total_bytes_to_read '\000' in
Expand All @@ -573,7 +595,7 @@ let time_limited_read filedesc length target_response_time =

let time_limited_single_read filedesc length ~max_wait =
let buf = Bytes.make length '\000' in
with_polly Polly.Events.inp filedesc @@ fun wait filedesc ->
with_polly_wait Polly.Events.inp filedesc @@ fun wait filedesc ->
wait max_wait;
let bytes =
try Unix.read filedesc buf 0 length
Expand Down

0 comments on commit 95197c9

Please sign in to comment.