Skip to content

Commit

Permalink
TPC: automatizing creation of IDC scalers during synchronous reco
Browse files Browse the repository at this point in the history
  • Loading branch information
matthias-kleiner committed May 29, 2024
1 parent 9ddc990 commit 7b8f055
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "TPCWorkflow/ProcessingHelpers.h"
#include "TPCBase/CDBInterface.h"
#include "DetectorsCalibration/Utils.h"
#include "TPCCalibration/IDCCCDBHelper.h"

using namespace o2::framework;
using o2::header::gDataOriginTPC;
Expand Down Expand Up @@ -154,6 +155,7 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
}

static constexpr header::DataDescription getDataDescriptionIDC0() { return header::DataDescription{"IDC0"}; }
static constexpr header::DataDescription getDataDescriptionIDC0Mean() { return header::DataDescription{"IDC0MEAN"}; }
static constexpr header::DataDescription getDataDescriptionIDC1() { return header::DataDescription{"IDC1"}; }
static constexpr header::DataDescription getDataDescriptionTimeStamp() { return header::DataDescription{"FOURIERTS"}; }
static constexpr header::DataDescription getDataDescriptionIntervals() { return header::DataDescription{"INTERVALS"}; }
Expand Down Expand Up @@ -215,6 +217,11 @@ class TPCFactorizeIDCSpec : public o2::framework::Task
const unsigned int iSide = static_cast<unsigned int>(side);
LOGP(info, "Sending IDC1 for side {} of size {}", iSide, mIDCFactorization.getIDCOneVec(side).size());
output.snapshot(Output{gDataOriginTPC, getDataDescriptionIDC1(), header::DataHeader::SubSpecificationType{iSide}}, mIDCFactorization.getIDCOneVec(side));

// calculating mean of IDC0 for the IDC scalers
o2::tpc::IDCCCDBHelper helper;
const float mean = helper.getMeanIDC0(side, mIDCFactorization.getIDCZero(side), mIDCFactorization.getPadStatusMap().get());
output.snapshot(Output{gDataOriginTPC, getDataDescriptionIDC0Mean(), header::DataHeader::SubSpecificationType{iSide}}, mean);
}
output.snapshot(Output{gDataOriginTPC, getDataDescriptionTimeStamp()}, std::vector<long>{mTimestampStart, timeStampEnd});
output.snapshot(Output{gDataOriginTPC, getDataDescriptionIntervals()}, mIDCFactorization.getIntegrationIntervalsPerTF());
Expand Down Expand Up @@ -457,6 +464,7 @@ DataProcessorSpec getTPCFactorizeIDCSpec(const int lane, const std::vector<uint3
if (sendOutputFFT) {
for (auto side : sides) {
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIDC1(), header::DataHeader::SubSpecificationType{side}}, Lifetime::Sporadic);
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIDC0Mean(), header::DataHeader::SubSpecificationType{side}}, Lifetime::Sporadic);
}
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionTimeStamp(), header::DataHeader::SubSpecificationType{0}}, Lifetime::Sporadic);
outputSpecs.emplace_back(ConcreteDataMatcher{gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIntervals(), header::DataHeader::SubSpecificationType{0}}, Lifetime::Sporadic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "TPCWorkflow/TPCFactorizeSACSpec.h"
#include "TPCBase/CRU.h"
#include "CommonUtils/NameConf.h"
#include "TPCCalibration/TPCScaler.h"

using namespace o2::framework;
using o2::header::gDataOriginTPC;
Expand All @@ -38,6 +39,15 @@ using namespace o2::tpc;
namespace o2::tpc
{

struct TPCScalerProc {
std::array<long, 2> timestamp; ///< start -> end timestamp
std::array<std::vector<float>, 2> idc1; ///< IDC1
};

struct TPCScalerProcContainer {
std::unordered_map<long, TPCScalerProc> idcs;
};

class TPCFourierTransformAggregatorSpec : public o2::framework::Task
{
public:
Expand Down Expand Up @@ -92,6 +102,37 @@ class TPCFourierTransformAggregatorSpec : public o2::framework::Task
}
}

// buffer IDCs for TPC scaler
if (!mProcessSACs) {
TPCScalerProc& scaler = mTPCScalerCont.idcs[scaler.timestamp[0]];
scaler.timestamp[0] = mCCDBBuffer[lane].front();
scaler.timestamp[1] = mCCDBBuffer[lane].back();
for (auto& ref : InputRecordWalker(pc.inputs(), mFilterI0)) {
auto const* dataHeader = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
const int side = dataHeader->subSpecification;
const float idc0mean = pc.inputs().get<float>(ref);
LOGP(info, "Received {} 0D-IDC mean for side {}", idc0mean, side);
scaler.idc1[side] = mIDCOneBuffer[lane][side].mIDCOne;

// scale IDC1 with IDC0Mean
std::transform(scaler.idc1[side].begin(), scaler.idc1[side].end(), scaler.idc1[side].begin(), [&idc0mean](auto idc) { return idc * idc0mean; });
}
// check if A- and C-side has the same length!
const int lenA = scaler.idc1[0].size();
const int lenC = scaler.idc1[1].size();
if (lenA != lenC) {
// This should never happen
LOGP(warning, "Received IDCs have different length! A-side length: {} and C-side length: {}", lenA, lenC);
// what to do (?) add dummy to shorter or remove value
const int maxLen = std::max(lenA, lenC);
scaler.idc1[0].resize(maxLen);
scaler.idc1[1].resize(maxLen);
}

mRun = processing_helpers::getRunNumber(pc);
makeTPCScaler(pc.outputs());
}

FourierCoeffSAC coeffSAC;
if (lane == mExpectedInputLane) {
const int nSides = mIDCOneBuffer[lane][Side::A].mIDCOne.empty() + mIDCOneBuffer[lane][Side::C].mIDCOne.empty();
Expand Down Expand Up @@ -147,11 +188,13 @@ class TPCFourierTransformAggregatorSpec : public o2::framework::Task

void endOfStream(o2::framework::EndOfStreamContext& ec) final
{
makeTPCScaler(ec.outputs());
ec.services().get<ControlService>().readyToQuit(QuitRequest::Me);
}

static constexpr header::DataDescription getDataDescriptionFourier() { return header::DataDescription{"FOURIER"}; }
static constexpr header::DataDescription getDataDescriptionCCDBFourier() { return header::DataDescription{"TPC_CalibFFT"}; }
static constexpr header::DataDescription getDataDescriptionCCDBTPCScaler() { return header::DataDescription{"TPC_IDCScaler"}; }

private:
std::array<IDCFType, SIDES> mIDCFourierTransform{}; ///< object for performing the fourier transform of 1D-IDCs
Expand All @@ -163,10 +206,17 @@ class TPCFourierTransformAggregatorSpec : public o2::framework::Task
std::vector<std::vector<long>> mCCDBBuffer{}; ///< buffer for CCDB time stamp in case one facotorize lane is earlier sending data the n the other lane
std::vector<std::vector<unsigned int>> mIntervalsBuffer{}; ///< buffer for the intervals in case one facotorize lane is earlier sending data the n the other lane
std::vector<std::array<o2::tpc::IDCOne, SIDES>> mIDCOneBuffer{}; ///< buffer for the received IDCOne in case one facotorize lane is earlier sending data the n the other lane
std::vector<std::array<float, SIDES>> mIDCZeroMeanBuffer{}; ///< buffer for the received IDCZero mean in case one facotorize lane is earlier sending data the n the other lane
unsigned int mIntervalsSACs{12}; ///< number of intervals which are skipped for calculationg the fourier coefficients
int mExpectedInputLane{0}; ///< expeceted data from this input lane
TPCScalerProcContainer mTPCScalerCont; ///< container for buffering the IDCs for the creation of the TPC scalers
float mLengthIDCScalerSeconds = 300; ///< length of the IDC scaler in seconds
long mIDCSCalerEndTSLast = 0; ///< end time stamp of last TPC IDC scaler object to ensure no gapps
o2::tpc::TPCScaler mScalerLast; ///< buffer last scaler to easily add internal overlap for the beginning
int mRun{};
const std::array<std::vector<InputSpec>, 2> mFilter = {std::vector<InputSpec>{{"idcone", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIDC1()}, Lifetime::Sporadic}},
std::vector<InputSpec>{{"sacone", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFactorizeSACSpec::getDataDescriptionSAC1()}, Lifetime::Sporadic}}}; ///< filter for looping over input data
const std::vector<InputSpec> mFilterI0 = std::vector<InputSpec>{{"idczeromean", ConcreteDataTypeMatcher{o2::header::gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIDC1()}, Lifetime::Sporadic}}; ///< filter for looping over input data from IDC0 mean

void sendOutput(DataAllocator& output, const int side)
{
Expand All @@ -179,19 +229,127 @@ class TPCFourierTransformAggregatorSpec : public o2::framework::Task
mIntervalsBuffer.resize(expectedLanes);
mIDCOneBuffer.resize(expectedLanes);
}

void makeTPCScaler(DataAllocator& output)
{
// check if IDC scalers can be created - check length of continous received IDCs
std::vector<std::pair<long, long>> times;
times.reserve(mTPCScalerCont.idcs.size());
for (const auto& idc : mTPCScalerCont.idcs) {
times.emplace_back(idc.second.timestamp[0], idc.second.timestamp[1]);
}

// sort received times of the IDCs
std::sort(times.begin(), times.end());

// loop over times and make checks
long timestart = times.front().second;
const int checkGapp = 10;
// if the diff between end of data[i] and start of data[i+1] is smaller than this, the data is contigous
long timesDuration = (times.front().second - times.front().first) / checkGapp;

// make check and store lastValid index in case IDC scalers can be created
int lastValidIdx = -1;
for (int i = 1; i < times.size(); ++i) {
const auto& time = times[i];
const auto deltaTime = time.first - timestart;
// check if IDCs are contigous
if (deltaTime > timesDuration) {
// check if the gap is very large - in this case the gapp might be lost, so just write out the TPC scaler until the gap
if (deltaTime > checkGapp * timesDuration) {
lastValidIdx = i - 1;
}
break;
}
timestart = time.first;

// check if time length is >= than mLengthIDCScalerSeconds
if ((times.front().first - timestart) / 1000 >= mLengthIDCScalerSeconds) {
lastValidIdx = i;
}
}

// create IDC scaler in case index is valid
if (lastValidIdx >= 0) {
o2::tpc::TPCScaler scaler;
scaler.setIonDriftTimeMS(170);
scaler.setRun(mRun);
scaler.setStartTimeStampMS(times.front().first);
const auto idcIntegrationTime = 12 /*12 orbits integration interval per IDC*/ * o2::constants::lhc::LHCOrbitMUS / 1000;
scaler.setIntegrationTimeMS(idcIntegrationTime);

std::vector<float> idc1A;
std::vector<float> idc1C;
long idc1ASize = 0;
long idc1CSize = 0;

// in case already one object is stored add internal overlap
if (mIDCSCalerEndTSLast != 0) {
const int nOverlap = 500; /// ~500ms overlap
idc1ASize += nOverlap;
idc1CSize += nOverlap;
const auto& scalerALast = mScalerLast.getScalers(o2::tpc::Side::A);
const auto& scalerCLast = mScalerLast.getScalers(o2::tpc::Side::C);
if (scalerALast.size() > nOverlap) {
idc1A.insert(idc1A.end(), scalerALast.end() - nOverlap, scalerALast.end());
idc1C.insert(idc1C.end(), scalerCLast.end() - nOverlap, scalerCLast.end());
// adjust start time
scaler.setStartTimeStampMS(scaler.getStartTimeStampMS() - nOverlap * idcIntegrationTime);
}
} else {
// store end timestamp as start time stamp for first object for correct time stamp in CCDB
mIDCSCalerEndTSLast = scaler.getStartTimeStampMS();
}

for (int iter = 0; iter < 2; ++iter) {
if (iter == 1) {
idc1A.reserve(idc1ASize);
idc1C.reserve(idc1CSize);
}
for (int i = 0; i < times.size(); ++i) {
const auto& time = times[i];
const auto& idc = mTPCScalerCont.idcs[time.first];
if (iter == 0) {
idc1ASize += idc.idc1[0].size();
idc1CSize += idc.idc1[1].size();
} else {
idc1A.insert(idc1A.end(), idc.idc1[0].begin(), idc.idc1[0].end());
idc1C.insert(idc1C.end(), idc.idc1[1].begin(), idc.idc1[1].end());
}
}
}
scaler.setScaler(idc1A, o2::tpc::Side::A);
scaler.setScaler(idc1C, o2::tpc::Side::C);

// store in CCDB
o2::ccdb::CcdbObjectInfo ccdbInfoIDC(CDBTypeMap.at(CDBType::CalScaler), std::string{}, std::string{}, std::map<std::string, std::string>{}, mIDCSCalerEndTSLast, scaler.getEndTimeStampMS(o2::tpc::Side::A));
auto imageIDC = o2::ccdb::CcdbApi::createObjectImage(&scaler, &ccdbInfoIDC);
LOGP(info, "Sending object {} / {} of size {} bytes, valid for {} : {} ", ccdbInfoIDC.getPath(), ccdbInfoIDC.getFileName(), imageIDC->size(), ccdbInfoIDC.getStartValidityTimestamp(), ccdbInfoIDC.getEndValidityTimestamp());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBPayload, getDataDescriptionCCDBTPCScaler(), 0}, *imageIDC.get());
output.snapshot(Output{o2::calibration::Utils::gDataOriginCDBWrapper, getDataDescriptionCCDBTPCScaler(), 0}, ccdbInfoIDC);

// store end timestamp
mIDCSCalerEndTSLast = scaler.getEndTimeStampMS(o2::tpc::Side::A);

// push to CCDB
mScalerLast = std::move(scaler);
}
}
};
DataProcessorSpec getTPCFourierTransformAggregatorSpec(const unsigned int rangeIDC, const unsigned int nFourierCoefficientsStore, const bool senddebug, const bool processSACs, const int inputLanes)
{
std::vector<OutputSpec> outputSpecs;
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, TPCFourierTransformAggregatorSpec::getDataDescriptionCCDBFourier()}, Lifetime::Sporadic);
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCFourierTransformAggregatorSpec::getDataDescriptionCCDBFourier()}, Lifetime::Sporadic);
outputSpecs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, TPCFourierTransformAggregatorSpec::getDataDescriptionCCDBTPCScaler()}, Lifetime::Sporadic);

if (senddebug) {
outputSpecs.emplace_back(ConcreteDataTypeMatcher{gDataOriginTPC, TPCFourierTransformAggregatorSpec::getDataDescriptionFourier()}, Lifetime::Sporadic);
}

std::vector<InputSpec> inputSpecs;
if (!processSACs) {
inputSpecs.emplace_back(InputSpec{"idczeromean", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIDC0Mean()}, Lifetime::Sporadic});
inputSpecs.emplace_back(InputSpec{"idcone", ConcreteDataTypeMatcher{gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIDC1()}, Lifetime::Sporadic});
inputSpecs.emplace_back(InputSpec{"tsccdb", gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionTimeStamp(), Lifetime::Sporadic});
inputSpecs.emplace_back(InputSpec{"intervals", gDataOriginTPC, TPCFactorizeIDCSpec::getDataDescriptionIntervals(), Lifetime::Sporadic});
Expand Down

0 comments on commit 7b8f055

Please sign in to comment.