Skip to content

Commit

Permalink
logproto: remove handshake_in_progress method
Browse files Browse the repository at this point in the history
Instead of having to call an extra virtual method for each I/O event,
bundle this information into the handshake call itself and track whether
handshake is in progress in the caller.

Signed-off-by: Balazs Scheidler <[email protected]>
  • Loading branch information
bazsi committed Feb 4, 2024
1 parent 885f23a commit 10882c5
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 64 deletions.
17 changes: 2 additions & 15 deletions lib/logproto/logproto-auto-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,13 @@ log_proto_auto_server_fetch(LogProtoServer *s, const guchar **msg, gsize *msg_le
g_assert_not_reached();
}

static gboolean
log_proto_auto_handshake_in_progress(LogProtoServer *s)
{
LogProtoAutoServer *self = (LogProtoAutoServer *) s;

if (self->proto_impl)
return log_proto_server_handshake_in_progress(self->proto_impl);

/* as long as the auto detection is not yet finished we are in handshake mode */
return TRUE;
}

static LogProtoStatus
log_proto_auto_handshake(LogProtoServer *s)
log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished)
{
LogProtoAutoServer *self = (LogProtoAutoServer *) s;
/* allow the impl to do its handshake */
if (self->proto_impl)
return log_proto_server_handshake(self->proto_impl);
return log_proto_server_handshake(self->proto_impl, handshake_finished);

gchar detect_buffer[1];
gint rc;
Expand Down Expand Up @@ -142,7 +130,6 @@ log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *
LogProtoAutoServer *self = g_new0(LogProtoAutoServer, 1);

log_proto_server_init(&self->super, transport, options);
self->super.handshake_in_progess = log_proto_auto_handshake_in_progress;
self->super.handshake = log_proto_auto_handshake;
self->super.prepare = log_proto_auto_server_prepare;
self->super.fetch = log_proto_auto_server_fetch;
Expand Down
12 changes: 2 additions & 10 deletions lib/logproto/logproto-proxied-text-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,13 +524,13 @@ log_proto_proxied_text_server_prepare(LogProtoServer *s, GIOCondition *cond, gin
}

static LogProtoStatus
log_proto_proxied_text_server_handshake(LogProtoServer *s)
log_proto_proxied_text_server_handshake(LogProtoServer *s, gboolean *handshake_finished)
{
LogProtoProxiedTextServer *self = (LogProtoProxiedTextServer *) s;

LogProtoStatus status = _fetch_into_proxy_buffer(self);

self->handshake_done = (status == LPS_SUCCESS);
self->handshake_done = *handshake_finished = (status == LPS_SUCCESS);
if (status != LPS_SUCCESS)
return status;

Expand Down Expand Up @@ -558,13 +558,6 @@ log_proto_proxied_text_server_handshake(LogProtoServer *s)
}
}

static gboolean
log_proto_proxied_text_server_handshake_in_progress(LogProtoServer *s)
{
LogProtoProxiedTextServer *self = (LogProtoProxiedTextServer *) s;
return !self->handshake_done;
}

static void
_augment_aux_data(LogProtoProxiedTextServer *self, LogTransportAuxData *aux)
{
Expand Down Expand Up @@ -626,7 +619,6 @@ log_proto_proxied_text_server_init(LogProtoProxiedTextServer *self, LogTransport
log_proto_text_server_init(&self->super, transport, options);

self->super.super.super.prepare = log_proto_proxied_text_server_prepare;
self->super.super.super.handshake_in_progess = log_proto_proxied_text_server_handshake_in_progress;
self->super.super.super.handshake = log_proto_proxied_text_server_handshake;
self->super.super.super.fetch = log_proto_proxied_text_server_fetch;
self->super.super.super.free_fn = log_proto_proxied_text_server_free;
Expand Down
18 changes: 4 additions & 14 deletions lib/logproto/logproto-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ struct _LogProtoServer
LogProtoStatus (*fetch)(LogProtoServer *s, const guchar **msg, gsize *msg_len, gboolean *may_read,
LogTransportAuxData *aux, Bookmark *bookmark);
gboolean (*validate_options)(LogProtoServer *s);
gboolean (*handshake_in_progess)(LogProtoServer *s);
LogProtoStatus (*handshake)(LogProtoServer *s);
LogProtoStatus (*handshake)(LogProtoServer *s, gboolean *handshake_finished);
void (*free_fn)(LogProtoServer *s);
};

Expand All @@ -99,23 +98,14 @@ log_proto_server_validate_options(LogProtoServer *self)
return self->validate_options(self);
}

static inline gboolean
log_proto_server_handshake_in_progress(LogProtoServer *s)
{
if (s->handshake_in_progess)
{
return s->handshake_in_progess(s);
}
return FALSE;
}

static inline LogProtoStatus
log_proto_server_handshake(LogProtoServer *s)
log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished)
{
if (s->handshake)
{
return s->handshake(s);
return s->handshake(s, handshake_finished);
}
*handshake_finished = TRUE;
return LPS_SUCCESS;
}

Expand Down
45 changes: 23 additions & 22 deletions lib/logproto/tests/test-proxy-proto.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ ParameterizedTest(ProtocolHeaderTestParams *params, log_proto, test_proxy_protoc
-1, LTM_EOF),
get_inited_proto_server_options());

gboolean valid = log_proto_server_handshake(proto) == LPS_SUCCESS;
gboolean handshake_finished = FALSE;
gboolean valid = log_proto_server_handshake(proto, &handshake_finished) == LPS_SUCCESS;

if (valid)
cr_assert(handshake_finished == TRUE);

cr_assert_eq(valid, params->valid,
"This should be %s: %s", params->valid ? "valid" : "invalid", params->proxy_header);

Expand All @@ -111,7 +116,9 @@ Test(log_proto, test_proxy_protocol_handshake_and_fetch_success)
LTM_EOF);
LogProtoServer *proto = log_proto_proxied_text_server_new(transport, get_inited_proto_server_options());

cr_assert_eq(log_proto_server_handshake(proto), LPS_SUCCESS);
gboolean handshake_finished = FALSE;
cr_assert_eq(log_proto_server_handshake(proto, &handshake_finished), LPS_SUCCESS);
cr_assert(handshake_finished == TRUE);
assert_proto_server_fetch(proto, "test message", -1);

log_proto_server_free(proto);
Expand All @@ -124,7 +131,9 @@ Test(log_proto, test_proxy_protocol_handshake_failed)
LTM_EOF);
LogProtoServer *proto = log_proto_proxied_text_server_new(transport, get_inited_proto_server_options());

cr_assert_eq(log_proto_server_handshake(proto), LPS_ERROR);
gboolean handshake_finished = FALSE;
cr_assert_eq(log_proto_server_handshake(proto, &handshake_finished), LPS_ERROR);
cr_assert(handshake_finished == FALSE);

log_proto_server_free(proto);
}
Expand Down Expand Up @@ -166,7 +175,9 @@ Test(log_proto, test_proxy_protocol_aux_data)
LTM_EOF);
LogProtoServer *proto = log_proto_proxied_text_server_new(transport, get_inited_proto_server_options());

cr_assert_eq(log_proto_server_handshake(proto), LPS_SUCCESS);
gboolean handshake_finished = FALSE;
cr_assert_eq(log_proto_server_handshake(proto, &handshake_finished), LPS_SUCCESS);
cr_assert(handshake_finished == TRUE);

LogTransportAuxData aux;
log_transport_aux_data_init(&aux);
Expand All @@ -193,7 +204,9 @@ Test(log_proto, test_proxy_protocol_v2_parse_header)
),
get_inited_proto_server_options());

cr_assert_eq(log_proto_server_handshake(proto), LPS_SUCCESS, "Proxy protocol v2 parsing failed");
gboolean handshake_finished = FALSE;
cr_assert_eq(log_proto_server_handshake(proto, &handshake_finished), LPS_SUCCESS, "Proxy protocol v2 parsing failed");
cr_assert(handshake_finished == TRUE);

LogTransportAuxData aux;
log_transport_aux_data_init(&aux);
Expand All @@ -208,19 +221,6 @@ Test(log_proto, test_proxy_protocol_v2_parse_header)
log_proto_server_free(proto);
}

static void
assert_handshake_is_taking_place(LogProtoServer *proto)
{
gint timeout;
GIOCondition cond;


cr_assert_eq(log_proto_server_prepare(proto, &cond, &timeout), LPPA_POLL_IO);
cr_assert_eq(cond, G_IO_IN);

cr_assert(log_proto_server_handshake_in_progress(proto));
}

Test(log_proto, test_proxy_protocol_header_partial_read)
{
LogTransportMock *transport = (LogTransportMock *) log_transport_mock_records_new(LTM_EOF);
Expand All @@ -238,14 +238,15 @@ Test(log_proto, test_proxy_protocol_header_partial_read)
LogProtoServer *proto = log_proto_proxied_text_server_new((LogTransport *) transport,
get_inited_proto_server_options());

gboolean handshake_finished = FALSE;
for(size_t i = 0; i < length; i++)
{
assert_handshake_is_taking_place(proto);
cr_assert_eq(log_proto_server_handshake(proto), LPS_AGAIN);
cr_assert_eq(log_proto_server_handshake(proto, &handshake_finished), LPS_AGAIN);
cr_assert(handshake_finished == FALSE);
}

cr_assert_eq(log_proto_server_handshake(proto), LPS_SUCCESS);
cr_assert_not(log_proto_server_handshake_in_progress(proto));
cr_assert_eq(log_proto_server_handshake(proto, &handshake_finished), LPS_SUCCESS);
cr_assert(handshake_finished == TRUE);



Expand Down
8 changes: 6 additions & 2 deletions lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,17 @@ _add_aux_nvpair(const gchar *name, const gchar *value, gsize value_len, gpointer
static inline gint
log_reader_process_handshake(LogReader *self)
{
LogProtoStatus status = log_proto_server_handshake(self->proto);
gboolean handshake_finished;
LogProtoStatus status = log_proto_server_handshake(self->proto, &handshake_finished);

switch (status)
{
case LPS_EOF:
case LPS_ERROR:
return status == LPS_ERROR ? NC_READ_ERROR : NC_CLOSE;
case LPS_SUCCESS:
if (handshake_finished)
self->handshake_in_progress = FALSE;
break;
case LPS_AGAIN:
break;
Expand Down Expand Up @@ -494,7 +497,7 @@ log_reader_fetch_log(LogReader *self)
aux = NULL;

log_transport_aux_data_init(aux);
if (log_proto_server_handshake_in_progress(self->proto))
if (self->handshake_in_progress)
{
return log_reader_process_handshake(self);
}
Expand Down Expand Up @@ -768,6 +771,7 @@ log_reader_new(GlobalConfig *cfg)
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->immediate_check = FALSE;
self->handshake_in_progress = TRUE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
g_cond_init(&self->pending_close_cond);
Expand Down
2 changes: 1 addition & 1 deletion lib/logreader.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct _LogReader
{
LogSource super;
LogProtoServer *proto;
gboolean immediate_check;
gboolean immediate_check, handshake_in_progress;
LogPipe *control;
LogReaderOptions *options;
PollEvents *poll_events;
Expand Down

0 comments on commit 10882c5

Please sign in to comment.