diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 21f7e72f5d935f..43995c47af01f0 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -140,37 +140,44 @@ class Lz4BlockCompression : public BlockCompressionCodec { _release_compression_ctx(std::move(context)); } }}; - Slice compressed_buf; - size_t max_len = max_compressed_len(input.size); - if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - // use output directly - output->resize(max_len); - compressed_buf.data = reinterpret_cast(output->data()); - compressed_buf.size = max_len; - } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - { - // context->buffer is resuable between queries, should accouting to - // global tracker. - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); - context->buffer->resize(max_len); + + try { + Slice compressed_buf; + size_t max_len = max_compressed_len(input.size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + { + // context->buffer is resuable between queries, should accouting to + // global tracker. + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); + compressed_buf.size = max_len; } - compressed_buf.data = reinterpret_cast(context->buffer->data()); - compressed_buf.size = max_len; - } - size_t compressed_len = - LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data, - input.size, compressed_buf.size, ACCELARATION); - if (compressed_len == 0) { - compress_failed = true; - return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", - compressed_buf.size); - } - output->resize(compressed_len); - if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - output->assign_copy(reinterpret_cast(compressed_buf.data), compressed_len); + size_t compressed_len = + LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data, + input.size, compressed_buf.size, ACCELARATION); + if (compressed_len == 0) { + compress_failed = true; + return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", + compressed_buf.size); + } + output->resize(compressed_len); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), compressed_len); + } + } catch (...) { + // Do not set compress_failed to release context + DCHECK(!compress_failed); + return Status::InternalError("Fail to do LZ4Block compress due to exception"); } return Status::OK(); } @@ -323,54 +330,61 @@ class Lz4fBlockCompression : public BlockCompressionCodec { _release_compression_ctx(std::move(context)); } }}; - Slice compressed_buf; - size_t max_len = max_compressed_len(uncompressed_size); - if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - // use output directly - output->resize(max_len); - compressed_buf.data = reinterpret_cast(output->data()); - compressed_buf.size = max_len; - } else { - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + + try { + Slice compressed_buf; + size_t max_len = max_compressed_len(uncompressed_size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); + compressed_buf.size = max_len; } - compressed_buf.data = reinterpret_cast(context->buffer->data()); - compressed_buf.size = max_len; - } - auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, - &_s_preferences); - if (LZ4F_isError(wbytes)) { - compress_failed = true; - return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", - LZ4F_getErrorName(wbytes)); - } - size_t offset = wbytes; - for (auto input : inputs) { - wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, - compressed_buf.size - offset, input.data, input.size, - nullptr); + auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, + &_s_preferences); if (LZ4F_isError(wbytes)) { compress_failed = true; - return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", - LZ4F_getErrorName(wbytes)); + return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", + LZ4F_getErrorName(wbytes)); + } + size_t offset = wbytes; + for (auto input : inputs) { + wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, + compressed_buf.size - offset, input.data, input.size, + nullptr); + if (LZ4F_isError(wbytes)) { + compress_failed = true; + return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", + LZ4F_getErrorName(wbytes)); + } + offset += wbytes; + } + wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, + compressed_buf.size - offset, nullptr); + if (LZ4F_isError(wbytes)) { + compress_failed = true; + return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", + LZ4F_getErrorName(wbytes)); } offset += wbytes; - } - wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, - compressed_buf.size - offset, nullptr); - if (LZ4F_isError(wbytes)) { - compress_failed = true; - return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", - LZ4F_getErrorName(wbytes)); - } - offset += wbytes; - output->resize(offset); - if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - output->assign_copy(reinterpret_cast(compressed_buf.data), offset); + output->resize(offset); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), offset); + } + } catch (...) { + // Do not set compress_failed to release context + DCHECK(!compress_failed); + return Status::InternalError("Fail to do LZ4F compress due to exception"); } return Status::OK(); @@ -523,34 +537,41 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { _release_compression_ctx(std::move(context)); } }}; - Slice compressed_buf; - size_t max_len = max_compressed_len(input.size); - if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - // use output directly - output->resize(max_len); - compressed_buf.data = reinterpret_cast(output->data()); - compressed_buf.size = max_len; - } else { - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + + try { + Slice compressed_buf; + size_t max_len = max_compressed_len(input.size); + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); + compressed_buf.size = max_len; } - compressed_buf.data = reinterpret_cast(context->buffer->data()); - compressed_buf.size = max_len; - } - size_t compressed_len = LZ4_compress_HC_continue( - context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size); - if (compressed_len == 0) { - compress_failed = true; - return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", - compressed_buf.size); - } - output->resize(compressed_len); - if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - output->assign_copy(reinterpret_cast(compressed_buf.data), compressed_len); + size_t compressed_len = LZ4_compress_HC_continue( + context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size); + if (compressed_len == 0) { + compress_failed = true; + return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", + compressed_buf.size); + } + output->resize(compressed_len); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), compressed_len); + } + } catch (...) { + // Do not set compress_failed to release context + DCHECK(!compress_failed); + return Status::InternalError("Fail to do LZ4HC compress due to exception"); } return Status::OK(); } @@ -853,71 +874,77 @@ class ZstdBlockCompression : public BlockCompressionCodec { } }}; - size_t max_len = max_compressed_len(uncompressed_size); - Slice compressed_buf; - if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - // use output directly - output->resize(max_len); - compressed_buf.data = reinterpret_cast(output->data()); - compressed_buf.size = max_len; - } else { - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer->resize(max_len); + try { + size_t max_len = max_compressed_len(uncompressed_size); + Slice compressed_buf; + if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + // use output directly + output->resize(max_len); + compressed_buf.data = reinterpret_cast(output->data()); + compressed_buf.size = max_len; + } else { + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); + compressed_buf.size = max_len; } - compressed_buf.data = reinterpret_cast(context->buffer->data()); - compressed_buf.size = max_len; - } - // set compression level to default 3 - auto ret = - ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT); - if (ZSTD_isError(ret)) { - return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); - } - // set checksum flag to 1 - ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); - if (ZSTD_isError(ret)) { - return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); - } + // set compression level to default 3 + auto ret = + ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, ZSTD_CLEVEL_DEFAULT); + if (ZSTD_isError(ret)) { + return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + } + // set checksum flag to 1 + ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); + if (ZSTD_isError(ret)) { + return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + } - ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; + ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; - for (size_t i = 0; i < inputs.size(); i++) { - ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; + for (size_t i = 0; i < inputs.size(); i++) { + ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; - bool last_input = (i == inputs.size() - 1); - auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue; + bool last_input = (i == inputs.size() - 1); + auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue; - bool finished = false; - do { - // do compress - auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); + bool finished = false; + do { + // do compress + auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); - if (ZSTD_isError(ret)) { - compress_failed = true; - return Status::InvalidArgument("ZSTD_compressStream2 error: {}", - ZSTD_getErrorString(ZSTD_getErrorCode(ret))); - } + if (ZSTD_isError(ret)) { + compress_failed = true; + return Status::InvalidArgument("ZSTD_compressStream2 error: {}", + ZSTD_getErrorString(ZSTD_getErrorCode(ret))); + } - // ret is ZSTD hint for needed output buffer size - if (ret > 0 && out_buf.pos == out_buf.size) { - compress_failed = true; - return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); - } + // ret is ZSTD hint for needed output buffer size + if (ret > 0 && out_buf.pos == out_buf.size) { + compress_failed = true; + return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); + } - finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size); - } while (!finished); - } + finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size); + } while (!finished); + } - // set compressed size for caller - output->resize(out_buf.pos); - if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { - output->assign_copy(reinterpret_cast(compressed_buf.data), out_buf.pos); + // set compressed size for caller + output->resize(out_buf.pos); + if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { + output->assign_copy(reinterpret_cast(compressed_buf.data), out_buf.pos); + } + } catch (...) { + // Do not set compress_failed to release context + DCHECK(!compress_failed); + return Status::InternalError("Fail to do ZSTD compress due to exception"); } return Status::OK();