Provide support for BSON on BE
kothiga committed Sep 22, 2023
1 parent 6a4e4cc commit 80d5110
Showing 2 changed files with 35 additions and 30 deletions.
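BSON stores every multi-byte scalar (document sizes, Int32/Int64, doubles, timestamps, decimals) in little-endian byte order, so reading and writing it must not depend on the host's byte order. This commit makes the BSONEachRow input and output formats work on big-endian (BE) hosts by replacing plain readBinary / writePODBinary calls with the explicitly little-endian readBinaryLittleEndian / writeBinaryLittleEndian helpers. As a rough, hypothetical sketch of the idea behind such a read helper (not ClickHouse's actual implementation; it assumes C++23 std::byteswap), the wire bytes are copied verbatim and swapped only when the host is big-endian:

#include <bit>          // std::endian, std::byteswap (C++23)
#include <cstdint>
#include <cstring>
#include <type_traits>

// Hypothetical little-endian load: copy the raw wire bytes into the value,
// then swap them only on big-endian hosts, so the in-memory result is the
// same on every platform.
template <typename T>
T loadLittleEndian(const unsigned char * wire)
{
    static_assert(std::is_integral_v<T>, "sketch limited to integral types");
    T value;
    std::memcpy(&value, wire, sizeof(T));
    if constexpr (std::endian::native == std::endian::big)
        value = std::byteswap(value);
    return value;
}

// Usage: the first four bytes of every BSON document hold its total size.
// The bytes {0x10, 0x00, 0x00, 0x00} must decode to 16 on LE and BE alike:
//     int32_t doc_size = loadLittleEndian<int32_t>(size_bytes);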
56 changes: 31 additions & 25 deletions src/Processors/Formats/Impl/BSONEachRowRowInputFormat.cpp
@@ -118,7 +118,7 @@ static UInt8 readBSONType(ReadBuffer & in)
static size_t readBSONSize(ReadBuffer & in)
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
return size;
}

@@ -131,19 +131,19 @@ static void readAndInsertInteger(ReadBuffer & in, IColumn & column, const DataTy
if (bson_type == BSONType::INT32)
{
UInt32 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
else if (bson_type == BSONType::INT64)
{
UInt64 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
else if (bson_type == BSONType::BOOL)
{
UInt8 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}
else
@@ -160,7 +160,7 @@ static void readAndInsertIPv4(ReadBuffer & in, IColumn & column, BSONType bson_t
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert BSON Int32 into column with type IPv4");

UInt32 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnIPv4 &>(column).insertValue(IPv4(value));
}

@@ -172,7 +172,7 @@ static void readAndInsertDouble(ReadBuffer & in, IColumn & column, const DataTyp
getBSONTypeName(bson_type), data_type->getName());

Float64 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnVector<T> &>(column).insertValue(static_cast<T>(value));
}

@@ -184,7 +184,7 @@ static void readAndInsertSmallDecimal(ReadBuffer & in, IColumn & column, const D
getBSONTypeName(bson_type), data_type->getName());

DecimalType value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnDecimal<DecimalType> &>(column).insertValue(value);
}

@@ -194,7 +194,7 @@ static void readAndInsertDateTime64(ReadBuffer & in, IColumn & column, BSONType
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot insert BSON {} into DateTime64 column", getBSONTypeName(bson_type));

DateTime64 value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(value);
}

@@ -222,7 +222,7 @@ static void readAndInsertBigInteger(ReadBuffer & in, IColumn & column, const Dat
sizeof(ValueType));

ValueType value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnType &>(column).insertValue(value);
}

@@ -355,7 +355,7 @@ static void readAndInsertUUID(ReadBuffer & in, IColumn & column, BSONType bson_t
sizeof(UUID));

UUID value;
- readBinary(value, in);
+ readBinaryLittleEndian(value, in);
assert_cast<ColumnUUID &>(column).insertValue(value);
}

@@ -371,7 +371,7 @@ void BSONEachRowRowInputFormat::readArray(IColumn & column, const DataTypePtr &

size_t document_start = in->count();
BSONSizeT document_size;
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);

@@ -401,7 +401,7 @@ void BSONEachRowRowInputFormat::readTuple(IColumn & column, const DataTypePtr &

size_t document_start = in->count();
BSONSizeT document_size;
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);

@@ -462,7 +462,7 @@ void BSONEachRowRowInputFormat::readMap(IColumn & column, const DataTypePtr & da

size_t document_start = in->count();
BSONSizeT document_size;
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);

@@ -696,15 +696,15 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::STRING:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size);
break;
}
case BSONType::DOCUMENT: [[fallthrough]];
case BSONType::ARRAY:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
if (size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", size);
in.ignore(size - sizeof(size));
@@ -713,7 +713,7 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::BINARY:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size + 1);
break;
}
@@ -738,14 +738,14 @@ static void skipBSONField(ReadBuffer & in, BSONType type)
case BSONType::DB_POINTER:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size + BSON_DB_POINTER_SIZE);
break;
}
case BSONType::JAVA_SCRIPT_CODE_W_SCOPE:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
if (size < sizeof(BSONSizeT))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid java code_w_scope size: {}", size);
in.ignore(size - sizeof(size));
@@ -787,7 +787,7 @@ bool BSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
size_t key_index = 0;

current_document_start = in->count();
- readBinary(current_document_size, *in);
+ readBinaryLittleEndian(current_document_size, *in);
if (current_document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", current_document_size);

@@ -844,7 +844,7 @@ size_t BSONEachRowRowInputFormat::countRows(size_t max_block_size)
BSONSizeT document_size;
while (!in->eof() && num_rows < max_block_size)
{
- readBinary(document_size, *in);
+ readBinaryLittleEndian(document_size, *in);
if (document_size < sizeof(BSONSizeT) + sizeof(BSON_DOCUMENT_END))
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid document size: {}", document_size);
in->ignore(document_size - sizeof(BSONSizeT));
@@ -893,7 +893,7 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo
case BSONType::STRING:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
in.ignore(size);
return std::make_shared<DataTypeString>();
}
@@ -947,7 +947,7 @@ DataTypePtr BSONEachRowSchemaReader::getDataTypeFromBSONField(BSONType type, boo
case BSONType::BINARY:
{
BSONSizeT size;
- readBinary(size, in);
+ readBinaryLittleEndian(size, in);
auto subtype = getBSONBinarySubtype(readBSONType(in));
in.ignore(size);
switch (subtype)
@@ -982,7 +982,7 @@ NamesAndTypesList BSONEachRowSchemaReader::getDataTypesFromBSONDocument(bool all
{
size_t document_start = in.count();
BSONSizeT document_size;
- readBinary(document_size, in);
+ readBinaryLittleEndian(document_size, in);
NamesAndTypesList names_and_types;
while (in.count() - document_start + sizeof(BSON_DOCUMENT_END) != document_size)
{
@@ -1028,7 +1028,7 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t
while (!in.eof() && memory.size() < min_bytes && number_of_rows < max_rows)
{
BSONSizeT document_size;
- readBinary(document_size, in);
+ readBinaryLittleEndian(document_size, in);

if (document_size < sizeof(document_size))
throw ParsingException(ErrorCodes::INCORRECT_DATA, "Size of BSON document is invalid");
@@ -1045,7 +1045,13 @@ fileSegmentationEngineBSONEachRow(ReadBuffer & in, DB::Memory<> & memory, size_t

size_t old_size = memory.size();
memory.resize(old_size + document_size);
- unalignedStore<BSONSizeT>(memory.data() + old_size, document_size);
+
+ // Ensure the document size written back into the buffer is stored little-endian.
+ BSONSizeT size_out = document_size;
+ if constexpr (std::endian::native == std::endian::big)
+     size_out = std::byteswap(size_out);
+ unalignedStore<BSONSizeT>(memory.data() + old_size, size_out);
+
in.readStrict(memory.data() + old_size + sizeof(document_size), document_size - sizeof(document_size));
++number_of_rows;
}
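A note on the segmentation-engine hunk above: readBinaryLittleEndian converts the document size into host byte order, so on a big-endian machine it can no longer be copied back into the raw document buffer verbatim; it has to be swapped back to little-endian first, which is what the new std::byteswap branch does. A tiny standalone sketch of that store-side step (hypothetical helper name, assumes C++23 std::byteswap):

#include <bit>
#include <cstdint>
#include <cstring>

// Take a size already in host byte order and write it back into a raw BSON
// buffer as little-endian bytes, swapping only on big-endian hosts. memcpy
// plays the role of unalignedStore here and is safe for unaligned targets.
void storeDocumentSizeLittleEndian(unsigned char * dest, uint32_t host_order_size)
{
    if constexpr (std::endian::native == std::endian::big)
        host_order_size = std::byteswap(host_order_size);
    std::memcpy(dest, &host_order_size, sizeof(host_order_size));
}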
9 changes: 4 additions & 5 deletions src/Processors/Formats/Impl/BSONEachRowRowOutputFormat.cpp
@@ -58,7 +58,7 @@ static void writeBSONSize(size_t size, WriteBuffer & buf)
if (size > MAX_BSON_SIZE)
throw Exception(ErrorCodes::INCORRECT_DATA, "Too large document/value size: {}. Maximum allowed size: {}.", size, MAX_BSON_SIZE);

- writePODBinary<BSONSizeT>(BSONSizeT(size), buf);
+ writeBinaryLittleEndian(BSONSizeT(size), buf);
}

template <typename Type>
@@ -79,7 +79,7 @@ template <typename ColumnType, typename ValueType>
static void writeBSONNumber(BSONType type, const IColumn & column, size_t row_num, const String & name, WriteBuffer & buf)
{
writeBSONTypeAndKeyName(type, name, buf);
- writePODBinary<ValueType>(assert_cast<const ColumnType &>(column).getElement(row_num), buf);
+ writeBinaryLittleEndian(ValueType(assert_cast<const ColumnType &>(column).getElement(row_num)), buf);
}

template <typename StringColumnType>
@@ -109,8 +109,7 @@ static void writeBSONBigInteger(const IColumn & column, size_t row_num, const St
writeBSONTypeAndKeyName(BSONType::BINARY, name, buf);
writeBSONSize(sizeof(typename ColumnType::ValueType), buf);
writeBSONType(BSONBinarySubtype::BINARY, buf);
- auto data = assert_cast<const ColumnType &>(column).getDataAt(row_num);
- buf.write(data.data, data.size);
+ writeBinaryLittleEndian(assert_cast<const ColumnType &>(column).getElement(row_num), buf);
}

size_t BSONEachRowRowOutputFormat::countBSONFieldSize(const IColumn & column, const DataTypePtr & data_type, size_t row_num, const String & name, const String & path, std::unordered_map<String, size_t> & nested_document_sizes)
@@ -407,7 +406,7 @@ void BSONEachRowRowOutputFormat::serializeField(const IColumn & column, const Da
writeBSONTypeAndKeyName(BSONType::BINARY, name, out);
writeBSONSize(sizeof(UUID), out);
writeBSONType(BSONBinarySubtype::UUID, out);
- writeBinary(assert_cast<const ColumnUUID &>(column).getElement(row_num), out);
+ writeBinaryLittleEndian(assert_cast<const ColumnUUID &>(column).getElement(row_num), out);
break;
}
case TypeIndex::LowCardinality:
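On the output side the change is symmetric: writeBinaryLittleEndian serializes values in BSON's expected byte order regardless of the host, and for big integers the raw-bytes copy via getDataAt (which would emit big-endian bytes on a BE host) is replaced by writing the element through the little-endian helper. A mirror of the earlier read-side sketch (hypothetical names, assumes C++23 std::byteswap):

#include <bit>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Append a host-order integer to an output buffer as little-endian bytes,
// so the produced BSON is byte-identical no matter where it was written.
template <typename T>
void appendLittleEndian(std::vector<unsigned char> & out, T value)
{
    static_assert(std::is_integral_v<T>, "sketch limited to integral types");
    if constexpr (std::endian::native == std::endian::big)
        value = std::byteswap(value);
    unsigned char bytes[sizeof(T)];
    std::memcpy(bytes, &value, sizeof(T));
    out.insert(out.end(), bytes, bytes + sizeof(T));
}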
