Skip to content

Commit

Permalink
Implement naming convention
Browse files Browse the repository at this point in the history
  • Loading branch information
LasseRosenow committed Dec 4, 2024
1 parent 4d3dce6 commit 1bc27d9
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 71 deletions.
82 changes: 41 additions & 41 deletions src/platform/posix/tcp_ip_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ static bool _is_globals_initialized = false;
static Environment *_env;

// Forward declarations
static void _spawn_receive_thread(TcpIpChannel *self);
static lf_ret_t _reset_socket(TcpIpChannel *self);
static void *_receive_thread(void *untyped_self);
static void _TcpIpChannel_spawn_receive_thread(TcpIpChannel *self);
static lf_ret_t _TcpIpChannel_reset_socket(TcpIpChannel *self);
static void *_TcpIpChannel_receive_thread(void *untyped_self);

static void _update_state(TcpIpChannel *self, NetworkChannelState state) {
static void _TcpIpChannel_update_state(TcpIpChannel *self, NetworkChannelState state) {
// Update the state of the channel itself
self->state = state;

// Inform runtime about new state
_env->platform->new_async_event(_env->platform);
}

static void _socket_set_blocking(int fd, bool blocking) {
static void _TcpIpChannel_socket_set_blocking(int fd, bool blocking) {
// Set socket to blocking
int opts = fcntl(fd, F_GETFL);
if (opts < 0) {
Expand All @@ -54,7 +54,7 @@ static void _socket_set_blocking(int fd, bool blocking) {
}
}

static lf_ret_t _reset_socket(TcpIpChannel *self) {
static lf_ret_t _TcpIpChannel_reset_socket(TcpIpChannel *self) {
FD_ZERO(&self->set);
if (self->fd > 0) {
if (close(self->fd) < 0) {
Expand All @@ -73,13 +73,13 @@ static lf_ret_t _reset_socket(TcpIpChannel *self) {
return LF_ERR;
}

_socket_set_blocking(self->fd, false);
_update_state(self, NETWORK_CHANNEL_STATE_OPEN);
_TcpIpChannel_socket_set_blocking(self->fd, false);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_OPEN);

return LF_OK;
}

static void _spawn_receive_thread(TcpIpChannel *self) {
static void _TcpIpChannel_spawn_receive_thread(TcpIpChannel *self) {
int res;
LF_INFO(NET, "TCP/IP spawning callback thread");

Expand All @@ -103,7 +103,7 @@ static void _spawn_receive_thread(TcpIpChannel *self) {
throw("pthread_attr_setstack failed");
}
#endif
res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _receive_thread, self);
res = pthread_create(&self->receive_thread, &self->receive_thread_attr, _TcpIpChannel_receive_thread, self);
if (res < 0) {
throw("pthread_create failed");
}
Expand Down Expand Up @@ -131,22 +131,22 @@ static lf_ret_t TcpIpChannel_open_connection(NetworkChannel *untyped_self) {
int ret = bind(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
if (ret < 0) {
LF_ERR(NET, "Could not bind to %s:%d", self->host, self->port);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
return LF_ERR;
}

// start listening
if (listen(self->fd, 1) < 0) {
LF_ERR(NET, "Could not listen to %s:%d", self->host, self->port);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
return LF_ERR;
}
}

return LF_OK;
}

static lf_ret_t _try_connect_server(NetworkChannel *untyped_self) {
static lf_ret_t _TcpIpChannel_try_connect_server(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;

int new_socket;
Expand All @@ -157,10 +157,10 @@ static lf_ret_t _try_connect_server(NetworkChannel *untyped_self) {
if (new_socket >= 0) {
self->client = new_socket;
FD_SET(new_socket, &self->set);
_socket_set_blocking(new_socket, true);
_TcpIpChannel_socket_set_blocking(new_socket, true);
validate(self->receive_thread == 0);
_spawn_receive_thread(self);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
_TcpIpChannel_spawn_receive_thread(self);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
return LF_OK;
} else {
if (errno == EWOULDBLOCK || errno == EAGAIN) {
Expand All @@ -174,7 +174,7 @@ static lf_ret_t _try_connect_server(NetworkChannel *untyped_self) {
}
}

static lf_ret_t _check_if_socket_is_writable(int fd) {
static lf_ret_t _TcpIpChannel_check_if_socket_is_writable(int fd) {
fd_set set;
FD_ZERO(&set);
FD_SET(fd, &set);
Expand All @@ -197,7 +197,7 @@ static lf_ret_t _check_if_socket_is_writable(int fd) {
return LF_ERR;
}

static lf_ret_t _check_socket_error(int fd) {
static lf_ret_t _TcpIpChannel_check_socket_error(int fd) {
int so_error;
socklen_t len = sizeof(so_error);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
Expand All @@ -210,7 +210,7 @@ static lf_ret_t _check_socket_error(int fd) {
}
}

static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {
static lf_ret_t _TcpIpChannel_try_connect_client(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
lf_ret_t lf_ret;

Expand All @@ -227,41 +227,41 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {

int ret = connect(self->fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
if (ret == 0) {
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
return LF_OK;
} else {
if (errno == EINPROGRESS) {
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS);
LF_DEBUG(NET, "Connection in progress!");
return LF_OK;
} else {
LF_ERR(NET, "Connect failed errno=%d", errno);
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_reset_socket(self);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_TcpIpChannel_reset_socket(self);
return LF_ERR;
}
}
} else if (self->state == NETWORK_CHANNEL_STATE_CONNECTION_IN_PROGRESS) {
// Connection is in progress
lf_ret = _check_if_socket_is_writable(self->fd);
lf_ret = _TcpIpChannel_check_if_socket_is_writable(self->fd);
if (lf_ret == LF_OK) {
LF_DEBUG(NET, "Socket is writable");
lf_ret = _check_socket_error(self->fd);
lf_ret = _TcpIpChannel_check_socket_error(self->fd);
if (lf_ret == LF_OK) {
LF_DEBUG(NET, "Connection succeeded");
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
_socket_set_blocking(self->fd, true);
_spawn_receive_thread(self);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTED);
_TcpIpChannel_socket_set_blocking(self->fd, true);
_TcpIpChannel_spawn_receive_thread(self);
return LF_OK;
} else {
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
LF_ERR(NET, "Connection failed");
_reset_socket(self);
_TcpIpChannel_reset_socket(self);
return LF_ERR;
}
} else {
_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_reset_socket(self);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CONNECTION_FAILED);
_TcpIpChannel_reset_socket(self);
LF_ERR(NET, "Select failed errno=%d", errno);
return LF_ERR;
}
Expand All @@ -276,9 +276,9 @@ static lf_ret_t _try_connect_client(NetworkChannel *untyped_self) {
static lf_ret_t TcpIpChannel_try_connect(NetworkChannel *untyped_self) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
if (self->server) {
return _try_connect_server(untyped_self);
return _TcpIpChannel_try_connect_server(untyped_self);
} else {
return _try_connect_client(untyped_self);
return _TcpIpChannel_try_connect_client(untyped_self);
}
}

Expand Down Expand Up @@ -317,7 +317,7 @@ static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const F
switch (errno) {
case ETIMEDOUT:
case ENOTCONN:
_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION);
return LF_ERR;
default:
return LF_ERR;
Expand All @@ -337,7 +337,7 @@ static lf_ret_t TcpIpChannel_send_blocking(NetworkChannel *untyped_self, const F
return LF_OK;
}

static lf_ret_t _receive(NetworkChannel *untyped_self, FederateMessage *return_message) {
static lf_ret_t _TcpIpChannel_receive(NetworkChannel *untyped_self, FederateMessage *return_message) {
TcpIpChannel *self = (TcpIpChannel *)untyped_self;
int socket;

Expand Down Expand Up @@ -375,7 +375,7 @@ static lf_ret_t _receive(NetworkChannel *untyped_self, FederateMessage *return_m
case ECONNRESET:
case ENOTCONN:
case ECONNABORTED:
_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_LOST_CONNECTION);
return LF_ERR;
break;
}
Expand All @@ -384,7 +384,7 @@ static lf_ret_t _receive(NetworkChannel *untyped_self, FederateMessage *return_m
// This means the connection was closed.
LF_INFO(NET, "Connection closed");
self->terminate = true;
_update_state(self, NETWORK_CHANNEL_STATE_CLOSED);
_TcpIpChannel_update_state(self, NETWORK_CHANNEL_STATE_CLOSED);
return LF_ERR;
}

Expand Down Expand Up @@ -419,7 +419,7 @@ static void TcpIpChannel_close_connection(NetworkChannel *untyped_self) {
}
}

static void *_receive_thread(void *untyped_self) {
static void *_TcpIpChannel_receive_thread(void *untyped_self) {
LF_INFO(NET, "Starting TCP/IP receive thread");
TcpIpChannel *self = untyped_self;
lf_ret_t ret;
Expand All @@ -428,7 +428,7 @@ static void *_receive_thread(void *untyped_self) {
self->terminate = false;

while (!self->terminate) {
ret = _receive(untyped_self, &self->output);
ret = _TcpIpChannel_receive(untyped_self, &self->output);
if (ret == LF_OK) {
validate(self->receive_callback);
self->receive_callback(self->federated_connection, &self->output);
Expand Down Expand Up @@ -519,5 +519,5 @@ void TcpIpChannel_ctor(TcpIpChannel *self, Environment *env, const char *host, u
self->federated_connection = NULL;
self->receive_thread = 0;

_reset_socket(self);
_TcpIpChannel_reset_socket(self);
}
Loading

0 comments on commit 1bc27d9

Please sign in to comment.