From 5238912438ea58920f78620d60ef73695e52c9a1 Mon Sep 17 00:00:00 2001 From: David Joel Schwartz Date: Sat, 14 Apr 2012 20:35:58 -0400 Subject: [PATCH 1/2] Support multi-threaded JSON-RPC Change internal HTTP JSON-RPC server from single-threaded to thread-per-connection model. The IP filter list is applied prior to starting the thread, which then processes the RPC. A mutex covers the entire RPC operation, because not all RPC operations are thread-safe. [minor modifications by jgarzik, to make change upstream-ready] --- src/bitcoinrpc.cpp | 97 +++++++++++++++++++++++++++++++--------------- src/net.cpp | 5 ++- src/net.h | 3 +- 3 files changed, 70 insertions(+), 35 deletions(-) diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index cc1fb9b8..fbb46b52 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -37,6 +37,8 @@ CReserveKey* pMiningKey = NULL; static std::string strRPCUserColonPass; +void ThreadRPCServer3(void* parg); + Object JSONRPCError(int code, const string& message) { Object error; @@ -298,7 +300,7 @@ Value getwork(const Array& params, bool fHelp) typedef map > mapNewBlock_t; - static mapNewBlock_t mapNewBlock; + static mapNewBlock_t mapNewBlock; // FIXME: thread safety static vector vNewBlock; if (params.size() == 0) @@ -1049,6 +1051,20 @@ class SSLIOStreamDevice : public iostreams::device { SSLStream& stream; }; +class AcceptedConnection +{ + public: + SSLStream sslStream; + SSLIOStreamDevice d; + iostreams::stream stream; + + ip::tcp::endpoint peer; + + AcceptedConnection(asio::io_service &io_service, ssl::context &context, + bool fUseSSL) : sslStream(io_service, context), d(sslStream, fUseSSL), + stream(d) { ; } +}; + void ThreadRPCServer(void* parg) { // getwork/getblocktemplate mining rewards paid here: @@ -1056,15 +1072,15 @@ void ThreadRPCServer(void* parg) try { - vnThreadsRunning[THREAD_RPCSERVER]++; + vnThreadsRunning[THREAD_RPCLISTENER]++; ThreadRPCServer2(parg); - vnThreadsRunning[THREAD_RPCSERVER]--; + vnThreadsRunning[THREAD_RPCLISTENER]--; } catch (std::exception& e) { - vnThreadsRunning[THREAD_RPCSERVER]--; + vnThreadsRunning[THREAD_RPCLISTENER]--; PrintException(&e, "ThreadRPCServer()"); } catch (...) { - vnThreadsRunning[THREAD_RPCSERVER]--; + vnThreadsRunning[THREAD_RPCLISTENER]--; PrintException(NULL, "ThreadRPCServer()"); } @@ -1200,54 +1216,66 @@ void ThreadRPCServer2(void* parg) for (;;) { // Accept connection - SSLStream sslStream(io_service, context); - SSLIOStreamDevice d(sslStream, fUseSSL); - iostreams::stream stream(d); - - ip::tcp::endpoint peer; - vnThreadsRunning[THREAD_RPCSERVER]--; - acceptor.accept(sslStream.lowest_layer(), peer); - vnThreadsRunning[4]++; + AcceptedConnection *conn = + new AcceptedConnection(io_service, context, fUseSSL); + + vnThreadsRunning[THREAD_RPCLISTENER]--; + acceptor.accept(conn->sslStream.lowest_layer(), conn->peer); + vnThreadsRunning[THREAD_RPCLISTENER]++; + if (fShutdown) + { + delete conn; return; + } - // Restrict callers by IP - if (!ClientAllowed(peer.address().to_string())) + // Restrict callers by IP. It is important to + // do this before starting client thread, to filter out + // certain DoS and misbehaving clients. + if (!ClientAllowed(conn->peer.address().to_string())) { // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake. if (!fUseSSL) - stream << HTTPReply(403, "") << std::flush; - continue; + conn->stream << HTTPReply(403, "") << std::flush; + delete conn; } + // start HTTP client thread + else if (!NewThread(ThreadRPCServer3, conn)) { + printf("Failed to create RPC server client thread\n"); + delete conn; + } + } +} + +void ThreadRPCServer3(void* parg) +{ + vnThreadsRunning[THREAD_RPCHANDLER]++; + AcceptedConnection *conn = (AcceptedConnection *) parg; + + do { map mapHeaders; string strRequest; - boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest)); - if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30)))) - { // Timed out: - acceptor.cancel(); - printf("ThreadRPCServer ReadHTTP timeout\n"); - continue; - } + ReadHTTP(conn->stream, mapHeaders, strRequest); // Check authorization if (mapHeaders.count("authorization") == 0) { - stream << HTTPReply(401, "") << std::flush; - continue; + conn->stream << HTTPReply(401, "") << std::flush; + break; } if (!HTTPAuthorized(mapHeaders)) { - printf("ThreadRPCServer incorrect password attempt from %s\n",peer.address().to_string().c_str()); + printf("ThreadRPCServer incorrect password attempt from %s\n", conn->peer.address().to_string().c_str()); /* Deter brute-forcing short passwords. If this results in a DOS the user really shouldn't have their RPC port exposed.*/ if (mapArgs["-rpcpassword"].size() < 20) Sleep(250); - stream << HTTPReply(401, "") << std::flush; - continue; + conn->stream << HTTPReply(401, "") << std::flush; + break; } Value id = Value::null; @@ -1271,17 +1299,22 @@ void ThreadRPCServer2(void* parg) throw JSONRPCError(-32600, "Top-level object parse error"); // Send reply - stream << HTTPReply(200, strReply) << std::flush; + conn->stream << HTTPReply(200, strReply) << std::flush; } catch (Object& objError) { - ErrorReply(stream, objError, id); + ErrorReply(conn->stream, objError, id); + break; } catch (std::exception& e) { - ErrorReply(stream, JSONRPCError(-32700, e.what()), id); + ErrorReply(conn->stream, JSONRPCError(-32700, e.what()), id); + break; } } + while (0); + delete conn; + vnThreadsRunning[THREAD_RPCHANDLER]--; } json_spirit::Value CRPCTable::execute(const std::string &strMethod, const json_spirit::Array ¶ms) const diff --git a/src/net.cpp b/src/net.cpp index 305904e0..f40a9883 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1953,13 +1953,14 @@ bool StopNode() if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n"); if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n"); if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n"); - if (vnThreadsRunning[THREAD_RPCSERVER] > 0) printf("ThreadRPCServer still running\n"); + if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n"); + if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n"); if (fHaveUPnP && vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n"); if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n"); if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n"); if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n"); if (vnThreadsRunning[THREAD_MINTER] > 0) printf("ThreadStakeMinter still running\n"); - while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCSERVER] > 0) + while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0) Sleep(20); Sleep(50); DumpAddresses(); diff --git a/src/net.h b/src/net.h index aa3cd113..867d4cbc 100644 --- a/src/net.h +++ b/src/net.h @@ -81,12 +81,13 @@ enum threadId THREAD_OPENCONNECTIONS, THREAD_MESSAGEHANDLER, THREAD_MINER, - THREAD_RPCSERVER, + THREAD_RPCLISTENER, THREAD_UPNP, THREAD_DNSSEED, THREAD_ADDEDCONNECTIONS, THREAD_DUMPADDRESS, THREAD_MINTER, + THREAD_RPCHANDLER, THREAD_MAX }; From b7333ec5ca8106d6e87cae5341b0079c1263cf87 Mon Sep 17 00:00:00 2001 From: David Joel Schwartz Date: Tue, 24 Apr 2012 01:10:02 -0400 Subject: [PATCH 2/2] RPC: Support HTTP/1.0 and HTTP/1.1, including the proper use of keep-alives --- src/bitcoinrpc.cpp | 50 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/src/bitcoinrpc.cpp b/src/bitcoinrpc.cpp index fbb46b52..741af687 100644 --- a/src/bitcoinrpc.cpp +++ b/src/bitcoinrpc.cpp @@ -827,7 +827,7 @@ string rfc1123Time() return string(buffer); } -static string HTTPReply(int nStatus, const string& strMsg) +static string HTTPReply(int nStatus, const string& strMsg, bool keepalive) { if (nStatus == 401) return strprintf("HTTP/1.0 401 Authorization Required\r\n" @@ -856,7 +856,7 @@ static string HTTPReply(int nStatus, const string& strMsg) return strprintf( "HTTP/1.1 %d %s\r\n" "Date: %s\r\n" - "Connection: close\r\n" + "Connection: %s\r\n" "Content-Length: %d\r\n" "Content-Type: application/json\r\n" "Server: paycoin-json-rpc/%s\r\n" @@ -865,12 +865,13 @@ static string HTTPReply(int nStatus, const string& strMsg) nStatus, cStatus, rfc1123Time().c_str(), + keepalive ? "keep-alive" : "close", strMsg.size(), FormatFullVersion().c_str(), strMsg.c_str()); } -int ReadHTTPStatus(std::basic_istream& stream) +int ReadHTTPStatus(std::basic_istream& stream, int &proto) { string str; getline(stream, str); @@ -878,6 +879,10 @@ int ReadHTTPStatus(std::basic_istream& stream) boost::split(vWords, str, boost::is_any_of(" ")); if (vWords.size() < 2) return 500; + proto = 0; + const char *ver = strstr(str.c_str(), "HTTP/1."); + if (ver != NULL) + proto = atoi(ver+7); return atoi(vWords[1].c_str()); } @@ -912,7 +917,8 @@ int ReadHTTP(std::basic_istream& stream, map& mapHeadersRe strMessageRet = ""; // Read status - int nStatus = ReadHTTPStatus(stream); + int nProto; + int nStatus = ReadHTTPStatus(stream, nProto); // Read header int nLen = ReadHTTPHeader(stream, mapHeadersRet); @@ -927,6 +933,16 @@ int ReadHTTP(std::basic_istream& stream, map& mapHeadersRe strMessageRet = string(vch.begin(), vch.end()); } + string sConHdr = mapHeadersRet["connection"]; + + if ((sConHdr != "close") && (sConHdr != "keep-alive")) + { + if (nProto >= 1) + mapHeadersRet["connection"] = "keep-alive"; + else + mapHeadersRet["connection"] = "close"; + } + return nStatus; } @@ -985,7 +1001,7 @@ void ErrorReply(std::ostream& stream, const Object& objError, const Value& id) if (code == -32600) nStatus = 400; else if (code == -32601) nStatus = 404; string strReply = JSONRPCReply(Value::null, objError, id); - stream << HTTPReply(nStatus, strReply) << std::flush; + stream << HTTPReply(nStatus, strReply, false) << std::flush; } bool ClientAllowed(const string& strAddress) @@ -1217,7 +1233,7 @@ void ThreadRPCServer2(void* parg) { // Accept connection AcceptedConnection *conn = - new AcceptedConnection(io_service, context, fUseSSL); + new AcceptedConnection(io_service, context, fUseSSL); vnThreadsRunning[THREAD_RPCLISTENER]--; acceptor.accept(conn->sslStream.lowest_layer(), conn->peer); @@ -1236,7 +1252,7 @@ void ThreadRPCServer2(void* parg) { // Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake. if (!fUseSSL) - conn->stream << HTTPReply(403, "") << std::flush; + conn->stream << HTTPReply(403, "", false) << std::flush; delete conn; } @@ -1253,7 +1269,15 @@ void ThreadRPCServer3(void* parg) vnThreadsRunning[THREAD_RPCHANDLER]++; AcceptedConnection *conn = (AcceptedConnection *) parg; - do { + bool fRun = true; + for (;;) { + if (fShutdown || !fRun) + { + conn->stream.close(); + delete conn; + --vnThreadsRunning[THREAD_RPCHANDLER]; + return; + } map mapHeaders; string strRequest; @@ -1262,7 +1286,7 @@ void ThreadRPCServer3(void* parg) // Check authorization if (mapHeaders.count("authorization") == 0) { - conn->stream << HTTPReply(401, "") << std::flush; + conn->stream << HTTPReply(401, "", false) << std::flush; break; } if (!HTTPAuthorized(mapHeaders)) @@ -1274,9 +1298,11 @@ void ThreadRPCServer3(void* parg) if (mapArgs["-rpcpassword"].size() < 20) Sleep(250); - conn->stream << HTTPReply(401, "") << std::flush; + conn->stream << HTTPReply(401, "", false) << std::flush; break; } + if (mapHeaders["connection"] == "close") + fRun = false; Value id = Value::null; try @@ -1299,7 +1325,7 @@ void ThreadRPCServer3(void* parg) throw JSONRPCError(-32600, "Top-level object parse error"); // Send reply - conn->stream << HTTPReply(200, strReply) << std::flush; + conn->stream << HTTPReply(200, strReply, fRun) << std::flush; } catch (Object& objError) { @@ -1312,7 +1338,7 @@ void ThreadRPCServer3(void* parg) break; } } - while (0); + delete conn; vnThreadsRunning[THREAD_RPCHANDLER]--; }