From 35cc326a5a039c9cb87289217ce84adbc204335d Mon Sep 17 00:00:00 2001
From: haifwu
Date: Mon, 13 May 2024 11:07:24 +0800
Subject: [PATCH 1/2] Support parquet input in local stats

---
Review notes (ignored by git am, not part of the commit message):
 - ParquetInputStream: typed the reader as ParquetReader<Group>, made ALL_DONE a
   primitive int, resolved the charset before opening the reader, added a bulk
   read(byte[], int, int), ended the file with a newline and added Javadoc.
 - NOTE(review): indentation of the context lines in the hunks for existing files
   was reconstructed and the blob ids on the index lines were not recomputed;
   confirm against the tree or apply with --ignore-whitespace.

 .../ml/shifu/shifu/fs/ParquetInputStream.java | 108 ++++++++++++++++++
 .../ml/shifu/shifu/fs/ShifuFileUtils.java     |   2 +
 2 files changed, 110 insertions(+)
 create mode 100644 src/main/java/ml/shifu/shifu/fs/ParquetInputStream.java

diff --git a/src/main/java/ml/shifu/shifu/fs/ParquetInputStream.java b/src/main/java/ml/shifu/shifu/fs/ParquetInputStream.java
new file mode 100644
index 000000000..9a8a3b84c
--- /dev/null
+++ b/src/main/java/ml/shifu/shifu/fs/ParquetInputStream.java
@@ -0,0 +1,108 @@
+package ml.shifu.shifu.fs;
+
+import ml.shifu.shifu.util.Constants;
+import ml.shifu.shifu.util.Environment;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.util.Strings;
+import parquet.example.data.Group;
+import parquet.example.data.simple.SimpleGroup;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.example.GroupReadSupport;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Exposes the records of a parquet file as delimited text so that line-oriented readers, such as a
+ * {@link java.util.Scanner}, can consume parquet input like a plain text file.
+ *
+ * <p>
+ * Each record becomes one line: the first value of every field (or an empty string when the field has no
+ * value) joined by {@code FIELD_DELIMITER} and terminated by {@code Strings.LINE_SEPARATOR}. Values are
+ * written as-is, so a value that itself contains the delimiter or a line break is not escaped.
+ */
+public class ParquetInputStream extends InputStream {
+
+    /** End-of-stream marker, as defined by the {@link InputStream} read contract. */
+    private static final int ALL_DONE = -1;
+    private static final String FIELD_DELIMITER = Environment.getProperty(Constants.SHIFU_OUTPUT_DATA_DELIMITER,
+            Constants.DEFAULT_DELIMITER);
+
+    private final ParquetReader<Group> reader;
+    private final Charset charset;
+    /** Encoded bytes of the record currently being served; starts out empty. */
+    private ByteArrayInputStream buffer = new ByteArrayInputStream(new byte[0]);
+
+    /**
+     * Opens {@code filePath} for reading.
+     *
+     * @param filePath
+     *            location of the parquet file
+     * @param charset
+     *            name of the charset used to encode the generated text
+     * @throws IOException
+     *             if the parquet reader cannot be created
+     */
+    public ParquetInputStream(Path filePath, String charset) throws IOException {
+        // Resolve the charset before opening the file so that an invalid name cannot leave a reader open.
+        this.charset = Charset.forName(charset);
+        this.reader = ParquetReader.builder(new GroupReadSupport(), filePath).build();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return fillBuffer() ? this.buffer.read() : ALL_DONE;
+    }
+
+    /**
+     * Bulk read that copies straight out of the current record buffer instead of falling back to the inherited
+     * byte-at-a-time loop. It may return fewer than {@code len} bytes because it never crosses a record boundary.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if(off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        if(len == 0) {
+            return 0;
+        }
+        return fillBuffer() ? this.buffer.read(b, off, len) : ALL_DONE;
+    }
+
+    /**
+     * Makes sure the buffer holds unread bytes, loading the next record when the current one is used up.
+     *
+     * @return {@code false} once the parquet reader has no more records
+     */
+    private boolean fillBuffer() throws IOException {
+        while(this.buffer.available() == 0) {
+            Group record = this.reader.read();
+            if(record == null) {
+                return false;
+            }
+            this.buffer = buildScanBuffer(record);
+        }
+        return true;
+    }
+
+    private ByteArrayInputStream buildScanBuffer(Group group) {
+        String record = simpleGroupToString((SimpleGroup) group) + Strings.LINE_SEPARATOR;
+        return new ByteArrayInputStream(record.getBytes(this.charset));
+    }
+
+    private String simpleGroupToString(SimpleGroup group) {
+        return IntStream.range(0, group.getType().getFieldCount())
+                // Only the first value of a field is emitted; a field without values yields an empty column.
+                .mapToObj(i -> group.getFieldRepetitionCount(i) > 0 ? group.getValueToString(i, 0) : Strings.EMPTY)
+                .collect(Collectors.joining(FIELD_DELIMITER));
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.reader.close();
+    }
+}
diff --git a/src/main/java/ml/shifu/shifu/fs/ShifuFileUtils.java b/src/main/java/ml/shifu/shifu/fs/ShifuFileUtils.java
index e4d0d5632..95a062ab3 100644
--- a/src/main/java/ml/shifu/shifu/fs/ShifuFileUtils.java
+++ b/src/main/java/ml/shifu/shifu/fs/ShifuFileUtils.java
@@ -328,6 +328,8 @@ public int compare(FileStatus f1, FileStatus f2) {
             } else if(filename.endsWith(Constants.BZ2_SUFFIX)) {
                 scanners.add(new Scanner(new BZip2CompressorInputStream(fs.open(f.getPath())),
                         Constants.DEFAULT_CHARSET));
+            } else if(filename.endsWith(Constants.SHIFU_OUTPUT_PARQUET_FORMAT)) {
+                scanners.add(new Scanner(new ParquetInputStream(f.getPath(), Constants.DEFAULT_CHARSET), Constants.DEFAULT_CHARSET));
             } else {
                 scanners.add(new Scanner(new BufferedInputStream(fs.open(f.getPath())), Constants.DEFAULT_CHARSET));
             }

From c0e9b53bb7fd620b112039ce750604e607be2623 Mon Sep 17 00:00:00 2001
From: haifwu
Date: Mon, 26 Aug 2024 22:41:25 +0800
Subject: [PATCH 2/2] fixed cateMinCnt not working issue

---
 src/main/java/ml/shifu/shifu/container/obj/ModelStatsConf.java  | 2 +-
 .../ml/shifu/shifu/core/binning/UpdateBinningInfoReducer.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/java/ml/shifu/shifu/container/obj/ModelStatsConf.java b/src/main/java/ml/shifu/shifu/container/obj/ModelStatsConf.java
index c493b8179..a74ebfc10 100644
--- a/src/main/java/ml/shifu/shifu/container/obj/ModelStatsConf.java
+++ b/src/main/java/ml/shifu/shifu/container/obj/ModelStatsConf.java
@@ -202,7 +202,6 @@ public void setPsiColumnName(String psiColumnName) {
     /**
      * @return the cateMinCnt
      */
-    @JsonIgnore
     public Integer getCateMinCnt() {
         return cateMinCnt;
     }
@@ -223,6 +222,7 @@ public ModelStatsConf clone() {
         other.setBinningAutoTypeThreshold(binningAutoTypeThreshold);
         other.setBinningMergeEnable(binningMergeEnable);
         other.setBinningMethod(binningMethod);
+        other.setCateMinCnt(cateMinCnt);
         other.setMaxNumBin(maxNumBin);
         other.setNumericalValueThreshold(numericalValueThreshold);
         other.setPsiColumnName(psiColumnName);
diff --git a/src/main/java/ml/shifu/shifu/core/binning/UpdateBinningInfoReducer.java b/src/main/java/ml/shifu/shifu/core/binning/UpdateBinningInfoReducer.java
index b8c9b6926..171f078f4 100644
--- a/src/main/java/ml/shifu/shifu/core/binning/UpdateBinningInfoReducer.java
+++ b/src/main/java/ml/shifu/shifu/core/binning/UpdateBinningInfoReducer.java
@@ -382,7 +382,7 @@ protected void reduce(IntWritable key, Iterable values, Con
                 binWeightPos = newBinWeightPos;
                 binWeightNeg = newBinWeightNeg;
 
-                double[] newBinPosRate = new double[binCountPos.length - smallCategories.size()];
+                double[] newBinPosRate = new double[newBinCountPos.length];
                 for(int i = 0; i < newBinCountPos.length; i++) {
                     long newCount = newBinCountPos[i] + newBinCountNeg[i];
                     if(newCount > 0) {