Skip to content

Commit

Permalink
Merge pull request #108 from samoht/max-fids
Browse files Browse the repository at this point in the history
Allow clients to set the number of fids they will use
  • Loading branch information
samoht authored Dec 12, 2016
2 parents 6ee62ae + 315b2f5 commit 06acf16
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 30 deletions.
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ environment:

install:
- appveyor DownloadFile https://raw.githubusercontent.com/ocaml/ocaml-ci-scripts/master/appveyor-opam.sh
- "%CYG_ROOT%\\setup-x86.exe -qnNdO -R %CYG_ROOT% -s http://cygwin.mirror.constant.com -l C:/cygwin/var/cache/setup -P rsync -P patch -P diffutils -P make -P unzip -P git -P m4 -P perl -P mingw64-x86_64-gcc-core"
- "%CYG_ROOT%\\setup-x86.exe -qnNdO -R %CYG_ROOT% -s http://cygwin.mirror.constant.com -l C:/cygwin/var/cache/setup -P rsync -P patch -P make -P unzip -P git -P m4 -P perl -P mingw64-x86_64-gcc-core"

build_script:
- "%CYG_BASH% '${APPVEYOR_BUILD_FOLDER}/appveyor-opam.sh'"
74 changes: 58 additions & 16 deletions lib/protocol_9p_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ module Make(Log: Protocol_9p_s.LOG)(FLOW: V1_LWT.FLOW) = struct
mutable wakeners: Response.payload Error.t Lwt.u Types.Tag.Map.t;
mutable free_tags: Types.Tag.Set.t;
free_tags_c: unit Lwt_condition.t;
mutable free_fids: Types.Fid.Set.t;
max_fids: int32;
mutable fids: Types.Fid.Set.t;
free_fids_c: unit Lwt_condition.t;
}

Expand Down Expand Up @@ -304,21 +305,62 @@ module Make(Log: Protocol_9p_s.LOG)(FLOW: V1_LWT.FLOW) = struct
Lwt.return (Ok x)
| { Response.payload = p } -> return_error p

let fid = function
| 0l -> (* 0 is the pre-allocated FS root *) None
| n ->
match Types.Fid.of_int32 n with
| Ok m -> Some m
| Error _ -> (* NOFID *) None

let rec random_fid ?(n=10) t =
if n = 0 then (
Log.info (fun l -> l "Cannot allocate a new random fid after 10 tries");
None
) else match fid @@ Random.int32 Int32.max_int with
| None -> random_fid ~n:(n-1) t
| Some f as r -> if Types.Fid.Set.mem f t.fids then random_fid t else r

let min_fid t =
match fid @@ Int32.pred Types.Fid.(to_int32 @@ Set.min_elt t.fids) with
| None -> random_fid t
| Some _ as r -> r

(* if max_fids is not reached, the allocation strategy is:
- pick max(allocated_fid) + 1
- if this is NOFID:
- pick min(allocated_fid) - 1
- if this is 0 or NOFID:
- pick a random fid until finding a non-allocated one
This means that keeping [0=(NOFID+1)] and [NOFID-1] always open
might be costly.*)
let next_fid t =
match Int32.of_int (Types.Fid.Set.cardinal t.fids) with
| 0l -> fid 1l (* 0l is pre-allocated for the FS root *)
| n ->
if n >= t.max_fids then None else
Types.Fid.Set.max_elt t.fids
|> Types.Fid.to_int32
|> Int32.succ
|> fid
|> function
| Some _ as r -> r
| None -> min_fid t

let rec allocate_fid t =
let open Lwt in
if t.free_fids = Types.Fid.Set.empty && (dispatcher_is_running t)
then (
Log.info (fun f -> f "FID pool exhausted (will wait for a free one; deadlock possible)");
Lwt_condition.wait t.free_fids_c >>= fun () -> allocate_fid t
) else
if not(dispatcher_is_running t)
then return (Error (`Msg "connection disconnected"))
else
let fid = Types.Fid.Set.min_elt t.free_fids in
t.free_fids <- Types.Fid.Set.remove fid t.free_fids;
return (Ok fid)
let open Lwt.Infix in
match next_fid t with
| None ->
if dispatcher_is_running t then (
Log.info (fun f -> f "FID pool exhausted (will wait for a free one; \
deadlock possible)");
Lwt_condition.wait t.free_fids_c >>= fun () -> allocate_fid t
) else
Lwt.return (Error (`Msg "connection disconnected"))
| Some fid ->
t.fids <- Types.Fid.Set.add fid t.fids;
Lwt.return (Ok fid)
let mark_fid_as_free t fid =
t.free_fids <- Types.Fid.Set.add fid t.free_fids;
t.fids <- Types.Fid.Set.remove fid t.fids;
Lwt_condition.signal t.free_fids_c ()
let deallocate_fid t fid =
let open Lwt in
Expand Down Expand Up @@ -507,7 +549,7 @@ module Make(Log: Protocol_9p_s.LOG)(FLOW: V1_LWT.FLOW) = struct
)

(* 8215 = 8192 + 23 (maximum overhead in a write packet) *)
let connect flow ?(msize = 8215l) ?(username = "nobody") ?(aname = "/") () =
let connect flow ?(msize = 8215l) ?(username = "nobody") ?(max_fids=100l) ?(aname = "/") () =
let reader = Reader.create flow in
let writer = flow in
LowLevel.version reader writer msize Types.Version.unix
Expand Down Expand Up @@ -538,7 +580,7 @@ module Make(Log: Protocol_9p_s.LOG)(FLOW: V1_LWT.FLOW) = struct
wakeners = Types.Tag.Map.empty;
free_tags = Types.Tag.recommended;
free_tags_c = Lwt_condition.create ();
free_fids = Types.Fid.recommended;
max_fids; fids = Types.Fid.Set.empty;
free_fids_c = Lwt_condition.create ();
} in
LowLevel.attach reader writer root Types.Fid.nofid username aname None
Expand Down
8 changes: 5 additions & 3 deletions lib/protocol_9p_client.mli
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,13 @@ module Make(Log: Protocol_9p_s.LOG)(FLOW: V1_LWT.FLOW) : sig
include S

val connect:
FLOW.flow -> ?msize:int32 -> ?username:string -> ?aname:string -> unit ->
FLOW.flow -> ?msize:int32 -> ?username:string -> ?max_fids:int32 ->
?aname:string -> unit ->
t Protocol_9p_error.t Lwt.t
(** Establish a fresh connection to a 9P server. [msize] gives the maximum
message size we support: the server may choose a lower value. [username]
is the username to present to the remote server. [aname] is the name of
the exported filesystem. *)
is the username to present to the remote server. [max_fids] is the default
number of maximum openened fids: by default it is set to [100].
[aname] is the name of the exported filesystem. *)

end
8 changes: 2 additions & 6 deletions lib_test/tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,11 @@ let print_parse_response r () =
)

let test_requests = List.map (fun r ->
Fmt.strf "print then parse %a" Request.pp r,
`Quick,
print_parse_request r
"print then parse random request", `Quick, print_parse_request r
) requests

let test_responses = List.map (fun r ->
Fmt.strf "print then parse %a" Response.pp r,
`Quick,
print_parse_response r
Fmt.strf "print then parser random response", `Quick, print_parse_response r
) responses

let tests = [
Expand Down
4 changes: 2 additions & 2 deletions unix/client9p_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module Make(Log: S.LOG) = struct
let s = Lwt_unix.socket Lwt_unix.PF_UNIX Lwt_unix.SOCK_STREAM 0 in
connect_or_close s (Lwt_unix.ADDR_UNIX path)

let connect proto address ?msize ?username ?aname () =
let connect proto address ?msize ?username ?aname ?max_fids () =
( match proto, address with
| "tcp", _ ->
begin match String.cuts ~sep:":" address with
Expand All @@ -84,7 +84,7 @@ module Make(Log: S.LOG) = struct
Lwt.return (Error.error_msg "Unknown protocol %s" proto)
) >>*= fun s ->
let flow = Flow_lwt_unix.connect s in
Client.connect flow ?msize ?username ?aname ()
Client.connect flow ?msize ?username ?aname ?max_fids ()
>>= function
| Result.Error _ as err -> Lwt.return err
| Result.Ok client ->
Expand Down
6 changes: 4 additions & 2 deletions unix/client9p_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ module Make(Log: Protocol_9p.S.LOG) : sig

val connect:
string -> string -> ?msize:int32 -> ?username:string -> ?aname:string ->
unit -> t Protocol_9p.Error.t Lwt.t
?max_fids:int32 -> unit -> t Protocol_9p.Error.t Lwt.t
(** [connect proto address ?msize ?username ?aname ()] creates a 9P connection
over [proto] to [address] with an optional maximum message size [?msize]
and optional [?username] and authentication [?aname]. Allowed combinations
and optional [?username] and authentication [?aname]. [max_fids] is the
maximal numbers of files concurrently opened by the client.
Allowed combinations
of [proto] and [address] are:
- unix /path/to/file
- tcp ip:port
Expand Down

0 comments on commit 06acf16

Please sign in to comment.