/*
 * Patch summary (json2avro.c): accept several input JSON files instead of one.
 *
 * What the hunks below do:
 *   - process_file() now takes `char *json_bucket`, a single string holding
 *     input paths separated by one space, instead of an already opened FILE *.
 *     It splits the string in place with strtok(json_bucket, " ") and, for each
 *     token, reads JSON records from stdin when the token is "-" or from
 *     fopen(token, "r") otherwise. The record counter `n` is not reset between
 *     files, so it keeps counting across all inputs.
 *   - print_help() advertises "[input_file(s).json]".
 *   - main() gains `char json_bucket[8192]`, drops the "Too many file name
 *     arguments" check, takes the output path from the last argument
 *     (argv[argc-1]) when more than one file argument is given, and appends
 *     every preceding argument plus a trailing space to json_bucket. With a
 *     single file argument it appends "-" so process_file() reads stdin.
 *
 * NOTE(review), all visible in the added lines:
 *   - json_bucket is declared without an initializer and is first written with
 *     strcat(); no earlier initialization is visible in these hunks. It
 *     presumably needs json_bucket[0] = '\0' first -- confirm.
 *   - The strcat() calls are not bounded against the 8192-byte buffer.
 *   - process_file() passes the fopen() result to json_loadf()/feof() without a
 *     NULL check, and calls fclose(input) even when input is stdin.
 *   - The `errabort` early return leaves process_file() without fclose(input).
 *   - main() tests `errno != 0` rather than the fopen() return value, then calls
 *     fclose(input) unconditionally after the if/else.
 *   - Because the separator is a space, a path containing a space is split
 *     into several tokens by strtok().
 *   - The argument loop stops at the first argument whose first character is
 *     '-', so later file names are not added.
 *   - main() probes each file with "rb" while process_file() reopens it with "r".
 *
 * NOTE(review): in this copy the line breaks inside the hunks appear to have
 * been collapsed; the text presumably has to be re-wrapped before a patch tool
 * will accept it -- confirm against the original commit.
 */
diff --git a/json2avro.c b/json2avro.c index ba391cd..e979f57 100644 --- a/json2avro.c +++ b/json2avro.c @@ -282,51 +282,65 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft, return 0; } -void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema, +void process_file(char *json_bucket, avro_file_writer_t out, avro_schema_t schema, int verbose, int memstat, int errabort, int strjson, size_t max_str_sz) { json_error_t err; json_t *json; int n = 0; + FILE *input; + char *json_file; - json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); - while (!feof(input)) { - n++; - if (verbose && !(n % 1000)) - printf("Processing record %d\n", n); - if (!json) { - if (errabort) { - fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text); - return; - } - fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text); - while (getc(input) != '\n' && !feof(input)) {}; + json_file = strtok(json_bucket, " "); + + while ((json_file != NULL)) { + if (strcmp(json_file, "-") == 0) { + input = stdin; + json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); + } else { + input = fopen(json_file, "r"); json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); - continue; } + while (!feof(input)) { + n++; + if (verbose && !(n % 1000)) + printf("Processing record %d\n", n); + if (!json) { + if (errabort) { + fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text); + return; + } + fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text); + while (getc(input) != '\n' && !feof(input)) {}; + json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); + continue; + } - avro_value_t record; - avro_value_iface_t *iface = avro_generic_class_from_schema(schema); - avro_generic_value_new(iface, &record); + avro_value_t 
record; + avro_value_iface_t *iface = avro_generic_class_from_schema(schema); + avro_generic_value_new(iface, &record); - if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) { + if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) { - if (avro_file_writer_append_value(out, &record)) { - fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror()); - exit(EXIT_FAILURE); - } + if (avro_file_writer_append_value(out, &record)) { + fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror()); + exit(EXIT_FAILURE); + } - } else - fprintf(stderr, "Error processing record %d, skipping...\n", n); + } else + fprintf(stderr, "Error processing record %d, skipping...\n", n); - avro_value_iface_decref(iface); - avro_value_decref(&record); + avro_value_iface_decref(iface); + avro_value_decref(&record); - json_decref(json); - if (memstat && !(n % 1000)) - memory_status(); + json_decref(json); + if (memstat && !(n % 1000)) + memory_status(); - json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); + json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err); + } + fclose(input); + json_file = strtok(NULL, " "); } if (memstat) memory_status(); @@ -335,7 +349,7 @@ void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema, } void print_help(char *program_name) { - fprintf(stderr, "Usage: %s [options] [input_file.json] \n", program_name); + fprintf(stderr, "Usage: %s [options] [input_file(s).json] \n", program_name); fprintf(stderr, "\n"); fprintf(stderr, "Where options are:\n"); fprintf(stderr, " -s schema (required) Avro schema to use for conversion.\n"); @@ -369,6 +383,7 @@ int main(int argc, char *argv[]) { int opt, opterr = 0, verbose = 0, memstat = 0, errabort = 0, strjson = 0; char *schema_arg = NULL; + char json_bucket[8192]; char *codec = NULL; char *endptr = NULL; char *outpath = NULL; @@ -435,10 +450,6 @@ int main(int argc, char *argv[]) { if (file_args_cnt == 0) 
{ usage_error(argv[0], "Please provide at least one file name argument"); } - if (file_args_cnt > 2) { - fprintf(stderr, "Too many file name arguments: %d!\n", file_args_cnt); - usage_error(argv[0], 0); - } if (opterr) usage_error(argv[0], 0); if (!schema_arg) usage_error(argv[0], "Please provide correct schema!"); @@ -450,15 +461,23 @@ int main(int argc, char *argv[]) { } if ((argc - optind) == 1) { - input = stdin; + // put a "-" into json_bucket to make it easier later to identify whether the stream we're reading is coming + // from stdin or from a stream of files + strcat(json_bucket, "-"); outpath = argv[optind]; } else { - outpath = argv[optind+1]; - input = fopen(argv[optind], "rb"); - if ( errno != 0 ) { - fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]); - perror(0); - exit(EXIT_FAILURE); + outpath = argv[argc-1]; + for (;optind < argc-1 && *argv[optind] != '-'; optind++) { + input = fopen(argv[optind], "rb"); + if (errno != 0) { + fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]); + perror(0); + exit(EXIT_FAILURE); // Uncomment if some of the files you're passing as input json are not present + } else { + strcat(json_bucket, argv[optind]); + strcat(json_bucket, " "); + } + fclose(input); } } @@ -484,7 +503,7 @@ int main(int argc, char *argv[]) { if (verbose) fprintf(stderr, "Using codec: %s\n", codec); - process_file(input, out, schema, verbose, memstat, errabort, strjson, max_str_sz); + process_file(json_bucket, out, schema, verbose, memstat, errabort, strjson, max_str_sz); if (verbose) printf("Closing writer....\n");