From ee52773432888716e5ce34a07079fd72e9e4379a Mon Sep 17 00:00:00 2001 From: Ahsan Khan Date: Wed, 4 May 2022 09:55:31 -0700 Subject: [PATCH 1/5] Cleanup idle transactions after timeout, associate queries with transaction --- src/k2/cmd/httpproxy/http_proxy_main.cpp | 3 +- src/k2/httpproxy/HTTPProxy.cpp | 92 ++++++++++++++++++------ src/k2/httpproxy/HTTPProxy.h | 20 +++++- test/integration/skvclient.py | 42 ++++++----- test/integration/test_http.py | 37 +++++++--- test/integration/test_http_client.sh | 2 +- 6 files changed, 139 insertions(+), 57 deletions(-) diff --git a/src/k2/cmd/httpproxy/http_proxy_main.cpp b/src/k2/cmd/httpproxy/http_proxy_main.cpp index c99e2dee..9678e230 100644 --- a/src/k2/cmd/httpproxy/http_proxy_main.cpp +++ b/src/k2/cmd/httpproxy/http_proxy_main.cpp @@ -37,6 +37,7 @@ int main(int argc, char** argv) { ("partition_request_timeout", bpo::value(), "Timeout of K23SI operations, as chrono literals") ("cpo", bpo::value(), "URL of Control Plane Oracle (CPO), e.g. 'tcp+k2rpc://192.168.1.2:12345'") ("cpo_request_timeout", bpo::value(), "CPO request timeout") - ("cpo_request_backoff", bpo::value(), "CPO request backoff"); + ("cpo_request_backoff", bpo::value(), "CPO request backoff") + ("httpproxy_txn_timeout", bpo::value(), "HTTP Proxy Txn idle timeout"); return app.start(argc, argv); } diff --git a/src/k2/httpproxy/HTTPProxy.cpp b/src/k2/httpproxy/HTTPProxy.cpp index 81ecf350..171dd86f 100644 --- a/src/k2/httpproxy/HTTPProxy.cpp +++ b/src/k2/httpproxy/HTTPProxy.cpp @@ -171,7 +171,12 @@ seastar::future<> HTTPProxy::gracefulStop() { auto it = _txns.begin(); for(; it != _txns.end(); ++it) { - _endFuts.push_back(it->second.end(false).discard_result()); + it->second.timer.cancel(); + } + + it = _txns.begin(); + for(; it != _txns.end(); ++it) { + _endFuts.push_back(it->second.txn.end(false).discard_result()); } return seastar::when_all_succeed(_endFuts.begin(), _endFuts.end()); @@ -186,13 +191,41 @@ seastar::future<> HTTPProxy::start() { return _startFut; } +void HTTPProxy::TxnTracker::resetTimeout(Duration timeout) { + timer.rearm(Clock::now() + timeout); +} + seastar::future HTTPProxy::_handleBegin(nlohmann::json&& request) { (void) request; return _client.beginTxn(k2::K2TxnOptions()) .then([this] (auto&& txn) { K2LOG_D(k2::log::httpproxy, "begin txn: {}", txn.mtr()); - _txns[_txnID++] = std::move(txn); - return JsonResponse(Statuses::S201_Created("Begin txn success"), nlohmann::json{{"txnID", _txnID - 1}}); + auto txnid = _txnID++; + seastar::timer<> timer([this, txnid] { + if (_stopped) return; + // Sometime this callback happens during shutdown before gracefull stop is called. + // Causing the program to exit with no thread error. + // TODO: Detect and fix such condition. + K2LOG_D(log::httpproxy, "Txn timed out for txnid={}", txnid); + auto iter = _txns.find(txnid); + K2ASSERT(log::httpproxy, iter != _txns.end(), "unable to find txn for timer"); + _timedoutTxns++; + iter->second.txn.end(false) + .then_wrapped([this, txnid] (auto&& fut) { + (void)fut; + K2LOG_D(log::httpproxy, "Erasing txnid={}", txnid); + _numQueries -= _txns[txnid].queries.size(); + // Following line will also delete the timer calling this callback. + // TODO: Delete outside this callback to avoid potential race condition. + _txns.erase(txnid); + + return seastar::make_ready_future<>(); + }) + .wait(); + }); + timer.arm(httpproxy_txn_timeout()); + _txns.emplace(txnid, TxnTracker{std::move(txn), {}, std::move(timer)}); + return JsonResponse(Statuses::S201_Created("Begin txn success"), nlohmann::json{{"txnID", txnid}}); }); } @@ -209,13 +242,14 @@ seastar::future HTTPProxy::_handleEnd(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Bad json for end request")); } - std::unordered_map::iterator it = _txns.find(id); + auto it = _txns.find(id); if (it == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for end request")); } - return it->second.end(commit) + return it->second.txn.end(commit) .then([this, id] (k2::EndResult&& result) { + // Will automatically cancel the txn timer when destroyed _txns.erase(id); return JsonResponse(std::move(result.status)); }); @@ -242,7 +276,7 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Invalid json for read request")); } - std::unordered_map::iterator it = _txns.find(id); + auto it = _txns.find(id); if (it == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for read request")); } @@ -261,7 +295,9 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - return _txns[id].read(std::move(record)) + auto iter = _txns.find(id); + iter->second.resetTimeout(httpproxy_txn_timeout()); + return iter->second.txn.read(std::move(record)) .then([this] (k2::ReadResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); @@ -296,7 +332,7 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request return JsonResponse(Statuses::S400_Bad_Request("Bad json for write request")); } - std::unordered_map::iterator it = _txns.find(id); + auto it = _txns.find(id); if (it == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for write request")); } @@ -315,8 +351,9 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - - return _txns[id].write(record) + auto iter = _txns.find(id); + iter->second.resetTimeout(httpproxy_txn_timeout()); + return iter->second.txn.write(record) .then([] (k2::WriteResult&& result) { return JsonResponse(std::move(result.status)); }); @@ -384,17 +421,23 @@ seastar::future HTTPProxy::_handleGetKeyString(nlohmann::json&& seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& jsonReq) { std::string collectionName; std::string schemaName; + uint64_t txnID; try { jsonReq.at("collectionName").get_to(collectionName); jsonReq.at("schemaName").get_to(schemaName); + jsonReq.at("txnID").get_to(txnID); } catch(...) { _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - + auto txnIter = _txns.find(txnID); + if (txnIter == _txns.end()) { + return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); + } + txnIter->second.resetTimeout(httpproxy_txn_timeout()); return _client.createQuery(collectionName, schemaName) - .then([this, req=std::move(jsonReq)] (auto&& result) mutable { + .then([this, txnID, req=std::move(jsonReq)] (auto&& result) mutable { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); } @@ -411,8 +454,10 @@ seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& j if (req.contains("reverse")) { result.query.setReverseDirection(req["reverse"]); } - _queries[_queryID++] = std::move(result.query); - nlohmann::json resp{{"queryID", _queryID - 1}}; + auto queryid = _queryID++; + _txns[txnID].queries[queryid] = std::move(result.query); + _numQueries++; + nlohmann::json resp{{"queryID", queryid}}; return JsonResponse(std::move(result.status), std::move(resp)); }); } @@ -432,14 +477,15 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq if (txnIter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } - auto queryIter = _queries.find(queryID); + txnIter->second.resetTimeout(httpproxy_txn_timeout()); + auto queryIter = txnIter->second.queries.find(queryID); - if (queryIter == _queries.end()) { + if (queryIter == txnIter->second.queries.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find queryID for query request")); } - return txnIter->second.query(queryIter->second) - .then([this, queryID](QueryResult&& result) { + return txnIter->second.txn.query(queryIter->second) + .then([this, txnID, queryID](QueryResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); } @@ -449,10 +495,11 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq for (auto& record: result.records) { records.push_back(serializeJSONFromRecord(record)); } - - bool isDone = _queries[queryID].isDone(); + auto txnIter = _txns.find(txnID); + bool isDone = txnIter->second.queries[queryID].isDone(); if (isDone) { - _queries.erase(queryID); + txnIter->second.queries.erase(queryID); + _numQueries--; } nlohmann::json resp; resp["records"] = std::move(records); @@ -509,9 +556,10 @@ void HTTPProxy::_registerMetrics() { _metric_groups.add_group("session", { sm::make_counter("deserialization_errors", _deserializationErrors, sm::description("Total number of deserialization errors"), labels), + sm::make_counter("timed_out_txns", _timedoutTxns, sm::description("Total number of txn timed out"), labels), sm::make_gauge("open_txns", [this]{ return _txns.size();}, sm::description("Total number of open txn handles"), labels), - sm::make_gauge("open_queries", [this]{ return _queries.size();}, sm::description("Total number of open queries"), labels), + sm::make_gauge("open_queries", [this]{ return _numQueries;}, sm::description("Total number of open queries"), labels), }); } } \ No newline at end of file diff --git a/src/k2/httpproxy/HTTPProxy.h b/src/k2/httpproxy/HTTPProxy.h index 5931dcb8..808b108c 100644 --- a/src/k2/httpproxy/HTTPProxy.h +++ b/src/k2/httpproxy/HTTPProxy.h @@ -23,6 +23,7 @@ Copyright(c) 2022 Futurewei Cloud #pragma once +#include #include namespace k2 { @@ -57,15 +58,28 @@ class HTTPProxy { sm::metric_groups _metric_groups; uint64_t _deserializationErrors = 0; + uint64_t _timedoutTxns = 0; + uint64_t _numQueries = 0; bool _stopped = true; k2::K23SIClient _client; uint64_t _txnID = 0; uint64_t _queryID = 0; - std::unordered_map _txns; - // Store in progress queries - std::unordered_map _queries; + + struct TxnTracker { + k2::K2TxnHandle txn; + // Store in progress queries + std::unordered_map queries; + // Txn idle expiry timer + seastar::timer<> timer; + + // Push back txn idle expiry because of activity + void resetTimeout(Duration d); + }; + std::unordered_map _txns; std::vector> _endFuts; + // Txn idle timeout + ConfigDuration httpproxy_txn_timeout{"httpproxy_txn_timeout", 60s}; }; // class HTTPProxy } // namespace k2 diff --git a/test/integration/skvclient.py b/test/integration/skvclient.py index 1dcd0a5b..25bb0f36 100644 --- a/test/integration/skvclient.py +++ b/test/integration/skvclient.py @@ -134,6 +134,26 @@ def read(self, loc: DBLoc) -> Tuple[Status, object] : output = result["response"].get("record") if "response" in result else None return Status(result), output + def create_query(self, collectionName: str, + schemaName: str, start: dict = None, end: dict = None, + limit: int = 0, reverse: bool = False) -> Tuple[Status, Query]: + data = {"collectionName": collectionName, + "schemaName": schemaName, "txnID": self._txn_id} + if start: + data["startScanRecord"] = start + if end: + data["endScanRecord"] = end + if limit: + data["limit"] = limit + if reverse: + data["reverse"] = reverse + + result = self._send_req("/api/CreateQuery", data) + status = Status(result) + if "response" in result: + return status, Query(result["response"]["queryID"]) + return status, None + def query(self, query: Query) -> Tuple[Status, ListOfDict]: request = {"txnID" : self._txn_id, "queryID": query.query_id} result = self._send_req("/api/Query", request) @@ -264,28 +284,6 @@ def create_collection(self, metadata: CollectionMetadata, rangeEnds: [str] = []) status = Status(result) return status - def create_query(self, collectionName: str, - schemaName: str, start: dict = None, end: dict = None, - limit: int = 0, reverse: bool = False) -> Tuple[Status, Query]: - url = self.http + "/api/CreateQuery" - data = {"collectionName": collectionName, - "schemaName": schemaName} - if start: - data["startScanRecord"] = start - if end: - data["endScanRecord"] = end - if limit: - data["limit"] = limit - if reverse: - data["reverse"] = reverse - - r = requests.post(url, data=json.dumps(data)) - result = r.json() - status = Status(result) - if "response" in result: - return status, Query(result["response"]["queryID"]) - return status, None - def get_key_string(self, fields: [FieldSpec]) -> Tuple[Status, str]: url = self.http + "/api/GetKeyString" req = {"fields": fields} diff --git a/test/integration/test_http.py b/test/integration/test_http.py index b6a796f0..79080530 100755 --- a/test/integration/test_http.py +++ b/test/integration/test_http.py @@ -32,6 +32,7 @@ CollectionMetadata, SKVClient, FieldSpec, MetricsClient, Counter, Histogram) from datetime import timedelta +from time import sleep parser = argparse.ArgumentParser() parser.add_argument("--http", help="HTTP API URL") @@ -386,33 +387,33 @@ def test_query(self): all_records = [record1, record2] - status, query_id = db.create_query("query_collection", "query_test") + status, query_id = txn.create_query("query_collection", "query_test") self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records) - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", start = {"partition": "default", "partition1": "h"}) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records[1:]) - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", end = {"partition": "default", "partition1": "h"}) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records[:1]) - status, query_id = db.create_query("query_collection", "query_test", limit = 1) + status, query_id = txn.create_query("query_collection", "query_test", limit = 1) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records[:1]) - status, query_id = db.create_query("query_collection", "query_test", reverse = True) + status, query_id = txn.create_query("query_collection", "query_test", reverse = True) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) @@ -420,7 +421,7 @@ def test_query(self): copied.reverse() self.assertEqual(records, copied) - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", limit = 1, reverse = True) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) @@ -428,13 +429,13 @@ def test_query(self): self.assertEqual(records, all_records[1:]) # Send reverse with invalid type, should fail with type error - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", limit = 1, reverse = 5) self.assertEqual(status.code, 500, msg=status.message) self.assertIn("type_error", status.message, msg=status.message) # Send limit with invalid type, should fail with type error - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", limit = "test", reverse = False) self.assertEqual(status.code, 500, msg=status.message) self.assertIn("type_error", status.message, msg=status.message) @@ -458,6 +459,7 @@ def test_metrics(self): mclient = MetricsClient(args.prometheus, [ Counter("HttpProxy", "session", "open_txns"), Counter("HttpProxy", "session", "deserialization_errors"), + Counter("HttpProxy", "session", "timed_out_txns"), Histogram("HttpProxy", "K23SI_client", "txn_begin_latency"), Histogram("HttpProxy", "K23SI_client", "txn_end_latency"), Histogram("HttpProxy", "K23SI_client", "txn_duration") @@ -490,5 +492,24 @@ def test_metrics(self): self.assertEqual(curr.txn_end_latency, prev.txn_end_latency+1) self.assertEqual(curr.txn_duration, prev.txn_duration+1) + prev = mclient.refresh() + status, txn = db.begin_txn() + # Sleep 1.2s for txn to timeout + sleep(1.2) + status = txn.end() + self.assertEqual(status.code, 400, msg=status.message) + curr = mclient.refresh() + self.assertEqual(curr.timed_out_txns, prev.timed_out_txns+1) + + status, txn = db.begin_txn() + # Sleep 0.7 and write, it should succeed because within timeout + sleep(0.7) + status = txn.write(loc, additional_data) + self.assertEqual(status.code, 201, msg=status.message) + # Sleep additional time, should succeed as time out pushed back because of write + sleep(0.7) + status = txn.end() + self.assertEqual(status.code, 200, msg=status.message) + del sys.argv[1:] unittest.main() diff --git a/test/integration/test_http_client.sh b/test/integration/test_http_client.sh index fa3cd3b4..528992d7 100755 --- a/test/integration/test_http_client.sh +++ b/test/integration/test_http_client.sh @@ -22,7 +22,7 @@ tso_child_pid=$! sleep 2 -./build/src/k2/cmd/httpproxy/http_proxy ${COMMON_ARGS} -c1 --tcp_endpoints ${HTTP} --memory=1G --cpo ${CPO} & +./build/src/k2/cmd/httpproxy/http_proxy ${COMMON_ARGS} -c1 --tcp_endpoints ${HTTP} --memory=1G --cpo ${CPO} --httpproxy_txn_timeout=1s& http_child_pid=$! function finish { From d6ab5d09c8d071971e2ecb0bfc5d4531dcbcd617 Mon Sep 17 00:00:00 2001 From: Ahsan Khan Date: Mon, 9 May 2022 16:58:48 -0700 Subject: [PATCH 2/5] Implement using single periodic timer --- src/k2/common/MapWithExpiry.h | 157 +++++++++++++++++++++++++++++++++ src/k2/httpproxy/HTTPProxy.cpp | 100 +++++++-------------- src/k2/httpproxy/HTTPProxy.h | 18 ++-- 3 files changed, 201 insertions(+), 74 deletions(-) create mode 100644 src/k2/common/MapWithExpiry.h diff --git a/src/k2/common/MapWithExpiry.h b/src/k2/common/MapWithExpiry.h new file mode 100644 index 00000000..c3f2ec6b --- /dev/null +++ b/src/k2/common/MapWithExpiry.h @@ -0,0 +1,157 @@ +/* +MIT License + +Copyright(c) 2022 Futurewei Cloud + + Permission is hereby granted, + free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions : + + The above copyright notice and this permission notice shall be included in all copies + or + substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", + WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + DAMAGES OR OTHER + LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +#pragma once +#include "Timer.h" + +namespace k2 { +namespace nsbi = boost::intrusive; + +// Utility map class that also keeps a timestamp ordered elements list +template class TimestampOrderedMap { +public: + class MapElem { + public: + ValT& getValue() {return entry;} + TimePoint timestamp() {return ts;} + KeyT key; + ValT entry; + TimePoint ts; + nsbi::list_member_hook<> tsLink; + }; + + typedef typename std::unordered_map::iterator iterator; + typedef nsbi::list, &MapElem::tsLink>> TsOrderedList; + + + auto insert(const KeyT &key, ValT&& val, TimePoint ts) { + auto pair = elems.emplace(key, MapElem { + .key = key, + .entry = std::move(val), + .ts = ts, + .tsLink = {}, + }); + MapElem& elem = pair.first->second; + tsOrderedList.push_back(elem); + } + + // TODO: Implement using boost iterator helpers to reduce boilerplate code. + iterator end() {return elems.end();} + iterator begin() { return elems.begin();} + iterator find(const KeyT &key) {return elems.find(key);} + ValT& at(const KeyT &key) {return elems.at(key).getValue();} + auto size() {return elems.size();} + void unlink(iterator iter) { + tsOrderedList.erase(tsOrderedList.iterator_to(iter->second)); + } + auto extract(iterator iter) { + unlink(iter); + // Remove from map + return elems.extract(iter); + } + auto erase(const KeyT &key) { + iterator iter = elems.find(key); + unlink(iter); + return elems.erase(iter); + } + + void resetTs(iterator iter, TimePoint ts) { + // new TS needs to be >= current maximum TS + if (ts < tsOrderedList.back().ts) { + assert(false); + throw std::invalid_argument("too small ts"); + } + unlink(iter); + iter->second.ts = ts; + tsOrderedList.push_back(iter->second); + } + + iterator getFirst() { + if (tsOrderedList.empty()) + return elems.end(); + return find(tsOrderedList.front().key); + } + + TsOrderedList tsOrderedList; + std::unordered_map elems; +}; + +template +class MapWithExpiry { +public: + template + void start(Duration timeout, Func&& func) { + _timeout = timeout; + _expiryTimer.setCallback([this, func = std::move(func)] { + return seastar::do_until( + [this] { + auto iter = _elems.getFirst(); + return (iter == _elems.end() || iter->second.timestamp() > ClockT::now()); + }, + [this, func] { + auto iter = _elems.getFirst(); + // First Remove element from map before calling callback + auto node = _elems.extract(iter); + if (!node) return seastar::make_ready_future(); + return func(node.key(), node.mapped().getValue()); + }); + + }); + _expiryTimer.armPeriodic(_timeout/2); + } + + seastar::future<> stop() { + return _expiryTimer.stop() + .then([this] { + std::vector> bgFuts; + for(auto iter = _elems.begin(); iter != _elems.end(); iter++) { + _elems.unlink(iter); + bgFuts.push_back(iter->second.getValue().cleanup().discard_result()); + } + return seastar::when_all_succeed(bgFuts.begin(), bgFuts.end()); + }); + } + + // Check if key is present, with optionally push back expiry if present + bool isPresent(const KeyT &key, bool reorder) { + auto iter = _elems.find(key); + if (iter == _elems.end()) return false; + if (reorder) _elems.resetTs(iter, getExpiry()); + return true; + } + + TimePoint getExpiry() {return ClockT::now() + _timeout;} + void insert(const KeyT &key, ValT&& val) {_elems.insert(key, std::move(val), getExpiry());} + void erase(const KeyT &key) {_elems.erase(key);} + auto size() {return _elems.size(); } + ValT& at(const KeyT &key) {return _elems.at(key);} + +private: + TimestampOrderedMap _elems; + // heartbeats checks are driven off single timer. + PeriodicTimer _expiryTimer; + Duration _timeout = 10s; + +}; + +} diff --git a/src/k2/httpproxy/HTTPProxy.cpp b/src/k2/httpproxy/HTTPProxy.cpp index 171dd86f..203a6da3 100644 --- a/src/k2/httpproxy/HTTPProxy.cpp +++ b/src/k2/httpproxy/HTTPProxy.cpp @@ -168,18 +168,14 @@ HTTPProxy::HTTPProxy(): seastar::future<> HTTPProxy::gracefulStop() { _stopped = true; - - auto it = _txns.begin(); - for(; it != _txns.end(); ++it) { - it->second.timer.cancel(); - } - - it = _txns.begin(); - for(; it != _txns.end(); ++it) { - _endFuts.push_back(it->second.txn.end(false).discard_result()); - } - - return seastar::when_all_succeed(_endFuts.begin(), _endFuts.end()); + return _txns.stop() + .then_wrapped([] (auto&& fut) { + if (fut.failed()) { + K2LOG_W_EXC(log::httpproxy, fut.get_exception(), "txn failed background task"); + } + K2LOG_I(log::httpproxy, "stopped"); + return seastar::make_ready_future(); + }); } seastar::future<> HTTPProxy::start() { @@ -187,13 +183,16 @@ seastar::future<> HTTPProxy::start() { _registerMetrics(); _registerAPI(); auto _startFut = seastar::make_ready_future<>(); + _txns.start(httpproxy_txn_timeout(), [this] (uint64_t txnID, TxnInfo& txnInfo){ + K2LOG_I(log::httpproxy, "Erasing txnid={}", txnID); + _numQueries -= txnInfo.queries.size(); + _timedoutTxns++; + return txnInfo.txn.end(false).discard_result(); + }); _startFut = _startFut.then([this] {return _client.start();}); return _startFut; } -void HTTPProxy::TxnTracker::resetTimeout(Duration timeout) { - timer.rearm(Clock::now() + timeout); -} seastar::future HTTPProxy::_handleBegin(nlohmann::json&& request) { (void) request; @@ -201,30 +200,7 @@ seastar::future HTTPProxy::_handleBegin(nlohmann::json&& request .then([this] (auto&& txn) { K2LOG_D(k2::log::httpproxy, "begin txn: {}", txn.mtr()); auto txnid = _txnID++; - seastar::timer<> timer([this, txnid] { - if (_stopped) return; - // Sometime this callback happens during shutdown before gracefull stop is called. - // Causing the program to exit with no thread error. - // TODO: Detect and fix such condition. - K2LOG_D(log::httpproxy, "Txn timed out for txnid={}", txnid); - auto iter = _txns.find(txnid); - K2ASSERT(log::httpproxy, iter != _txns.end(), "unable to find txn for timer"); - _timedoutTxns++; - iter->second.txn.end(false) - .then_wrapped([this, txnid] (auto&& fut) { - (void)fut; - K2LOG_D(log::httpproxy, "Erasing txnid={}", txnid); - _numQueries -= _txns[txnid].queries.size(); - // Following line will also delete the timer calling this callback. - // TODO: Delete outside this callback to avoid potential race condition. - _txns.erase(txnid); - - return seastar::make_ready_future<>(); - }) - .wait(); - }); - timer.arm(httpproxy_txn_timeout()); - _txns.emplace(txnid, TxnTracker{std::move(txn), {}, std::move(timer)}); + _txns.insert(txnid, TxnInfo{std::move(txn), {}}); return JsonResponse(Statuses::S201_Created("Begin txn success"), nlohmann::json{{"txnID", txnid}}); }); } @@ -242,14 +218,12 @@ seastar::future HTTPProxy::_handleEnd(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Bad json for end request")); } - auto it = _txns.find(id); - if (it == _txns.end()) { + if (!_txns.isPresent(id, false)) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for end request")); } - return it->second.txn.end(commit) + return _txns.at(id).txn.end(commit) .then([this, id] (k2::EndResult&& result) { - // Will automatically cancel the txn timer when destroyed _txns.erase(id); return JsonResponse(std::move(result.status)); }); @@ -276,8 +250,7 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Invalid json for read request")); } - auto it = _txns.find(id); - if (it == _txns.end()) { + if (!_txns.isPresent(id, true)) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for read request")); } @@ -295,9 +268,7 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - auto iter = _txns.find(id); - iter->second.resetTimeout(httpproxy_txn_timeout()); - return iter->second.txn.read(std::move(record)) + return _txns.at(id).txn.read(std::move(record)) .then([this] (k2::ReadResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); @@ -332,8 +303,7 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request return JsonResponse(Statuses::S400_Bad_Request("Bad json for write request")); } - auto it = _txns.find(id); - if (it == _txns.end()) { + if (!_txns.isPresent(id, true)) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for write request")); } @@ -351,9 +321,7 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - auto iter = _txns.find(id); - iter->second.resetTimeout(httpproxy_txn_timeout()); - return iter->second.txn.write(record) + return _txns.at(id).txn.write(record) .then([] (k2::WriteResult&& result) { return JsonResponse(std::move(result.status)); }); @@ -431,11 +399,11 @@ seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& j _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - auto txnIter = _txns.find(txnID); - if (txnIter == _txns.end()) { + + if (!_txns.isPresent(txnID, true)) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } - txnIter->second.resetTimeout(httpproxy_txn_timeout()); + return _client.createQuery(collectionName, schemaName) .then([this, txnID, req=std::move(jsonReq)] (auto&& result) mutable { if(!result.status.is2xxOK()) { @@ -455,7 +423,7 @@ seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& j result.query.setReverseDirection(req["reverse"]); } auto queryid = _queryID++; - _txns[txnID].queries[queryid] = std::move(result.query); + _txns.at(txnID).queries[queryid] = std::move(result.query); _numQueries++; nlohmann::json resp{{"queryID", queryid}}; return JsonResponse(std::move(result.status), std::move(resp)); @@ -473,18 +441,17 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - auto txnIter = _txns.find(txnID); - if (txnIter == _txns.end()) { + if (!_txns.isPresent(txnID, true)) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } - txnIter->second.resetTimeout(httpproxy_txn_timeout()); - auto queryIter = txnIter->second.queries.find(queryID); - if (queryIter == txnIter->second.queries.end()) { + auto& txnInfo = _txns.at(txnID); + auto queryIter = txnInfo.queries.find(queryID); + if (queryIter == txnInfo.queries.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find queryID for query request")); } - return txnIter->second.txn.query(queryIter->second) + return txnInfo.txn.query(queryIter->second) .then([this, txnID, queryID](QueryResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); @@ -495,10 +462,11 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq for (auto& record: result.records) { records.push_back(serializeJSONFromRecord(record)); } - auto txnIter = _txns.find(txnID); - bool isDone = txnIter->second.queries[queryID].isDone(); + auto& txnInfo = _txns.at(txnID); + auto& query = txnInfo.queries.at(queryID); + bool isDone = query.isDone(); if (isDone) { - txnIter->second.queries.erase(queryID); + txnInfo.queries.erase(queryID); _numQueries--; } nlohmann::json resp; diff --git a/src/k2/httpproxy/HTTPProxy.h b/src/k2/httpproxy/HTTPProxy.h index 808b108c..47eb7ba4 100644 --- a/src/k2/httpproxy/HTTPProxy.h +++ b/src/k2/httpproxy/HTTPProxy.h @@ -25,9 +25,12 @@ Copyright(c) 2022 Futurewei Cloud #include #include +#include +#include namespace k2 { + class HTTPProxy { public: // application lifespan HTTPProxy(); @@ -66,18 +69,17 @@ class HTTPProxy { uint64_t _txnID = 0; uint64_t _queryID = 0; - struct TxnTracker { + struct TxnInfo { k2::K2TxnHandle txn; // Store in progress queries std::unordered_map queries; - // Txn idle expiry timer - seastar::timer<> timer; - - // Push back txn idle expiry because of activity - void resetTimeout(Duration d); + // clanup function is required for MapWithExpiry + auto cleanup() { + return txn.end(false); + } }; - std::unordered_map _txns; - std::vector> _endFuts; + + MapWithExpiry _txns; // Txn idle timeout ConfigDuration httpproxy_txn_timeout{"httpproxy_txn_timeout", 60s}; }; // class HTTPProxy From a139c815e48c7b48227d1dc786da537854e4771e Mon Sep 17 00:00:00 2001 From: Ahsan Khan Date: Mon, 9 May 2022 17:52:08 -0700 Subject: [PATCH 3/5] Return iterator from find --- src/k2/common/MapWithExpiry.h | 13 ++++++++----- src/k2/httpproxy/HTTPProxy.cpp | 19 +++++++++++-------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/k2/common/MapWithExpiry.h b/src/k2/common/MapWithExpiry.h index c3f2ec6b..846eb39e 100644 --- a/src/k2/common/MapWithExpiry.h +++ b/src/k2/common/MapWithExpiry.h @@ -132,12 +132,14 @@ class MapWithExpiry { }); } + typedef typename TimestampOrderedMap::iterator iterator; + // Check if key is present, with optionally push back expiry if present - bool isPresent(const KeyT &key, bool reorder) { - auto iter = _elems.find(key); - if (iter == _elems.end()) return false; + iterator find(const KeyT &key, bool reorder) { + auto iter = _elems.find(key); + if (iter == _elems.end()) return iter; if (reorder) _elems.resetTs(iter, getExpiry()); - return true; + return iter; } TimePoint getExpiry() {return ClockT::now() + _timeout;} @@ -145,6 +147,8 @@ class MapWithExpiry { void erase(const KeyT &key) {_elems.erase(key);} auto size() {return _elems.size(); } ValT& at(const KeyT &key) {return _elems.at(key);} + iterator end() {return _elems.end();} + ValT& getValue(iterator it) {return it->second.getValue();} private: TimestampOrderedMap _elems; @@ -153,5 +157,4 @@ class MapWithExpiry { Duration _timeout = 10s; }; - } diff --git a/src/k2/httpproxy/HTTPProxy.cpp b/src/k2/httpproxy/HTTPProxy.cpp index 203a6da3..24838fe1 100644 --- a/src/k2/httpproxy/HTTPProxy.cpp +++ b/src/k2/httpproxy/HTTPProxy.cpp @@ -218,11 +218,12 @@ seastar::future HTTPProxy::_handleEnd(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Bad json for end request")); } - if (!_txns.isPresent(id, false)) { + auto iter = _txns.find(id, false); + if (iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for end request")); } - return _txns.at(id).txn.end(commit) + return _txns.getValue(iter).txn.end(commit) .then([this, id] (k2::EndResult&& result) { _txns.erase(id); return JsonResponse(std::move(result.status)); @@ -250,7 +251,7 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Invalid json for read request")); } - if (!_txns.isPresent(id, true)) { + if (_txns.find(id, true) == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for read request")); } @@ -268,7 +269,8 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - return _txns.at(id).txn.read(std::move(record)) + auto& txnInfo = _txns.at(id); + return txnInfo.txn.read(std::move(record)) .then([this] (k2::ReadResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); @@ -303,7 +305,7 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request return JsonResponse(Statuses::S400_Bad_Request("Bad json for write request")); } - if (!_txns.isPresent(id, true)) { + if (_txns.find(id, true) == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for write request")); } @@ -400,7 +402,7 @@ seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& j return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - if (!_txns.isPresent(txnID, true)) { + if (_txns.find(txnID, true) == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } @@ -441,11 +443,12 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - if (!_txns.isPresent(txnID, true)) { + auto iter = _txns.find(txnID, true); + if (iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } - auto& txnInfo = _txns.at(txnID); + auto& txnInfo = _txns.getValue(iter); auto queryIter = txnInfo.queries.find(queryID); if (queryIter == txnInfo.queries.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find queryID for query request")); From 55544f3eadc8c84973ff6840e9905518364659d3 Mon Sep 17 00:00:00 2001 From: Ahsan Khan Date: Tue, 10 May 2022 14:29:57 -0700 Subject: [PATCH 4/5] Add more check --- src/k2/common/MapWithExpiry.h | 11 +++-------- src/k2/httpproxy/HTTPProxy.cpp | 19 +++++++++++++------ test/integration/test_http.py | 4 ++-- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/k2/common/MapWithExpiry.h b/src/k2/common/MapWithExpiry.h index 846eb39e..f1750c8d 100644 --- a/src/k2/common/MapWithExpiry.h +++ b/src/k2/common/MapWithExpiry.h @@ -134,14 +134,9 @@ class MapWithExpiry { typedef typename TimestampOrderedMap::iterator iterator; - // Check if key is present, with optionally push back expiry if present - iterator find(const KeyT &key, bool reorder) { - auto iter = _elems.find(key); - if (iter == _elems.end()) return iter; - if (reorder) _elems.resetTs(iter, getExpiry()); - return iter; - } - + // Move elment to the end of the TS ordered list + void reorder(iterator iter) {_elems.resetTs(iter, getExpiry());} + iterator find(const KeyT &key) {return _elems.find(key);} TimePoint getExpiry() {return ClockT::now() + _timeout;} void insert(const KeyT &key, ValT&& val) {_elems.insert(key, std::move(val), getExpiry());} void erase(const KeyT &key) {_elems.erase(key);} diff --git a/src/k2/httpproxy/HTTPProxy.cpp b/src/k2/httpproxy/HTTPProxy.cpp index 24838fe1..8dee4cd3 100644 --- a/src/k2/httpproxy/HTTPProxy.cpp +++ b/src/k2/httpproxy/HTTPProxy.cpp @@ -218,8 +218,8 @@ seastar::future HTTPProxy::_handleEnd(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Bad json for end request")); } - auto iter = _txns.find(id, false); - if (iter == _txns.end()) { + auto iter = _txns.find(id); + if ( iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for end request")); } @@ -251,8 +251,10 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Invalid json for read request")); } - if (_txns.find(id, true) == _txns.end()) { + if (auto iter = _txns.find(id); iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for read request")); + } else { + _txns.reorder(iter); } return _client.getSchema(collectionName, schemaName, ANY_SCHEMA_VERSION) @@ -305,8 +307,10 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request return JsonResponse(Statuses::S400_Bad_Request("Bad json for write request")); } - if (_txns.find(id, true) == _txns.end()) { + if (auto iter = _txns.find(id); iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for write request")); + } else { + _txns.reorder(iter); } return _client.getSchema(collectionName, schemaName, schemaVersion) @@ -402,8 +406,10 @@ seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& j return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - if (_txns.find(txnID, true) == _txns.end()) { + if (auto iter = _txns.find(txnID); iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); + } else { + _txns.reorder(iter); } return _client.createQuery(collectionName, schemaName) @@ -443,11 +449,12 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - auto iter = _txns.find(txnID, true); + auto iter = _txns.find(txnID); if (iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } + _txns.reorder(iter); auto& txnInfo = _txns.getValue(iter); auto queryIter = txnInfo.queries.find(queryID); if (queryIter == txnInfo.queries.end()) { diff --git a/test/integration/test_http.py b/test/integration/test_http.py index 79080530..4a1edd2b 100755 --- a/test/integration/test_http.py +++ b/test/integration/test_http.py @@ -494,8 +494,8 @@ def test_metrics(self): prev = mclient.refresh() status, txn = db.begin_txn() - # Sleep 1.2s for txn to timeout - sleep(1.2) + # Sleep 1.7s for txn to timeout + sleep(1.7) status = txn.end() self.assertEqual(status.code, 400, msg=status.message) curr = mclient.refresh() From 1cc17e81f386a8a6b11acbc9ac2a97307f5e452f Mon Sep 17 00:00:00 2001 From: Ahsan Khan Date: Tue, 10 May 2022 15:56:33 -0700 Subject: [PATCH 5/5] Add check before erase --- src/k2/common/MapWithExpiry.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/k2/common/MapWithExpiry.h b/src/k2/common/MapWithExpiry.h index f1750c8d..571d9091 100644 --- a/src/k2/common/MapWithExpiry.h +++ b/src/k2/common/MapWithExpiry.h @@ -69,10 +69,11 @@ template class TimestampOrderedMap { // Remove from map return elems.extract(iter); } - auto erase(const KeyT &key) { + void erase(const KeyT &key) { iterator iter = elems.find(key); + if (iter == elems.end()) return; unlink(iter); - return elems.erase(iter); + elems.erase(iter); } void resetTs(iterator iter, TimePoint ts) {