Skip to content

Commit

Permalink
[fix](compression) handle exception to reuse compression context
Browse files Browse the repository at this point in the history
Otherwise, the compression context is leaked and a new context has to be
allocated; the resulting TLB flushes consume a lot of system CPU.
  • Loading branch information
dataroaring committed May 24, 2024
1 parent 686e48f commit b24003f
Showing 1 changed file with 179 additions and 152 deletions.
331 changes: 179 additions & 152 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,37 +140,44 @@ class Lz4BlockCompression : public BlockCompressionCodec {
_release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
size_t max_len = max_compressed_len(input.size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
{
// context->buffer is resuable between queries, should accouting to
// global tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
context->buffer->resize(max_len);

try {
Slice compressed_buf;
size_t max_len = max_compressed_len(input.size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
{
// context->buffer is resuable between queries, should accouting to
// global tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

size_t compressed_len =
LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data,
input.size, compressed_buf.size, ACCELARATION);
if (compressed_len == 0) {
compress_failed = true;
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
compressed_buf.size);
}
output->resize(compressed_len);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
size_t compressed_len =
LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data,
input.size, compressed_buf.size, ACCELARATION);
if (compressed_len == 0) {
compress_failed = true;
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
compressed_buf.size);
}
output->resize(compressed_len);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
}
} catch (...) {
// Do not set compress_failed to release context
DCHECK(!compress_failed);
return Status::InternalError("Fail to do LZ4Block compress due to exception");
}
return Status::OK();
}
Expand Down Expand Up @@ -323,54 +330,61 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
_release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
size_t max_len = max_compressed_len(uncompressed_size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);

try {
Slice compressed_buf;
size_t max_len = max_compressed_len(uncompressed_size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
&_s_preferences);
if (LZ4F_isError(wbytes)) {
compress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
LZ4F_getErrorName(wbytes));
}
size_t offset = wbytes;
for (auto input : inputs) {
wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
compressed_buf.size - offset, input.data, input.size,
nullptr);
auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
&_s_preferences);
if (LZ4F_isError(wbytes)) {
compress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
LZ4F_getErrorName(wbytes));
return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
LZ4F_getErrorName(wbytes));
}
size_t offset = wbytes;
for (auto input : inputs) {
wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
compressed_buf.size - offset, input.data, input.size,
nullptr);
if (LZ4F_isError(wbytes)) {
compress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
LZ4F_getErrorName(wbytes));
}
offset += wbytes;
}
wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
compressed_buf.size - offset, nullptr);
if (LZ4F_isError(wbytes)) {
compress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
LZ4F_getErrorName(wbytes));
}
offset += wbytes;
}
wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
compressed_buf.size - offset, nullptr);
if (LZ4F_isError(wbytes)) {
compress_failed = true;
return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
LZ4F_getErrorName(wbytes));
}
offset += wbytes;
output->resize(offset);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
output->resize(offset);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
}
} catch (...) {
// Do not set compress_failed to release context
DCHECK(!compress_failed);
return Status::InternalError("Fail to do LZ4F compress due to exception");
}

return Status::OK();
Expand Down Expand Up @@ -523,34 +537,41 @@ class Lz4HCBlockCompression : public BlockCompressionCodec {
_release_compression_ctx(std::move(context));
}
}};
Slice compressed_buf;
size_t max_len = max_compressed_len(input.size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);

try {
Slice compressed_buf;
size_t max_len = max_compressed_len(input.size);
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

size_t compressed_len = LZ4_compress_HC_continue(
context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
if (compressed_len == 0) {
compress_failed = true;
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
compressed_buf.size);
}
output->resize(compressed_len);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
size_t compressed_len = LZ4_compress_HC_continue(
context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
if (compressed_len == 0) {
compress_failed = true;
return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
compressed_buf.size);
}
output->resize(compressed_len);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), compressed_len);
}
} catch (...) {
// Do not set compress_failed to release context
DCHECK(!compress_failed);
return Status::InternalError("Fail to do LZ4HC compress due to exception");
}
return Status::OK();
}
Expand Down Expand Up @@ -853,71 +874,77 @@ class ZstdBlockCompression : public BlockCompressionCodec {
}
}};

size_t max_len = max_compressed_len(uncompressed_size);
Slice compressed_buf;
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
try {
size_t max_len = max_compressed_len(uncompressed_size);
Slice compressed_buf;
if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
// use output directly
output->resize(max_len);
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->block_compression_mem_tracker());
// reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
context->buffer->resize(max_len);
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}

// set compression level to default 3
auto ret =
ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
if (ZSTD_isError(ret)) {
return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
// set checksum flag to 1
ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(ret)) {
return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
// set compression level to default 3
auto ret =
ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT);
if (ZSTD_isError(ret)) {
return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
// set checksum flag to 1
ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
if (ZSTD_isError(ret)) {
return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}

ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};
ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

for (size_t i = 0; i < inputs.size(); i++) {
ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
for (size_t i = 0; i < inputs.size(); i++) {
ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

bool last_input = (i == inputs.size() - 1);
auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
bool last_input = (i == inputs.size() - 1);
auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

bool finished = false;
do {
// do compress
auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);
bool finished = false;
do {
// do compress
auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

if (ZSTD_isError(ret)) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}
if (ZSTD_isError(ret)) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
}

// ret is ZSTD hint for needed output buffer size
if (ret > 0 && out_buf.pos == out_buf.size) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
}
// ret is ZSTD hint for needed output buffer size
if (ret > 0 && out_buf.pos == out_buf.size) {
compress_failed = true;
return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
}

finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
} while (!finished);
}
finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
} while (!finished);
}

// set compressed size for caller
output->resize(out_buf.pos);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
// set compressed size for caller
output->resize(out_buf.pos);
if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
}
} catch (...) {
// Do not set compress_failed to release context
DCHECK(!compress_failed);
return Status::InternalError("Fail to do ZSTD compress due to exception");
}

return Status::OK();
Expand Down

0 comments on commit b24003f

Please sign in to comment.