Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions Framework/CCDBSupport/src/CCDBFetcherHelper.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "CCDBFetcherHelper.h"
#include "CCDBHelpers.h"
#include "Framework/DataTakingContext.h"
#include "Framework/Signpost.h"
#include "Framework/DataSpecUtils.h"
Expand Down Expand Up @@ -257,14 +258,13 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
helper->totalFetchedBytes += size;
helper->totalRequestedBytes += size;
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size);
continue;
}
if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
if (v.size()) { // but should be overridden by fresh object
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
Expand All @@ -276,12 +276,10 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
helper->totalFetchedBytes += size;
helper->totalRequestedBytes += size;
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
continue;
} else {
// Only once the etag is actually used, we get the information on how long the object is valid
Expand Down
34 changes: 22 additions & 12 deletions Framework/CCDBSupport/src/CCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,22 @@ bool isOnlineRun(DataTakingContext const& dtc)
return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS;
}

DataAllocator::CacheId CCDBHelpers::adoptAndReplaceCachedMessage(
DataAllocator& allocator,
std::unordered_map<std::string, DataAllocator::CacheId> const& cache,
std::string const& path,
Output const& output,
o2::pmr::vector<char>&& v,
o2::header::SerializationMethod method)
{
auto oldIt = cache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, method);
if (oldIt != cache.end()) {
allocator.pruneFromCache(oldIt->second);
}
return cacheId;
}

auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
int64_t timestamp,
TimingInfo& timingInfo,
Expand Down Expand Up @@ -347,13 +363,12 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
continue;
}
if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
if (v.size()) { // but should be overridden by fresh object
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
Expand All @@ -364,11 +379,9 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB);
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
continue;
} else {
// Only once the etag is actually used, we get the information on how long the object is valid
Expand Down Expand Up @@ -448,11 +461,10 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
helper->totalRequestedBytes += v.size();
newOrbitResetTime = getOrbitResetTime(v);
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodNone);
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
} else if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
} else if (v.size()) { // but should be overridden by fresh object
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].size = v.size();
Expand All @@ -462,11 +474,9 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
helper->totalRequestedBytes += v.size();
newOrbitResetTime = getOrbitResetTime(v);
api.appendFlatHeader(v, headers);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodNone);
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
}
// cached object is fine
}
Expand Down
22 changes: 22 additions & 0 deletions Framework/CCDBSupport/src/CCDBHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#define O2_FRAMEWORK_CCDBHELPERS_H_

#include "Framework/AlgorithmSpec.h"
#include "Framework/DataAllocator.h"
#include "Framework/Output.h"
#include "Headers/DataHeader.h"
#include "MemoryResources/MemoryResources.h"
#include <unordered_map>
#include <string>

Expand All @@ -25,6 +29,24 @@ struct CCDBHelpers {
};
static AlgorithmSpec fetchFromCCDB();
static ParserResult parseRemappings(char const*);

/// Adopt a freshly-fetched CCDB payload as a new SHM message and prune
/// the previously cached one for this path. The new SHM message is
/// adopted BEFORE the old cached one is pruned
/// @a allocator producer-device DPL DataAllocator
/// @a cache read-only view of the producer-local path -> CacheId map;
/// @a path CCDB path
/// @a output DPL Output matcher
/// @a v freshly-fetched CCDB payload; consumed by the call, leaving @a v empty
/// @a method serialization-method tag written into the message header
/// @return the new CacheId; the caller must record it in its map
static DataAllocator::CacheId adoptAndReplaceCachedMessage(
DataAllocator& allocator,
std::unordered_map<std::string, DataAllocator::CacheId> const& cache,
std::string const& path,
Output const& output,
o2::pmr::vector<char>&& v,
o2::header::SerializationMethod method);
};

} // namespace o2::framework
Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/DataAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ class DataAllocator
/// Adopt an already cached message, using an already provided CacheId.
void adoptFromCache(Output const& spec, CacheId id, header::SerializationMethod method = header::gSerializationMethodNone);

/// Prune a previously cached message identified by @a id from the message cache.
/// Calling this with an unknown id is a no-op.
void pruneFromCache(CacheId id);

/// snapshot object and route to output specified by OutputRef
/// Framework makes a (serialized) copy of object content.
///
Expand Down
56 changes: 26 additions & 30 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,37 +415,35 @@ class InputRecord
auto id = ObjectCache::Id::fromRef(ref);
ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
// If the matcher does not have an entry in the cache, deserialise it
// and cache the deserialised object at the given id.
// and cache the deserialised object alongside its id, keyed by path.
auto path = fmt::format("{}", DataSpecUtils::describe(matcher));
LOGP(debug, "{}", path);
auto& cache = mRegistry.get<ObjectCache>();
auto& callbacks = mRegistry.get<CallbackService>();
auto cacheEntry = cache.matcherToId.find(path);
if (cacheEntry == cache.matcherToId.end()) {
cache.matcherToId.insert(std::make_pair(path, id));
auto cacheEntry = cache.matcherToEntry.find(path);
if (cacheEntry == cache.matcherToEntry.end()) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
void* obj = (void*)result.get();
callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
cache.idToObject[id] = obj;
cache.matcherToEntry.emplace(path, ObjectCache::Entry{id, obj});
LOGP(info, "Caching in {} ptr to {} ({})", id.value, path, obj);
return result;
}
auto& oldId = cacheEntry->second;
auto& entry = cacheEntry->second;
// The id in the cache is the same, let's simply return it.
if (oldId.value == id.value) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)cache.idToObject[id], false);
if (entry.id.value == id.value) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)entry.obj, false);
LOGP(debug, "Returning cached entry {} for {} ({})", id.value, path, (void*)result.get());
return result;
}
// The id in the cache is different. Let's destroy the old cached entry
// and create a new one.
delete reinterpret_cast<ValueT*>(cache.idToObject[oldId]);
// The id in the cache is different. Destroy this path's previously cached object and replace it.
delete reinterpret_cast<ValueT*>(entry.obj);
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
void* obj = (void*)result.get();
callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
cache.idToObject[id] = obj;
LOGP(info, "Replacing cached entry {} with {} for {} ({})", oldId.value, id.value, path, obj);
oldId.value = id.value;
LOGP(info, "Replacing cached entry {} with {} for {} ({})", entry.id.value, id.value, path, obj);
entry.id = id;
entry.obj = obj;
return result;
} else {
throw runtime_error("Attempt to extract object from message with unsupported serialization type");
Expand Down Expand Up @@ -496,30 +494,28 @@ class InputRecord
// it's updated.
auto id = ObjectCache::Id::fromRef(ref);
ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
// If the matcher does not have an entry in the cache, deserialise it
// and cache the deserialised object at the given id.
// If the matcher does not have an entry in the cache, deserialise it and cache it per path.
auto path = fmt::format("{}", DataSpecUtils::describe(matcher));
LOGP(debug, "{}", path);
auto& cache = mRegistry.get<ObjectCache>();
auto cacheEntry = cache.matcherToMetadataId.find(path);
if (cacheEntry == cache.matcherToMetadataId.end()) {
cache.matcherToMetadataId.insert(std::make_pair(path, id));
cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
auto cacheEntry = cache.matcherToMetadata.find(path);
if (cacheEntry == cache.matcherToMetadata.end()) {
auto [it, inserted] = cache.matcherToMetadata.emplace(
path, ObjectCache::MetadataEntry{id, DataRefUtils::extractCCDBHeaders(ref)});
LOGP(info, "Caching CCDB metadata {}: {}", id.value, path);
return cache.idToMetadata[id];
return it->second.metadata;
}
auto& oldId = cacheEntry->second;
auto& entry = cacheEntry->second;
// The id in the cache is the same, let's simply return it.
if (oldId.value == id.value) {
if (entry.id.value == id.value) {
LOGP(debug, "Returning cached CCDB metatada {}: {}", id.value, path);
return cache.idToMetadata[id];
return entry.metadata;
}
// The id in the cache is different. Let's destroy the old cached entry
// and create a new one.
LOGP(info, "Replacing cached entry {} with {} for {}", oldId.value, id.value, path);
cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
oldId.value = id.value;
return cache.idToMetadata[id];
// The id in the cache is different. Replace this path's metadata.
LOGP(info, "Replacing cached entry {} with {} for {}", entry.id.value, id.value, path);
entry.id = id;
entry.metadata = DataRefUtils::extractCCDBHeaders(ref);
return entry.metadata;
}

template <typename T>
Expand Down
32 changes: 21 additions & 11 deletions Framework/Core/include/Framework/ObjectCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
#include "Framework/DataRef.h"
#include <unordered_map>
#include <map>
#include <string>

namespace o2::framework
{

/// A cache for CCDB objects or objects in general
/// which have more than one timeframe of lifetime.
/// The cache is keyed *per path* rather than by a global id-derived hash.
struct ObjectCache {
struct Id {
int64_t value;
Expand All @@ -39,20 +41,28 @@ struct ObjectCache {
}
};
};
/// A cache for deserialised objects.

/// Per-path cache entry for a deserialised CCDB object.
struct Entry {
Id id{0};
void* obj{nullptr};
};

/// Per-path cache entry for the CCDB metadata map.
struct MetadataEntry {
Id id{0};
std::map<std::string, std::string> metadata;
};

/// A per-path cache for deserialised objects.
/// This keeps a mapping so that we can tell if a given
/// path was already received and it's blob stored in
/// .second.
std::unordered_map<std::string, Id> matcherToId;
/// A map from a CacheId (which is the void* ptr of the previous map).
/// to an actual (type erased) pointer to the deserialised object.
std::unordered_map<Id, void*, Id::hash_fn> idToObject;

/// A cache to the deserialised metadata
/// path was already received and it's blob stored in .second.obj
std::unordered_map<std::string, Entry> matcherToEntry;

/// A per-path cache to the deserialised metadata
/// We keep it separate because we want to avoid that looking up
/// the metadata also pollutes the object cache.
std::unordered_map<std::string, Id> matcherToMetadataId;
std::unordered_map<Id, std::map<std::string, std::string>, Id::hash_fn> idToMetadata;
std::unordered_map<std::string, MetadataEntry> matcherToMetadata;
};

} // namespace o2::framework
Expand Down
6 changes: 6 additions & 0 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ void DataAllocator::adoptFromCache(const Output& spec, CacheId id, header::Seria
context.add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
}

void DataAllocator::pruneFromCache(CacheId id)
{
auto& context = mRegistry.get<MessageContext>();
context.pruneFromCache(id.value);
}

void DataAllocator::cookDeadBeef(const Output& spec)
{
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
Expand Down