From 0d36e36933885ebb81f517ea255a72bb840481df Mon Sep 17 00:00:00 2001 From: Brad Campbell Date: Tue, 24 Jun 2014 13:33:11 -0400 Subject: [PATCH] removed c based udp receiver Just too much hassle to build and run. Easier to just use python. --- receiver/Tupfile | 9 -- receiver/Tupfile.ini | 0 receiver/config_header.py | 80 ---------------- receiver/receiver.h | 19 ---- receiver/receiver_udp.c | 190 -------------------------------------- receiver/receiver_udp.h | 17 ---- receiver/utils.c | 53 ----------- receiver/utils.h | 6 -- 8 files changed, 374 deletions(-) delete mode 100644 receiver/Tupfile delete mode 100644 receiver/Tupfile.ini delete mode 100755 receiver/config_header.py delete mode 100644 receiver/receiver.h delete mode 100644 receiver/receiver_udp.c delete mode 100644 receiver/receiver_udp.h delete mode 100644 receiver/utils.c delete mode 100644 receiver/utils.h diff --git a/receiver/Tupfile b/receiver/Tupfile deleted file mode 100644 index fd04141..0000000 --- a/receiver/Tupfile +++ /dev/null @@ -1,9 +0,0 @@ -!cc = |> gcc -Wall -Werror -O2 -c %f -o %o |> - -: |> ln -s ../config/gatd.config %o |> gatd.config {lns} -: gatd.config | {lns} |> ./config_header.py -i %f -o %o |> gatd_config.h {headers} - -: foreach *.c | {headers} |> !cc |> %B.o {objs} -: {objs} |> gcc %f -lrabbitmq -o %o |> receiver-udp - -.gitignore diff --git a/receiver/Tupfile.ini b/receiver/Tupfile.ini deleted file mode 100644 index e69de29..0000000 diff --git a/receiver/config_header.py b/receiver/config_header.py deleted file mode 100755 index e563878..0000000 --- a/receiver/config_header.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python2 -# -# This module converts a text config file to python classes and attributes. -# -# so -# [mongo] -# host: inductor.eecs.umich.edu -# -# becomes -# gatdConfig.mongo.host -# - -import argparse -import ConfigParser -import os -import sys - - -HEADER_FILE = """ -#ifndef __GATD_CONFIG_H__ -#define __GATD_CONFIG_H__ - -// THIS FILE AUTOGENERATED BY config_header.py -// DO NOT EDIT - -""" - -argparser = argparse.ArgumentParser() -argparser.add_argument('-i', '--input', help='Input file name', - action='store', required=True) -argparser.add_argument('-o', '--output', help='Output file name', - action='store', required=True) -args = argparser.parse_args() - - -if not os.path.exists(args.input): - print('Input config file not found.') - sys.exit(1) - -config = ConfigParser.ConfigParser() -config.read(args.input) - -for section in config.sections(): - attrs = dict(config.items(section)) - - # Convert integers to integers - for attr in attrs: - try: - attrs[attr] = int(attrs[attr]) - except ValueError: - pass - - for k,v in attrs.items(): - var_type = '' - var_value = '' - var_name = '{}_{}'.format(section, k) - - if section == 'rabbitmq' and (k[0:2] == 'q_' or k[0:4] == 'xch_'): - # These need super special formatting - var_type = 'amqp_bytes_t' - var_value = '{{{}, "{}"}}'.format(len(v), v) - else: - if type(v) == str: - var_value = '"{}"'.format(v) - var_type = 'char' - var_name += '[]' - elif type(v) == int: - var_value = '{}'.format(v) - var_type = 'int' - - line = 'const {vtype} {name} = {value};\n'.format(vtype=var_type, - name=var_name.upper(), value=var_value); - - HEADER_FILE += line - -HEADER_FILE += '#endif' - -with open(args.output, 'w') as f: - f.write(HEADER_FILE) - diff --git a/receiver/receiver.h b/receiver/receiver.h deleted file mode 100644 index 724b1b0..0000000 --- a/receiver/receiver.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef __RECEIVER_H__ -#define __RECEIVER_H__ - -#include -#include - -#define MAX_INCOMING_LENGTH 4096 - -struct message_item_t { - uint8_t type; // 8 bits to identify the structure of this packet - // in case it changes - struct in6_addr addr; // IPv6 address of the sender - uint16_t port; // Sender port - uint64_t time; // Time packet was received - uint8_t data[MAX_INCOMING_LENGTH]; -} __attribute__((__packed__)); -typedef struct message_item_t message_item_t; - -#endif diff --git a/receiver/receiver_udp.c b/receiver/receiver_udp.c deleted file mode 100644 index b09a224..0000000 --- a/receiver/receiver_udp.c +++ /dev/null @@ -1,190 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "gatd_config.h" -#include "utils.h" -#include "receiver.h" -#include "receiver_udp.h" - -// Global constants -const amqp_channel_t amqp_channel = 1; -const amqp_bytes_t amqp_exchange_type = {6, "fanout"}; -const amqp_bytes_t amqp_routing_key = {0, "\0"}; -const amqp_table_t amqp_table_null = {0, NULL}; -const amqp_boolean_t amqp_false = 0; -const amqp_boolean_t amqp_true = 1; - -const int MESSAGE_ITEM_BASE_SIZE = sizeof(message_item_t) - MAX_INCOMING_LENGTH; - - -amqp_connection_state_t amqp_connect () { - amqp_socket_t* amqp_socket = NULL; - amqp_connection_state_t amqp_conn; - amqp_rpc_reply_t amqp_ret; - int result; - - amqp_conn = amqp_new_connection(); - - amqp_socket = amqp_tcp_socket_new(amqp_conn); - if (amqp_socket == 0) { - fprintf(stderr, "Could not create amqp socket.\n"); - exit(1); - } - - result = amqp_socket_open(amqp_socket, RABBITMQ_HOST, RABBITMQ_PORT); - if (result) { - fprintf(stderr, "Could not open amqp socket.\n"); - exit(1); - } - - amqp_ret = amqp_login(amqp_conn, "/", 0, - //AMQP_DEFAULT_FRAME_SIZE, - 131072, - 0, - AMQP_SASL_METHOD_PLAIN, RABBITMQ_USERNAME, - RABBITMQ_PASSWORD); - die_on_amqp_error(amqp_ret, "Logging in"); - amqp_channel_open(amqp_conn, amqp_channel); - amqp_ret = amqp_get_rpc_reply(amqp_conn); - die_on_amqp_error(amqp_ret, "Opening channel"); - - // Make sure the receiver exchange exists - amqp_exchange_declare(amqp_conn, amqp_channel, RABBITMQ_XCH_RECEIVE, - amqp_exchange_type, amqp_false, amqp_true, - amqp_table_null); - - // Make sure there is a queue bound to the exchange to receive the packets - amqp_queue_declare(amqp_conn, amqp_channel, RABBITMQ_Q_RECEIVE, amqp_false, - amqp_true, amqp_false, amqp_false, amqp_table_null); - - // Bind the receive queue to the receive exchange - amqp_queue_bind(amqp_conn, amqp_channel, RABBITMQ_Q_RECEIVE, - RABBITMQ_XCH_RECEIVE, amqp_routing_key, amqp_table_null); - - return amqp_conn; -} - -// Return the current time in milliseconds from the unix epoch -uint64_t get_time () { - struct timeval tv; - uint64_t time; - - if (gettimeofday(&tv, NULL) == 0) { - time = ((uint64_t) tv.tv_sec) * 1000; - time += ((uint64_t) tv.tv_usec) / 1000; - } else { - fprintf(stderr, "Could not get time properly.\n"); - exit(1); - } - - return time; -} - -uint32_t min (uint32_t a, uint32_t b) { - if (a < b) return a; - return b; -} - -int main (int argc, char** argv) { - int socket_world; - struct sockaddr_in6 recv_addr; - struct in6_addr recv_ipv6_addr = IN6ADDR_ANY_INIT; - - amqp_connection_state_t amqp_conn; - amqp_bytes_t amqp_message; - - struct message_item_t message; - - int ret; - - // Set process name - if (0 != prctl(PR_SET_NAME, "gatd-r: udp", 0, 0, 0)) { - perror("Failed to set process name"); - } - - // Setup - message.type = PKT_TYPE_UDP; - - // Connect to RabbitMQ - amqp_conn = amqp_connect(); - - // Create an IPv6 ready socket - socket_world = socket(AF_INET6, SOCK_DGRAM, 0); - if (socket_world < 0) { - fprintf(stderr, "Unable to create UDP socket\n"); - return 1; - } - - // Bind the socket to the correct port on all interfaces - recv_addr.sin6_family = AF_INET6; - recv_addr.sin6_addr = recv_ipv6_addr; - recv_addr.sin6_port = htons(RECEIVER_PORT_UDP); - - ret = bind(socket_world, (struct sockaddr*) &recv_addr, sizeof(recv_addr)); - - if (ret < 0) { - // error on binding - fprintf(stderr, "Unable to bind UDP socket\n"); - return 1; - } - - printf("Beginning loop to receive UDP packets.\n"); - - // Receive - while (1) { - uint8_t buffer[MAX_INCOMING_LENGTH]; - struct sockaddr_in6 sender_addr; - uint32_t sender_addr_len; - int32_t packet_len; - int32_t stored_len; - - packet_len = recvfrom(socket_world, - buffer, - MAX_INCOMING_LENGTH, - 0, - (struct sockaddr*) &sender_addr, - &sender_addr_len); - - if (packet_len > -1) { - message.time = bswap_64(get_time()); - message.port = sender_addr.sin6_port; - stored_len = min(packet_len, MAX_INCOMING_LENGTH); - memcpy(&message.addr, &sender_addr.sin6_addr, - sizeof(struct in6_addr)); - memcpy(&message.data, buffer, stored_len); - - amqp_message.len = MESSAGE_ITEM_BASE_SIZE + stored_len; - amqp_message.bytes = &message; - - // Publish - ret = amqp_basic_publish(amqp_conn, - amqp_channel, - RABBITMQ_XCH_RECEIVE, - amqp_routing_key, - 0, - 0, - NULL, - amqp_message); - } - - } - - close(socket_world); - - amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS); - amqp_connection_close(amqp_conn, AMQP_REPLY_SUCCESS); - amqp_destroy_connection(amqp_conn); - - return 0; - -} diff --git a/receiver/receiver_udp.h b/receiver/receiver_udp.h deleted file mode 100644 index a9b2661..0000000 --- a/receiver/receiver_udp.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef __RECEIVER_UDP_H__ -#define __RECEIVER_UDP_H__ - -#include - -#include -#include -#include "utils.h" - -extern const int MESSAGE_ITEM_BASE_SIZE; - - -amqp_connection_state_t amqp_connect (); -uint64_t get_time (); -uint32_t min (uint32_t a, uint32_t b); - -#endif diff --git a/receiver/utils.c b/receiver/utils.c deleted file mode 100644 index d823f41..0000000 --- a/receiver/utils.c +++ /dev/null @@ -1,53 +0,0 @@ -#include -#include -#include -#include - -#include -#include -#include - -#include "utils.h" - -void die_on_amqp_error(amqp_rpc_reply_t x, const char* context) { - switch (x.reply_type) { - case AMQP_RESPONSE_NORMAL: - return; - - case AMQP_RESPONSE_NONE: - fprintf(stderr, "%s: missing RPC reply type!\n", context); - break; - - case AMQP_RESPONSE_LIBRARY_EXCEPTION: - fprintf(stderr, "%s: %s\n", context, - amqp_error_string2(x.library_error)); - break; - - case AMQP_RESPONSE_SERVER_EXCEPTION: - switch (x.reply.id) { - case AMQP_CONNECTION_CLOSE_METHOD: { - amqp_connection_close_t* m = (amqp_connection_close_t*) x.reply.decoded; - fprintf(stderr, "%s: server connection error %d, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char*) m->reply_text.bytes); - break; - } - case AMQP_CHANNEL_CLOSE_METHOD: { - amqp_channel_close_t* m = (amqp_channel_close_t*) x.reply.decoded; - fprintf(stderr, "%s: server channel error %d, message: %.*s\n", - context, - m->reply_code, - (int) m->reply_text.len, (char*) m->reply_text.bytes); - break; - } - default: - fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", - context, x.reply.id); - break; - } - break; - } - - exit(1); -} diff --git a/receiver/utils.h b/receiver/utils.h deleted file mode 100644 index 3f2163f..0000000 --- a/receiver/utils.h +++ /dev/null @@ -1,6 +0,0 @@ -#ifndef __UTILS_H__ -#define __UTILS_H__ - -extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context); - -#endif