Skip to content

Commit

Permalink
Various changes for output rotation
Browse files Browse the repository at this point in the history
Some various cleanups and refactoring of Sami's commits, in particular:

- Require explicitly requesting strftime() filtering of the -w argument,
  via either --localtime or --gmtime. This allows a choice of local
  timezone or UTC timestamps.

- Factor out the strftime() rewriting code from open_write_file() into
  a new function update_output_fname().

- Factor out the file rotation code from cb_read() into a new function
  maybe_rotate_output().

- Remove the arbitrary 1024-byte limit on filenames. (Though, add a
  256-byte arbitrary limit on the expansion of the strftime() format
  specifier, since there doesn't appear to be a way to obtain that size
  in advance.)

- Remove the filename suffix appending feature. I'm not sure this is
  entirely necessary. (But if it is it should probably be done via a
  separate patch.)
  • Loading branch information
Robert Edmonds committed Apr 27, 2016
1 parent b6e31c5 commit dc359f8
Showing 1 changed file with 117 additions and 90 deletions.
207 changes: 117 additions & 90 deletions src/fstrm_capture.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <unistd.h>

#include <event2/buffer.h>
#include <event2/bufferevent.h>
Expand All @@ -40,7 +40,6 @@
#include "libmy/argv.h"
#include "libmy/my_alloc.h"
#include "libmy/print_string.h"
#include "libmy/ubuf.h"

#if HAVE_DECL_FFLUSH_UNLOCKED
# define fflush fflush_unlocked
Expand All @@ -55,7 +54,6 @@
#endif

#define CAPTURE_HIGH_WATERMARK 262144
#define FNAME_MAXLEN 1024

struct capture;
struct capture_args;
Expand Down Expand Up @@ -101,21 +99,24 @@ struct capture {
struct event *ev_sighup;

FILE *output_file;
char *output_filename;
time_t output_open_ts;
char *output_fname;
time_t output_open_timestamp;

size_t bytes_written;
size_t count_written;

struct tm *(*calendar_fn)(const time_t *, struct tm *);
};

struct capture_args {
bool help;
int debug;
bool localtime;
bool gmtime;
char *str_content_type;
char *str_read_unix;
char *str_write_fname;
int split_s;
char *str_suffix;
int split_seconds;
};

static struct capture g_program_ctx;
Expand All @@ -128,41 +129,47 @@ static argv_t g_args[] = {
NULL,
"display this help text and exit" },

{ 'd', "debug",
{ 'd', "debug",
ARGV_INCR,
&g_program_args.debug,
NULL,
"increment debugging level" },

{ 't', "type",
{ 't', "type",
ARGV_CHAR_P,
&g_program_args.str_content_type,
"<STRING>",
"Frame Streams content type" },

{ 'u', "unix",
{ 'u', "unix",
ARGV_CHAR_P,
&g_program_args.str_read_unix,
"<FILENAME>",
"Unix socket path to read from" },

{ 'w', "write",
{ 'w', "write",
ARGV_CHAR_P,
&g_program_args.str_write_fname,
"<FILENAME>",
"filename to write, as a strftime(3) string" },

{ 'x', "suffix",
ARGV_CHAR_P,
&g_program_args.str_suffix,
"<STRING>",
"output filename suffix to add after closing" },
"file path to write Frame Streams data to" },

{ 's', "split",
{ 's', "split",
ARGV_INT,
&g_program_args.split_s,
&g_program_args.split_seconds,
"<SECONDS>",
"Amount of seconds to capture into each file" },
"seconds before rotating output file" },

{ '\0', "localtime",
ARGV_BOOL,
&g_program_args.localtime,
NULL,
"filter -w path with strftime (local time)" },

{ '\0', "gmtime",
ARGV_BOOL,
&g_program_args.gmtime,
NULL,
"filter -w path with strftime (UTC)" },

{ ARGV_LAST },
};
Expand Down Expand Up @@ -256,14 +263,10 @@ usage(const char *msg)
}

static bool
parse_args(const int argc, char **argv)
parse_args(const int argc, char **argv, struct capture *ctx)
{
argv_version_string = PACKAGE_VERSION;

/* These are non-mandatory options */
g_program_args.split_s = 0;
g_program_args.str_suffix = NULL;

if (argv_process(g_args, argc, argv) != 0)
return false;

Expand All @@ -275,16 +278,24 @@ parse_args(const int argc, char **argv)
if (g_program_args.str_read_unix == NULL)
usage("Unix socket path to read from (--unix) is not set");
if (g_program_args.str_write_fname == NULL)
usage("file path to write Frame Streams data to (--write) is not set");
usage("File path to write Frame Streams data to (--write) is not set");
if (strcmp(g_program_args.str_write_fname, "-") == 0) {
int fd = fileno(stdout);
if (isatty(fd) == 1)
usage("refusing binary output to terminal (stdout)");
if (g_program_args.split_s != 0)
usage("cannot use output splitting with stdout");
if (g_program_args.str_suffix != NULL)
usage("cannot use output suffix renaming with stdout");
if (isatty(STDOUT_FILENO) == 1)
usage("Refusing to write binary output to a terminal");
if (g_program_args.split_seconds != 0)
usage("Cannot use output splitting when writing to stdout");
}
if (g_program_args.localtime && g_program_args.gmtime)
usage("--localtime and --gmtime are mutually exclusive");
if (g_program_args.split_seconds && !g_program_args.localtime && !g_program_args.gmtime)
usage("--split requires either --localtime or --gmtime");

/* Set calendar function, if needed. */
if (g_program_args.localtime)
ctx->calendar_fn = localtime_r;
else if (g_program_args.gmtime)
ctx->calendar_fn = gmtime_r;

return true;
}

Expand Down Expand Up @@ -401,43 +412,66 @@ open_write_start(struct capture *ctx)
return false;
}

static const char *
update_output_fname(struct capture *ctx)
{
time_t time_now = {0};
struct tm tm_now = {0};

/* Get current broken-down time representation. */
tzset();
time_now = time(NULL);
ctx->calendar_fn(&time_now, &tm_now);

/* Save current time. */
ctx->output_open_timestamp = time_now;

/*
* Filter ctx->args->str_write_fname with strftime(), store output in
* ctx->output_fname. Assume strftime() lengthens the string by no more
* than 256 bytes.
*/
if (ctx->output_fname != NULL)
my_free(ctx->output_fname);
const size_t len_output_fname = strlen(ctx->args->str_write_fname) + 256;
ctx->output_fname = my_calloc(1, len_output_fname);

if (strftime(ctx->output_fname, len_output_fname,
ctx->args->str_write_fname, &tm_now) <= 0)
{
my_free(ctx->output_fname);
fprintf(stderr, "%s: strftime() failed on format string \"%s\"\n",
argv_program, ctx->args->str_write_fname);
return NULL;
}

return ctx->output_fname;
}

static bool
open_write_file(struct capture *ctx)
{
char *fstr = ctx->args->str_write_fname;
char *fname, *fn_buf;
time_t t;
struct tm the_time;
size_t fn_len;
int fd;
mode_t open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
int open_flags = O_CREAT | O_WRONLY | O_TRUNC;
const char *fname = ctx->args->str_write_fname;

if (strcmp(fname, "-") == 0) {
/* Use already opened FILE* for stdout. */
ctx->output_file = stdout;
} else {
mode_t open_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
int open_flags = O_CREAT | O_WRONLY | O_TRUNC;
#if defined(O_CLOEXEC)
open_flags |= O_CLOEXEC;
open_flags |= O_CLOEXEC;
#endif

if (strcmp(fstr, "-") == 0) {
fd = fileno(stdout);
ctx->output_file = stdout;
fname = fstr;
}
else {
/* Filename strftime expansion, useful when splitting output */
t = time(NULL);
localtime_r(&t, &the_time);
fn_buf = (char*)my_calloc(FNAME_MAXLEN, 1);
fn_len = strftime(fn_buf, FNAME_MAXLEN, fstr, &the_time);
if (!fn_len) {
fprintf(stderr, "%s: strftime failed on filename \"%s\"\n",
argv_program, fstr);
return false;
/* Rewrite the output filename if needed. */
if (ctx->calendar_fn) {
fname = update_output_fname(ctx);
if (fname == NULL)
return false;
}
fname = my_malloc(fn_len);
strcpy(fname, fn_buf);
my_free(fn_buf);

/* Open the file descriptor. */
fd = open(fname, open_flags, open_mode);
int fd = open(fname, open_flags, open_mode);
if (fd == -1) {
fprintf(stderr, "%s: failed to open output file %s\n",
argv_program, fname);
Expand All @@ -454,10 +488,9 @@ open_write_file(struct capture *ctx)
}
}

/* Reset output statistics. */
ctx->count_written = 0;
ctx->bytes_written = 0;
ctx->output_open_ts = t;
ctx->output_filename = fname;

/* Write the START frame. */
if (!open_write_start(ctx)) {
Expand All @@ -469,16 +502,13 @@ open_write_file(struct capture *ctx)
}

/* Success. */
fprintf(stderr, "%s: opened output file %s (fd=%d)\n", argv_program, fname, fd);
fprintf(stderr, "%s: opened output file %s\n", argv_program, fname);
return true;
}

static bool
close_write_file(struct capture *ctx)
{
char *fname = ctx->output_filename;
const char *suffix = ctx->args->str_suffix;

if (ctx->output_file != NULL) {
/* Write the STOP frame. */
if (!close_write_stop(ctx))
Expand All @@ -489,26 +519,10 @@ close_write_file(struct capture *ctx)
ctx->output_file = NULL;
}

if (fname != NULL) {
/* Rename newly closed file if suffix is desired */
if (suffix != NULL) {
ubuf *fn_buf = ubuf_dup_cstr(fname);
ubuf_add_cstr(fn_buf, suffix);
(void)rename(fname, (char*)fn_buf->_v);
ubuf_destroy(&fn_buf);
}
}

/* Success. */
fprintf(stderr, "%s: closed output file %s (suffix %s, wrote %zd frames, %zd bytes)\n",
argv_program, fname, suffix ? suffix : "(none)",
fprintf(stderr, "%s: closed output file %s (wrote %zd frames, %zd bytes)\n",
argv_program, ctx->output_fname ? : ctx->args->str_write_fname,
ctx->count_written, ctx->bytes_written);

if (fname != NULL) {
my_free(fname);
ctx->output_filename = NULL;
}

return true;
}

Expand Down Expand Up @@ -563,17 +577,26 @@ process_data_frame(struct conn *conn)
conn->bytes_read += bytes_read;
conn->ctx->count_written += 1;
conn->ctx->bytes_written += bytes_read;
}

static void
maybe_rotate_output(struct conn *conn)
{
/* Output file rotation requested? */
if (conn->ctx->args->split_s > 0) {
if (conn->ctx->args->split_seconds > 0) {
time_t t_now = time(NULL);

/* Is it time to rotate? */
if (t_now >= conn->ctx->output_open_ts + conn->ctx->args->split_s) {
/* Rotate output file, fail hard if unsuccessful */
if (t_now >= conn->ctx->output_open_timestamp + conn->ctx->args->split_seconds) {
/* Rotate output file, fail hard if unsuccessful. */
if (!close_write_file(conn->ctx)) {
fprintf(stderr, "%s: %s: close_write_file() failed\n",
argv_program, __func__);
exit(EXIT_FAILURE);
}
if (!open_write_file(conn->ctx)) {
fprintf(stderr, "%s: %s: open_write_file() failed\n",
argv_program, __func__);
exit(EXIT_FAILURE);
}
}
Expand Down Expand Up @@ -891,6 +914,9 @@ cb_read(struct bufferevent *bev, void *arg)
if (conn->len_frame_payload > 0) {
/* This is a data frame. */
process_data_frame(conn);

/* Check if it's time to rotate the output file. */
maybe_rotate_output(conn);
} else {
/* This is a control frame. */
if (!load_control_frame(conn)) {
Expand Down Expand Up @@ -1021,13 +1047,14 @@ cleanup(struct capture *ctx)
evconnlistener_free(ctx->ev_connlistener);
if (ctx->ev_base != NULL)
event_base_free(ctx->ev_base);
my_free(ctx->output_fname);
}

int
main(int argc, char **argv)
{
/* Parse arguments. */
if (!parse_args(argc, argv)) {
if (!parse_args(argc, argv, &g_program_ctx)) {
usage(NULL);
return EXIT_FAILURE;
}
Expand Down

0 comments on commit dc359f8

Please sign in to comment.