Skip to content

Commit

Permalink
HPCC-32184 socket nonblocking improvements
Browse files Browse the repository at this point in the history
Tighten up semantics of jsocket and securesocket and their use
cases.
Specifically, this fixes issues with SSL traffic being handled
by select handlers, where previously it would tend to deadlock
reading because the underlying buffering nature of SSL meant that
either not all the data was available, or more than the amount read
was available, and no future notify event triggered more reading.

jsocket
- ensure reads all available, return if >= min
- clearup nonblocking toggle, only change if necessary
- clarify write() semantics (always writes all or throw an exception)
- add option to throw graceful close after reading min_size

jutil
- add incidental hold() utility method

mp
- remove slowclient handling, add trackPortProbe - track and trace port probes
- use a dedicated selecthandler for connections, and refactor connect
  logic, to avoid blocking
- maintain list of connecting clients, delete any that have stalled if
  timedout via a maintenance thread.
- refactor main MP packet reader handling to be non-blocking

securesocket
- ensure readtms semantics match jsocket
- correct handling of ssl error states (SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ,
  SSL_ERROR_SYSCALL, SSL_ERROR_ZERO_RETURN), triggering wait_read or
wait_write
- Fix securesocket with fd only issues.

dafilesrv
- rework dafsserver notifySelected loop slightly, to correctly expect/deal
with 0 data.
- Default all accepted dafilesrv sockets to non-blocking

general
- readtms - ensure consistenly waits for min_size.
- tidy up looping code in various places (outside of jsocket) that relied
on readtms 0 reading >=1 and looped instead of supplying min=max=len.
- change calls that relied on read/readtms 0 reading >=1
- add readtmsAllowClose for common case where code treats graceful close
as 0 bytes.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jul 26, 2024
1 parent 13d38f8 commit 6ca6595
Show file tree
Hide file tree
Showing 15 changed files with 928 additions and 747 deletions.
23 changes: 6 additions & 17 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1814,27 +1814,16 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
else if (curPosn >= currLen)
{ //nothing in buffer, read from socket
size32_t bytesRead=0;
count = 0;
do
{
socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
count += bytesRead;
} while (count != len);
socket->readtms(buf, len, len, count, timeoutMS);
currLen = curPosn = 0;
}
else
{ //only some is in buffer, read rest from socket
size32_t bytesRead=0;
size32_t avail = currLen - curPosn;
memcpy(buf, (buffer + curPosn), avail);
count = avail;
do
{
size32_t read;
socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
bytesRead += read;
} while (len != (bytesRead + avail));
size32_t bytesRead;
socket->readtms(buf+avail, len-avail, len-avail, bytesRead, timeoutMS);
count += bytesRead;
currLen = curPosn = 0;
}
Expand Down Expand Up @@ -2096,7 +2085,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
do {
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
readtmsAllowClose(socket, buffer+read, 1, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS, remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

Expand Down Expand Up @@ -2216,7 +2205,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
while (read<payloadsize) {
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
readtmsAllowClose(socket, response.reserve(payloadsize-read), 1, payloadsize-read, bytesRead, MIN(master->timeoutMS, remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

Expand All @@ -2232,7 +2221,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
for (;;) {
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
readtmsAllowClose(socket, buffer, 1, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS, remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

Expand Down
2 changes: 1 addition & 1 deletion esp/test/httptest/httptest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ int HttpClient::sendRequest(int times, HttpStat& stat, StringBuffer& req)
unsigned int sizeread;
do
{
socket->read(tmpbuf, 0, 256, sizeread);
socket->read(tmpbuf, 1, 256, sizeread);
}
while(sizeread > 0);

Expand Down
4 changes: 2 additions & 2 deletions esp/tools/soapplus/httpproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ class CSocksProxyThread : public Thread
ip.getHostText(ipstr);

char inbuf2[16];
m_client->read(inbuf2, 0, 16, lenread);
readtmsAllowClose(m_client, inbuf2, 1, 16, lenread, WAIT_FOREVER);
StringBuffer username;
while(lenread > 0)
{
Expand All @@ -694,7 +694,7 @@ class CSocksProxyThread : public Thread
if (done)
break;
username.append(lenread, inbuf2);
m_client->read(inbuf2, 0, 16, lenread);
readtmsAllowClose(m_client, inbuf2, 1, 16, lenread, WAIT_FOREVER);
}

if(http_tracelevel >= 5)
Expand Down
128 changes: 72 additions & 56 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2934,68 +2934,87 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
touch();
try
{
if (!gotSize)
while (true)
{
// left represents amount we have read of leading size32_t (normally expect to be read in 1 go)
if (0 == msg.length()) // 1st time
msgWritePtr = (byte *)msg.reserveTruncate(sizeof(size32_t));
size32_t szRead;
sock->read(msgWritePtr, 1, sizeof(size32_t)-left, szRead);
left += szRead;
msgWritePtr += szRead;
if (left == sizeof(size32_t))
if (!gotSize)
{
gotSize = true;
msg.read(left);
msg.clear();
try
// left represents amount we have read of leading size32_t (normally expect to be read in 1 go)
if (0 == msg.length()) // 1st time
msgWritePtr = (byte *)msg.reserveTruncate(sizeof(size32_t));
size32_t szRead;
sock->read(msgWritePtr, 0, sizeof(size32_t)-left, szRead, WAIT_FOREVER, false);

left += szRead;
msgWritePtr += szRead;
if (left == sizeof(size32_t)) // if not, we exit, and rely on next notifySelected
{
msgWritePtr = (byte *)msg.reserveTruncate(left);
}
catch (IException *e)
{
EXCLOG(e,"notifySelected(1)");
e->Release();
left = 0;
// if too big then suggest corrupted packet, try to consume
// JCSMORE this seems a bit pointless, and it used to only read last 'avail',
// which is not necessarily everything that was sent
char fbuf[1024];
while (true)
gotSize = true;
msg.read(left);
msg.clear();
try
{
try
{
size32_t szRead;
sock->read(fbuf, 1, 1024, szRead);
}
catch (IException *e)
msgWritePtr = (byte *)msg.reserveTruncate(left);
}
catch (IException *e)
{
EXCLOG(e,"notifySelected(1)");
e->Release();
left = 0;
// if too big then suggest corrupted packet, try to consume
// JCSMORE this seems a bit pointless, and it used to only read last 'avail',
// which is not necessarily everything that was sent
char fbuf[1024];
while (true)
{
EXCLOG(e,"notifySelected(2)");
e->Release();
break;
try
{
size32_t szRead;
sock->read(fbuf, 0, 1024, szRead, WAIT_FOREVER, true);
}
catch (IException *e)
{
EXCLOG(e,"notifySelected(2)");
e->Release();
break;
}
}
}
if (0 == left)
{
gotSize = false;
msg.clear();
parent->onCloseSocket(this, 5);
return true;
}
}
if (0 == left)
{
gotSize = false;
msg.clear();
parent->onCloseSocket(this, 5);
return true;
}
else
break; // wait for rest via subsequent notifySelected's
}
}
else // left represents length of message remaining to receive
{
size32_t szRead;
sock->read(msgWritePtr, 1, left, szRead);
msgWritePtr += szRead;
left -= szRead;
if (0 == left) // NB: only ever here if original size was >0
bool gc = false;
if (gotSize) // left represents length of message remaining to receive
{
gotSize = false; // reset for next packet
parent->handleCompleteMessage(this, msg); // consumes msg
size32_t szRead;
gc = readtmsAllowClose(sock, msgWritePtr, 0, left, szRead, WAIT_FOREVER);
msgWritePtr += szRead;
left -= szRead;
if (0 == left) // NB: only ever here if original size was >0
{
gotSize = false; // reset for next packet
parent->handleCompleteMessage(this, msg); // consumes msg
if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
}
else
{
if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
break; // wait for rest via subsequent notifySelected's
}
}
else if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
// to be here, implies handled full message, loop around to see if more on the wire.
// will break out if nothing/partial.
}
}
catch (IJSOCK_Exception *e)
Expand Down Expand Up @@ -5620,7 +5639,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
addClient(sockSSL.getClear(), true, false);
}

if (rowServiceSockAvail)
if (!isContainerized() && rowServiceSockAvail) // in contaierized each service is on a single dedicated port, the below 2 cases are for BM only
{
#ifdef _DEBUG
acceptedRSSock->getPeerEndpoint(eps);
Expand Down Expand Up @@ -5659,10 +5678,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
clients.append(*client.getLink());
}
// JCSMORE - perhaps cap # added here... ?
unsigned mode = SELECTMODE_READ;
if (secure)
mode |= SELECTMODE_WRITE;
selecthandler->add(sock, mode, client);
selecthandler->add(sock, SELECTMODE_READ, client);
}

void stop()
Expand Down
8 changes: 4 additions & 4 deletions roxie/udplib/udpsha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -986,14 +986,14 @@ void CSimulatedQueueReadSocket::writeOwnSimulatedPacket(void const* buf, size32_
avail.signal();
}

void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize)
{
unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
readtms(buf, min_size, max_size, size_read, tms);
}

void CSimulatedQueueReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeout)
unsigned timeout, bool suppresGCIfMinSize)
{
size_read = 0;
if (!timeout || wait_read(timeout))
Expand Down Expand Up @@ -1054,11 +1054,11 @@ CSimulatedUdpReadSocket::~CSimulatedUdpReadSocket()

size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); }
void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); }
void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize)
{
realSocket->read(buf, min_size, max_size, size_read, timeoutsecs);
}
void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout, bool suppresGCIfMinSize)
{
realSocket->readtms(buf, min_size, max_size, size_read, timeout);
}
Expand Down
12 changes: 6 additions & 6 deletions roxie/udplib/udpsha.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ class CSocketSimulator : public CInterfaceOf<ISocket>
{
private:
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; }
unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override { UNIMPLEMENTED; }
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeout) override { UNIMPLEMENTED; }
unsigned timeout, bool suppresGCIfMinSize = true) override { UNIMPLEMENTED; }
virtual void read(void* buf, size32_t size) override { UNIMPLEMENTED; }
virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; }
virtual size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }
Expand Down Expand Up @@ -442,9 +442,9 @@ class CSimulatedQueueReadSocket : public CSocketSimulator
virtual size32_t get_receive_buffer_size() override { return max; }
virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeoutsecs = WAIT_FOREVER) override;
unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override;
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeout) override;
unsigned timeout, bool suppresGCIfMinSize = true) override;
virtual int wait_read(unsigned timeout) override;
virtual void close() override {}
virtual void shutdown(unsigned mode) override { }
Expand Down Expand Up @@ -491,8 +491,8 @@ class CSimulatedUdpReadSocket : public CSimulatedUdpSocket

virtual size32_t get_receive_buffer_size() override;
virtual void set_receive_buffer_size(size32_t sz) override;
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override;
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override;
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override;
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout, bool suppresGCIfMinSize = true) override;
virtual int wait_read(unsigned timeout) override;
virtual void close() override;

Expand Down
7 changes: 3 additions & 4 deletions system/jlib/jbsocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiExcepti
m_curptr = 0;
m_endptr = 0;
unsigned readlen;
m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout);
readtmsAllowClose(m_socket, m_buf, 1, BSOCKET_BUFSIZE, readlen, m_timeout*1000);
if(readlen > 0)
{
m_endptr = readlen;
Expand All @@ -111,7 +111,6 @@ int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiExcepti
buf[ptr++] = '\n';
}
}

}
break;
}
Expand All @@ -136,7 +135,7 @@ int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiExcepti
m_curptr = 0;
m_endptr = 0;
unsigned readlen;
m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout);
readtmsAllowClose(m_socket, m_buf, 1, BSOCKET_BUFSIZE, readlen, m_timeout*1000);
if(readlen <= 0)
break;
m_endptr = readlen;
Expand Down Expand Up @@ -218,7 +217,7 @@ int BufferedSocket::read(char* buf, int maxlen)
unsigned readlen;
try
{
m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout);
m_socket->read(m_buf, 1, BSOCKET_BUFSIZE, readlen, m_timeout);
}
catch (IException *e)
{
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6490,7 +6490,7 @@ class CSocketSerialStream: public CSerialStreamBase
if (lastpos!=pos)
throw MakeStringException(-1,"CSocketSerialStream: non-sequential read (%" I64F "d,%" I64F "d)",lastpos,pos);
size32_t size_read;
socket->readtms(ptr, 0, max_size, size_read, timeout);
readtmsAllowClose(socket, ptr, 1, max_size, size_read, timeout);
lastpos = pos+size_read;
return size_read;
}
Expand Down
Loading

0 comments on commit 6ca6595

Please sign in to comment.