diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index 5d9fab9190d..d6480c004d7 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -135,6 +135,8 @@ struct cf_ngtcp2_ctx { struct Curl_hash streams; /* hash `data->id` to `h3_stream_ctx` */ size_t max_stream_window; /* max flow window for one stream */ uint64_t max_idle_ms; /* max idle time for QUIC connection */ + uint64_t used_bidi_streams; /* bidi streams we have opened */ + uint64_t max_bidi_streams; /* max bidi streams we can open */ int qlogfd; }; @@ -143,6 +145,14 @@ struct cf_ngtcp2_ctx { #define CF_CTX_CALL_DATA(cf) \ ((struct cf_ngtcp2_ctx *)(cf)->ctx)->call_data +struct pkt_io_ctx; +static CURLcode cf_progress_ingress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx); +static CURLcode cf_progress_egress(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct pkt_io_ctx *pktx); + /** * All about the H3 internals of a stream */ @@ -216,6 +226,7 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_ngtcp2_ctx *ctx = cf->ctx; struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); + CURLcode result; (void)cf; if(stream) { @@ -228,6 +239,9 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL); ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL); stream->closed = TRUE; + result = cf_progress_egress(cf, data, NULL); + if(result) + CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result); } Curl_hash_offt_remove(&ctx->streams, data->id); @@ -315,12 +329,6 @@ static void pktx_init(struct pkt_io_ctx *pktx, pktx_update_time(pktx, cf); } -static CURLcode cf_progress_ingress(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct pkt_io_ctx *pktx); -static CURLcode cf_progress_egress(struct Curl_cfilter *cf, - struct Curl_easy *data, - struct pkt_io_ctx *pktx); static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id, uint64_t datalen, void *user_data, void *stream_user_data); @@ -550,10 +558,16 @@ static int cb_extend_max_local_streams_bidi(ngtcp2_conn *tconn, uint64_t max_streams, void *user_data) { - (void)tconn; - (void)max_streams; - (void)user_data; + struct Curl_cfilter *cf = user_data; + struct cf_ngtcp2_ctx *ctx = cf->ctx; + struct Curl_easy *data = CF_DATA_CURRENT(cf); + (void)tconn; + ctx->max_bidi_streams = max_streams; + if(data) + CURL_TRC_CF(data, cf, "max bidi streams now %" CURL_PRIu64 + ", used %" CURL_PRIu64, (curl_uint64_t)ctx->max_bidi_streams, + (curl_uint64_t)ctx->used_bidi_streams); return 0; } @@ -1353,6 +1367,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf, goto out; } stream->id = (curl_int64_t)sid; + ++ctx->used_bidi_streams; switch(data->state.httpreq) { case HTTPREQ_POST: @@ -2238,17 +2253,25 @@ static CURLcode cf_ngtcp2_query(struct Curl_cfilter *cf, switch(query) { case CF_QUERY_MAX_CONCURRENT: { - const ngtcp2_transport_params *rp; DEBUGASSERT(pres1); - CF_DATA_SAVE(save, cf, data); - rp = ngtcp2_conn_get_remote_transport_params(ctx->qconn); - if(rp) - *pres1 = (rp->initial_max_streams_bidi > INT_MAX)? - INT_MAX : (int)rp->initial_max_streams_bidi; - else /* not arrived yet? */ + /* Set after transport params arrived and continually updated + * by callback. QUIC counts the number over the lifetime of the + * connection, ever increasing. + * We count the *open* transfers plus the budget for new ones. */ + if(ctx->max_bidi_streams) { + uint64_t avail_bidi_streams = 0; + uint64_t max_streams = CONN_INUSE(cf->conn); + if(ctx->max_bidi_streams > ctx->used_bidi_streams) + avail_bidi_streams = ctx->max_bidi_streams - ctx->used_bidi_streams; + max_streams += avail_bidi_streams; + *pres1 = (max_streams > INT_MAX)? INT_MAX : (int)max_streams; + } + else /* transport params not arrived yet? take our default. */ *pres1 = Curl_multi_max_concurrent_streams(data->multi); - CURL_TRC_CF(data, cf, "query max_conncurrent -> %d", *pres1); + CURL_TRC_CF(data, cf, "query conn[%" CURL_FORMAT_CURL_OFF_T "]: " + "MAX_CONCURRENT -> %d (%zu in use)", + cf->conn->connection_id, *pres1, CONN_INUSE(cf->conn)); CF_DATA_RESTORE(cf, save); return CURLE_OK; } diff --git a/lib/vquic/curl_quiche.c b/lib/vquic/curl_quiche.c index d91549befe0..181dd7d497f 100644 --- a/lib/vquic/curl_quiche.c +++ b/lib/vquic/curl_quiche.c @@ -138,6 +138,9 @@ static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx) } } +static CURLcode cf_flush_egress(struct Curl_cfilter *cf, + struct Curl_easy *data); + /** * All about the H3 internals of a stream */ @@ -222,6 +225,7 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_quiche_ctx *ctx = cf->ctx; struct stream_ctx *stream = H3_STREAM_CTX(ctx, data); + CURLcode result; (void)cf; if(stream) { @@ -235,6 +239,9 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) stream->send_closed = TRUE; } stream->closed = TRUE; + result = cf_flush_egress(cf, data); + if(result) + CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result); } Curl_hash_offt_remove(&ctx->streams, data->id); } @@ -288,16 +295,17 @@ static struct Curl_easy *get_stream_easy(struct Curl_cfilter *cf, return NULL; } -static void cf_quiche_expire_conn_transfers(struct Curl_cfilter *cf, - struct Curl_easy *data) +static void cf_quiche_expire_conn_closed(struct Curl_cfilter *cf, + struct Curl_easy *data) { struct Curl_easy *sdata; DEBUGASSERT(data->multi); - CURL_TRC_CF(data, cf, "expiring all transfers on this connection"); + CURL_TRC_CF(data, cf, "conn closed, expire all transfers"); for(sdata = data->multi->easyp; sdata; sdata = sdata->next) { if(sdata == data || sdata->conn != data->conn) continue; + CURL_TRC_CF(sdata, cf, "conn closed, expire transfer"); Curl_expire(sdata, 0, EXPIRE_RUN_NOW); } } @@ -600,6 +608,14 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen, &recv_info); if(nread < 0) { if(QUICHE_ERR_DONE == nread) { + if(quiche_conn_is_draining(ctx->qconn)) { + CURL_TRC_CF(r->data, r->cf, "ingress, connection is draining"); + return CURLE_RECV_ERROR; + } + if(quiche_conn_is_closed(ctx->qconn)) { + CURL_TRC_CF(r->data, r->cf, "ingress, connection is closed"); + return CURLE_RECV_ERROR; + } CURL_TRC_CF(r->data, r->cf, "ingress, quiche is DONE"); return CURLE_OK; } @@ -706,7 +722,7 @@ static CURLcode cf_flush_egress(struct Curl_cfilter *cf, failf(data, "connection closed by server"); /* Connection timed out, expire all transfers belonging to it * as will not get any more POLL events here. */ - cf_quiche_expire_conn_transfers(cf, data); + cf_quiche_expire_conn_closed(cf, data); return CURLE_SEND_ERROR; } } @@ -1489,7 +1505,9 @@ static CURLcode cf_quiche_query(struct Curl_cfilter *cf, max_streams += quiche_conn_peer_streams_left_bidi(ctx->qconn); } *pres1 = (max_streams > INT_MAX)? INT_MAX : (int)max_streams; - CURL_TRC_CF(data, cf, "query: MAX_CONCURRENT -> %d", *pres1); + CURL_TRC_CF(data, cf, "query conn[%" CURL_FORMAT_CURL_OFF_T "]: " + "MAX_CONCURRENT -> %d (%zu in use)", + cf->conn->connection_id, *pres1, CONN_INUSE(cf->conn)); return CURLE_OK; } case CF_QUERY_CONNECT_REPLY_MS: diff --git a/tests/http/clients/h2-download.c b/tests/http/clients/h2-download.c index 4ae283d2c39..74aac6a8ee9 100644 --- a/tests/http/clients/h2-download.c +++ b/tests/http/clients/h2-download.c @@ -380,7 +380,6 @@ int main(int argc, char *argv[]) if(still_running) { /* wait for activity, timeout or "nothing" */ mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL); - fprintf(stderr, "curl_multi_poll() -> %d\n", mc); } if(mc)