Skip to content

Commit

Permalink
HPCC-32184 socket nonblocking improvements
Browse files Browse the repository at this point in the history
jsocket
- ensure reads all available, return if >= min
- clearup nonblocking toggle, only change if necessary
- clarify write() semantics (always writes all or throw an exception)
- add option to throw graceful close after reading min_size

jutil
- add incidental hold() utility method

mp
- remove slowclient handling, add trackPortProbe - track and trace port probes
- use a dedicated selecthandler for connections, and refactor connect
  logic, to avoid blocking
- maintain list of connecting clients, delete any that have stalled if
  timedout via a maintenance thread.
- refactor main MP packet reader handling to be non-blocking

securesocket
- ensure readtms semantics match jsocket
- correct handling of ssl error states (SSL_ERROR_WANT_WRITE, SSL_ERROR_WANT_READ,
  SSL_ERROR_SYSCALL, SSL_ERROR_ZERO_RETURN), triggering wait_read or
wait_write

dafilesrv
- rework dafsserver notifySelected loop slightly, to correctly expect/deal
with 0 data.
- Default all accepted dafilesrv sockets to non-blocking

general
- readtms - ensure consistenly waits for min_size.
- tidy up looping code in various places (outside of jsocket) that relied
on readtms 0 reading >=1 and looped instead of supplying min=max=len.
- change calls that relied on read/readtms 0 reading >=1
- add readtmsAllowClose for common case where code treats graceful close
as 0 bytes.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Jun 28, 2024
1 parent 3f6374e commit 6e8a360
Show file tree
Hide file tree
Showing 15 changed files with 794 additions and 661 deletions.
23 changes: 6 additions & 17 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1814,27 +1814,16 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
}
else if (curPosn >= currLen)
{ //nothing in buffer, read from socket
size32_t bytesRead=0;
count = 0;
do
{
socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
count += bytesRead;
} while (count != len);
socket->readtms(buf, len, len, count, timeoutMS);
currLen = curPosn = 0;
}
else
{ //only some is in buffer, read rest from socket
size32_t bytesRead=0;
size32_t avail = currLen - curPosn;
memcpy(buf, (buffer + curPosn), avail);
count = avail;
do
{
size32_t read;
socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
bytesRead += read;
} while (len != (bytesRead + avail));
size32_t bytesRead;
socket->readtms(buf+avail, len-avail, len-avail, bytesRead, timeoutMS);
count += bytesRead;
currLen = curPosn = 0;
}
Expand Down Expand Up @@ -2096,7 +2085,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
do {
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
readtmsAllowClose(socket, buffer+read, 1, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS, remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

Expand Down Expand Up @@ -2216,7 +2205,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
while (read<payloadsize) {
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
readtmsAllowClose(socket, response.reserve(payloadsize-read), 1, payloadsize-read, bytesRead, MIN(master->timeoutMS, remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

Expand All @@ -2232,7 +2221,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
for (;;) {
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);
socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
readtmsAllowClose(socket, buffer, 1, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS, remainingMS));
checkTimeLimitExceeded(&remainingMS);
checkRoxieAbortMonitor(master->roxieAbortMonitor);

Expand Down
2 changes: 1 addition & 1 deletion esp/test/httptest/httptest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ int HttpClient::sendRequest(int times, HttpStat& stat, StringBuffer& req)
unsigned int sizeread;
do
{
socket->read(tmpbuf, 0, 256, sizeread);
socket->read(tmpbuf, 1, 256, sizeread);
}
while(sizeread > 0);

Expand Down
4 changes: 2 additions & 2 deletions esp/tools/soapplus/httpproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ class CSocksProxyThread : public Thread
ip.getHostText(ipstr);

char inbuf2[16];
m_client->read(inbuf2, 0, 16, lenread);
readtmsAllowClose(m_client, inbuf2, 1, 16, lenread, WAIT_FOREVER);
StringBuffer username;
while(lenread > 0)
{
Expand All @@ -694,7 +694,7 @@ class CSocksProxyThread : public Thread
if (done)
break;
username.append(lenread, inbuf2);
m_client->read(inbuf2, 0, 16, lenread);
readtmsAllowClose(m_client, inbuf2, 1, 16, lenread, WAIT_FOREVER);
}

if(http_tracelevel >= 5)
Expand Down
148 changes: 91 additions & 57 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2924,6 +2924,21 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
e->Release();
}
}
bool readtmsCheckGC(ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &szRead, unsigned timeout)
{
try
{
sock->read(dst, minSize, maxSize, szRead, WAIT_FOREVER, true);
}
catch (IJSOCK_Exception *e)
{
if (e->errorCode() != JSOCKERR_graceful_close)
throw;
e->Release();
return true;
}
return false;
}
bool isRowServiceClient() const { return calledByRowService; }
bool notifySelected(ISocket *sock, unsigned selected)
{
Expand All @@ -2934,68 +2949,87 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
touch();
try
{
if (!gotSize)
while (true)
{
// left represents amount we have read of leading size32_t (normally expect to be read in 1 go)
if (0 == msg.length()) // 1st time
msgWritePtr = (byte *)msg.reserveTruncate(sizeof(size32_t));
size32_t szRead;
sock->read(msgWritePtr, 1, sizeof(size32_t)-left, szRead);
left += szRead;
msgWritePtr += szRead;
if (left == sizeof(size32_t))
if (!gotSize)
{
gotSize = true;
msg.read(left);
msg.clear();
try
{
msgWritePtr = (byte *)msg.reserveTruncate(left);
}
catch (IException *e)
// left represents amount we have read of leading size32_t (normally expect to be read in 1 go)
if (0 == msg.length()) // 1st time
msgWritePtr = (byte *)msg.reserveTruncate(sizeof(size32_t));
size32_t szRead;
if (readtmsCheckGC(sock, msgWritePtr, 0, sizeof(size32_t)-left, szRead, WAIT_FOREVER))
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
left += szRead;
msgWritePtr += szRead;
if (left == sizeof(size32_t)) // if not, we exit, and rely on next notifySelected
{
EXCLOG(e,"notifySelected(1)");
e->Release();
left = 0;
// if too big then suggest corrupted packet, try to consume
// JCSMORE this seems a bit pointless, and it used to only read last 'avail',
// which is not necessarily everything that was sent
char fbuf[1024];
while (true)
gotSize = true;
msg.read(left);
msg.clear();
try
{
try
{
size32_t szRead;
sock->read(fbuf, 1, 1024, szRead);
}
catch (IException *e)
msgWritePtr = (byte *)msg.reserveTruncate(left);
}
catch (IException *e)
{
EXCLOG(e,"notifySelected(1)");
e->Release();
left = 0;
// if too big then suggest corrupted packet, try to consume
// JCSMORE this seems a bit pointless, and it used to only read last 'avail',
// which is not necessarily everything that was sent
char fbuf[1024];
while (true)
{
EXCLOG(e,"notifySelected(2)");
e->Release();
break;
try
{
size32_t szRead;
sock->read(fbuf, 0, 1024, szRead, WAIT_FOREVER, true);
}
catch (IException *e)
{
EXCLOG(e,"notifySelected(2)");
e->Release();
break;
}
}
}
if (0 == left)
{
gotSize = false;
msg.clear();
parent->onCloseSocket(this, 5);
return true;
}
}
if (0 == left)
{
gotSize = false;
msg.clear();
parent->onCloseSocket(this, 5);
return true;
}
else
break; // wait for rest via subsequent notifySelected's
}
}
else // left represents length of message remaining to receive
{
size32_t szRead;
sock->read(msgWritePtr, 1, left, szRead);
msgWritePtr += szRead;
left -= szRead;
if (0 == left) // NB: only ever here if original size was >0
bool gc = false;
if (gotSize) // left represents length of message remaining to receive
{
gotSize = false; // reset for next packet
parent->handleCompleteMessage(this, msg); // consumes msg
size32_t szRead;
gc = readtmsCheckGC(sock, msgWritePtr, 0, left, szRead, WAIT_FOREVER);
msgWritePtr += szRead;
left -= szRead;
if (0 == left) // NB: only ever here if original size was >0
{
gotSize = false; // reset for next packet
parent->handleCompleteMessage(this, msg); // consumes msg
if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
}
else
{
if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
break; // wait for rest via subsequent notifySelected's
}
}
else if (gc)
THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
// to be here, implies handled full message, loop around to see if more on the wire.
// will break out if nothing/partial.
}
}
catch (IJSOCK_Exception *e)
Expand Down Expand Up @@ -5607,6 +5641,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
eps.getEndpointHostText(peerURL);
PROGLOG("Server accepting from %s", peerURL.str());
#endif
sock->set_nonblock(true);
addClient(sock.getClear(), false, false);
}

Expand All @@ -5617,17 +5652,19 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
eps.getEndpointHostText(peerURL.clear());
PROGLOG("Server accepting SECURE from %s", peerURL.str());
#endif
sockSSL->set_nonblock(true);
addClient(sockSSL.getClear(), true, false);
}

if (rowServiceSockAvail)
if (!isContainerized() && rowServiceSockAvail) // in contaierized each service is on a single dedicated port, the below 2 cases are for BM only
{
#ifdef _DEBUG
acceptedRSSock->getPeerEndpoint(eps);
eps.getEndpointHostText(peerURL.clear());
PROGLOG("Server accepting row service socket from %s", peerURL.str());
#endif
addClient(acceptedRSSock.getClear(), true, true);
acceptedRSSock->set_nonblock(true);
addClient(acceptedRSSock.getClear(), rowServiceSSL, true);
}
}
else
Expand Down Expand Up @@ -5659,10 +5696,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface
clients.append(*client.getLink());
}
// JCSMORE - perhaps cap # added here... ?
unsigned mode = SELECTMODE_READ;
if (secure)
mode |= SELECTMODE_WRITE;
selecthandler->add(sock, mode, client);
selecthandler->add(sock, SELECTMODE_READ, client);
}

void stop()
Expand Down
8 changes: 4 additions & 4 deletions roxie/udplib/udpsha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -986,14 +986,14 @@ void CSimulatedQueueReadSocket::writeOwnSimulatedPacket(void const* buf, size32_
avail.signal();
}

void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize)
{
unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000;
readtms(buf, min_size, max_size, size_read, tms);
}

void CSimulatedQueueReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeout)
unsigned timeout, bool suppresGCIfMinSize)
{
size_read = 0;
if (!timeout || wait_read(timeout))
Expand Down Expand Up @@ -1054,11 +1054,11 @@ CSimulatedUdpReadSocket::~CSimulatedUdpReadSocket()

size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); }
void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); }
void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs)
void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize)
{
realSocket->read(buf, min_size, max_size, size_read, timeoutsecs);
}
void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout)
void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout, bool suppresGCIfMinSize)
{
realSocket->readtms(buf, min_size, max_size, size_read, timeout);
}
Expand Down
12 changes: 6 additions & 6 deletions roxie/udplib/udpsha.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ class CSocketSimulator : public CInterfaceOf<ISocket>
{
private:
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; }
unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override { UNIMPLEMENTED; }
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeout) override { UNIMPLEMENTED; }
unsigned timeout, bool suppresGCIfMinSize = true) override { UNIMPLEMENTED; }
virtual void read(void* buf, size32_t size) override { UNIMPLEMENTED; }
virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; }
virtual size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; }
Expand Down Expand Up @@ -442,9 +442,9 @@ class CSimulatedQueueReadSocket : public CSocketSimulator
virtual size32_t get_receive_buffer_size() override { return max; }
virtual void set_receive_buffer_size(size32_t sz) override { max = sz; }
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeoutsecs = WAIT_FOREVER) override;
unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override;
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
unsigned timeout) override;
unsigned timeout, bool suppresGCIfMinSize = true) override;
virtual int wait_read(unsigned timeout) override;
virtual void close() override {}
virtual void shutdown(unsigned mode) override { }
Expand Down Expand Up @@ -491,8 +491,8 @@ class CSimulatedUdpReadSocket : public CSimulatedUdpSocket

virtual size32_t get_receive_buffer_size() override;
virtual void set_receive_buffer_size(size32_t sz) override;
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override;
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override;
virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override;
virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout, bool suppresGCIfMinSize = true) override;
virtual int wait_read(unsigned timeout) override;
virtual void close() override;

Expand Down
Loading

0 comments on commit 6e8a360

Please sign in to comment.