Skip to content

Commit

Permalink
Migrate TcpIpChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
LasseRosenow committed Dec 3, 2024
1 parent c6eba25 commit 878a945
Showing 1 changed file with 78 additions and 74 deletions.
152 changes: 78 additions & 74 deletions src/platform/posix/tcp_ip_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ static bool _is_globals_initialized = false;
static Environment *_env;

// Forward declarations
static void _spawn_receive_thread(TcpIpChannel *self);
static void _spawn_worker_thread(TcpIpChannel *self);
static lf_ret_t _reset_socket(TcpIpChannel *self);
static void *_receive_thread(void *untyped_self);
static void *_worker_thread(void *untyped_self);

static void _update_state(TcpIpChannel *self, NetworkChannelState new_state) {
// Update the state of the channel itself
Expand All @@ -39,23 +39,6 @@ static void _update_state(TcpIpChannel *self, NetworkChannelState new_state) {

static NetworkChannelState _get_state(TcpIpChannel *self) { return self->state; }

static void _socket_set_blocking(int fd, bool blocking) {
// Set socket to blocking
int opts = fcntl(fd, F_GETFL);
if (opts < 0) {
throw("Could not get socket options");
}
if (blocking) {
opts = (opts & (~O_NONBLOCK));
} else {
opts = (opts | O_NONBLOCK);
}

if (fcntl(fd, F_SETFL, opts) < 0) {
throw("Could not set socket blocking state");
}
}

static lf_ret_t _reset_socket(TcpIpChannel *self) {
FD_ZERO(&self->set);
if (self->fd > 0) {
Expand All @@ -75,13 +58,12 @@ static lf_ret_t _reset_socket(TcpIpChannel *self) {
return LF_ERR;
}

_socket_set_blocking(self->fd, false);
_update_state(self, NETWORK_CHANNEL_STATE_OPEN);

return LF_OK;
}

static void _spawn_receive_thread(TcpIpChannel *self) {
static void _spawn_worker_thread(TcpIpChannel *self) {
int res;
LF_INFO(NET, "TCP/IP spawning callback thread");

Expand All @@ -105,44 +87,36 @@ static void _spawn_receive_thread(TcpIpChannel *self) {
throw("pthread_attr_setstack failed");
}
#endif
res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _receive_thread, self);
res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _worker_thread, self);
if (res < 0) {
throw("pthread_create failed");
}
}

/**
* @brief If is server: Bind and Listen for connections
* If is client: Do nothing
*/
static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
static lf_ret_t _server_bind(TcpIpChannel *self) {
struct sockaddr_in serv_addr;
serv_addr.sin_family = self->protocol_family;
serv_addr.sin_port = htons(self->port);

if (self->server) {
struct sockaddr_in serv_addr;
serv_addr.sin_family = self->protocol_family;
serv_addr.sin_port = htons(self->port);

// turn human-readable address into something the os can work with
if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) {
LF_ERR(NET, "Invalid address %s", self->host);
return LF_INVALID_VALUE;
}
// turn human-readable address into something the os can work with
if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) {
LF_ERR(NET, "Invalid address %s", self->host);
return LF_INVALID_VALUE;
}

// bind the socket to that address
int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
if (ret < 0) {
LF_ERR(NET, "Could not bind to %s:%d", self->host, self->port);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
return LF_ERR;
}
// bind the socket to that address
int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
if (ret < 0) {
LF_ERR(NET, "Could not bind to %s:%d", self->host, self->port);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
return LF_ERR;
}

// start listening
if (listen(self->fd, 1) < 0) {
LF_ERR(NET, "Could not listen to %s:%d", self->host, self->port);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
return LF_ERR;
}
// start listening
if (listen(self->fd, 1) < 0) {
LF_ERR(NET, "Could not listen to %s:%d", self->host, self->port);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
return LF_ERR;
}

return LF_OK;
Expand All @@ -159,9 +133,8 @@ static lf_ret_t _try_connect_server(NetworkChannel *untyped_self) {
if (new_socket >= 0) {
self->client = new_socket;
FD_SET(new_socket, &self->set);
_socket_set_blocking(new_socket, true);
validate(self->receive_thread == 0);
_spawn_receive_thread(self);
_spawn_worker_thread(self);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
return LF_OK;
} else {
Expand Down Expand Up @@ -216,7 +189,7 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
lf_ret_t lf_ret;

if (self->state == NETWORK_CHANNEL_STATE_OPEN) {
if (_get_state(self) == NETWORK_CHANNEL_STATE_OPEN) {
// First time trying to connect
struct sockaddr_in serv_addr;

Expand All @@ -243,7 +216,7 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {
return LF_ERR;
}
}
} else if (self->state == NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS) {
} else if (_get_state(self) == NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS) {
// Connection is in progress
lf_ret = _check_if_socket_is_writable(self->fd);
if (lf_ret == LF_OK) {
Expand All @@ -252,8 +225,7 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {
if (lf_ret == LF_OK) {
LF_DEBUG(NET, "Connection succeeded");
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
_socket_set_blocking(self->fd, true);
_spawn_receive_thread(self);
_spawn_worker_thread(self);
return LF_OK;
} else {
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
Expand All @@ -268,20 +240,19 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {
return LF_ERR;
}
} else {
LF_ERR(NET, "try_connect_client called in invalid state %d", self->state);
LF_ERR(NET, "try_connect_client called in invalid state %d", _get_state(self));
return LF_ERR;
}

return LF_ERR; // Should never reach here
}

static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) {
static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
if (self->server) {
return _try_connect_server(untyped_self);
} else {
return _try_connect_client(untyped_self);
}

_update_state(self, NETWORK_CHANNEL_STATE_OPEN);

return LF_OK;
}

static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) {
Expand Down Expand Up @@ -421,21 +392,55 @@ static void TcpIpChannel_close_connection(NetworkChannel *untyped_self) {
}
}

static void *_receive_thread(void *untyped_self) {
/**
* @brief Main loop of the TcpIpChannel.
*/
static void *_worker_thread(void *untyped_self) {
LF_INFO(NET, "Starting TCP/IP receive thread");
TcpIpChannel *self = untyped_self;
lf_ret_t ret;

// set terminate to false so the loop runs
self->terminate = false;

if (self->server) {
_server_bind(self);
}

while (!self->terminate) {
ret = _receive(untyped_self, &self->output);
if (ret == LF_OK) {
validate(self->receive_callback);
self->receive_callback(self->federated_connection, &self->output);
} else {
LF_ERR(NET, "Error receiving message %d", ret);
switch (_get_state(self)) {
case NETWORK_CHANNEL_STATE_OPEN: {
/* try to connect */
if (self->server) {
_try_connect_server(untyped_self);
} else {
_try_connect_client(untyped_self);
}
} break;

case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS: {
_env->platform->wait_for(_env->platform, self->super.expected_try_connect_duration);
} break;

case NETWORK_CHANNEL_STATE_LOST_CONNECTION:
case NETWORK_CHANNEL_STATE_CONNECTION_FAILED: {
_env->platform->wait_for(_env->platform, self->super.expected_try_connect_duration);
_reset_socket(self);
} break;

case NETWORK_CHANNEL_STATE_CONNECTED: {
ret = _receive(untyped_self, &self->output);
if (ret == LF_OK) {
validate(self->receive_callback);
self->receive_callback(self->federated_connection, &self->output);
} else {
LF_ERR(NET, "Error receiving message %d", ret);
}
} break;

case NETWORK_CHANNEL_STATE_UNINITIALIZED:
case NETWORK_CHANNEL_STATE_CLOSED:
break;
}
}

Expand Down Expand Up @@ -481,7 +486,7 @@ static void TcpIpChannel_free(NetworkChannel *untyped_self) {

static NetworkChannelState TcpIpChannel_get_connection_state(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
return self->state;
return _get_state(self);
}

void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, unsigned short port, int protocol_family,
Expand Down Expand Up @@ -510,7 +515,6 @@ void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, u

self->super.get_connection_state = TcpIpChannel_get_connection_state;
self->super.open_connection = TcpIpChannel_open_connection;
self->super.try_connect = TcpIpChannel_try_connect;
self->super.close_connection = TcpIpChannel_close_connection;
self->super.send_blocking = TcpIpChannel_send_blocking;
self->super.register_receive_callback = TcpIpChannel_register_receive_callback;
Expand All @@ -522,4 +526,4 @@ void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, u
self->receive_thread = 0;

_reset_socket(self);
}
}

0 comments on commit 878a945

Please sign in to comment.