Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for minute interval #61

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ Each time the conntrack entries are polled, their counters are reset (zero-on-re
<dd>Accounting period interval. May be either in the format YYYY-MM-DD/NN,
to start a new accounting period exactly every NN days, beginning at
the given date, or a number specifiying the day of month at which to
start the next accounting period. For example:</dd>
start the next accounting period, or in the format MMm for MM minutes. For example:</dd>
</dl>

```
2017-01-17/14 # every 14 days, starting Jan 17, 2017
-2 # second to the last day of the month, e.g. 30th in March
1 # first day of the month (default)
60m # every hour
```

<dl>
Expand Down Expand Up @@ -78,7 +79,7 @@ storage requirements.</dd>
<dt>-c command</dt>
<dd>Specify a command. Current commands are: show, json, csv, list, commit. See below for more information about commands.</dd>

<dt>-p /path/to/procol-database</dt>
<dt>-p /path/to/protocol-database</dt>
<dd>Protocol description file, used to distinguish traffic streams by IP protocol number and port.</dd>

<dt>-g col[,col]</dt>
Expand All @@ -87,7 +88,7 @@ storage requirements.</dd>
<dt>-o col[,col]</dt>
<dd>Order output by the specified column. Prefix column with a - to invert order.</dd>

<dt>-t YYYY-MM-DD</dt>
<dt>-t YYYY-MM-DD|timestamp</dt>
<dd>Read data from the specified database, instead of the active database. Use the list command to view available databases.</dd>

<dt>-n</dt>
Expand All @@ -101,6 +102,9 @@ storage requirements.</dd>

<dt>-e char</dt>
<dd>Specify the escape character when using CSV format. If no argument is provided, an empty string is assumed. Currently only applies to CSV format.</dd>

<dt>-G count</dt>
<dd>Number of database generations to retrieve. Data from databases are summed. Note: connection is counted in each database, so connection spanning two intervals is counted twice.</dd>
</dl>


Expand Down
105 changes: 67 additions & 38 deletions client.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ static struct field fields[MAX] = {

static struct {
int timestamp;
unsigned int generations;
bool plain_numbers;
int8_t group_by[1 + MAX];
int8_t order_by[1 + MAX];
Expand All @@ -102,6 +103,7 @@ static struct {
.separator = '\t',
.escape = '"',
.quote = '"',
.generations = 1
};

struct command {
Expand Down Expand Up @@ -217,49 +219,52 @@ recv_database(struct dbhandle **h)
int i, len, err, ctrl_socket;
struct database db;
struct record rec;
char req[sizeof("dump YYYYMMDD\0")];
char req[100];

ctrl_socket = usock(USOCK_UNIX, opt.socket, NULL);
*h = database_mem(cmp_fn, client_opt.group_by);

if (!ctrl_socket)
return -errno;
if (!*h) {
return -ENOMEM;
}

len = snprintf(req, sizeof(req), "dump %d", client_opt.timestamp);
for (int g = 0; g < client_opt.generations; g++) {

if (send(ctrl_socket, req, len, 0) != len) {
close(ctrl_socket);
return -errno;
}
ctrl_socket = usock(USOCK_UNIX, opt.socket, NULL);

if (recv(ctrl_socket, &db, sizeof(db), 0) != sizeof(db)) {
close(ctrl_socket);
return -ENODATA;
}
if (!ctrl_socket)
return -errno;

*h = database_mem(cmp_fn, client_opt.group_by);
len = snprintf(req, sizeof(req), "dump %d-%d", client_opt.timestamp, g);

if (!*h) {
close(ctrl_socket);
return -ENOMEM;
}
if (send(ctrl_socket, req, len, 0) != len) {
close(ctrl_socket);
return -errno;
}

for (i = 0; i < db_entries(&db); i++) {
if (recv(ctrl_socket, &rec, db_recsize, 0) != db_recsize) {
if (recv(ctrl_socket, &db, sizeof(db), 0) != sizeof(db)) {
close(ctrl_socket);
return -ENODATA;
}

err = database_insert(*h, &rec);
for (i = 0; i < db_entries(&db); i++) {
if (recv(ctrl_socket, &rec, db_recsize, 0) != db_recsize) {
close(ctrl_socket);
return -ENODATA;
}

if (err != 0) {
close(ctrl_socket);
return err;
err = database_insert(*h, &rec);

if (err != 0) {
close(ctrl_socket);
return err;
}
}

close(ctrl_socket);
}

database_reorder(*h, sort_fn, client_opt.order_by);

close(ctrl_socket);
return 0;
}

Expand Down Expand Up @@ -297,14 +302,14 @@ handle_show(void)
printf("%c Fam ", columns[FAMILY]);

if (columns[HOST]) {
printf(" %c Host ( MAC ) ", columns[HOST]);
printf(" %c Host ( MAC ) ", columns[HOST]);
}
else {
if (columns[MAC])
printf(" %c MAC ", columns[MAC]);

if (columns[IP])
printf(" %c IP ", columns[IP]);
printf(" %c IP ", columns[IP]);
}

if (columns[LAYER7]) {
Expand All @@ -328,7 +333,7 @@ handle_show(void)
printf("IPv%d ", rec->family == AF_INET ? 4 : 6);

if (columns[HOST]) {
printf("%15s (%02x:%02x:%02x) ",
printf("%39s (%02x:%02x:%02x) ",
format_ipaddr(rec->family, &rec->src_addr),
rec->src_mac.ea.ether_addr_octet[3],
rec->src_mac.ea.ether_addr_octet[4],
Expand All @@ -339,7 +344,7 @@ handle_show(void)
printf("%17s ", format_macaddr(&rec->src_mac.ea));

if (columns[IP])
printf("%15s ", format_ipaddr(rec->family, &rec->src_addr));
printf("%39s ", format_ipaddr(rec->family, &rec->src_addr));
}

if (columns[LAYER7]) {
Expand Down Expand Up @@ -616,15 +621,30 @@ handle_list(void)
return -errno;
}

struct interval interval;
if (recv(ctrl_socket, &interval, sizeof(interval), 0) <= 0) {
close(ctrl_socket);
return 0;
}

while (true) {
if (recv(ctrl_socket, &client_opt.timestamp,
sizeof(client_opt.timestamp), 0) <= 0)
break;

printf("%04d-%02d-%02d\n",
client_opt.timestamp / 10000,
client_opt.timestamp % 10000 / 100,
client_opt.timestamp % 100);
if (interval.type == MINUTE) {
time_t t = client_opt.timestamp * 60;
struct tm *timeinfo = localtime (&t);
char timestr[50];
strftime(timestr, sizeof(timestr), "%F %T %Z", timeinfo);
printf ("%d (%s)\n", client_opt.timestamp, timestr);
}
else {
printf("%04d-%02d-%02d\n",
client_opt.timestamp / 10000,
client_opt.timestamp % 10000 / 100,
client_opt.timestamp % 100);
}
}

close(ctrl_socket);
Expand Down Expand Up @@ -676,7 +696,7 @@ client_main(int argc, char **argv)
unsigned int year, month, day;
char c, *p;

while ((optchr = getopt(argc, argv, "c:p:S:g:o:t:s::q::e::n")) > -1) {
while ((optchr = getopt(argc, argv, "c:p:S:g:o:t:s::q::e::nG:")) > -1) {
switch (optchr) {
case 'S':
opt.socket = optarg;
Expand Down Expand Up @@ -748,12 +768,21 @@ client_main(int argc, char **argv)
break;

case 't':
if (sscanf(optarg, "%4u-%2u-%2u", &year, &month, &day) != 3) {
fprintf(stderr, "Unrecognized date '%s'\n", optarg);
return 1;
if (sscanf(optarg, "%4u-%2u-%2u", &year, &month, &day) == 3) {
client_opt.timestamp = year * 10000 + month * 100 + day;
break;
}
else if (sscanf(optarg, "%u", &client_opt.timestamp) == 1) {
break;
}
fprintf(stderr, "Unrecognized date '%s'\n", optarg);
return 1;

client_opt.timestamp = year * 10000 + month * 100 + day;
case 'G':
if (sscanf(optarg, "%2u", &client_opt.generations) != 1) {
fprintf(stderr, "Unrecognized generations '%s'\n", optarg);
return 1;
}
break;

case 'n':
Expand Down
9 changes: 7 additions & 2 deletions database.c
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ database_restore_gzip(struct dbhandle *h, const char *path, uint32_t timestamp)

if (h) {
h->pristine = false;
h->db->timestamp = htobe32(timestamp);

for (i = 0; i < entries; i++) {
if (gzread(gz, &rec, db_recsize) != db_recsize) {
Expand Down Expand Up @@ -563,7 +564,7 @@ database_load(struct dbhandle *h, const char *path, uint32_t timestamp)
int
database_cleanup(void)
{
uint32_t timestamp, num;
uint32_t timestamp, num, safe_low;
struct dirent *entry;
char *e, path[256];
DIR *d;
Expand All @@ -579,6 +580,8 @@ database_cleanup(void)
errno = 0;
timestamp = interval_timestamp(&opt.archive_interval, -opt.db.generations);

safe_low = opt.archive_interval.type == MINUTE ? 1704067200 / 60 : 20000101;

while ((entry = readdir(d)) != NULL) {
if (entry->d_type != DT_REG)
continue;
Expand All @@ -591,7 +594,7 @@ database_cleanup(void)
if (strcmp(e, ".db") != 0 && strcmp(e, ".db.gz") != 0)
continue;

if (num < 20000101 || num > timestamp)
if (num < safe_low || num > timestamp)
continue;

snprintf(path, sizeof(path), "%s/%u%s", opt.db.directory, num, e);
Expand All @@ -618,6 +621,8 @@ database_archive(struct dbhandle *h)
if (err)
return err;

database_cleanup();

/* lazily reset database, don't (re)alloc */
h->off = 0;
h->db->entries = 0;
Expand Down
5 changes: 3 additions & 2 deletions nlbwmon.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ static void save_persistent(uint32_t timestamp)
fprintf(stderr, "Unable to load existing database: %s\n",
strerror(-err));
}
else {
err = database_save(gdbh, opt.db.directory, timestamp, opt.db.compress);
}
}

err = database_save(gdbh, opt.db.directory, timestamp, opt.db.compress);

if (err) {
fprintf(stderr, "Unable to save database: %s\n",
strerror(-err));
Expand Down
35 changes: 28 additions & 7 deletions socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,35 @@ handle_dump(int sock, const char *arg)
{
struct dbhandle *h;
struct record *rec = NULL;
int err = 0, timestamp = 0;
char *e;
int err = 0, timestamp = 0, g = 0;

if (arg) {
timestamp = strtoul(arg, &e, 10);

if (arg == e || *e)
if (sscanf(arg, "%d-%d", &timestamp, &g) != 2)
return -EINVAL;
}

if (timestamp == 0) {
if (timestamp == 0 && g == 0) {
h = gdbh;
}
else {
if (timestamp == 0) {
timestamp = interval_timestamp(&opt.archive_interval, -g);
}
else {
int delta = 0;
while (true) {
if (timestamp == interval_timestamp(&opt.archive_interval, delta)) {
break;
}
delta--;
if (opt.db.generations && delta > -opt.db.generations)
continue;

return -EINVAL;
}
timestamp = interval_timestamp(&opt.archive_interval, delta - g);
}

h = database_init(&opt.archive_interval, false, 0);

if (!h) {
Expand Down Expand Up @@ -120,6 +135,9 @@ handle_list(int sock, const char *arg)
int delta = 0;
uint32_t timestamp;

if (send(sock, &opt.archive_interval, sizeof(opt.archive_interval), 0) != sizeof(opt.archive_interval))
return -errno;

while (true) {
timestamp = interval_timestamp(&opt.archive_interval, delta--);
err = database_load(NULL, opt.db.directory, timestamp);
Expand All @@ -129,10 +147,13 @@ handle_list(int sock, const char *arg)
fprintf(stderr, "Corrupted database detected: %d (%s)\n",
timestamp, strerror(-err));

if (opt.db.generations && delta > -opt.db.generations)
continue;

break;
}

if (send(sock, &timestamp, sizeof(timestamp), 0) != sizeof(timestamp))
if (send_data(sock, &timestamp, sizeof(timestamp)) != sizeof(timestamp))
return -errno;
}

Expand Down
Loading