Skip to content


CP-47001: [xapi-fdcaps]: wrap more Unix operations
Browse files Browse the repository at this point in the history
  • Loading branch information
edwintorok committed Dec 21, 2023
1 parent d2fac79 commit ab54dc4
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 0 deletions.
79 changes: 79 additions & 0 deletions lib/xapi-fdcaps/
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ let pp ppf =

let close t = Safefd.idempotent_close_exn t.fd

let fsync t = Unix.fsync (Safefd.unsafe_to_file_descr_exn t.fd)

let with_fd t f =
let finally () = close t in
Fun.protect ~finally (fun () -> f t)
Expand Down Expand Up @@ -107,6 +109,14 @@ let creat path flags perm =
(Unix.O_RDWR :: Unix.O_CREAT :: Unix.O_EXCL :: Unix.O_CLOEXEC :: flags)

let kind_of_fd fd = of_unix_kind Unix.LargeFile.((fstat fd).st_kind)

let stdin = make_ro_exn (kind_of_fd Unix.stdin) Unix.stdin

let stdout = make_wo_exn (kind_of_fd Unix.stdout) Unix.stdout

let stderr = make_wo_exn (kind_of_fd Unix.stderr) Unix.stderr

let dev_null_out () = openfile_wo `chr "/dev/null" []

let dev_null_in () = openfile_ro `chr "/dev/null" []
Expand All @@ -122,6 +132,9 @@ let shutdown_send t =
let shutdown_all t =
Unix.shutdown (Safefd.unsafe_to_file_descr_exn t.fd) Unix.SHUTDOWN_ALL

let setsockopt_float t opt value =
Unix.setsockopt_float (Safefd.unsafe_to_file_descr_exn t.fd) opt value

let ftruncate t size =
match t.custom_ftruncate with
| None ->
Expand All @@ -138,6 +151,18 @@ let read t buf off len =
let single_write_substring t buf off len =
Unix.single_write_substring (Safefd.unsafe_to_file_descr_exn t.fd) buf off len

let fstat t = Unix.LargeFile.fstat (Safefd.unsafe_to_file_descr_exn t.fd)

let dup t =
t with
|> Safefd.unsafe_to_file_descr_exn
|> Unix.dup
|> Safefd.of_file_descr

let set_nonblock t = Unix.set_nonblock (Safefd.unsafe_to_file_descr_exn t.fd)

let clear_nonblock t = Unix.clear_nonblock (Safefd.unsafe_to_file_descr_exn t.fd)
Expand Down Expand Up @@ -200,6 +225,60 @@ let with_temp_blk ?(sector_size = 512) name f =

let setup () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore

type ('a, 'b) operation = 'a t -> 'b -> int -> int -> int

let repeat_read op fd buf off len =
let rec loop consumed =
let off = off + consumed and len = len - consumed in
if len = 0 then
consumed (* we filled the buffer *)
match op fd buf off len with
| 0 (* EOF *)
| (exception
) (* connection error or non-blocking socket *) ->
| n ->
assert (n >= 0) ;
assert (n <= len) ;
loop (consumed + n)
loop 0

let repeat_write op fd buf off len =
let rec loop written =
let off = off + written and len = len - written in
if len = 0 then
written (* we've written the entire buffer *)
match op fd buf off len with
| 0
(* should never happen, but we cannot retry now or we'd enter an infinite loop *)
| (exception
, _
, _
) (* connection error or nonblocking socket *) ->
| n ->
assert (n >= 0) ;
assert (n <= len) ;
loop (written + n)
loop 0

module For_test = struct
let unsafe_fd_exn t = Safefd.unsafe_to_file_descr_exn t.fd
57 changes: 57 additions & 0 deletions lib/xapi-fdcaps/operations.mli
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,26 @@ end

(** {1 {!mod:Unix} wrappers} *)

val stdin : ([> rdonly], kind) make
(** [stdin] is a readonly file descriptor of unknown kind *)

val stdout : ([> wronly], kind) make
(** [stdout] is a writeonly file descriptor of unknown kind *)

val stderr : ([> wronly], kind) make
(** [stderr] is a writeonly file descriptor of unknown kind *)

val close : _ t -> unit
(** [close t] closes t. Doesn't raise an exception if it is already closed.
Other errors from the underlying {!val:Unix.close} are propagated.

val fsync : _ t -> unit
(** [fsync t] flushes [t] buffer to disk.
Note that the file doesn't necessarily have to be writable, e.g. you can fsync a readonly open directory.

val pipe : unit -> ([> rdonly], [> fifo]) make * ([> wronly], [> fifo]) make
(** [pipe ()] creates an unnamed pipe.
@see {!val:Unix.pipe}
Expand Down Expand Up @@ -173,6 +188,12 @@ val single_write_substring :
@see {!Unix.single_write_substring}

val fstat : _ t -> Unix.LargeFile.stats
(** [fstat t] is {!val:Unix.LargeFile.fstat} *)

val dup : 'a t -> 'a t
(** [dup t] is {!val:Unix.dup} on [t]. *)

val set_nonblock : (_, [< espipe]) make -> unit
(** [set_nonblock t].
Expand All @@ -191,6 +212,10 @@ val clear_nonblock : _ t -> unit
@see {!Unix.clear_nonblock}

val setsockopt_float :
(_, [< sock]) make -> Unix.socket_float_option -> float -> unit
(** [set_sockopt_float t opt val] sets the socket option [opt] to [val] for [t]. *)

(** {1 Temporary files} *)

val with_tempfile :
Expand All @@ -209,6 +234,38 @@ val with_temp_blk :
@param sector_size between 512 and 4096

(** {1 Operation wrappers}
The low-level {!val:read} and {!val:single_write_substring} can raise different exceptions
to mean end-of-file/disconnected depending on the file's kind.
If you want to consider disconnectins as end-of-file then use these wrappers.

(** a buffered operation on a file descriptors.
@see {!val:read} and {!val:single_write_substring}
type ('a, 'b) operation = 'a t -> 'b -> int -> int -> int

val repeat_read : ('a, bytes) operation -> ('a, bytes) operation
(** [repeat_read op buf off len] repeats [op] on the supplied buffer until EOF or a connection error is encountered.
The following connection errors are treated as EOF and are not reraised:
{!val:Unix.ECONNRESET}, {!val:Unix.ENOTCONN}.
{!val:Unix.EAGAIN} and {!val:Unix.EWOULDBLOCK} also cause the iteration to stop.
The returned value may be less than [len] if EOF was encountered.

val repeat_write : ('a, string) operation -> ('a, string) operation
(** [repeat_write op buf off len] repeats [op] on the supplied buffer until a connection error is encountered or the entire buffer is written.
The following are treated as connection errors and not reraised:
{!val:Unix.ECONNRESET}, {!val:Unix.EPIPE}, {!val:Unix.ENETDOWN}, {!val:Unix.ENETUNREACH}
{!val:Unix.EAGAIN} and {!val:Unix.EWOULDBLOCK} also cause the iteration to stop.
The returned value may be less than [len] if we were not able to complete the write due to a connection error.


module For_test : sig
Expand Down
18 changes: 18 additions & 0 deletions lib/xapi-fdcaps/
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ let to_unix_kind =
| #sock ->

let of_unix_kind =
let open Unix in
| S_REG ->
| S_BLK ->
| S_CHR ->
| S_DIR ->
| S_LNK ->
| S_FIFO ->
| S_SOCK ->

let pp_kind fmt = Fmt.using to_unix_kind Safefd.pp_kind fmt

let pp fmt =
Expand Down
3 changes: 3 additions & 0 deletions lib/xapi-fdcaps/properties.mli
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ val as_writable_opt : ([< rw], 'a) t -> ([> writable], 'a) t option
val to_unix_kind : kind -> Unix.file_kind
(** [to_unix_kind kind] converts the polymorphic variant [kind] to {!type:Unix.file_kind} *)

val of_unix_kind : Unix.file_kind -> kind
(** [of_unix_kind kind] converts the {!type:Unix.file_kind} to {!type:kind}. *)

(** pipe, FIFO or socket that may raise {!val:Unix.ESPIPE} *)
type espipe = [fifo | sock]

Expand Down
41 changes: 41 additions & 0 deletions lib/xapi-fdcaps/test/
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,45 @@ let test_creat () =
let@ fd2 = with_fd @@ openfile_rw `reg name [] in
pp Fmt.stdout fd2 ; read_fd fd2 ; write_fd fd2

let test_repeat_read () =
let buf = String.init 255 Char.chr in
let read _ dst off len =
let available = String.length buf - off in
let len = Int.min len 11 in
let len = Int.min len available in
Bytes.blit_string buf off dst off len ;
let dst = Bytes.make 300 '_' in
let@ placeholder = with_fd @@ dev_zero () in
(* not actually used, just to make the types work, we simulate the read using string ops *)
let actual = repeat_read read placeholder dst 0 (Bytes.length dst) in
Alcotest.(check' int) ~msg:"amount read" ~actual ~expected:(String.length buf) ;
Alcotest.(check' string)
~actual:(Bytes.sub_string dst 0 actual)

let test_repeat_write () =
let buf = Bytes.make 255 '_' in
let write _ src off len =
let available = Bytes.length buf - off in
let len = Int.min len 11 in
let len = Int.min len available in
Bytes.blit_string src off buf off len ;
let src = String.init 255 Char.chr in
let@ placeholder = with_fd @@ dev_zero () in
(* not actually used, just to make the types work, we simulate the read using string ops *)
let actual = repeat_write write placeholder src 0 (String.length src) in
Alcotest.(check' int)
~msg:"amount written" ~actual ~expected:(Bytes.length buf) ;
Alcotest.(check' string)
~actual:(Bytes.sub_string buf 0 actual)

let tests =
Expand All @@ -238,6 +277,8 @@ let tests =
; test_case "socket shutdown write" `Quick test_sock_shutdown_w
; test_case "socket shutdown both" `Quick test_sock_shutdown_all
; test_case "create" `Quick test_creat
; test_case "repeat_read" `Quick test_repeat_read
; test_case "repeat_write" `Quick test_repeat_write

(* this must be the last test *)
Expand Down

0 comments on commit ab54dc4

Please sign in to comment.