From b3523f11ff4fd143c80a4c7210636a56dc818d8a Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Tue, 24 Sep 2024 00:27:32 -0600 Subject: [PATCH] Simplify the backpressure mechanisms a bit --- src/sockets/client.jl | 51 +++++++------------------------------------ 1 file changed, 8 insertions(+), 43 deletions(-) diff --git a/src/sockets/client.jl b/src/sockets/client.jl index aee9aab..a19a677 100644 --- a/src/sockets/client.jl +++ b/src/sockets/client.jl @@ -42,11 +42,6 @@ mutable struct Client slot::Ptr{aws_channel_slot} ch::Channel{Symbol} readbuf::Base.BufferStream - # we use this atomic field to track how many bytes are available in the read buffer - # we need to access this from the c_process_read_message callback in order to adjust - # the read window appropriately; we also access it from any read functions on Client - # to determine if we need to increment the read window - @atomic readbuf_bytesavailable::Int writelock::ReentrantLock writebuf::IOBuffer bootstrap::aws_socket_channel_bootstrap_options @@ -102,7 +97,7 @@ mutable struct Client ssl_alpn_list ) end - x = new(host, port, debug, tls, socket_options, tls_options, buffer_capacity, C_NULL, C_NULL, Channel{Symbol}(0), Base.BufferStream(), 0, ReentrantLock(), PipeBuffer()) + x = new(host, port, debug, tls, socket_options, tls_options, buffer_capacity, C_NULL, C_NULL, Channel{Symbol}(0), Base.BufferStream(), ReentrantLock(), PipeBuffer()) GC.@preserve x begin x.bootstrap = aws_socket_channel_bootstrap_options( client_bootstrap, @@ -136,15 +131,6 @@ function c_process_read_message(handler, slot, messageptr)::Cint unsafe_write(sock.readbuf, data.buffer, data.len) ret = AWS_OP_SUCCESS aws_mem_release(msg.allocator, messageptr) - # update the atomic bytesavailable field - rb_ba = @atomic sock.readbuf_bytesavailable += Int(data.len) - if rb_ba >= sock.buffer_capacity - sock.debug && @info "Buffer is full; not incrementing read window" - # Do not increment the read window - else - # Continue reading by incrementing the read window - aws_channel_slot_increment_read_window(slot, data.len) - end catch e close(sock.ch, sockerr(e)) end @@ -178,20 +164,6 @@ function c_initial_window_size(handler)::Csize_t end end -function maybe_increment_read_window(sock::Client) - # Check if the read window was previously stopped - if sock.read_window_stopped - # Calculate how much space is available in the buffer - available_space = sock.buffer_capacity - bytesavailable(sock.readbuf) - if available_space >= sock.min_read_increment - sock.debug && @info "Buffer has space; incrementing read window by $available_space bytes" - # Increment the read window - aws_channel_slot_increment_read_window(sock.slot, available_space) - sock.read_window_stopped = false - end - end -end - function c_message_overhead(channel_handler)::Csize_t return 0 end @@ -346,43 +318,37 @@ end function Base.read(sock::Client) buf = read(sock.readbuf) - rb_ba = @atomic sock.readbuf_bytesavailable -= length(buf) - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return buf end function Base.read!(sock::Client, buf::Vector{UInt8}) n = read!(sock.readbuf, buf) - rb_ba = @atomic sock.readbuf_bytesavailable -= n - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return n end function Base.read(sock::Client, ::Type{T}) where {T} x = read(sock.readbuf, T) - rb_ba = @atomic sock.readbuf_bytesavailable -= sizeof(T) - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return x end function Base.read(sock::Client, n::Integer) buf = read(sock.readbuf, n) - rb_ba = @atomic sock.readbuf_bytesavailable -= length(buf) - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return buf end function Base.unsafe_read(sock::Client, ptr::Ptr{UInt8}, n::Integer) unsafe_read(sock.readbuf, ptr, n) - rb_ba = @atomic sock.readbuf_bytesavailable -= n - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return end function Base.skip(sock::Client, n) ret = skip(sock.readbuf, n) - rb_ba = @atomic sock.readbuf_bytesavailable -= n - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return ret end @@ -392,8 +358,7 @@ Base.isopen(sock::Client) = sock.slot == C_NULL ? false : aws_socket_is_open(aws function Base.readbytes!(sock::Client, buf::AbstractVector{UInt8}, nb=length(buf)) act = readbytes!(sock.readbuf, buf, nb) - rb_ba = @atomic sock.readbuf_bytesavailable -= act - maybe_increment_read_window(sock, rb_ba) + maybe_increment_read_window(sock, bytesavailable(sock.readbuf)) return act end