Skip to content

Commit

Permalink
Added -O option: print offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Dec 3, 2014
1 parent 83205d9 commit 5777390
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions kafkacat.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ static struct conf {
char mode;
int flags;
#define CONF_F_KEY_DELIM 0x2
#define CONF_F_OFFSET 0x4 /* Print offsets */
int delim;
int key_delim;
int msg_size;
Expand All @@ -76,6 +77,7 @@ static struct conf {
.partition = RD_KAFKA_PARTITION_UA,
.msg_size = 1024*1024,
.delim = '\n',
.key_delim = '\t',
};


Expand Down Expand Up @@ -195,7 +197,8 @@ static ssize_t produce_file (const char *path) {
return -1;
}

INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n", path, (intmax_t)st.st_size);
INFO(4, "Producing file %s (%"PRIdMAX" bytes)\n",
path, (intmax_t)st.st_size);
produce(ptr, st.st_size, NULL, 0, RD_KAFKA_MSG_F_COPY);

munmap(ptr, st.st_size);
Expand Down Expand Up @@ -363,6 +366,10 @@ static void consume_cb (rd_kafka_message_t *rkmessage, void *opaque) {
rd_kafka_message_errstr(rkmessage));
}

/* Print offset (using key delim), if desired */
if (conf.flags & CONF_F_OFFSET)
fprintf(fp, "%"PRId64"%c", rkmessage->offset, conf.key_delim);

/* Print key, if desired */
if (conf.flags & CONF_F_KEY_DELIM)
fprintf(fp, "%.*s%c",
Expand Down Expand Up @@ -669,6 +676,7 @@ static void __attribute__((noreturn)) usage (const char *argv0, int exitcode,
" -D <delim> Delimiter to separate messages on output\n"
" -K <delim> Print message keys prefixing the message\n"
" with specified delimiter.\n"
" -O Print message offset using -K delimiter\n"
" -c <cnt> Exit after consuming this number "
"of messages\n"
" -u Unbuffered output\n"
Expand Down Expand Up @@ -727,7 +735,7 @@ static void argparse (int argc, char **argv) {
int opt;

while ((opt = getopt(argc, argv,
"PCLt:p:b:z:o:eD:K:d:qvX:c:u")) != -1) {
"PCLt:p:b:z:o:eD:K:Od:qvX:c:u")) != -1) {
switch (opt) {
case 'P':
case 'C':
Expand Down Expand Up @@ -773,6 +781,9 @@ static void argparse (int argc, char **argv) {
conf.key_delim = parse_delim(optarg);
conf.flags |= CONF_F_KEY_DELIM;
break;
case 'O':
conf.flags |= CONF_F_OFFSET;
break;
case 'c':
conf.msg_cnt = strtoll(optarg, NULL, 10);
break;
Expand Down

0 comments on commit 5777390

Please sign in to comment.