Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple input files support #6

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 61 additions & 44 deletions json2avro.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,51 +282,63 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,
return 0;
}

void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
void process_file(char *json_bucket, avro_file_writer_t out, avro_schema_t schema,
int verbose, int memstat, int errabort, int strjson, size_t max_str_sz) {

json_error_t err;
json_t *json;
int n = 0;
FILE *input;
char *json_file;

json_file = strtok(json_bucket, " ");

json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
while (!feof(input)) {
n++;
if (verbose && !(n % 1000))
printf("Processing record %d\n", n);
if (!json) {
if (errabort) {
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text);
return;
while ((json_file != NULL)) {
if (strcmp(json_file, "-") == 0)
input = stdin;
else
input = fopen(json_file, "r");
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
while (!feof(input)) {
n++;
if (verbose && !(n % 1000))
printf("Processing record %d\n", n);
if (!json) {
if (errabort) {
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, aborting.\n", n, err.column, err.position, err.text);
return;
}
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text);
while (getc(input) != '\n' && !feof(input)) {};
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
continue;
}
fprintf(stderr, "JSON error on line %d, column %d, pos %d: %s, skipping to EOL\n", n, err.column, err.position, err.text);
while (getc(input) != '\n' && !feof(input)) {};
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
continue;
}

avro_value_t record;
avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
avro_generic_value_new(iface, &record);
avro_value_t record;
avro_value_iface_t *iface = avro_generic_class_from_schema(schema);
avro_generic_value_new(iface, &record);

if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) {
if (!schema_traverse(schema, json, NULL, &record, 0, strjson, max_str_sz)) {

if (avro_file_writer_append_value(out, &record)) {
fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}
if (avro_file_writer_append_value(out, &record)) {
fprintf(stderr, "ERROR: avro_file_writer_append_value() FAILED: %s\n", avro_strerror());
exit(EXIT_FAILURE);
}

} else
fprintf(stderr, "Error processing record %d, skipping...\n", n);
} else
fprintf(stderr, "Error processing record %d, skipping...\n", n);

avro_value_iface_decref(iface);
avro_value_decref(&record);
avro_value_iface_decref(iface);
avro_value_decref(&record);

json_decref(json);
if (memstat && !(n % 1000))
memory_status();
json_decref(json);
if (memstat && !(n % 1000))
memory_status();

json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
}
fclose(input);
json_file = strtok(NULL, " ");
}

if (memstat) memory_status();
Expand All @@ -335,7 +347,7 @@ void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
}

void print_help(char *program_name) {
fprintf(stderr, "Usage: %s [options] [input_file.json] <output_file.avro>\n", program_name);
fprintf(stderr, "Usage: %s [options] [input_file(s).json] <output_file.avro>\n", program_name);
fprintf(stderr, "\n");
fprintf(stderr, "Where options are:\n");
fprintf(stderr, " -s schema (required) Avro schema to use for conversion.\n");
Expand Down Expand Up @@ -369,6 +381,7 @@ int main(int argc, char *argv[]) {

int opt, opterr = 0, verbose = 0, memstat = 0, errabort = 0, strjson = 0;
char *schema_arg = NULL;
char json_bucket[8192];
char *codec = NULL;
char *endptr = NULL;
char *outpath = NULL;
Expand Down Expand Up @@ -435,10 +448,6 @@ int main(int argc, char *argv[]) {
if (file_args_cnt == 0) {
usage_error(argv[0], "Please provide at least one file name argument");
}
if (file_args_cnt > 2) {
fprintf(stderr, "Too many file name arguments: %d!\n", file_args_cnt);
usage_error(argv[0], 0);
}

if (opterr) usage_error(argv[0], 0);
if (!schema_arg) usage_error(argv[0], "Please provide correct schema!");
Expand All @@ -450,15 +459,23 @@ int main(int argc, char *argv[]) {
}

if ((argc - optind) == 1) {
input = stdin;
// put a "-" into json_bucket to make it easier later to identify wether the stream we're reading is coming
// from stdin or from a stream of files
strcat(json_bucket, "-");
outpath = argv[optind];
} else {
outpath = argv[optind+1];
input = fopen(argv[optind], "rb");
if ( errno != 0 ) {
fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]);
perror(0);
exit(EXIT_FAILURE);
outpath = argv[argc-1];
for (;optind < argc-1 && *argv[optind] != '-'; optind++) {
input = fopen(argv[optind], "rb");
if (errno != 0) {
fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]);
perror(0);
exit(EXIT_FAILURE); // Uncomment if some of the files you're passing as input json are not present
} else {
strcat(json_bucket, argv[optind]);
strcat(json_bucket, " ");
}
fclose(input);
}
}

Expand All @@ -484,7 +501,7 @@ int main(int argc, char *argv[]) {
if (verbose)
fprintf(stderr, "Using codec: %s\n", codec);

process_file(input, out, schema, verbose, memstat, errabort, strjson, max_str_sz);
process_file(json_bucket, out, schema, verbose, memstat, errabort, strjson, max_str_sz);

if (verbose)
printf("Closing writer....\n");
Expand Down