Skip to content

Commit

Permalink
Closes #2444 - SegArray with String Values Parquet Support (#2492)
Browse files Browse the repository at this point in the history
* Configuration to support writing segarray of string values to parquet. Read not yet implemented.

* Read of SegArray containing strings.

* Modifications to allow read functionality on multilocale and single locale to work properly.

* Removing unneeded change.

* Making one update that I forgot about to remove aggregation use.

* Adjusting dtype handling

* Correcting locality issue for larger number of locales.

* Addressing other review comments.
  • Loading branch information
Ethan-DeBandi99 committed Jun 15, 2023
1 parent a6f495e commit eeebe70
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 79 deletions.
5 changes: 0 additions & 5 deletions arkouda/segarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,10 +1065,6 @@ def to_parquet(
"""
from arkouda.io import _mode_str_to_int

if self.dtype == str_:
# Support will be added by Issue #2444
raise TypeError("SegArrays with Strings values are not yet supported by Parquet")

if mode.lower() == "append":
raise ValueError("Append mode is not supported for SegArray.")

Expand All @@ -1083,7 +1079,6 @@ def to_parquet(
"mode": _mode_str_to_int(mode),
"prefix": prefix_path,
"objType": self.objType,
"dtype": self.dtype,
"compression": compression,
},
),
Expand Down
187 changes: 167 additions & 20 deletions src/ArrowFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,17 @@ int cpp_getListType(const char* filename, const char* colname, char** errMsg) {
int64_t cpp_getStringColumnNumBytes(const char* filename, const char* colname, void* chpl_offsets, int64_t numElems, int64_t startIdx, char** errMsg) {
try {
int64_t ty = cpp_getType(filename, colname, errMsg);
int64_t dty; // used to store the type of data so we can handle lists
if (ty == ARROWLIST) { // get the type of the list so we can verify it is ARROWSTRING
dty = cpp_getListType(filename, colname, errMsg);
}
else {
dty = ty;
}
auto offsets = (int64_t*)chpl_offsets;
int64_t byteSize = 0;

if(ty == ARROWSTRING) {
if(dty == ARROWSTRING) {
std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
parquet::ParquetFileReader::OpenFile(filename, false);

Expand All @@ -225,7 +232,12 @@ int64_t cpp_getStringColumnNumBytes(const char* filename, const char* colname, v

std::shared_ptr<parquet::ColumnReader> column_reader;

auto idx = file_metadata -> schema() -> ColumnIndex(colname);
int64_t idx;
if (ty == ARROWLIST) {
idx = file_metadata -> schema() -> group_node() -> FieldIndex(colname);
} else {
idx = file_metadata -> schema() -> ColumnIndex(colname);
}

if(idx < 0) {
std::string dname(colname);
Expand Down Expand Up @@ -352,6 +364,29 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
seg_sizes[i] = seg_size;
}
}
} else if (lty == ARROWSTRING) {
parquet::ByteArrayReader* reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());

while (reader->HasNext()) {
parquet::ByteArray value;
(void)reader->ReadBatch(1, &definition_level, &rep_lvl, &value, &values_read);
if (values_read == 0 || (!first && rep_lvl == 0)) {
seg_sizes[i] = seg_size;
i++;
seg_size = 0;
}
if (values_read != 0) {
seg_size++;
vct++;
if (first) {
first = false;
}
}
if (!reader->HasNext()){
seg_sizes[i] = seg_size;
}
}
} else if(lty == ARROWBOOLEAN) {
parquet::BoolReader* bool_reader =
static_cast<parquet::BoolReader*>(column_reader.get());
Expand Down Expand Up @@ -425,7 +460,6 @@ int64_t cpp_getListColumnSize(const char* filename, const char* colname, void* c
}
}
}
// TODO - add string case
}
return vct;
}
Expand Down Expand Up @@ -548,6 +582,24 @@ int cpp_readListColumnByName(const char* filename, void* chpl_arr, const char* c
i+=values_read;
}
free(tmpArr);
} else if (lty == ARROWSTRING) {
int16_t definition_level; // nullable type and only reading single records in batch
auto chpl_ptr = (unsigned char*)chpl_arr;
parquet::ByteArrayReader* reader =
static_cast<parquet::ByteArrayReader*>(column_reader.get());

while (reader->HasNext()) {
parquet::ByteArray value;
(void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
// if values_read is 0, that means that it was a null value
if(values_read > 0) {
for(int j = 0; j < value.len; j++) {
chpl_ptr[i] = value.ptr[j];
i++;
}
}
i++; // skip one space so the strings are null terminated with a 0
}
} else if(lty == ARROWBOOLEAN) {
auto chpl_ptr = (bool*)chpl_arr;
parquet::BoolReader* reader =
Expand Down Expand Up @@ -1251,7 +1303,91 @@ int cpp_writeStrColumnToParquet(const char* filename, void* chpl_arr, void* chpl
}
}

int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chpl_offsets,
int cpp_writeStrListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_offsets, void* chpl_arr,
const char* dsetname, int64_t numelems,
int64_t rowGroupSize, int64_t dtype, int64_t compression,
char** errMsg) {
try {
if(dtype == ARROWSTRING) { // check the type here so if it is wrong we don't create a bad file
using FileClass = ::arrow::io::FileOutputStream;
std::shared_ptr<FileClass> out_file;
PARQUET_ASSIGN_OR_THROW(out_file, FileClass::Open(filename));

parquet::schema::NodeVector fields;

auto element = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::OPTIONAL, parquet::Type::BYTE_ARRAY, parquet::ConvertedType::NONE);
auto list = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {element});
fields.push_back(parquet::schema::GroupNode::Make(dsetname, parquet::Repetition::OPTIONAL, {list}, parquet::ConvertedType::LIST));
std::shared_ptr<parquet::schema::GroupNode> schema = std::static_pointer_cast<parquet::schema::GroupNode>
(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

parquet::WriterProperties::Builder builder;
// assign the proper compression
if(compression == SNAPPY_COMP) {
builder.compression(parquet::Compression::SNAPPY);
builder.encoding(parquet::Encoding::RLE);
} else if (compression == GZIP_COMP) {
builder.compression(parquet::Compression::GZIP);
builder.encoding(parquet::Encoding::RLE);
} else if (compression == BROTLI_COMP) {
builder.compression(parquet::Compression::BROTLI);
builder.encoding(parquet::Encoding::RLE);
} else if (compression == ZSTD_COMP) {
builder.compression(parquet::Compression::ZSTD);
builder.encoding(parquet::Encoding::RLE);
} else if (compression == LZ4_COMP) {
builder.compression(parquet::Compression::LZ4);
builder.encoding(parquet::Encoding::RLE);
}
std::shared_ptr<parquet::WriterProperties> props = builder.build();

std::shared_ptr<parquet::ParquetFileWriter> file_writer =
parquet::ParquetFileWriter::Open(out_file, schema, props);

int64_t i = 0;
int64_t numLeft = numelems;
auto segments = (int64_t*)chpl_segs;
auto offsets = (int64_t*)chpl_offsets;
auto chpl_ptr = (uint8_t*)chpl_arr;
int64_t segIdx = 0; // index into segarray segments
int64_t offIdx = 0; // index into the segstring segments
int64_t valIdx = 0; // index into chpl_arr

while(numLeft > 0) { // write all local values to the file
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::ByteArrayWriter* ba_writer =
static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
int64_t count = 0;
while (numLeft > 0 && count < rowGroupSize) { // ensures rowGroupSize maintained
int64_t segmentLength = segments[segIdx+1] - segments[segIdx];
for (int64_t x = 0; x < segmentLength; x++){
int16_t rep_lvl = (x == 0) ? 0 : 1;
int16_t def_lvl = 3;
parquet::ByteArray value;
value.ptr = reinterpret_cast<const uint8_t*>(&chpl_ptr[valIdx]);
value.len = offsets[offIdx+1] - offsets[offIdx] - 1;
ba_writer->WriteBatch(1, &def_lvl, &rep_lvl, &value);
offIdx++;
valIdx+=offsets[offIdx] - offsets[offIdx-1];
}
segIdx++;
numLeft--;count++;
}
}

file_writer->Close();
ARROWSTATUS_OK(out_file->Close());
return 0;
} else {
return ARROWERROR;
}
} catch (const std::exception& e) {
*errMsg = strdup(e.what());
return ARROWERROR;
}
}

int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_arr,
const char* dsetname, int64_t numelems,
int64_t rowGroupSize, int64_t dtype, int64_t compression,
char** errMsg) {
Expand Down Expand Up @@ -1311,9 +1447,9 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp

int64_t i = 0;
int64_t numLeft = numelems;
auto offsets = (int64_t*)chpl_offsets;
auto segments = (int64_t*)chpl_segs;
int64_t valIdx = 0; // index into chpl_arr
int64_t offIdx = 0; // index into offsets
int64_t segIdx = 0; // index into offsets

if(dtype == ARROWINT64 || dtype == ARROWUINT64) {
auto chpl_ptr = (int64_t*)chpl_arr;
Expand All @@ -1322,9 +1458,9 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::Int64Writer* writer =
static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());

while (numLeft > 0 && offIdx < rowGroupSize) { // ensures rowGroupSize maintained
int64_t batchSize = offsets[offIdx+1] - offsets[offIdx];
int64_t count = 0;
while (numLeft > 0 && count < rowGroupSize) { // ensures rowGroupSize maintained
int64_t batchSize = segments[segIdx+1] - segments[segIdx];
if (batchSize > 0) {
int16_t* def_lvl = new int16_t[batchSize] { 3 };
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
Expand All @@ -1344,7 +1480,8 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
writer->WriteBatch(batchSize, def_lvl, rep_lvl, nullptr);
}
offIdx++;
count++;
segIdx++;
numLeft--;
}
}
Expand All @@ -1356,9 +1493,9 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::BoolWriter* writer =
static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());

while (numLeft > 0 && offIdx < rowGroupSize) {
int64_t batchSize = offsets[offIdx+1] - offsets[offIdx];
int64_t count = 0;
while (numLeft > 0 && count < rowGroupSize) {
int64_t batchSize = segments[segIdx+1] - segments[segIdx];
if (batchSize > 0) {
// if the value is first in the segment rep_lvl = 0, otherwise 1
// all values defined at the item level (3)
Expand All @@ -1378,7 +1515,8 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
writer->WriteBatch(batchSize, def_lvl, rep_lvl, nullptr);
}
offIdx++;
count++;
segIdx++;
numLeft--;
}
}
Expand All @@ -1390,9 +1528,9 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
parquet::DoubleWriter* writer =
static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());

while (numLeft > 0 && offIdx < rowGroupSize) {
int64_t batchSize = offsets[offIdx+1] - offsets[offIdx];
int64_t count = 0;
while (numLeft > 0 && count < rowGroupSize) {
int64_t batchSize = segments[segIdx+1] - segments[segIdx];
if (batchSize > 0) {
// if the value is first in the segment rep_lvl = 0, otherwise 1
// all values defined at the item level (3)
Expand All @@ -1412,7 +1550,8 @@ int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chp
int16_t* rep_lvl = new int16_t[batchSize] { 0 };
writer->WriteBatch(batchSize, def_lvl, rep_lvl, nullptr);
}
offIdx++;
count++;
segIdx++;
numLeft--;
}
}
Expand Down Expand Up @@ -1747,11 +1886,19 @@ extern "C" {
dsetname, numelems, rowGroupSize, dtype, compression, errMsg);
}

int c_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chpl_offsets,
// C-linkage entry point for writing a list column: hands every argument,
// unchanged and in the same order, to cpp_writeListColumnToParquet and
// returns that function's status code.
int c_writeListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_arr,
                               const char* dsetname, int64_t numelems,
                               int64_t rowGroupSize, int64_t dtype, int64_t compression,
                               char** errMsg) {
  const int status = cpp_writeListColumnToParquet(
      filename, chpl_segs, chpl_arr, dsetname, numelems,
      rowGroupSize, dtype, compression, errMsg);
  return status;
}

int c_writeStrListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_offsets, void* chpl_arr,
const char* dsetname, int64_t numelems,
int64_t rowGroupSize, int64_t dtype, int64_t compression,
char** errMsg) {
return cpp_writeListColumnToParquet(filename, chpl_arr, chpl_offsets,
return cpp_writeStrListColumnToParquet(filename, chpl_segs, chpl_offsets, chpl_arr,
dsetname, numelems, rowGroupSize, dtype, compression, errMsg);
}

Expand Down
13 changes: 11 additions & 2 deletions src/ArrowFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,23 @@ extern "C" {
int cpp_createEmptyListParquetFile(const char* filename, const char* dsetname, int64_t dtype,
int64_t compression, char** errMsg);

int c_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chpl_offsets,
// C entry point for writing a list (SegArray) column to a Parquet file.
// chpl_segs holds the segment boundaries and chpl_arr the flattened values;
// the parameter is named chpl_segs to match the definition in
// ArrowFunctions.cpp.
int c_writeListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_arr,
                               const char* dsetname, int64_t numelems,
                               int64_t rowGroupSize, int64_t dtype, int64_t compression,
                               char** errMsg);
int cpp_writeListColumnToParquet(const char* filename, void* chpl_arr, void* chpl_offsets,
// Implementation behind c_writeListColumnToParquet.
// chpl_segs holds the segment boundaries and chpl_arr the flattened values;
// the parameter is named chpl_segs to match the definition in
// ArrowFunctions.cpp.
int cpp_writeListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_arr,
                                 const char* dsetname, int64_t numelems,
                                 int64_t rowGroupSize, int64_t dtype, int64_t compression,
                                 char** errMsg);

// C entry point for writing a list column whose values are strings.
// Takes three buffers: chpl_segs (segment boundaries), chpl_offsets (string
// byte offsets) and chpl_arr (the string bytes).
int c_writeStrListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_offsets,
void* chpl_arr, const char* dsetname, int64_t numelems,
int64_t rowGroupSize, int64_t dtype, int64_t compression,
char** errMsg);
// Implementation behind c_writeStrListColumnToParquet: writes a list column
// of string values from chpl_segs (segment boundaries), chpl_offsets (string
// byte offsets) and chpl_arr (the string bytes).
int cpp_writeStrListColumnToParquet(const char* filename, void* chpl_segs, void* chpl_offsets,
void* chpl_arr, const char* dsetname, int64_t numelems,
int64_t rowGroupSize, int64_t dtype, int64_t compression,
char** errMsg);

int c_createEmptyParquetFile(const char* filename, const char* dsetname, int64_t dtype,
int64_t compression, char** errMsg);
Expand Down
Loading

0 comments on commit eeebe70

Please sign in to comment.