Skip to content

Commit

Permalink
DPL: handle spurious messages when in READY state (AliceO2Group#11799)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Aug 23, 2023
1 parent 637b7d4 commit 44b5559
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
4 changes: 4 additions & 0 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
if (info.channel == nullptr) {
continue;
}
// Only poll DPL channels for now.
if (info.channelType != ChannelAccountingType::DPL) {
continue;
}
auto& socket = info.channel->GetSocket();
// If we have pending events from a previous iteration,
// we do receive in any case.
Expand Down
29 changes: 26 additions & 3 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include <fairmq/Parts.h>
#include <fairmq/Device.h>
#include <uv.h>
#include <cstring>
#include <cassert>
#include <memory>
Expand Down Expand Up @@ -466,11 +467,12 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
for (auto& channel : channels) {
LOGP(detail, "Injecting channel '{}' into DPL configuration", channel);
// Converter should pump messages
auto& channelPtr = services.get<RawDeviceService>().device()->GetChannel(channel, 0);
deviceState.inputChannelInfos.push_back(InputChannelInfo{
.state = InputChannelState::Running,
.hasPendingEvents = false,
.readPolled = false,
.channel = nullptr,
.channel = &channelPtr,
.id = {ChannelIndex::INVALID},
.channelType = ChannelAccountingType::RAWFMQ,
});
Expand All @@ -491,12 +493,31 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
std::vector<bool> lastNewStatePending(deviceState.inputChannelInfos.size(), false);

// Continue iterating until all channels have seen a new state.
while (std::all_of(lastNewStatePending.begin(), lastNewStatePending.end(), [](bool b) { return b; })) {
while (std::all_of(lastNewStatePending.begin(), lastNewStatePending.end(), [](bool b) { return b; }) != true) {
fair::mq::Parts parts;
for (size_t ci = 0; ci < deviceState.inputChannelInfos.size(); ++ci) {
auto& info = deviceState.inputChannelInfos[ci];
// We only care about rawfmq channels.
if (info.channelType != ChannelAccountingType::RAWFMQ) {
lastNewStatePending[ci] = true;
continue;
}
// This means we have not set things up yet. I.e. the first iteration from
// ready to run has not happened yet.
if (info.channel == nullptr) {
lastNewStatePending[ci] = true;
continue;
}
info.channel->Receive(parts, 10);
lastNewStatePending[ci] = device->NewStatePending();
// Handle both cases of state changes:
//
// - The state has been changed from the outside and FairMQ knows about it.
// - The state has been changed from the GUI, and deviceState.nextFairMQState knows about it.
//
// This latter case is probably better handled from DPL itself, after all it's fair to
// assume we need to switch state as soon as the GUI notifies us.
// For now we keep it here to avoid side effects.
lastNewStatePending[ci] = device->NewStatePending() || (deviceState.nextFairMQState.empty() == false);
if (parts.Size() == 0) {
continue;
}
Expand All @@ -510,6 +531,8 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
info.readPolled = true;
}
}
// Keep state transitions going also when running with the standalone GUI.
uv_run(deviceState.loop, UV_RUN_NOWAIT);
}
};

Expand Down

0 comments on commit 44b5559

Please sign in to comment.