Skip to content

Commit

Permalink
Add %F output format option to print timestamps in a human-readable f…
Browse files Browse the repository at this point in the history
…ormat
  • Loading branch information
Alexey Sveshnikov committed Nov 1, 2021
1 parent 54be60e commit 863e023
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
47 changes: 47 additions & 0 deletions format.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
* POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdio.h>
#include <time.h>

#include "kcat.h"
#include "rdendian.h"

Expand Down Expand Up @@ -136,6 +139,10 @@ void fmt_parse (const char *fmt) {
fmt_add(KC_FMT_TIMESTAMP, NULL, 0);
conf.flags |= CONF_F_APIVERREQ;
break;
case 'F':
fmt_add(KC_FMT_FORMATTED_TIMESTAMP, NULL, 0);
conf.flags |= CONF_F_APIVERREQ;
break;
#if HAVE_HEADERS
case 'h':
fmt_add(KC_FMT_HEADERS, NULL, 0);
Expand Down Expand Up @@ -370,6 +377,28 @@ static int unpack (FILE *fp, const char *what, const char *fmt,
}


/**
* Convert timestamp into a human-readable format.
*/
void format_timestamp(char *dst, size_t maxsize, int64_t unixtime_ms) {
time_t unixtime_secs = unixtime_ms / 1000;

struct tm *ts = localtime(&unixtime_secs);

size_t pos = strftime(dst, maxsize,
"%Y-%m-%dT%H:%M:%S.", ts);

// add milliseconds
pos += snprintf(
&dst[pos],
maxsize-pos,
"%" PRId64,
unixtime_ms % 1000);

// add timezone
strftime(&dst[pos], maxsize-pos, "%z", ts);
}



/**
Expand Down Expand Up @@ -517,6 +546,24 @@ static void fmt_msg_output_str (FILE *fp,
&tstype));
#else
r = fprintf(fp, "-1");
#endif
break;
}

case KC_FMT_FORMATTED_TIMESTAMP:
{
#if RD_KAFKA_VERSION >= 0x000902ff
rd_kafka_timestamp_type_t tstype;

char buf[80];
format_timestamp(buf,
sizeof(buf),
rd_kafka_message_timestamp(rkmessage,
&tstype));

r = fprintf(fp, "%s", buf);
#else
r = fprintf(fp, "-1");
#endif
break;
}
Expand Down
1 change: 1 addition & 0 deletions kcat.c
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,7 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
" %%K Message key length (or -1 for NULL)\n"
#if RD_KAFKA_VERSION >= 0x000902ff
" %%T Message timestamp (milliseconds since epoch UTC)\n"
" %%F Message timestamp (ISO8601 formatted)\n"
#endif
#if HAVE_HEADERS
" %%h Message headers (n=v CSV)\n"
Expand Down
1 change: 1 addition & 0 deletions kcat.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ typedef enum {
KC_FMT_TOPIC,
KC_FMT_PARTITION,
KC_FMT_TIMESTAMP,
KC_FMT_FORMATTED_TIMESTAMP,
KC_FMT_HEADERS
} fmt_type_t;

Expand Down

0 comments on commit 863e023

Please sign in to comment.