From 70a5ab15cb82a21805d2c8d8f66d1256211a38f8 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Mon, 30 Oct 2023 06:06:04 -0400 Subject: [PATCH 01/27] * FIX [quic] Remove repeated include headers. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 17 ----------------- src/supplemental/quic/msquic_listen.c | 17 ----------------- src/supplemental/quic/quic_api.c | 4 ---- 3 files changed, 38 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 1e548e7e7..2b1261b97 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -39,23 +39,6 @@ #include #include -#include "nng/mqtt/mqtt_client.h" -#include "nng/supplemental/nanolib/conf.h" -#include "nng/protocol/mqtt/mqtt_parser.h" -#include "supplemental/mqtt/mqtt_msg.h" - -#include "openssl/pem.h" -#include "openssl/x509.h" - -#include -#include -#include -#include -#include -#include -#include -#include - struct nni_quic_conn { nng_stream stream; nni_list readq; diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index caba102be..41cb81649 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -40,23 +40,6 @@ #include #include -#include "nng/mqtt/mqtt_client.h" -#include "nng/supplemental/nanolib/conf.h" -#include "nng/protocol/mqtt/mqtt_parser.h" -#include "supplemental/mqtt/mqtt_msg.h" - -#include "openssl/pem.h" -#include "openssl/x509.h" - -#include -#include -#include -#include -#include -#include -#include -#include - struct nni_quic_conn { nng_stream stream; nni_list readq; diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index 4ac573a9a..7b937ffe0 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -8,7 +8,6 @@ // #include "quic_api.h" #include "core/nng_impl.h" -#include "msquic.h" #include "nng/mqtt/mqtt_client.h" #include "nng/nng.h" @@ -16,9 +15,6 @@ #include "nng/protocol/mqtt/mqtt_parser.h" #include "supplemental/mqtt/mqtt_msg.h" -#include "openssl/pem.h" -#include "openssl/x509.h" - #include #include #include From 26bc9fbd493034f6baea1bf32877f5a91a5c1680 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 02:18:55 -0400 Subject: [PATCH 02/27] * ADD [quic] Add a private quic header file for share functionality between dialer and listener (quic_private.h). Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 19 +------------------ src/supplemental/quic/quic_private.h | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 18 deletions(-) create mode 100644 src/supplemental/quic/quic_private.h diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 2b1261b97..2e5ab83ba 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -19,6 +19,7 @@ // closed. #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "msquic.h" @@ -56,24 +57,6 @@ struct nni_quic_conn { nni_reap_node reap; }; -static const QUIC_API_TABLE *MsQuic = NULL; - -// Config for msquic -static const QUIC_REGISTRATION_CONFIG quic_reg_config = { - "mqtt", - QUIC_EXECUTION_PROFILE_LOW_LATENCY -}; - -static const QUIC_BUFFER quic_alpn = { - sizeof("mqtt") - 1, - (uint8_t *) "mqtt" -}; - -HQUIC registration; -HQUIC configuration; - -static int msquic_open(); -static void msquic_close(); static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d); static void msquic_conn_close(HQUIC qconn, int rv); static void msquic_conn_fini(HQUIC qconn); diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h new file mode 100644 index 000000000..99c0c81a0 --- /dev/null +++ b/src/supplemental/quic/quic_private.h @@ -0,0 +1,25 @@ +#ifndef NNG_SUPP_QUIC_PRIVATE_H +#define NNG_SUPP_QUIC_PRIVATE_H + +#include "msquic.h" + +// Config for msquic +static const QUIC_REGISTRATION_CONFIG quic_reg_config = { + "mqtt", + QUIC_EXECUTION_PROFILE_LOW_LATENCY +}; + +static const QUIC_BUFFER quic_alpn = { + sizeof("mqtt") - 1, + (uint8_t *) "mqtt" +}; + +static const QUIC_API_TABLE *MsQuic = NULL; + +static HQUIC registration; +static HQUIC configuration; + +int msquic_open(); +void msquic_close(); + +#endif From 02f683ed95bcf5676f1e1e4c07d32946950e7ae7 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 02:19:17 -0400 Subject: [PATCH 03/27] * NEW [quic] Add new file msquic_common.c to share codes of common functionality between dialer and listener. * FIX [cmake] Add new files to cmake. Signed-off-by: wanghaemq --- src/supplemental/quic/CMakeLists.txt | 5 +++ src/supplemental/quic/msquic_common.c | 52 +++++++++++++++++++++++++++ src/supplemental/quic/msquic_dial.c | 49 ------------------------- 3 files changed, 57 insertions(+), 49 deletions(-) create mode 100644 src/supplemental/quic/msquic_common.c diff --git a/src/supplemental/quic/CMakeLists.txt b/src/supplemental/quic/CMakeLists.txt index ec6ce3251..360956d7e 100644 --- a/src/supplemental/quic/CMakeLists.txt +++ b/src/supplemental/quic/CMakeLists.txt @@ -13,9 +13,14 @@ if (NNG_ENABLE_QUIC) add_subdirectory(msquic) add_dependencies(nng msquic) + nng_sources(msquic_common.c) + nng_sources(quic_private.h) + nng_sources(quic_api.c) nng_sources(quic_api.h) + nng_sources(msquic_dial.c) + nng_test(quic_api_test) find_path(INTERNAL_MSQUIC_INCLUDE_DIR diff --git a/src/supplemental/quic/msquic_common.c b/src/supplemental/quic/msquic_common.c new file mode 100644 index 000000000..631262e30 --- /dev/null +++ b/src/supplemental/quic/msquic_common.c @@ -0,0 +1,52 @@ +#include "quic_private.h" +#include "core/nng_impl.h" + +static int is_msquic_inited = 0; + +void +msquic_close() +{ + if (MsQuic != NULL) { + if (configuration != NULL) { + MsQuic->ConfigurationClose(configuration); + } + if (registration != NULL) { + // This will block until all outstanding child objects + // have been closed. + MsQuic->RegistrationClose(registration); + } + MsQuicClose(MsQuic); + is_msquic_inited = 0; + } +} + +int +msquic_open() +{ + if (is_msquic_inited == 1) + return 0; + + QUIC_STATUS rv = QUIC_STATUS_SUCCESS; + // only Open MsQUIC lib once, otherwise cause memleak + if (MsQuic == NULL) + if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { + log_error("MsQuicOpen2 failed, 0x%x!\n", rv); + goto error; + } + + // Create a registration for the app's connections. + rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); + if (QUIC_FAILED(rv)) { + log_error("RegistrationOpen failed, 0x%x!\n", rv); + goto error; + } + + is_msquic_inited = 1; + log_info("Msquic is enabled"); + return 0; + +error: + msquic_close(); + return -1; +} + diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 2e5ab83ba..33bc55049 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -993,55 +993,6 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, return QUIC_STATUS_SUCCESS; } -static int is_msquic_inited = 0; - -static void -msquic_close() -{ - if (MsQuic != NULL) { - if (configuration != NULL) { - MsQuic->ConfigurationClose(configuration); - } - if (registration != NULL) { - // This will block until all outstanding child objects - // have been closed. - MsQuic->RegistrationClose(registration); - } - MsQuicClose(MsQuic); - is_msquic_inited = 0; - } -} - -static int -msquic_open() -{ - if (is_msquic_inited == 1) - return 0; - - QUIC_STATUS rv = QUIC_STATUS_SUCCESS; - // only Open MsQUIC lib once, otherwise cause memleak - if (MsQuic == NULL) - if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { - log_error("MsQuicOpen2 failed, 0x%x!\n", rv); - goto error; - } - - // Create a registration for the app's connections. - rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); - if (QUIC_FAILED(rv)) { - log_error("RegistrationOpen failed, 0x%x!\n", rv); - goto error; - } - - is_msquic_inited = 1; - log_info("Msquic is enabled"); - return 0; - -error: - msquic_close(); - return -1; -} - // Helper function to load a client configuration. static BOOLEAN msquic_load_config(QUIC_SETTINGS *settings, nni_quic_dialer *d) From 861ac973993bcda3f28118e773a8004c5f20929e Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 02:19:21 -0400 Subject: [PATCH 04/27] * FIX [quic/listen] Include quic_private.h. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 41cb81649..6161160ca 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -18,8 +18,8 @@ // The quic connection would be free if all nng streams // closed. -#include "msquic_posix.h" #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "msquic.h" @@ -57,22 +57,6 @@ struct nni_quic_conn { nni_reap_node reap; }; -static const QUIC_API_TABLE *MsQuic = NULL; - -// Config for msquic -static const QUIC_REGISTRATION_CONFIG quic_reg_config = { - "mqtt_listener", - QUIC_EXECUTION_PROFILE_LOW_LATENCY -}; - -static const QUIC_BUFFER quic_alpn = { - sizeof("mqtt") - 1, - (uint8_t *) "mqtt" -}; - -HQUIC registration; -HQUIC configuration - static void msquic_listener_fini(HQUIC ql); static void msquic_listener_stop(HQUIC ql); static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l); From 986638232996fd56c34f3387be4f3a10f4d55054 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 02:57:56 -0400 Subject: [PATCH 05/27] * NEW [quic] Move defination and relative interfaces of dialer/listener/conn from quic_api to quic_private headers. Signed-off-by: wanghaemq --- src/supplemental/quic/quic_api.c | 1 + src/supplemental/quic/quic_api.h | 80 ------------------------- src/supplemental/quic/quic_listener.c | 1 + src/supplemental/quic/quic_private.h | 84 +++++++++++++++++++++++++++ 4 files changed, 86 insertions(+), 80 deletions(-) diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index 7b937ffe0..c84fec84d 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "nng/mqtt/mqtt_client.h" diff --git a/src/supplemental/quic/quic_api.h b/src/supplemental/quic/quic_api.h index f1f12b3a0..d971e18b1 100644 --- a/src/supplemental/quic/quic_api.h +++ b/src/supplemental/quic/quic_api.h @@ -11,7 +11,6 @@ #include "core/nng_impl.h" #include "nng/nng.h" -#include "msquic.h" typedef struct quic_dialer quic_dialer; typedef struct quic_listener quic_listener; @@ -19,83 +18,4 @@ typedef struct quic_listener quic_listener; extern int nni_quic_dialer_alloc(nng_stream_dialer **, const nni_url *); extern int nni_quic_listener_alloc(nng_stream_listener **, const nni_url *); -typedef struct nni_quic_dialer nni_quic_dialer; - -extern int nni_quic_dialer_init(void **); -extern void nni_quic_dialer_fini(nni_quic_dialer *d); -extern void nni_quic_dial(void *, const char *, const char *, nni_aio *); -extern void nni_quic_dialer_close(void *); - -typedef struct nni_quic_listener nni_quic_listener; - -extern int nni_quic_listener_init(void **); -extern void nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); -extern void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); - -typedef struct nni_quic_conn nni_quic_conn; - -extern int nni_msquic_quic_alloc(nni_quic_conn **, nni_quic_dialer *); -extern void nni_msquic_quic_init(nni_quic_conn *); -extern void nni_msquic_quic_start(nni_quic_conn *, int, int); -extern void nni_msquic_quic_dialer_rele(nni_quic_dialer *); - -struct nni_quic_dialer { - nni_aio *qconaio; // for quic connection - nni_quic_conn *currcon; - nni_list connq; // pending connections/quic streams - bool closed; - bool nodelay; - bool keepalive; - struct sockaddr_storage src; - size_t srclen; - nni_mtx mtx; - nni_atomic_u64 ref; - nni_atomic_bool fini; - - // MsQuic - HQUIC qconn; // quic connection - bool enable_0rtt; - bool enable_mltstrm; - uint8_t reason_code; - // ResumptionTicket - char rticket[4096]; // Ususally it would be within 4096. - // But in msquic. The maximum size is 65535. - uint16_t rticket_sz; - // CertificateFile - char * cacert; - char * key; - char * password; - bool verify_peer; - char * ca; - - // Quic settings - uint64_t qidle_timeout; - uint32_t qkeepalive; - uint64_t qconnect_timeout; - uint32_t qdiscon_timeout; - uint32_t qsend_idle_timeout; - uint32_t qinitial_rtt_ms; - uint32_t qmax_ack_delay_ms; - - QUIC_SETTINGS settings; -}; - -struct nni_quic_listener { - nni_mtx mtx; - nni_atomic_u64 ref; - nni_atomic_bool fini; - bool closed; - bool started; - nni_list acceptq; - nni_list incomings; - - // MsQuic - HQUIC ql; // Quic Listener - - // Quic Settings - bool enable_0rtt; - bool enable_mltstrm; - - QUIC_SETTINGS settings; -}; #endif diff --git a/src/supplemental/quic/quic_listener.c b/src/supplemental/quic/quic_listener.c index 44e15263d..2fe15a880 100644 --- a/src/supplemental/quic/quic_listener.c +++ b/src/supplemental/quic/quic_listener.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // #include "quic_api.h" +#include "quic_private.h" #include "core/nng_impl.h" #include "msquic.h" diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 99c0c81a0..2ce282f4a 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -2,6 +2,8 @@ #define NNG_SUPP_QUIC_PRIVATE_H #include "msquic.h" +#include "core/nng_impl.h" +#include "nng/nng.h" // Config for msquic static const QUIC_REGISTRATION_CONFIG quic_reg_config = { @@ -22,4 +24,86 @@ static HQUIC configuration; int msquic_open(); void msquic_close(); +typedef struct nni_quic_dialer nni_quic_dialer; + +int nni_quic_dialer_init(void **); +void nni_quic_dialer_fini(nni_quic_dialer *d); +void nni_quic_dial(void *, const char *, const char *, nni_aio *); +void nni_quic_dialer_close(void *); + +typedef struct nni_quic_listener nni_quic_listener; + +int nni_quic_listener_init(void **); +void nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); +void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); + +typedef struct nni_quic_conn nni_quic_conn; + +int nni_msquic_quic_alloc(nni_quic_conn **, nni_quic_dialer *); +void nni_msquic_quic_init(nni_quic_conn *); +void nni_msquic_quic_start(nni_quic_conn *, int, int); +void nni_msquic_quic_dialer_rele(nni_quic_dialer *); + + +struct nni_quic_dialer { + nni_aio *qconaio; // for quic connection + nni_quic_conn *currcon; + nni_list connq; // pending connections/quic streams + bool closed; + bool nodelay; + bool keepalive; + struct sockaddr_storage src; + size_t srclen; + nni_mtx mtx; + nni_atomic_u64 ref; + nni_atomic_bool fini; + + // MsQuic + HQUIC qconn; // quic connection + bool enable_0rtt; + bool enable_mltstrm; + uint8_t reason_code; + // ResumptionTicket + char rticket[4096]; // Ususally it would be within 4096. + // But in msquic. The maximum size is 65535. + uint16_t rticket_sz; + // CertificateFile + char * cacert; + char * key; + char * password; + bool verify_peer; + char * ca; + + // Quic settings + uint64_t qidle_timeout; + uint32_t qkeepalive; + uint64_t qconnect_timeout; + uint32_t qdiscon_timeout; + uint32_t qsend_idle_timeout; + uint32_t qinitial_rtt_ms; + uint32_t qmax_ack_delay_ms; + + QUIC_SETTINGS settings; +}; + +struct nni_quic_listener { + nni_mtx mtx; + nni_atomic_u64 ref; + nni_atomic_bool fini; + bool closed; + bool started; + nni_list acceptq; + nni_list incomings; + + // MsQuic + HQUIC ql; // Quic Listener + + // Quic Settings + bool enable_0rtt; + bool enable_mltstrm; + + QUIC_SETTINGS settings; +}; + + #endif From 695173c0827940c0c60e4604f5fa68620832ebaa Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 03:02:52 -0400 Subject: [PATCH 06/27] * NEW [quic] Move definatio of quic connection structure from dialer to quic_private.h thus to share code. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 17 ----------------- src/supplemental/quic/quic_private.h | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 33bc55049..a56ac69dc 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -40,23 +40,6 @@ #include #include -struct nni_quic_conn { - nng_stream stream; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; - nni_aio * dial_aio; - // nni_aio * qstrmaio; // Link to msquic_strm_cb - nni_quic_dialer *dialer; - - // MsQuic - HQUIC qstrm; // quic stream - uint8_t reason_code; - - nni_reap_node reap; -}; - static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d); static void msquic_conn_close(HQUIC qconn, int rv); static void msquic_conn_fini(HQUIC qconn); diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 2ce282f4a..16b5a0b98 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -105,5 +105,21 @@ struct nni_quic_listener { QUIC_SETTINGS settings; }; +struct nni_quic_conn { + nng_stream stream; + nni_list readq; + nni_list writeq; + bool closed; + nni_mtx mtx; + nni_aio * dial_aio; + // nni_aio * qstrmaio; // Link to msquic_strm_cb + nni_quic_dialer *dialer; + + // MsQuic + HQUIC qstrm; // quic stream + uint8_t reason_code; + + nni_reap_node reap; +}; #endif From ff0afca2603d9704b3f8ea2886538b3c9642502c Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 03:19:21 -0400 Subject: [PATCH 07/27] * FIX [quic] Create two interfaces to alloc connection for dialer and listener. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 4 ++-- src/supplemental/quic/msquic_listen.c | 2 +- src/supplemental/quic/quic_private.h | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index a56ac69dc..936f6df58 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -219,7 +219,7 @@ nni_quic_dial(void *arg, const char *host, const char *port, nni_aio *aio) nni_atomic_inc64(&d->ref); // Create a connection whenever dial. So it's okey. right? - if ((rv = nni_msquic_quic_alloc(&c, d)) != 0) { + if ((rv = nni_msquic_quic_dialer_conn_alloc(&c, d)) != 0) { nni_aio_finish_error(aio, rv); nni_msquic_quic_dialer_rele(d); return; @@ -660,7 +660,7 @@ quic_stream_set(void *arg, const char *name, const void *buf, size_t sz, nni_typ } int -nni_msquic_quic_alloc(nni_quic_conn **cp, nni_quic_dialer *d) +nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **cp, nni_quic_dialer *d) { nni_quic_conn *c; if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 6161160ca..cc7fd2d2b 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -200,7 +200,7 @@ quic_listener_doaccept(nni_quic_listener *l) nni_aio_free(aioc); // Create a nni quic connection - if ((rv = nni_msquic_quic_alloc(&c, NULL)) != 0) { + if ((rv = nni_msquic_quic_listener_conn_alloc(&c, l)) != 0) { msquic_conn_fini(qconn); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 16b5a0b98..73ab0514d 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -39,9 +39,9 @@ void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); typedef struct nni_quic_conn nni_quic_conn; -int nni_msquic_quic_alloc(nni_quic_conn **, nni_quic_dialer *); -void nni_msquic_quic_init(nni_quic_conn *); -void nni_msquic_quic_start(nni_quic_conn *, int, int); +// Might no different TODO +int nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **, nni_quic_dialer *); +int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_listener *); void nni_msquic_quic_dialer_rele(nni_quic_dialer *); From cdecdd91f503fcee56647668630411d36c8c9ceb Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 05:58:06 -0400 Subject: [PATCH 08/27] * NEW [quic] Move some interface about msquic strream and connection that could be shared between dialer and listener from dialer to common. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_common.c | 35 +++++++++++++++++++++++++ src/supplemental/quic/msquic_dial.c | 37 --------------------------- src/supplemental/quic/quic_private.h | 10 ++++++++ 3 files changed, 45 insertions(+), 37 deletions(-) diff --git a/src/supplemental/quic/msquic_common.c b/src/supplemental/quic/msquic_common.c index 631262e30..5fa2d1981 100644 --- a/src/supplemental/quic/msquic_common.c +++ b/src/supplemental/quic/msquic_common.c @@ -50,3 +50,38 @@ msquic_open() return -1; } +/***************************** MsQuic Bindings *****************************/ + +void +msquic_conn_close(HQUIC qconn, int rv) +{ + MsQuic->ConnectionShutdown(qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, (QUIC_UINT62)rv); +} + +void +msquic_conn_fini(HQUIC qconn) +{ + MsQuic->ConnectionClose(qconn); +} + +void +msquic_strm_close(HQUIC qstrm) +{ + log_info("stream %p shutdown", qstrm); + MsQuic->StreamShutdown( + qstrm, QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, NNG_ECONNSHUT); +} + +void +msquic_strm_fini(HQUIC qstrm) +{ + log_info("stream %p fini", qstrm); + MsQuic->StreamClose(qstrm); +} + +void +msquic_strm_recv_start(HQUIC qstrm) +{ + MsQuic->StreamReceiveSetEnabled(qstrm, TRUE); +} + diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 936f6df58..ba48f4fe1 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -41,12 +41,7 @@ #include static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d); -static void msquic_conn_close(HQUIC qconn, int rv); -static void msquic_conn_fini(HQUIC qconn); static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d); -static void msquic_strm_close(HQUIC qstrm); -static void msquic_strm_fini(HQUIC qstrm); -static void msquic_strm_recv_start(HQUIC qstrm); static void quic_dialer_cb(void *arg); static void quic_stream_error(void *arg, int err); @@ -1095,18 +1090,6 @@ msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d) return (NNG_ECONNREFUSED); } -static void -msquic_conn_close(HQUIC qconn, int rv) -{ - MsQuic->ConnectionShutdown(qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, (QUIC_UINT62)rv); -} - -static void -msquic_conn_fini(HQUIC qconn) -{ - MsQuic->ConnectionClose(qconn); -} - static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) { @@ -1143,23 +1126,3 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) return (NNG_ECLOSED); } -static void -msquic_strm_close(HQUIC qstrm) -{ - log_info("stream %p shutdown", qstrm); - MsQuic->StreamShutdown( - qstrm, QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, NNG_ECONNSHUT); -} - -static void -msquic_strm_fini(HQUIC qstrm) -{ - log_info("stream %p fini", qstrm); - MsQuic->StreamClose(qstrm); -} - -static void -msquic_strm_recv_start(HQUIC qstrm) -{ - MsQuic->StreamReceiveSetEnabled(qstrm, TRUE); -} diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 73ab0514d..d9c96f78c 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -45,6 +45,16 @@ int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_listener *); void nni_msquic_quic_dialer_rele(nni_quic_dialer *); +// MsQuic bindings + +void msquic_conn_close(HQUIC qconn, int rv); +void msquic_conn_fini(HQUIC qconn); + +void msquic_strm_close(HQUIC qstrm); +void msquic_strm_fini(HQUIC qstrm); +void msquic_strm_recv_start(HQUIC qstrm); + + struct nni_quic_dialer { nni_aio *qconaio; // for quic connection nni_quic_conn *currcon; From f448caa2f9e664c6d23ecb42a684a95269aa42de Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Tue, 31 Oct 2023 06:10:38 -0400 Subject: [PATCH 09/27] * NEW [quic] Add nni_msquic_quic_listener_conn_alloc for creating connection for a quic listener. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 5 +++-- src/supplemental/quic/msquic_listen.c | 29 +++++++++++++++++++++++++++ src/supplemental/quic/quic_private.h | 4 +++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index ba48f4fe1..9ac247c7f 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -662,8 +662,9 @@ nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **cp, nni_quic_dialer *d) return (NNG_ENOMEM); } - c->closed = false; - c->dialer = d; + c->closed = false; + c->dialer = d; + c->listener = NULL; nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index cc7fd2d2b..03d040499 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -266,6 +266,35 @@ nni_quic_listener_fini(nni_quic_listener *l) NNI_FREE_STRUCT(l); } +/**************************** MsQuic Connection ****************************/ + + +int +nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_listener *l) +{ + nni_quic_conn *c; + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->dialer = NULL; + c->listener = l; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + c->stream.s_free = quic_stream_free; + c->stream.s_close = quic_stream_close; + c->stream.s_recv = quic_stream_recv; + c->stream.s_send = quic_stream_send; + c->stream.s_get = quic_stream_get; + c->stream.s_set = quic_stream_set; + + *cp = c; + return (0); +} /***************************** MsQuic Bindings *****************************/ diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index d9c96f78c..528985445 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -123,7 +123,9 @@ struct nni_quic_conn { nni_mtx mtx; nni_aio * dial_aio; // nni_aio * qstrmaio; // Link to msquic_strm_cb - nni_quic_dialer *dialer; + + nni_quic_dialer *dialer; + nni_quic_listener *listener; // MsQuic HQUIC qstrm; // quic stream From 6992c07098592bea3ded3c57144fc04374d6de97 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 02:09:24 -0400 Subject: [PATCH 10/27] * FIX [quic] Fix the errors in building. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 51 ++++++++------------------- src/supplemental/quic/quic_private.h | 2 +- 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 03d040499..afa40de24 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -40,23 +40,6 @@ #include #include -struct nni_quic_conn { - nng_stream stream; - nni_list readq; - nni_list writeq; - bool closed; - nni_mtx mtx; - nni_aio * dial_aio; - // nni_aio * qstrmaio; // Link to msquic_strm_cb - nni_quic_dialer *dialer; - - // MsQuic - HQUIC qstrm; // quic stream - uint8_t reason_code; - - nni_reap_node reap; -}; - static void msquic_listener_fini(HQUIC ql); static void msquic_listener_stop(HQUIC ql); static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l); @@ -111,7 +94,7 @@ quic_listener_doclose(nni_quic_listener *l) nni_aio_finish_error(aio, NNG_ECLOSED); } while ((aio = nni_list_first(&l->incomings)) != NULL) { - qconn = nni_aio_get_prov_data(aio); + HQUIC qconn = nni_aio_get_prov_data(aio); nni_aio_list_remove(aio); nni_aio_free(aio); msquic_conn_fini(qconn); @@ -124,9 +107,6 @@ quic_listener_doclose(nni_quic_listener *l) void nni_quic_listener_close(nni_quic_listener *l) { - nni_aio *aio; - HQUIC qconn; - nni_mtx_lock(&l->mtx); quic_listener_doclose(l); nni_mtx_unlock(&l->mtx); @@ -136,10 +116,7 @@ nni_quic_listener_close(nni_quic_listener *l) int nni_quic_listener_listen(nni_quic_listener *l, const char *h, const char *p) { - socklen_t len; - int rv; - int fd; - nni_posix_pfd * pfd; + int rv; nni_mtx_lock(&l->mtx); if (l->started) { @@ -151,7 +128,11 @@ nni_quic_listener_listen(nni_quic_listener *l, const char *h, const char *p) return (NNG_ECLOSED); } - msquic_listen(l->ql, h, p, l); + rv = msquic_listen(l->ql, h, p, l); + if (rv != 0) { + nni_mtx_unlock(&l->mtx); + return rv; + } l->started = true; nni_mtx_unlock(&l->mtx); @@ -181,17 +162,13 @@ quic_listener_doaccept(nni_quic_listener *l) nni_aio *aio; while ((aio = nni_list_first(&l->acceptq)) != NULL) { - int newfd; - int fd; - int rv; - int nd; - int ka; + int rv; HQUIC qconn; nni_aio * aioc; nni_quic_conn * c; // Get the connection - if ((aioc == nni_list_first(&l->incomings)) == NULL) { + if ((aioc = nni_list_first(&l->incomings)) == NULL) { // No wait and return immediately return; } @@ -324,7 +301,7 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, MsQuic->ConnectionSendResumptionTicket(qconn, QUIC_SEND_RESUMPTION_FLAG_NONE, 0, NULL); } - nni_aio_finish(d->qconaio, 0, 0); + // nni_aio_finish(d->qconaio, 0, 0); break; case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: log_warn("[conn][%p] Shutdown by transport, 0x%x, Error Code %llu\n", @@ -371,8 +348,8 @@ _IRQL_requires_max_(PASSIVE_LEVEL) _Function_class_(QUIC_LISTENER_CALLBACK) QUIC_STATUS QUIC_API msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVENT *ev) { - HQUIC *qconn; - QUIC_NEW_CONNECTION_INFO *qinfo; + HQUIC qconn; + const QUIC_NEW_CONNECTION_INFO *qinfo; QUIC_STATUS rv = QUIC_STATUS_NOT_SUPPORTED; nni_quic_listener *l = arg; nni_aio *aio; @@ -407,7 +384,7 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l) { - HQUIC addr; + QUIC_ADDR addr; QUIC_STATUS rv = 0; QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); @@ -420,7 +397,7 @@ msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l) goto error; } - if (QUIC_FAILED(rv = MsQuic->ListenerStart(ql, alpn, 1, &addr))) { + if (QUIC_FAILED(rv = MsQuic->ListenerStart(ql, &quic_alpn, 1, &addr))) { log_error("error in listen start %ld", rv); goto error; } diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 528985445..84fd76dda 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -34,7 +34,7 @@ void nni_quic_dialer_close(void *); typedef struct nni_quic_listener nni_quic_listener; int nni_quic_listener_init(void **); -void nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); +int nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); typedef struct nni_quic_conn nni_quic_conn; From bc472fe120fa0d9b0e83673a4f985942c20d65cc Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 02:55:56 -0400 Subject: [PATCH 11/27] * NEW [quic] Add handler for QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED and QUIC_CONNECTION_EVENT_RESUMED. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 2 - src/supplemental/quic/msquic_listen.c | 178 +++++++++++++++++++++++++- 2 files changed, 177 insertions(+), 3 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 9ac247c7f..90b747d70 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -805,8 +805,6 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, return QUIC_STATUS_SUCCESS; } -// The clients's callback for stream events from MsQuic. -// New recv cb of quic transport _IRQL_requires_max_(DISPATCH_LEVEL) _Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index afa40de24..8798443ab 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -281,6 +281,172 @@ msquic_load_listener_config() return; } +_IRQL_requires_max_(DISPATCH_LEVEL) +_Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API +msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, + _Inout_ QUIC_STREAM_EVENT *Event) +{ + nni_quic_conn *c = Context; + nni_aio *aio; + nni_iov * aiov; + unsigned naiov; + uint32_t rlen, rlen2, rpos; + uint8_t *rbuf; + uint32_t count; + + log_debug("quic_strm_cb triggered! %d conn %p strm %p", Event->Type, c, stream); + switch (Event->Type) { + case QUIC_STREAM_EVENT_SEND_COMPLETE: + log_debug("QUIC_STREAM_EVENT_SEND_COMPLETE!"); + if (Event->SEND_COMPLETE.Canceled) { + log_warn("[strm][%p] Data sent Canceled: %d", + stream, Event->SEND_COMPLETE.Canceled); + } + // Priority msg send + if ((aio = Event->SEND_COMPLETE.ClientContext) != NULL) { + QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); + free(buf); + Event->SEND_COMPLETE.ClientContext = NULL; + // TODO free by user cb or msquic layer??? + // nni_msg *msg = nni_aio_get_msg(aio); + // nni_msg_free(msg); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + break; + } + // Ordinary send + quic_stream_cb(QUIC_STREAM_EVENT_SEND_COMPLETE, c); + break; + case QUIC_STREAM_EVENT_RECEIVE: + // Data was received from the peer on the stream. + count = Event->RECEIVE.BufferCount; + + log_debug("[strm][%p] Data received Flag: %d", stream, Event->RECEIVE.Flags); + + if (Event->RECEIVE.Flags & QUIC_RECEIVE_FLAG_FIN) { + if (c->reason_code == 0) + c->reason_code = CLIENT_IDENTIFIER_NOT_VALID; + log_warn("FIN received in QUIC stream"); + break; + } + + nni_mtx_lock(&c->mtx); + if (c->closed) { + // Actively closed the quic stream by upper layer. So ignore. + nni_mtx_unlock(&c->mtx); + return QUIC_STATUS_PENDING; + } + // Get all the buffers in quic stream + if (count == 0) { + nni_mtx_unlock(&c->mtx); + return QUIC_STATUS_PENDING; + } + + rbuf = Event->RECEIVE.Buffers[0].Buffer; + rlen = Event->RECEIVE.Buffers[0].Length; + + rpos = 0; + while ((aio = nni_list_first(&c->readq)) != NULL) { + nni_aio_get_iov(aio, &naiov, &aiov); + int n = 0; + for (uint8_t i=0; i= aiov[i].iov_len) { + memcpy(aiov[i].iov_buf, rbuf+rpos, aiov[i].iov_len); + rpos += aiov[i].iov_len; + n += aiov[i].iov_len; + } else { + memcpy(aiov[i].iov_buf, rbuf+rpos, rlen2); + rpos += rlen2; + n += rlen2; + } + } + if (n == 0) { // rbuf run out + break; + } + nni_aio_bump_count(aio, n); + + // We completed the entire operation on this aio. + nni_aio_list_remove(aio); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Go back to start of loop to see if there is another + // aio ready for us to process. + } + + MsQuic->StreamReceiveComplete(c->qstrm, rpos); + nni_mtx_unlock(&c->mtx); + + return QUIC_STATUS_PENDING; + case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: + // The peer gracefully shut down its send direction of the + // stream. + log_warn("[strm][%p] PEER_SEND_ABORTED errorcode %llu\n", stream, + (unsigned long long) Event->PEER_SEND_ABORTED.ErrorCode); + if (c->reason_code == 0) + c->reason_code = SERVER_SHUTTING_DOWN; + + quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_ABORTED, c); + break; + case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: + // The peer aborted its send direction of the stream. + log_warn("[strm][%p] Peer send shut down\n", stream); + MsQuic->StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0); + quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN, c); + break; + case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: + log_warn("[strm][%p] QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE.", stream); + break; + + case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: + // Both directions of the stream have been shut down and MsQuic + // is done with the stream. It can now be safely cleaned up. + log_warn("[strm][%p] QUIC_STREAM_EVENT shutdown: All done.", + stream); + log_info("close stream with Error Code: %llu", + (unsigned long long) + Event->SHUTDOWN_COMPLETE.ConnectionErrorCode); + quic_stream_cb(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE, c); + break; + case QUIC_STREAM_EVENT_START_COMPLETE: + log_info( + "QUIC_STREAM_EVENT_START_COMPLETE [%p] ID: %ld Status: %d", + stream, Event->START_COMPLETE.ID, + Event->START_COMPLETE.Status); + if (!Event->START_COMPLETE.PeerAccepted) { + log_warn("Peer refused"); + quic_stream_cb(QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE, c); + break; + } + + quic_stream_cb(QUIC_STREAM_EVENT_START_COMPLETE, c); + break; + case QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE: + log_info("QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE"); + break; + case QUIC_STREAM_EVENT_PEER_ACCEPTED: + log_info("QUIC_STREAM_EVENT_PEER_ACCEPTED"); + break; + case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED: + // The peer has requested that we stop sending. Close abortively. + log_warn("[strm][%p] Peer RECEIVE aborted\n", stream); + log_warn("QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED Error Code: %llu", + (unsigned long long) Event->PEER_RECEIVE_ABORTED.ErrorCode); + + quic_stream_cb(QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED, c); + break; + + default: + log_warn("Unknown Event Type %d", Event->Type); + break; + } + return QUIC_STATUS_SUCCESS; +} + + _IRQL_requires_max_(DISPATCH_LEVEL) _Function_class_(QUIC_CONNECTION_CALLBACK) QUIC_STATUS QUIC_API msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, @@ -300,8 +466,18 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, if (l->enable_0rtt) { MsQuic->ConnectionSendResumptionTicket(qconn, QUIC_SEND_RESUMPTION_FLAG_NONE, 0, NULL); } + break; + case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: + HQUIC qstrm = ev->PEER_STREAM_STARTED.Stream; + QUIC_STREAM_OPEN_FLAGS flags = ev->PEER_STREAM_STARTED.Flags; - // nni_aio_finish(d->qconaio, 0, 0); + log_info("[conn][%p] Peer stream %p started. flags %d.", qconn, qstrm, flags); + MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_cb, NULL); + + break; + case QUIC_CONNECTION_EVENT_RESUMED: + // TODO + log_warn("[conn][%p] This connection is resumed.", qconn); break; case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: log_warn("[conn][%p] Shutdown by transport, 0x%x, Error Code %llu\n", From 49dbf460372b6cab3460258a63f4ff19e1535b61 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 03:03:41 -0400 Subject: [PATCH 12/27] * NEW [quic] Listener should get a handle of a quic stream rather than a quic connection. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 82 +++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 10 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 8798443ab..5e43103e8 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -245,6 +245,78 @@ nni_quic_listener_fini(nni_quic_listener *l) /**************************** MsQuic Connection ****************************/ +static void +quic_stream_cb(int events, void *arg) +{ + log_debug("[quic cb] start %d\n", events); + nni_quic_conn *c = arg; + nni_quic_dialer *d; + nni_aio *aio; + + if (!c) + return; + + d = c->listener; + + switch (events) { + case QUIC_STREAM_EVENT_SEND_COMPLETE: + nni_mtx_lock(&c->mtx); + if ((aio = nni_list_first(&c->writeq)) == NULL) { + log_error("Aio lost after sending: conn %p", c); + nni_mtx_unlock(&c->mtx); + break; + } + nni_aio_list_remove(aio); + QUIC_BUFFER *buf = nni_aio_get_input(aio, 0); + free(buf); + nni_aio_finish(aio, 0, nni_aio_count(aio)); + + // Start next send only after finished the last send + quic_stream_dowrite(c); + + nni_mtx_unlock(&c->mtx); + break; + case QUIC_STREAM_EVENT_START_COMPLETE: + nni_mtx_lock(&l->mtx); + + // Push connection to incomings + nni_aio_alloc(&aio, NULL, NULL); + nni_aio_set_prov_data(aio, (void *)qconn); + nni_aio_list_append(&l->incomings, aio); + + quic_listener_doaccept(l); + + nni_mtx_unlock(&l->mtx); + /* + if (c->dial_aio) { + // For upper layer to get the stream handle + nni_aio_set_output(c->dial_aio, 0, c); + + nni_aio_list_remove(c->dial_aio); + nni_aio_finish(c->dial_aio, 0, 0); + c->dial_aio = NULL; + } + */ + break; + // case QUIC_STREAM_EVENT_RECEIVE: // get a fin from stream + // TODO Need more talk about those cases + // case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: + // case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: + // case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: + case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: + // case QUIC_STREAM_EVENT_PEER_RECEIVE_ABORTED: + // Marked it as closed, prevent explicit shutdown + c->closed = true; + // It's the only place to free msquic stream + msquic_strm_fini(c->qstrm); + quic_stream_error(arg, NNG_ECONNSHUT); + break; + default: + break; + } + log_debug("[quic cb] end\n"); +} + int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_listener *l) @@ -537,16 +609,6 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN MsQuic->SetCallbackHandler(qconn, msquic_connection_cb, ql); rv = MsQuic->ConnectionSetConfiguration(qconn, configuration); - - nni_mtx_lock(&l->mtx); - - // Push connection to incomings - nni_aio_alloc(&aio, NULL, NULL); - nni_aio_set_prov_data(aio, (void *)qconn); - nni_aio_list_append(&l->incomings, aio); - - quic_listener_doaccept(l); - nni_mtx_unlock(&l->mtx); break; case QUIC_LISTENER_EVENT_STOP_COMPLETE: break; From e40133e4111304a214b94fc23ae4a7b0c6822824 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 04:30:18 -0400 Subject: [PATCH 13/27] * NEW [quic] Add quic_session the manage streams under a connection. Signed-off-by: wanghaemq --- src/supplemental/quic/quic_private.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 84fd76dda..864b988e3 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -115,6 +115,17 @@ struct nni_quic_listener { QUIC_SETTINGS settings; }; +struct nni_quic_session { + // MsQuic + HQUIC qconn; // quic connection + + nni_quic_conn *conns; // The quic streams in this quic connection + + bool closed; + nni_mtx mtx; +}; + + struct nni_quic_conn { nng_stream stream; nni_list readq; From db31982f124d092dc8bbcc037db8e61f4199d6b7 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 05:11:11 -0400 Subject: [PATCH 14/27] * NEW [quic] Add new interface quic_listener_session_alloc to alloc a session. * NEW [quic] Update the time to return a quic connection the upper layer. * FIX [quic] Upper layer could get connection at a right time.(Stream was established). Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 80 +++++++++++++++++++-------- src/supplemental/quic/quic_private.h | 6 +- 2 files changed, 62 insertions(+), 24 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 5e43103e8..a05eed9b5 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -163,7 +163,6 @@ quic_listener_doaccept(nni_quic_listener *l) while ((aio = nni_list_first(&l->acceptq)) != NULL) { int rv; - HQUIC qconn; nni_aio * aioc; nni_quic_conn * c; @@ -172,18 +171,10 @@ quic_listener_doaccept(nni_quic_listener *l) // No wait and return immediately return; } - qconn = nni_aio_get_prov_data(aioc); // Must exists + c = nni_aio_get_prov_data(aioc); // Must exists nni_aio_list_remove(aioc); nni_aio_free(aioc); - // Create a nni quic connection - if ((rv = nni_msquic_quic_listener_conn_alloc(&c, l)) != 0) { - msquic_conn_fini(qconn); - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); - continue; - } - nni_aio_list_remove(aio); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); @@ -249,14 +240,15 @@ static void quic_stream_cb(int events, void *arg) { log_debug("[quic cb] start %d\n", events); - nni_quic_conn *c = arg; - nni_quic_dialer *d; - nni_aio *aio; + nni_quic_conn *c = arg; + nni_quic_listener *l; + nni_aio *aio; if (!c) return; - d = c->listener; + ss = c->session; + l = c->listener; switch (events) { case QUIC_STREAM_EVENT_SEND_COMPLETE: @@ -281,12 +273,12 @@ quic_stream_cb(int events, void *arg) // Push connection to incomings nni_aio_alloc(&aio, NULL, NULL); - nni_aio_set_prov_data(aio, (void *)qconn); + nni_aio_set_prov_data(aio, (void *)c); nni_aio_list_append(&l->incomings, aio); quic_listener_doaccept(l); - nni_mtx_unlock(&l->mtx); + nni_mtx_unlock(&ss->mtx); /* if (c->dial_aio) { // For upper layer to get the stream handle @@ -319,7 +311,7 @@ quic_stream_cb(int events, void *arg) int -nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_listener *l) +nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_session *ss) { nni_quic_conn *c; if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { @@ -328,7 +320,8 @@ nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_listener *l) c->closed = false; c->dialer = NULL; - c->listener = l; + c->listener = ss->listener; + c->session = ss; nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); @@ -345,6 +338,26 @@ nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_listener *l) return (0); } +static int +quic_listener_session_alloc(nni_quic_session **ss, nni_quic_listener *l, HQUIC qconn) +{ + nni_quic_session *s; + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + + s->closed = false; + s->qconn = qconn; + s->listener = l; + + nni_aio_list_init(&s->conns); + nni_mtx_init(&s->mtx); + + *ss = s; + return (0); +} + + /***************************** MsQuic Bindings *****************************/ static void @@ -495,6 +508,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, } quic_stream_cb(QUIC_STREAM_EVENT_START_COMPLETE, c); + break; case QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE: log_info("QUIC_STREAM_EVENT_IDEAL_SEND_BUFFER_SIZE"); @@ -524,8 +538,8 @@ _Function_class_(QUIC_CONNECTION_CALLBACK) QUIC_STATUS QUIC_API msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, _Inout_ QUIC_CONNECTION_EVENT *ev) { - nni_quic_listener *l = Context; - HQUIC qconn = Connection; + nni_quic_session *ss = Context; + HQUIC qconn = Connection; log_debug("msquic_connection_cb triggered! %d", ev->Type); switch (ev->Type) { @@ -535,7 +549,7 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, log_info("[conn][%p] is Connected. Resumed Session %d", qconn, ev->CONNECTED.SessionResumed); - if (l->enable_0rtt) { + if (ss->listener->enable_0rtt) { MsQuic->ConnectionSendResumptionTicket(qconn, QUIC_SEND_RESUMPTION_FLAG_NONE, 0, NULL); } break; @@ -543,8 +557,20 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, HQUIC qstrm = ev->PEER_STREAM_STARTED.Stream; QUIC_STREAM_OPEN_FLAGS flags = ev->PEER_STREAM_STARTED.Flags; + int rv; + nni_quic_conn *c; + + // Create a nni quic connection + if ((rv = nni_msquic_quic_listener_conn_alloc(&c, ss)) != 0) { + log_warn("Error in alloc new quic stream."); + // msquic_conn_fini(qconn); + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + break; + } + log_info("[conn][%p] Peer stream %p started. flags %d.", qconn, qstrm, flags); - MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_cb, NULL); + MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_cb, c); break; case QUIC_CONNECTION_EVENT_RESUMED: @@ -596,18 +622,26 @@ _IRQL_requires_max_(PASSIVE_LEVEL) _Function_class_(QUIC_LISTENER_CALLBACK) QUIC_STATUS QUIC_API msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVENT *ev) { + int rv; HQUIC qconn; const QUIC_NEW_CONNECTION_INFO *qinfo; QUIC_STATUS rv = QUIC_STATUS_NOT_SUPPORTED; nni_quic_listener *l = arg; nni_aio *aio; + nni_quic_session *ss; switch (ev->Type) { case QUIC_LISTENER_EVENT_NEW_CONNECTION: qconn = ev->NEW_CONNECTION.Connection; qinfo = ev->NEW_CONNECTION.Info; - MsQuic->SetCallbackHandler(qconn, msquic_connection_cb, ql); + rv = quic_listener_session_alloc(&ss, l, qconn); + if (rv != 0) { + log_error("error in alloc session"); + break; + } + + MsQuic->SetCallbackHandler(qconn, msquic_connection_cb, ss); rv = MsQuic->ConnectionSetConfiguration(qconn, configuration); break; case QUIC_LISTENER_EVENT_STOP_COMPLETE: diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 864b988e3..99d8977f4 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -44,6 +44,7 @@ int nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **, nni_quic_dialer *); int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_listener *); void nni_msquic_quic_dialer_rele(nni_quic_dialer *); +typedef struct nni_quic_session nni_quic_session; // MsQuic bindings @@ -119,7 +120,9 @@ struct nni_quic_session { // MsQuic HQUIC qconn; // quic connection - nni_quic_conn *conns; // The quic streams in this quic connection + nni_list *conns; // The quic streams in this quic connection + + nni_quic_listener *listener; bool closed; nni_mtx mtx; @@ -137,6 +140,7 @@ struct nni_quic_conn { nni_quic_dialer *dialer; nni_quic_listener *listener; + nni_quic_session *session; // MsQuic HQUIC qstrm; // quic stream From 041f8679db956a1ac40fcc58b5ad993c860fce01 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 05:22:42 -0400 Subject: [PATCH 15/27] * FIX [quic] Fix a error in comments. * FIX [quic] Close the connection when peer abort its send direction. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_dial.c | 5 ++--- src/supplemental/quic/msquic_listen.c | 16 ++++------------ 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 90b747d70..dd8d36132 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -906,8 +906,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, return QUIC_STATUS_PENDING; case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: - // The peer gracefully shut down its send direction of the - // stream. + // The peer aborted its send direction of the stream. log_warn("[strm][%p] PEER_SEND_ABORTED errorcode %llu\n", stream, (unsigned long long) Event->PEER_SEND_ABORTED.ErrorCode); if (c->reason_code == 0) @@ -916,7 +915,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_ABORTED, c); break; case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: - // The peer aborted its send direction of the stream. + // The peer gracefully shut down its send direction of the stream. log_warn("[strm][%p] Peer send shut down\n", stream); MsQuic->StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0); quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN, c); diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index a05eed9b5..e0c2820bf 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -279,16 +279,6 @@ quic_stream_cb(int events, void *arg) quic_listener_doaccept(l); nni_mtx_unlock(&ss->mtx); - /* - if (c->dial_aio) { - // For upper layer to get the stream handle - nni_aio_set_output(c->dial_aio, 0, c); - - nni_aio_list_remove(c->dial_aio); - nni_aio_finish(c->dial_aio, 0, 0); - c->dial_aio = NULL; - } - */ break; // case QUIC_STREAM_EVENT_RECEIVE: // get a fin from stream // TODO Need more talk about those cases @@ -467,13 +457,14 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, return QUIC_STATUS_PENDING; case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: - // The peer gracefully shut down its send direction of the - // stream. + // The peer abort its send direction of the stream. log_warn("[strm][%p] PEER_SEND_ABORTED errorcode %llu\n", stream, (unsigned long long) Event->PEER_SEND_ABORTED.ErrorCode); if (c->reason_code == 0) c->reason_code = SERVER_SHUTTING_DOWN; + msquic_strm_close(c->qstrm); + quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_ABORTED, c); break; case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: @@ -483,6 +474,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN, c); break; case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: + // The peer gracefully shut down its send direction of the stream. log_warn("[strm][%p] QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE.", stream); break; From f031b0a2e595593511bba98bfb8ab3960ac018b7 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 05:28:53 -0400 Subject: [PATCH 16/27] * NEW [quic] Add quic stream related interfaces. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 245 ++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index e0c2820bf..811f3fb5f 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -299,6 +299,250 @@ quic_stream_cb(int events, void *arg) log_debug("[quic cb] end\n"); } +static void +quic_stream_fini(void *arg) +{ + nni_quic_conn *c = arg; + quic_stream_close(c); + + if (c->dialer) { + nni_msquic_quic_dialer_rele(c->dialer); + } + NNI_FREE_STRUCT(c); +} + +//static nni_reap_list quic_reap_list = { +// .rl_offset = offsetof(nni_quic_conn, reap), +// .rl_func = quic_stream_fini, +//}; +static void +quic_stream_free(void *arg) +{ + nni_quic_conn *c = arg; + quic_stream_fini(c); +} + +// Notify upper layer that something happened. +// Includes closed by peer or transport layer. +// Or get a FIN from quic stream. +static void +quic_stream_error(void *arg, int err) +{ + nni_quic_conn *c = arg; + nni_aio * aio; + + nni_mtx_lock(&c->mtx); + // only close aio of this stream + while ((aio = nni_list_first(&c->writeq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + while ((aio = nni_list_first(&c->readq)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, err); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_close(void *arg) +{ + nni_quic_conn *c = arg; + nni_mtx_lock(&c->mtx); + if (c->closed != true) { + c->closed = true; + msquic_strm_close(c->qstrm); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_cancel(nni_aio *aio, void *arg, int rv) +{ + nni_quic_conn *c = arg; + + nni_mtx_lock(&c->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_recv(void *arg, nni_aio *aio) +{ + nni_quic_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&c->mtx); + + if ((rv = nni_aio_schedule(aio, quic_stream_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&c->readq, aio); + + // Receive start if there are only one aio in readq. + if (nni_list_first(&c->readq) == aio) { + // In msquic. To avoid repeated memory copies. We just enable + // the receive. And doread in msquic_strm_cb. So there is + // only one copy from msquic to nanonng. + msquic_strm_recv_start(c->qstrm); + } + nni_mtx_unlock(&c->mtx); +} + +static void +quic_stream_dowrite_prior(nni_quic_conn *c, nni_aio *aio) +{ + log_debug("[quic dowrite adv] start\n"); + int rv; + unsigned naiov; + nni_iov * aiov; + size_t n = 0; + + if (c->closed) { + return; + } + + nni_aio_get_iov(aio, &naiov, &aiov); + + QUIC_BUFFER *buf=(QUIC_BUFFER*)malloc(sizeof(QUIC_BUFFER)*naiov); + for (uint8_t i = 0; i < naiov; ++i) { + log_debug("buf%d sz %d", i, aiov[i].iov_len); + buf[i].Buffer = aiov[i].iov_buf; + buf[i].Length = aiov[i].iov_len; + n += aiov[i].iov_len; + } + nni_aio_set_input(aio, 0, buf); + + if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf, + naiov, QUIC_SEND_FLAG_NONE, aio))) { + log_error("Failed in StreamSend, 0x%x!", rv); + free(buf); + return; + } + + nni_aio_bump_count(aio, n); + log_debug("[quic dowrite adv] end"); +} + +static void +quic_stream_dowrite(nni_quic_conn *c) +{ + log_debug("[quic dowrite] start %p", c->qstrm); + nni_aio *aio; + int rv; + + if (c->closed) { + return; + } + + while ((aio = nni_list_first(&c->writeq)) != NULL) { + unsigned naiov; + nni_iov * aiov; + size_t n = 0; + + nni_aio_get_iov(aio, &naiov, &aiov); + if (naiov == 0) + log_warn("A msg without content?"); + + QUIC_BUFFER *buf=(QUIC_BUFFER*)malloc(sizeof(QUIC_BUFFER)*naiov); + for (uint8_t i = 0; i < naiov; ++i) { + log_debug("buf%d sz %d", i, aiov[i].iov_len); + buf[i].Buffer = aiov[i].iov_buf; + buf[i].Length = aiov[i].iov_len; + n += aiov[i].iov_len; + } + nni_aio_set_input(aio, 0, buf); + + if (QUIC_FAILED(rv = MsQuic->StreamSend(c->qstrm, buf, + naiov, QUIC_SEND_FLAG_NONE, NULL))) { + log_error("Failed in StreamSend, 0x%x!", rv); + free(buf); + // nni_aio_list_remove(aio); + // nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + + nni_aio_bump_count(aio, n); + + break; + // Different from tcp. + // Here we just send one msg at once. + } +} + +static void +quic_stream_send(void *arg, nni_aio *aio) +{ + nni_quic_conn *c = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&c->mtx); + if ((rv = nni_aio_schedule(aio, quic_stream_cancel, c)) != 0) { + nni_mtx_unlock(&c->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // QUIC_HIGH_PRIOR_MSG Feature! + int *flags = nni_aio_get_prov_data(aio); + nni_aio_set_prov_data(aio, NULL); + + if (flags) { + if (*flags & QUIC_HIGH_PRIOR_MSG) { + quic_stream_dowrite_prior(c, aio); + nni_mtx_unlock(&c->mtx); + return; + } + } + + nni_aio_list_append(&c->writeq, aio); + + if (nni_list_first(&c->writeq) == aio) { + quic_stream_dowrite(c); + // In msquic. Write can be done at any time. + } + nni_mtx_unlock(&c->mtx); +} + +static int +quic_stream_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(name); + NNI_ARG_UNUSED(buf); + NNI_ARG_UNUSED(szp); + NNI_ARG_UNUSED(t); + return 0; + + // nni_quic_conn *c = arg; + // return (nni_getopt(tcp_options, name, c, buf, szp, t)); +} + +static int +quic_stream_set(void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(name); + NNI_ARG_UNUSED(buf); + NNI_ARG_UNUSED(sz); + NNI_ARG_UNUSED(t); + return 0; + + // nni_quic_conn *c = arg; + // return (nni_setopt(tcp_options, name, c, buf, sz, t)); +} int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_session *ss) @@ -476,6 +720,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, case QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE: // The peer gracefully shut down its send direction of the stream. log_warn("[strm][%p] QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE.", stream); + // TODO The next msg would better to be sent with a FIN flag. break; case QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE: From 92de198a25b04fdd57f47a936b3c7e0b813e4e90 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 05:35:37 -0400 Subject: [PATCH 17/27] * FIX [quic] Fix the error in building. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 13 +++++++------ src/supplemental/quic/quic_private.h | 7 +++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 811f3fb5f..e4f526eff 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -44,6 +44,10 @@ static void msquic_listener_fini(HQUIC ql); static void msquic_listener_stop(HQUIC ql); static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l); +static void quic_stream_error(void *arg, int err); +static void quic_stream_close(void *arg); +static void quic_stream_dowrite(nni_quic_conn *c); + /***************************** MsQuic Listener ******************************/ int @@ -162,7 +166,6 @@ quic_listener_doaccept(nni_quic_listener *l) nni_aio *aio; while ((aio = nni_list_first(&l->acceptq)) != NULL) { - int rv; nni_aio * aioc; nni_quic_conn * c; @@ -242,6 +245,7 @@ quic_stream_cb(int events, void *arg) log_debug("[quic cb] start %d\n", events); nni_quic_conn *c = arg; nni_quic_listener *l; + nni_quic_session *ss; nni_aio *aio; if (!c) @@ -801,8 +805,6 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, if ((rv = nni_msquic_quic_listener_conn_alloc(&c, ss)) != 0) { log_warn("Error in alloc new quic stream."); // msquic_conn_fini(qconn); - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, rv); break; } @@ -859,7 +861,6 @@ _IRQL_requires_max_(PASSIVE_LEVEL) _Function_class_(QUIC_LISTENER_CALLBACK) QUIC_STATUS QUIC_API msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVENT *ev) { - int rv; HQUIC qconn; const QUIC_NEW_CONNECTION_INFO *qinfo; QUIC_STATUS rv = QUIC_STATUS_NOT_SUPPORTED; @@ -872,8 +873,8 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN qconn = ev->NEW_CONNECTION.Connection; qinfo = ev->NEW_CONNECTION.Info; - rv = quic_listener_session_alloc(&ss, l, qconn); - if (rv != 0) { + int rc = quic_listener_session_alloc(&ss, l, qconn); + if (rc != 0) { log_error("error in alloc session"); break; } diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 99d8977f4..d2a2a122f 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -32,6 +32,7 @@ void nni_quic_dial(void *, const char *, const char *, nni_aio *); void nni_quic_dialer_close(void *); typedef struct nni_quic_listener nni_quic_listener; +typedef struct nni_quic_session nni_quic_session; int nni_quic_listener_init(void **); int nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); @@ -41,11 +42,9 @@ typedef struct nni_quic_conn nni_quic_conn; // Might no different TODO int nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **, nni_quic_dialer *); -int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_listener *); +int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_session *); void nni_msquic_quic_dialer_rele(nni_quic_dialer *); -typedef struct nni_quic_session nni_quic_session; - // MsQuic bindings void msquic_conn_close(HQUIC qconn, int rv); @@ -120,7 +119,7 @@ struct nni_quic_session { // MsQuic HQUIC qconn; // quic connection - nni_list *conns; // The quic streams in this quic connection + nni_list conns; // The quic streams in this quic connection nni_quic_listener *listener; From 8f5f7dde80bc37f46eed60f01a019a3c8535ee64 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 07:29:23 -0400 Subject: [PATCH 18/27] * FIX [quic] Fix the error that call wrong interface and fix the implictly interfaces. Signed-off-by: wanghaemq --- src/supplemental/quic/quic_listener.c | 4 +--- src/supplemental/quic/quic_private.h | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/supplemental/quic/quic_listener.c b/src/supplemental/quic/quic_listener.c index 2fe15a880..230c49112 100644 --- a/src/supplemental/quic/quic_listener.c +++ b/src/supplemental/quic/quic_listener.c @@ -55,7 +55,7 @@ static int quic_listener_listen(void *arg) { quic_listener *l = arg; - return (nni_quic_listener_listen(l->l, &l->sa)); + return (nni_quic_listener_listen(l->l, l->host, l->port)); } static void @@ -65,7 +65,6 @@ quic_listener_accept(void *arg, nng_aio *aio) nni_quic_listener_accept(l->l, aio); } -/* static int quic_listener_get( void *arg, const char *name, void *buf, size_t *szp, nni_type t) @@ -84,7 +83,6 @@ quic_listener_set( quic_listener *l = arg; return (nni_quic_listener_set(l->l, name, buf, sz, t)); } -*/ static int quic_listener_alloc_addr(nng_stream_listener **lp, const char *h, const char *p) diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index d2a2a122f..95ea1efe2 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -37,6 +37,8 @@ typedef struct nni_quic_session nni_quic_session; int nni_quic_listener_init(void **); int nni_quic_listener_listen(nni_quic_listener *, const char *, const char *); void nni_quic_listener_accept(nni_quic_listener *, nng_aio *aio); +void nni_quic_listener_close(nni_quic_listener *l); +void nni_quic_listener_fini(nni_quic_listener *l); typedef struct nni_quic_conn nni_quic_conn; From 8b0d6c177b7e65fc991994bea637fe9eab9040b6 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Wed, 1 Nov 2023 23:40:50 -0400 Subject: [PATCH 19/27] * FIX [quic] Fix the multiple definations and Unreference errors. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 16 +++++++++------- src/supplemental/quic/quic_api.c | 2 ++ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index e4f526eff..6acfa76e1 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -606,7 +606,7 @@ msquic_load_listener_config() _IRQL_requires_max_(DISPATCH_LEVEL) _Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API -msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, +msquic_strm_listener_cb(_In_ HQUIC stream, _In_opt_ void *Context, _Inout_ QUIC_STREAM_EVENT *Event) { nni_quic_conn *c = Context; @@ -776,13 +776,13 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context, _IRQL_requires_max_(DISPATCH_LEVEL) _Function_class_(QUIC_CONNECTION_CALLBACK) QUIC_STATUS QUIC_API -msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, +msquic_connection_listener_cb(_In_ HQUIC Connection, _In_opt_ void *Context, _Inout_ QUIC_CONNECTION_EVENT *ev) { nni_quic_session *ss = Context; HQUIC qconn = Connection; - log_debug("msquic_connection_cb triggered! %d", ev->Type); + log_debug("msquic_connection_listener_cb triggered! %d", ev->Type); switch (ev->Type) { case QUIC_CONNECTION_EVENT_CONNECTED: // The handshake has completed for the connection. @@ -809,7 +809,7 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context, } log_info("[conn][%p] Peer stream %p started. flags %d.", qconn, qstrm, flags); - MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_cb, c); + MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_listener_cb, c); break; case QUIC_CONNECTION_EVENT_RESUMED: @@ -879,7 +879,7 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN break; } - MsQuic->SetCallbackHandler(qconn, msquic_connection_cb, ss); + MsQuic->SetCallbackHandler(qconn, msquic_connection_listener_cb, ss); rv = MsQuic->ConnectionSetConfiguration(qconn, configuration); break; case QUIC_LISTENER_EVENT_STOP_COMPLETE: @@ -897,8 +897,10 @@ msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l) QUIC_ADDR addr; QUIC_STATUS rv = 0; - QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); - QuicAddrSetPort(&addr, atoi(p)); + addr.Ip.sa_family = QUIC_ADDRESS_FAMILY_UNSPEC; + addr.Ipv4.sin_port = htons(atol(p)); + // QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); + // QuicAddrSetPort(&addr, atoi(p)); msquic_load_listener_config(); diff --git a/src/supplemental/quic/quic_api.c b/src/supplemental/quic/quic_api.c index c84fec84d..de767a710 100644 --- a/src/supplemental/quic/quic_api.c +++ b/src/supplemental/quic/quic_api.c @@ -37,6 +37,7 @@ struct quic_dialer { void * d; // platform dialer }; +/* int nni_quic_listener_alloc(nng_stream_listener **lp, const nni_url *url) { @@ -45,6 +46,7 @@ nni_quic_listener_alloc(nng_stream_listener **lp, const nni_url *url) return 0; } +*/ static void quic_dial_cancel(nni_aio *aio, void *arg, int rv) From 9930b6462c7779e3148cc654dc0096960d344ed4 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 00:06:53 -0400 Subject: [PATCH 20/27] * NEW [quic] Add unit test about echo for quic dialer and listener. Signed-off-by: wanghaemq --- src/supplemental/quic/quic_listener_test.c | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 src/supplemental/quic/quic_listener_test.c diff --git a/src/supplemental/quic/quic_listener_test.c b/src/supplemental/quic/quic_listener_test.c new file mode 100644 index 000000000..aafce468e --- /dev/null +++ b/src/supplemental/quic/quic_listener_test.c @@ -0,0 +1,75 @@ +// +// Copyright 2023 NanoMQ Team, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +void +test_quic_echo(void) +{ + nng_stream_listener *l; + nng_stream_dialer * d; + nng_aio * aiod; + nng_aio * aiol; + nng_stream * sd; + nng_stream * sl; + void * td; + void * tl; + uint8_t * bufr; + uint8_t * bufs; + size_t size = 450001; + + // allocate messages + NUTS_ASSERT((bufs = nng_alloc(size)) != NULL); + NUTS_ASSERT((bufr = nng_alloc(size)) != NULL); + for (size_t i = 0; i < size; i++) { + bufs[i] = rand() & 0xff; + } + + NUTS_PASS(nng_aio_alloc(&aiod, NULL, NULL)); + NUTS_PASS(nng_aio_alloc(&aiol, NULL, NULL)); + nng_aio_set_timeout(aiod, 5000); // 5 sec + nng_aio_set_timeout(aiol, 5000); // 5 sec + + NUTS_PASS(nng_stream_listener_alloc(&l, "quic://127.0.0.1:14567")); + NUTS_PASS(nng_stream_listener_listen(l)); + + NUTS_PASS(nng_stream_dialer_alloc(&d, "quic://127.0.0.1:14567")); + + nng_stream_listener_accept(l, aiol); + nng_stream_dialer_dial(d, aiod); + + nng_aio_wait(aiol); + nng_aio_wait(aiod); + NUTS_PASS(nng_aio_result(aiol)); + NUTS_PASS(nng_aio_result(aiod)); + + NUTS_TRUE((sl = nng_aio_get_output(aiol, 0)) != NULL); + NUTS_TRUE((sd = nng_aio_get_output(aiod, 0)) != NULL); + + td = nuts_stream_send_start(sd, bufs, size); + tl = nuts_stream_send_start(sl, bufr, size); + + NUTS_PASS(nuts_stream_wait(td)); + NUTS_PASS(nuts_stream_wait(tl)); + NUTS_TRUE(memcmp(bufs, bufr, size) == 0); + + nng_free(bufr, size); + nng_free(bufs, size); + nng_stream_free(sd); + nng_stream_free(sl); + nng_stream_dialer_free(d); + nng_stream_listener_free(l); + nng_aio_free(aiod); + nng_aio_free(aiol); +} + +TEST_LIST = { + { "quic dialer listener echo test", test_quic_echo }, + { NULL, NULL }, +}; From afed113a428a88e4f3a28f2b8b945d27fe803415 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 00:24:55 -0400 Subject: [PATCH 21/27] * FIX [quic] Fix the null host and port. * FIX [quic] Dialer and listener each have a registration and configuration. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_common.c | 6 +++--- src/supplemental/quic/msquic_dial.c | 6 +++++- src/supplemental/quic/msquic_listen.c | 9 +++++++++ src/supplemental/quic/quic_listener.c | 4 ++-- src/supplemental/quic/quic_private.h | 7 ++----- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/supplemental/quic/msquic_common.c b/src/supplemental/quic/msquic_common.c index 5fa2d1981..d17f0e5db 100644 --- a/src/supplemental/quic/msquic_common.c +++ b/src/supplemental/quic/msquic_common.c @@ -4,7 +4,7 @@ static int is_msquic_inited = 0; void -msquic_close() +msquic_close(HQUIC registration, HQUIC configuration) { if (MsQuic != NULL) { if (configuration != NULL) { @@ -21,7 +21,7 @@ msquic_close() } int -msquic_open() +msquic_open(HQUIC registration) { if (is_msquic_inited == 1) return 0; @@ -46,7 +46,7 @@ msquic_open() return 0; error: - msquic_close(); + msquic_close(registration, NULL); return -1; } diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index dd8d36132..0dbc8d146 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -40,6 +40,10 @@ #include #include +// The registration and configuration for dialer +static HQUIC registration; +static HQUIC configuration; + static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d); static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d); @@ -1053,7 +1057,7 @@ msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d) QUIC_STATUS rv; HQUIC conn = NULL; - if (0 != msquic_open()) { + if (0 != msquic_open(registration)) { // so... close the quic connection return (NNG_ESYSERR); } diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 6acfa76e1..238c4c638 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -40,6 +40,10 @@ #include #include +// The registration and configuration for listener +static HQUIC registration; +static HQUIC configuration; + static void msquic_listener_fini(HQUIC ql); static void msquic_listener_stop(HQUIC ql); static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l); @@ -902,6 +906,11 @@ msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l) // QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); // QuicAddrSetPort(&addr, atoi(p)); + if (0 != msquic_open(registration)) { + // so... close the quic connection + return (NNG_ESYSERR); + } + msquic_load_listener_config(); if (QUIC_FAILED(rv = MsQuic->ListenerOpen(registration, msquic_listener_cb, (void *)l, &ql))) { diff --git a/src/supplemental/quic/quic_listener.c b/src/supplemental/quic/quic_listener.c index 230c49112..9b131961f 100644 --- a/src/supplemental/quic/quic_listener.c +++ b/src/supplemental/quic/quic_listener.c @@ -97,8 +97,8 @@ quic_listener_alloc_addr(nng_stream_listener **lp, const char *h, const char *p) NNI_FREE_STRUCT(l); return (rv); } - l->host = h; - l->port = p; + l->host = strdup(h); + l->port = strdup(p); l->ops.sl_free = quic_listener_free; l->ops.sl_close = quic_listener_close; diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 95ea1efe2..56265b3fa 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -18,11 +18,8 @@ static const QUIC_BUFFER quic_alpn = { static const QUIC_API_TABLE *MsQuic = NULL; -static HQUIC registration; -static HQUIC configuration; - -int msquic_open(); -void msquic_close(); +int msquic_open(HQUIC registration); +void msquic_close(HQUIC registration, HQUIC configuration); typedef struct nni_quic_dialer nni_quic_dialer; From 0af9782b3924f281bd8cf4db16660c9de5c6217e Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 05:16:58 -0400 Subject: [PATCH 22/27] * FIX [quic] Dialer and Listener should have its own msquic_open. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_common.c | 51 +++--------------------- src/supplemental/quic/msquic_dial.c | 57 ++++++++++++++++++++++++++- src/supplemental/quic/msquic_listen.c | 56 ++++++++++++++++++++++++++ src/supplemental/quic/quic_private.h | 9 +---- 4 files changed, 119 insertions(+), 54 deletions(-) diff --git a/src/supplemental/quic/msquic_common.c b/src/supplemental/quic/msquic_common.c index d17f0e5db..1d763f4ae 100644 --- a/src/supplemental/quic/msquic_common.c +++ b/src/supplemental/quic/msquic_common.c @@ -1,57 +1,16 @@ #include "quic_private.h" #include "core/nng_impl.h" -static int is_msquic_inited = 0; +static const QUIC_API_TABLE *MsQuic = NULL; + +/***************************** MsQuic Bindings *****************************/ void -msquic_close(HQUIC registration, HQUIC configuration) +msquic_set_api_table(const QUIC_API_TABLE *table) { - if (MsQuic != NULL) { - if (configuration != NULL) { - MsQuic->ConfigurationClose(configuration); - } - if (registration != NULL) { - // This will block until all outstanding child objects - // have been closed. - MsQuic->RegistrationClose(registration); - } - MsQuicClose(MsQuic); - is_msquic_inited = 0; - } + MsQuic = table; } -int -msquic_open(HQUIC registration) -{ - if (is_msquic_inited == 1) - return 0; - - QUIC_STATUS rv = QUIC_STATUS_SUCCESS; - // only Open MsQUIC lib once, otherwise cause memleak - if (MsQuic == NULL) - if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { - log_error("MsQuicOpen2 failed, 0x%x!\n", rv); - goto error; - } - - // Create a registration for the app's connections. - rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); - if (QUIC_FAILED(rv)) { - log_error("RegistrationOpen failed, 0x%x!\n", rv); - goto error; - } - - is_msquic_inited = 1; - log_info("Msquic is enabled"); - return 0; - -error: - msquic_close(registration, NULL); - return -1; -} - -/***************************** MsQuic Bindings *****************************/ - void msquic_conn_close(HQUIC qconn, int rv) { diff --git a/src/supplemental/quic/msquic_dial.c b/src/supplemental/quic/msquic_dial.c index 0dbc8d146..aad1f6b19 100644 --- a/src/supplemental/quic/msquic_dial.c +++ b/src/supplemental/quic/msquic_dial.c @@ -40,10 +40,15 @@ #include #include +static const QUIC_API_TABLE *MsQuic = NULL; + // The registration and configuration for dialer static HQUIC registration; static HQUIC configuration; +static int msquic_open(); +static void msquic_close(); + static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d); static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d); @@ -1057,7 +1062,7 @@ msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d) QUIC_STATUS rv; HQUIC conn = NULL; - if (0 != msquic_open(registration)) { + if (0 != msquic_open()) { // so... close the quic connection return (NNG_ESYSERR); } @@ -1128,3 +1133,53 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d) return (NNG_ECLOSED); } +static int is_msquic_inited = 0; + +static void +msquic_close() +{ + if (MsQuic != NULL) { + if (configuration != NULL) { + MsQuic->ConfigurationClose(configuration); + } + if (registration != NULL) { + // This will block until all outstanding child objects + // have been closed. + MsQuic->RegistrationClose(registration); + } + MsQuicClose(MsQuic); + is_msquic_inited = 0; + } +} + +static int +msquic_open() +{ + if (is_msquic_inited == 1) + return 0; + + QUIC_STATUS rv = QUIC_STATUS_SUCCESS; + // only Open MsQUIC lib once, otherwise cause memleak + if (MsQuic == NULL) + if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { + log_error("MsQuicOpen2 failed, 0x%x!\n", rv); + goto error; + } + msquic_set_api_table(MsQuic); + + // Create a registration for the app's connections. + rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); + if (QUIC_FAILED(rv)) { + log_error("RegistrationOpen failed, 0x%x!\n", rv); + goto error; + } + + is_msquic_inited = 1; + log_info("Msquic is enabled"); + return 0; + +error: + msquic_close(registration, NULL); + return -1; +} + diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 238c4c638..493c6c4af 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -40,10 +40,15 @@ #include #include +static const QUIC_API_TABLE *MsQuic = NULL; + // The registration and configuration for listener static HQUIC registration; static HQUIC configuration; +static int msquic_open(); +static void msquic_close(); + static void msquic_listener_fini(HQUIC ql); static void msquic_listener_stop(HQUIC ql); static int msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l); @@ -944,3 +949,54 @@ msquic_listener_fini(HQUIC ql) MsQuic->ListenerClose(ql); } +static int is_msquic_inited = 0; + +static void +msquic_close() +{ + if (MsQuic != NULL) { + if (configuration != NULL) { + MsQuic->ConfigurationClose(configuration); + } + if (registration != NULL) { + // This will block until all outstanding child objects + // have been closed. + MsQuic->RegistrationClose(registration); + } + MsQuicClose(MsQuic); + is_msquic_inited = 0; + } +} + +static int +msquic_open() +{ + if (is_msquic_inited == 1) + return 0; + + QUIC_STATUS rv = QUIC_STATUS_SUCCESS; + // only Open MsQUIC lib once, otherwise cause memleak + if (MsQuic == NULL) + if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) { + log_error("MsQuicOpen2 failed, 0x%x!\n", rv); + goto error; + } + msquic_set_api_table(MsQuic); + + // Create a registration for the app's connections. + rv = MsQuic->RegistrationOpen(&quic_reg_config, ®istration); + if (QUIC_FAILED(rv)) { + log_error("RegistrationOpen failed, 0x%x!\n", rv); + goto error; + } + + is_msquic_inited = 1; + log_info("Msquic is enabled"); + return 0; + +error: + msquic_close(registration, NULL); + return -1; +} + + diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 56265b3fa..0e9d5bd2c 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -16,11 +16,6 @@ static const QUIC_BUFFER quic_alpn = { (uint8_t *) "mqtt" }; -static const QUIC_API_TABLE *MsQuic = NULL; - -int msquic_open(HQUIC registration); -void msquic_close(HQUIC registration, HQUIC configuration); - typedef struct nni_quic_dialer nni_quic_dialer; int nni_quic_dialer_init(void **); @@ -44,7 +39,8 @@ int nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **, nni_quic_dialer *); int nni_msquic_quic_listener_conn_alloc(nni_quic_conn **, nni_quic_session *); void nni_msquic_quic_dialer_rele(nni_quic_dialer *); -// MsQuic bindings +// MsQuic binding +void msquic_set_api_table(const QUIC_API_TABLE *table); void msquic_conn_close(HQUIC qconn, int rv); void msquic_conn_fini(HQUIC qconn); @@ -53,7 +49,6 @@ void msquic_strm_close(HQUIC qstrm); void msquic_strm_fini(HQUIC qstrm); void msquic_strm_recv_start(HQUIC qstrm); - struct nni_quic_dialer { nni_aio *qconaio; // for quic connection nni_quic_conn *currcon; From 977c16fcd20327f821dc1c519a3ca007bf24d36c Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 05:18:11 -0400 Subject: [PATCH 23/27] * FIX [quic] Add function to load settings for listener. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 126 +++++++++++++++++++++++++- src/supplemental/quic/quic_private.h | 8 ++ 2 files changed, 130 insertions(+), 4 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 493c6c4af..0e8d3e282 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -607,10 +607,126 @@ quic_listener_session_alloc(nni_quic_session **ss, nni_quic_listener *l, HQUIC q /***************************** MsQuic Bindings *****************************/ -static void -msquic_load_listener_config() +typedef struct QUIC_CREDENTIAL_CONFIG_HELPER { + QUIC_CREDENTIAL_CONFIG CredConfig; + union { + QUIC_CERTIFICATE_HASH CertHash; + QUIC_CERTIFICATE_HASH_STORE CertHashStore; + QUIC_CERTIFICATE_FILE CertFile; + QUIC_CERTIFICATE_FILE_PROTECTED CertFileProtected; + }; +} QUIC_CREDENTIAL_CONFIG_HELPER; + +uint8_t +DecodeHexChar( + _In_ char c + ) { - return; + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'A' && c <= 'F') return 10 + c - 'A'; + if (c >= 'a' && c <= 'f') return 10 + c - 'a'; + return 0; +} + +uint32_t +DecodeHexBuffer( + _In_z_ const char* HexBuffer, + _In_ uint32_t OutBufferLen, + _Out_writes_to_(OutBufferLen, return) + uint8_t* OutBuffer + ) +{ + uint32_t HexBufferLen = (uint32_t)strlen(HexBuffer) / 2; + if (HexBufferLen > OutBufferLen) { + return 0; + } + + for (uint32_t i = 0; i < HexBufferLen; i++) { + OutBuffer[i] = + (DecodeHexChar(HexBuffer[i * 2]) << 4) | + DecodeHexChar(HexBuffer[i * 2 + 1]); + } + + return HexBufferLen; +} + +static BOOLEAN +msquic_load_listener_config(QUIC_SETTINGS *s, nni_quic_listener *l) +{ + QUIC_SETTINGS Settings = *s; + + // Configures the server's idle timeout. + Settings.IdleTimeoutMs = QUIC_IDLE_TIMEOUT_DEFAULT; + Settings.IsSet.IdleTimeoutMs = TRUE; + + // Configures the server's resumption level to allow for resumption and 0-RTT. + Settings.ServerResumptionLevel = QUIC_SERVER_RESUME_AND_ZERORTT; + Settings.IsSet.ServerResumptionLevel = TRUE; + + // Configures the server's settings to allow for the peer to open a single + // bidirectional stream. By default connections are not configured to allow + // any streams from the peer. + Settings.PeerBidiStreamCount = 1; + Settings.IsSet.PeerBidiStreamCount = TRUE; + + QUIC_CREDENTIAL_CONFIG_HELPER Config; + memset(&Config, 0, sizeof(Config)); + Config.CredConfig.Flags = QUIC_CREDENTIAL_FLAG_NONE; + + const char* Cert; + const char* KeyFile; + if ((Cert = l->cert_hash) != NULL) { + // Load the server's certificate from the default certificate store, + // using the provided certificate hash. + uint32_t CertHashLen = + DecodeHexBuffer( + Cert, + sizeof(Config.CertHash.ShaHash), + Config.CertHash.ShaHash); + if (CertHashLen != sizeof(Config.CertHash.ShaHash)) { + return FALSE; + } + Config.CredConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_HASH; + Config.CredConfig.CertificateHash = &Config.CertHash; + + } else if ((Cert = l->cert_fpath) != NULL && + (KeyFile = l->key_fpath) != NULL) { + // Loads the server's certificate from the file. + const char* Password = l->pwd; + if (Password != NULL) { + Config.CertFileProtected.CertificateFile = (char*)Cert; + Config.CertFileProtected.PrivateKeyFile = (char*)KeyFile; + Config.CertFileProtected.PrivateKeyPassword = (char*)Password; + Config.CredConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_FILE_PROTECTED; + Config.CredConfig.CertificateFileProtected = &Config.CertFileProtected; + } else { + Config.CertFile.CertificateFile = (char*)Cert; + Config.CertFile.PrivateKeyFile = (char*)KeyFile; + Config.CredConfig.Type = QUIC_CREDENTIAL_TYPE_CERTIFICATE_FILE; + Config.CredConfig.CertificateFile = &Config.CertFile; + } + + } else { + log_error("Must specify ['-cert_hash'] or ['cert_file' and 'key_file' (and optionally 'password')]!\n"); + return FALSE; + } + + // Allocate/initialize the configuration object, with the configured ALPN + // and settings. + QUIC_STATUS Status = QUIC_STATUS_SUCCESS; + if (QUIC_FAILED(Status = MsQuic->ConfigurationOpen(registration, + &quic_alpn, 1, &Settings, sizeof(Settings), NULL, &configuration))) { + log_error("ConfigurationOpen failed, 0x%x!\n", Status); + return FALSE; + } + + // Loads the TLS credential part of the configuration. + if (QUIC_FAILED(Status = MsQuic->ConfigurationLoadCredential(configuration, &Config.CredConfig))) { + log_error("ConfigurationLoadCredential failed, 0x%x!\n", Status); + return FALSE; + } + + return TRUE; } _IRQL_requires_max_(DISPATCH_LEVEL) @@ -874,9 +990,10 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN const QUIC_NEW_CONNECTION_INFO *qinfo; QUIC_STATUS rv = QUIC_STATUS_NOT_SUPPORTED; nni_quic_listener *l = arg; - nni_aio *aio; nni_quic_session *ss; + NNI_ARG_UNUSED(ql); + switch (ev->Type) { case QUIC_LISTENER_EVENT_NEW_CONNECTION: qconn = ev->NEW_CONNECTION.Connection; @@ -887,6 +1004,7 @@ msquic_listener_cb(_In_ HQUIC ql, _In_opt_ void *arg, _Inout_ QUIC_LISTENER_EVEN log_error("error in alloc session"); break; } + log_info("new connection incoming %p %*.s", qconn, qinfo->ClientAlpnListLength, qinfo->ClientAlpnList); MsQuic->SetCallbackHandler(qconn, msquic_connection_listener_cb, ss); rv = MsQuic->ConnectionSetConfiguration(qconn, configuration); diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 0e9d5bd2c..785ed99ee 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -106,6 +106,14 @@ struct nni_quic_listener { bool enable_0rtt; bool enable_mltstrm; + // option 1 + char * cert_hash; + + // option 2 + char * cert_fpath; + char * key_fpath; + char * pwd; + QUIC_SETTINGS settings; }; From ac47a6614364e798b32b081c33a84046252b04b1 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 05:20:16 -0400 Subject: [PATCH 24/27] * FIX [quic] Fix some error in listener test. Signed-off-by: wanghaemq --- src/supplemental/quic/quic_listener.c | 4 ++++ src/supplemental/quic/quic_listener_test.c | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/supplemental/quic/quic_listener.c b/src/supplemental/quic/quic_listener.c index 9b131961f..baf5138c0 100644 --- a/src/supplemental/quic/quic_listener.c +++ b/src/supplemental/quic/quic_listener.c @@ -69,19 +69,23 @@ static int quic_listener_get( void *arg, const char *name, void *buf, size_t *szp, nni_type t) { + /* quic_listener *l = arg; if (strcmp(name, NNG_OPT_TCP_BOUND_PORT) == 0) { return (quic_listener_get_port(l, buf, szp, t)); } return (nni_quic_listener_get(l->l, name, buf, szp, t)); + */ } static int quic_listener_set( void *arg, const char *name, const void *buf, size_t sz, nni_type t) { + /* quic_listener *l = arg; return (nni_quic_listener_set(l->l, name, buf, sz, t)); + */ } static int diff --git a/src/supplemental/quic/quic_listener_test.c b/src/supplemental/quic/quic_listener_test.c index aafce468e..22b0b9dec 100644 --- a/src/supplemental/quic/quic_listener_test.c +++ b/src/supplemental/quic/quic_listener_test.c @@ -53,7 +53,7 @@ test_quic_echo(void) NUTS_TRUE((sd = nng_aio_get_output(aiod, 0)) != NULL); td = nuts_stream_send_start(sd, bufs, size); - tl = nuts_stream_send_start(sl, bufr, size); + tl = nuts_stream_recv_start(sl, bufr, size); NUTS_PASS(nuts_stream_wait(td)); NUTS_PASS(nuts_stream_wait(tl)); From d40aff7edbba7e9dad00ab61371fd91db5340b41 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 05:21:30 -0400 Subject: [PATCH 25/27] * FIX [quic] Fix the error that forget to call msquic_load_listener_config. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 0e8d3e282..f5b9e01cd 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -178,7 +178,7 @@ quic_listener_doaccept(nni_quic_listener *l) nni_aio * aioc; nni_quic_conn * c; - // Get the connection + // Get the connection if ((aioc = nni_list_first(&l->incomings)) == NULL) { // No wait and return immediately return; @@ -1029,12 +1029,16 @@ msquic_listen(HQUIC ql, const char *h, const char *p, nni_quic_listener *l) // QuicAddrSetFamily(&addr, QUIC_ADDRESS_FAMILY_UNSPEC); // QuicAddrSetPort(&addr, atoi(p)); - if (0 != msquic_open(registration)) { + NNI_ARG_UNUSED(h); // Listen all interfaces + + if (0 != msquic_open()) { // so... close the quic connection return (NNG_ESYSERR); } - msquic_load_listener_config(); + if (FALSE == msquic_load_listener_config(&l->settings, l)) { + return (NNG_EINVAL); + } if (QUIC_FAILED(rv = MsQuic->ListenerOpen(registration, msquic_listener_cb, (void *)l, &ql))) { log_error("error in listen open %ld", rv); From eaac7047f306fde99489f21e3369b01c73bddeb8 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 06:03:43 -0400 Subject: [PATCH 26/27] * FIX [quic] Fix the error in idletimeout. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index f5b9e01cd..57bb02952 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -83,7 +83,7 @@ nni_quic_listener_init(void **argp) nni_atomic_inc64(&l->ref); // 0RTT is disabled by default - l->enable_0rtt = false; + l->enable_0rtt = true; // multi_stream is disabled by default l->enable_mltstrm = false; @@ -656,7 +656,7 @@ msquic_load_listener_config(QUIC_SETTINGS *s, nni_quic_listener *l) QUIC_SETTINGS Settings = *s; // Configures the server's idle timeout. - Settings.IdleTimeoutMs = QUIC_IDLE_TIMEOUT_DEFAULT; + Settings.IdleTimeoutMs = QUIC_IDLE_TIMEOUT_DEFAULT * 1000; Settings.IsSet.IdleTimeoutMs = TRUE; // Configures the server's resumption level to allow for resumption and 0-RTT. From 90c3e7fb81ac04f686ede5d7740593dceb7cf5e4 Mon Sep 17 00:00:00 2001 From: wanghaemq Date: Thu, 2 Nov 2023 07:04:10 -0400 Subject: [PATCH 27/27] * FIX [quic] The first state recved is QUIC_STREAM_EVENT_RECEIVED rather than QUIC_STREAM_EVENT_START_COMPLETE. So add a flag to fix the this problem. Signed-off-by: wanghaemq --- src/supplemental/quic/msquic_listen.c | 13 ++++++++++++- src/supplemental/quic/quic_private.h | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/supplemental/quic/msquic_listen.c b/src/supplemental/quic/msquic_listen.c index 57bb02952..c4e9fef6f 100644 --- a/src/supplemental/quic/msquic_listen.c +++ b/src/supplemental/quic/msquic_listen.c @@ -284,6 +284,11 @@ quic_stream_cb(int events, void *arg) case QUIC_STREAM_EVENT_START_COMPLETE: nni_mtx_lock(&l->mtx); + if (c->started == true) { + nni_mtx_unlock(&l->mtx); + break; + } + // Push connection to incomings nni_aio_alloc(&aio, NULL, NULL); nni_aio_set_prov_data(aio, (void *)c); @@ -291,7 +296,10 @@ quic_stream_cb(int events, void *arg) quic_listener_doaccept(l); - nni_mtx_unlock(&ss->mtx); + // This stream is started from now. + c->started = true; + + nni_mtx_unlock(&l->mtx); break; // case QUIC_STREAM_EVENT_RECEIVE: // get a fin from stream // TODO Need more talk about those cases @@ -569,6 +577,7 @@ nni_msquic_quic_listener_conn_alloc(nni_quic_conn **cp, nni_quic_session *ss) c->dialer = NULL; c->listener = ss->listener; c->session = ss; + c->started = false; nni_mtx_init(&c->mtx); nni_aio_list_init(&c->readq); @@ -936,6 +945,8 @@ msquic_connection_listener_cb(_In_ HQUIC Connection, _In_opt_ void *Context, log_info("[conn][%p] Peer stream %p started. flags %d.", qconn, qstrm, flags); MsQuic->SetCallbackHandler(qstrm, (void *)msquic_strm_listener_cb, c); + quic_stream_cb(QUIC_STREAM_EVENT_START_COMPLETE, c); + break; case QUIC_CONNECTION_EVENT_RESUMED: // TODO diff --git a/src/supplemental/quic/quic_private.h b/src/supplemental/quic/quic_private.h index 785ed99ee..508478bcc 100644 --- a/src/supplemental/quic/quic_private.h +++ b/src/supplemental/quic/quic_private.h @@ -137,6 +137,7 @@ struct nni_quic_conn { bool closed; nni_mtx mtx; nni_aio * dial_aio; + bool started; // nni_aio * qstrmaio; // Link to msquic_strm_cb nni_quic_dialer *dialer;