Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions Framework/Core/src/CommonServices.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,46 @@ auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
}
};

// Callback for consumeWhenPastOldestPossibleTimeframe.
// Runs in the async queue at the beginning of the next iteration,
// after Retry slots unblocked by an oldestPossibleInput change have
// been consumed and freed. Rescans all slots and forwards the
// (now up-to-date) oldestPossibleOutput downstream.
auto decongestionCallbackPastOldest = [](AsyncTask& task, size_t id) -> void {
auto& ref = task.user<DecongestionContext>().ref;

auto& decongestion = ref.get<DecongestionService>();
auto& timesliceIndex = ref.get<TimesliceIndex>();
auto& relayer = ref.get<DataRelayer>();
auto& proxy = ref.get<FairMQDeviceProxy>();
O2_SIGNPOST_ID_GENERATE(cid, async_queue);

timesliceIndex.rescan();
timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
auto oldestPossibleOutput = relayer.getOldestPossibleOutput();

if (oldestPossibleOutput.timeslice.value <= decongestion.lastTimeslice) {
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
"consumeWhenPastOldestPossibleTimeframe: not forwarding already sent value %" PRIu64,
(uint64_t)oldestPossibleOutput.timeslice.value);
return;
}
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
"consumeWhenPastOldestPossibleTimeframe: forwarding oldest possible timeslice %" PRIu64,
(uint64_t)oldestPossibleOutput.timeslice.value);
DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);

for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
if (info.channelType != ChannelAccountingType::DPL) {
continue;
}
DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value);
}
decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
};

// Decongestion service
// If we do not have any Timeframe input, it means we must be creating timeslices
// in order and that we should propagate the oldest possible timeslice at the end
Expand Down Expand Up @@ -705,6 +745,22 @@ o2::framework::ServiceSpec
timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
auto oldestPossibleOutput = relayer.getOldestPossibleOutput();

// When consumeWhenPastOldestPossibleTimeframe is active, we always
// schedule the callback even when oldestPossibleOutput has not changed
// yet. Retry slots held by this policy will be consumed after this
// domainInfoUpdated call (once getReadyToProcess re-checks them), and
// the callback — running in the next iteration — will recompute
// oldestPossibleOutput and forward the updated value downstream.
if (decongestion.consumeWhenPastOldestPossibleTimeframeActive) {
auto& queue = services.get<AsyncQueue>();
AsyncQueueHelpers::post(
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleTimeslice},
.id = decongestion.oldestPossibleTimesliceTask,
.debounce = -1,
.callback = decongestionCallbackPastOldest}
.user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
}

if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
return;
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/CompletionPolicyHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* na
CompletionPolicy CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe(const char* name, CompletionPolicy::Matcher matcher)
{
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
auto& decongestionService = ref.get<DecongestionService>();
decongestionService.consumeWhenPastOldestPossibleTimeframeActive = true;
size_t currentTimeslice = -1;
for (auto& input : inputs) {
if (input.header == nullptr) {
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DecongestionService.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ struct DecongestionService {
int64_t nextTimeslice = 0;
/// Ordered completion policy is active.
bool orderedCompletionPolicyActive = false;
/// consumeWhenPastOldestPossibleTimeframe completion policy is active.
bool consumeWhenPastOldestPossibleTimeframeActive = false;
// Task to enqueue the oldest possible timeslice propagation
// at the end of any processing chain.
o2::framework::AsyncTaskId oldestPossibleTimesliceTask = {0};
Expand Down
Loading