diff --git a/src/platform/posix/tcp_ip_channel.c b/src/platform/posix/tcp_ip_channel.c index c1046472..d85cf415 100644 --- a/src/platform/posix/tcp_ip_channel.c +++ b/src/platform/posix/tcp_ip_channel.c @@ -25,9 +25,9 @@ static bool _is_globals_initialized = false; static Environment *_env; // Forward declarations -static void _spawn_receive_thread(TcpIpChannel *self); +static void _spawn_worker_thread(TcpIpChannel *self); static lf_ret_t _reset_socket(TcpIpChannel *self); -static void *_receive_thread(void *untyped_self); +static void *_worker_thread(void *untyped_self); static void _update_state(TcpIpChannel *self, NetworkChannelState new_state) { // Update the state of the channel itself @@ -39,23 +39,6 @@ static void _update_state(TcpIpChannel *self, NetworkChannelState new_state) { static NetworkChannelState _get_state(TcpIpChannel *self) { return self->state; } -static void _socket_set_blocking(int fd, bool blocking) { - // Set socket to blocking - int opts = fcntl(fd, F_GETFL); - if (opts < 0) { - throw("Could not get socket options"); - } - if (blocking) { - opts = (opts & (~O_NONBLOCK)); - } else { - opts = (opts | O_NONBLOCK); - } - - if (fcntl(fd, F_SETFL, opts) < 0) { - throw("Could not set socket blocking state"); - } -} - static lf_ret_t _reset_socket(TcpIpChannel *self) { FD_ZERO(&self->set); if (self->fd > 0) { @@ -75,13 +58,12 @@ static lf_ret_t _reset_socket(TcpIpChannel *self) { return LF_ERR; } - _socket_set_blocking(self->fd, false); _update_state(self, NETWORK_CHANNEL_STATE_OPEN); return LF_OK; } -static void _spawn_receive_thread(TcpIpChannel *self) { +static void _spawn_worker_thread(TcpIpChannel *self) { int res; LF_INFO(NET, "TCP/IP spawning callback thread"); @@ -105,44 +87,36 @@ static void _spawn_receive_thread(TcpIpChannel *self) { throw("pthread_attr_setstack failed"); } #endif - res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _receive_thread, self); + res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _worker_thread, self); if (res < 0) { throw("pthread_create failed"); } } -/** - * @brief If is server: Bind and Listen for connections - * If is client: Do nothing - */ -static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) { - TcpIpChannel *self = (TcpIpChannel *)untyped_self; +static lf_ret_t _server_bind(TcpIpChannel *self) { + struct sockaddr_in serv_addr; + serv_addr.sin_family = self->protocol_family; + serv_addr.sin_port = htons(self->port); - if (self->server) { - struct sockaddr_in serv_addr; - serv_addr.sin_family = self->protocol_family; - serv_addr.sin_port = htons(self->port); - - // turn human-readable address into something the os can work with - if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) { - LF_ERR(NET, "Invalid address %s", self->host); - return LF_INVALID_VALUE; - } + // turn human-readable address into something the os can work with + if (inet_pton(self->protocol_family, self->host, &serv_addr.sin_addr) <= 0) { + LF_ERR(NET, "Invalid address %s", self->host); + return LF_INVALID_VALUE; + } - // bind the socket to that address - int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); - if (ret < 0) { - LF_ERR(NET, "Could not bind to %s:%d", self->host, self->port); - _update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); - return LF_ERR; - } + // bind the socket to that address + int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)); + if (ret < 0) { + LF_ERR(NET, "Could not bind to %s:%d", self->host, self->port); + _update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); + return LF_ERR; + } - // start listening - if (listen(self->fd, 1) < 0) { - LF_ERR(NET, "Could not listen to %s:%d", self->host, self->port); - _update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); - return LF_ERR; - } + // start listening + if (listen(self->fd, 1) < 0) { + LF_ERR(NET, "Could not listen to %s:%d", self->host, self->port); + _update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); + return LF_ERR; } return LF_OK; @@ -159,9 +133,8 @@ static lf_ret_t _try_connect_server(NetworkChannel *untyped_self) { if (new_socket >= 0) { self->client = new_socket; FD_SET(new_socket, &self->set); - _socket_set_blocking(new_socket, true); validate(self->receive_thread == 0); - _spawn_receive_thread(self); + _spawn_worker_thread(self); _update_state(self, NETWORK_CHANNEL_STATE_CONNECTED); return LF_OK; } else { @@ -216,7 +189,7 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; lf_ret_t lf_ret; - if (self->state == NETWORK_CHANNEL_STATE_OPEN) { + if (_get_state(self) == NETWORK_CHANNEL_STATE_OPEN) { // First time trying to connect struct sockaddr_in serv_addr; @@ -243,7 +216,7 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) { return LF_ERR; } } - } else if (self->state == NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS) { + } else if (_get_state(self) == NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS) { // Connection is in progress lf_ret = _check_if_socket_is_writable(self->fd); if (lf_ret == LF_OK) { @@ -252,8 +225,7 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) { if (lf_ret == LF_OK) { LF_DEBUG(NET, "Connection succeeded"); _update_state(self, NETWORK_CHANNEL_STATE_CONNECTED); - _socket_set_blocking(self->fd, true); - _spawn_receive_thread(self); + _spawn_worker_thread(self); return LF_OK; } else { _update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED); @@ -268,20 +240,19 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) { return LF_ERR; } } else { - LF_ERR(NET, "try_connect_client called in invalid state %d", self->state); + LF_ERR(NET, "try_connect_client called in invalid state %d", _get_state(self)); return LF_ERR; } return LF_ERR; // Should never reach here } -static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) { +static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; - if (self->server) { - return _try_connect_server(untyped_self); - } else { - return _try_connect_client(untyped_self); - } + + _update_state(self, NETWORK_CHANNEL_STATE_OPEN); + + return LF_OK; } static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const FederateMessage *message) { @@ -421,7 +392,10 @@ static void TcpIpChannel_close_connection(NetworkChannel *untyped_self) { } } -static void *_receive_thread(void *untyped_self) { +/** + * @brief Main loop of the TcpIpChannel. + */ +static void *_worker_thread(void *untyped_self) { LF_INFO(NET, "Starting TCP/IP receive thread"); TcpIpChannel *self = untyped_self; lf_ret_t ret; @@ -429,13 +403,44 @@ static void *_receive_thread(void *untyped_self) { // set terminate to false so the loop runs self->terminate = false; + if (self->server) { + _server_bind(self); + } + while (!self->terminate) { - ret = _receive(untyped_self, &self->output); - if (ret == LF_OK) { - validate(self->receive_callback); - self->receive_callback(self->federated_connection, &self->output); - } else { - LF_ERR(NET, "Error receiving message %d", ret); + switch (_get_state(self)) { + case NETWORK_CHANNEL_STATE_OPEN: { + /* try to connect */ + if (self->server) { + _try_connect_server(untyped_self); + } else { + _try_connect_client(untyped_self); + } + } break; + + case NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS: { + _env->platform->wait_for(_env->platform, self->super.expected_try_connect_duration); + } break; + + case NETWORK_CHANNEL_STATE_LOST_CONNECTION: + case NETWORK_CHANNEL_STATE_CONNECTION_FAILED: { + _env->platform->wait_for(_env->platform, self->super.expected_try_connect_duration); + _reset_socket(self); + } break; + + case NETWORK_CHANNEL_STATE_CONNECTED: { + ret = _receive(untyped_self, &self->output); + if (ret == LF_OK) { + validate(self->receive_callback); + self->receive_callback(self->federated_connection, &self->output); + } else { + LF_ERR(NET, "Error receiving message %d", ret); + } + } break; + + case NETWORK_CHANNEL_STATE_UNINITIALIZED: + case NETWORK_CHANNEL_STATE_CLOSED: + break; } } @@ -481,7 +486,7 @@ static void TcpIpChannel_free(NetworkChannel *untyped_self) { static NetworkChannelState TcpIpChannel_get_connection_state(NetworkChannel *untyped_self) { TcpIpChannel *self = (TcpIpChannel *)untyped_self; - return self->state; + return _get_state(self); } void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, unsigned short port, int protocol_family, @@ -510,7 +515,6 @@ void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, u self->super.get_connection_state = TcpIpChannel_get_connection_state; self->super.open_connection = TcpIpChannel_open_connection; - self->super.try_connect = TcpIpChannel_try_connect; self->super.close_connection = TcpIpChannel_close_connection; self->super.send_blocking = TcpIpChannel_send_blocking; self->super.register_receive_callback = TcpIpChannel_register_receive_callback; @@ -522,4 +526,4 @@ void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, u self->receive_thread = 0; _reset_socket(self); -} \ No newline at end of file +}