diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index 6fe8f4c8bd6..b0faa9d8b79 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -1814,27 +1814,16 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo } else if (curPosn >= currLen) { //nothing in buffer, read from socket - size32_t bytesRead=0; - count = 0; - do - { - socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS); - count += bytesRead; - } while (count != len); + socket->readtms(buf, len, len, count, timeoutMS); currLen = curPosn = 0; } else { //only some is in buffer, read rest from socket - size32_t bytesRead=0; size32_t avail = currLen - curPosn; memcpy(buf, (buffer + curPosn), avail); count = avail; - do - { - size32_t read; - socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS); - bytesRead += read; - } while (len != (bytesRead + avail)); + size32_t bytesRead; + socket->readtms(buf+avail, len-avail, len-avail, bytesRead, timeoutMS); count += bytesRead; currLen = curPosn = 0; } @@ -2096,7 +2085,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo do { checkTimeLimitExceeded(&remainingMS); checkRoxieAbortMonitor(master->roxieAbortMonitor); - socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS)); + readtmsAllowClose(socket, buffer+read, 1, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS, remainingMS)); checkTimeLimitExceeded(&remainingMS); checkRoxieAbortMonitor(master->roxieAbortMonitor); @@ -2216,7 +2205,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo while (readroxieAbortMonitor); - socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS)); + readtmsAllowClose(socket, response.reserve(payloadsize-read), 1, payloadsize-read, bytesRead, MIN(master->timeoutMS, remainingMS)); 
checkTimeLimitExceeded(&remainingMS); checkRoxieAbortMonitor(master->roxieAbortMonitor); @@ -2232,7 +2221,7 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo for (;;) { checkTimeLimitExceeded(&remainingMS); checkRoxieAbortMonitor(master->roxieAbortMonitor); - socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS)); + readtmsAllowClose(socket, buffer, 1, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS, remainingMS)); checkTimeLimitExceeded(&remainingMS); checkRoxieAbortMonitor(master->roxieAbortMonitor); diff --git a/esp/test/httptest/httptest.cpp b/esp/test/httptest/httptest.cpp index 3f5515207fa..b28e3b1edbf 100644 --- a/esp/test/httptest/httptest.cpp +++ b/esp/test/httptest/httptest.cpp @@ -565,7 +565,7 @@ int HttpClient::sendRequest(int times, HttpStat& stat, StringBuffer& req) unsigned int sizeread; do { - socket->read(tmpbuf, 0, 256, sizeread); + socket->read(tmpbuf, 1, 256, sizeread); } while(sizeread > 0); diff --git a/esp/tools/soapplus/httpproxy.cpp b/esp/tools/soapplus/httpproxy.cpp index 59e0fd0fcdd..345f4b92552 100644 --- a/esp/tools/soapplus/httpproxy.cpp +++ b/esp/tools/soapplus/httpproxy.cpp @@ -676,7 +676,7 @@ class CSocksProxyThread : public Thread ip.getHostText(ipstr); char inbuf2[16]; - m_client->read(inbuf2, 0, 16, lenread); + readtmsAllowClose(m_client, inbuf2, 1, 16, lenread, WAIT_FOREVER); StringBuffer username; while(lenread > 0) { @@ -694,7 +694,7 @@ class CSocksProxyThread : public Thread if (done) break; username.append(lenread, inbuf2); - m_client->read(inbuf2, 0, 16, lenread); + readtmsAllowClose(m_client, inbuf2, 1, 16, lenread, WAIT_FOREVER); } if(http_tracelevel >= 5) diff --git a/fs/dafsserver/dafsserver.cpp b/fs/dafsserver/dafsserver.cpp index 9e788fe4447..4bd04fa8a8a 100644 --- a/fs/dafsserver/dafsserver.cpp +++ b/fs/dafsserver/dafsserver.cpp @@ -2924,6 +2924,21 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface e->Release(); } } + bool 
readtmsCheckGC(ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &szRead, unsigned timeout) + { + try + { + sock->read(dst, minSize, maxSize, szRead, WAIT_FOREVER, true); // NOTE(review): the 'timeout' argument is ignored (WAIT_FOREVER is hard-coded), and passing suppresGCIfMinSize=true while callers use minSize==0 appears to suppress the graceful close exception this helper is meant to catch; confirm whether 'false' was intended + } + catch (IJSOCK_Exception *e) + { + if (e->errorCode() != JSOCKERR_graceful_close) + throw; + e->Release(); + return true; + } + return false; + } bool isRowServiceClient() const { return calledByRowService; } bool notifySelected(ISocket *sock, unsigned selected) { @@ -2934,68 +2949,87 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface touch(); try { - if (!gotSize) + while (true) { - // left represents amount we have read of leading size32_t (normally expect to be read in 1 go) - if (0 == msg.length()) // 1st time - msgWritePtr = (byte *)msg.reserveTruncate(sizeof(size32_t)); - size32_t szRead; - sock->read(msgWritePtr, 1, sizeof(size32_t)-left, szRead); - left += szRead; - msgWritePtr += szRead; - if (left == sizeof(size32_t)) + if (!gotSize) { - gotSize = true; - msg.read(left); - msg.clear(); - try - { - msgWritePtr = (byte *)msg.reserveTruncate(left); - } - catch (IException *e) + // left represents amount we have read of leading size32_t (normally expect to be read in 1 go) + if (0 == msg.length()) // 1st time + msgWritePtr = (byte *)msg.reserveTruncate(sizeof(size32_t)); + size32_t szRead; + if (readtmsCheckGC(sock, msgWritePtr, 0, sizeof(size32_t)-left, szRead, WAIT_FOREVER)) + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + left += szRead; + msgWritePtr += szRead; + if (left == sizeof(size32_t)) // if not, we exit, and rely on next notifySelected { - EXCLOG(e,"notifySelected(1)"); - e->Release(); - left = 0; - // if too big then suggest corrupted packet, try to consume - // JCSMORE this seems a bit pointless, and it used to only read last 'avail', - // which is not necessarily everything that was sent - char fbuf[1024]; - while (true) + gotSize = true; + msg.read(left); + msg.clear(); + try { - try - { - size32_t szRead; - 
sock->read(fbuf, 1, 1024, szRead); - } - catch (IException *e) + msgWritePtr = (byte *)msg.reserveTruncate(left); + } + catch (IException *e) + { + EXCLOG(e,"notifySelected(1)"); + e->Release(); + left = 0; + // if too big then suggest corrupted packet, try to consume + // JCSMORE this seems a bit pointless, and it used to only read last 'avail', + // which is not necessarily everything that was sent + char fbuf[1024]; + while (true) { - EXCLOG(e,"notifySelected(2)"); - e->Release(); - break; + try + { + size32_t szRead; + sock->read(fbuf, 0, 1024, szRead, WAIT_FOREVER, true); + } + catch (IException *e) + { + EXCLOG(e,"notifySelected(2)"); + e->Release(); + break; + } } } + if (0 == left) + { + gotSize = false; + msg.clear(); + parent->onCloseSocket(this, 5); + return true; + } } - if (0 == left) - { - gotSize = false; - msg.clear(); - parent->onCloseSocket(this, 5); - return true; - } + else + break; // wait for rest via subsequent notifySelected's } - } - else // left represents length of message remaining to receive - { - size32_t szRead; - sock->read(msgWritePtr, 1, left, szRead); - msgWritePtr += szRead; - left -= szRead; - if (0 == left) // NB: only ever here if original size was >0 + bool gc = false; + if (gotSize) // left represents length of message remaining to receive { - gotSize = false; // reset for next packet - parent->handleCompleteMessage(this, msg); // consumes msg + size32_t szRead; + gc = readtmsCheckGC(sock, msgWritePtr, 0, left, szRead, WAIT_FOREVER); + msgWritePtr += szRead; + left -= szRead; + if (0 == left) // NB: only ever here if original size was >0 + { + gotSize = false; // reset for next packet + parent->handleCompleteMessage(this, msg); // consumes msg + if (gc) + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + } + else + { + if (gc) + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + break; // wait for rest via subsequent notifySelected's + } } + else if (gc) + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + // to be here, implies 
handled full message, loop around to see if more on the wire. + // will break out if nothing/partial. } } catch (IJSOCK_Exception *e) @@ -5607,6 +5641,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface eps.getEndpointHostText(peerURL); PROGLOG("Server accepting from %s", peerURL.str()); #endif + sock->set_nonblock(true); addClient(sock.getClear(), false, false); } @@ -5617,17 +5652,19 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface eps.getEndpointHostText(peerURL.clear()); PROGLOG("Server accepting SECURE from %s", peerURL.str()); #endif + sockSSL->set_nonblock(true); addClient(sockSSL.getClear(), true, false); } - if (rowServiceSockAvail) + if (!isContainerized() && rowServiceSockAvail) // in containerized each service is on a single dedicated port, the below 2 cases are for BM only { #ifdef _DEBUG acceptedRSSock->getPeerEndpoint(eps); eps.getEndpointHostText(peerURL.clear()); PROGLOG("Server accepting row service socket from %s", peerURL.str()); #endif - addClient(acceptedRSSock.getClear(), true, true); + acceptedRSSock->set_nonblock(true); + addClient(acceptedRSSock.getClear(), rowServiceSSL, true); } } else @@ -5659,10 +5696,7 @@ class CRemoteFileServer : implements IRemoteFileServer, public CInterface clients.append(*client.getLink()); } // JCSMORE - perhaps cap # added here... ? 
- unsigned mode = SELECTMODE_READ; - if (secure) - mode |= SELECTMODE_WRITE; - selecthandler->add(sock, mode, client); + selecthandler->add(sock, SELECTMODE_READ, client); } void stop() diff --git a/roxie/udplib/udpsha.cpp b/roxie/udplib/udpsha.cpp index 36fe5b4aa80..65d66b4c296 100644 --- a/roxie/udplib/udpsha.cpp +++ b/roxie/udplib/udpsha.cpp @@ -986,14 +986,14 @@ void CSimulatedQueueReadSocket::writeOwnSimulatedPacket(void const* buf, size32_ avail.signal(); } -void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs) +void CSimulatedQueueReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize) { unsigned tms = timeoutsecs == WAIT_FOREVER ? WAIT_FOREVER : timeoutsecs * 1000; readtms(buf, min_size, max_size, size_read, tms); } void CSimulatedQueueReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeout) + unsigned timeout, bool suppresGCIfMinSize) { size_read = 0; if (!timeout || wait_read(timeout)) @@ -1054,11 +1054,11 @@ CSimulatedUdpReadSocket::~CSimulatedUdpReadSocket() size32_t CSimulatedUdpReadSocket::get_receive_buffer_size() { return realSocket->get_receive_buffer_size(); } void CSimulatedUdpReadSocket::set_receive_buffer_size(size32_t sz) { realSocket->set_receive_buffer_size(sz); } -void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs) +void CSimulatedUdpReadSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize) { realSocket->read(buf, min_size, max_size, size_read, timeoutsecs); } -void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) +void CSimulatedUdpReadSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t 
&size_read, unsigned timeout, bool suppresGCIfMinSize) { realSocket->readtms(buf, min_size, max_size, size_read, timeout); } diff --git a/roxie/udplib/udpsha.hpp b/roxie/udplib/udpsha.hpp index a111f75143d..116f7e66a02 100644 --- a/roxie/udplib/udpsha.hpp +++ b/roxie/udplib/udpsha.hpp @@ -339,9 +339,9 @@ class CSocketSimulator : public CInterfaceOf { private: virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeoutsecs = WAIT_FOREVER) override { UNIMPLEMENTED; } + unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override { UNIMPLEMENTED; } virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeout) override { UNIMPLEMENTED; } + unsigned timeout, bool suppresGCIfMinSize = true) override { UNIMPLEMENTED; } virtual void read(void* buf, size32_t size) override { UNIMPLEMENTED; } virtual size32_t write(void const* buf, size32_t size) override { UNIMPLEMENTED; } virtual size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER) override { UNIMPLEMENTED; } @@ -442,9 +442,9 @@ class CSimulatedQueueReadSocket : public CSocketSimulator virtual size32_t get_receive_buffer_size() override { return max; } virtual void set_receive_buffer_size(size32_t sz) override { max = sz; } virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeoutsecs = WAIT_FOREVER) override; + unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override; virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeout) override; + unsigned timeout, bool suppresGCIfMinSize = true) override; virtual int wait_read(unsigned timeout) override; virtual void close() override {} virtual void shutdown(unsigned mode) override { } @@ -491,8 +491,8 @@ class CSimulatedUdpReadSocket : public CSimulatedUdpSocket virtual size32_t 
get_receive_buffer_size() override; virtual void set_receive_buffer_size(size32_t sz) override; - virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER) override; - virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout) override; + virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) override; + virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeout, bool suppresGCIfMinSize = true) override; virtual int wait_read(unsigned timeout) override; virtual void close() override; diff --git a/system/jlib/jbsocket.cpp b/system/jlib/jbsocket.cpp index 35853588cdd..227f73f628a 100644 --- a/system/jlib/jbsocket.cpp +++ b/system/jlib/jbsocket.cpp @@ -100,7 +100,7 @@ int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiExcepti m_curptr = 0; m_endptr = 0; unsigned readlen; - m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout); + readtmsAllowClose(m_socket, m_buf, 1, BSOCKET_BUFSIZE, readlen, m_timeout*1000); if(readlen > 0) { m_endptr = readlen; @@ -111,7 +111,6 @@ int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiExcepti buf[ptr++] = '\n'; } } - } break; } @@ -136,7 +135,7 @@ int BufferedSocket::readline(char* buf, int maxlen, bool keepcrlf, IMultiExcepti m_curptr = 0; m_endptr = 0; unsigned readlen; - m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout); + readtmsAllowClose(m_socket, m_buf, 1, BSOCKET_BUFSIZE, readlen, m_timeout*1000); if(readlen <= 0) break; m_endptr = readlen; @@ -218,7 +217,7 @@ int BufferedSocket::read(char* buf, int maxlen) unsigned readlen; try { - m_socket->read(m_buf, 0, BSOCKET_BUFSIZE, readlen, m_timeout); + m_socket->read(m_buf, 1, BSOCKET_BUFSIZE, readlen, m_timeout); } catch (IException *e) { diff --git 
a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index d4f3689163c..ca2ad86ef04 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -6467,7 +6467,7 @@ class CSocketSerialStream: public CSerialStreamBase if (lastpos!=pos) throw MakeStringException(-1,"CSocketSerialStream: non-sequential read (%" I64F "d,%" I64F "d)",lastpos,pos); size32_t size_read; - socket->readtms(ptr, 0, max_size, size_read, timeout); + readtmsAllowClose(socket, ptr, 1, max_size, size_read, timeout); lastpos = pos+size_read; return size_read; } diff --git a/system/jlib/jsocket.cpp b/system/jlib/jsocket.cpp index 4142a6bd125..3ff9b93f2f2 100644 --- a/system/jlib/jsocket.cpp +++ b/system/jlib/jsocket.cpp @@ -519,8 +519,8 @@ class CSocket: public ISocket, public CInterface void connect_wait( unsigned timems); void udpconnect(); - void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs); - void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timedelaysecs); + void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize=true); + void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timedelaysecs, bool suppresGCIfMinSize=true); void read(void* buf, size32_t size); size32_t write(void const* buf, size32_t size); size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER); @@ -889,6 +889,9 @@ inline void getSockAddrEndpoint(const J_SOCKADDR &u, socklen_t ul, SocketEndpoin bool CSocket::set_nonblock(bool on) { + if (nonblocking==on) + return nonblocking; + int flags = fcntl(sock, F_GETFL, 0); if (flags == -1) return nonblocking; @@ -966,7 +969,7 @@ size32_t CSocket::avail_read() #define PRE_CONN_UNREACH_ELIM 100 -int CSocket::pre_connect (bool block) +int CSocket::pre_connect(bool block) { if (targetip.isNull()) { @@ -1015,7 +1018,7 @@ int CSocket::pre_connect (bool 
block) return err; } -int CSocket::post_connect () +int CSocket::post_connect() { set_nonblock(false); int err = 0; @@ -1921,29 +1924,29 @@ int CSocket::wait_write(unsigned timeout) return ret; } -void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &sizeRead, unsigned timeoutMs) +void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &sizeRead, unsigned timeoutMs, bool suppresGCIfMinSize) { + /* + * Read at least min_size bytes, up to max_size bytes. + * Reads as much as possible off socket until block detected. + * Uses non-blocking (in theory could avoid if infinite timeout and min_size==max_size, but not worth the complexity). + * NB: If min_size==0 then will return asap if no data is avail. + * NB: If min_size==0, but notified of graceful close, throw graceful close exception. + * NB: timeout is meaningless if min_size is 0 + */ + sizeRead = 0; if (0 == max_size) - { return; - } if (state != ss_open) THROWJSOCKEXCEPTION(JSOCKERR_not_opened); - // NB: The semantics here, effectively mean min_size is always >0, because it first waits on wait_read - // i.e. something has to be on socket to continue (or error/graceful close). 
+ ScopedNonBlockingMode scopedNonBlockingMode(this); CCycleTimer timer; while (true) { - unsigned remainingMs = timer.remainingMs(timeoutMs); - int rc = wait_read(remainingMs); - if (rc < 0) - THROWJSOCKTARGETEXCEPTION(SOCKETERRNO()); - else if (rc == 0) - THROWJSOCKTARGETEXCEPTION(JSOCKERR_timeout_expired); - unsigned retrycount=100; + int rc; EintrRetry: if (sockmode==sm_udp_server) // udp server { @@ -1958,8 +1961,9 @@ void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t if (rc > 0) { sizeRead += rc; - if (sizeRead >= min_size) + if (sizeRead == max_size) break; + // NB: will exit when blocked if sizeRead >= min_size } else { @@ -1984,11 +1988,12 @@ void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t } else { - if (nonblocking && (err == JSE_WOULDBLOCK || err == EAGAIN)) // if EGAIN or EWOULDBLOCK - no more data to read + // NB: can only be here if sizeRead < min_size OR min_size = 0 + if (err == JSE_WOULDBLOCK || err == EAGAIN) // if EGAIN or EWOULDBLOCK - no more data to read { - if (0 == min_size) // if here, implies nothing read, since it would have exited already in (rc > 0) block. + if (sizeRead >= min_size) break; - // fall through/loop around. NB: rc != 0 + // otherwise, continue waiting for min_size } else { @@ -2001,16 +2006,40 @@ void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t } THROWJSOCKTARGETEXCEPTION(err); } + // fall through to timeout/wait_read handling below. 
} } if (rc == 0) { state = ss_shutdown; - if (sizeRead >= min_size) - break; // suppress graceful close exception if have already read minimum + if (suppresGCIfMinSize && (sizeRead >= min_size)) + break; THROWJSOCKTARGETEXCEPTION(JSOCKERR_graceful_close); } } + + unsigned remainingMs = timer.remainingMs(timeoutMs); + if (rc > 0) + { + if (0 == remainingMs) + { + if (sizeRead >= min_size) + break; + THROWJSOCKTARGETEXCEPTION(JSOCKERR_timeout_expired); + } + + // loop around to read more, or detect blocked. + } + else // NB rc < 0, (if rc == 0 handled already above) + { + // here because blocked (and sizeRead < min_size) + rc = wait_read(remainingMs); + if (rc < 0) + THROWJSOCKTARGETEXCEPTION(SOCKETERRNO()); + else if (rc == 0) + THROWJSOCKTARGETEXCEPTION(JSOCKERR_timeout_expired); + } + //else // read something, loop around to see if can read more, or detect blocked. } cycle_t elapsedCycles = timer.elapsedCycles(); @@ -2022,10 +2051,24 @@ void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t stats.ioReadCycles += elapsedCycles; } -void CSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs) +void CSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize) { unsigned timeoutMs = (timeoutsecs==WAIT_FOREVER) ? 
WAIT_FOREVER : (timeoutsecs * 1000); - readtms(buf, min_size, max_size, size_read, timeoutMs); + readtms(buf, min_size, max_size, size_read, timeoutMs, suppresGCIfMinSize); +} + +void readtmsAllowClose(ISocket *sock, void* buf, size32_t min_size, size32_t max_size, size32_t &sizeRead, unsigned timeoutMs) +{ + try + { + sock->readtms(buf, min_size, max_size, sizeRead, timeoutMs, false); + } + catch(IJSOCK_Exception *e) + { + if (JSOCKERR_graceful_close != e->errorCode()) + throw; + e->Release(); + } } void CSocket::read(void* buf, size32_t size) @@ -2043,17 +2086,7 @@ size32_t CSocket::writetms(void const* buf, size32_t minSize, size32_t size, uns if (state != ss_open) THROWJSOCKTARGETEXCEPTION(JSOCKERR_not_opened); - // If timeoutMs != WAIT_FOREVER, set non-blocking mode for the duration of this function - struct ScopedNonBlockingMode - { - CSocket *socket = nullptr; - bool prevMode = false; - void init(CSocket *_socket) { socket = _socket; prevMode = socket->set_nonblock(true); } - ~ScopedNonBlockingMode() { if (socket) socket->set_nonblock(prevMode); } - } scopedNonBlockingMode; - - if (WAIT_FOREVER != timeoutMs) - scopedNonBlockingMode.init(this); + ScopedNonBlockingMode scopedNonBlockingMode(this); while (true) { unsigned retrycount=100; @@ -2079,8 +2112,9 @@ size32_t CSocket::writetms(void const* buf, size32_t minSize, size32_t size, uns if (rc > 0) { sizeWritten += rc; - if (sizeWritten >= minSize) + if (sizeWritten == size) break; + // NB: will exit when blocked if sizeWritten >= minSize } else if (rc < 0) { @@ -2109,8 +2143,10 @@ size32_t CSocket::writetms(void const* buf, size32_t minSize, size32_t size, uns errclose(); err = JSOCKERR_broken_pipe; } - if ((err == JSE_WOULDBLOCK) && nonblocking) + if (err == JSE_WOULDBLOCK) { + if (sizeWritten >= minSize) + break; unsigned remainingMs = timer.remainingMs(timeoutMs); rc = wait_write(remainingMs); if (rc < 0) @@ -2234,7 +2270,6 @@ size32_t CSocket::udp_write_to(const SocketEndpoint &ep, void const* buf, 
size32 size32_t CSocket::write_multiple(unsigned num,const void **buf, size32_t *size) { assertex(sockmode!=sm_udp_server); - assertex(!nonblocking); if (num==1) return write(buf[0],size[0]); size32_t total = 0; @@ -2574,7 +2609,6 @@ void CSocket::shutdown(unsigned mode) void CSocket::shutdownNoThrow(unsigned mode) { if (state == ss_open) { - state = ss_shutdown; #ifdef SOCKTRACE PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%p)", mode, sock, sock, this); #endif @@ -7218,3 +7252,18 @@ extern jlib_decl void shutdownAndCloseNoThrow(ISocket * optSocket) e->Release(); } } + +//// +void ScopedNonBlockingMode::init(ISocket *_socket) +{ + socket = _socket; + if (socket->set_nonblock(true)) // was already nonblocking + socket = nullptr; // nothing to reset in dtor +} + +ScopedNonBlockingMode::~ScopedNonBlockingMode() +{ + // only applicable if resetting back to nonblocking = false + if (socket) + socket->set_nonblock(false); +} diff --git a/system/jlib/jsocket.hpp b/system/jlib/jsocket.hpp index dec2268a56b..c08662c54fc 100644 --- a/system/jlib/jsocket.hpp +++ b/system/jlib/jsocket.hpp @@ -227,7 +227,6 @@ class jlib_decl IpSubNet } }; - class jlib_decl ISocket : extends IInterface { public: @@ -289,13 +288,15 @@ class jlib_decl ISocket : extends IInterface // static ISocket* attach(int s,bool tcpip=true); - + // suppresGCIfMinSize - if true, will suppress graceful close if size_read >= min_size + // This is the default behavior for backwards compatibility. 
+ // Set to false, to allow caller to see graceful close even if size_read >= min_size virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeoutsecs = WAIT_FOREVER) = 0; + unsigned timeoutsecs = WAIT_FOREVER, bool suppresGCIfMinSize = true) = 0; virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, - unsigned timeout) = 0; + unsigned timeout, bool suppresGCIfMinSize = true) = 0; virtual void read(void* buf, size32_t size) = 0; - virtual size32_t write(void const* buf, size32_t size) = 0; // returns amount written normally same as in size (see set_nonblock) + virtual size32_t write(void const* buf, size32_t size) = 0; virtual size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER) = 0; virtual size32_t get_max_send_size() = 0; @@ -456,6 +457,9 @@ Exceptions raised: (when set_raise_exceptions(TRUE)) }; +// helper function that allows a graceful close on a readtms to return with less than min_size. +// A common pattern is to read >=1 byte(s), but allow graceful close to return less (e.g. 0) +extern jlib_decl void readtmsAllowClose(ISocket *sock, void* buf, size32_t min_size, size32_t max_size, size32_t &sizeRead, unsigned timeoutMs); interface jlib_thrown_decl IJSOCK_Exception: extends IException { @@ -742,6 +746,15 @@ class jlib_decl CSingletonSocketConnection: implements IConversation, public CIn extern jlib_decl void shutdownAndCloseNoThrow(ISocket * optSocket); // Safely shutdown and close a socket without throwing an exception. 
+struct jlib_decl ScopedNonBlockingMode +{ + ISocket *socket = nullptr; + void init(ISocket *_socket); + ScopedNonBlockingMode(ISocket *_socket) { init(_socket); } + ~ScopedNonBlockingMode(); +}; + + #ifdef _WIN32 #define SOCKETERRNO() WSAGetLastError() #else diff --git a/system/jlib/jutil.cpp b/system/jlib/jutil.cpp index dbed0920d38..4d07204dd4f 100644 --- a/system/jlib/jutil.cpp +++ b/system/jlib/jutil.cpp @@ -3592,3 +3592,16 @@ extern jlib_decl void getResourceFromJfrog(StringBuffer &localPath, IPropertyTre throw makeStringExceptionV(0, "MD5 mismatch on file %s in manifest", filename.str()); } } + +void hold(const char *msg) +{ + WARNLOG("Holding: %s", msg); + bool held = true; + while (held) + { + MilliSleep(5000); + } + WARNLOG("Released: %s", msg); +} + + diff --git a/system/jlib/jutil.hpp b/system/jlib/jutil.hpp index 445899965be..817befb1082 100644 --- a/system/jlib/jutil.hpp +++ b/system/jlib/jutil.hpp @@ -663,5 +663,7 @@ extern jlib_decl bool getDefaultPlane(StringBuffer &ret, const char * componentO extern jlib_decl void getResourceFromJfrog(StringBuffer &localPath, IPropertyTree &item); +extern jlib_decl void hold(const char *msg); + #endif diff --git a/system/mp/mpcomm.cpp b/system/mp/mpcomm.cpp index 6355baa5a15..a957901706b 100644 --- a/system/mp/mpcomm.cpp +++ b/system/mp/mpcomm.cpp @@ -26,6 +26,8 @@ #include #include +#include +#include #include "platform.h" #include "portlist.h" @@ -75,8 +77,9 @@ #define CONNECT_TIMEOUT_MINSLEEP 2000 // random range: CONNECT_TIMEOUT_MINSLEEP to CONNECT_TIMEOUT_MAXSLEEP milliseconds #define CONNECT_TIMEOUT_MAXSLEEP 5000 -#define CONFIRM_TIMEOUT (90*1000) // 1.5 mins -#define CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs +#define PING_CONFIRM_TIMEOUT (90*1000) // 1.5 mins +#define HEADER_CONFIRM_TIMEOUT (90*1000) // 1.5 mins +#define HEADER_CONFIRM_TIMEOUT_INTERVAL 5000 // 5 secs #define TRACESLOW_THRESHOLD 1000 // 1 sec #define VERIFY_DELAY (1*60*1000) // 1 Minute @@ -441,97 +444,322 @@ class CMPServer; class 
CMPChannel; -class CMPConnectThread: public Thread +static CriticalSection portProbeCS; +static cycle_t portProbeLastLog = 0; +enum CloseType { CloseType_graceful, CloseType_error, CloseType_timeout, CloseType_COUNT }; +static std::array portProbeCloseCounts = {}; +static std::array portProbeCloseCycles = {}; + + +static void trackPortProbe(cycle_t elapsedCycles, const char *peerEndpointTest, CloseType closeType) { - std::atomic running; - bool listen; - ISocket *listensock; - CMPServer *parent; - int mpSoMaxConn; - unsigned acceptThreadPoolSize = 0; - Owned threadPool; + // this should be based on a logging feature flag + CriticalBlock b(portProbeCS); + portProbeCloseCounts[closeType]++; + portProbeCloseCycles[closeType] += elapsedCycles; + if ((0 == portProbeLastLog)) + portProbeLastLog = get_cycles_now(); + else + { + cycle_t nowCycles = get_cycles_now(); + cycle_t cyclesSinceLastLog = nowCycles - portProbeLastLog; + if ((cyclesSinceLastLog >= (queryOneSecCycles()*60))) // log max every minute + { + portProbeLastLog = nowCycles; + unsigned __int64 totalProbes = portProbeCloseCounts[CloseType_graceful] + portProbeCloseCounts[CloseType_error] + portProbeCloseCounts[CloseType_timeout]; + DBGLOG("Port probes: %" I64F "u [graceful=%" I64F "u (%" I64F "u ms),error=%" I64F "u (%" I64F "u ms),timedout=%" I64F "u (%" I64F "u ms). 
Last: %s, type=%s, time: %" I64F "u", + totalProbes, + portProbeCloseCounts[CloseType_graceful], cycle_to_millisec(portProbeCloseCycles[CloseType_graceful]), + portProbeCloseCounts[CloseType_error], cycle_to_millisec(portProbeCloseCycles[CloseType_error]), + portProbeCloseCounts[CloseType_timeout], cycle_to_millisec(portProbeCloseCycles[CloseType_timeout]), + peerEndpointTest, CloseType_graceful==closeType?"graceful":CloseType_error==closeType?"error":"timeout", cycle_to_millisec(elapsedCycles)); + } + } +} - Owned allowListCallback; - void checkSelfDestruct(void *p,size32_t sz); +// Legacy header sent id[2] only (but legacy clients are no longer supported since 9.6.4, i.e. they will fail during connection process) +struct ConnectHdr +{ + ConnectHdr(const SocketEndpoint &hostEp, const SocketEndpoint &remoteEp, unsigned __int64 role) + { + id[0].set(hostEp); + id[1].set(remoteEp); + + hdr.size = sizeof(PacketHeader); + hdr.tag = TAG_SYS_BCAST; + hdr.flags = 0; + hdr.version = MP_PROTOCOL_VERSION; + setRole(role); + } + ConnectHdr() + { + } + SocketEndpointV4 id[2]; + PacketHeader hdr; + inline void setRole(unsigned __int64 role) + { + hdr.replytag = (mptag_t) (role >> 32); + hdr.sequence = (unsigned) (role & 0xffffffff); + } + inline unsigned __int64 getRole() const + { + return (((unsigned __int64)hdr.replytag)<<32) | ((unsigned __int64)hdr.sequence); + } +}; - Owned secureContextServer; - class CSlowClientProcessor : implements IThreaded +class CMPConnectThread: public Thread +{ + class CConnectSelectHandler { CMPConnectThread &owner; - CThreaded threaded; - std::vector> slowClientsSocks; - CriticalSection crit; - Semaphore sem; - std::atomic stopped = true; - + Owned selectHandler; + unsigned mode = SELECTMODE_READ; public: - CSlowClientProcessor(CMPConnectThread &_owner) : threaded("CSlowClientProcessor"), owner(_owner) + class CSocketHandler : public CInterfaceOf { - } - void start() + CConnectSelectHandler &selectHandler; + Owned sock; + SocketEndpoint peerEP; 
+ mutable StringBuffer peerHostText, peerEndpointText; + ConnectHdr hdr; + cycle_t createTime = 0; + size32_t readSoFar = 0; + CriticalSection crit; + bool closedOrHandled = false; + public: + CSocketHandler(CConnectSelectHandler &_selectHandler, ISocket *_sock, const SocketEndpoint &_peerEP) : selectHandler(_selectHandler), sock(_sock), peerEP(_peerEP) + { + createTime = get_cycles_now(); + } + ISocket *querySocket() + { + return sock; + } + ConnectHdr &queryHdr() + { + return hdr; + } + cycle_t queryCreateTime() const + { + return createTime; + } + size32_t queryReadSoFar() const + { + return readSoFar; + } + const char *queryPeerHostText() const + { + if (0 == peerHostText.length()) + peerEP.getHostText(peerHostText); + return peerHostText; + } + const char *queryPeerEndpointText() const + { + if (0 == peerEndpointText.length()) + peerEP.getEndpointHostText(peerEndpointText); + return peerEndpointText; + } + bool closeIfTimedout(cycle_t now) + { + return false; // NOTE(review): this unconditional early return makes the timeout check below unreachable, so handlers are never expired; looks like a debug leftover, confirm before merge + if (cycle_to_millisec(now - createTime) >= HEADER_CONFIRM_TIMEOUT) + { + // will block any pending notifySelected on this socket + CriticalBlock b(crit); + if (!closedOrHandled) + { + closedOrHandled = true; + return true; + } + } + return false; + } + // ISocketSelectNotify impl. + virtual bool notifySelected(ISocket *sock, unsigned selected) override + { + CLeavableCriticalBlock b(crit); + if (closedOrHandled) + return false; + size32_t rd = 0; + void *p = (byte *)&hdr + readSoFar; + + Owned exception; + try + { + sock->readtms(p, 0, sizeof(ConnectHdr)-readSoFar, rd, 60000); // long enough! 
+ readSoFar += rd; + if (sizeof(ConnectHdr) == readSoFar) + { + closedOrHandled = true; + // process() will remove itself from handler, and need to avoid it doing so while in 'crit' + // since the maintenance thread could also be trying to manipulate handlers and calling closeIfTimedout() + b.leave(); + selectHandler.process(*this); + } + } + catch (IJSOCK_Exception *e) + { + exception.setown(e); + } + if (exception) + selectHandler.close(*this, exception); + + return false; + } + }; + private: + // NB: Linked vs Owned, because methods will implicitly construct an object of this type + // which can be problematic/confusing, for example if Owned and std::list->remove is called + // with a pointer, it will auto instantiate an Owned and cause -ve leak. + std::list> handlers; + + CriticalSection handlersCS; + + std::thread maintenanceThread; + Semaphore maintenanceSem; + + void clearupSocketHandlers() { - stopped = false; - threaded.init(this, false); + std::vector> toClose; + { + CriticalBlock b(handlersCS); + auto it = handlers.begin(); + while (true) + { + if (it == handlers.end()) + break; + CSocketHandler *socketHandler = *it; + if (socketHandler->closeIfTimedout(get_cycles_now())) + { + toClose.push_back(LINK(socketHandler)); + it = handlers.erase(it); + } + else + ++it; + } + } + for (auto &socketHandler: toClose) + { + try + { + Owned e = createJSocketException(JSOCKERR_timeout_expired, "Connect timeout expired", __FILE__, __LINE__); + close(*socketHandler, e); + } + catch (IException *e) + { + EXCLOG(e, "CConnectSelectHandler::maintenanceFunc"); + e->Release(); + } + } } - void stop() + public: + CConnectSelectHandler(CMPConnectThread &_owner) : owner(_owner) { - if (stopped) - return; + selectHandler.setown(createSocketSelectHandler()); + selectHandler->start(); + auto maintenanceFunc = [&] { - CriticalBlock b(crit); - stopped = true; - for (auto &sock : slowClientsSocks) - sock->close(); - slowClientsSocks.clear(); - } - - sem.signal(); - if 
(!threaded.join(1000*60*5)) - printf("CSlowClientProcessor::stop timed out\n"); + while (owner.running) + { + if (maintenanceSem.wait(10000)) // check every 10s + break; + clearupSocketHandlers(); + } + }; + maintenanceThread = std::thread(maintenanceFunc); + } + ~CConnectSelectHandler() + { + maintenanceSem.signal(); + maintenanceThread.join(); } - void add(ISocket *sock) // NB: takes ownership + void add(ISocket *sock, const SocketEndpoint &peerEP) { + while (true) { - CriticalBlock b(crit); - if (stopped) + unsigned numHandlers; { - sock->Release(); - return; + CriticalBlock b(handlersCS); + numHandlers = handlers.size(); } - slowClientsSocks.emplace_back(sock); + if (numHandlers < owner.maxListenHandlerSockets) + break; + DBGLOG("Too many handlers (%u), waiting for some to be processed (max limit: %u)", numHandlers, owner.maxListenHandlerSockets); + MilliSleep(1000); } - sem.signal(); + + Owned socketHandler = new CSocketHandler(*this, LINK(sock), peerEP); + + size_t numHandlers; + { + CriticalBlock b(handlersCS); + selectHandler->add(sock, mode, socketHandler); // NB: sock and handler linked by select handler + handlers.emplace_back(socketHandler); + numHandlers = handlers.size(); + } + if (0 == (numHandlers % 100)) // for info. log at each 100 boundary + DBGLOG("handlers = %u", (unsigned)numHandlers); } - // IThreaded - virtual void threadmain() override + void close(CSocketHandler &socketHandler, IJSOCK_Exception *exception) { - // The slow client processor deals with each queued slow client socket in turn, waiting the standard CONFIRM_TIMEOUT for each. - // An alternative would be to try each for shorter periods, shuffling them to the end if they still haven't been handled. - // But this is all probably OTT. The main thing is to avoid a slow client blocking the accept loop. 
- while (true) + if (socketHandler.queryReadSoFar()) // read something { - sem.wait(); - - Owned sock; + VStringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from: %s", socketHandler.queryPeerHostText()); + FLLOG(MCoperatorWarning, "%s", errMsg.str()); + } + int exceptionCode = exception->errorCode(); + switch (exceptionCode) + { + case JSOCKERR_timeout_expired: + trackPortProbe(get_cycles_now() - socketHandler.queryCreateTime(), socketHandler.queryPeerEndpointText(), CloseType_timeout); + break; + case JSOCKERR_graceful_close: + trackPortProbe(get_cycles_now() - socketHandler.queryCreateTime(), socketHandler.queryPeerEndpointText(), CloseType_graceful); + break; + default: + trackPortProbe(get_cycles_now() - socketHandler.queryCreateTime(), socketHandler.queryPeerEndpointText(), CloseType_error); + break; + } - { - CriticalBlock b(crit); - if (stopped) - break; - if (slowClientsSocks.empty()) // guard, but should never happen - { - WARNLOG("slowClientsSocks list empty"); - continue; - } - sock.set(slowClientsSocks.back()); - slowClientsSocks.pop_back(); - } - owner.handleAcceptedSocket(sock.getClear(), CONFIRM_TIMEOUT, true); + Linked handler = &socketHandler; + { + CriticalBlock b(handlersCS); + selectHandler->remove(socketHandler.querySocket()); + handlers.remove(&socketHandler); } + handler->querySocket()->close(); } - } slowClientProcessor; + void process(CSocketHandler &socketHandler) + { + Linked handler = &socketHandler; + { + CriticalBlock b(handlersCS); + selectHandler->remove(socketHandler.querySocket()); + handlers.remove(&socketHandler); + } + + if (owner.threadPool) + owner.threadPool->start(handler.getClear()); + else + owner.handleAcceptedSocket(handler.getClear()); + } + }; + std::atomic running; + bool listen; + ISocket *listensock; + CMPServer *parent; + int mpSoMaxConn; + unsigned acceptThreadPoolSize = 0; + unsigned maxListenHandlerSockets = 60000; // what is a sensible default limit? 
+ Owned threadPool; + + Owned allowListCallback; + void checkSelfDestruct(void *p,size32_t sz); + + Owned secureContextServer; + public: CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen); ~CMPConnectThread() @@ -555,7 +783,6 @@ class CMPConnectThread: public Thread { if (!threadPool->joinAll(true, 1000*60*5)) printf("CMPConnectThread::stop threadPool->joinAll timed out\n"); - slowClientProcessor.stop(); } } } @@ -567,7 +794,7 @@ class CMPConnectThread: public Thread { return allowListCallback; } - bool handleAcceptedSocket(ISocket *sock, unsigned timeoutMs, bool failOnTimeout); + bool handleAcceptedSocket(CConnectSelectHandler::CSocketHandler *handler); }; class PingPacketHandler; @@ -755,7 +982,7 @@ class CMPNotifyClosedThread: public Thread } } catch (IException *e) { - FLLOG(MCoperatorWarning, e,"MP writepacket"); + FLLOG(MCoperatorWarning, e, "MP writepacket"); e->Release(); } } @@ -786,131 +1013,6 @@ class CMPNotifyClosedThread: public Thread }; -void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs) -{ - dbgassertex(timeoutChkIntervalMs <= timeoutMs); - StringBuffer epStr; - CCycleTimer readTmsTimer; - unsigned intervalTimeoutMs = 500; - CCycleTimer intvlTimer; - - // legacy client sends minSize, recent client sends maxSize - // if read < maxSize, keep trying for maxSize, but if its exactly minSize - // somewhat quickly (without waiting full timeout) settle for minSize ... 
- - if (intervalTimeoutMs > timeoutChkIntervalMs) - intervalTimeoutMs = timeoutChkIntervalMs; - - sizeRead = 0; - - unsigned firstReadTime = 0; - size32_t maxRead = maxSize; - for (;;) - { - try - { - size32_t amtRead = 0; - sock->readtms((char *)dst+sizeRead, 0, maxRead, amtRead, intervalTimeoutMs); - sizeRead += amtRead; - if (sizeRead == maxSize) - break; - maxRead -= amtRead; - } - catch (IJSOCK_Exception *e) - { - if (JSOCKERR_graceful_close == e->errorCode()) - { - e->Release(); - return; - } - else if (JSOCKERR_timeout_expired != e->errorCode()) - throw; - // interval read timed out ... - unsigned elapsedMs = readTmsTimer.elapsedMs(); - if (sizeRead == minSize) - { - if (firstReadTime == 0) - firstReadTime = elapsedMs; - else if ((elapsedMs - firstReadTime) >= 5000) // max wait if minSize sent - { - e->Release(); - break; - } - } - if (elapsedMs >= timeoutMs) - { - if (sizeRead >= minSize) - { - e->Release(); - break; - } - throw; - } - unsigned remainingMs = timeoutMs-elapsedMs; - if (remainingMs < intervalTimeoutMs) - intervalTimeoutMs = remainingMs; - if (intvlTimer.elapsedMs() >= timeoutChkIntervalMs) - { - intvlTimer.reset(); - if (0 == epStr.length()) - { - SocketEndpoint ep; - sock->getPeerEndpoint(ep); - ep.getEndpointHostText(epStr); - } - WARNLOG("%s %s, stalled for %d ms so far", msg, epStr.str(), elapsedMs); - } - e->Release(); - } - } - if (readTmsTimer.elapsedMs() >= TRACESLOW_THRESHOLD) - { - if (0 == epStr.length()) - { - SocketEndpoint ep; - sock->getPeerEndpoint(ep); - ep.getEndpointHostText(epStr); - } - WARNLOG("%s %s, took: %d ms", msg, epStr.str(), readTmsTimer.elapsedMs()); - } -} - -/* Legacy header sent id[2] only. - * To remain backward compatible (when new MP clients are connecting to old Dali), - * we send a regular empty PacketHeader as well that has the 'role' embedded within it, - * in unused fields. TAG_SYS_BCAST is used as the message tag, because it is an - * unused feature that all Dali's simply receive and delete. 
- */ -struct ConnectHdr -{ - ConnectHdr(const SocketEndpoint &hostEp, const SocketEndpoint &remoteEp, unsigned __int64 role) - { - id[0].set(hostEp); - id[1].set(remoteEp); - - hdr.size = sizeof(PacketHeader); - hdr.tag = TAG_SYS_BCAST; - hdr.flags = 0; - hdr.version = MP_PROTOCOL_VERSION; - setRole(role); - } - ConnectHdr() - { - } - SocketEndpointV4 id[2]; - PacketHeader hdr; - inline void setRole(unsigned __int64 role) - { - hdr.replytag = (mptag_t) (role >> 32); - hdr.sequence = (unsigned) (role & 0xffffffff); - } - inline unsigned __int64 getRole() const - { - return (((unsigned __int64)hdr.replytag)<<32) | ((unsigned __int64)hdr.sequence); - } -}; - - class CMPPacketReader; class CMPChannel: public CInterface @@ -983,6 +1085,7 @@ protected: friend class CMPPacketReader; if (remaining<10000) remaining = 10000; // 10s min granularity for MP newsock.setown(ISocket::connect_timeout(remoteep,remaining)); + newsock->set_nonblock(true); #if defined(_USE_OPENSSL) if (parent->useTLS) @@ -1073,7 +1176,7 @@ protected: friend class CMPPacketReader; // if its an exception or legacy and not in allowlist the other side closes its socket after sending this msg ... 
size32_t amtRead = 0; - newsock->readtms(&replyBuf[totRead], 0, maxRead, amtRead, CONNECT_TIMEOUT_INTERVAL); + newsock->readtms(&replyBuf[totRead], 1, maxRead, amtRead, CONNECT_TIMEOUT_INTERVAL); totRead += amtRead; if (totRead == sizeof(size32_t)) { @@ -1512,7 +1615,7 @@ class PingPacketHandler // TAG_SYS_PING public: void handle(CMPChannel *channel,bool identifyself) { - channel->sendPingReply(CONFIRM_TIMEOUT,identifyself); + channel->sendPingReply(PING_CONFIRM_TIMEOUT, identifyself); } bool send(CMPChannel *channel,PacketHeader &hdr,CTimeMon &tm) { @@ -1700,11 +1803,12 @@ class ForwardPacketHandler // TAG_SYS_FORWARD class CMPPacketReader: public ISocketSelectNotify, public CInterface { - CMessageBuffer *activemsg; - byte * activeptr; - size32_t remaining; - CMPChannel *parent; + CMessageBuffer *activemsg = nullptr; + byte * activeptr = nullptr; + size32_t remaining = 0; + CMPChannel *parent = nullptr; CriticalSection sect; + bool gotPacketHdr = false; public: IMPLEMENT_IINTERFACE; @@ -1725,136 +1829,130 @@ class CMPPacketReader: public ISocketSelectNotify, public CInterface parent = NULL; } - bool notifySelected(ISocket *sock,unsigned selected) + bool notifySelected(ISocket *sock, unsigned selected) { if (!parent) return false; - try { - // try and mop up all data on socket - // TLS TODO: avail_read() may not return accurate amount of pending bytes - size32_t sizeavail = sock->avail_read(); - if (sizeavail==0) { - // graceful close - Linked pc; - { - CriticalBlock block(sect); - if (parent) { - pc.set(parent); // don't want channel to disappear during call - parent = NULL; - } - } - if (pc) - { -#ifdef _TRACELINKCLOSED - LOG(MCdebugInfo, "CMPPacketReader::notifySelected() about to close socket, mode = 0x%x", selected); -#endif - pc->closeSocket(false, true); - } - return false; - } - do { + try + { + while (true) // NB: breaks out if blocked (if (remaining) ..) 
+ { + // try and mop up all data on socket parent->lastxfer = msTick(); #ifdef _FULLTRACE parent->numiter++; #endif - if (!activemsg) { // no message in progress - PacketHeader hdr; // header for active message + if (!activemsg) // no message in progress + { #ifdef _FULLTRACE parent->numiter = 1; parent->startxfer = msTick(); #endif - // assumes packet header will arrive in one go - if (sizeavailread(&hdr,sizeof(hdr),sizeof(hdr),szread,60); // I don't *really* want to block here but not much else can do - } - else - sock->read(&hdr,sizeof(hdr)); - if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) { + remaining = sizeof(PacketHeader); + gotPacketHdr = false; + activemsg = new CMessageBuffer(remaining); + activeptr = (byte *)activemsg->bufferBase(); + } + size32_t szRead; + if (!gotPacketHdr) + { + CCycleTimer timer; + sock->readtms(activeptr, 0, remaining, szRead, timer.remainingMs(60000)); + remaining -= szRead; + activeptr += szRead; + if (remaining) // only possible if blocked. + return false; // wait for next notification + + PacketHeader &hdr = *(PacketHeader *)activemsg->bufferBase(); + if (hdr.version/0x100 != MP_PROTOCOL_VERSION/0x100) + { // TBD IPV6 here SocketEndpoint ep; sock->getPeerEndpoint(ep); IMP_Exception *e=new CMPException(MPERR_protocol_version_mismatch,ep); throw e; } - if (sizeavail<=sizeof(hdr)) - sizeavail = sock->avail_read(); - else - sizeavail -= sizeof(hdr); -#ifdef _FULLTRACE + hdr.setMessageFields(*activemsg); + #ifdef _FULLTRACE StringBuffer ep1; StringBuffer ep2; LOG(MCdebugInfo, "MP: ReadPacket(sender=%s,target=%s,tag=%d,replytag=%d,size=%d)",hdr.sender.getEndpointHostText(ep1).str(),hdr.target.getEndpointHostText(ep2).str(),hdr.tag,hdr.replytag,hdr.size); -#endif + #endif remaining = hdr.size-sizeof(hdr); - activemsg = new CMessageBuffer(remaining); // will get from low level IO at some stage - activeptr = (byte *)activemsg->reserveTruncate(remaining); - hdr.setMessageFields(*activemsg); + activeptr = (byte 
*)activemsg->clear().reserveTruncate(remaining); + gotPacketHdr = true; } - - size32_t toread = sizeavail; - if (toread>remaining) - toread = remaining; - if (toread) { - sock->read(activeptr,toread); - remaining -= toread; - sizeavail -= toread; - activeptr += toread; + + if (remaining) + { + sock->readtms(activeptr, 0, remaining, szRead, WAIT_FOREVER); + remaining -= szRead; + activeptr += szRead; } - if (remaining==0) { // we have the packet so process + if (remaining) // only possible if blocked. + return false; // wait for next notification #ifdef _FULLTRACE - LOG(MCdebugInfo, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter); + LOG(MCdebugInfo, "MP: ReadPacket(timetaken = %d,select iterations=%d)",msTick()-parent->startxfer,parent->numiter); #endif - do { - switch (activemsg->getTag()) { + do + { + switch (activemsg->getTag()) + { case TAG_SYS_MULTI: - activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out - break; + activemsg = parent->queryServer().multipackethandler->handle(activemsg); // activemsg in/out + break; case TAG_SYS_PING: - parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg); - delete activemsg; - activemsg = NULL; - break; + parent->queryServer().pingpackethandler->handle(parent,false); //,activemsg); + delete activemsg; + activemsg = NULL; + break; case TAG_SYS_PING_REPLY: - parent->queryServer().pingreplypackethandler->handle(parent); - delete activemsg; - activemsg = NULL; - break; + parent->queryServer().pingreplypackethandler->handle(parent); + delete activemsg; + activemsg = NULL; + break; case TAG_SYS_BCAST: - activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg); - break; + activemsg = parent->queryServer().broadcastpackethandler->handle(activemsg); + break; case TAG_SYS_FORWARD: - activemsg = parent->queryServer().forwardpackethandler->handle(activemsg); - break; + activemsg = 
parent->queryServer().forwardpackethandler->handle(activemsg); + break; default: - parent->queryServer().userpackethandler->handle(activemsg); // takes ownership - activemsg = NULL; - } - } while (activemsg); + parent->queryServer().userpackethandler->handle(activemsg); // takes ownership + activemsg = NULL; + } } - if (!sizeavail) - sizeavail = sock->avail_read(); - } while (sizeavail); - return false; // ok + while (activemsg); + } } - catch (IException *e) { + catch (IException *e) + { if (e->errorCode()!=JSOCKERR_graceful_close) FLLOG(MCoperatorWarning, e,"MP(Packet Reader)"); e->Release(); + gotPacketHdr = false; } - // error here, so close socket (ignore error as may be closed already) - try { - if(parent) - parent->closeSocket(false, true); + + // here due to error or graceful close, so close socket (ignore error as may be closed already) + try + { + Linked pc; + { + CriticalBlock block(sect); + if (parent) + { + pc.set(parent); // don't want channel to disappear during call + parent = NULL; + } + } + if (pc) + pc->closeSocket(false, true); } - catch (IException *e) { + catch (IException *e) + { e->Release(); } - parent = NULL; return false; } }; @@ -1988,7 +2086,7 @@ bool CMPChannel::attachSocket(ISocket *newsock,const SocketEndpoint &_remoteep,c PROGLOG("MP: attachSocket before select add"); #endif - parent->querySelectHandler().add(channelsock,SELECTMODE_READ,reader); + parent->querySelectHandler().add(channelsock, SELECTMODE_READ, reader); #ifdef _FULLTRACE PROGLOG("MP: attachSocket after select add"); @@ -2128,7 +2226,7 @@ bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself) static constexpr unsigned defaultAcceptThreadPoolSize = 100; // -------------------------------------------------------- CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen) - : Thread("MP Connection Thread"), slowClientProcessor(*this) + : Thread("MP Connection Thread") { parent = _parent; listen = _listen; @@ -2293,7 +2391,7 @@ void 
CMPConnectThread::startPort(unsigned short port) class CMPConnectionThread : public CInterfaceOf { CMPConnectThread &owner; - Owned sock; + Owned handler; public: CMPConnectionThread(CMPConnectThread &_owner) : owner(_owner) { @@ -2301,21 +2399,11 @@ void CMPConnectThread::startPort(unsigned short port) // IPooledThread virtual void init(void *param) override { - sock.set((ISocket *)param); + handler.setown((CConnectSelectHandler::CSocketHandler *)param); } virtual void threadmain() override { - constexpr unsigned timeoutMs = 5000; - - // detach from this pooled thread, and own locally, so that we ensure that - // the pooled thread object does not retain it until the next init() call. - Owned handledSock = sock.getClear(); - - if (owner.handleAcceptedSocket(handledSock.getLink(), timeoutMs, false)) - { - // handoff to slowClientProcessor, which will retry for standard CONFIRM_TIMEOUT period - owner.slowClientProcessor.add(handledSock.getClear()); - } + owner.handleAcceptedSocket(handler.getClear()); } virtual bool stop() override { @@ -2331,129 +2419,39 @@ void CMPConnectThread::startPort(unsigned short port) }; Owned factory = new CMPConnectThreadFactory(*this); threadPool.setown(createThreadPool("MPConnectPool", factory, false, nullptr, acceptThreadPoolSize, INFINITE)); - slowClientProcessor.start(); } Thread::start(false); } -// only returns true if !failOnTimeout and times out -bool CMPConnectThread::handleAcceptedSocket(ISocket *_sock, unsigned timeoutMs, bool failOnTimeout) +bool CMPConnectThread::handleAcceptedSocket(CConnectSelectHandler::CSocketHandler *_handler) { - SocketEndpoint peerEp; - _sock->getPeerEndpoint(peerEp); - Owned sock = _sock; + Owned handler = _handler; + ISocket *sock = handler->querySocket(); + ConnectHdr &connectHdr = handler->queryHdr(); try { -#if defined(_USE_OPENSSL) - if (parent->useTLS) - { - Owned ssock = secureContextServer->createSecureSocket(sock.getClear()); - int tlsTraceLevel = SSLogMin; - if (parent->mpTraceLevel >= 
MPVerboseMsgThreshold) - tlsTraceLevel = SSLogMax; - int status = ssock->secure_accept(tlsTraceLevel); - if (status < 0) - { - ssock->close(); - PROGLOG("MP Connect Thread: failed to accept secure connection"); - return false; // did not timeout - } - sock.setown(ssock.getClear()); - } -#endif // OPENSSL - -#ifdef _FULLTRACE - StringBuffer s; - SocketEndpoint ep1; - sock->getPeerEndpoint(ep1); - PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getEndpointHostText(s).str()); -#endif - - sock->set_keep_alive(true); - - size32_t rd = 0; - SocketEndpoint _remoteep; - SocketEndpoint hostep; - ConnectHdr connectHdr; - bool legacyClient = false; - - // NB: min size is ConnectHdr.id for legacy clients, can thus distinguish old from new - try - { - traceSlowReadTms("MP: initial accept packet from", sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, timeoutMs, CONFIRM_TIMEOUT_INTERVAL); - } - catch (IJSOCK_Exception *e) - { - if (JSOCKERR_timeout_expired != e->errorCode()) - throw; - if (!failOnTimeout) - return true; // timedout (socket kept open) - } - if (0 == rd) - { - if (parent->mpTraceLevel >= MPVerboseMsgThreshold) - { - // cannot get peer addresss as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint()) - PROGLOG("MP Connect Thread: connect with no msg received, assumed port monitor check"); - } - sock->close(); - return false; // did not timeout - } - else - { - if (rd == sizeof(connectHdr.id)) // legacy client - { - legacyClient = true; - connectHdr.hdr.size = sizeof(PacketHeader); - connectHdr.hdr.tag = TAG_SYS_BCAST; - connectHdr.hdr.flags = 0; - connectHdr.hdr.version = MP_PROTOCOL_VERSION; - connectHdr.setRole(0); // unknown - } - else if (rd < sizeof(connectHdr.id) || rd > sizeof(connectHdr)) - { - // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception - StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes 
serialized from "); - peerEp.getEndpointHostText(errMsg); - FLLOG(MCoperatorWarning, "%s", errMsg.str()); - sock->close(); - return false; // did not timeout - } - } - if (allowListCallback) { - StringBuffer ipStr; - peerEp.getHostText(ipStr); StringBuffer responseText; // filled if denied, NB: if amount sent is > sizeof(ConnectHdr) we can differentiate exception from success - if (!allowListCallback->isAllowListed(ipStr, connectHdr.getRole(), &responseText)) + if (!allowListCallback->isAllowListed(handler->queryPeerHostText(), connectHdr.getRole(), &responseText)) { Owned e = makeStringException(-1, responseText); OWARNLOG(e, nullptr); - if (legacyClient) - { - /* NB: legacy client can't handle exception response - * Acknowledge legacy connection, then close socket - * The effect will be the client sees an MPERR_link_closed - */ - size32_t reply = sizeof(connectHdr.id); - sock->write(&reply, sizeof(reply)); - } - else - { - MemoryBuffer mb; - DelayedSizeMarker marker(mb); - serializeException(e, mb); - marker.write(); - sock->write(mb.toByteArray(), mb.length()); - } + // NB: from 9.6 legacy clients are no longer supported (legacy clients in this context are older than 7.4.2) + MemoryBuffer mb; + DelayedSizeMarker marker(mb); + serializeException(e, mb); + marker.write(); + sock->write(mb.toByteArray(), mb.length()); sock->close(); return false; // did not timeout } } + SocketEndpoint _remoteep; + SocketEndpoint hostep; connectHdr.id[0].get(_remoteep); connectHdr.id[1].get(hostep); @@ -2470,19 +2468,17 @@ bool CMPConnectThread::handleAcceptedSocket(ISocket *_sock, unsigned timeoutMs, if (memcmp(connectHdr.id, zeroTest, sizeof(connectHdr.id))) { // JCSMORE, I think _remoteep really must/should match a IP of this local host - errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from "); - peerEp.getEndpointHostText(errMsg); + errMsg.appendf("MP Connect Thread: invalid remote and/or host ep serialized from %s", handler->queryPeerEndpointText()); 
FLLOG(MCoperatorWarning, "%s", errMsg.str()); } else if (parent->mpTraceLevel >= MPVerboseMsgThreshold) { // all zeros msg received - errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from "); - peerEp.getEndpointHostText(errMsg); + errMsg.appendf("MP Connect Thread: connect with empty msg received, assumed port monitor check from %s", handler->queryPeerEndpointText()); PROGLOG("%s", errMsg.str()); } sock->close(); - return false; // did not timeout + return false; } #ifdef _FULLTRACE StringBuffer tmp1; @@ -2493,7 +2489,8 @@ bool CMPConnectThread::handleAcceptedSocket(ISocket *_sock, unsigned timeoutMs, #endif checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id)); Owned channel = parent->lookup(_remoteep); - if (!channel->attachSocket(sock.getClear(),_remoteep,hostep,false,&rd,addrval)) + size32_t rd = sizeof(connectHdr); + if (!channel->attachSocket(LINK(sock),_remoteep,hostep,false,&rd,addrval)) { #ifdef _FULLTRACE PROGLOG("MP Connect Thread: lookup failed"); @@ -2517,7 +2514,7 @@ bool CMPConnectThread::handleAcceptedSocket(ISocket *_sock, unsigned timeoutMs, sock->close(); e->Release(); } - return false; // did not timeout + return false; } int CMPConnectThread::run() @@ -2526,13 +2523,15 @@ int CMPConnectThread::run() LOG(MCdebugInfo, "MP: Connect Thread Starting - accept loop"); #endif Owned exception; + + CConnectSelectHandler connectSelectHandler(*this); while (running) { Owned sock; - SocketEndpoint peerEp; + SocketEndpoint peerEP; try { - sock.setown(listensock->accept(true, &peerEp)); + sock.setown(listensock->accept(true, &peerEP)); } catch (IException *e) { @@ -2540,14 +2539,39 @@ int CMPConnectThread::run() } if (sock) { - if (threadPool) +#if defined(_USE_OPENSSL) + if (parent->useTLS) { - // if enabled, handle initial connection protocol on a separate thread - // if any block for more than a short period (5 seconds), then they are handed off to slowClientProcessor - threadPool->start(sock); + Owned 
ssock = secureContextServer->createSecureSocket(sock.getClear()); + int tlsTraceLevel = SSLogMin; + if (parent->mpTraceLevel >= MPVerboseMsgThreshold) + tlsTraceLevel = SSLogMax; + int status = ssock->secure_accept(tlsTraceLevel); + if (status < 0) + { + ssock->close(); + PROGLOG("MP Connect Thread: failed to accept secure connection"); + continue; + } + sock.setown(ssock.getClear()); } - else - handleAcceptedSocket(sock.getClear(), CONFIRM_TIMEOUT, true); +#endif // OPENSSL + +#ifdef _FULLTRACE + StringBuffer s; + SocketEndpoint ep1; + sock->getPeerEndpoint(ep1); + PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getEndpointHostText(s).str()); +#endif + sock->set_keep_alive(true); + sock->set_nonblock(true); + + // NB: creates a CSocketHandler that is added to the select handler. + // it will manage the handling of the incoming ConnectHdr header only. + // After that, the socket will be removed from the connectSelectHandler, + // a CMPChannel will be established, and the socket will be added to the MP CMPPacketReader select handler. + // See handleAcceptedSocket. 
+ connectSelectHandler.add(sock, peerEP); } else { diff --git a/system/security/securesocket/securesocket.cpp b/system/security/securesocket/securesocket.cpp index 4c6e8b10a35..eb8cd960c9e 100644 --- a/system/security/securesocket/securesocket.cpp +++ b/system/security/securesocket/securesocket.cpp @@ -137,15 +137,8 @@ class CStringSet : public CInterface class CSecureSocket : implements ISecureSocket, public CInterface { private: - struct ScopedNonBlockingMode - { - CSecureSocket *socket = nullptr; - bool prevMode = false; - void init(CSecureSocket *_socket) { socket = _socket; prevMode = socket->set_nonblock(true); } - ~ScopedNonBlockingMode() { if (socket) socket->set_nonblock(prevMode); } - }; - SSL* m_ssl; + StringBuffer epStr; Linked contextCallback; Owned m_socket; bool nonBlocking; @@ -166,6 +159,7 @@ class CSecureSocket : implements ISecureSocket, public CInterface private: StringBuffer& get_cn(X509* cert, StringBuffer& cn); bool verify_cert(X509* cert); + void handleError(int ssl_err, bool writing, bool wait, unsigned timeoutMs, const char *opStr); public: IMPLEMENT_IINTERFACE; @@ -178,8 +172,8 @@ class CSecureSocket : implements ISecureSocket, public CInterface virtual int logPollError(unsigned revents, const char *rwstr); virtual int wait_read(unsigned timeoutms); - virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs); - virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutms); + virtual void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs, bool suppresGCIfMinSize=true); + virtual void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutms, bool suppresGCIfMinSize=true); virtual size32_t write(void const* buf, size32_t size); virtual size32_t writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutms=WAIT_FOREVER); @@ -503,7 +497,12 @@ 
CSecureSocket::CSecureSocket(ISocket* sock, int sockfd, ISecureSocketContextCall : contextCallback(callback) { if (sock) + { sockfd = sock->OShandle(); + SocketEndpoint ep; + sock->getPeerEndpoint(ep); + ep.getEndpointHostText(epStr); + } m_socket.setown(sock); contextVersion = callback->getVersion(); m_ssl = callback->createActiveSSL(); @@ -760,6 +759,70 @@ int CSecureSocket::secure_accept(int logLevel) return 0; } +void CSecureSocket::handleError(int ssl_err, bool writing, bool wait, unsigned timeoutMs, const char *opStr) +{ + // if !wait, then we only perform ssl_err checking, we do not wait_read/wait_write or timeout + int rc = 0; + switch (ssl_err) + { + case SSL_ERROR_ZERO_RETURN: + { + THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); + } + case SSL_ERROR_WANT_READ: // NB: SSL_write can provoke SSL_ERROR_WANT_READ + { + if (wait) + rc = wait_read(timeoutMs); + break; + } + case SSL_ERROR_WANT_WRITE: // NB: SSL_read can provoke SSL_ERROR_WANT_WRITE + { + if (wait) + rc = wait_write(timeoutMs); + break; + } + case SSL_ERROR_SYSCALL: + { + int sockErr = SOCKETERRNO(); + if (sockErr == EAGAIN || sockErr == EWOULDBLOCK) + { + if (wait) + { + if (writing) + rc = wait_write(timeoutMs); + else + rc = wait_read(timeoutMs); + } + break; + } + // fall through to default error handling below + } + default: + { + char errbuf[512]; + ERR_error_string_n(ssl_err, errbuf, 512); + ERR_clear_error(); + VStringBuffer errmsg("%s error %d - %s", opStr, ssl_err, errbuf); + // if (m_loglevel >= SSLogMax) + DBGLOG("Warning: %s", errmsg.str()); + THROWJSOCKEXCEPTION_MSG(ssl_err, errmsg); + } + } + if (wait && rc <= 0) + { + int code = SOCKETERRNO(); + VStringBuffer errorMsg("%s: %s ", opStr, (SSL_ERROR_WANT_WRITE == ssl_err) ? 
"wait_write" : "wait_read"); + if (rc == 0) + { + code = JSOCKERR_timeout_expired; + errorMsg.append("timeout expired"); + } + else + errorMsg.append("error"); + THROWJSOCKEXCEPTION_MSG(code, errorMsg.str()); + } +} + int CSecureSocket::secure_connect(int logLevel) { if (m_fqdn.length() > 0) @@ -768,14 +831,16 @@ int CSecureSocket::secure_connect(int logLevel) SSL_set_tlsext_host_name(m_ssl, m_fqdn.str()); } - int err = SSL_connect (m_ssl); - if(err <= 0) + unsigned timeoutMs = 60*1000; // more than enough, used to be infinite + CCycleTimer timer; + while (true) { - int ret = SSL_get_error(m_ssl, err); - char errbuf[512]; - ERR_error_string_n(ERR_get_error(), errbuf, 512); - DBGLOG("SSL_connect error - %s, SSL_get_error=%d, error - %d", errbuf,ret, err); - throw MakeStringException(-1, "SSL_connect failed: %s", errbuf); + int rc = SSL_connect(m_ssl); + if (rc > 0) + break; + int ssl_err = SSL_get_error(m_ssl, rc); + unsigned remainingMs = timer.remainingMs(timeoutMs); + handleError(ssl_err, true, true, remainingMs, "SSL_connect"); } if (logLevel > SSLogNormal) @@ -829,81 +894,54 @@ int CSecureSocket::wait_read(unsigned timeoutms) return m_socket->wait_read(timeoutms); } -void CSecureSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &sizeRead, unsigned timeoutMs) +void CSecureSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &sizeRead, unsigned timeoutMs, bool suppresGCIfMinSize) { + // Adheres to same semantics as CSocket::readtms + // NB: when handling poll notifications, we must read until SSL_read indicates would block + // because we may not get another poll notification because SSL internally has read everything. sizeRead = 0; CCycleTimer timer; - - // for semantics to work with a timeout, have to be non-blocking when reading SSL - // because wait_read can't guarantee that there are bytes ready to read, only that - // there are bytes pending on the underlying socket. 
- // We put in non-blocking mode, so that if after wait_read says there's something, - // SSL_read won't block and will respond with a SSL_ERROR_WANT_READ/SSL_ERROR_WANT_WRITE - // if not ready. - ScopedNonBlockingMode scopedNonBlockingMode; - if (WAIT_FOREVER != timeoutMs) - scopedNonBlockingMode.init(this); - - int ssl_err = SSL_ERROR_WANT_READ; // initially call will be wait_read + ScopedNonBlockingMode scopedNonBlockingMode(this); while (true) { - int rc; - unsigned remainingMs = timer.remainingMs(timeoutMs); - if (ssl_err == SSL_ERROR_WANT_READ) - rc = wait_read(remainingMs); - else // SSL_ERROR_WANT_WRITE - rc = wait_write(remainingMs); - if (rc < 0) - THROWJSOCKEXCEPTION_MSG(SOCKETERRNO(), "wait_read error"); - if (rc == 0) - THROWJSOCKEXCEPTION_MSG(JSOCKERR_timeout_expired, "timeout expired"); - ERR_clear_error(); - rc = SSL_read(m_ssl, (char*)buf + sizeRead, max_size - sizeRead); - + int rc = SSL_read(m_ssl, (char*)buf + sizeRead, max_size - sizeRead); + unsigned remainingMs = timer.remainingMs(timeoutMs); if (rc > 0) { sizeRead += rc; - if (sizeRead >= min_size) + if (sizeRead == max_size) break; + if (0 == remainingMs) + { + if (sizeRead >= min_size) + break; + THROWJSOCKEXCEPTION_MSG(JSOCKERR_timeout_expired, "timeout expired"); + } + // loop around to read more, or detect blocked (and exit if sizeRead >= min_size) } else if (0 == rc) { - if (sizeRead >= min_size) - break; // suppress graceful close exception if have already read minimum + if (suppresGCIfMinSize && (sizeRead >= min_size)) + break; THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); } else { - ssl_err = SSL_get_error(m_ssl, rc); - // NB: if timeout != WAIT_FOREVER, nonBlocking should always be true here - if (nonBlocking && (ssl_err == SSL_ERROR_WANT_READ || ssl_err == SSL_ERROR_WANT_WRITE)) // NB: SSL_read can cause SSL_ERROR_WANT_WRITE - { - // NB: we must be below min_size if here (otherwise would have exited in (rc > 0) block above) - - // To maintain consistent semantics with jsocket, we 
continue waiting even in the min_size = 0 case. - // NB: jsocket::readtms always blocks (wait_read) initially, meaning in effect min_size is always treated as >0 - } - else - { - char errbuf[512]; - ERR_error_string_n(ssl_err, errbuf, 512); - ERR_clear_error(); - VStringBuffer errmsg("SSL_read error %d - %s", ssl_err, errbuf); - if (m_loglevel >= SSLogMax) - DBGLOG("Warning: %s", errmsg.str()); - THROWJSOCKEXCEPTION_MSG(ssl_err, errmsg); - } - // here only if nonBlocking && WANT_READ or WANT_WRITE - // since we do not have size_min yet, loop around and wait for more. + // NB: if blocked, return if sizeRead >= min_size + int ssl_err = SSL_get_error(m_ssl, rc); + bool wait = sizeRead < min_size; // if >= min_size, then handleError will validate errors only + handleError(ssl_err, false, wait, remainingMs, "SSL_read"); + if (sizeRead >= min_size) + break; } } } -void CSecureSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs) +void CSecureSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timeoutsecs, bool suppresGCIfMinSize) { unsigned timeoutMs = (timeoutsecs==WAIT_FOREVER) ? 
WAIT_FOREVER : (timeoutsecs * 1000); - readtms(buf, min_size, max_size, size_read, timeoutMs); + readtms(buf, min_size, max_size, size_read, timeoutMs, suppresGCIfMinSize); } size32_t CSecureSocket::writetms(void const* buf, size32_t minSize, size32_t size, unsigned timeoutMs) @@ -912,11 +950,7 @@ size32_t CSecureSocket::writetms(void const* buf, size32_t minSize, size32_t siz return 0; CCycleTimer timer; - - ScopedNonBlockingMode scopedNonBlockingMode; - if (WAIT_FOREVER != timeoutMs) - scopedNonBlockingMode.init(this); - + ScopedNonBlockingMode scopedNonBlockingMode(this); while (true) { int rc = SSL_write(m_ssl, buf, size); @@ -927,32 +961,8 @@ size32_t CSecureSocket::writetms(void const* buf, size32_t minSize, size32_t siz return rc; } int ssl_err = SSL_get_error(m_ssl, rc); - if (nonBlocking && (ssl_err == SSL_ERROR_WANT_READ || ssl_err == SSL_ERROR_WANT_WRITE)) // NB: SSL_write can cause SSL_ERROR_WANT_READ - { - unsigned remainingMs = timer.remainingMs(timeoutMs); - if (ssl_err == SSL_ERROR_WANT_READ) - rc = wait_read(remainingMs); - else // SSL_ERROR_WANT_WRITE - rc = wait_write(remainingMs); - if (rc < 0) - { - const char *msg = (ssl_err == SSL_ERROR_WANT_READ) ? 
"wait_read error" : "wait_write error"; - THROWJSOCKEXCEPTION_MSG(SOCKETERRNO(), msg); - } - if (rc == 0) - THROWJSOCKEXCEPTION_MSG(JSOCKERR_timeout_expired, "timeout expired"); - } - else - { - char errbuf[512]; - ERR_error_string_n(ssl_err, errbuf, 512); - ERR_clear_error(); - VStringBuffer errmsg("SSL_write error %d - %s", ssl_err, errbuf); - if (ssl_err == SSL_ERROR_ZERO_RETURN) - THROWJSOCKEXCEPTION(JSOCKERR_graceful_close); - else - THROWJSOCKEXCEPTION_MSG(JSOCKERR_broken_pipe, errmsg); - } + unsigned remainingMs = timer.remainingMs(timeoutMs); + handleError(ssl_err, true, true, remainingMs, "SSL_write"); } throwUnexpected(); // should never get here } diff --git a/tools/testsocket/testsocket.cpp b/tools/testsocket/testsocket.cpp index 221e4361a72..ebbaff42aa4 100644 --- a/tools/testsocket/testsocket.cpp +++ b/tools/testsocket/testsocket.cpp @@ -301,7 +301,7 @@ int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer & { try { - socket->read(t, 0, len, sendlen); + socket->read(t, 1, len, sendlen); } catch (IException *E) {