Skip to content

Commit

Permalink
ORC-1387: [C++] Support schema evolution from decimal to numeric/decimal
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Support schema evolution from decimal to {boolean, byte, short, int, long, float, double, decimal}

### Why are the changes needed?
to support schema evolution

### How was this patch tested?
UT passed

Closes apache#1629 from ffacs/ORC-1387.

Authored-by: ffacs <ffacs520@gmail.com>
Signed-off-by: Gang Wu <ustcwg@gmail.com>
  • Loading branch information
ffacs authored and cxzl25 committed Jan 11, 2024
1 parent d98e68f commit 86e70c2
Show file tree
Hide file tree
Showing 7 changed files with 440 additions and 15 deletions.
7 changes: 6 additions & 1 deletion c++/include/orc/Int128.hh
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ namespace orc {
}

/**
* Convert the value to a long and
* Convert the value to a long and throw std::range_error on overflow.
*/
int64_t toLong() const {
if (fitsInLong()) {
Expand All @@ -300,6 +300,11 @@ namespace orc {
throw std::range_error("Int128 too large to convert to long");
}

/**
* Convert the value to a double, the return value may not be precise.
*/
double toDouble() const;

/**
* Return the base 10 string representation of the integer.
*/
Expand Down
228 changes: 217 additions & 11 deletions c++/src/ConvertColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,133 @@ namespace orc {
}
}

template <typename FileTypeBatch, typename ReadTypeBatch, typename ReadType>
class DecimalToNumericColumnReader : public ConvertColumnReader {
public:
DecimalToNumericColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
bool _throwOnOverflow)
: ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
precision = fileType.getPrecision();
scale = fileType.getScale();
factor = 1;
for (int i = 0; i < scale; i++) {
factor *= 10;
}
}

void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ConvertColumnReader::next(rowBatch, numValues, notNull);

const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
for (uint64_t i = 0; i < numValues; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
if constexpr (std::is_floating_point_v<ReadType>) {
convertDecimalToDouble(dstBatch, i, srcBatch);
} else {
convertDecimalToInteger(dstBatch, i, srcBatch);
}
}
}
}

private:
void convertDecimalToInteger(ReadTypeBatch& dstBatch, uint64_t idx,
const FileTypeBatch& srcBatch) {
using FileType = decltype(srcBatch.values[idx]);
Int128 result = scaleDownInt128ByPowerOfTen(srcBatch.values[idx], scale);
if (!result.fitsInLong()) {
handleOverflow<FileType, ReadType>(dstBatch, idx, throwOnOverflow);
return;
}
convertNumericElement<ReadType, int64_t>(result.toLong(), dstBatch.data[idx], dstBatch, idx,
throwOnOverflow);
}

void convertDecimalToDouble(ReadTypeBatch& dstBatch, uint64_t idx,
const FileTypeBatch& srcBatch) {
double doubleValue = Int128(srcBatch.values[idx]).toDouble();
dstBatch.data[idx] = static_cast<ReadType>(doubleValue) / static_cast<ReadType>(factor);
}

int32_t precision;
int32_t scale;
int64_t factor;
};

template <typename FileTypeBatch>
class DecimalToNumericColumnReader<FileTypeBatch, BooleanVectorBatch, bool>
: public ConvertColumnReader {
public:
DecimalToNumericColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
bool _throwOnOverflow)
: ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}

void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ConvertColumnReader::next(rowBatch, numValues, notNull);

const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
auto& dstBatch = *SafeCastBatchTo<BooleanVectorBatch*>(&rowBatch);
for (uint64_t i = 0; i < numValues; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
dstBatch.data[i] = srcBatch.values[i] == 0 ? 0 : 1;
}
}
}
};

template <typename FileTypeBatch, typename ReadTypeBatch>
class DecimalConvertColumnReader : public ConvertColumnReader {
public:
DecimalConvertColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
bool _throwOnOverflow)
: ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
fromPrecision = fileType.getPrecision();
fromScale = fileType.getScale();
toPrecision = _readType.getPrecision();
toScale = _readType.getScale();
}

void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ConvertColumnReader::next(rowBatch, numValues, notNull);

const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
for (uint64_t i = 0; i < numValues; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
convertDecimalToDecimal(dstBatch, i, srcBatch);
}
}
}

private:
void convertDecimalToDecimal(ReadTypeBatch& dstBatch, uint64_t idx,
const FileTypeBatch& srcBatch) {
using FileType = decltype(srcBatch.values[idx]);
using ReadType = decltype(dstBatch.values[idx]);

auto [overflows, resultI128] =
convertDecimal(srcBatch.values[idx], fromScale, toPrecision, toScale);
if (overflows) {
handleOverflow<FileType, ReadType>(dstBatch, idx, throwOnOverflow);
}
if constexpr (std::is_same_v<ReadTypeBatch, Decimal64VectorBatch>) {
if (!resultI128.fitsInLong()) {
handleOverflow<FileType, ReadType>(dstBatch, idx, throwOnOverflow);
} else {
dstBatch.values[idx] = resultI128.toLong();
}
} else {
dstBatch.values[idx] = resultI128;
}
}

int32_t fromPrecision;
int32_t fromScale;
int32_t toPrecision;
int32_t toScale;
};

#define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \
using FROM##To##TO##ColumnReader = \
NumericConvertColumnReader<FROM##VectorBatch, TO##VectorBatch, TYPE>;
Expand All @@ -482,6 +609,18 @@ namespace orc {
#define DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(FROM) \
using FROM##ToTimestampColumnReader = NumericToTimestampColumnReader<FROM##VectorBatch>;

#define DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(TO, TYPE) \
using Decimal64##To##TO##ColumnReader = \
DecimalToNumericColumnReader<Decimal64VectorBatch, TO##VectorBatch, TYPE>; \
using Decimal128##To##TO##ColumnReader = \
DecimalToNumericColumnReader<Decimal128VectorBatch, TO##VectorBatch, TYPE>;

#define DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(TO) \
using Decimal64##To##TO##ColumnReader = \
DecimalConvertColumnReader<Decimal64VectorBatch, TO##VectorBatch>; \
using Decimal128##To##TO##ColumnReader = \
DecimalConvertColumnReader<Decimal128VectorBatch, TO##VectorBatch>;

DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
Expand Down Expand Up @@ -568,21 +707,48 @@ namespace orc {
DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Float)
DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Double)

// Decimal to Numeric
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Boolean, bool)
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Byte, int8_t)
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Short, int16_t)
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Int, int32_t)
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Long, int64_t)
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Float, float)
DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER(Double, double)

// Decimal to Decimal
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal64)
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal128)

#define CREATE_READER(NAME) \
return std::make_unique<NAME>(_readType, fileType, stripe, throwOnOverflow);

#define CASE_CREATE_READER(TYPE, CONVERT) \
case TYPE: \
return std::make_unique<CONVERT##ColumnReader>(_readType, fileType, stripe, throwOnOverflow);
CREATE_READER(CONVERT##ColumnReader)

const static int32_t MAX_PRECISION_64 = 18;

#define CASE_CREATE_DECIMAL_READER(FROM) \
case DECIMAL: { \
if (_readType.getPrecision() > 0 && _readType.getPrecision() <= MAX_PRECISION_64) { \
return std::make_unique<FROM##ToDecimal64ColumnReader>(_readType, fileType, stripe, \
throwOnOverflow); \
} else { \
return std::make_unique<FROM##ToDecimal128ColumnReader>(_readType, fileType, stripe, \
throwOnOverflow); \
} \
static inline bool isDecimal64(const Type& type) {
return type.getPrecision() > 0 && type.getPrecision() <= MAX_PRECISION_64;
}

#define CASE_CREATE_FROM_DECIMAL_READER(TYPE, TO) \
case TYPE: { \
if (isDecimal64(fileType)) { \
CREATE_READER(Decimal64To##TO##ColumnReader) \
} else { \
CREATE_READER(Decimal128To##TO##ColumnReader) \
} \
}

#define CASE_CREATE_DECIMAL_READER(FROM) \
case DECIMAL: { \
if (isDecimal64(_readType)) { \
CREATE_READER(FROM##ToDecimal64ColumnReader) \
} else { \
CREATE_READER(FROM##ToDecimal128ColumnReader) \
} \
}

#define CASE_EXCEPTION \
Expand Down Expand Up @@ -776,7 +942,44 @@ namespace orc {
case MAP:
case STRUCT:
case UNION:
case DECIMAL:
case DECIMAL: {
switch (_readType.getKind()) {
CASE_CREATE_FROM_DECIMAL_READER(BOOLEAN, Boolean)
CASE_CREATE_FROM_DECIMAL_READER(BYTE, Byte)
CASE_CREATE_FROM_DECIMAL_READER(SHORT, Short)
CASE_CREATE_FROM_DECIMAL_READER(INT, Int)
CASE_CREATE_FROM_DECIMAL_READER(LONG, Long)
CASE_CREATE_FROM_DECIMAL_READER(FLOAT, Float)
CASE_CREATE_FROM_DECIMAL_READER(DOUBLE, Double)
case DECIMAL: {
if (isDecimal64(fileType)) {
if (isDecimal64(_readType)) {
CREATE_READER(Decimal64ToDecimal64ColumnReader)
} else {
CREATE_READER(Decimal64ToDecimal128ColumnReader)
}
} else {
if (isDecimal64(_readType)) {
CREATE_READER(Decimal128ToDecimal64ColumnReader)
} else {
CREATE_READER(Decimal128ToDecimal128ColumnReader)
}
}
}
case STRING:
case CHAR:
case VARCHAR:
case TIMESTAMP:
case TIMESTAMP_INSTANT:
case BINARY:
case LIST:
case MAP:
case STRUCT:
case UNION:
case DATE:
CASE_EXCEPTION
}
}
case DATE:
case VARCHAR:
case CHAR:
Expand All @@ -789,6 +992,9 @@ namespace orc {
#undef DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER
#undef DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER
#undef DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER
#undef DEFINE_DECIMAL_CONVERT_TO_NUMERIC_READER
#undef DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER
#undef CASE_CREATE_FROM_DECIMAL_READER
#undef CASE_CREATE_READER
#undef CASE_EXCEPTION

Expand Down
7 changes: 7 additions & 0 deletions c++/src/Int128.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,13 @@ namespace orc {
return buf.str();
}

double Int128::toDouble() const {
if (fitsInLong()) {
return static_cast<double>(toLong());
}
return static_cast<double>(lowbits) + std::ldexp(static_cast<double>(highbits), 64);
}

const static int32_t MAX_PRECISION_64 = 18;
const static int32_t MAX_PRECISION_128 = 38;
const static int64_t POWERS_OF_TEN[MAX_PRECISION_64 + 1] = {1,
Expand Down
9 changes: 6 additions & 3 deletions c++/src/SchemaEvolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ namespace orc {
if (fileType.getKind() == CHAR || fileType.getKind() == VARCHAR) {
ret.isValid = readType.getMaximumLength() == fileType.getMaximumLength();
} else if (fileType.getKind() == DECIMAL) {
ret.isValid = readType.getPrecision() == fileType.getPrecision() &&
readType.getScale() == fileType.getScale();
ret.needConvert = readType.getPrecision() != fileType.getPrecision() ||
readType.getScale() != fileType.getScale();
}
} else {
switch (fileType.getKind()) {
Expand All @@ -98,7 +98,10 @@ namespace orc {
isDecimal(readType) || isTimestamp(readType);
break;
}
case DECIMAL:
case DECIMAL: {
ret.isValid = ret.needConvert = isNumeric(readType);
break;
}
case STRING:
case CHAR:
case VARCHAR:
Expand Down
Loading

0 comments on commit 86e70c2

Please sign in to comment.