Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Quic listener is under developping #720

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
70a5ab1
* FIX [quic] Remove repeated include headers.
wanghaEMQ Oct 30, 2023
26bc9fb
* ADD [quic] Add a private quic header file for share functionality b…
wanghaEMQ Oct 31, 2023
02f683e
* NEW [quic] Add new file msquic_common.c to share codes of common fu…
wanghaEMQ Oct 31, 2023
861ac97
* FIX [quic/listen] Include quic_private.h.
wanghaEMQ Oct 31, 2023
9866382
* NEW [quic] Move defination and relative interfaces of dialer/listen…
wanghaEMQ Oct 31, 2023
695173c
* NEW [quic] Move definatio of quic connection structure from dialer …
wanghaEMQ Oct 31, 2023
ff0afca
* FIX [quic] Create two interfaces to alloc connection for dialer and…
wanghaEMQ Oct 31, 2023
cdecdd9
* NEW [quic] Move some interface about msquic strream and connection …
wanghaEMQ Oct 31, 2023
f448caa
* NEW [quic] Add nni_msquic_quic_listener_conn_alloc for creating con…
wanghaEMQ Oct 31, 2023
6992c07
* FIX [quic] Fix the errors in building.
wanghaEMQ Nov 1, 2023
bc472fe
* NEW [quic] Add handler for QUIC_CONNECTION_EVENT_PEER_STREAM_STARTE…
wanghaEMQ Nov 1, 2023
49dbf46
* NEW [quic] Listener should get a handle of a quic stream rather tha…
wanghaEMQ Nov 1, 2023
e40133e
* NEW [quic] Add quic_session the manage streams under a connection.
wanghaEMQ Nov 1, 2023
db31982
* NEW [quic] Add new interface quic_listener_session_alloc to alloc a…
wanghaEMQ Nov 1, 2023
041f867
* FIX [quic] Fix a error in comments.
wanghaEMQ Nov 1, 2023
f031b0a
* NEW [quic] Add quic stream related interfaces.
wanghaEMQ Nov 1, 2023
92de198
* FIX [quic] Fix the error in building.
wanghaEMQ Nov 1, 2023
8f5f7dd
* FIX [quic] Fix the error that call wrong interface and fix the impl…
wanghaEMQ Nov 1, 2023
8b0d6c1
* FIX [quic] Fix the multiple definations and Unreference errors.
wanghaEMQ Nov 2, 2023
9930b64
* NEW [quic] Add unit test about echo for quic dialer and listener.
wanghaEMQ Nov 2, 2023
afed113
* FIX [quic] Fix the null host and port.
wanghaEMQ Nov 2, 2023
0af9782
* FIX [quic] Dialer and Listener should have its own msquic_open.
wanghaEMQ Nov 2, 2023
977c16f
* FIX [quic] Add function to load settings for listener.
wanghaEMQ Nov 2, 2023
ac47a66
* FIX [quic] Fix some error in listener test.
wanghaEMQ Nov 2, 2023
d40aff7
* FIX [quic] Fix the error that forget to call msquic_load_listener_c…
wanghaEMQ Nov 2, 2023
eaac704
* FIX [quic] Fix the error in idletimeout.
wanghaEMQ Nov 2, 2023
90c3e7f
* FIX [quic] The first state recved is QUIC_STREAM_EVENT_RECEIVED rat…
wanghaEMQ Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/supplemental/quic/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ if (NNG_ENABLE_QUIC)
add_subdirectory(msquic)
add_dependencies(nng msquic)

nng_sources(msquic_common.c)
nng_sources(quic_private.h)

nng_sources(quic_api.c)
nng_sources(quic_api.h)

nng_sources(msquic_dial.c)

nng_test(quic_api_test)

find_path(INTERNAL_MSQUIC_INCLUDE_DIR
Expand Down
46 changes: 46 additions & 0 deletions src/supplemental/quic/msquic_common.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "quic_private.h"
#include "core/nng_impl.h"

static const QUIC_API_TABLE *MsQuic = NULL;

/***************************** MsQuic Bindings *****************************/

void
msquic_set_api_table(const QUIC_API_TABLE *table)
{
MsQuic = table;
}

void
msquic_conn_close(HQUIC qconn, int rv)
{
MsQuic->ConnectionShutdown(qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, (QUIC_UINT62)rv);
}

void
msquic_conn_fini(HQUIC qconn)
{
MsQuic->ConnectionClose(qconn);
}

void
msquic_strm_close(HQUIC qstrm)
{
log_info("stream %p shutdown", qstrm);
MsQuic->StreamShutdown(
qstrm, QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, NNG_ECONNSHUT);
}

void
msquic_strm_fini(HQUIC qstrm)
{
log_info("stream %p fini", qstrm);
MsQuic->StreamClose(qstrm);
}

void
msquic_strm_recv_start(HQUIC qstrm)
{
MsQuic->StreamReceiveSetEnabled(qstrm, TRUE);
}

190 changes: 55 additions & 135 deletions src/supplemental/quic/msquic_dial.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// closed.

#include "quic_api.h"
#include "quic_private.h"
#include "core/nng_impl.h"
#include "msquic.h"

Expand All @@ -39,65 +40,17 @@
#include <time.h>
#include <unistd.h>

#include "nng/mqtt/mqtt_client.h"
#include "nng/supplemental/nanolib/conf.h"
#include "nng/protocol/mqtt/mqtt_parser.h"
#include "supplemental/mqtt/mqtt_msg.h"

#include "openssl/pem.h"
#include "openssl/x509.h"

#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

struct nni_quic_conn {
nng_stream stream;
nni_list readq;
nni_list writeq;
bool closed;
nni_mtx mtx;
nni_aio * dial_aio;
// nni_aio * qstrmaio; // Link to msquic_strm_cb
nni_quic_dialer *dialer;

// MsQuic
HQUIC qstrm; // quic stream
uint8_t reason_code;

nni_reap_node reap;
};

static const QUIC_API_TABLE *MsQuic = NULL;

// Config for msquic
static const QUIC_REGISTRATION_CONFIG quic_reg_config = {
"mqtt",
QUIC_EXECUTION_PROFILE_LOW_LATENCY
};

static const QUIC_BUFFER quic_alpn = {
sizeof("mqtt") - 1,
(uint8_t *) "mqtt"
};

HQUIC registration;
HQUIC configuration;
// The registration and configuration for dialer
static HQUIC registration;
static HQUIC configuration;

static int msquic_open();
static void msquic_close();

static int msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d);
static void msquic_conn_close(HQUIC qconn, int rv);
static void msquic_conn_fini(HQUIC qconn);
static int msquic_strm_open(HQUIC qconn, nni_quic_dialer *d);
static void msquic_strm_close(HQUIC qstrm);
static void msquic_strm_fini(HQUIC qstrm);
static void msquic_strm_recv_start(HQUIC qstrm);

static void quic_dialer_cb(void *arg);
static void quic_stream_error(void *arg, int err);
Expand Down Expand Up @@ -270,7 +223,7 @@ nni_quic_dial(void *arg, const char *host, const char *port, nni_aio *aio)
nni_atomic_inc64(&d->ref);

// Create a connection whenever dial. So it's okey. right?
if ((rv = nni_msquic_quic_alloc(&c, d)) != 0) {
if ((rv = nni_msquic_quic_dialer_conn_alloc(&c, d)) != 0) {
nni_aio_finish_error(aio, rv);
nni_msquic_quic_dialer_rele(d);
return;
Expand Down Expand Up @@ -711,15 +664,16 @@ quic_stream_set(void *arg, const char *name, const void *buf, size_t sz, nni_typ
}

int
nni_msquic_quic_alloc(nni_quic_conn **cp, nni_quic_dialer *d)
nni_msquic_quic_dialer_conn_alloc(nni_quic_conn **cp, nni_quic_dialer *d)
{
nni_quic_conn *c;
if ((c = NNI_ALLOC_STRUCT(c)) == NULL) {
return (NNG_ENOMEM);
}

c->closed = false;
c->dialer = d;
c->closed = false;
c->dialer = d;
c->listener = NULL;

nni_mtx_init(&c->mtx);
nni_aio_list_init(&c->readq);
Expand Down Expand Up @@ -860,8 +814,6 @@ msquic_connection_cb(_In_ HQUIC Connection, _In_opt_ void *Context,
return QUIC_STATUS_SUCCESS;
}

// The clients's callback for stream events from MsQuic.
// New recv cb of quic transport
_IRQL_requires_max_(DISPATCH_LEVEL)
_Function_class_(QUIC_STREAM_CALLBACK) QUIC_STATUS QUIC_API
msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context,
Expand Down Expand Up @@ -963,8 +915,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context,

return QUIC_STATUS_PENDING;
case QUIC_STREAM_EVENT_PEER_SEND_ABORTED:
// The peer gracefully shut down its send direction of the
// stream.
// The peer aborted its send direction of the stream.
log_warn("[strm][%p] PEER_SEND_ABORTED errorcode %llu\n", stream,
(unsigned long long) Event->PEER_SEND_ABORTED.ErrorCode);
if (c->reason_code == 0)
Expand All @@ -973,7 +924,7 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context,
quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_ABORTED, c);
break;
case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN:
// The peer aborted its send direction of the stream.
// The peer gracefully shut down its send direction of the stream.
log_warn("[strm][%p] Peer send shut down\n", stream);
MsQuic->StreamShutdown(stream, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL, 0);
quic_stream_cb(QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN, c);
Expand Down Expand Up @@ -1027,55 +978,6 @@ msquic_strm_cb(_In_ HQUIC stream, _In_opt_ void *Context,
return QUIC_STATUS_SUCCESS;
}

static int is_msquic_inited = 0;

static void
msquic_close()
{
if (MsQuic != NULL) {
if (configuration != NULL) {
MsQuic->ConfigurationClose(configuration);
}
if (registration != NULL) {
// This will block until all outstanding child objects
// have been closed.
MsQuic->RegistrationClose(registration);
}
MsQuicClose(MsQuic);
is_msquic_inited = 0;
}
}

static int
msquic_open()
{
if (is_msquic_inited == 1)
return 0;

QUIC_STATUS rv = QUIC_STATUS_SUCCESS;
// only Open MsQUIC lib once, otherwise cause memleak
if (MsQuic == NULL)
if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) {
log_error("MsQuicOpen2 failed, 0x%x!\n", rv);
goto error;
}

// Create a registration for the app's connections.
rv = MsQuic->RegistrationOpen(&quic_reg_config, &registration);
if (QUIC_FAILED(rv)) {
log_error("RegistrationOpen failed, 0x%x!\n", rv);
goto error;
}

is_msquic_inited = 1;
log_info("Msquic is enabled");
return 0;

error:
msquic_close();
return -1;
}

// Helper function to load a client configuration.
static BOOLEAN
msquic_load_config(QUIC_SETTINGS *settings, nni_quic_dialer *d)
Expand Down Expand Up @@ -1195,18 +1097,6 @@ msquic_conn_open(const char *host, const char *port, nni_quic_dialer *d)
return (NNG_ECONNREFUSED);
}

static void
msquic_conn_close(HQUIC qconn, int rv)
{
MsQuic->ConnectionShutdown(qconn, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, (QUIC_UINT62)rv);
}

static void
msquic_conn_fini(HQUIC qconn)
{
MsQuic->ConnectionClose(qconn);
}

static int
msquic_strm_open(HQUIC qconn, nni_quic_dialer *d)
{
Expand Down Expand Up @@ -1243,23 +1133,53 @@ msquic_strm_open(HQUIC qconn, nni_quic_dialer *d)
return (NNG_ECLOSED);
}

static void
msquic_strm_close(HQUIC qstrm)
{
log_info("stream %p shutdown", qstrm);
MsQuic->StreamShutdown(
qstrm, QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE, NNG_ECONNSHUT);
}
static int is_msquic_inited = 0;

static void
msquic_strm_fini(HQUIC qstrm)
msquic_close()
{
log_info("stream %p fini", qstrm);
MsQuic->StreamClose(qstrm);
if (MsQuic != NULL) {
if (configuration != NULL) {
MsQuic->ConfigurationClose(configuration);
}
if (registration != NULL) {
// This will block until all outstanding child objects
// have been closed.
MsQuic->RegistrationClose(registration);
}
MsQuicClose(MsQuic);
is_msquic_inited = 0;
}
}

static void
msquic_strm_recv_start(HQUIC qstrm)
static int
msquic_open()
{
MsQuic->StreamReceiveSetEnabled(qstrm, TRUE);
if (is_msquic_inited == 1)
return 0;

QUIC_STATUS rv = QUIC_STATUS_SUCCESS;
// only Open MsQUIC lib once, otherwise cause memleak
if (MsQuic == NULL)
if (QUIC_FAILED(rv = MsQuicOpen2(&MsQuic))) {
log_error("MsQuicOpen2 failed, 0x%x!\n", rv);
goto error;
}
msquic_set_api_table(MsQuic);

// Create a registration for the app's connections.
rv = MsQuic->RegistrationOpen(&quic_reg_config, &registration);
if (QUIC_FAILED(rv)) {
log_error("RegistrationOpen failed, 0x%x!\n", rv);
goto error;
}

is_msquic_inited = 1;
log_info("Msquic is enabled");
return 0;

error:
msquic_close(registration, NULL);
return -1;
}

Loading
Loading