From 460c1e6aba196383ce5fc7857ade7f5a20a96553 Mon Sep 17 00:00:00 2001 From: noferini Date: Sat, 7 Oct 2023 11:58:09 +0200 Subject: [PATCH] add flag to tof compressor to skip large payloads --- .../include/TOFCompression/CompressorTask.h | 3 ++- .../TOFCompression/CompressorTaskOld.h | 3 ++- .../TOF/compression/src/CompressorTask.cxx | 11 ++++++++++- .../TOF/compression/src/CompressorTaskOld.cxx | 11 ++++++++++- .../TOF/compression/src/tof-compressor.cxx | 19 +++++++++++-------- 5 files changed, 35 insertions(+), 12 deletions(-) diff --git a/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h b/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h index e18cde9d3dc4e..cd3823857677d 100644 --- a/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h +++ b/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h @@ -33,7 +33,7 @@ template class CompressorTask : public Task { public: - CompressorTask() = default; + CompressorTask(long payloadLim = -1) : mPayloadLimit(payloadLim) {} ~CompressorTask() override = default; void init(InitContext& ic) final; void run(ProcessingContext& pc) final; @@ -41,6 +41,7 @@ class CompressorTask : public Task private: Compressor mCompressor; int mOutputBufferSize; + long mPayloadLimit = -1; }; } // namespace tof diff --git a/Detectors/TOF/compression/include/TOFCompression/CompressorTaskOld.h b/Detectors/TOF/compression/include/TOFCompression/CompressorTaskOld.h index 12beead1087ba..ed78dd88caa64 100644 --- a/Detectors/TOF/compression/include/TOFCompression/CompressorTaskOld.h +++ b/Detectors/TOF/compression/include/TOFCompression/CompressorTaskOld.h @@ -33,7 +33,7 @@ template class CompressorTaskOld : public Task { public: - CompressorTaskOld() = default; + CompressorTaskOld(long payloadLim = -1) : mPayloadLimit(payloadLim) {} ~CompressorTaskOld() override = default; void init(InitContext& ic) final; void run(ProcessingContext& pc) final; @@ -41,6 +41,7 @@ class CompressorTaskOld : public Task private: Compressor mCompressor; int mOutputBufferSize; + long mPayloadLimit = -1; }; } // namespace tof diff --git a/Detectors/TOF/compression/src/CompressorTask.cxx b/Detectors/TOF/compression/src/CompressorTask.cxx index 90235c86f40cf..cad61e9ed0576 100644 --- a/Detectors/TOF/compression/src/CompressorTask.cxx +++ b/Detectors/TOF/compression/src/CompressorTask.cxx @@ -30,7 +30,11 @@ namespace o2::tof template void CompressorTask::init(InitContext& ic) { - LOG(info) << "Compressor init"; + if (mPayloadLimit < 0) { + LOG(info) << "Compressor init"; + } else { + LOG(info) << "Compressor init with Payload limit at " << mPayloadLimit; + } auto decoderCONET = ic.options().get("tof-compressor-conet-mode"); auto decoderVerbose = ic.options().get("tof-compressor-decoder-verbose"); @@ -137,6 +141,11 @@ void CompressorTask::run(ProcessingContext& pc) auto payloadIn = ref.payload; auto payloadInSize = DataRefUtils::getPayloadSize(ref); + if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) { + LOG(error) << "Payload larger than limit (" << mPayloadLimit << "), payload = " << payloadInSize; + continue; + } + /** prepare compressor **/ mCompressor.setDecoderBuffer(payloadIn); mCompressor.setDecoderBufferSize(payloadInSize); diff --git a/Detectors/TOF/compression/src/CompressorTaskOld.cxx b/Detectors/TOF/compression/src/CompressorTaskOld.cxx index 333b79865de2d..1a4e28393a8c3 100644 --- a/Detectors/TOF/compression/src/CompressorTaskOld.cxx +++ b/Detectors/TOF/compression/src/CompressorTaskOld.cxx @@ -36,7 +36,11 @@ namespace tof template void CompressorTaskOld::init(InitContext& ic) { - LOG(info) << "Compressor init"; + if (mPayloadLimit < 0) { + LOG(info) << "Compressor init"; + } else { + LOG(info) << "Compressor init with Payload limit at " << mPayloadLimit; + } auto decoderCONET = ic.options().get("tof-compressor-conet-mode"); auto decoderVerbose = ic.options().get("tof-compressor-decoder-verbose"); @@ -166,6 +170,11 @@ void CompressorTaskOld::run(ProcessingContext& pc) auto payloadIn = ref.payload; auto payloadInSize = DataRefUtils::getPayloadSize(ref); + if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) { + LOG(error) << "Payload larger than limit (" << mPayloadLimit << "), payload = " << payloadInSize; + continue; + } + /** prepare compressor **/ mCompressor.setDecoderBuffer(payloadIn); mCompressor.setDecoderBufferSize(payloadInSize); diff --git a/Detectors/TOF/compression/src/tof-compressor.cxx b/Detectors/TOF/compression/src/tof-compressor.cxx index 9d248fcaf8db2..604609e1080d6 100644 --- a/Detectors/TOF/compression/src/tof-compressor.cxx +++ b/Detectors/TOF/compression/src/tof-compressor.cxx @@ -36,6 +36,7 @@ void customize(std::vector& workflowOptions) auto verbose = ConfigParamSpec{"tof-compressor-verbose", VariantType::Bool, false, {"Enable verbose compressor"}}; auto paranoid = ConfigParamSpec{"tof-compressor-paranoid", VariantType::Bool, false, {"Enable paranoid compressor"}}; auto ignoreStf = ConfigParamSpec{"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}}; + auto payloadlim = ConfigParamSpec{"payload-limit", VariantType::Int64, -1ll, {"Payload limit in Byte (-1 -> no limits)"}}; workflowOptions.push_back(config); workflowOptions.push_back(outputDesc); @@ -43,6 +44,7 @@ void customize(std::vector& workflowOptions) workflowOptions.push_back(verbose); workflowOptions.push_back(paranoid); workflowOptions.push_back(ignoreStf); + workflowOptions.push_back(payloadlim); workflowOptions.emplace_back(ConfigParamSpec{"old", VariantType::Bool, false, {"use the non-DPL version of the compressor"}}); workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}}); } @@ -60,6 +62,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) auto paranoid = cfgc.options().get("tof-compressor-paranoid"); auto ignoreStf = cfgc.options().get("ignore-dist-stf"); auto old = cfgc.options().get("old"); + auto payloadLim = cfgc.options().get("payload-limit"); std::vector outputs; outputs.emplace_back(OutputSpec(ConcreteDataTypeMatcher{"TOF", "CRAWDATA"})); @@ -68,30 +71,30 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) if (rdhVersion == o2::raw::RDHUtils::getVersion()) { if (!verbose && !paranoid) { if (old) { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } else { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } } if (!verbose && paranoid) { if (old) { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } else { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } } if (verbose && !paranoid) { if (old) { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } else { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } } if (verbose && paranoid) { if (old) { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } else { - algoSpec = AlgorithmSpec{adaptFromTask>()}; + algoSpec = AlgorithmSpec{adaptFromTask>(payloadLim)}; } } }