Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use buffer pool to avoid large buffer allocation for connections and responses #87

Open
wants to merge 7 commits into
base: branch-2.6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/baselib/.depend
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
ocsigen_loader.cmi :
ocsigen_cache.cmi :
ocsigen_buffer_pool.cmi :
ocsigen_stream.cmi :
ocsigen_config.cmi : ocsigen_lib.cmi
ocsigen_messages.cmi :
Expand All @@ -13,6 +14,8 @@ dynlink_wrapper.nonatdynlink.cmo :
dynlink_wrapper.nonatdynlink.cmx :
ocsigen_cache.cmo : ocsigen_cache.cmi
ocsigen_cache.cmx : ocsigen_cache.cmi
ocsigen_buffer_pool.cmo : ocsigen_buffer_pool.cmi
ocsigen_buffer_pool.cmx : ocsigen_buffer_pool.cmi
ocsigen_commandline.cmo : ocsigen_getcommandline.cmi ocsigen_config.cmi
ocsigen_commandline.cmx : ocsigen_getcommandline.cmi ocsigen_config.cmx
ocsigen_config.cmo : ocsigen_lib.cmi ocsigen_config.cmi
Expand Down
1 change: 1 addition & 0 deletions src/baselib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ all: byte opt
FILES := ocsigen_lib_base.ml \
ocsigen_lib.ml \
ocsigen_cache.ml \
ocsigen_buffer_pool.ml \
ocsigen_config.ml \
ocsigen_commandline.ml \
ocsigen_messages.ml \
Expand Down
122 changes: 122 additions & 0 deletions src/baselib/ocsigen_buffer_pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
(* Copyright (C) 2015 Mauricio Fernandez
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, with linking exception;
* either version 2.1 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*)

module POOL :
sig
type 'a t

val make : (unit -> 'a) -> int -> 'a t
val take : 'a t -> 'a
val give_back : 'a t -> 'a -> unit
end =
struct
type 'a t =
{
create : unit -> 'a;
q : 'a Stack.t;
capacity : int;
mutable q_size : int;
}

let make create capacity =
{ create; capacity;
q_size = 0;
q = Stack.create ();
}

let take t =
if Stack.is_empty t.q then
t.create ()
else
let x = Stack.pop t.q in
t.q_size <- t.q_size - 1;
x

let give_back t x =
if t.q_size < t.capacity then begin
Stack.push x t.q;
t.q_size <- t.q_size + 1;
end
end

let round_to_pow2 n =
let m = ref 1 in
while !m < n do
m := !m * 2;
done;
!m

let is_pow2 n =
let m = n lor (n lsr 1) in
let m = m lor (m lsr 2) in
let m = m lor (m lsr 4) in
let m = m lor (m lsr 8) in
let m = m lor (m lsr 16) in
n land (m lsr 1) = 0

let make_buffer_pool ?(min_size = 64) ?(max_size = 65536) create capacity =
let h = Hashtbl.create 13 in

let get_pool size =
try
Hashtbl.find h size
with Not_found ->
let p = POOL.make (fun () -> create size) (capacity size) in
Hashtbl.add h size p;
p in

let get ~exact size =
if (exact && not (is_pow2 size)) || size < min_size || size > max_size then
(create size, (fun () -> ()))
else
let p = get_pool (round_to_pow2 size) in
let x = POOL.take p in

let released = ref false in

let release () =
if not !released then begin
released := true;
POOL.give_back p x
end
in
(x, release)
in
(`Round_up (get ~exact:false), `Exact (get ~exact:true))

let `Round_up get_bytes, `Exact get_bytes_exact =
make_buffer_pool Bytes.create (fun _ -> 256)

let `Round_up get_lwt_bytes, `Exact get_lwt_bytes_exact =
make_buffer_pool Lwt_bytes.create (fun _ -> 256)

let dummy_get_bytes n = (Bytes.create n, (fun () -> ()))
let dummy_get_lwt_bytes n = (Lwt_bytes.create n, (fun () -> ()))

let get_bytes, get_bytes_exact =
try
ignore (Unix.getenv "OCSIGEN_DISABLE_BUFFER_POOL");
(dummy_get_bytes, dummy_get_bytes)
with Not_found ->
(get_bytes, get_bytes_exact)

let get_lwt_bytes, get_lwt_bytes_exact =
try
ignore (Unix.getenv "OCSIGEN_DISABLE_BUFFER_POOL");
(dummy_get_lwt_bytes, dummy_get_lwt_bytes)
with Not_found ->
(get_lwt_bytes, get_lwt_bytes_exact)
87 changes: 87 additions & 0 deletions src/baselib/ocsigen_buffer_pool.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
(* Copyright (C) 2015 Mauricio Fernandez
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, with linking exception;
* either version 2.1 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*)

(**
Buffer pools.

Not (preemptive) thread safe.

@author Mauricio Fernandez
*)

(** [make_buffer_pool ?min_size ?max_size make capacity]
* returns two functions [`Round_up f] and [`Exact g] that are invoked as
* in [f wanted_size] and return a buffer of the wanted size and a function
* to release the buffer and return it to the pool. These functions do not
* block and will return a buffer immediately.
*
* [f] will round up the size to a power of two. [g] will not, and will thus
* allocate fresh buffers if invoked with sizes that are not powers of two.
*
* [make_buffer_pool] is used as follows:
*
* {[
* let `Round_up get_buf, `Exact get_buf_exact =
* make_buffer_pool ?min_size ?max_size allocate_buffer
* (fun n -> how_many_buffers_to_keep_at_most n)
*
* ...
* let b, release = get_buf 4096 in
* ...
*
* release ()
* ]}
*
* The release function can be invoked any number of times (the second and
* later calls are NOPs). If not invoked, the buffer will not be returned to
* the pool, but this is usually harmless (it only means there will be more
* allocation later in time).
*
* The returned buffers must not be used after they have been released, as
* they might have been reused at that point.
*
* @param min_size buffers of size under [min_size] are not pooled (and
* always freshly allocated with [make size] (default: 64)
*
* @param max_size buffers of size over [max_size] are not pooled (and
* always freshly allocated with [make size] (default: 65536)
*
* @param capacity is used to compute how many buffers to retain at most (as a
* function of the corresponding size). If [capacity n] returns [m], that
* means that the the pool will hold at most [m] buffers. Note that buffers
* are allocated lazily (only when the pool is empty), and the actual number
* of buffers (of a given size) allocated depends on how many are used
* simultaneously (before being released).
* *)

val make_buffer_pool :
?min_size:int ->
?max_size:int ->
(int -> 'a) ->
(int -> int) ->
[`Round_up of int -> 'a * (unit -> unit) ] *
[`Exact of int -> 'a * (unit -> unit) ]

(** {2 Buffer allocation against internal Ocsigen pools.}
*
* Pooling can be disabled altogether (for benchmark or other purposes) by
* setting the [OCSIGEN_DISABLE_BUFFER_POOL] environment variable.
* *)
val get_bytes : int -> Bytes.t * (unit -> unit)
val get_bytes_exact : int -> Bytes.t * (unit -> unit)
val get_lwt_bytes : int -> Lwt_bytes.t * (unit -> unit)
val get_lwt_bytes_exact : int -> Lwt_bytes.t * (unit -> unit)
10 changes: 6 additions & 4 deletions src/extensions/.depend
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ cors.cmx : ../server/ocsigen_request_info.cmx ../baselib/ocsigen_lib.cmx \
../http/ocsigen_http_frame.cmx ../server/ocsigen_extensions.cmx \
../http/http_headers.cmx ../http/framepp.cmx
deflatemod.cmo : ../baselib/ocsigen_stream.cmi \
../server/ocsigen_request_info.cmi ../http/ocsigen_http_frame.cmi \
../http/ocsigen_headers.cmi ../server/ocsigen_extensions.cmi \
../server/ocsigen_request_info.cmi ../baselib/ocsigen_lib.cmi \
../http/ocsigen_http_frame.cmi ../http/ocsigen_headers.cmi \
../server/ocsigen_extensions.cmi ../baselib/ocsigen_buffer_pool.cmi \
../http/http_headers.cmi
deflatemod.cmx : ../baselib/ocsigen_stream.cmx \
../server/ocsigen_request_info.cmx ../http/ocsigen_http_frame.cmx \
../http/ocsigen_headers.cmx ../server/ocsigen_extensions.cmx \
../server/ocsigen_request_info.cmx ../baselib/ocsigen_lib.cmx \
../http/ocsigen_http_frame.cmx ../http/ocsigen_headers.cmx \
../server/ocsigen_extensions.cmx ../baselib/ocsigen_buffer_pool.cmx \
../http/http_headers.cmx
extendconfiguration.cmo : ../server/ocsigen_parseconfig.cmi \
../server/ocsigen_extensions.cmi ../http/ocsigen_cookies.cmi \
Expand Down
6 changes: 4 additions & 2 deletions src/extensions/deflatemod.ml
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,21 @@ and next_cont oz stream =
(* deflate param : true = deflate ; false = gzip (no header in this case) *)
let compress deflate stream =
let zstream = Zlib.deflate_init !compress_level deflate in
let buf, release = Ocsigen_buffer_pool.get_bytes !buffer_size in
let finalize status =
Ocsigen_stream.finalize stream status >>= fun e ->
(try
Zlib.deflate_end zstream
with
(* ignore errors, deflate_end cleans everything anyway *)
Zlib.Error _ -> ());
release ();
return (Lwt_log.ign_info ~section "Zlib stream closed") in
let oz =
{ stream = zstream ;
buf = Bytes.create !buffer_size;
buf;
pos = 0;
avail = !buffer_size
avail = Bytes.length buf;
} in
let new_stream () = next_cont oz (Ocsigen_stream.get stream) in
Lwt_log.ign_info ~section "Zlib stream initialized" ;
Expand Down
14 changes: 8 additions & 6 deletions src/http/.depend
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ ocsigen_headers.cmx : ocsigen_senders.cmx ../baselib/ocsigen_lib.cmx \
ocsigen_headers.cmi
ocsigen_http_com.cmo : ../baselib/ocsigen_stream.cmi \
../baselib/ocsigen_lib.cmi ocsigen_http_frame.cmi ocsigen_cookies.cmi \
../baselib/ocsigen_config.cmi http_lexer.cmo http_headers.cmi framepp.cmi \
ocsigen_http_com.cmi
../baselib/ocsigen_config.cmi ../baselib/ocsigen_buffer_pool.cmi \
http_lexer.cmo http_headers.cmi framepp.cmi ocsigen_http_com.cmi
ocsigen_http_com.cmx : ../baselib/ocsigen_stream.cmx \
../baselib/ocsigen_lib.cmx ocsigen_http_frame.cmx ocsigen_cookies.cmx \
../baselib/ocsigen_config.cmx http_lexer.cmx http_headers.cmx framepp.cmx \
ocsigen_http_com.cmi
../baselib/ocsigen_config.cmx ../baselib/ocsigen_buffer_pool.cmx \
http_lexer.cmx http_headers.cmx framepp.cmx ocsigen_http_com.cmi
ocsigen_http_frame.cmo : ../baselib/ocsigen_stream.cmi \
../baselib/ocsigen_lib.cmi ocsigen_cookies.cmi http_headers.cmi \
ocsigen_http_frame.cmi
Expand All @@ -51,11 +51,13 @@ ocsigen_http_frame.cmx : ../baselib/ocsigen_stream.cmx \
ocsigen_senders.cmo : ../baselib/ocsigen_stream.cmi \
../baselib/ocsigen_lib.cmi ocsigen_http_frame.cmi ocsigen_http_com.cmi \
ocsigen_cookies.cmi ../baselib/ocsigen_config.cmi \
ocsigen_charset_mime.cmi http_headers.cmi ocsigen_senders.cmi
ocsigen_charset_mime.cmi ../baselib/ocsigen_buffer_pool.cmi \
http_headers.cmi ocsigen_senders.cmi
ocsigen_senders.cmx : ../baselib/ocsigen_stream.cmx \
../baselib/ocsigen_lib.cmx ocsigen_http_frame.cmx ocsigen_http_com.cmx \
ocsigen_cookies.cmx ../baselib/ocsigen_config.cmx \
ocsigen_charset_mime.cmx http_headers.cmx ocsigen_senders.cmi
ocsigen_charset_mime.cmx ../baselib/ocsigen_buffer_pool.cmx \
http_headers.cmx ocsigen_senders.cmi
test_parser.cmo : ocsigen_http_frame.cmi http_lexer.cmo
test_parser.cmx : ocsigen_http_frame.cmx http_lexer.cmx
test_pp.cmo : ocsigen_http_frame.cmi framepp.cmi
Expand Down
35 changes: 28 additions & 7 deletions src/http/ocsigen_http_com.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,19 @@ let create_receiver timeout mode fd =
let timeout =
Lwt_timeout.create
timeout
(fun () -> Lwt_ssl.abort fd Timeout)
in
(fun () -> Lwt_ssl.abort fd Timeout) in
let buffer, release = Ocsigen_buffer_pool.get_lwt_bytes buffer_size in
let buf, release2 = Ocsigen_buffer_pool.get_bytes buffer_size in
{ id = new_id ();
fd = fd;
chan =
Lwt_io.make
~mode:Lwt_io.output
~buffer:(Lwt_bytes.create buffer_size)
~close:(fun () -> Lwt_timeout.stop timeout;
release ();
release2 ();
Lwt.return_unit)
~buffer
(fun buf pos len ->
Lwt_timeout.start timeout;
Lwt.try_bind
Expand All @@ -139,7 +144,7 @@ let create_receiver timeout mode fd =
Lwt.fail (convert_io_error e)));
timeout = timeout;
r_mode = mode;
buf=Bytes.create buffer_size;
buf;
read_pos = 0;
write_pos = 0;
closed = Lwt.wait ();
Expand All @@ -156,7 +161,21 @@ let unlock_receiver receiver = Lwt_mutex.unlock receiver.read_mutex

let abort conn =
Lwt.wakeup (snd conn.closed) ();
Lwt_ssl.abort conn.fd Aborted
Lwt_ssl.abort conn.fd Aborted;
(* we discard the receive buffer so as to avoid errors in [receive] and such *)
conn.read_pos <- 0;
conn.write_pos <- 0;
(* so that we don't overwrite the buffer we no longer own by accident *)
conn.buf <- Bytes.create 8;
ignore (try Lwt_io.close conn.chan with _ -> Lwt.return_unit)

let discard conn =
(* we discard the receive buffer so as to avoid errors in [receive] and such *)
conn.read_pos <- 0;
conn.write_pos <- 0;
(* so that we don't overwrite the buffer we no longer own by accident *)
conn.buf <- Bytes.create 8;
ignore (try Lwt_io.close conn.chan with _ -> Lwt.return_unit)

let closed conn = fst conn.closed

Expand Down Expand Up @@ -651,7 +670,7 @@ let default_sender = create_sender ~server_name:Ocsigen_config.server_name ()
let write_stream_chunked out_ch stream =
let buf_size = 4096 in
let size_for_not_buffering = 900 in
let buffer = Bytes.create buf_size in
let buffer, release = Ocsigen_buffer_pool.get_bytes buf_size in
let rec aux stream len =
Ocsigen_stream.next stream >>= fun e ->
match e with
Expand All @@ -664,7 +683,9 @@ let write_stream_chunked out_ch stream =
Lwt_io.write out_ch "\r\n"
end else
Lwt.return ()) >>= fun () ->
Lwt_io.write out_ch "0\r\n\r\n"
Lwt_io.write out_ch "0\r\n\r\n" >>= fun () ->
release ();
Lwt.return_unit
| Ocsigen_stream.Cont (s, next) ->
let l = String.length s in
if l = 0 then
Expand Down
1 change: 1 addition & 0 deletions src/http/ocsigen_http_com.mli
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ val send :
unit Lwt.t

val abort : connection -> unit
val discard : connection -> unit


(** Use this function to make an action just before sending the result
Expand Down
Loading