Skip to content

Commit

Permalink
Merge pull request apache#61 from pldash/master
Browse files Browse the repository at this point in the history
Modified the csvtojson converter to use the inputstreamcsvreader and handled the IOException to throw the DataConversion Exception in the csvtojsonconverter
  • Loading branch information
narasimhareddyv committed Mar 25, 2015
2 parents 1bcd664 + ae7912d commit 4a9ecca
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

import gobblin.converter.Converter;
import gobblin.converter.SingleRecordIterable;
import java.io.IOException;
import java.util.List;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
Expand All @@ -27,6 +27,7 @@
import gobblin.configuration.WorkUnitState;
import gobblin.converter.DataConversionException;
import gobblin.converter.SchemaConversionException;
import gobblin.source.extractor.utils.InputStreamCSVReader;


public class CsvToJsonConverter extends Converter<String, JsonArray, String, JsonObject> {
Expand All @@ -37,8 +38,7 @@ public class CsvToJsonConverter extends Converter<String, JsonArray, String, Jso
* @return a JsonArray representation of the schema
*/
@Override
public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit)
throws SchemaConversionException {
public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
JsonParser jsonParser = new JsonParser();
JsonElement jsonSchema = jsonParser.parse(inputSchema);
return jsonSchema.getAsJsonArray();
Expand All @@ -48,18 +48,27 @@ public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit)
* Takes in a record with format String and splits the data based on SOURCE_SCHEMA_DELIMITER
* Uses the inputSchema and the split record to convert the record to a JsonObject
* @return a JsonObject representing the record
* @throws IOException
*/
@Override
public Iterable<JsonObject> convertRecord(JsonArray outputSchema, String inputRecord, WorkUnitState workUnit)
throws DataConversionException {
List<String> recordSplit = Lists.newArrayList(
Splitter.onPattern(workUnit.getProp(ConfigurationKeys.CONVERTER_CSV_TO_JSON_DELIMITER)).trimResults()
.split(inputRecord));
InputStreamCSVReader reader =
new InputStreamCSVReader(inputRecord, workUnit.getProp(ConfigurationKeys.CONVERTER_CSV_TO_JSON_DELIMITER)
.trim().charAt(0));
List<String> recordSplit;
try {
recordSplit = Lists.newArrayList(reader.splitRecord());
} catch (IOException e) {
throw new DataConversionException(e);
}
JsonObject outputRecord = new JsonObject();

for (int i = 0; i < outputSchema.size(); i++) {
if (i < recordSplit.size()) {
if (recordSplit.get(i).isEmpty() || recordSplit.get(i).toLowerCase().equals(NULL)) {
if (recordSplit.get(i) == null) {
outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE);
} else if (recordSplit.get(i).isEmpty() || recordSplit.get(i).toLowerCase().equals(NULL)) {
outputRecord.add(outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(), JsonNull.INSTANCE);
} else {
outputRecord
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public InputStreamCSVReader(BufferedReader input) {

public InputStreamCSVReader(String input) {
this(new InputStreamReader(new ByteArrayInputStream(input.getBytes()),
Charset.forName(ConfigurationKeys.DEFAULT_CHARSET_ENCODING)));
Charset.forName(ConfigurationKeys.DEFAULT_CHARSET_ENCODING)), ',', '\"');
}

public InputStreamCSVReader(Reader input, char customizedSeparator) {
Expand Down Expand Up @@ -98,8 +98,12 @@ public InputStreamCSVReader(BufferedReader input, char separator, char enclosedC
atEOF = false;
}

public ArrayList<String> nextRecord()
throws IOException {
public ArrayList<String> splitRecord() throws IOException {
ArrayList<String> record = this.getNextRecordFromStream();
return record;
}

public ArrayList<String> nextRecord() throws IOException {
ArrayList<String> record = this.getNextRecordFromStream();

// skip record if it is empty
Expand Down

0 comments on commit 4a9ecca

Please sign in to comment.