Skip to content

Commit

Permalink
h2load: Add --rps option
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuhiro-t committed Feb 23, 2021
1 parent 92944f7 commit 6cdc13d
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 5 deletions.
109 changes: 104 additions & 5 deletions src/h2load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Config::~Config() {
bool Config::is_rate_mode() const { return (this->rate != 0); }
bool Config::is_timing_based_mode() const { return (this->duration > 0); }
bool Config::has_base_uri() const { return (!this->base_uri.empty()); }
bool Config::rps_enabled() const { return this->rps > 0.0; }
Config config;

namespace {
Expand Down Expand Up @@ -286,6 +287,51 @@ void warmup_timeout_cb(struct ev_loop *loop, ev_timer *w, int revents) {
}
} // namespace

namespace {
void rps_cb(struct ev_loop *loop, ev_timer *w, int revents) {
auto client = static_cast<Client *>(w->data);
auto &session = client->session;

assert(!config.timing_script);

if (client->req_left == 0) {
ev_timer_stop(loop, w);
return;
}

auto now = ev_now(loop);
auto d = now - client->rps_duration_started;
auto n = static_cast<size_t>(round(d * config.rps));
client->rps_req_pending += n;
client->rps_duration_started = now - d + static_cast<double>(n) / config.rps;

if (client->rps_req_pending == 0) {
return;
}

auto nreq = session->max_concurrent_streams() - client->rps_req_inflight;
if (nreq == 0) {
return;
}

nreq = config.is_timing_based_mode() ? std::max(nreq, client->req_left)
: std::min(nreq, client->req_left);
nreq = std::min(nreq, client->rps_req_pending);

client->rps_req_inflight += nreq;
client->rps_req_pending -= nreq;

for (; nreq > 0; --nreq) {
if (client->submit_request() != 0) {
client->process_request_failure();
break;
}
}

client->signal_write();
}
} // namespace

namespace {
// Called when an a connection has been inactive for a set period of time
// or a fixed amount of time after all requests have been made on a
Expand Down Expand Up @@ -374,7 +420,10 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)
id(id),
fd(-1),
new_connection_requested(false),
final(false) {
final(false),
rps_duration_started(0),
rps_req_pending(0),
rps_req_inflight(0) {
if (req_todo == 0) { // this means infinite number of requests are to be made
// This ensures that number of requests are unbounded
// Just a positive number is fine, we chose the first positive number
Expand All @@ -396,6 +445,9 @@ Client::Client(uint32_t id, Worker *worker, size_t req_todo)

ev_timer_init(&request_timeout_watcher, client_request_timeout_cb, 0., 0.);
request_timeout_watcher.data = this;

ev_timer_init(&rps_watcher, rps_cb, 0., 0.);
rps_watcher.data = this;
}

Client::~Client() {
Expand Down Expand Up @@ -552,6 +604,7 @@ void Client::disconnect() {

ev_timer_stop(worker->loop, &conn_inactivity_watcher);
ev_timer_stop(worker->loop, &conn_active_watcher);
ev_timer_stop(worker->loop, &rps_watcher);
ev_timer_stop(worker->loop, &request_timeout_watcher);
streams.clear();
session.reset();
Expand Down Expand Up @@ -866,8 +919,18 @@ void Client::on_stream_close(int32_t stream_id, bool success, bool final) {
if (!ev_is_active(&request_timeout_watcher)) {
ev_feed_event(worker->loop, &request_timeout_watcher, EV_TIMER);
}
} else if (submit_request() != 0) {
process_request_failure();
} else if (!config.rps_enabled()) {
if (submit_request() != 0) {
process_request_failure();
}
} else if (rps_req_pending) {
--rps_req_pending;
if (submit_request() != 0) {
process_request_failure();
}
} else {
assert(rps_req_inflight);
--rps_req_inflight;
}
}
}
Expand Down Expand Up @@ -962,10 +1025,25 @@ int Client::connection_made() {

record_connect_time();

if (!config.timing_script) {
if (config.rps_enabled()) {
rps_watcher.repeat = std::max(0.01, 1. / config.rps);
ev_timer_again(worker->loop, &rps_watcher);
rps_duration_started = ev_now(worker->loop);
}

if (config.rps_enabled()) {
assert(req_left);

++rps_req_inflight;

if (submit_request() != 0) {
process_request_failure();
}
} else if (!config.timing_script) {
auto nreq = config.is_timing_based_mode()
? std::max(req_left, session->max_concurrent_streams())
: std::min(req_left, session->max_concurrent_streams());

for (; nreq > 0; --nreq) {
if (submit_request() != 0) {
process_request_failure();
Expand Down Expand Up @@ -1943,7 +2021,8 @@ void print_help(std::ostream &out) {
port defined in the first URI are used solely. Values
contained in other URIs, if present, are ignored.
Definition of a base URI overrides all scheme, host or
port values.
port values. --timing-script-file and --rps are
mutually exclusive.
-B, --base-uri=(<URI>|unix:<PATH>)
Specify URI from which the scheme, host and port will be
used for all requests. The base URI overrides all
Expand Down Expand Up @@ -1988,6 +2067,8 @@ void print_help(std::ostream &out) {
--connect-to=<HOST>[:<PORT>]
Host and port to connect instead of using the authority
in <URI>.
--rps=<N> Specify request per second for each client. --rps and
--timing-script-file are mutually exclusive.
-v, --verbose
Output debug information.
--version Display version information and exit.
Expand Down Expand Up @@ -2047,6 +2128,7 @@ int main(int argc, char **argv) {
{"warm-up-time", required_argument, &flag, 9},
{"log-file", required_argument, &flag, 10},
{"connect-to", required_argument, &flag, 11},
{"rps", required_argument, &flag, 12},
{nullptr, 0, nullptr, 0}};
int option_index = 0;
auto c = getopt_long(argc, argv,
Expand Down Expand Up @@ -2286,6 +2368,17 @@ int main(int argc, char **argv) {
config.connect_to_port = port;
break;
}
case 12: {
char *end;
auto v = std::strtod(optarg, &end);
if (end == optarg || *end != '\0' || !std::isfinite(v) ||
1. / v < 1e-6) {
std::cerr << "--rps: Invalid value " << optarg << std::endl;
exit(EXIT_FAILURE);
}
config.rps = v;
break;
}
}
break;
default:
Expand Down Expand Up @@ -2376,6 +2469,12 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}

if (config.timing_script && config.rps_enabled()) {
std::cerr << "--timing-script-file, --rps: they are mutually exclusive."
<< std::endl;
exit(EXIT_FAILURE);
}

if (config.nreqs == 0 && !config.is_timing_based_mode()) {
std::cerr << "-n: the number of requests must be strictly greater than 0 "
"if timing-based test is not being run."
Expand Down
17 changes: 17 additions & 0 deletions src/h2load.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,16 @@ struct Config {
// list of supported NPN/ALPN protocol strings in the order of
// preference.
std::vector<std::string> npn_list;
// The number of request per second for each client.
double rps;

Config();
~Config();

bool is_rate_mode() const;
bool is_timing_based_mode() const;
bool has_base_uri() const;
bool rps_enabled() const;
};

struct RequestStat {
Expand Down Expand Up @@ -336,6 +339,20 @@ struct Client {
// true if the current connection will be closed, and no more new
// request cannot be processed.
bool final;
// rps_watcher is a timer to invoke callback periodically to
// generate a new request.
ev_timer rps_watcher;
// The timestamp that starts the period which contributes to the
// next request generation.
ev_tstamp rps_duration_started;
// The number of requests allowed by rps, but limited by stream
// concurrency.
size_t rps_req_pending;
// The number of in-flight streams. req_inflight has similar value
// but it only measures requests made during Phase::MAIN_DURATION.
// rps_req_inflight measures the number of requests in all phases,
// and it is only used if --rps is given.
size_t rps_req_inflight;

enum { ERR_CONNECT_FAIL = -100 };

Expand Down

0 comments on commit 6cdc13d

Please sign in to comment.