Skip to content

Commit

Permalink
Add a nng_stream example paired with a socket server on win/linux
Browse files Browse the repository at this point in the history
  • Loading branch information
hugolm84 authored and gdamore committed Jan 2, 2024
1 parent ad2d7ea commit b989bed
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 0 deletions.
27 changes: 27 additions & 0 deletions demo/stream/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Copyright 2020 Hugo Lindström <[email protected]>
#
# This software is supplied under the terms of the MIT License, a
# copy of which should be located in the distribution where this
# file was obtained (LICENSE.txt). A copy of the license may also be
# found online at https://opensource.org/licenses/MIT.

cmake_minimum_required (VERSION 2.8.7)

project(stream)

find_package(nng CONFIG REQUIRED)

add_executable(${PROJECT_NAME})

target_sources(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/stream.c)

if (CMAKE_SYSTEM_NAME MATCHES "Linux")
target_sources(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/platform/posix/server.c)
endif()

if (CMAKE_SYSTEM_NAME MATCHES "Windows")
target_sources(${PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/platform/windows/server.c)
endif()

target_link_libraries(stream nng::nng)
53 changes: 53 additions & 0 deletions demo/stream/platform/posix/server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2020 Hugo Lindström <[email protected]>

// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.

#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/socket.h>
#include <unistd.h>

void
error(const char *msg)
{
perror(msg);
exit(1);
}

int
server(int portno)
{
int sockfd, newsockfd;
socklen_t clilen;
struct sockaddr_in serv_addr, cli_addr;
int n;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
error("ERROR opening socket");
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(portno);
if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) <
0) {
error("ERROR on binding");
}
listen(sockfd, 5);
clilen = sizeof(cli_addr);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0) {
error("ERROR on accept");
}
n = write(newsockfd, "Hello Client!", 13);
if (n < 0)
error("ERROR writing to socket");
close(newsockfd);
close(sockfd);
return 0;
}
87 changes: 87 additions & 0 deletions demo/stream/platform/windows/server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2020 Hugo Lindström <[email protected]>

// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.

#include <stdio.h>
#include <stdlib.h>
#include <winsock2.h>

void
wsa_fatal(const char *func)
{
fprintf(stderr, "%s: %d\n", func, WSAGetLastError());
exit(1);
}

int
server(int portno)
{
WSADATA wsa;
SOCKET s, new_socket;
struct sockaddr_in server, client;
int c;
char * message;

printf("Initialising Winsock...\n");

if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
wsa_fatal("Failed to call WSAStartup");
}

printf("Initialised WSA.\n");

// Create a socket
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
wsa_fatal("Could not create socket");
}

printf("Socket created.\n");

// Prepare the sockaddr_in structure
server.sin_family = AF_INET;
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(portno);

// Bind
if (bind(s, (struct sockaddr *) &server, sizeof(server)) ==
SOCKET_ERROR) {
wsa_fatal("Bind failed");
}

printf("Bind done\n");

// Listen to incoming connections
listen(s, 3);

// Accept and incoming connection
printf("Waiting for incoming connections...\n");

c = sizeof(struct sockaddr_in);

while ((new_socket = accept(s, (struct sockaddr *) &client, &c)) !=
INVALID_SOCKET) {
printf("Connection accepted\n");
// Reply to the client
message = "Hello Client!";
if (send(new_socket, message, (int) strlen(message), 0) ==
SOCKET_ERROR) {
wsa_fatal("Failed to send message to client!");
}
}

if (new_socket == INVALID_SOCKET) {
wsa_fatal("accept failed");
}

if (closesocket(s) == SOCKET_ERROR) {
wsa_fatal("Failed to close socket");
}

if (WSACleanup() == SOCKET_ERROR) {
wsa_fatal("Failed to WSACleanup");
}
return 0;
}
109 changes: 109 additions & 0 deletions demo/stream/stream.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020 Hugo Lindström <[email protected]>

// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

// This program serves as an example for how to write async communication with
// an arbitrary socket using nng_stream. The server receives a connection and
// sends a hello message to the nng_stream iov.

// To run this program, start the server as stream -s <portnumber>
// Then connect to it with the client as stream -c <url>
//
// For example:
//
// % ./stream -s 5555 &
// % ./stream -c tcp://127.0.0.1:5555

#include <nng/nng.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void
nng_fatal(const char *func, int rv)
{
fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
exit(1);
}

int server(int port);
int client(const char *url);

int
main(int argc, char **argv)
{
int rc;

if (argc < 3) {
fprintf(stderr, "Usage: %s [-s port|-c url]\n", argv[0]);
exit(EXIT_FAILURE);
}

if (strcmp(argv[1], "-s") == 0) {
rc = server(atoi(argv[2]));
} else {
rc = client(argv[2]);
}
exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}

int
client(const char *url)
{
nng_stream_dialer *dialer;
nng_aio * aio;
nng_iov iov;
int rv;

// Allocatate dialer and aio assoicated with this connection
if ((rv = nng_stream_dialer_alloc(&dialer, url)) != 0) {
nng_fatal("call to nng_stream_dialer_alloc failed", rv);
}

if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
nng_fatal("call to nng_aio_alloc", rv);
}
nng_aio_set_timeout(aio, 5000); // 5 sec

// Allocatate a buffer to recv
iov.iov_len = 100;
iov.iov_buf = (char *) malloc(sizeof(char) * iov.iov_len);
if ((rv = nng_aio_set_iov(aio, 1, &iov)) != 0) {
nng_fatal("call to nng_aio_alloc", rv);
}
// Connect to the socket via url provided to alloc
nng_stream_dialer_dial(dialer, aio);

// Wait for connection
nng_aio_wait(aio);
if ((rv = nng_aio_result(aio)) != 0) {
nng_fatal("waiting for ng_stream_dialer_dial failed", rv);
}

// Get the stream (connection) at position 0
nng_stream *c1 = (nng_stream *) nng_aio_get_output(aio, 0);
nng_stream_recv(c1, aio);
nng_aio_wait(aio);
if ((rv = nng_aio_result(aio)) != 0) {
nng_fatal("waiting for nng_stream_recv failed", rv);
}

size_t recv_count = nng_aio_count(aio);
if (recv_count <= 0) {
nng_fatal("Recv count was 0!", NNG_ECONNABORTED);
} else {
printf("received %zu bytes, message: '%s'\n", recv_count,
(char *) iov.iov_buf);
}

// Send ELCOSE to send/recv associated wit this stream
free(iov.iov_buf);
nng_stream_free(c1);
nng_aio_free(aio);
nng_stream_dialer_free(dialer);
return 0;
}

0 comments on commit b989bed

Please sign in to comment.