diff --git a/json2avro.c b/json2avro.c index e979f57..ba391cd 100644 --- a/json2avro.c +++ b/json2avro.c @@ -282,65 +282,51 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft, return 0; } -void process_file(char *json_bucket, avro_file_writer_t out, avro_schema_t schema, +void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema, int verbose, int memstat, int errabort, int strjson, size_t max_str_sz) { json_error_t err; json_t *json; int n = 0; - FILE *input; - char *json_file; - - json_file = strtok(json_bucket, " "); - while ((json_file != NULL)) { - if (strcmp(json_file, "-") == 0) { - input = stdin; - json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); - } else { - input = fopen(json_file, "r"); + json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); + while (!feof(input)) { + n++; + if (verbose && !(n % 1000)) + printf("Processing record %d\n", n); + if (!json) { + if (errabort) { + fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text); + return; + } + fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text); + while (getc(input) != '\n' && !feof(input)) {}; json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); + continue; } - while (!feof(input)) { - n++; - if (verbose && !(n % 1000)) - printf("Processing record %d\n", n); - if (!json) { - if (errabort) { - fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text); - return; - } - fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text); - while (getc(input) != '\n' && !feof(input)) {}; - json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); - continue; - } - avro_value_t record; - avro_value_iface_t *iface = avro_generic_class_from_schema(schema); - avro_generic_value_new(iface, &record); + avro_value_t record; + avro_value_iface_t *iface = avro_generic_class_from_schema(schema); + avro_generic_value_new(iface, &record); - if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) { + if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) { - if (avro_file_writer_append_value(out, &record)) { - fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror()); - exit(EXIT_FAILURE); - } + if (avro_file_writer_append_value(out, &record)) { + fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror()); + exit(EXIT_FAILURE); + } - } else - fprintf(stderr, "Error processing record %d, skipping...\n", n); + } else + fprintf(stderr, "Error processing record %d, skipping...\n", n); - avro_value_iface_decref(iface); - avro_value_decref(&record); + avro_value_iface_decref(iface); + avro_value_decref(&record); - json_decref(json); - if (memstat && !(n % 1000)) - memory_status(); + json_decref(json); + if (memstat && !(n % 1000)) + memory_status(); - json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); - } - fclose(input); - json_file = strtok(NULL, " "); + json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); } if (memstat) memory_status(); @@ -349,7 +335,7 @@ void process_file(char *json_bucket, avro_file_writer_t out, avro_schema_t schem } void print_help(char *program_name) { - fprintf(stderr, "Usage: %s [options] [input_file(s).json] \n", program_name); + fprintf(stderr, "Usage: %s [options] [input_file.json] \n", program_name); fprintf(stderr, "\n"); fprintf(stderr, "Where options are:\n"); fprintf(stderr, " -s schema (required) Avro schema to use for conversion.\n"); @@ -383,7 +369,6 @@ int main(int argc, char *argv[]) { int opt, opterr = 0, verbose = 0, memstat = 0, errabort = 0, strjson = 0; char *schema_arg = NULL; - char json_bucket[8192]; char *codec = NULL; char *endptr = NULL; char *outpath = NULL; @@ -450,6 +435,10 @@ int main(int argc, char *argv[]) { if (file_args_cnt == 0) { usage_error(argv[0], "Please provide at least one file name argument"); } + if (file_args_cnt > 2) { + fprintf(stderr, "Too many file name arguments: %d!\n", file_args_cnt); + usage_error(argv[0], 0); + } if (opterr) usage_error(argv[0], 0); if (!schema_arg) usage_error(argv[0], "Please provide correct schema!"); @@ -461,23 +450,15 @@ int main(int argc, char *argv[]) { } if ((argc - optind) == 1) { - // put a "-" into json_bucket to make it easier later to identify wether the stream we're reading is coming - // from stdin or from a stream of files - strcat(json_bucket, "-"); + input = stdin; outpath = argv[optind]; } else { - outpath = argv[argc-1]; - for (;optind < argc-1 && *argv[optind] != '-'; optind++) { - input = fopen(argv[optind], "rb"); - if (errno != 0) { - fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]); - perror(0); - exit(EXIT_FAILURE); // Uncomment if some of the files you're passing as input json are not present - } else { - strcat(json_bucket, argv[optind]); - strcat(json_bucket, " "); - } - fclose(input); + outpath = argv[optind+1]; + input = fopen(argv[optind], "rb"); + if ( errno != 0 ) { + fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]); + perror(0); + exit(EXIT_FAILURE); } } @@ -503,7 +484,7 @@ int main(int argc, char *argv[]) { if (verbose) fprintf(stderr, "Using codec: %s\n", codec); - process_file(json_bucket, out, schema, verbose, memstat, errabort, strjson, max_str_sz); + process_file(input, out, schema, verbose, memstat, errabort, strjson, max_str_sz); if (verbose) printf("Closing writer....\n");