diff --git a/system/mp/mpcomm.cpp b/system/mp/mpcomm.cpp index 299f674e1fb..06b0155e616 100644 --- a/system/mp/mpcomm.cpp +++ b/system/mp/mpcomm.cpp @@ -448,11 +448,90 @@ class CMPConnectThread: public Thread ISocket *listensock; CMPServer *parent; int mpSoMaxConn; + unsigned acceptThreadPoolSize = 0; + Owned threadPool; + Owned allowListCallback; void checkSelfDestruct(void *p,size32_t sz); Owned secureContextServer; + class CSlowClientProcessor : implements IThreaded + { + CMPConnectThread &owner; + CThreaded threaded; + std::vector> slowClientsSocks; + CriticalSection crit; + Semaphore sem; + std::atomic stopped = true; + + public: + CSlowClientProcessor(CMPConnectThread &_owner) : threaded("CSlowClientProcessor"), owner(_owner) + { + } + void start() + { + stopped = false; + threaded.init(this); + } + void stop() + { + if (stopped) + return; + + { + CriticalBlock b(crit); + stopped = true; + for (auto &sock : slowClientsSocks) + sock->close(); + slowClientsSocks.clear(); + } + + sem.signal(); + if (!threaded.join(1000*60*5)) + printf("CSlowClientProcessor::stop timed out\n"); + } + void add(ISocket *sock) // NB: takes ownership + { + { + CriticalBlock b(crit); + if (stopped) + { + sock->Release(); + return; + } + slowClientsSocks.emplace_back(sock); + } + sem.signal(); + } + // IThreaded + virtual void threadmain() override + { + // The slow client processor deals with each queued slow client socket in turn, waiting the standard CONFIRM_TIMEOUT for each. + // An alternative would be to try each for shorter periods, shuffling them to the end if they still haven't been handled. + // But this is all probably OTT. The main thing is to avoid a slow client blocking the accept loop. + while (true) + { + sem.wait(); + + Owned sock; + + { + CriticalBlock b(crit); + if (stopped) + break; + if (slowClientsSocks.empty()) // guard, but should never happen + { + WARNLOG("slowClientsSocks list empty"); + continue; + } + sock.set(slowClientsSocks.back()); + slowClientsSocks.pop_back(); + } + owner.handleAcceptedSocket(sock.getClear(), CONFIRM_TIMEOUT, true); + } + } + } slowClientProcessor; public: CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen); ~CMPConnectThread() @@ -463,11 +542,21 @@ class CMPConnectThread: public Thread void startPort(unsigned short port); void stop() { - if (running) { + if (running) + { running = false; listensock->cancel_accept(); + + // ensure CMPConnectThread::run() has exited, and is not accepting more sockets if (!join(1000*60*5)) // should be pretty instant printf("CMPConnectThread::stop timed out\n"); + + if (listen && acceptThreadPoolSize) + { + if (!threadPool->joinAll(true, 1000*60*5)) + printf("CMPConnectThread::stop threadPool->joinAll timed out\n"); + slowClientProcessor.stop(); + } } } void installAllowListCallback(IAllowListHandler *_allowListCallback) @@ -478,6 +567,7 @@ class CMPConnectThread: public Thread { return allowListCallback; } + bool handleAcceptedSocket(ISocket *sock, unsigned timeoutMs, bool failOnTimeout); }; class PingPacketHandler; @@ -698,7 +788,7 @@ class CMPNotifyClosedThread: public Thread void traceSlowReadTms(const char *msg, ISocket *sock, void *dst, size32_t minSize, size32_t maxSize, size32_t &sizeRead, unsigned timeoutMs, unsigned timeoutChkIntervalMs) { - dbgassertex(timeoutChkIntervalMs < timeoutMs); + dbgassertex(timeoutChkIntervalMs <= timeoutMs); StringBuffer epStr; CCycleTimer readTmsTimer; unsigned intervalTimeoutMs = 500; @@ -2034,10 +2124,11 @@ bool CMPChannel::sendPingReply(unsigned timeout,bool identifyself) sendmutex.unlock(); return ret; } - + +static constexpr unsigned defaultAcceptThreadPoolSize = 100; // -------------------------------------------------------- CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _listen) - : Thread("MP Connection Thread") + : Thread("MP Connection Thread"), slowClientProcessor(*this) { parent = _parent; listen = _listen; @@ -2051,6 +2142,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list mpSoMaxConn = env->getPropInt("EnvSettings/mpSoMaxConn", 0); if (!mpSoMaxConn) mpSoMaxConn = env->getPropInt("EnvSettings/ports/mpSoMaxConn", 0); + acceptThreadPoolSize = env->getPropInt("EnvSettings/acceptThreadPoolSize", defaultAcceptThreadPoolSize); } unsigned mpTraceLevel = env->getPropInt("EnvSettings/mpTraceLevel", 0); switch (mpTraceLevel) @@ -2071,6 +2163,13 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list } #else parent->mpTraceLevel = getComponentConfigSP()->getPropInt("logging/@detail", InfoMsgThreshold); + if (listen) + { + if (getComponentConfigSP()->hasProp("expert/@acceptThreadPoolSize")) + acceptThreadPoolSize = getComponentConfigSP()->getPropInt("expert/@acceptThreadPoolSize"); + else + acceptThreadPoolSize = getGlobalConfigSP()->getPropInt("expert/@acceptThreadPoolSize", defaultAcceptThreadPoolSize); + } #endif if (mpSoMaxConn) @@ -2134,6 +2233,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port, bool _list if (parent->useTLS) secureContextServer.setown(createSecureSocketContextSecretSrv("local", nullptr, true)); #endif + PROGLOG("MP TLS: %s acceptThreadPoolSize: %u", parent->useTLS ? "on" : "off", acceptThreadPoolSize); } void CMPConnectThread::checkSelfDestruct(void *p,size32_t sz) @@ -2178,194 +2278,276 @@ void CMPConnectThread::startPort(unsigned short port) if (!listen) return; running = true; + if (acceptThreadPoolSize) + { + class CMPConnectThreadFactory : public CInterfaceOf + { + CMPConnectThread &owner; + public: + CMPConnectThreadFactory(CMPConnectThread &_owner) : owner(_owner) + { + } + // IThreadFactory + IPooledThread *createNew() override + { + class CMPConnectionThread : public CInterfaceOf + { + CMPConnectThread &owner; + Owned sock; + public: + CMPConnectionThread(CMPConnectThread &_owner) : owner(_owner) + { + } + // IPooledThread + virtual void init(void *param) override + { + sock.set((ISocket *)param); + } + virtual void threadmain() override + { + constexpr unsigned timeoutMs = 5000; + + // detach from this pooled thread, and own locally, so that we ensure that + // the pooled thread object does not retain it until the next init() call. + Owned handledSock = sock.getClear(); + + if (owner.handleAcceptedSocket(handledSock.getLink(), timeoutMs, false)) + { + // handoff to slowClientProcessor, which will retry for standard CONFIRM_TIMEOUT period + owner.slowClientProcessor.add(handledSock.getClear()); + } + } + virtual bool stop() override + { + return true; + } + virtual bool canReuse() const override + { + return true; + } + }; + return new CMPConnectionThread(owner); + } + }; + Owned factory = new CMPConnectThreadFactory(*this); + threadPool.setown(createThreadPool("MPConnectPool", factory, nullptr, acceptThreadPoolSize, INFINITE)); + slowClientProcessor.start(); + } Thread::start(); } -int CMPConnectThread::run() +// only returns true if !failOnTimeout and times out +bool CMPConnectThread::handleAcceptedSocket(ISocket *_sock, unsigned timeoutMs, bool failOnTimeout) { -#ifdef _TRACE - LOG(MCdebugInfo, unknownJob, "MP: Connect Thread Starting - accept loop"); -#endif - while (running) + SocketEndpoint peerEp; + _sock->getPeerEndpoint(peerEp); + Owned sock = _sock; + try { - Owned sock; - SocketEndpoint peerEp; - try - { - sock.setown(listensock->accept(true, &peerEp)); - } - catch (IException *e) - { - LOG(MCdebugInfo, unknownJob, e,"MP accept failed"); - throw; // error handling TBD - } - if (sock) +#if defined(_USE_OPENSSL) + if (parent->useTLS) { - try + Owned ssock = secureContextServer->createSecureSocket(sock.getClear()); + int tlsTraceLevel = SSLogMin; + if (parent->mpTraceLevel >= MPVerboseMsgThreshold) + tlsTraceLevel = SSLogMax; + int status = ssock->secure_accept(tlsTraceLevel); + if (status < 0) { -#if defined(_USE_OPENSSL) - if (parent->useTLS) - { - Owned ssock = secureContextServer->createSecureSocket(sock.getClear()); - int tlsTraceLevel = SSLogMin; - if (parent->mpTraceLevel >= MPVerboseMsgThreshold) - tlsTraceLevel = SSLogMax; - int status = ssock->secure_accept(tlsTraceLevel); - if (status < 0) - { - ssock->close(); - PROGLOG("MP Connect Thread: failed to accept secure connection"); - continue; - } - sock.setown(ssock.getClear()); - } + ssock->close(); + PROGLOG("MP Connect Thread: failed to accept secure connection"); + return false; // did not timeout + } + sock.setown(ssock.getClear()); + } #endif // OPENSSL #ifdef _FULLTRACE - StringBuffer s; - SocketEndpoint ep1; - sock->getPeerEndpoint(ep1); - PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getEndpointHostText(s).str()); + StringBuffer s; + SocketEndpoint ep1; + sock->getPeerEndpoint(ep1); + PROGLOG("MP: Connect Thread: socket accepted from %s",ep1.getEndpointHostText(s).str()); #endif - sock->set_keep_alive(true); + sock->set_keep_alive(true); - size32_t rd = 0; - SocketEndpoint _remoteep; - SocketEndpoint hostep; - ConnectHdr connectHdr; - bool legacyClient = false; + size32_t rd = 0; + SocketEndpoint _remoteep; + SocketEndpoint hostep; + ConnectHdr connectHdr; + bool legacyClient = false; - // NB: min size is ConnectHdr.id for legacy clients, can thus distinguish old from new - traceSlowReadTms("MP: initial accept packet from", sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, CONFIRM_TIMEOUT, CONFIRM_TIMEOUT_INTERVAL); - if (0 == rd) + // NB: min size is ConnectHdr.id for legacy clients, can thus distinguish old from new + try + { + traceSlowReadTms("MP: initial accept packet from", _sock, &connectHdr, sizeof(connectHdr.id), sizeof(connectHdr), rd, timeoutMs, CONFIRM_TIMEOUT_INTERVAL); + } + catch (IJSOCK_Exception *e) + { + if (JSOCKERR_timeout_expired != e->errorCode()) + throw; + if (!failOnTimeout) + return true; // timedout (socket kept open) + } + if (0 == rd) + { + if (parent->mpTraceLevel >= MPVerboseMsgThreshold) + { + // cannot get peer addresss as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint()) + PROGLOG("MP Connect Thread: connect with no msg received, assumed port monitor check"); + } + sock->close(); + return false; // did not timeout + } + else + { + if (rd == sizeof(connectHdr.id)) // legacy client + { + legacyClient = true; + connectHdr.hdr.size = sizeof(PacketHeader); + connectHdr.hdr.tag = TAG_SYS_BCAST; + connectHdr.hdr.flags = 0; + connectHdr.hdr.version = MP_PROTOCOL_VERSION; + connectHdr.setRole(0); // unknown + } + else if (rd < sizeof(connectHdr.id) || rd > sizeof(connectHdr)) + { + // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception + StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from "); + peerEp.getEndpointHostText(errMsg); + FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str()); + sock->close(); + return false; // did not timeout + } + } + + if (allowListCallback) + { + StringBuffer ipStr; + peerEp.getHostText(ipStr); + StringBuffer responseText; // filled if denied, NB: if amount sent is > sizeof(ConnectHdr) we can differentiate exception from success + if (!allowListCallback->isAllowListed(ipStr, connectHdr.getRole(), &responseText)) + { + Owned e = makeStringException(-1, responseText); + OWARNLOG(e, nullptr); + + if (legacyClient) { - if (parent->mpTraceLevel >= MPVerboseMsgThreshold) - { - // cannot get peer addresss as socket state is now ss_shutdown (unless we want to allow this in getPeerEndpoint()) - PROGLOG("MP Connect Thread: connect with no msg received, assumed port monitor check"); - } - sock->close(); - continue; + /* NB: legacy client can't handle exception response + * Acknowledge legacy connection, then close socket + * The effect will be the client sees an MPERR_link_closed + */ + size32_t reply = sizeof(connectHdr.id); + sock->write(&reply, sizeof(reply)); } else { - if (rd == sizeof(connectHdr.id)) // legacy client - { - legacyClient = true; - connectHdr.hdr.size = sizeof(PacketHeader); - connectHdr.hdr.tag = TAG_SYS_BCAST; - connectHdr.hdr.flags = 0; - connectHdr.hdr.version = MP_PROTOCOL_VERSION; - connectHdr.setRole(0); // unknown - } - else if (rd < sizeof(connectHdr.id) || rd > sizeof(connectHdr)) - { - // not sure how to get here as this is not one of the possible outcomes of above: rd == 0 or rd == sizeof(id) or an exception - StringBuffer errMsg("MP Connect Thread: invalid number of connection bytes serialized from "); - peerEp.getEndpointHostText(errMsg); - FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str()); - sock->close(); - continue; - } + MemoryBuffer mb; + DelayedSizeMarker marker(mb); + serializeException(e, mb); + marker.write(); + sock->write(mb.toByteArray(), mb.length()); } - if (allowListCallback) - { - StringBuffer ipStr; - peerEp.getHostText(ipStr); - StringBuffer responseText; // filled if denied, NB: if amount sent is > sizeof(ConnectHdr) we can differentiate exception from success - if (!allowListCallback->isAllowListed(ipStr, connectHdr.getRole(), &responseText)) - { - Owned e = makeStringException(-1, responseText); - OWARNLOG(e, nullptr); - - if (legacyClient) - { - /* NB: legacy client can't handle exception response - * Acknowledge legacy connection, then close socket - * The effect will be the client sees an MPERR_link_closed - */ - size32_t reply = sizeof(connectHdr.id); - sock->write(&reply, sizeof(reply)); - } - else - { - MemoryBuffer mb; - DelayedSizeMarker marker(mb); - serializeException(e, mb); - marker.write(); - sock->write(mb.toByteArray(), mb.length()); - } - - sock->close(); - continue; - } - } + sock->close(); + return false; // did not timeout + } + } - connectHdr.id[0].get(_remoteep); - connectHdr.id[1].get(hostep); + connectHdr.id[0].get(_remoteep); + connectHdr.id[1].get(hostep); - unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port; + unsigned __int64 addrval = DIGIT1*connectHdr.id[0].ip[0] + DIGIT2*connectHdr.id[0].ip[1] + DIGIT3*connectHdr.id[0].ip[2] + DIGIT4*connectHdr.id[0].ip[3] + connectHdr.id[0].port; #ifdef _TRACE - PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval); + PROGLOG("MP: Connect Thread: addrval = %" I64F "u", addrval); #endif - if (_remoteep.isNull() || hostep.isNull()) - { - StringBuffer errMsg; - SocketEndpointV4 zeroTest[2]; - memset(zeroTest, 0x0, sizeof(zeroTest)); - if (memcmp(connectHdr.id, zeroTest, sizeof(connectHdr.id))) - { - // JCSMORE, I think _remoteep really must/should match a IP of this local host - errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from "); - peerEp.getEndpointHostText(errMsg); - FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str()); - } - else if (parent->mpTraceLevel >= MPVerboseMsgThreshold) - { - // all zeros msg received - errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from "); - peerEp.getEndpointHostText(errMsg); - PROGLOG("%s", errMsg.str()); - } - sock->close(); - continue; - } + if (_remoteep.isNull() || hostep.isNull()) + { + StringBuffer errMsg; + SocketEndpointV4 zeroTest[2]; + memset(zeroTest, 0x0, sizeof(zeroTest)); + if (memcmp(connectHdr.id, zeroTest, sizeof(connectHdr.id))) + { + // JCSMORE, I think _remoteep really must/should match a IP of this local host + errMsg.append("MP Connect Thread: invalid remote and/or host ep serialized from "); + peerEp.getEndpointHostText(errMsg); + FLLOG(MCoperatorWarning, unknownJob, "%s", errMsg.str()); + } + else if (parent->mpTraceLevel >= MPVerboseMsgThreshold) + { + // all zeros msg received + errMsg.append("MP Connect Thread: connect with empty msg received, assumed port monitor check from "); + peerEp.getEndpointHostText(errMsg); + PROGLOG("%s", errMsg.str()); + } + sock->close(); + return false; // did not timeout + } #ifdef _FULLTRACE - StringBuffer tmp1; - _remoteep.getEndpointHostText(tmp1); - tmp1.append(' '); - hostep.getEndpointHostText(tmp1); - PROGLOG("MP: Connect Thread: after read %s",tmp1.str()); -#endif - checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id)); - Owned channel = parent->lookup(_remoteep); - if (!channel->attachSocket(sock.getClear(),_remoteep,hostep,false,&rd,addrval)) - { + StringBuffer tmp1; + _remoteep.getEndpointHostText(tmp1); + tmp1.append(' '); + hostep.getEndpointHostText(tmp1); + PROGLOG("MP: Connect Thread: after read %s",tmp1.str()); +#endif + checkSelfDestruct(&connectHdr.id[0],sizeof(connectHdr.id)); + Owned channel = parent->lookup(_remoteep); + if (!channel->attachSocket(sock.getClear(),_remoteep,hostep,false,&rd,addrval)) + { #ifdef _FULLTRACE - PROGLOG("MP Connect Thread: lookup failed"); + PROGLOG("MP Connect Thread: lookup failed"); #endif - } - else - { + } + else + { #ifdef _TRACE - StringBuffer str1; - StringBuffer str2; - LOG(MCdebugInfo, unknownJob, "MP Connect Thread: connected to %s",_remoteep.getEndpointHostText(str1).str()); + StringBuffer str1; + StringBuffer str2; + LOG(MCdebugInfo, unknownJob, "MP Connect Thread: connected to %s",_remoteep.getEndpointHostText(str1).str()); #endif - } + } #ifdef _FULLTRACE - PROGLOG("MP: Connect Thread: after write"); + PROGLOG("MP: Connect Thread: after write"); #endif - } - catch (IException *e) + } + catch (IException *e) + { + FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)"); + sock->close(); + e->Release(); + } + return false; // did not timeout +} + +int CMPConnectThread::run() +{ +#ifdef _TRACE + LOG(MCdebugInfo, unknownJob, "MP: Connect Thread Starting - accept loop"); +#endif + while (running) + { + Owned sock; + SocketEndpoint peerEp; + try + { + sock.setown(listensock->accept(true, &peerEp)); + } + catch (IException *e) + { + LOG(MCdebugInfo, unknownJob, e,"MP accept failed"); + throw; // error handling TBD + } + if (sock) + { + if (threadPool) { - FLLOG(MCoperatorWarning, unknownJob, e,"MP Connect Thread: Failed to make connection(1)"); - sock->close(); - e->Release(); + // if enabled, handle initial connection protocol on a separate thread + // if any block for more than a short period (5 seconds), then they are handed off to slowClientProcessor + threadPool->start(sock); } + else + handleAcceptedSocket(sock.getClear(), CONFIRM_TIMEOUT, true); } else {