Skip to content

Commit

Permalink
Always run channel setup callback on pinned event loop if one exists (#…
Browse files Browse the repository at this point in the history
…618)

Co-authored-by: Bret Ambrose <[email protected]>
  • Loading branch information
bretambrose and Bret Ambrose authored Jan 10, 2024
1 parent 64c06af commit 6225ebb
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 1 deletion.
75 changes: 74 additions & 1 deletion source/channel_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,14 @@ static struct aws_event_loop *s_get_connection_event_loop(struct client_connecti
return aws_event_loop_group_get_next_loop(args->bootstrap->event_loop_group);
}

static void s_connection_args_setup_callback(
static void s_connect_args_setup_callback_safe(
struct client_connection_args *args,
int error_code,
struct aws_channel *channel) {

AWS_FATAL_ASSERT(
(args->requested_event_loop == NULL) || aws_event_loop_thread_is_callers_thread(args->requested_event_loop));

/* setup_callback is always called exactly once */
AWS_FATAL_ASSERT(!args->setup_called);

Expand All @@ -200,6 +204,75 @@ static void s_connection_args_setup_callback(
s_client_connection_args_release(args);
}

struct aws_connection_args_setup_callback_task {
struct aws_allocator *allocator;
struct aws_task task;
struct client_connection_args *args;
int error_code;
struct aws_channel *channel;
};

static void s_aws_connection_args_setup_callback_task_delete(struct aws_connection_args_setup_callback_task *task) {
if (task == NULL) {
return;
}

s_client_connection_args_release(task->args);
if (task->channel) {
aws_channel_release_hold(task->channel);
}

aws_mem_release(task->allocator, task);
}

void s_aws_connection_args_setup_callback_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
(void)task;

struct aws_connection_args_setup_callback_task *callback_task = arg;

if (status == AWS_TASK_STATUS_RUN_READY) {
s_connect_args_setup_callback_safe(callback_task->args, callback_task->error_code, callback_task->channel);
}

s_aws_connection_args_setup_callback_task_delete(callback_task);
}

static struct aws_connection_args_setup_callback_task *s_aws_connection_args_setup_callback_task_new(
struct aws_allocator *allocator,
struct client_connection_args *args,
int error_code,
struct aws_channel *channel) {

struct aws_connection_args_setup_callback_task *task =
aws_mem_calloc(allocator, 1, sizeof(struct aws_connection_args_setup_callback_task));
task->allocator = allocator;
task->args = s_client_connection_args_acquire(args);
task->error_code = error_code;
task->channel = channel;
if (channel != NULL) {
aws_channel_acquire_hold(channel);
}

aws_task_init(
&task->task, s_aws_connection_args_setup_callback_task_fn, task, "safe connection args setup callback");

return task;
}

static void s_connection_args_setup_callback(
struct client_connection_args *args,
int error_code,
struct aws_channel *channel) {

if (args->requested_event_loop == NULL || aws_event_loop_thread_is_callers_thread(args->requested_event_loop)) {
s_connect_args_setup_callback_safe(args, error_code, channel);
} else {
struct aws_connection_args_setup_callback_task *callback_task =
s_aws_connection_args_setup_callback_task_new(args->bootstrap->allocator, args, error_code, channel);
aws_event_loop_schedule_task_now(args->requested_event_loop, &callback_task->task);
}
}

static void s_connection_args_creation_callback(struct client_connection_args *args, struct aws_channel *channel) {

AWS_FATAL_ASSERT(channel != NULL);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ add_test_case(pem_sanitize_wrong_format_rejected)
add_test_case(socket_handler_echo_and_backpressure)
add_test_case(socket_handler_close)
add_test_case(socket_pinned_event_loop)
add_net_test_case(socket_pinned_event_loop_dns_failure)

if(NOT BYO_CRYPTO)
if(USE_S2N)
Expand Down
105 changes: 105 additions & 0 deletions tests/socket_handler_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ struct socket_common_tester {
struct aws_event_loop_group *el_group;
struct aws_atomic_var current_time_ns;
struct aws_atomic_var stats_handler;

bool setup_called;
struct aws_event_loop *requested_callback_event_loop;
int setup_error_code;
};

static struct socket_common_tester c_tester;
Expand Down Expand Up @@ -399,6 +403,107 @@ static int s_socket_pinned_event_loop_test(struct aws_allocator *allocator, void

AWS_TEST_CASE(socket_pinned_event_loop, s_socket_pinned_event_loop_test)

static void s_dns_failure_test_client_setup_callback(
struct aws_client_bootstrap *bootstrap,
int error_code,
struct aws_channel *channel,
void *user_data) {

(void)bootstrap;
(void)channel;

struct socket_common_tester *socket_tester = (struct socket_common_tester *)user_data;

aws_mutex_lock(&socket_tester->mutex);

socket_tester->setup_error_code = error_code;
socket_tester->setup_called = true;
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(socket_tester->requested_callback_event_loop));
AWS_FATAL_ASSERT(channel == NULL);

aws_mutex_unlock(&socket_tester->mutex);
aws_condition_variable_notify_one(&socket_tester->condition_variable);
}

static void s_dns_failure_handler_test_client_shutdown_callback(
struct aws_client_bootstrap *bootstrap,
int error_code,
struct aws_channel *channel,
void *user_data) {

(void)error_code;
(void)bootstrap;
(void)channel;
(void)user_data;

// Should never be called
AWS_FATAL_ASSERT(false);
}

static bool s_dns_failure_channel_setup_predicate(void *user_data) {
struct socket_common_tester *socket_tester = (struct socket_common_tester *)user_data;
return socket_tester->setup_called;
}

static int s_socket_pinned_event_loop_dns_failure_test(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

s_socket_common_tester_init(allocator, &c_tester);

struct aws_host_resolver_default_options resolver_options = {
.el_group = c_tester.el_group,
.max_entries = 8,
};
struct aws_host_resolver *resolver = aws_host_resolver_new_default(allocator, &resolver_options);

struct aws_client_bootstrap_options client_bootstrap_options = {
.event_loop_group = c_tester.el_group,
.host_resolver = resolver,
};
struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options);
ASSERT_NOT_NULL(client_bootstrap);

struct aws_event_loop *pinned_event_loop = aws_event_loop_group_get_next_loop(c_tester.el_group);
c_tester.requested_callback_event_loop = pinned_event_loop;

struct aws_socket_options socket_options = {
.domain = AWS_SOCKET_IPV4,
.type = AWS_SOCKET_STREAM,
.connect_timeout_ms = 10000,
};

struct aws_socket_channel_bootstrap_options client_channel_options;
AWS_ZERO_STRUCT(client_channel_options);
client_channel_options.bootstrap = client_bootstrap;
client_channel_options.host_name = "notavalid.domain-seriously.uffda";
client_channel_options.port = 443;
client_channel_options.socket_options = &socket_options;
client_channel_options.setup_callback = s_dns_failure_test_client_setup_callback;
client_channel_options.shutdown_callback = s_dns_failure_handler_test_client_shutdown_callback;
client_channel_options.enable_read_back_pressure = false;
client_channel_options.requested_event_loop = pinned_event_loop;
client_channel_options.user_data = &c_tester;

ASSERT_SUCCESS(aws_client_bootstrap_new_socket_channel(&client_channel_options));

ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex));
ASSERT_SUCCESS(aws_condition_variable_wait_pred(
&c_tester.condition_variable, &c_tester.mutex, s_dns_failure_channel_setup_predicate, &c_tester));

/* Verify the setup callback failure was on the requested event loop */
ASSERT_TRUE(c_tester.setup_error_code != 0);

aws_mutex_unlock(&c_tester.mutex);

aws_client_bootstrap_release(client_bootstrap);
aws_host_resolver_release(resolver);
ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester));

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(socket_pinned_event_loop_dns_failure, s_socket_pinned_event_loop_dns_failure_test)

static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

Expand Down

0 comments on commit 6225ebb

Please sign in to comment.