Skip to content

Commit

Permalink
add flag to tof compressor to skip large payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
noferini committed Oct 7, 2023
1 parent e1f6469 commit 460c1e6
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ template <typename RDH, bool verbose, bool paranoid>
class CompressorTask : public Task
{
public:
CompressorTask() = default;
CompressorTask(long payloadLim = -1) : mPayloadLimit(payloadLim) {}
~CompressorTask() override = default;
void init(InitContext& ic) final;
void run(ProcessingContext& pc) final;

private:
Compressor<RDH, verbose, paranoid> mCompressor;
int mOutputBufferSize;
long mPayloadLimit = -1;
};

} // namespace tof
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ template <typename RDH, bool verbose, bool paranoid>
class CompressorTaskOld : public Task
{
public:
CompressorTaskOld() = default;
CompressorTaskOld(long payloadLim = -1) : mPayloadLimit(payloadLim) {}
~CompressorTaskOld() override = default;
void init(InitContext& ic) final;
void run(ProcessingContext& pc) final;

private:
Compressor<RDH, verbose, paranoid> mCompressor;
int mOutputBufferSize;
long mPayloadLimit = -1;
};

} // namespace tof
Expand Down
11 changes: 10 additions & 1 deletion Detectors/TOF/compression/src/CompressorTask.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ namespace o2::tof
template <typename RDH, bool verbose, bool paranoid>
void CompressorTask<RDH, verbose, paranoid>::init(InitContext& ic)
{
LOG(info) << "Compressor init";
if (mPayloadLimit < 0) {
LOG(info) << "Compressor init";
} else {
LOG(info) << "Compressor init with Payload limit at " << mPayloadLimit;
}

auto decoderCONET = ic.options().get<bool>("tof-compressor-conet-mode");
auto decoderVerbose = ic.options().get<bool>("tof-compressor-decoder-verbose");
Expand Down Expand Up @@ -137,6 +141,11 @@ void CompressorTask<RDH, verbose, paranoid>::run(ProcessingContext& pc)
auto payloadIn = ref.payload;
auto payloadInSize = DataRefUtils::getPayloadSize(ref);

if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) {
LOG(error) << "Payload larger than limit (" << mPayloadLimit << "), payload = " << payloadInSize;
continue;
}

/** prepare compressor **/
mCompressor.setDecoderBuffer(payloadIn);
mCompressor.setDecoderBufferSize(payloadInSize);
Expand Down
11 changes: 10 additions & 1 deletion Detectors/TOF/compression/src/CompressorTaskOld.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ namespace tof
template <typename RDH, bool verbose, bool paranoid>
void CompressorTaskOld<RDH, verbose, paranoid>::init(InitContext& ic)
{
LOG(info) << "Compressor init";
if (mPayloadLimit < 0) {
LOG(info) << "Compressor init";
} else {
LOG(info) << "Compressor init with Payload limit at " << mPayloadLimit;
}

auto decoderCONET = ic.options().get<bool>("tof-compressor-conet-mode");
auto decoderVerbose = ic.options().get<bool>("tof-compressor-decoder-verbose");
Expand Down Expand Up @@ -166,6 +170,11 @@ void CompressorTaskOld<RDH, verbose, paranoid>::run(ProcessingContext& pc)
auto payloadIn = ref.payload;
auto payloadInSize = DataRefUtils::getPayloadSize(ref);

if (mPayloadLimit > -1 && payloadInSize > mPayloadLimit) {
LOG(error) << "Payload larger than limit (" << mPayloadLimit << "), payload = " << payloadInSize;
continue;
}

/** prepare compressor **/
mCompressor.setDecoderBuffer(payloadIn);
mCompressor.setDecoderBufferSize(payloadInSize);
Expand Down
19 changes: 11 additions & 8 deletions Detectors/TOF/compression/src/tof-compressor.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
auto verbose = ConfigParamSpec{"tof-compressor-verbose", VariantType::Bool, false, {"Enable verbose compressor"}};
auto paranoid = ConfigParamSpec{"tof-compressor-paranoid", VariantType::Bool, false, {"Enable paranoid compressor"}};
auto ignoreStf = ConfigParamSpec{"ignore-dist-stf", VariantType::Bool, false, {"do not subscribe to FLP/DISTSUBTIMEFRAME/0 message (no lost TF recovery)"}};
auto payloadlim = ConfigParamSpec{"payload-limit", VariantType::Int64, -1ll, {"Payload limit in Byte (-1 -> no limits)"}};

workflowOptions.push_back(config);
workflowOptions.push_back(outputDesc);
workflowOptions.push_back(rdhVersion);
workflowOptions.push_back(verbose);
workflowOptions.push_back(paranoid);
workflowOptions.push_back(ignoreStf);
workflowOptions.push_back(payloadlim);
workflowOptions.emplace_back(ConfigParamSpec{"old", VariantType::Bool, false, {"use the non-DPL version of the compressor"}});
workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
}
Expand All @@ -60,6 +62,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
auto paranoid = cfgc.options().get<bool>("tof-compressor-paranoid");
auto ignoreStf = cfgc.options().get<bool>("ignore-dist-stf");
auto old = cfgc.options().get<bool>("old");
auto payloadLim = cfgc.options().get<long>("payload-limit");

std::vector<OutputSpec> outputs;
outputs.emplace_back(OutputSpec(ConcreteDataTypeMatcher{"TOF", "CRAWDATA"}));
Expand All @@ -68,30 +71,30 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
if (rdhVersion == o2::raw::RDHUtils::getVersion<o2::header::RAWDataHeader>()) {
if (!verbose && !paranoid) {
if (old) {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, false, false>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, false, false>>(payloadLim)};
} else {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, false, false>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, false, false>>(payloadLim)};
}
}
if (!verbose && paranoid) {
if (old) {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, false, true>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, false, true>>(payloadLim)};
} else {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, false, true>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, false, true>>(payloadLim)};
}
}
if (verbose && !paranoid) {
if (old) {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, true, false>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, true, false>>(payloadLim)};
} else {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, true, false>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, true, false>>(payloadLim)};
}
}
if (verbose && paranoid) {
if (old) {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, true, true>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTaskOld<o2::header::RAWDataHeader, true, true>>(payloadLim)};
} else {
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, true, true>>()};
algoSpec = AlgorithmSpec{adaptFromTask<o2::tof::CompressorTask<o2::header::RAWDataHeader, true, true>>(payloadLim)};
}
}
}
Expand Down

0 comments on commit 460c1e6

Please sign in to comment.