diff --git a/src/mbtrn/CMakeLists.txt b/src/mbtrn/CMakeLists.txt index a42ebff23..a2fded8eb 100644 --- a/src/mbtrn/CMakeLists.txt +++ b/src/mbtrn/CMakeLists.txt @@ -185,9 +185,17 @@ target_link_libraries(emallpub PRIVATE TIRPC::TIRPC mbtrnframe m pthread) # #------------------------------------------------------------------------------ # +add_executable(emserpub utils/emserpub.c) +target_include_directories(emserpub PRIVATE ${CMAKE_SOURCE_DIR}/src/mbtrnframe + ${CMAKE_SOURCE_DIR}/src/mbio + ${CMAKE_SOURCE_DIR}/src/mbtrnutils) +target_link_libraries(emserpub PRIVATE TIRPC::TIRPC mbtrnframe m pthread) +# +#------------------------------------------------------------------------------ +# install(TARGETS r7kr mb1r DESTINATION ${CMAKE_INSTALL_LIBDIR}) # -install(TARGETS udps udpc mbtnav_cli stream7k emu7k r7kr_test trnc tbinx mb1conv mb12csv mb1r_test mb1-cli emallpub DESTINATION ${CMAKE_INSTALL_BINDIR}) +install(TARGETS udps udpc mbtnav_cli stream7k emu7k r7kr_test trnc tbinx mb1conv mb12csv mb1r_test mb1-cli emallpub emserpub DESTINATION ${CMAKE_INSTALL_BINDIR}) # #------------------------------------------------------------------------------ # diff --git a/src/mbtrn/utils/emallpub.c b/src/mbtrn/utils/emallpub.c index 5eefd396b..06430d983 100644 --- a/src/mbtrn/utils/emallpub.c +++ b/src/mbtrn/utils/emallpub.c @@ -197,6 +197,8 @@ static void s_parse_args(int argc, char **argv, app_cfg_t *cfg) else if (strcmp("delay", options[option_index].name) == 0) { sscanf(optarg,"%lu",&cfg->delay_ms); } + default: + break; } } @@ -217,6 +219,8 @@ static void s_parse_args(int argc, char **argv, app_cfg_t *cfg) if(help) { s_show_help(); + s_cfg_destroy(&cfg); + exit(0); } } diff --git a/src/mbtrn/utils/emserpub.c b/src/mbtrn/utils/emserpub.c new file mode 100644 index 000000000..c3c13c7ac --- /dev/null +++ b/src/mbtrn/utils/emserpub.c @@ -0,0 +1,703 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "mframe.h" +#include "mlist.h" +#include "mfile.h" +//#include "mlog.h" +#include "mxdebug.h" +#include "mxd_app.h" + +#if defined(__CYGWIN__) +#define WIN_DECLSPEC __declspec(dllimport) +#else +#define WIN_DECLSPEC +#endif + +// default IO buffer size +#define IBUF_BYTES_DFL 4096 + +// XON/XOFF doesn't make sense with binary data +#define EMS_WITH_XONXOFF 0 +#define XON 0x11 +#define XOFF 0x13 + + +// app configuration +typedef struct app_cfg_s { + int verbose; + char *ser_device; + unsigned int ser_baud; + unsigned int ser_delay_us; + int flow; + uint64_t ibuf_sz; + unsigned char *ibuf; + mlist_t *file_paths; +}app_cfg_t; + +// app state +typedef struct app_ctx_s { + FILE *fp; + int fd; + int64_t total_rbytes; + int64_t total_wbytes; + int64_t burst_count; + long fend; + bool tx_flag; + bool quit_flag; + +} app_ctx_t; + +static void s_parse_args(int argc, char **argv, app_cfg_t *cfg); +static void s_show_help(); +static void s_termination_handler (int signum); +static app_cfg_t *app_cfg_new(); +static void app_cfg_destroy(app_cfg_t **pself); +static void app_cfg_show(app_cfg_t *self); +static app_ctx_t *app_ctx_new(); +static void app_ctx_destroy(app_ctx_t **pself); +static int init_ctx(app_ctx_t *ctx, app_cfg_t *cfg); +static void config_serial(int fd, app_cfg_t *cfg); +static bool cts_is_set(int fd); +static bool wait_flow_on(app_ctx_t *ctx, app_cfg_t *cfg); +static bool check_flow_on(app_ctx_t *ctx, app_cfg_t *cfg); +static int write_data(app_ctx_t *ctx, app_cfg_t *cfg); + +// user interrupt flag (SIGINT) +static bool g_interrupt=false; + +// parse command line options +static void s_parse_args(int argc, char **argv, app_cfg_t *cfg) +{ + extern char WIN_DECLSPEC *optarg; + int option_index; + int c; + bool help=false; + bool version=false; + + static struct option options[] = { + {"verbose", required_argument, NULL, 'v'}, + {"help", no_argument, NULL, 'h'}, + {"device", required_argument, NULL, 'd'}, + {"baud", required_argument, NULL, 'b'}, + {"delay", required_argument, NULL, 'D'}, + {"flow", required_argument, NULL, 'f'}, + {"ibuf", required_argument, NULL, 'i'}, + {NULL, 0, NULL, 0}}; + + // process argument list + while ((c = getopt_long(argc, argv, "hd:b:D:f:i:v:", options, &option_index)) != -1){ + switch (c) { + // long options all return c=0 + case 0: + // verbose + if (strcmp("verbose", options[option_index].name) == 0) { + sscanf(optarg,"%d",&cfg->verbose); + } + // help + else if (strcmp("help", options[option_index].name) == 0) { + help = true; + } + // port + else if (strcmp("device", options[option_index].name) == 0) { + free(cfg->ser_device); + cfg->ser_device = strdup(optarg); + } + // baud + else if (strcmp("baud", options[option_index].name) == 0) { + sscanf(optarg,"%u",&cfg->ser_baud); + } + // delay + else if (strcmp("delay", options[option_index].name) == 0) { + sscanf(optarg,"%u",&cfg->ser_delay_us); + } + // flow + else if (strcmp("flow", options[option_index].name) == 0) { + int flow=0; + sscanf(optarg,"%c", (char *)&flow); + if(toupper(flow) == 'N') + cfg->flow = 'N'; + else if(toupper(flow) == 'R') + cfg->flow = 'R'; +#if EMS_WITH_XONXOFF + else if(toupper(flow) == 'X') + cfg->flow = 'X'; +#endif + else + fprintf(stderr, "WARN: flow control (%c) not supported\n", flow); + } + // ibuf + else if (strcmp("ibuf", options[option_index].name) == 0) { + uint64_t x = 0; + if(sscanf(optarg,"%" PRIu64 "", &x) == 1 && x > 0){ + cfg->ibuf_sz = x; + + unsigned char *bp = (unsigned char *)realloc(cfg->ibuf, x); + memset(bp, 0, x); + + cfg->ibuf = bp; + } + + } + case 'v': + sscanf(optarg,"%d",&cfg->verbose); + break; + case 'h': + help = true; + break; + case 'd': + free(cfg->ser_device); + cfg->ser_device = strdup(optarg); + break; + case 'b': + sscanf(optarg,"%u",&cfg->ser_baud); + break; + case 'D': + sscanf(optarg,"%u",&cfg->ser_delay_us); + break; + case 'f': + { + int flow=0; + sscanf(optarg,"%c", (char *)&flow); + if(toupper(flow) == 'N') + cfg->flow = 'N'; + else if(toupper(flow) == 'R') + cfg->flow = 'R'; +#if EMS_WITH_XONXOFF + else if(toupper(flow) == 'X') + cfg->flow = 'X'; +#endif + else + fprintf(stderr, "WARN: flow control (%c) not supported\n", flow); + } + break; + case 'i': + { + uint64_t x = 0; + if(sscanf(optarg,"%" PRIu64 "", &x) == 1 && x > 0){ + cfg->ibuf_sz = x; + + unsigned char *bp = (unsigned char *)realloc(cfg->ibuf, x); + memset(bp, 0, x); + + cfg->ibuf = bp; + } + } + break; + default: + break; + } + } + + for (int i=optind; ifile_paths,strdup(argv[i])); + } + + if(help){ + s_show_help(); + app_cfg_destroy(&cfg); + exit(0); + } + return; +} + +// show help message +static void s_show_help() +{ + char help_message[] = "\n publish em710 UDP capture data to serial port (emulate M3 serial output)\n"; + char usage_message[] = "\n emserpub [options] file [file...]\n" + "\n Options:\n" + " -v, --verbose=n : verbose output level\n" + " -h, --help : show this help message\n" + " -d, --device=s : serial port device\n" + " -b, --baud=u : serial comms rate\n" + " -f, --flow=c : serial flow control (N: none R: RTS/CTS)\n" + " -i. --ibuf=u : inbuf size (bytes)\n" + " -D, --delay=u : interchacter delay (usec)\n" + "\n"; + printf("%s",help_message); + printf("%s",usage_message); +} + +// signal handler +static void s_termination_handler (int signum) +{ + switch (signum) { + case SIGINT: + case SIGHUP: + case SIGTERM: + g_interrupt=true; + break; + default: +// MX_ERROR_MSG("not handled[%d]\n",signum); + break; + } +} + +// allocate app_cfg_t resources +static app_cfg_t *app_cfg_new() +{ + app_cfg_t *new_cfg = (app_cfg_t *)malloc(sizeof(app_cfg_t)); + if ( new_cfg != NULL) { + + new_cfg->verbose = 0; + new_cfg->ser_device = strdup("/dev/ttyUSB0"); + new_cfg->ser_baud = 115200; + new_cfg->ser_delay_us = 0; + new_cfg->flow = 'R'; + new_cfg->ibuf_sz = IBUF_BYTES_DFL; + new_cfg->ibuf = (unsigned char *)malloc(IBUF_BYTES_DFL); + if(new_cfg->ibuf != NULL){ + memset(new_cfg->ibuf, 0, new_cfg->ibuf_sz); + } else { + fprintf(stderr, "ibuf alloc failed %p len %d %d/%s\n", new_cfg->ibuf, IBUF_BYTES_DFL, errno, strerror(errno)); + exit(-1); + } + new_cfg->file_paths = mlist_new(); + mlist_autofree(new_cfg->file_paths, free); + } + return new_cfg; +} + +// release app_cfg_t resources +static void app_cfg_destroy(app_cfg_t **pself) +{ + if(pself != NULL) { + app_cfg_t *self = *pself; + if (self != NULL) { + + mlist_destroy(&self->file_paths); + free(self->ser_device); + free(self->ibuf); + free(self); + } + *pself = NULL; + } +} + +// show app_cfg_t +static void app_cfg_show(app_cfg_t *self) +{ + fprintf(stderr,"\n"); + fprintf(stderr,"device %s\n", self->ser_device); + fprintf(stderr,"baud %u\n", self->ser_baud); + fprintf(stderr,"flow %c\n", self->flow); + fprintf(stderr,"delay_us %u\n", self->ser_delay_us); + fprintf(stderr,"ibuf_sz %" PRIu64 "\n", self->ibuf_sz); + fprintf(stderr,"verbose %d\n", self->verbose); + fprintf(stderr,"files:\n"); + char *path = (char *)mlist_first(self->file_paths); + int i = 0; + while (NULL != path) { + fprintf(stderr, "[%3d] %s\n", i++, path); + path = (char *)mlist_next(self->file_paths); + } + fprintf(stderr,"\n"); +} + +// allocate app_ctx_t resources +static app_ctx_t *app_ctx_new() +{ + app_ctx_t *instance = (app_ctx_t *)malloc(sizeof(app_ctx_t)); + if(instance != NULL){ + instance->fp = NULL; + instance->fd = -1; + instance->total_rbytes = 0; + instance->total_wbytes = 0; + instance->burst_count = 0; + instance->fend = 0; + instance->tx_flag = true; + instance->quit_flag = false; + } + return instance; +} + +// release app_ctx_t resources +static void app_ctx_destroy(app_ctx_t **pself) +{ + if(pself != NULL){ + app_ctx_t *self = *pself; + if(self != NULL){ + if(self->fp != NULL) + fclose(self->fp); + close(self->fd); + free(self); + } + *pself = NULL; + } +} + +// initialze state +static int init_ctx(app_ctx_t *ctx, app_cfg_t *cfg) +{ + int retval = 0; + // open output port + ctx->fd = open(cfg->ser_device, O_RDWR|O_NOCTTY); + + if(ctx->fd < 0){ + fprintf(stderr, "could not open %s %d/%s\n", cfg->ser_device, errno, strerror(errno)); + return -1; + } + + config_serial(ctx->fd, cfg); + + return retval; +} + +// configure serial terminal +static void config_serial(int fd, app_cfg_t *cfg) +{ + struct termios tty; + if(tcgetattr(fd, &tty) != 0) { + fprintf(stderr, "Error %i from tcgetattr: %s\n", errno, strerror(errno)); + } + + cfmakeraw(&tty); + + if(cfg->flow == 'R') { + tty.c_cflag |= CRTSCTS; // Disable RTS/CTS hardware flow control + } else if(cfg->flow == 'X') { + tty.c_iflag |= (IXON); // Enable s/w flow ctrl input + tty.c_iflag |= (IXOFF); // Enable s/w flow ctrl output + tty.c_iflag &= ~(IXANY); // Enable s/w flow ctrl + tty.c_cc[VSTART] = XON; + tty.c_cc[VSTOP] = XOFF; + tty.c_cc[VTIME] = 1; // Wait for up to 1s (10 deciseconds), returning as soon as any data is received. + tty.c_cc[VMIN] = 0; + } + +#if 0 + tty.c_cflag &= ~(CSIZE|PARENB); // Clear parity bit + tty.c_cflag &= ~CSTOPB; // Clear stop field (one stop bit) + tty.c_cflag |= CS8; // 8 bits per byte + tty.c_cflag |= CREAD; // Turn on READ & ignore ctrl lines + tty.c_cflag |= CLOCAL; // Turn on READ & ignore ctrl lines + tty.c_lflag &= ~ICANON; + tty.c_lflag &= ~ECHO; // Disable echo + tty.c_lflag &= ~ECHOE; // Disable erasure + tty.c_lflag &= ~ECHONL; // Disable new-line echo + tty.c_lflag &= ~ISIG; // Disable interpretation of INTR, QUIT and SUSP + tty.c_lflag &= ~IEXTEN; // Disable implementation-defined input processing + // tty.c_iflag &= ~(IXON | IXOFF | IXANY); // Enable s/w flow ctrl + tty.c_iflag &= ~(IGNBRK|BRKINT|PARMRK|ISTRIP|INLCR|IGNCR|ICRNL); // Disable any special handling of received bytes + tty.c_oflag &= ~OPOST; // Prevent special interpretation of output bytes (e.g. newline chars) + tty.c_oflag &= ~ONLCR; // Prevent conversion of newline to carriage return/line feed + // tty.c_oflag &= ~OXTABS; // Prevent conversion of tabs to spaces (NOT PRESENT IN LINUX) + // tty.c_oflag &= ~ONOEOT; // Prevent removal of C-d chars (0x004) in output (NOT PRESENT IN LINUX) + tty.c_cc[VTIME] = 0; // Wait for up to 1s (10 deciseconds), returning as soon as any data is received. + tty.c_cc[VMIN] = 0; + + if(cfg->flow == 'X'){ + tty.c_iflag &= (IXON); // Enable s/w flow ctrl + tty.c_iflag &= ~(IXOFF | IXANY); // Enable s/w flow ctrl + tty.c_cc[VSTOP] = XOFF; + tty.c_cc[VSTART] = XON; + } +#endif + + + // Set in/out baud rate + switch(cfg->ser_baud){ + case 1200: + cfsetispeed(&tty, B1200); + cfsetospeed(&tty, B1200); + break; + case 1800: + cfsetispeed(&tty, B1800); + cfsetospeed(&tty, B1800); + break; + case 2400: + cfsetispeed(&tty, B2400); + cfsetospeed(&tty, B2400); + break; + case 4800: + cfsetispeed(&tty, B9600); + cfsetospeed(&tty, B9600); + break; + case 9600: + cfsetispeed(&tty, B9600); + cfsetospeed(&tty, B9600); + break; + case 19200: + cfsetispeed(&tty, B19200); + cfsetospeed(&tty, B19200); + break; + case 38400: + cfsetispeed(&tty, B38400); + cfsetospeed(&tty, B38400); + break; + case 57600: + cfsetispeed(&tty, B57600); + cfsetospeed(&tty, B57600); + break; + case 115200: + cfsetispeed(&tty, B115200); + cfsetospeed(&tty, B115200); + break; + default: + fprintf(stderr, "ERR - invalid ser_baud %u\n", cfg->ser_baud); + break; + }; + + if(tcsetattr(fd, TCSANOW, &tty) != 0) + fprintf(stderr, "ERR - tcsetattr failed %d/%s\n", errno, strerror(errno)); + + return; +} + +// return true of CTS is set +static bool cts_is_set(int fd) +{ + int modstat=0; + if(ioctl(fd, TIOCMGET, &modstat) != 0) + fprintf(stderr, "ERR TIOCMGET- %d/%s\n", errno, strerror(errno)); + + return ((modstat&TIOCM_CTS) != 0); +} + +// wait for flow control to enable output +static bool wait_flow_on(app_ctx_t *ctx, app_cfg_t *cfg) +{ + + // monitor flow control, enable output on start + if(cfg->flow == 'R'){ + + // wait for CTS + while( !g_interrupt){ + + if(cts_is_set(ctx->fd)) { + + if(cfg->verbose >= 1) + fprintf(stderr, "\nENABLE TX (CTS)\n"); + ctx->tx_flag = true; + ctx->burst_count = 0; + return true; + } + usleep(10000); + } + } +#if EMS_WITH_XONXOFF + else if(cfg->flow == 'X'){ + while(!g_interrupt){ + unsigned char flow_stat = 0; + if(read(fd, &flow_stat, 1) == 1 && flow_stat == XON){ + if(cfg->verbose >= 1) + fprintf(stderr, "\nENABLE TX (XON)\n"); + tcflush(fd, TCIFLUSH); + + ctx->tx_flag = true; + ctx->burst_count = 0; + return true; + } + usleep(10000); + } + } +#endif + + return false; +} + +// check flow control, return false on stop +static bool check_flow_on(app_ctx_t *ctx, app_cfg_t *cfg) +{ + // monitor flow control, disable output on stop + if(cfg->flow == 'R'){ + // check CTS, stop sending if asserted + if(!cts_is_set(ctx->fd)){ + if(cfg->verbose >= 1) + fprintf(stderr, "\nDISABLE TX (CTS)\n"); + ctx->tx_flag = false; + return false; + } + } +#if EMS_WITH_XONXOFF + else if(cfg->flow == 'X'){ + unsigned char flow_stat = 0; + if(read(ctx->fd, &flow_stat, 1) == 1 && flow_stat == XOFF){ + if(cfg->verbose >= 1) + fprintf(stderr, "\nDISABLE TX (XOFF)\n"); + tcflush(ctx->fd, TCIFLUSH); + ctx->tx_flag = false; + return false; + } + } +#endif + + ctx->tx_flag = true; + return true; +} + +// read from input file and write to serial port +// and delay (if > 0) +// transfer size set by IO buffer size (ibuf) +static int write_data(app_ctx_t *ctx, app_cfg_t *cfg) +{ + int retval = 0; + static uint64_t obytes = 0; + + if(!ctx->tx_flag) + return 0; + + // read byte(s) from input file + size_t rbytes = fread(cfg->ibuf, 1, (size_t)cfg->ibuf_sz, ctx->fp); + + if( rbytes > 0) { + ctx->total_rbytes += rbytes; + ctx->burst_count += rbytes; + + unsigned char *op = cfg->ibuf; + ssize_t rem_bytes = rbytes; + while(rem_bytes > 0){ + // write byte(s) to output (should block until sent) + ssize_t wb = write(ctx->fd, op, rem_bytes); + tcdrain(ctx->fd); + + if(wb > 0) { + ctx->total_wbytes += wb; + rem_bytes -= wb; + op += wb; + + if(wb < rbytes){ + fprintf(stderr, "\nWARN - write returned %zd/%zd\n", wb, rbytes); + } + } else{ + fprintf(stderr, "\nERR - write returned %zd ibuf %p len %" PRIu64 " %d/%s\n", wb, cfg->ibuf, cfg->ibuf_sz, errno, strerror(errno)); + } + } + + // display bytes + if(cfg->verbose >= 4 ){ + for(int i = 0; i < rbytes; i++){ + if((obytes % 16) == 0) + fprintf(stderr, "\n%08llx: ", (long long unsigned)obytes); + fprintf(stderr, "%02X ", cfg->ibuf[i]); + obytes++; + } + } + + // delay per configuration + if(cfg->ser_delay_us > 0) + usleep(cfg->ser_delay_us); + + } else { + fprintf(stderr, "ERR - fread returned %zd feof %d ferr %d %d/%s\n", rbytes, feof(ctx->fp), ferror(ctx->fp), errno, strerror(errno)); + if(feof(ctx->fp)){ + ctx->quit_flag = true; + retval = -1; + } + + clearerr(ctx->fp); + } + return retval; +} + +int main(int argc, char **argv) +{ + // configure signal handling + struct sigaction saStruct; + sigemptyset(&saStruct.sa_mask); + saStruct.sa_flags = 0; + saStruct.sa_handler = s_termination_handler; + sigaction(SIGINT, &saStruct, NULL); + + // get default configuration + app_cfg_t *cfg = app_cfg_new(); + + // parse options + s_parse_args(argc, argv, cfg); + app_cfg_show(cfg); + + // get context, initilize + app_ctx_t *ctx = app_ctx_new(); + + if(init_ctx(ctx, cfg) == 0){ + + // if context valid, process input file + + if(cfg->verbose > 0 ){ + fprintf(stderr, "%s output device %s connected fd %d %u bps\n", __func__, cfg->ser_device, ctx->fd, cfg->ser_baud); + } + + // iterate over input file path list + char *path = (char *)mlist_first(cfg->file_paths); + + while (NULL != path) { + + // open next input file + ctx->fp = fopen(path, "r"); + + if(ctx->fp != NULL){ + // get file size, end pointer + fseek(ctx->fp, 0, SEEK_END); + ctx->fend = ftell(ctx->fp); + fseek(ctx->fp, 0, SEEK_SET); + + if(cfg->verbose > 0 ){ + fprintf(stderr, "%s input file %s open fp %p\n", __func__, path, ctx->fp); + fprintf(stderr, "%s ftell %ld fend %ld\n", __func__, ftell(ctx->fp), ctx->fend); + } + } else { + fprintf(stderr, "ERR - fopen failed file %s %d/%s\n", path, errno, strerror(errno)); + } + + // process file + while(ctx->fp != NULL && !ctx->quit_flag && !g_interrupt) { + + memset(cfg->ibuf, 0, cfg->ibuf_sz); + + // quit if end of input + if(ftell(ctx->fp) >= ctx->fend) + break; + + // wait for flow control enable + wait_flow_on(ctx, cfg); + + while(ctx->tx_flag && !g_interrupt && !ctx->quit_flag){ + + // quit if end of input + if(ftell(ctx->fp) >= ctx->fend) + break; + + // check flow control, disable output on stop + check_flow_on(ctx, cfg); + + // do output when enabled + write_data(ctx, cfg); + } + } + + // close file, get next + if(ctx->fp != NULL){ + fclose(ctx->fp); + ctx->fp = NULL; + } + + path = (char *)mlist_next(cfg->file_paths); + } + + } else { + fprintf(stderr, "ERR - init_ctx failed; %d/%s\n", errno, strerror(errno)); + } + + if(cfg->verbose > 0 ){ + fprintf(stderr, "\n read %" PRIu64 "/%08llX wrote %" PRIu64 "/%08llX bytes\n", ctx->total_rbytes, (long long unsigned)ctx->total_rbytes, ctx->total_wbytes, (long long unsigned)ctx->total_wbytes); + } + + app_cfg_destroy(&cfg); + app_ctx_destroy(&ctx); + + return 0; +} diff --git a/src/mbtrnutils/mbtrnpp.c b/src/mbtrnutils/mbtrnpp.c index de2c0b4c3..32caaec01 100644 --- a/src/mbtrnutils/mbtrnpp.c +++ b/src/mbtrnutils/mbtrnpp.c @@ -40,12 +40,15 @@ #include #include #include +#include #include #include #include #include #include +#include #include +#include #include "mb_status.h" #include "mb_format.h" @@ -113,7 +116,8 @@ struct mbtrnpp_ping_struct { typedef enum { INPUT_MODE_SOCKET = 1, - INPUT_MODE_FILE = 2 + INPUT_MODE_FILE = 2, + INPUT_MODE_SERIAL = 3 } input_mode_t; typedef enum{ @@ -752,16 +756,60 @@ FILE *output_trn_fp = NULL; #endif // WITH_MBTNAV +#define XON 0x11 +#define XOFF 0x13 + +typedef enum{ + EM710_EOK = 0, + EM710_ETYPE = 1, + EM710_ESTX = 2, + EM710_EETX = 3, + EM710_ECHK = 4, + EM710_EREAD = 5, + EM710_EOFLOW = 6, + EM710_EMEM = 7, + EM710_EINVAL = 8, + EM710_ECOUNT +}em710_frame_err_t; + +const char *em_frame_err_str[]={"EM_EOK","EM_ETYPE","EM_ESTX","EM_EETX","EM_ECHK","EM_EREAD","EM_EOFLOW","EM_EMEM","EM_EINVAL"}; + +typedef struct ser_buf_s{ + int fd; + int64_t size; + byte *data; + byte *pread; + // pend points to last char in buffer + // (typically, but not necessarily end of buffer) + byte *pend; +} ser_buf_t; + +typedef struct em_ser_ctx_s { + + bool fill_stx; + uint64_t stream_ofs; + uint64_t frame_count[3]; + ser_buf_t *ser_buffer; + size_t dgram_bytes; + byte *bp; + byte *pstx; + byte *petx; + struct mbsys_simrad3_header *header; +}em_ser_ctx_t; + struct sockaddr_in em_sock_addr; socklen_t em_sock_len; +unsigned char em_ser_flow = 'R'; -#ifdef WITH_EM710_ALL_LOG +#if WITH_EM710_ALL_LOG // log em710 frames (debug) FILE *em_all_log = NULL; const char *em_all_name="em-all.bin"; #endif -#ifdef WITH_EM710_UDP_LOG +#define WITH_EM710_UDP_LOG 1 + +#if WITH_EM710_UDP_LOG FILE *em_udp_log = NULL; const char *em_udp_name="em-udp.bin"; #endif @@ -905,6 +953,9 @@ int mbtrnpp_kemkmall_input_close(int verbose, void *mbio_ptr, int *error); int mbtrnpp_em710raw_input_open(int verbose, void *mbio_ptr, char *definition, int *error); int mbtrnpp_em710raw_input_read(int verbose, void *mbio_ptr, size_t *size, char *buffer, int *error); int mbtrnpp_em710raw_input_close(int verbose, void *mbio_ptr, int *error); +int mbtrnpp_em710raw_input_open_ser(int verbose, void *mbio_ptr, char *definition, int *error); +int mbtrnpp_em710raw_input_read_ser(int verbose, void *mbio_ptr, size_t *size, char *buffer, int *error); +int mbtrnpp_em710raw_input_close_ser(int verbose, void *mbio_ptr, int *error); #ifdef WITH_MB1_READER int mbtrnpp_mb1r_input_open(int verbose, void *mbio_ptr, char *definition, int *error); int mbtrnpp_mb1r_input_read(int verbose, void *mbio_ptr, size_t *size, char *buffer, int *error); @@ -2175,7 +2226,8 @@ static int s_parse_opt_input(mbtrnpp_cfg_t *cfg, char *opt_str) if ((psdef=strstr(opt_str, "socket:"))!=NULL) { size_t sdef_len=strlen(psdef); - if(sdef_len>0 && sdef_len 0 && sdef_len < MB_PATH_SIZE){ psdef+=strlen("socket:"); // set socket mode and definition cfg->input_mode = INPUT_MODE_SOCKET; @@ -2185,6 +2237,18 @@ static int s_parse_opt_input(mbtrnpp_cfg_t *cfg, char *opt_str) } // fprintf(stderr, "socket_definition|%s\n", cfg->socket_definition); + } else if ((psdef=strstr(opt_str, "serial:"))!=NULL) { + + size_t sdef_len=strlen(psdef); + + if(sdef_len > 0 && sdef_len < MB_PATH_SIZE){ + psdef+=strlen("serial:"); + // set socket mode and definition + cfg->input_mode = INPUT_MODE_SERIAL; + sprintf(cfg->socket_definition,"%s",psdef); + } else { + fprintf(stderr,"serial definition length invalid [%s/%zu/%zu]\n",psdef,sdef_len,(size_t)MB_PATH_SIZE); + } } else { // cfg->input is input file name cfg->input_mode = INPUT_MODE_FILE; @@ -2772,6 +2836,12 @@ static int s_mbtrnpp_validate_config(mbtrnpp_cfg_t *cfg) fprintf(stderr,"ERR - socket_definition not set\n"); } break; + case INPUT_MODE_SERIAL: + if(strlen(cfg->socket_definition)==0){ + err_count++; + fprintf(stderr,"ERR - serial_definition not set\n"); + } + break; default: err_count++; fprintf(stderr,"ERR - invalid input mode [%d]\n",cfg->input_mode); @@ -3759,10 +3829,68 @@ int main(int argc, char **argv) { if (mbtrn_cfg->verbose > 0) fprintf(stderr, "%s\n", log_message); } - } + } else if (strncmp(mbtrn_cfg->input, "serial", 6) == 0) { - /* otherwised open swath data files as is normal for MB-System programs */ - else { + if (mbtrn_cfg->format == MBF_EM710RAW) { + mbtrnpp_input_open = &mbtrnpp_em710raw_input_open_ser; + mbtrnpp_input_read = &mbtrnpp_em710raw_input_read_ser; + mbtrnpp_input_close = &mbtrnpp_em710raw_input_close_ser; + } + else{ + fprintf(stderr,"ERR - Invalid output format [%d]\n",mbtrn_cfg->format); + } + + if ((status = mb_input_init(mbtrn_cfg->verbose, mbtrn_cfg->socket_definition, mbtrn_cfg->format, pings, lonflip, bounds, + btime_i, etime_i, speedmin, timegap, + &imbio_ptr, &btime_d, &etime_d, + &beams_bath, &beams_amp, &pixels_ss, + mbtrnpp_input_open, mbtrnpp_input_read, mbtrnpp_input_close, + &error)) != MB_SUCCESS) { + sprintf(log_message, "MBIO Error returned from function "); + if (logfp != NULL) + mbtrnpp_postlog(mbtrn_cfg->verbose, logfp, log_message, &error); + fprintf(stderr, "\n%s\n", log_message); + + mb_error(mbtrn_cfg->verbose, error, &message); + if (logfp != NULL) + mbtrnpp_postlog(mbtrn_cfg->verbose, logfp, message, &error); + fprintf(stderr, "%s\n", message); + + sprintf(log_message, "Sonar data socket <%s> not initialized for reading", ifile); + if (logfp != NULL) + mbtrnpp_postlog(mbtrn_cfg->verbose, logfp, log_message, &error); + fprintf(stderr, "\n%s\n", log_message); + + sprintf(log_message, "Program <%s> Terminated", program_name); + if (logfp != NULL) + mbtrnpp_postlog(mbtrn_cfg->verbose, logfp, log_message, &error); + fprintf(stderr, "\n%s\n", log_message); + + mlog_tprintf(mbtrnpp_mlog_id,"e,sonar data connection init failed\n"); + MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_EMBCON]); + + s_mbtrnpp_exit(error); + } + else { + + sprintf(log_message, "Sonar data serial <%s> initialized for reading", ifile); + mlog_tprintf(mbtrnpp_mlog_id,"i,sonar data serial initialized\n"); + mlog_tprintf(mbtrnpp_mlog_id,"MBIO format id,%d\n", mbtrn_cfg->format); + MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_MB_CONN]); + + if (logfp != NULL) + mbtrnpp_postlog(mbtrn_cfg->verbose, logfp, log_message, &error); + if (mbtrn_cfg->verbose > 0) + fprintf(stderr, "\n%s\n", log_message); + + sprintf(log_message, "MBIO format id: %d", mbtrn_cfg->format); + if (logfp != NULL) + mbtrnpp_postlog(mbtrn_cfg->verbose, logfp, log_message, &error); + if (mbtrn_cfg->verbose > 0) + fprintf(stderr, "%s\n", log_message); + } + }else { + /* otherwised open swath data files as is normal for MB-System programs */ if ((status = mb_read_init(mbtrn_cfg->verbose, ifile, mbtrn_cfg->format, pings, lonflip, bounds, btime_i, etime_i, speedmin, timegap, &imbio_ptr, &btime_d, &etime_d, &beams_bath, &beams_amp, &pixels_ss, &error)) != @@ -4371,15 +4499,23 @@ int main(int argc, char **argv) { // deal with fatal error > 0 - this is usually MB_ERROR_EOF if ((status == MB_FAILURE) && (error > 0)) { - if (mbtrn_cfg->input_mode == INPUT_MODE_SOCKET) { + if (mbtrn_cfg->input_mode == INPUT_MODE_SOCKET) { - MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_EMBGETALL]); + MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_EMBGETALL]); - fprintf(stderr, "EOF (input socket) - clear status/error\n"); - status = MB_SUCCESS; - error = MB_ERROR_NO_ERROR; + fprintf(stderr, "EOF (input socket) - clear status/error\n"); + status = MB_SUCCESS; + error = MB_ERROR_NO_ERROR; - } + } else if (mbtrn_cfg->input_mode == INPUT_MODE_SERIAL) { + + MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_EMBGETALL]); + + fprintf(stderr, "EOF (input serial) - clear status/error\n"); + status = MB_SUCCESS; + error = MB_ERROR_NO_ERROR; + + } else { done = true; status = MB_SUCCESS; @@ -4415,14 +4551,21 @@ int main(int argc, char **argv) { } // while(!done) [main loop] /* close the files */ - if (mbtrn_cfg->input_mode == INPUT_MODE_SOCKET) { - fprintf(stderr, "socket input mode - continue (probably shouldn't be here)\n"); - mlog_tprintf(mbtrnpp_mlog_id,"e,invalid code path - socket input mode\n"); - read_data = true; + if (mbtrn_cfg->input_mode == INPUT_MODE_SOCKET) { + fprintf(stderr, "socket input mode - continue (probably shouldn't be here)\n"); + mlog_tprintf(mbtrnpp_mlog_id,"e,invalid code path - socket input mode\n"); + read_data = true; - // empty the ring buffer - ndata = 0; - } + // empty the ring buffer + ndata = 0; + } else if (mbtrn_cfg->input_mode == INPUT_MODE_SERIAL) { + fprintf(stderr, "serial input mode - continue (probably shouldn't be here)\n"); + mlog_tprintf(mbtrnpp_mlog_id,"e,invalid code path - serial input mode\n"); + read_data = true; + + // empty the ring buffer + ndata = 0; + } else { status = mb_close(mbtrn_cfg->verbose, &imbio_ptr, &error); @@ -7102,7 +7245,6 @@ int mbtrnpp_kemkmall_input_close(int verbose, void *mbio_ptr, int *error) { return (status); } - /*--------------------------------------------------------------------*/ int mbtrnpp_em710raw_input_open(int verbose, void *mbio_ptr, char *definition, int *error) { @@ -7206,7 +7348,7 @@ int mbtrnpp_em710raw_input_open(int verbose, void *mbio_ptr, char *definition, i em_all_log = fopen(em_all_name, "w+"); #endif #ifdef WITH_EM710_UDP_LOG - em_all_log = fopen(em_udp_name, "w+"); + em_udp_log = fopen(em_udp_name, "w+"); #endif // save the socket within the mb_io structure @@ -7541,6 +7683,1010 @@ int mbtrnpp_em710raw_input_close(int verbose, void *mbio_ptr, int *error) { return (status); } +/*--------------------------------------------------------------------*/ + +int mbtrnpp_em710raw_input_open_ser(int verbose, void *mbio_ptr, char *definition, int *error) { + + // local variables + int status = MB_SUCCESS; + struct mb_io_struct *mb_io_ptr; + + // print input debug statements + if (verbose >= 2) { + fprintf(stderr, "\ndbg2 MBIO function <%s> called\n", __func__); + fprintf(stderr, "dbg2 Input arguments:\n"); + fprintf(stderr, "dbg2 verbose: %d\n", verbose); + fprintf(stderr, "dbg2 mbio_ptr: %p,%p\n", mbio_ptr, &mbio_ptr); + fprintf(stderr, "dbg2 definition: %s\n", definition); + } + + // get pointer to mbio descriptor + mb_io_ptr = (struct mb_io_struct *)mbio_ptr; + + // set initial status + status = MB_SUCCESS; + + // set flag to enable Sentry sensordepth kluge + int *kluge_set = (int *)&mb_io_ptr->save10; + *kluge_set = 1; + + // Open and initialize the socket based input for reading using function + // mbtrnpp_kemall_input_read(). + // - use mb_io_ptr->mbsp to hold pointer to socket i/o structure + // - the socket definition = "hostInterface:broadcastGroup:port" + int port=-1; + unsigned int ser_baud=115200; + unsigned char ser_flow='R'; + mb_path in_type; + mb_path ser_device; + struct ip_mreq group; + char *token; + char *saveptr; + + if ((token = strtok_r(definition, ":", &saveptr)) != NULL) { + strncpy(ser_device, token, sizeof(mb_path)); + } + if ((token = strtok_r(NULL, ":", &saveptr)) != NULL) { + sscanf(token, "%u", &ser_baud); + } + if ((token = strtok_r(NULL, ":", &saveptr)) != NULL) { + if(toupper(token[0]) == 'X') + ser_flow = 'X'; + if(toupper(token[0]) == 'R') + ser_flow = 'R'; + if(toupper(token[0]) == 'N') + ser_flow = 'N'; + } + em_ser_flow = ser_flow; + + //sscanf(definition, "%s:%s:%d", hostInterface, bcastGrp, &port); + fprintf(stderr, "Attempting to open serial port to Kongsberg sonar output at:\n"); + fprintf(stderr, " Definition: %s\n", definition); + fprintf(stderr, " ser_device: %s\n ser_baud: %u\n nser_flow %c\n", + ser_device, ser_baud, ser_flow); + + // Create a datagram socket on which to receive. + int sd = -1; + sd = open(ser_device, O_RDWR|O_NOCTTY); + + if (sd < 0) + { + perror("Opening datagram serial port error"); + + mlog_tprintf(mbtrnpp_mlog_id,"e,serial port [%d/%s]\n",errno,strerror(errno)); + status=MB_FAILURE; + *error=MB_ERROR_OPEN_FAIL; + return status; + } + + // configure serial port + struct termios tty; + if(tcgetattr(sd, &tty) != 0) { + fprintf(stderr, "Error %i from tcgetattr: %s\n", errno, strerror(errno)); + } + cfmakeraw(&tty); + + if(ser_flow == 'R'){ + tty.c_cflag |= CRTSCTS; // Enable RTS/CTS hardware flow control + } else if(ser_flow == 'X'){ + tty.c_iflag |= (IXON); // Enable s/w flow ctrl input + tty.c_iflag |= (IXOFF); // Enable s/w flow ctrl output + tty.c_iflag &= ~(IXANY); // Enable s/w flow ctrl + tty.c_cc[VSTART] = XON; + tty.c_cc[VSTOP] = XOFF; + } + +#if 0 + tty.c_cflag &= ~(CSIZE|PARENB); // Clear parity bit + tty.c_cflag &= ~CSTOPB; // Clear stop field (one stop bit) + tty.c_cflag |= CS8; // 8 bits per byte + tty.c_cflag |= CREAD; // Turn on READ & ignore ctrl lines + tty.c_cflag |= CLOCAL; // Turn on READ & ignore ctrl lines + tty.c_lflag &= ~ICANON; + tty.c_lflag &= ~ECHO; // Disable echo + tty.c_lflag &= ~ECHOE; // Disable erasure + tty.c_lflag &= ~ECHONL; // Disable new-line echo + tty.c_lflag &= ~ISIG; // Disable interpretation of INTR, QUIT and SUSP + tty.c_lflag &= ~IEXTEN; // Disable implementation-defined input processing +// tty.c_iflag &= ~(IXON | IXOFF | IXANY); // Enable s/w flow ctrl + tty.c_iflag &= ~(IXON); // Disable output s/w flow ctrl + tty.c_iflag &= ~(IXOFF | IXANY); // Disable input s/w flow ctrl + tty.c_iflag &= ~(IGNBRK|BRKINT|PARMRK|ISTRIP|INLCR|IGNCR|ICRNL); // Disable any special handling of received bytes + tty.c_oflag &= ~OPOST; // Prevent special interpretation of output bytes (e.g. newline chars) + tty.c_oflag &= ~ONLCR; // Prevent conversion of newline to carriage return/line feed + // tty.c_oflag &= ~OXTABS; // Prevent conversion of tabs to spaces (NOT PRESENT IN LINUX) + // tty.c_oflag &= ~ONOEOT; // Prevent removal of C-d chars (0x004) in output (NOT PRESENT IN LINUX) + tty.c_cc[VTIME] = 0; // Wait for up to 1s (10 deciseconds), returning as soon as any data is received. + tty.c_cc[VMIN] = 0; +// tty.c_cc[VSTOP] = XOFF; //0x13 +// tty.c_cc[VSTART] = XON; //0x11 +#endif + fprintf(stderr, "%s connected fd %d %s:%d\n", __func__, sd, ser_device, ser_baud); + + // Set in/out baud rate to be 9600 + // Set in/out baud rate + bool baud_valid = true; + switch(ser_baud){ + case 1200: + cfsetispeed(&tty, B1200); + cfsetospeed(&tty, B1200); + break; + case 1800: + cfsetispeed(&tty, B1800); + cfsetospeed(&tty, B1800); + break; + case 2400: + cfsetispeed(&tty, B2400); + cfsetospeed(&tty, B2400); + break; + case 4800: + cfsetispeed(&tty, B9600); + cfsetospeed(&tty, B9600); + break; + case 9600: + cfsetispeed(&tty, B9600); + cfsetospeed(&tty, B9600); + break; + case 19200: + cfsetispeed(&tty, B19200); + cfsetospeed(&tty, B19200); + break; + case 38400: + cfsetispeed(&tty, B38400); + cfsetospeed(&tty, B38400); + break; + case 57600: + cfsetispeed(&tty, B57600); + cfsetospeed(&tty, B57600); + break; + case 115200: + cfsetispeed(&tty, B115200); + cfsetospeed(&tty, B115200); + break; + default: + fprintf(stderr, "ERR - invalid ser_baud %u\n", ser_baud); + baud_valid = false; + break; + }; + + if(tcsetattr(sd, TCSANOW, &tty) != 0) + fprintf(stderr, "ERR - tcsetattr failed %d/%s\n", errno, strerror(errno)); + +#ifdef WITH_EM710_ALL_LOG + em_all_log = fopen(em_all_name, "w+"); +#endif +#ifdef WITH_EM710_UDP_LOG + em_udp_log = fopen(em_udp_name, "w+"); +#endif + + // save the socket within the mb_io structure + int *sd_ptr = NULL; + status &= mb_mallocd(verbose, __FILE__, __LINE__, sizeof(sd), (void **)&sd_ptr, error); + *sd_ptr = sd; + mb_io_ptr->mbsp = (void *) sd_ptr; + + /*initialize buffer for fragmented MWZ and MRC datagrams*/ + memset(mRecordBuf, 0, sizeof(mRecordBuf)); + + /* print output debug statements */ + if (verbose >= 2) { + fprintf(stderr, "\ndbg2 MBIO function <%s> completed\n", __func__); + fprintf(stderr, "dbg2 Return values:\n"); + fprintf(stderr, "dbg2 error: %d\n", *error); + fprintf(stderr, "dbg2 Return status:\n"); + fprintf(stderr, "dbg2 status: %d\n", status); + } + + MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_MB_CONN]); + + /* return */ + return (status); +} + +/*--------------------------------------------------------------------*/ + +// allocate a serial buffer instance +ser_buf_t *ser_buf_new(int fd, uint64_t size){ + ser_buf_t *instance = (ser_buf_t *)malloc(sizeof(ser_buf_t)); + if(instance != NULL){ + instance->fd = fd; + instance->size = size; + instance->data = NULL; + instance->pread = NULL; + instance->pend = NULL; + } + return instance; +} + +// release serial buffer resources +void ser_buf_destroy(ser_buf_t **ppself) +{ + if(ppself != NULL){ + ser_buf_t *self = *(ppself); + if(self != NULL){ + free(self->data); + *ppself = NULL; + } + } +} + +// allocate a serial IO context instance +em_ser_ctx_t *em710_ser_ctx_new(int fd, int64_t size) +{ + em_ser_ctx_t *instance = (em_ser_ctx_t *)malloc(sizeof(em_ser_ctx_t)); + if(instance != NULL){ + instance->fill_stx = false; + instance->stream_ofs = 0; + for(int i = 0; i < 3; i++) + instance->frame_count[i] = 0; + instance->dgram_bytes = 0; + instance->bp = NULL; + instance->pstx = NULL; + instance->petx = NULL; + instance->header = NULL; + instance->ser_buffer = ser_buf_new(fd, size); + } + return instance; +}; + +// release a serial IO context resources +void em710_ser_ctx_destroy(em_ser_ctx_t **ppself) +{ + if(ppself != NULL){ + em_ser_ctx_t *self = *(ppself); + if(self != NULL){ + ser_buf_destroy(&self->ser_buffer); + *ppself = NULL; + } + } +}; + +/*--------------------------------------------------------------------*/ + +static bool mbtrnpp_em710raw_validate_frame(byte *src, unsigned int len, int *r_err, int verbose) +{ + + bool retval = true; + struct mbsys_simrad3_header *header = (struct mbsys_simrad3_header *)src; + + byte *petx = src + header->numBytesDgm + 1; + byte *psum = (byte *)&header->dgmSTX + 1; + byte *pbchk = (byte *)petx+1; + unsigned short *pchk = (unsigned short *)pbchk; + + switch(header->dgmType){ + + case ALL_INSTALLATION_U: + case ALL_INSTALLATION_L: + case ALL_REMOTE: + case ALL_RUNTIME: + case ALL_RAW_RANGE_BEAM_ANGLE: + case ALL_XYZ88: + case ALL_CLOCK: + case ALL_ATTITUDE: + case ALL_POSITION: + case ALL_SURFACE_SOUND_SPEED: + break; + default: + MX_BPRINT((verbose < -2), "%s: invalid type %02x\n", __func__, header->dgmType); + if(r_err != NULL) *r_err = EM710_ETYPE; + return false; + break; + }; + + if(header->dgmSTX != EM3_START_BYTE) { + if(r_err != NULL) *r_err = EM710_ESTX; + MX_BPRINT((verbose < -2), "%s: invalid STX %02X/%02X\n", __func__, header->dgmSTX, EM3_START_BYTE); + return false; + } + + if(*petx != EM3_END_BYTE) { + MX_BPRINT((verbose < -2), "%s: invalid ETX %02X/%02X len(%u)\n", __func__, *petx, EM3_END_BYTE, len); + if(r_err != NULL) *r_err = EM710_EETX; + return false; + } + + short unsigned int sum = 0; + + while(psum < petx){ + sum += *psum; + psum++; + } + if(sum != *pchk){ + MX_BPRINT((verbose < -2), "%s: invalid checksum sum %04X/%04hu chk %04X/%04hu\n", __func__, sum, sum, *pchk, *pchk); + if(r_err != NULL) *r_err = EM710_ECHK; + return false; + } else { + MX_BPRINT((verbose > 1), "%s - petx ofs(%04lX) pchk ofs (%04lx) etx %02X\n", __func__, petx-src, pbchk-src, *petx); + MX_BPRINT((verbose > 1), "%s - sum %04hu/%04X checksum %04hu/%04X \n", __func__, sum, sum, *pchk, *pchk); + } + if(r_err != NULL) *r_err = EM710_EOK; + + return retval; +} + +/*--------------------------------------------------------------------*/ + +// start/stop sender RTS (RTS/CTS) +int mbtrnpp_em710raw_set_rts(int fd, bool state) +{ + int errors = 0; + + int modstat = 0; + + if(ioctl(fd, TIOCMGET, &modstat) != 0){ + fprintf(stderr, "ERR TIOCMGET- %d/%s\n", errno, strerror(errno)); + errors++; + } + + if(errors == 0){ + + // assert RTS (active low) + if(state) + modstat |= TIOCM_RTS; + else + modstat &= ~TIOCM_RTS; + + if(ioctl(fd, TIOCMSET, &modstat) != 0) { + fprintf(stderr, "ERR TIOCMSET- %d/%s\n", errno, strerror(errno)); + errors++; + } + } + // fprintf(stderr, "%s: fd %d state %c errors %d\n", __func__, fd, (start_flow ? 'Y' : 'N'), errors); + + return (errors == 0 ? 0 : -1); +} + +/*--------------------------------------------------------------------*/ + +int mbtrnpp_em710raw_set_cts(int fd, bool state) +{ + int errors = 0; + + int modstat = 0; + + if(ioctl(fd, TIOCMGET, &modstat) != 0){ + fprintf(stderr, "ERR TIOCMGET- %d/%s\n", errno, strerror(errno)); + errors++; + } + + if(errors == 0){ + + // assert RTS (active low) + if(state) + modstat |= TIOCM_CTS; + else + modstat &= ~TIOCM_CTS; + + if(ioctl(fd, TIOCMSET, &modstat) != 0) { + fprintf(stderr, "ERR TIOCMSET- %d/%s\n", errno, strerror(errno)); + errors++; + } + } + // fprintf(stderr, "%s: fd %d state %c errors %d\n", __func__, fd, (start_flow ? 'Y' : 'N'), errors); + + return (errors == 0 ? 0 : -1); +} + +/*--------------------------------------------------------------------*/ + +// check and refill serial buffer as needed +// handles serial IO, flow control, and buffering +int64_t mbtrnpp_em710raw_update_buffer(int fd, byte *buf, size_t len, const byte *save_ptr, uint64_t *r_stream_ofs, int verbose) +{ + + if(buf == NULL){ + fprintf(stderr, "%s: ERR - framebuf NULL\n", __func__); + return -1; + } + + if(len <= 0){ + fprintf(stderr, "%s: ERR - len <= 0\n", __func__); + return -1; + } + + if(save_ptr != NULL && + ((save_ptr < buf) || (save_ptr > buf + len))){ + fprintf(stderr, "%s: ERR - invalid save_ptr\n", __func__); + return -1; + } + + byte *end_ptr = buf + len; + + // copy remaining bytes (if any) + // to beginning of buffer + off_t save_len = 0; + if(save_ptr != NULL && save_ptr > buf){ + save_len = buf + len - save_ptr; + memcpy(buf, save_ptr, save_len); + } + + // set input pointer and zero input region + byte *wr_ptr = buf + save_len; + ssize_t wr_len = buf + len - wr_ptr; + memset(wr_ptr, 0, wr_len); + +// fprintf(stderr, "%s: fd %d buf %p len %zd\n", __func__, fd, buf, len); +// fprintf(stderr, "%s: save_prt %p save_len %lld \n", __func__, save_ptr, save_len); +// fprintf(stderr, "%s: wr_ptr %p wr_len %zd \n", __func__, wr_ptr, wr_len); + + +// int availBytes=0; +// int istat = ioctl(fd, FIONREAD, &availBytes); +// fprintf(stderr,"FIONREAD stat %d availBytes %d %d/%s\n", istat, availBytes, errno, strerror(errno)); + + byte *cur = wr_ptr; + size_t rem_bytes = wr_len; + size_t bytes_added = 0; + bool tx_en = false; + int64_t burst_bytes = 0; + while(rem_bytes > 0) { + + fd_set rd_fds; + struct timeval timeout; + int rc, max_fd; + + FD_ZERO(&rd_fds); + FD_SET( fd, &rd_fds ); + max_fd = fd + 1; + memset( &timeout, 0, sizeof(timeout) ); + timeout.tv_sec = 0; + timeout.tv_usec = 100000; + + rc = select(max_fd, &rd_fds, NULL, NULL, &timeout); + + if ( rc < 0 ) { + // an error occurred during the select() + perror( "select()" ); + } + else if ( rc == 0 ) { + // no sets ready + if(!tx_en){ + tx_en = true; + burst_bytes = 0; + + if(em_ser_flow == 'R'){ + MX_BMSG((verbose < -2), "ENABLE CTS\n"); + // enable sender transmit + mbtrnpp_em710raw_set_rts(fd, true); + } else if(em_ser_flow == 'X'){ + MX_BMSG((verbose < -2), "ENABLE XON\n"); + unsigned char c[1] = {XON}; + write(fd, c, 1); + } + } + } else { + + // at least one set is ready + // fprintf(stderr, "fd is %s\n", FD_ISSET(fd, &rd_fds) ? "READY" : "NOT READY"); + if(FD_ISSET(fd, &rd_fds)){ + + size_t rq = rem_bytes; + size_t rb = read(fd, cur, rq); + + if( rb > 0){ + rem_bytes -= rb; + cur += rb; + bytes_added += rb; + burst_bytes += rb; + if(r_stream_ofs != NULL) + *r_stream_ofs += rb; +// fprintf(stderr, "ser read wr_len %zd rem %4zd rq %4zd rb %4zd\n", wr_len, rem_bytes, rq, rb); + } + } + } + } + + // disable sender transmit + if(em_ser_flow == 'R'){ + MX_BPRINT((verbose < -2), "DISABLE CTS (%lld bytes)\n", burst_bytes); + mbtrnpp_em710raw_set_rts(fd, false); + } else if(em_ser_flow == 'X'){ + unsigned char c[1] = {XOFF}; + write(fd, c, 1); + MX_BPRINT((verbose < -2), "DISABLE XOFF (%lld bytes)\n", burst_bytes); + + } + +// int64_t sofs = (r_stream_ofs != NULL ? *r_stream_ofs : -1); +// fprintf(stderr, "%s: stream_ofs %llX (%llu) \n", __func__, sofs, sofs); + + + return bytes_added; +} + +/*--------------------------------------------------------------------*/ + +// maintain serial IO buffer +// return requested bytes from buffer, request refill when empty +int64_t mbtrnpp_em710raw_read_buffer(ser_buf_t *src, byte *dest, int64_t read_len, uint64_t *r_stream_ofs, int verbose) +{ + + int64_t retval = -1; + + // read from buffer and refill as needed + if(src == NULL || src->fd < 0 || src->size <= 0 || dest == NULL || read_len <= 0){ + fprintf(stderr, "%s: ERR - invalid argument ser_buf %p\n",__func__, src); + if(src != NULL){ + fprintf(stderr,"fd %d size %lld dest %p readlen %lld\n", src->fd, src->size, dest, read_len); + } + return retval; + } + + if(src->data == NULL){ + // uninitialized buffer + src->data = (byte *)malloc(src->size); + if(src->data == NULL){ + fprintf(stderr, "%s: ERR - malloc failed %d/%s\n",__func__, errno, strerror(errno)); + // memory error + return retval; + } + + src->pread = src->data; + src->pend = src->data; + } + + // request refill if not enough characters remaining + int64_t rem_bytes = (src->pend - src->pread); + if( rem_bytes < read_len){ + + int64_t new_bytes = mbtrnpp_em710raw_update_buffer(src->fd, src->data, src->size, src->pend, r_stream_ofs, verbose); + + if(new_bytes > 0) { + src->pread = src->data; + src->pend = src->data + rem_bytes + new_bytes; + rem_bytes = (src->pend - src->pread + 1); + }// else error + } + + // read bytes into destination + if(rem_bytes >= read_len){ + memcpy(dest, src->pread, read_len); + src->pread += read_len; + // return bytes read + retval = read_len; + } + +// fprintf(stderr, "%s: ret %lld\n",__func__, retval); + return retval; +} + +// read one .ALL frame +// handles stream errors/resync +// return frame length or -1 and sets r_err +// The raw .ALL format does not include frame length and the +// start/end sync bytes may be present in valid frame data. +// Basic algorithm +// - find a record start (STX) +// - find and validate type (if type invalid restart) +// - find next record start (STX2) +// - check for ETX at STX2-3 (if no ETX, continue) +// - validate checksum (return if OK, else restart) +int64_t mbtrnpp_em710raw_read_frame(em_ser_ctx_t *ctx, byte *frame_buf, size_t len, int *r_err, int verbose) +{ + + typedef enum { + ST_START = 0, + ST_FRAME_START, + ST_FRAME_TYPE, + ST_FRAME_END, + ST_VALIDATE, + ST_ERROR + }state_t; + + const char *state_names[] = {"ST_START","ST_FRAME_START","ST_FRAME_TYPE","ST_FRAME_END","ST_VALIDATE", "ST_ERROR"}; + + state_t state = ST_START; + + int64_t rbytes = 0; + + + if(ctx != NULL) { + // intiailze serial IO context + ctx->dgram_bytes = 0; + ctx->header = (struct mbsys_simrad3_header *)frame_buf; + ctx->bp = frame_buf + 4; + ctx->pstx = frame_buf + 5; + ctx->petx = NULL; + } else { + state = ST_ERROR; + if(r_err != NULL) + *r_err = EM710_EINVAL; + } + + while(! (state == ST_ERROR)){ + + // fprintf(stderr, "state %s fstx %c b %02X\n", state_names[state], (fill_stx ? 'Y' : 'N'), *(bp-1)); + // fprintf(stderr, "state %s bp %04lX:%02X %04lX:%02X \n", state_names[state], ((bp-1)-frame_buf), *(bp-1), (bp-frame_buf), *bp); + + if(state == ST_START){ + memset(frame_buf, 0, len); + if(ctx->fill_stx) + ctx->header->dgmSTX = 0x02; + ctx->bp = frame_buf + 4; + ctx->fill_stx = false; + state = ST_FRAME_START; + ctx->dgram_bytes = 0; + } + + if(state == ST_FRAME_START){ + if(ctx->header->dgmSTX == EM3_START_BYTE){ + ctx->bp++; + state = ST_FRAME_END; + ctx->dgram_bytes++; + } else { + rbytes = mbtrnpp_em710raw_read_buffer(ctx->ser_buffer, ctx->bp, 1, &ctx->stream_ofs, verbose); + if( rbytes == 1){ + + if(*ctx->bp == EM3_START_BYTE){ + ctx->bp++; + ctx->dgram_bytes++; + state = ST_FRAME_TYPE; + } + } else if(rbytes <= 0){ + if(r_err != NULL) + *r_err = EM710_EREAD; + state = ST_ERROR; + } + } + } + + if(state == ST_FRAME_TYPE){ + + rbytes = mbtrnpp_em710raw_read_buffer(ctx->ser_buffer, ctx->bp, 1, &ctx->stream_ofs, verbose); + + if( rbytes == 1){ + + switch(*ctx->bp){ + case ALL_INSTALLATION_U: + case ALL_INSTALLATION_L: + case ALL_REMOTE: + case ALL_RUNTIME: + case ALL_RAW_RANGE_BEAM_ANGLE: + case ALL_XYZ88: + case ALL_CLOCK: + case ALL_ATTITUDE: + case ALL_POSITION: + case ALL_SURFACE_SOUND_SPEED: + ctx->bp++; + ctx->dgram_bytes++; + state = ST_FRAME_END; + break; + default: + ctx->fill_stx = false; + state = ST_START; + + if(verbose >= 3) + fprintf(stderr, "INFO - invalid type %02X bp=%p fp=%p\n", *ctx->bp, ctx->bp, frame_buf); + break; + }; + + } else if(rbytes <= 0){ + if(r_err != NULL) + *r_err = EM710_EREAD; + state = ST_ERROR; + } + } + + if(state == ST_FRAME_END){ + + rbytes = mbtrnpp_em710raw_read_buffer(ctx->ser_buffer, ctx->bp, 1, &ctx->stream_ofs, verbose); + + if(rbytes == 1){ + + if(*ctx->bp == EM3_START_BYTE && *(ctx->bp-3) == EM3_END_BYTE){ + // buffer may contain complete frame + next start byte + // bp points to next start byte + ctx->petx = ctx->bp - 3; + ctx->fill_stx = true; + state = ST_VALIDATE; + } else { + ctx->bp++; + ctx->dgram_bytes++; + ctx->fill_stx = false; + } + if((ctx->bp - frame_buf) >= len){ + fprintf(stderr,"%s: ERR - buffer length exceeded (Frame End))\n", __func__); + ctx->fill_stx = false; + if(r_err != NULL) + *r_err = EM710_EOFLOW; + state = ST_ERROR; + } + } else { + if(r_err != NULL) + *r_err = EM710_EREAD; + state = ST_ERROR; + } + } + + if(state == ST_VALIDATE){ + + ctx->header->numBytesDgm = (ctx->petx - frame_buf)-1; + + int verr = 0; + + ctx->frame_count[0]++; + + if(mbtrnpp_em710raw_validate_frame(frame_buf, ctx->dgram_bytes, &verr, verbose)){ + + ctx->frame_count[1]++; + + MX_BPRINT( (verbose < -2), "frame stream_bytes[%010llX] len[%4zd/%04X] N/valid/invalid[%6lld %6lld %6lld] \n", ctx->stream_ofs, ctx->dgram_bytes, ctx->dgram_bytes, ctx->frame_count[0], ctx->frame_count[1], ctx->frame_count[2]); + + size_t send_len = ctx->header->numBytesDgm; + + if(verbose >= 3){ + for (int i = 0; i < ctx->header->numBytesDgm + 4; i++) + { + if(i%16 == 0) + fprintf(stderr, "\n%08X: ",i); + fprintf(stderr, "%02x ", frame_buf[i]); + } + fprintf(stderr, "\n"); + } + + if(r_err != NULL) + *r_err = EM710_EOK; + + return send_len; + } else { + ctx->frame_count[2]++; + if(verr == EM710_ECHK || verr == EM710_ETYPE){ + // checksum or type error, restart + // (should already be handled in state machine) + ctx->fill_stx = false; + state = ST_START; + } else { + // frame invalid, keep going + ctx->bp++; + ctx->fill_stx = false; + state = ST_FRAME_END; + } + MX_BPRINT( (verbose < -2), "frame stream_bytes[%010llX] len[%4zd/%04X] N/valid/invalid[%6lld %6lld %6lld] err[%d/%s]\n", ctx->stream_ofs, ctx->dgram_bytes, ctx->dgram_bytes, ctx->frame_count[0], ctx->frame_count[1], ctx->frame_count[2], verr, em_frame_err_str[verr%EM710_ECOUNT]); + + } + } + } + return -1; +} + +/*--------------------------------------------------------------------*/ + +int mbtrnpp_em710raw_input_read_ser(int verbose, void *mbio_ptr, size_t *size, + char *buffer, int *error) +{ + // local variables + int status = MB_SUCCESS; + struct mb_io_struct *mb_io_ptr = (struct mb_io_struct *)mbio_ptr; + + // print input debug statements + if (verbose >= 2) { + fprintf(stderr, "\ndbg2 MBIO function <%s> called\n", __func__); + fprintf(stderr, "dbg2 Input arguments:\n"); + fprintf(stderr, "dbg2 verbose: %d\n", verbose); + fprintf(stderr, "dbg2 mbio_ptr: %p\n", mbio_ptr); + fprintf(stderr, "dbg2 size: %zu\n", *size); + fprintf(stderr, "dbg2 buffer: %p\n", buffer); + } + + // Read the requested number of bytes (= size) off the input and + // place those bytes into the buffer. + // This requires reading full MB1 records from the socket, + // storing the data in buffer (implemented here), and parceling + // those bytes out as requested. + + // use the socket reader + // read and return single frame + int64_t rbytes=-1; + uint32_t sync_bytes=0; + static uint64_t frame_count = 0; + static uint64_t frame_invalid = 0; + static uint64_t frame_read_err = 0; + + // mbsp points to serial port file descriptor + int *mbsp = (int *)mb_io_ptr->mbsp; + + // frame buffer for byte-wise reads + static byte *frame_buf = NULL; + static size_t frame_len = 0; + static struct mbsys_simrad3_header *fb_phdr = NULL; + static byte *fb_pread=NULL; + static size_t bytes_read=0; + static bool read_frame=true; + bool read_err = false; + + static em_ser_ctx_t *ctx = NULL; + + if(NULL == frame_buf) + { + frame_buf = (byte *)malloc(MB_UDP_SIZE_MAX); + memset(frame_buf, 0, MB_UDP_SIZE_MAX); + fb_pread = frame_buf; + fb_phdr = (struct mbsys_simrad3_header *)frame_buf; + bytes_read = 0; + } + + // if valid reader... + if(NULL != mbsp && NULL != frame_buf) + { + if(ctx == NULL){ + // This serial context persists across calls, + // buffering serial data and maintaining the serial IO state. + // the ser_buf_read method handles reading and returning data, + // and refilling the buffer when it is empty. + ctx = em710_ser_ctx_new(*mbsp, 4096); + } + + if(read_frame) + { + + // read frame into buffer + memset(frame_buf, 0, MB_UDP_SIZE_MAX); + fb_pread = frame_buf; + + // read UDP datagram from the socket + // returns number of bytes read or -1 error + + // UDP datagrams don't include 4-byte size field (numBytesDgm), + // but valid .ALL datagrams must include it. + // We'll calculate it and include it at the start of the buffer. + + // The datagram size field (numBytesDgm) reflects the number of bytes + // from STX to the end of the footer (inclusive). + // This enables it to be read, then + // used to read the remainder of the datagram, *including* the footer. + + int frame_err = 0; + if ( (rbytes = mbtrnpp_em710raw_read_frame(ctx, (void *) (frame_buf), MB_UDP_SIZE_MAX, &frame_err, verbose) ) >= 0) + { + struct mbsys_simrad3_header *header = (struct mbsys_simrad3_header *)frame_buf; + + // set datagram size (first 4 bytes of buffer) + unsigned int *pDgmSize = (unsigned int *)frame_buf; + *pDgmSize = rbytes; + + int errors = 0; + + byte *petx = frame_buf + (header->numBytesDgm + 1); + uint16_t *pchk = (uint16_t *)(petx + 1); + + frame_len = header->numBytesDgm + 4; + frame_count++; + // write frame to log (debug only) +#ifdef WITH_EM710_ALL_LOG + // write .ALL frame + fwrite(frame_buf, frame_len, 1, em_all_log); + fflush(em_all_log); +#endif +#ifdef WITH_EM710_UDP_LOG + // write UDP frame + fwrite(frame_buf+4, frame_len-4, 1, em_udp_log); + fflush(em_udp_log); +#endif + + if(verbose >= 5 || verbose <= -5 ) + em710_frame_show(frame_buf, verbose); + + if(errors == 0) + { + // update frame read pointers + fb_pread = frame_buf; + read_frame = false; + read_err = false; +// MX_LPRINT(MBTRNPP, 3, "read frame len[%zu]\n", frame_len); + } else { + // frame invalid + frame_invalid++; + read_err = true; + MX_LPRINT(MBTRNPP, 3, "invalid frame len[%llu] chksum[%hu/%04X]\n", frame_len, *pchk, *pchk); + } + } else { + // read error + frame_read_err++; + read_err = true; + //if(frame_read_err%100 == 0) + //MX_LPRINT(MBTRNPP, 3, "em710_read_frame failed rbytes[%lld] errors(%d)\n", rbytes, frame_read_err); + } + +// MX_BPRINT(((!read_err || (frame_read_err % 100) == 0) && abs(verbose) >= 3), "%s - read frame fd %3d len[%6llu] n[%8llu] invalid[%8llu] read_err[%8llu/%s] \n", __func__, *mbsp, frame_len, frame_count, frame_invalid, frame_read_err, em_frame_err_str[frame_err%EM710_ECOUNT]); + + } else { + // there's a frame in the buffer + size_t bytes_rem = frame_buf + frame_len - fb_pread; + size_t readlen = (*size <= bytes_rem ? *size : bytes_rem); + + MX_LPRINT(MBTRNPP, 4, "reading framebuf size[%2zu] frame_len[%6zu] rem[%6zu] err[%c]\n", (size_t)*size, frame_len, bytes_rem, (read_err?'Y':'N')); + } + + if(!read_err){ + int64_t bytes_rem = frame_buf + frame_len - fb_pread; + size_t readlen = (*size <= bytes_rem ? *size : bytes_rem); + if(readlen > 0){ + memcpy(buffer, fb_pread, readlen); + *size = (size_t)readlen; + *error = MB_ERROR_NO_ERROR; + // update frame cursor + fb_pread += readlen; + bytes_rem -= readlen; + if(bytes_rem <= 0) + { + MX_LPRINT(MBTRNPP, 4, "* buffer empty rem[%"PRId64"]\n", bytes_rem); + // if nothing left, read a frame next time + read_frame = true; + } + } else { + // buffer empty + status = MB_FAILURE; + *error = MB_ERROR_EOF; + *size = (size_t)0; + read_frame = true; + MX_LPRINT(MBTRNPP, 4, "buffer empty readlen[%zu] rem[%"PRId64"]\n", readlen, bytes_rem); + } + } + } else { + fprintf(stderr, "%s : ERR - frame buffer or socket is NULL\n", __func__); + } + + if(read_err) + { + status = MB_FAILURE; + *error = MB_ERROR_EOF; + *size = (size_t)0; + + MST_METRIC_START(app_stats->stats->metrics[MBTPP_CH_MB_GETFAIL_XT], mtime_dtime()); + MX_LPRINT(MBTRNPP, 4, "read em710raw UDP socket failed: sync_bytes[%d] status[%d] err[%d]\n",sync_bytes,status, *error); + + MST_COUNTER_INC(app_stats->stats->events[MBTPP_EV_EMBFRAMERD]); + MST_COUNTER_ADD(app_stats->stats->status[MBTPP_STA_MB_SYNC_BYTES],sync_bytes); + + // TODO: check connection status, only reconnect if disconnected... + + MST_METRIC_LAP(app_stats->stats->metrics[MBTPP_CH_MB_GETFAIL_XT], mtime_dtime()); + } + + // print output debug statements + if (verbose >= 2) { + fprintf(stderr, "\ndbg2 MBIO function <%s> completed\n", __func__); + fprintf(stderr, "dbg2 Return values:\n"); + fprintf(stderr, "dbg2 size: %zu\n", *size); + fprintf(stderr, "dbg2 buffer: %p\n", buffer); + fprintf(stderr, "dbg2 error: %d\n", *error); + fprintf(stderr, "dbg2 Return status:\n"); + fprintf(stderr, "dbg2 status: %d\n", status); + } + + return (status); +} + +/*--------------------------------------------------------------------*/ + +int mbtrnpp_em710raw_input_close_ser(int verbose, void *mbio_ptr, int *error) { + + /* local variables */ + int status = mbtrnpp_em710raw_input_close_ser(verbose, mbio_ptr, error); + struct mb_io_struct *mb_io_ptr; + + /* print input debug statements */ + if (verbose >= 2) { + fprintf(stderr, "\ndbg2 MBIO function <%s> called\n", __func__); + fprintf(stderr, "dbg2 Input arguments:\n"); + fprintf(stderr, "dbg2 verbose: %d\n", verbose); + fprintf(stderr, "dbg2 mbio_ptr: %p\n", mbio_ptr); + } + + /* print output debug statements */ + if (verbose >= 2) { + fprintf(stderr, "\ndbg2 MBIO function <%s> completed\n", __func__); + fprintf(stderr, "dbg2 Return values:\n"); + fprintf(stderr, "dbg2 error: %d\n", *error); + fprintf(stderr, "dbg2 Return status:\n"); + fprintf(stderr, "dbg2 status: %d\n", status); + } + + /* return */ + return (status); +} + #ifdef WITH_MB1_READER /*--------------------------------------------------------------------*/ int mbtrnpp_mb1r_input_open(int verbose, void *mbio_ptr, char *definition, int *error)