-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-31130 Handle MP accept connection negotiations asynchronously #18273
HPCC-31130 Handle MP accept connection negotiations asynchronously #18273
Conversation
@mckellyln - please review. |
19304a9
to
06dfbfb
Compare
https://track.hpccsystems.com/browse/HPCC-31130 |
system/mp/mpcomm.cpp
Outdated
std::vector<Owned<ISocket>> slowClientsSocks; | ||
CriticalSection crit; | ||
Semaphore sem; | ||
bool stopped = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should stopped be std::atomic ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably, I'll change it.
{ | ||
constexpr unsigned timeoutMs = 5000; | ||
Owned<ISocket> handledSock = sock.getClear(); | ||
if (owner.handleAcceptedSocket(handledSock.getLink(), timeoutMs, false)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why try for 5000 ms before handing off to async thread(s) ? Why not just always use async thread(s) ?
Maybe almost same thing if we change 5000 ms here to 100 ms or something very short ..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does always use an async thread - this code is in in CMPConnectionThread::threadmain (a thread in the pool).
What it's doing here (within a thread of the pool), is handing this connection off, to outside the thread pool, to a separate 'slow' handler, so pool doesn't clog up.
@mckellyln - made 'stopped' an atomic, and see reply re. other question. |
I've done some testing and this PR looks good, but I can still get some bad behavior when a rogue client connects to Thor and sends in various amounts of garbage data. I have created https://track.hpccsystems.com/browse/HPCC-31251 for additional work on this. |
d920488
to
873ebe1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I approve this PR.
@jakesmith please can you add a summary of the solution to the jira. |
Added. Also noticed a problem with the way the optional config. option was being picked up in k8s. Addressed in new commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jakesmith A couple of problems and a few minor other comments.
system/mp/mpcomm.cpp
Outdated
continue; | ||
} | ||
|
||
Linked<ISocket> sock = slowClientsSocks.back(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be protected within the critical section - otherwise it could get an invalid item. Theoretically the empty() call needs to be as well - on a chip with a very weak memory model..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will put both in crit block.
system/mp/mpcomm.cpp
Outdated
{ | ||
CriticalBlock b(crit); | ||
for (auto &sock : slowClientsSocks) | ||
sock->close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to call close() on a socket that is currently being listened to on another thread? (If stop is called while waiting for a slow response). From memory it can be complicated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will potentially currently being read from (CSocket::readtms) at this point, and on a ::poll call.
I think socket::close() will cause the poll to quit (I did test this scenario).
I thought it was a used technique elsewhere.
@mckellyln - what are your thoughts on this/interrupting the read?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reading up, I think it is unsafe, also, the handling of getting slowClientsSocks.back() in crit, then later calling pop_back() is unsafe is new items are being added.
I am going to remove the socket from the slowClientsSocks before calling handleAcceptedSocket.
It will mean it can't be interrupted and will timeout, but that's not really a problem, just a theoretical slightly longer delay during shutdown if happens to be on a stalling socket.
system/mp/mpcomm.cpp
Outdated
void add(ISocket *sock) // NB: takes ownership | ||
{ | ||
if (stopped) | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will leak sock.
I'm not sure what is the cleanest pattern to implement this. Possibly pass an Owned or link the socket....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's safe to assume it will get to the emplace_back once past this point (and thus own).
So a specific sock->Release here if stopped is a reasonable resolution.
running = false; | ||
listensock->cancel_accept(); | ||
if (listen) | ||
{ | ||
if (!threadPool->joinAll(true, 1000*60*5)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth commenting that stop the listening threads first in case any get added to the stop processor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll move the join for this (CMPConnectThread) accept thread ahead of this, and add a comment.
system/mp/mpcomm.cpp
Outdated
@@ -2178,194 +2265,272 @@ void CMPConnectThread::startPort(unsigned short port) | |||
if (!listen) | |||
return; | |||
running = true; | |||
if (acceptThreadPoolSize) | |||
{ | |||
class CFactory : public CInterfaceOf<IThreadFactory> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CFactory: The name could be more descriptive - what is it a factory for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename to CMPConnectThreadFactory.
I do tend to use generic names if inline classes are used and therefore implicitly relevant to the surrounding code.
running = false; | ||
listensock->cancel_accept(); | ||
if (listen) | ||
{ | ||
if (!threadPool->joinAll(true, 1000*60*5)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
threadPool could be null if the pool size is set to 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will add a check.
system/mp/mpcomm.cpp
Outdated
Owned<ISocket> handledSock = sock.getClear(); | ||
if (owner.handleAcceptedSocket(handledSock.getLink(), timeoutMs, false)) | ||
{ | ||
// handoff to slowClientProcessor, which will retry for standard CONFIRM_TIMEOUT period owner.slowClientProcessor.add(sock.getClear()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spurious (almost) copy of the next line on the end of the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah yes, couldn't even spot it in vscode at first!
Will remove.
{ | ||
constexpr unsigned timeoutMs = 5000; | ||
Owned<ISocket> handledSock = sock.getClear(); | ||
if (owner.handleAcceptedSocket(handledSock.getLink(), timeoutMs, false)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not worth changing, but it is counter-intuitive that a function called handleAcceptedSocket() returns true if it fails to handle it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, and pondered this, but really true|false is insufficient. As it stands returning false means it did not timeout, but it doesn't mean it succeeded. Various failure conditions also return false.
So really it should return a state, but I only care about 1 of them (timeout).
@ghalliday - please see new commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jakesmith please squash
And handoff slow/blocking connection negotiations to separate dedicated thread. Signed-off-by: Jake Smith <[email protected]>
19ae797
to
6c13c81
Compare
@ghalliday - done |
And handoff slow/blocking connection negotiations to separate dedicated thread.
Type of change:
Checklist:
Smoketest:
Testing: