Skip to content

Commit

Permalink
adding support for multiple input json files
Browse files Browse the repository at this point in the history
  • Loading branch information
Marius authored and Marius committed Dec 3, 2014
1 parent f62be06 commit 1732321
Showing 1 changed file with 62 additions and 43 deletions.
105 changes: 62 additions & 43 deletions json2avro.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,51 +282,65 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,
return 0;
}

void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
void process_file(char *json_bucket, avro_file_writer_t out, avro_schema_t schema,
int verbose, int memstat, int errabort, int strjson, size_t max_str_sz) {

json_error_t err;
json_t *json;
int n = 0;
FILE *input;
char *json_file;

json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
while (!feof(input)) {
n++;
if (verbose && !(n % 1000))
printf("Processing record %d\n", n);
if (!json) {
if (errabort) {
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text);
return;
}
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text);
while (getc(input) != '\n' && !feof(input)) {};
json_file = strtok(json_bucket, " ");

while ((json_file != NULL)) {
if (strcmp(json_file, "-") == 0) {
input = stdin;
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
} else {
input = fopen(json_file, "r");
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
continue;
}
while (!feof(input)) {
n++;
if (verbose && !(n % 1000))
printf("Processing record %d\n", n);
if (!json) {
if (errabort) {
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text);
return;
}
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text);
while (getc(input) != '\n' && !feof(input)) {};
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
continue;
}

avro_value_t record;
avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
avro_generic_value_new(iface, &record);
avro_value_t record;
avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
avro_generic_value_new(iface, &record);

if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) {
if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) {

if (avro_file_writer_append_value(out, &record)) {
fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
if (avro_file_writer_append_value(out, &record)) {
fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}

} else
fprintf(stderr, "Error processing record %d, skipping...\n", n);
} else
fprintf(stderr, "Error processing record %d, skipping...\n", n);

avro_value_iface_decref(iface);
avro_value_decref(&record);
avro_value_iface_decref(iface);
avro_value_decref(&record);

json_decref(json);
if (memstat && !(n % 1000))
memory_status();
json_decref(json);
if (memstat && !(n % 1000))
memory_status();

json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
}
fclose(input);
json_file = strtok(NULL, " ");
}

if (memstat) memory_status();
Expand All @@ -335,7 +349,7 @@ void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
}

void print_help(char *program_name) {
fprintf(stderr, "Usage: %s [options] [input_file.json] <output_file.avro>\n", program_name);
fprintf(stderr, "Usage: %s [options] [input_file(s).json] <output_file.avro>\n", program_name);
fprintf(stderr, "\n");
fprintf(stderr, "Where options are:\n");
fprintf(stderr, " -s schema (required) Avro schema to use for conversion.\n");
Expand Down Expand Up @@ -369,6 +383,7 @@ int main(int argc, char *argv[]) {

int opt, opterr = 0, verbose = 0, memstat = 0, errabort = 0, strjson = 0;
char *schema_arg = NULL;
char json_bucket[8192];
char *codec = NULL;
char *endptr = NULL;
char *outpath = NULL;
Expand Down Expand Up @@ -435,10 +450,6 @@ int main(int argc, char *argv[]) {
if (file_args_cnt == 0) {
usage_error(argv[0], "Please provide at least one file name argument");
}
if (file_args_cnt > 2) {
fprintf(stderr, "Too many file name arguments: %d!\n", file_args_cnt);
usage_error(argv[0], 0);
}

if (opterr) usage_error(argv[0], 0);
if (!schema_arg) usage_error(argv[0], "Please provide correct schema!");
Expand All @@ -450,15 +461,23 @@ int main(int argc, char *argv[]) {
}

if ((argc - optind) == 1) {
input = stdin;
// put a "-" into json_bucket to make it easier later to identify whether the stream we're reading is coming
// from stdin or from a stream of files
strcat(json_bucket, "-");
outpath = argv[optind];
} else {
outpath = argv[optind+1];
input = fopen(argv[optind], "rb");
if ( errno != 0 ) {
fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]);
perror(0);
exit(EXIT_FAILURE);
outpath = argv[argc-1];
for (;optind < argc-1 && *argv[optind] != '-'; optind++) {
input = fopen(argv[optind], "rb");
if (errno != 0) {
fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]);
perror(0);
exit(EXIT_FAILURE); // Uncomment if some of the files you're passing as input json are not present
} else {
strcat(json_bucket, argv[optind]);
strcat(json_bucket, " ");
}
fclose(input);
}
}

Expand All @@ -484,7 +503,7 @@ int main(int argc, char *argv[]) {
if (verbose)
fprintf(stderr, "Using codec: %s\n", codec);

process_file(input, out, schema, verbose, memstat, errabort, strjson, max_str_sz);
process_file(json_bucket, out, schema, verbose, memstat, errabort, strjson, max_str_sz);

if (verbose)
printf("Closing writer....\n");
Expand Down

0 comments on commit 1732321

Please sign in to comment.