Skip to content

Commit

Permalink
http3: quiche+ngtcp2 improvements
Browse files Browse the repository at this point in the history
- quiche: error transfers that try to receive on a closed
  or draining connection
- ngtcp2: use callback for extending max bidi streams. This
  allows more precise calculation of MAX_CONCURRENT as we
  only can start a new stream when the server acknowledges
  the close - not when we locally have closed it.
- remove a fprintf() from h2-download client to avoid excess
  log files on tests timing out.

Closes curl#13475
  • Loading branch information
icing authored and bagder committed Apr 26, 2024
1 parent fb22459 commit c8e0cd1
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 23 deletions.
57 changes: 40 additions & 17 deletions lib/vquic/curl_ngtcp2.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ struct cf_ngtcp2_ctx {
struct Curl_hash streams; /* hash `data->id` to `h3_stream_ctx` */
size_t max_stream_window; /* max flow window for one stream */
uint64_t max_idle_ms; /* max idle time for QUIC connection */
uint64_t used_bidi_streams; /* bidi streams we have opened */
uint64_t max_bidi_streams; /* max bidi streams we can open */
int qlogfd;
};

Expand All @@ -143,6 +145,14 @@ struct cf_ngtcp2_ctx {
#define CF_CTX_CALL_DATA(cf) \
((struct cf_ngtcp2_ctx *)(cf)->ctx)->call_data

struct pkt_io_ctx;
static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);

/**
* All about the H3 internals of a stream
*/
Expand Down Expand Up @@ -216,6 +226,7 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;

(void)cf;
if(stream) {
Expand All @@ -228,6 +239,9 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
stream->closed = TRUE;
result = cf_progress_egress(cf, data, NULL);
if(result)
CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
}

Curl_hash_offt_remove(&ctx->streams, data->id);
Expand Down Expand Up @@ -315,12 +329,6 @@ static void pktx_init(struct pkt_io_ctx *pktx,
pktx_update_time(pktx, cf);
}

static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
static CURLcode cf_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct pkt_io_ctx *pktx);
static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
uint64_t datalen, void *user_data,
void *stream_user_data);
Expand Down Expand Up @@ -550,10 +558,16 @@ static int cb_extend_max_local_streams_bidi(ngtcp2_conn *tconn,
uint64_t max_streams,
void *user_data)
{
(void)tconn;
(void)max_streams;
(void)user_data;
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = CF_DATA_CURRENT(cf);

(void)tconn;
ctx->max_bidi_streams = max_streams;
if(data)
CURL_TRC_CF(data, cf, "max bidi streams now %" CURL_PRIu64
", used %" CURL_PRIu64, (curl_uint64_t)ctx->max_bidi_streams,
(curl_uint64_t)ctx->used_bidi_streams);
return 0;
}

Expand Down Expand Up @@ -1353,6 +1367,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
goto out;
}
stream->id = (curl_int64_t)sid;
++ctx->used_bidi_streams;

switch(data->state.httpreq) {
case HTTPREQ_POST:
Expand Down Expand Up @@ -2238,17 +2253,25 @@ static CURLcode cf_ngtcp2_query(struct Curl_cfilter *cf,

switch(query) {
case CF_QUERY_MAX_CONCURRENT: {
const ngtcp2_transport_params *rp;
DEBUGASSERT(pres1);

CF_DATA_SAVE(save, cf, data);
rp = ngtcp2_conn_get_remote_transport_params(ctx->qconn);
if(rp)
*pres1 = (rp->initial_max_streams_bidi > INT_MAX)?
INT_MAX : (int)rp->initial_max_streams_bidi;
else /* not arrived yet? */
/* Set after transport params arrived and continually updated
* by callback. QUIC counts the number over the lifetime of the
* connection, ever increasing.
* We count the *open* transfers plus the budget for new ones. */
if(ctx->max_bidi_streams) {
uint64_t avail_bidi_streams = 0;
uint64_t max_streams = CONN_INUSE(cf->conn);
if(ctx->max_bidi_streams > ctx->used_bidi_streams)
avail_bidi_streams = ctx->max_bidi_streams - ctx->used_bidi_streams;
max_streams += avail_bidi_streams;
*pres1 = (max_streams > INT_MAX)? INT_MAX : (int)max_streams;
}
else /* transport params not arrived yet? take our default. */
*pres1 = Curl_multi_max_concurrent_streams(data->multi);
CURL_TRC_CF(data, cf, "query max_conncurrent -> %d", *pres1);
CURL_TRC_CF(data, cf, "query conn[%" CURL_FORMAT_CURL_OFF_T "]: "
"MAX_CONCURRENT -> %d (%zu in use)",
cf->conn->connection_id, *pres1, CONN_INUSE(cf->conn));
CF_DATA_RESTORE(cf, save);
return CURLE_OK;
}
Expand Down
28 changes: 23 additions & 5 deletions lib/vquic/curl_quiche.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx)
}
}

static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
struct Curl_easy *data);

/**
* All about the H3 internals of a stream
*/
Expand Down Expand Up @@ -222,6 +225,7 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;

(void)cf;
if(stream) {
Expand All @@ -235,6 +239,9 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
stream->send_closed = TRUE;
}
stream->closed = TRUE;
result = cf_flush_egress(cf, data);
if(result)
CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
}
Curl_hash_offt_remove(&ctx->streams, data->id);
}
Expand Down Expand Up @@ -288,16 +295,17 @@ static struct Curl_easy *get_stream_easy(struct Curl_cfilter *cf,
return NULL;
}

static void cf_quiche_expire_conn_transfers(struct Curl_cfilter *cf,
struct Curl_easy *data)
static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct Curl_easy *sdata;

DEBUGASSERT(data->multi);
CURL_TRC_CF(data, cf, "expiring all transfers on this connection");
CURL_TRC_CF(data, cf, "conn closed, expire all transfers");
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if(sdata == data || sdata->conn != data->conn)
continue;
CURL_TRC_CF(sdata, cf, "conn closed, expire transfer");
Curl_expire(sdata, 0, EXPIRE_RUN_NOW);
}
}
Expand Down Expand Up @@ -600,6 +608,14 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen,
&recv_info);
if(nread < 0) {
if(QUICHE_ERR_DONE == nread) {
if(quiche_conn_is_draining(ctx->qconn)) {
CURL_TRC_CF(r->data, r->cf, "ingress, connection is draining");
return CURLE_RECV_ERROR;
}
if(quiche_conn_is_closed(ctx->qconn)) {
CURL_TRC_CF(r->data, r->cf, "ingress, connection is closed");
return CURLE_RECV_ERROR;
}
CURL_TRC_CF(r->data, r->cf, "ingress, quiche is DONE");
return CURLE_OK;
}
Expand Down Expand Up @@ -706,7 +722,7 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf,
failf(data, "connection closed by server");
/* Connection timed out, expire all transfers belonging to it
* as will not get any more POLL events here. */
cf_quiche_expire_conn_transfers(cf, data);
cf_quiche_expire_conn_closed(cf, data);
return CURLE_SEND_ERROR;
}
}
Expand Down Expand Up @@ -1489,7 +1505,9 @@ static CURLcode cf_quiche_query(struct Curl_cfilter *cf,
max_streams += quiche_conn_peer_streams_left_bidi(ctx->qconn);
}
*pres1 = (max_streams > INT_MAX)? INT_MAX : (int)max_streams;
CURL_TRC_CF(data, cf, "query: MAX_CONCURRENT -> %d", *pres1);
CURL_TRC_CF(data, cf, "query conn[%" CURL_FORMAT_CURL_OFF_T "]: "
"MAX_CONCURRENT -> %d (%zu in use)",
cf->conn->connection_id, *pres1, CONN_INUSE(cf->conn));
return CURLE_OK;
}
case CF_QUERY_CONNECT_REPLY_MS:
Expand Down
1 change: 0 additions & 1 deletion tests/http/clients/h2-download.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ int main(int argc, char *argv[])
if(still_running) {
/* wait for activity, timeout or "nothing" */
mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL);
fprintf(stderr, "curl_multi_poll() -> %d\n", mc);
}

if(mc)
Expand Down

0 comments on commit c8e0cd1

Please sign in to comment.