Skip to content

Commit

Permalink
Merge pull request #216 from openziti/pull-uv-mbed-refactor
Browse files Browse the repository at this point in the history
improve EdgeRouter reconnect process
  • Loading branch information
ekoby authored Jan 20, 2021
2 parents 7aee3ff + 0f8d07e commit 776d4f3
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 44 deletions.
2 changes: 1 addition & 1 deletion deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ include(FetchContent)

FetchContent_Declare(uv-mbed
GIT_REPOSITORY https://github.com/netfoundry/uv-mbed.git
GIT_TAG v0.11.4
GIT_TAG v0.12.0
)
set(ENABLE_UM_TESTS OFF CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(uv-mbed)
Expand Down
10 changes: 5 additions & 5 deletions inc_internal/zt_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,19 @@ typedef struct ziti_channel {
char token[UUID_STR_LEN];
uv_mbed_t connection;

// dual purpose timer:
// multi purpose timer:
// - reconnect timeout if not connected
// - latency timeout if connected
// - connect timeout when connecting
// - latency interval/timeout if connected
uv_timer_t timer;

uint64_t latency;
struct waiter_s *latency_waiter;

ch_state state;
uint32_t reconnect_count;

struct ch_conn_req **conn_reqs;
int conn_reqs_n;

LIST_HEAD(conn_reqs, ch_conn_req) conn_reqs;
uint32_t msg_seq;

buffer *incoming;
Expand Down
9 changes: 9 additions & 0 deletions includes/ziti/ziti.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,15 @@ extern int ziti_set_timeout(ziti_context ztx, int timeout);
ZITI_FUNC
extern int ziti_shutdown(ziti_context ztx);

/**
* @brief Frees memory allocated for the given context and nulls out the handle.
*
* @param ctxp pointer to Ziti context handle
* @return #ZITI_OK or corresponding #ZITI_ERRORS
*/
ZITI_FUNC
int ziti_ctx_free(ziti_context *ctxp) ;

/**
* @brief Shutdown Ziti Edge identity context and reclaim the memory from the provided #ziti_context.
*
Expand Down
80 changes: 54 additions & 26 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ limitations under the License.
#define MAXHOSTNAMELEN 255
#endif

#define CONNECT_TIMEOUT (20*1000)
#define LATENCY_TIMEOUT (10*1000)
#define LATENCY_INTERVAL (60*1000) /* 1 minute */
#define BACKOFF_TIME 3000 /* 3 seconds */
#define MAX_BACKOFF 5 /* max reconnection timeout: (1 << 5) * BACKOFF_TIME = 96 seconds */

Expand Down Expand Up @@ -66,6 +69,10 @@ static void async_write(uv_async_t *ar);

static struct msg_receiver *find_receiver(ziti_channel_t *ch, uint32_t conn_id);

static void on_channel_close(ziti_channel_t *ch, ssize_t code);

static void send_latency_probe(uv_timer_t *t);

struct async_write_req {
uv_buf_t buf;
ziti_channel_t *ch;
Expand All @@ -82,6 +89,7 @@ struct waiter_s {
struct ch_conn_req {
ch_connect_cb cb;
void *ctx;
LIST_ENTRY(ch_conn_req) next;
};

struct msg_receiver {
Expand All @@ -107,8 +115,7 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t

ch->state = Initial;
// 32 concurrent connect requests for the same channel is probably enough
ch->conn_reqs = calloc(32, sizeof(struct ch_conn_req *));
ch->conn_reqs_n = 0;
LIST_INIT(&ch->conn_reqs);

ch->name = NULL;
ch->in_next = NULL;
Expand All @@ -119,9 +126,9 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t
LIST_INIT(&ch->waiters);

uv_mbed_init(ch->loop, &ch->connection, tls);
uv_mbed_keepalive(&ch->connection, true, 60);
uv_mbed_keepalive(&ch->connection, true, ctx->opts->router_keepalive);
uv_mbed_nodelay(&ch->connection, true);
ch->connection._stream.data = ch;
ch->connection.data = ch;

uv_timer_init(ch->loop, &ch->timer);
ch->timer.data = ch;
Expand All @@ -133,7 +140,6 @@ static int ziti_channel_init(struct ziti_ctx *ctx, ziti_channel_t *ch, uint32_t
}

void ziti_channel_free(ziti_channel_t *ch) {
free(ch->conn_reqs);
free_buffer(ch->incoming);
FREE(ch->name);
FREE(ch->version);
Expand All @@ -151,7 +157,7 @@ int ziti_close_channels(struct ziti_ctx *ziti) {

static void close_handle_cb(uv_handle_t *h) {
uv_mbed_t *mbed = (uv_mbed_t *) h;
ziti_channel_t *ch = mbed->_stream.data;
ziti_channel_t *ch = mbed->data;

uv_mbed_free(mbed);
ziti_channel_free(ch);
Expand Down Expand Up @@ -237,7 +243,7 @@ int ziti_channel_connect(ziti_context ztx, const char *ch_name, const char *url,
NEWP(r, struct ch_conn_req);
r->cb = cb;
r->ctx = cb_ctx;
ch->conn_reqs[ch->conn_reqs_n++] = r;
LIST_INSERT_HEAD(&ch->conn_reqs, r, next);
}

break;
Expand Down Expand Up @@ -513,9 +519,20 @@ static void latency_reply_cb(void *ctx, message *reply) {
message_get_uint64_header(reply, LatencyProbeTime, &ts)) {
ch->latency = uv_now(ch->loop) - ts;
ZITI_LOG(VERBOSE, "ch[%d](%s) latency is now %ld", ch->id, ch->name, ch->latency);
} else {
}
else {
ZITI_LOG(WARN, "invalid latency probe result ct[%d]", reply->header.content);
}
uv_timer_start(&ch->timer, send_latency_probe, LATENCY_INTERVAL, 0);
}

static void latency_timeout(uv_timer_t *t) {
ziti_channel_t *ch = t->data;
ziti_channel_remove_waiter(ch, ch->latency_waiter);
ch->latency_waiter = NULL;
ch->latency = UINT64_MAX;

on_channel_close(ch, ZITI_TIMEOUT);
}

static void send_latency_probe(uv_timer_t *t) {
Expand All @@ -529,7 +546,9 @@ static void send_latency_probe(uv_timer_t *t) {
}
};

ziti_channel_send_for_reply(ch, ContentTypeLatencyType, headers, 1, NULL, 0, latency_reply_cb, ch);
ch->latency_waiter = ziti_channel_send_for_reply(ch, ContentTypeLatencyType,
headers, 1, NULL, 0, latency_reply_cb, ch);
uv_timer_start(t, latency_timeout, LATENCY_TIMEOUT, 0);
}

static void hello_reply_cb(void *ctx, message *msg) {
Expand Down Expand Up @@ -560,17 +579,17 @@ static void hello_reply_cb(void *ctx, message *msg) {
ch->notify_cb(ch, EdgeRouterUnavailable, ch->notify_ctx);
}

for (int i = 0; i < ch->conn_reqs_n; i++) {
struct ch_conn_req *r = ch->conn_reqs[i];
while (!LIST_EMPTY(&ch->conn_reqs)) {
struct ch_conn_req *r = LIST_FIRST(&ch->conn_reqs);
LIST_REMOVE(r, next);
r->cb(ch, r->ctx, cb_code);
free(r);
}
ch->conn_reqs_n = 0;

if (success) {
// initial latency
ch->latency = uv_now(ch->loop) - ch->latency;
uv_timer_start(&ch->timer, send_latency_probe, 0, 60 * 1000);
uv_timer_start(&ch->timer, send_latency_probe, LATENCY_INTERVAL, 0);
} else {
reconnect_channel(ch, false);
}
Expand Down Expand Up @@ -601,14 +620,20 @@ static void async_write(uv_async_t *ar) {
uv_close((uv_handle_t *) ar, (uv_close_cb) free);
}

static void connect_timeout(uv_timer_t *t) {
ziti_channel_t *ch = t->data;
on_channel_close(ch, ZITI_GATEWAY_UNAVAILABLE);
}

static void reconnect_cb(uv_timer_t *t) {
ziti_channel_t *ch = t->data;
ziti_context ztx = ch->ctx;

if (ztx->session == NULL || ztx->session->token == NULL) {
ZITI_LOG(ERROR, "ziti context is not authenticated, delaying re-connect");
reconnect_channel(ch, false);
} else {
}
else {
ch->msg_seq = 0;

uv_connect_t *req = calloc(1, sizeof(uv_connect_t));
Expand All @@ -617,12 +642,15 @@ static void reconnect_cb(uv_timer_t *t) {
ch->state = Connecting;

uv_mbed_init(ch->loop, &ch->connection, ch->connection.tls);
ch->connection._stream.data = ch;
ch->connection.data = ch;
ZITI_LOG(DEBUG, "connecting ch[%d] to %s:%d", ch->id, ch->host, ch->port);
int rc = uv_mbed_connect(req, &ch->connection, ch->host, ch->port, on_channel_connect_internal);
if (rc != 0) {
on_channel_connect_internal(req, rc);
}
else {
uv_timer_start(&ch->timer, connect_timeout, CONNECT_TIMEOUT, 0);
}
}
}

Expand All @@ -645,8 +673,10 @@ static void reconnect_channel(ziti_channel_t *ch, bool now) {

static void on_channel_close(ziti_channel_t *ch, ssize_t code) {
if (ch->state != Closed) {
if (ch->state == Connected) {
ch->notify_cb(ch, EdgeRouterDisconnected, ch->notify_ctx);
}
ch->state = Disconnected;
ch->notify_cb(ch, EdgeRouterDisconnected, ch->notify_ctx);
}

ch->latency = UINT64_MAX;
Expand All @@ -661,22 +691,21 @@ static void on_channel_close(ziti_channel_t *ch, ssize_t code) {
free(con);
}

uv_mbed_free(&ch->connection);
uv_mbed_close(&ch->connection, NULL);
if (ch->state != Closed) {
reconnect_channel(ch, false);
ziti_force_session_refresh(ch->ctx);
}
}

static void on_write(uv_write_t *req, int status) {
ZITI_LOG(TRACE, "on_write(%p,%d)", req, status);
struct async_write_req *wr = req->data;

if (status < 0) {
ZITI_LOG(ERROR, "ch[%d] write failed [status=%d] %s", wr->ch->id, status, uv_strerror(status));
uv_mbed_t *mbed = (uv_mbed_t *) req->handle;
ziti_channel_t *ch = uv_handle_get_data((const uv_handle_t *) mbed);
ziti_channel_t *ch = wr->ch;
ZITI_LOG(ERROR, "ch[%d] write failed [status=%d] %s", ch->id, status, uv_strerror(status));
on_channel_close(ch, status);

}

if (wr != NULL) {
Expand All @@ -688,7 +717,7 @@ static void on_write(uv_write_t *req, int status) {

static void on_channel_data(uv_stream_t *s, ssize_t len, const uv_buf_t *buf) {
uv_mbed_t *mbed = (uv_mbed_t *) s;
ziti_channel_t *ch = mbed->_stream.data;
ziti_channel_t *ch = mbed->data;

if (len < 0) {
free(buf->base);
Expand Down Expand Up @@ -735,13 +764,12 @@ static void on_channel_connect_internal(uv_connect_t *req, int status) {
} else {
ZITI_LOG(ERROR, "ch[%d] failed to connect[%s] [status=%d]", ch->id, ch->name, status);

for (int i = 0; i < ch->conn_reqs_n; i++) {
struct ch_conn_req *r = ch->conn_reqs[i];
while (!LIST_EMPTY(&ch->conn_reqs)) {
struct ch_conn_req *r = LIST_FIRST(&ch->conn_reqs);
LIST_REMOVE(r, next);
r->cb(ch, r->ctx, status);
free(r);
ch->conn_reqs[i] = NULL;
}
ch->conn_reqs_n = 0;
ch->state = Disconnected;
reconnect_channel(ch, false);
}
Expand Down
4 changes: 2 additions & 2 deletions library/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ static const int MAX_CONNECT_RETRY = 3;
XX(Binding)\
XX(Bound)\
XX(Accepting)\
XX(Timedout)\
XX(CloseWrite)\
XX(Timedout)\
XX(Disconnected)\
XX(Closed)

Expand Down Expand Up @@ -599,7 +599,7 @@ static void ziti_disconnect_async(uv_async_t *ar) {
}

static int ziti_disconnect(struct ziti_conn *conn) {
if (conn->state < Disconnected) {
if (conn->state < Timedout) {
NEWP(ar, uv_async_t);
uv_async_init(conn->ziti_ctx->loop, ar, ziti_disconnect_async);
ar->data = conn;
Expand Down
15 changes: 11 additions & 4 deletions library/ziti.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,21 +289,28 @@ int ziti_set_timeout(ziti_context ztx, int timeout) {
return ZITI_OK;
}

int ziti_shutdown(ziti_context ztx) {
ZITI_LOG(INFO, "Ziti is shutting down");
static void on_logout(void *msg, ziti_error *err, void *arg) {
ziti_context ztx = arg;
ZITI_LOG(DEBUG, "ctx[%s] logout %s",
ztx->session->identity->name, err ? "failed" : "success");

free_ziti_session(ztx->session);
free(ztx->session);
ztx->session = NULL;

ziti_ctrl_close(&ztx->controller);
}

int ziti_shutdown(ziti_context ztx) {
ZITI_LOG(INFO, "Ziti is shutting down");

uv_timer_stop(&ztx->refresh_timer);
uv_timer_stop(&ztx->session_timer);
uv_timer_stop(&ztx->posture_checks->timer);

ziti_ctrl_close(&ztx->controller);
ziti_close_channels(ztx);

ziti_ctrl_logout(&ztx->controller, NULL, NULL);
ziti_ctrl_logout(&ztx->controller, on_logout, ztx);
metrics_rate_close(&ztx->up_rate);
metrics_rate_close(&ztx->down_rate);

Expand Down
14 changes: 8 additions & 6 deletions programs/ziti-prox-c/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ static void process_stop(uv_loop_t *loop, struct proxy_app_ctx *app_ctx) {
}

// shutdown diagnostics
uv_timer_t *shutdown_timer = malloc(sizeof(uv_timer_t));
uv_timer_init(loop, shutdown_timer);
uv_timer_start(shutdown_timer, shutdown_timer_cb, 5000, 0);
uv_unref((uv_handle_t *) shutdown_timer);
static uv_timer_t shutdown_timer;
uv_timer_init(loop, &shutdown_timer);
uv_timer_start(&shutdown_timer, shutdown_timer_cb, 5000, 0);
uv_unref((uv_handle_t *) &shutdown_timer);

// try to cleanup
ziti_shutdown(app_ctx->ziti);
Expand Down Expand Up @@ -148,7 +148,6 @@ static void signal_cb(uv_signal_t *s, int signum) {

case SIGUSR1:
debug_dump(s->data);
reporter_cb(&report_timer);
break;

default:
Expand All @@ -175,7 +174,8 @@ static void on_ziti_close(ziti_connection conn) {
clt->ziti_conn = NULL;
ZITI_LOG(INFO, "ziti connection closed for clt[%s]", clt->addr_s);
clt->closed = true;
uv_close((uv_handle_t *) tcp, close_cb);
if (!uv_is_closing((const uv_handle_t *) tcp))
uv_close((uv_handle_t *) tcp, close_cb);
}
}

Expand Down Expand Up @@ -524,6 +524,7 @@ void run(int argc, char **argv) {
.events = ZitiContextEvent|ZitiServiceEvent|ZitiRouterEvent,
.event_cb = on_ziti_event,
.refresh_interval = 60,
.router_keepalive = 10,
.app_ctx = &app_ctx,
.config_types = my_configs,
.metrics_type = INSTANT,
Expand Down Expand Up @@ -564,6 +565,7 @@ void run(int argc, char **argv) {
}

ZITI_LOG(INFO, "proxy event loop is done");
ziti_ctx_free(&app_ctx.ziti);
free(loop);
exit(excode);
}
Expand Down

0 comments on commit 776d4f3

Please sign in to comment.