Skip to content

Commit

Permalink
Simplify the backpressure mechanisms a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Sep 24, 2024
1 parent 80bd12f commit b3523f1
Showing 1 changed file with 8 additions and 43 deletions.
51 changes: 8 additions & 43 deletions src/sockets/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ mutable struct Client
slot::Ptr{aws_channel_slot}
ch::Channel{Symbol}
readbuf::Base.BufferStream
# we use this atomic field to track how many bytes are available in the read buffer
# we need to access this from the c_process_read_message callback in order to adjust
# the read window appropriately; we also access it from any read functions on Client
# to determine if we need to increment the read window
@atomic readbuf_bytesavailable::Int
writelock::ReentrantLock
writebuf::IOBuffer
bootstrap::aws_socket_channel_bootstrap_options
Expand Down Expand Up @@ -102,7 +97,7 @@ mutable struct Client
ssl_alpn_list
)
end
x = new(host, port, debug, tls, socket_options, tls_options, buffer_capacity, C_NULL, C_NULL, Channel{Symbol}(0), Base.BufferStream(), 0, ReentrantLock(), PipeBuffer())
x = new(host, port, debug, tls, socket_options, tls_options, buffer_capacity, C_NULL, C_NULL, Channel{Symbol}(0), Base.BufferStream(), ReentrantLock(), PipeBuffer())
GC.@preserve x begin
x.bootstrap = aws_socket_channel_bootstrap_options(
client_bootstrap,
Expand Down Expand Up @@ -136,15 +131,6 @@ function c_process_read_message(handler, slot, messageptr)::Cint
unsafe_write(sock.readbuf, data.buffer, data.len)
ret = AWS_OP_SUCCESS
aws_mem_release(msg.allocator, messageptr)
# update the atomic bytesavailable field
rb_ba = @atomic sock.readbuf_bytesavailable += Int(data.len)
if rb_ba >= sock.buffer_capacity
sock.debug && @info "Buffer is full; not incrementing read window"
# Do not increment the read window
else
# Continue reading by incrementing the read window
aws_channel_slot_increment_read_window(slot, data.len)
end
catch e
close(sock.ch, sockerr(e))
end
Expand Down Expand Up @@ -178,20 +164,6 @@ function c_initial_window_size(handler)::Csize_t
end
end

function maybe_increment_read_window(sock::Client)
# Check if the read window was previously stopped
if sock.read_window_stopped
# Calculate how much space is available in the buffer
available_space = sock.buffer_capacity - bytesavailable(sock.readbuf)
if available_space >= sock.min_read_increment
sock.debug && @info "Buffer has space; incrementing read window by $available_space bytes"
# Increment the read window
aws_channel_slot_increment_read_window(sock.slot, available_space)
sock.read_window_stopped = false
end
end
end

function c_message_overhead(channel_handler)::Csize_t
return 0
end
Expand Down Expand Up @@ -346,43 +318,37 @@ end

function Base.read(sock::Client)
buf = read(sock.readbuf)
rb_ba = @atomic sock.readbuf_bytesavailable -= length(buf)
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return buf
end

function Base.read!(sock::Client, buf::Vector{UInt8})
n = read!(sock.readbuf, buf)
rb_ba = @atomic sock.readbuf_bytesavailable -= n
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return n
end

function Base.read(sock::Client, ::Type{T}) where {T}
x = read(sock.readbuf, T)
rb_ba = @atomic sock.readbuf_bytesavailable -= sizeof(T)
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return x
end

function Base.read(sock::Client, n::Integer)
buf = read(sock.readbuf, n)
rb_ba = @atomic sock.readbuf_bytesavailable -= length(buf)
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return buf
end

function Base.unsafe_read(sock::Client, ptr::Ptr{UInt8}, n::Integer)
unsafe_read(sock.readbuf, ptr, n)
rb_ba = @atomic sock.readbuf_bytesavailable -= n
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return
end

function Base.skip(sock::Client, n)
ret = skip(sock.readbuf, n)
rb_ba = @atomic sock.readbuf_bytesavailable -= n
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return ret
end

Expand All @@ -392,8 +358,7 @@ Base.isopen(sock::Client) = sock.slot == C_NULL ? false : aws_socket_is_open(aws

function Base.readbytes!(sock::Client, buf::AbstractVector{UInt8}, nb=length(buf))
act = readbytes!(sock.readbuf, buf, nb)
rb_ba = @atomic sock.readbuf_bytesavailable -= act
maybe_increment_read_window(sock, rb_ba)
maybe_increment_read_window(sock, bytesavailable(sock.readbuf))
return act
end

Expand Down

0 comments on commit b3523f1

Please sign in to comment.