diff --git a/src/k2/cmd/httpproxy/http_proxy_main.cpp b/src/k2/cmd/httpproxy/http_proxy_main.cpp index c99e2dee..9678e230 100644 --- a/src/k2/cmd/httpproxy/http_proxy_main.cpp +++ b/src/k2/cmd/httpproxy/http_proxy_main.cpp @@ -37,6 +37,7 @@ int main(int argc, char** argv) { ("partition_request_timeout", bpo::value(), "Timeout of K23SI operations, as chrono literals") ("cpo", bpo::value(), "URL of Control Plane Oracle (CPO), e.g. 'tcp+k2rpc://192.168.1.2:12345'") ("cpo_request_timeout", bpo::value(), "CPO request timeout") - ("cpo_request_backoff", bpo::value(), "CPO request backoff"); + ("cpo_request_backoff", bpo::value(), "CPO request backoff") + ("httpproxy_txn_timeout", bpo::value(), "HTTP Proxy Txn idle timeout"); return app.start(argc, argv); } diff --git a/src/k2/common/MapWithExpiry.h b/src/k2/common/MapWithExpiry.h new file mode 100644 index 00000000..571d9091 --- /dev/null +++ b/src/k2/common/MapWithExpiry.h @@ -0,0 +1,156 @@ +/* +MIT License + +Copyright(c) 2022 Futurewei Cloud + + Permission is hereby granted, + free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions : + + The above copyright notice and this permission notice shall be included in all copies + or + substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", + WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + DAMAGES OR OTHER + LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +#pragma once +#include "Timer.h" + +namespace k2 { +namespace nsbi = boost::intrusive; + +// Utility map class that also keeps a timestamp ordered elements list +template class TimestampOrderedMap { +public: + class MapElem { + public: + ValT& getValue() {return entry;} + TimePoint timestamp() {return ts;} + KeyT key; + ValT entry; + TimePoint ts; + nsbi::list_member_hook<> tsLink; + }; + + typedef typename std::unordered_map::iterator iterator; + typedef nsbi::list, &MapElem::tsLink>> TsOrderedList; + + + auto insert(const KeyT &key, ValT&& val, TimePoint ts) { + auto pair = elems.emplace(key, MapElem { + .key = key, + .entry = std::move(val), + .ts = ts, + .tsLink = {}, + }); + MapElem& elem = pair.first->second; + tsOrderedList.push_back(elem); + } + + // TODO: Implement using boost iterator helpers to reduce boilerplate code. + iterator end() {return elems.end();} + iterator begin() { return elems.begin();} + iterator find(const KeyT &key) {return elems.find(key);} + ValT& at(const KeyT &key) {return elems.at(key).getValue();} + auto size() {return elems.size();} + void unlink(iterator iter) { + tsOrderedList.erase(tsOrderedList.iterator_to(iter->second)); + } + auto extract(iterator iter) { + unlink(iter); + // Remove from map + return elems.extract(iter); + } + void erase(const KeyT &key) { + iterator iter = elems.find(key); + if (iter == elems.end()) return; + unlink(iter); + elems.erase(iter); + } + + void resetTs(iterator iter, TimePoint ts) { + // new TS needs to be >= current maximum TS + if (ts < tsOrderedList.back().ts) { + assert(false); + throw std::invalid_argument("too small ts"); + } + unlink(iter); + iter->second.ts = ts; + tsOrderedList.push_back(iter->second); + } + + iterator getFirst() { + if (tsOrderedList.empty()) + return elems.end(); + return find(tsOrderedList.front().key); + } + + TsOrderedList tsOrderedList; + std::unordered_map elems; +}; + +template +class MapWithExpiry { +public: + template + void start(Duration timeout, Func&& func) { + _timeout = timeout; + _expiryTimer.setCallback([this, func = std::move(func)] { + return seastar::do_until( + [this] { + auto iter = _elems.getFirst(); + return (iter == _elems.end() || iter->second.timestamp() > ClockT::now()); + }, + [this, func] { + auto iter = _elems.getFirst(); + // First Remove element from map before calling callback + auto node = _elems.extract(iter); + if (!node) return seastar::make_ready_future(); + return func(node.key(), node.mapped().getValue()); + }); + + }); + _expiryTimer.armPeriodic(_timeout/2); + } + + seastar::future<> stop() { + return _expiryTimer.stop() + .then([this] { + std::vector> bgFuts; + for(auto iter = _elems.begin(); iter != _elems.end(); iter++) { + _elems.unlink(iter); + bgFuts.push_back(iter->second.getValue().cleanup().discard_result()); + } + return seastar::when_all_succeed(bgFuts.begin(), bgFuts.end()); + }); + } + + typedef typename TimestampOrderedMap::iterator iterator; + + // Move elment to the end of the TS ordered list + void reorder(iterator iter) {_elems.resetTs(iter, getExpiry());} + iterator find(const KeyT &key) {return _elems.find(key);} + TimePoint getExpiry() {return ClockT::now() + _timeout;} + void insert(const KeyT &key, ValT&& val) {_elems.insert(key, std::move(val), getExpiry());} + void erase(const KeyT &key) {_elems.erase(key);} + auto size() {return _elems.size(); } + ValT& at(const KeyT &key) {return _elems.at(key);} + iterator end() {return _elems.end();} + ValT& getValue(iterator it) {return it->second.getValue();} + +private: + TimestampOrderedMap _elems; + // heartbeats checks are driven off single timer. + PeriodicTimer _expiryTimer; + Duration _timeout = 10s; + +}; +} diff --git a/src/k2/httpproxy/HTTPProxy.cpp b/src/k2/httpproxy/HTTPProxy.cpp index 81ecf350..8dee4cd3 100644 --- a/src/k2/httpproxy/HTTPProxy.cpp +++ b/src/k2/httpproxy/HTTPProxy.cpp @@ -168,13 +168,14 @@ HTTPProxy::HTTPProxy(): seastar::future<> HTTPProxy::gracefulStop() { _stopped = true; - - auto it = _txns.begin(); - for(; it != _txns.end(); ++it) { - _endFuts.push_back(it->second.end(false).discard_result()); - } - - return seastar::when_all_succeed(_endFuts.begin(), _endFuts.end()); + return _txns.stop() + .then_wrapped([] (auto&& fut) { + if (fut.failed()) { + K2LOG_W_EXC(log::httpproxy, fut.get_exception(), "txn failed background task"); + } + K2LOG_I(log::httpproxy, "stopped"); + return seastar::make_ready_future(); + }); } seastar::future<> HTTPProxy::start() { @@ -182,17 +183,25 @@ seastar::future<> HTTPProxy::start() { _registerMetrics(); _registerAPI(); auto _startFut = seastar::make_ready_future<>(); + _txns.start(httpproxy_txn_timeout(), [this] (uint64_t txnID, TxnInfo& txnInfo){ + K2LOG_I(log::httpproxy, "Erasing txnid={}", txnID); + _numQueries -= txnInfo.queries.size(); + _timedoutTxns++; + return txnInfo.txn.end(false).discard_result(); + }); _startFut = _startFut.then([this] {return _client.start();}); return _startFut; } + seastar::future HTTPProxy::_handleBegin(nlohmann::json&& request) { (void) request; return _client.beginTxn(k2::K2TxnOptions()) .then([this] (auto&& txn) { K2LOG_D(k2::log::httpproxy, "begin txn: {}", txn.mtr()); - _txns[_txnID++] = std::move(txn); - return JsonResponse(Statuses::S201_Created("Begin txn success"), nlohmann::json{{"txnID", _txnID - 1}}); + auto txnid = _txnID++; + _txns.insert(txnid, TxnInfo{std::move(txn), {}}); + return JsonResponse(Statuses::S201_Created("Begin txn success"), nlohmann::json{{"txnID", txnid}}); }); } @@ -209,12 +218,12 @@ seastar::future HTTPProxy::_handleEnd(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Bad json for end request")); } - std::unordered_map::iterator it = _txns.find(id); - if (it == _txns.end()) { + auto iter = _txns.find(id); + if ( iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for end request")); } - return it->second.end(commit) + return _txns.getValue(iter).txn.end(commit) .then([this, id] (k2::EndResult&& result) { _txns.erase(id); return JsonResponse(std::move(result.status)); @@ -242,9 +251,10 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) return JsonResponse(Statuses::S400_Bad_Request("Invalid json for read request")); } - std::unordered_map::iterator it = _txns.find(id); - if (it == _txns.end()) { + if (auto iter = _txns.find(id); iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for read request")); + } else { + _txns.reorder(iter); } return _client.getSchema(collectionName, schemaName, ANY_SCHEMA_VERSION) @@ -261,7 +271,8 @@ seastar::future HTTPProxy::_handleRead(nlohmann::json&& request) _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - return _txns[id].read(std::move(record)) + auto& txnInfo = _txns.at(id); + return txnInfo.txn.read(std::move(record)) .then([this] (k2::ReadResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); @@ -296,9 +307,10 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request return JsonResponse(Statuses::S400_Bad_Request("Bad json for write request")); } - std::unordered_map::iterator it = _txns.find(id); - if (it == _txns.end()) { + if (auto iter = _txns.find(id); iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for write request")); + } else { + _txns.reorder(iter); } return _client.getSchema(collectionName, schemaName, schemaVersion) @@ -315,8 +327,7 @@ seastar::future HTTPProxy::_handleWrite(nlohmann::json&& request _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request(e.what())); } - - return _txns[id].write(record) + return _txns.at(id).txn.write(record) .then([] (k2::WriteResult&& result) { return JsonResponse(std::move(result.status)); }); @@ -384,17 +395,25 @@ seastar::future HTTPProxy::_handleGetKeyString(nlohmann::json&& seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& jsonReq) { std::string collectionName; std::string schemaName; + uint64_t txnID; try { jsonReq.at("collectionName").get_to(collectionName); jsonReq.at("schemaName").get_to(schemaName); + jsonReq.at("txnID").get_to(txnID); } catch(...) { _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } + if (auto iter = _txns.find(txnID); iter == _txns.end()) { + return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); + } else { + _txns.reorder(iter); + } + return _client.createQuery(collectionName, schemaName) - .then([this, req=std::move(jsonReq)] (auto&& result) mutable { + .then([this, txnID, req=std::move(jsonReq)] (auto&& result) mutable { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); } @@ -411,8 +430,10 @@ seastar::future HTTPProxy::_handleCreateQuery(nlohmann::json&& j if (req.contains("reverse")) { result.query.setReverseDirection(req["reverse"]); } - _queries[_queryID++] = std::move(result.query); - nlohmann::json resp{{"queryID", _queryID - 1}}; + auto queryid = _queryID++; + _txns.at(txnID).queries[queryid] = std::move(result.query); + _numQueries++; + nlohmann::json resp{{"queryID", queryid}}; return JsonResponse(std::move(result.status), std::move(resp)); }); } @@ -428,18 +449,20 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq _deserializationErrors++; return JsonResponse(Statuses::S400_Bad_Request("Bad json for query request")); } - auto txnIter = _txns.find(txnID); - if (txnIter == _txns.end()) { + auto iter = _txns.find(txnID); + if (iter == _txns.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find txnID for query request")); } - auto queryIter = _queries.find(queryID); - if (queryIter == _queries.end()) { + _txns.reorder(iter); + auto& txnInfo = _txns.getValue(iter); + auto queryIter = txnInfo.queries.find(queryID); + if (queryIter == txnInfo.queries.end()) { return JsonResponse(Statuses::S400_Bad_Request("Could not find queryID for query request")); } - return txnIter->second.query(queryIter->second) - .then([this, queryID](QueryResult&& result) { + return txnInfo.txn.query(queryIter->second) + .then([this, txnID, queryID](QueryResult&& result) { if(!result.status.is2xxOK()) { return JsonResponse(std::move(result.status)); } @@ -449,10 +472,12 @@ seastar::future HTTPProxy::_handleQuery(nlohmann::json&& jsonReq for (auto& record: result.records) { records.push_back(serializeJSONFromRecord(record)); } - - bool isDone = _queries[queryID].isDone(); + auto& txnInfo = _txns.at(txnID); + auto& query = txnInfo.queries.at(queryID); + bool isDone = query.isDone(); if (isDone) { - _queries.erase(queryID); + txnInfo.queries.erase(queryID); + _numQueries--; } nlohmann::json resp; resp["records"] = std::move(records); @@ -509,9 +534,10 @@ void HTTPProxy::_registerMetrics() { _metric_groups.add_group("session", { sm::make_counter("deserialization_errors", _deserializationErrors, sm::description("Total number of deserialization errors"), labels), + sm::make_counter("timed_out_txns", _timedoutTxns, sm::description("Total number of txn timed out"), labels), sm::make_gauge("open_txns", [this]{ return _txns.size();}, sm::description("Total number of open txn handles"), labels), - sm::make_gauge("open_queries", [this]{ return _queries.size();}, sm::description("Total number of open queries"), labels), + sm::make_gauge("open_queries", [this]{ return _numQueries;}, sm::description("Total number of open queries"), labels), }); } } \ No newline at end of file diff --git a/src/k2/httpproxy/HTTPProxy.h b/src/k2/httpproxy/HTTPProxy.h index 5931dcb8..47eb7ba4 100644 --- a/src/k2/httpproxy/HTTPProxy.h +++ b/src/k2/httpproxy/HTTPProxy.h @@ -23,10 +23,14 @@ Copyright(c) 2022 Futurewei Cloud #pragma once +#include #include +#include +#include namespace k2 { + class HTTPProxy { public: // application lifespan HTTPProxy(); @@ -57,15 +61,27 @@ class HTTPProxy { sm::metric_groups _metric_groups; uint64_t _deserializationErrors = 0; + uint64_t _timedoutTxns = 0; + uint64_t _numQueries = 0; bool _stopped = true; k2::K23SIClient _client; uint64_t _txnID = 0; uint64_t _queryID = 0; - std::unordered_map _txns; - // Store in progress queries - std::unordered_map _queries; - std::vector> _endFuts; + + struct TxnInfo { + k2::K2TxnHandle txn; + // Store in progress queries + std::unordered_map queries; + // clanup function is required for MapWithExpiry + auto cleanup() { + return txn.end(false); + } + }; + + MapWithExpiry _txns; + // Txn idle timeout + ConfigDuration httpproxy_txn_timeout{"httpproxy_txn_timeout", 60s}; }; // class HTTPProxy } // namespace k2 diff --git a/test/integration/skvclient.py b/test/integration/skvclient.py index 1dcd0a5b..25bb0f36 100644 --- a/test/integration/skvclient.py +++ b/test/integration/skvclient.py @@ -134,6 +134,26 @@ def read(self, loc: DBLoc) -> Tuple[Status, object] : output = result["response"].get("record") if "response" in result else None return Status(result), output + def create_query(self, collectionName: str, + schemaName: str, start: dict = None, end: dict = None, + limit: int = 0, reverse: bool = False) -> Tuple[Status, Query]: + data = {"collectionName": collectionName, + "schemaName": schemaName, "txnID": self._txn_id} + if start: + data["startScanRecord"] = start + if end: + data["endScanRecord"] = end + if limit: + data["limit"] = limit + if reverse: + data["reverse"] = reverse + + result = self._send_req("/api/CreateQuery", data) + status = Status(result) + if "response" in result: + return status, Query(result["response"]["queryID"]) + return status, None + def query(self, query: Query) -> Tuple[Status, ListOfDict]: request = {"txnID" : self._txn_id, "queryID": query.query_id} result = self._send_req("/api/Query", request) @@ -264,28 +284,6 @@ def create_collection(self, metadata: CollectionMetadata, rangeEnds: [str] = []) status = Status(result) return status - def create_query(self, collectionName: str, - schemaName: str, start: dict = None, end: dict = None, - limit: int = 0, reverse: bool = False) -> Tuple[Status, Query]: - url = self.http + "/api/CreateQuery" - data = {"collectionName": collectionName, - "schemaName": schemaName} - if start: - data["startScanRecord"] = start - if end: - data["endScanRecord"] = end - if limit: - data["limit"] = limit - if reverse: - data["reverse"] = reverse - - r = requests.post(url, data=json.dumps(data)) - result = r.json() - status = Status(result) - if "response" in result: - return status, Query(result["response"]["queryID"]) - return status, None - def get_key_string(self, fields: [FieldSpec]) -> Tuple[Status, str]: url = self.http + "/api/GetKeyString" req = {"fields": fields} diff --git a/test/integration/test_http.py b/test/integration/test_http.py index b6a796f0..4a1edd2b 100755 --- a/test/integration/test_http.py +++ b/test/integration/test_http.py @@ -32,6 +32,7 @@ CollectionMetadata, SKVClient, FieldSpec, MetricsClient, Counter, Histogram) from datetime import timedelta +from time import sleep parser = argparse.ArgumentParser() parser.add_argument("--http", help="HTTP API URL") @@ -386,33 +387,33 @@ def test_query(self): all_records = [record1, record2] - status, query_id = db.create_query("query_collection", "query_test") + status, query_id = txn.create_query("query_collection", "query_test") self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records) - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", start = {"partition": "default", "partition1": "h"}) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records[1:]) - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", end = {"partition": "default", "partition1": "h"}) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records[:1]) - status, query_id = db.create_query("query_collection", "query_test", limit = 1) + status, query_id = txn.create_query("query_collection", "query_test", limit = 1) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) self.assertEqual(records, all_records[:1]) - status, query_id = db.create_query("query_collection", "query_test", reverse = True) + status, query_id = txn.create_query("query_collection", "query_test", reverse = True) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) self.assertEqual(status.code, 200, msg=status.message) @@ -420,7 +421,7 @@ def test_query(self): copied.reverse() self.assertEqual(records, copied) - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", limit = 1, reverse = True) self.assertEqual(status.code, 200, msg=status.message) status, records = txn.queryAll(query_id) @@ -428,13 +429,13 @@ def test_query(self): self.assertEqual(records, all_records[1:]) # Send reverse with invalid type, should fail with type error - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", limit = 1, reverse = 5) self.assertEqual(status.code, 500, msg=status.message) self.assertIn("type_error", status.message, msg=status.message) # Send limit with invalid type, should fail with type error - status, query_id = db.create_query("query_collection", "query_test", + status, query_id = txn.create_query("query_collection", "query_test", limit = "test", reverse = False) self.assertEqual(status.code, 500, msg=status.message) self.assertIn("type_error", status.message, msg=status.message) @@ -458,6 +459,7 @@ def test_metrics(self): mclient = MetricsClient(args.prometheus, [ Counter("HttpProxy", "session", "open_txns"), Counter("HttpProxy", "session", "deserialization_errors"), + Counter("HttpProxy", "session", "timed_out_txns"), Histogram("HttpProxy", "K23SI_client", "txn_begin_latency"), Histogram("HttpProxy", "K23SI_client", "txn_end_latency"), Histogram("HttpProxy", "K23SI_client", "txn_duration") @@ -490,5 +492,24 @@ def test_metrics(self): self.assertEqual(curr.txn_end_latency, prev.txn_end_latency+1) self.assertEqual(curr.txn_duration, prev.txn_duration+1) + prev = mclient.refresh() + status, txn = db.begin_txn() + # Sleep 1.7s for txn to timeout + sleep(1.7) + status = txn.end() + self.assertEqual(status.code, 400, msg=status.message) + curr = mclient.refresh() + self.assertEqual(curr.timed_out_txns, prev.timed_out_txns+1) + + status, txn = db.begin_txn() + # Sleep 0.7 and write, it should succeed because within timeout + sleep(0.7) + status = txn.write(loc, additional_data) + self.assertEqual(status.code, 201, msg=status.message) + # Sleep additional time, should succeed as time out pushed back because of write + sleep(0.7) + status = txn.end() + self.assertEqual(status.code, 200, msg=status.message) + del sys.argv[1:] unittest.main() diff --git a/test/integration/test_http_client.sh b/test/integration/test_http_client.sh index fa3cd3b4..528992d7 100755 --- a/test/integration/test_http_client.sh +++ b/test/integration/test_http_client.sh @@ -22,7 +22,7 @@ tso_child_pid=$! sleep 2 -./build/src/k2/cmd/httpproxy/http_proxy ${COMMON_ARGS} -c1 --tcp_endpoints ${HTTP} --memory=1G --cpo ${CPO} & +./build/src/k2/cmd/httpproxy/http_proxy ${COMMON_ARGS} -c1 --tcp_endpoints ${HTTP} --memory=1G --cpo ${CPO} --httpproxy_txn_timeout=1s& http_child_pid=$! function finish {