From fe755c470b728c9ab1e02a6e81b91240b58034c9 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sat, 9 May 2026 08:31:50 +0200 Subject: [PATCH] DPL: schedule rescan when oldest possible timeframe updated --- Framework/Core/src/CommonServices.cxx | 56 +++++++++++++++++++ .../Core/src/CompletionPolicyHelpers.cxx | 2 + Framework/Core/src/DecongestionService.h | 2 + 3 files changed, 60 insertions(+) diff --git a/Framework/Core/src/CommonServices.cxx b/Framework/Core/src/CommonServices.cxx index 0f53f5a6be5a1..2cdd046dedc34 100644 --- a/Framework/Core/src/CommonServices.cxx +++ b/Framework/Core/src/CommonServices.cxx @@ -589,6 +589,46 @@ auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void { } }; +// Callback for consumeWhenPastOldestPossibleTimeframe. +// Runs in the async queue at the beginning of the next iteration, +// after Retry slots unblocked by an oldestPossibleInput change have +// been consumed and freed. Rescans all slots and forwards the +// (now up-to-date) oldestPossibleOutput downstream. +auto decongestionCallbackPastOldest = [](AsyncTask& task, size_t id) -> void { + auto& ref = task.user().ref; + + auto& decongestion = ref.get(); + auto& timesliceIndex = ref.get(); + auto& relayer = ref.get(); + auto& proxy = ref.get(); + O2_SIGNPOST_ID_GENERATE(cid, async_queue); + + timesliceIndex.rescan(); + timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded); + auto oldestPossibleOutput = relayer.getOldestPossibleOutput(); + + if (oldestPossibleOutput.timeslice.value <= decongestion.lastTimeslice) { + O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", + "consumeWhenPastOldestPossibleTimeframe: not forwarding already sent value %" PRIu64, + (uint64_t)oldestPossibleOutput.timeslice.value); + return; + } + O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice", + "consumeWhenPastOldestPossibleTimeframe: forwarding oldest possible timeslice %" PRIu64, + (uint64_t)oldestPossibleOutput.timeslice.value); + DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value); + + for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) { + auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi}); + auto& state = proxy.getForwardChannelState(ChannelIndex{fi}); + if (info.channelType != ChannelAccountingType::DPL) { + continue; + } + DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value); + } + decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value; +}; + // Decongestion service // If we do not have any Timeframe input, it means we must be creating timeslices // in order and that we should propagate the oldest possible timeslice at the end @@ -705,6 +745,22 @@ o2::framework::ServiceSpec timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded); auto oldestPossibleOutput = relayer.getOldestPossibleOutput(); + // When consumeWhenPastOldestPossibleTimeframe is active, we always + // schedule the callback even when oldestPossibleOutput has not changed + // yet. Retry slots held by this policy will be consumed after this + // domainInfoUpdated call (once getReadyToProcess re-checks them), and + // the callback — running in the next iteration — will recompute + // oldestPossibleOutput and forward the updated value downstream. + if (decongestion.consumeWhenPastOldestPossibleTimeframeActive) { + auto& queue = services.get(); + AsyncQueueHelpers::post( + queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleTimeslice}, + .id = decongestion.oldestPossibleTimesliceTask, + .debounce = -1, + .callback = decongestionCallbackPastOldest} + .user({.ref = services, .oldestPossibleOutput = oldestPossibleOutput})); + } + if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) { O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value); return; diff --git a/Framework/Core/src/CompletionPolicyHelpers.cxx b/Framework/Core/src/CompletionPolicyHelpers.cxx index cc593ee7a2ed9..e5a91ae58f899 100644 --- a/Framework/Core/src/CompletionPolicyHelpers.cxx +++ b/Framework/Core/src/CompletionPolicyHelpers.cxx @@ -267,6 +267,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* na CompletionPolicy CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe(const char* name, CompletionPolicy::Matcher matcher) { auto callback = [](InputSpan const& inputs, std::vector const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp { + auto& decongestionService = ref.get(); + decongestionService.consumeWhenPastOldestPossibleTimeframeActive = true; size_t currentTimeslice = -1; for (auto& input : inputs) { if (input.header == nullptr) { diff --git a/Framework/Core/src/DecongestionService.h b/Framework/Core/src/DecongestionService.h index 1a42d3577bc0a..fe7fce8d640db 100644 --- a/Framework/Core/src/DecongestionService.h +++ b/Framework/Core/src/DecongestionService.h @@ -33,6 +33,8 @@ struct DecongestionService { int64_t nextTimeslice = 0; /// Ordered completion policy is active. bool orderedCompletionPolicyActive = false; + /// consumeWhenPastOldestPossibleTimeframe completion policy is active. + bool consumeWhenPastOldestPossibleTimeframeActive = false; // Task to enqueue the oldest possible timeslice propagation // at the end of any processing chain. o2::framework::AsyncTaskId oldestPossibleTimesliceTask = {0};