From 3d19c635764069f254f020c9f63d5652a6a512bd Mon Sep 17 00:00:00 2001 From: Kyle Knoepfel Date: Tue, 21 Apr 2026 17:03:32 -0500 Subject: [PATCH] Create and test accumulator node * Includes adjusting infrastructure for "sendable" types --- phlex/core/concepts.hpp | 5 +- phlex/core/detail/accumulator_node.hpp | 313 +++++++++++++++++++++++++ phlex/core/fold/send.hpp | 19 ++ test/CMakeLists.txt | 1 + test/accumulator_test.cpp | 250 ++++++++++++++++++++ 5 files changed, 585 insertions(+), 3 deletions(-) create mode 100644 phlex/core/detail/accumulator_node.hpp create mode 100644 test/accumulator_test.cpp diff --git a/phlex/core/concepts.hpp b/phlex/core/concepts.hpp index 4fb112f7d..9eda07ace 100644 --- a/phlex/core/concepts.hpp +++ b/phlex/core/concepts.hpp @@ -1,6 +1,7 @@ #ifndef PHLEX_CORE_CONCEPTS_HPP #define PHLEX_CORE_CONCEPTS_HPP +#include "phlex/core/fold/send.hpp" #include "phlex/core/fwd.hpp" #include "phlex/metaprogramming/type_deduction.hpp" #include "phlex/model/fwd.hpp" @@ -14,9 +15,7 @@ namespace phlex::experimental { concept not_void = !std::same_as; template - concept sendable = std::move_constructible || requires(T& t) { - { send(t) } -> std::move_constructible; - }; + concept sendable = std::move_constructible || requires(T const& t) { sendable_type{t}; }; template concept at_least_n_input_parameters = number_parameters >= N; diff --git a/phlex/core/detail/accumulator_node.hpp b/phlex/core/detail/accumulator_node.hpp new file mode 100644 index 000000000..e33a3eff8 --- /dev/null +++ b/phlex/core/detail/accumulator_node.hpp @@ -0,0 +1,313 @@ +#ifndef PHLEX_CORE_DETAIL_ACCUMULATOR_NODE_HPP +#define PHLEX_CORE_DETAIL_ACCUMULATOR_NODE_HPP + +#include "phlex/core/fold/send.hpp" +#include "phlex/core/message.hpp" +#include "phlex/phlex_core_export.hpp" + +#include "oneapi/tbb/concurrent_hash_map.h" +#include "oneapi/tbb/concurrent_queue.h" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include + +namespace phlex::experimental::detail { + + template + class accumulator { + public: + using sendable_t = sendable_type; + + explicit accumulator(std::unique_ptr initial_value) : accumulator_(std::move(initial_value)) + { + } + + template + void call(FT const& f, Args&&... args) + { + std::invoke(f, *accumulator_, args...); + } + + auto release_as_product() + { + auto result = std::move(accumulator_); + if constexpr (requires { send(*result); }) { + return std::make_unique>(send(*result)); + } else { + return std::make_unique>(std::move(*result)); + } + } + + private: + std::unique_ptr accumulator_; + }; + + template + struct accumulator_message { + phlex::data_cell_index_ptr index; + std::shared_ptr> partial_result; + std::size_t id; + + accumulator_message propagate_with_id(std::size_t new_id) const + { + return {.index = index, .partial_result = partial_result, .id = new_id}; + } + + message release_as_message(std::string const& node_name, + product_specifications const& output, + std::size_t original_id) + { + auto store = std::make_shared(index, node_name); + // FIXME: Only read the first output specification, which is a temporary limitation until + // we support multiple outputs from folds. + store->add_product(output.front(), partial_result->release_as_product()); + partial_result.reset(); + return {.store = std::move(store), .id = original_id}; + } + }; + + template + struct accumulator_message_matcher { + std::size_t operator()(accumulator_message const& msg) const { return msg.id; } + }; + + using accumulator_node_input = + std::tuple; + + template + class PHLEX_CORE_EXPORT accumulator_node : + public tbb::flow::composite_node>> { + + using result_initializer_t = std::function(data_cell_index const&)>; + + public: + accumulator_node(tbb::flow::graph& g, + std::string node_name, + identifier partition_layer_name, + product_specifications output, + result_initializer_t initializer); + + tbb::flow::receiver& partition_port(); + tbb::flow::receiver& flush_port(); + tbb::flow::receiver& index_port(); + + bool cache_is_empty() const; + std::size_t cache_size() const; + + ~accumulator_node(); + + private: + using accumulator_msg_t = accumulator_message; + using base_t = + tbb::flow::composite_node>; + using tagged_msg_t = tbb::flow:: + tagged_msg; + using multifunction_node_t = + tbb::flow::multifunction_node>; + + struct cached_accumulator { + std::shared_ptr accumulator_msg; + tbb::concurrent_queue msg_ids{}; + std::atomic counter; + std::atomic_flag flush_received{}; + std::size_t original_message_id{}; + }; + + using cache_t = + tbb::concurrent_hash_map; // Key is the index hash + using accessor = cache_t::accessor; + + void emit_pending_ids(cached_accumulator* entry); + void handle_partition_message(index_message const& msg); + void handle_flush_token(indexed_end_token const& token); + void handle_index_message(index_message const& msg); + void cleanup_cache_entry(accessor& a); + void increment_cache_entry_then_cleanup(std::size_t key); + + tbb::flow::indexer_node indexer_; + multifunction_node_t repeater_; + cache_t cached_results_; + std::string node_name_; + identifier partition_layer_; + result_initializer_t initializer_; + product_specifications output_; + }; + + // ============================================================================== + // Implementation + + template + accumulator_node::accumulator_node(tbb::flow::graph& g, + std::string node_name, + identifier partition_layer_name, + product_specifications output, + result_initializer_t initializer) : + base_t{g}, + indexer_{g}, + repeater_{g, + tbb::flow::unlimited, + [this](tagged_msg_t const& tagged, auto& /* outputs */) { + if (tagged.tag() == 0) { + handle_partition_message(tagged.cast_to()); + } else if (tagged.tag() == 1) { + handle_flush_token(tagged.cast_to()); + } else if (tagged.tag() == 2) { + handle_index_message(tagged.cast_to()); + } else { + assert(tagged.tag() == 3); + // This means that a fold operation has taken place, and an attempt should + // be made to emit the fold result and clean up the cache entry. + increment_cache_entry_then_cleanup(tagged.cast_to()); + } + }}, + node_name_{std::move(node_name)}, + partition_layer_{std::move(partition_layer_name)}, + initializer_{std::move(initializer)}, + output_{std::move(output)} + { + base_t::set_external_ports( + typename base_t::input_ports_type{input_port<0>(indexer_), + input_port<1>(indexer_), + input_port<2>(indexer_), + input_port<3>(indexer_)}, + typename base_t::output_ports_type{output_port<0>(repeater_), output_port<1>(repeater_)}); + make_edge(indexer_, repeater_); + } + + template + tbb::flow::receiver& accumulator_node::partition_port() + { + return input_port<0>(indexer_); + } + + template + tbb::flow::receiver& accumulator_node::flush_port() + { + return input_port<1>(indexer_); + } + + template + tbb::flow::receiver& accumulator_node::index_port() + { + return input_port<2>(indexer_); + } + + template + bool accumulator_node::cache_is_empty() const + { + return cached_results_.empty(); + } + + template + std::size_t accumulator_node::cache_size() const + { + return cached_results_.size(); + } + + template + accumulator_node::~accumulator_node() + { + if (cached_results_.empty()) { + return; + } + + spdlog::warn( + "[{}/{}] Cached accumulators: {}", node_name_, partition_layer_, cached_results_.size()); + for (auto const& [_, cache] : cached_results_) { + if (cache.accumulator_msg) { + spdlog::warn("[{}/{}] Partition {}", + node_name_, + partition_layer_, + cache.accumulator_msg->index->to_string()); + } else { + spdlog::warn("[{}/{}] Partition index not yet received", node_name_, partition_layer_); + } + } + } + + template + void accumulator_node::emit_pending_ids(cached_accumulator* entry) + { + assert(entry->accumulator_msg); + std::size_t msg_id{}; + while (entry->msg_ids.try_pop(msg_id)) { + auto& accum_msg = entry->accumulator_msg; + output_port<1>(repeater_).try_put(accum_msg->propagate_with_id(msg_id)); + } + } + + template + void accumulator_node::handle_partition_message(index_message const& msg) + { + auto const key = msg.index->hash(); + + accessor a; + cached_results_.insert(a, key); + auto* entry = &a->second; + entry->accumulator_msg.reset(new accumulator_msg_t{ + .index = msg.index, + .partial_result = std::make_shared>(initializer_(*msg.index))}); + entry->original_message_id = msg.msg_id; + emit_pending_ids(entry); + } + + template + void accumulator_node::handle_flush_token(indexed_end_token const& token) + { + auto const& [index, count] = token; + accessor a; + cached_results_.insert(a, index->hash()); + auto* entry = &a->second; + entry->counter -= count; + std::ignore = entry->flush_received.test_and_set(); + cleanup_cache_entry(a); + } + + template + void accumulator_node::handle_index_message(index_message const& msg) + { + auto const& [index, msg_id, cache] = msg; + assert(cache); + auto const key = index->hash(); + + accessor a; + cached_results_.insert(a, key); + auto* entry = &a->second; + if (auto& accum_msg = entry->accumulator_msg) { + output_port<1>(repeater_).try_put(accum_msg->propagate_with_id(msg_id)); + emit_pending_ids(entry); + } else { + entry->msg_ids.push(msg_id); + } + } + + template + void accumulator_node::cleanup_cache_entry(accessor& a) + { + auto* entry = &a->second; + if (entry->flush_received.test() and entry->counter == 0) { + output_port<0>(repeater_).try_put(entry->accumulator_msg->release_as_message( + node_name_, output_, entry->original_message_id)); + cached_results_.erase(a); + } + } + + template + void accumulator_node::increment_cache_entry_then_cleanup(std::size_t key) + { + accessor a; + if (!cached_results_.find(a, key)) { + return; + } + ++a->second.counter; + cleanup_cache_entry(a); + } +} + +#endif // PHLEX_CORE_DETAIL_ACCUMULATOR_NODE_HPP diff --git a/phlex/core/fold/send.hpp b/phlex/core/fold/send.hpp index 01a0a657d..b2df05132 100644 --- a/phlex/core/fold/send.hpp +++ b/phlex/core/fold/send.hpp @@ -20,6 +20,25 @@ namespace phlex::experimental { { return a.load(); } + + template + struct sendable_type_impl {}; + + template + struct sendable_type_impl { + using type = T; + }; + + template + requires requires(T const& t) { + { send(t) } -> std::move_constructible; + } + struct sendable_type_impl { + using type = decltype(send(std::declval())); + }; + + template + using sendable_type = typename sendable_type_impl::type; } #endif // PHLEX_CORE_FOLD_SEND_HPP diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 79b60c0f4..d5e5f0c83 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -11,6 +11,7 @@ include(CetTest) cet_test_env(SPDLOG_LEVEL=debug) +cet_test(accumulator USE_CATCH2_MAIN SOURCE accumulator_test.cpp LIBRARIES phlex::core_internal) cet_test(concepts SOURCE concepts.cpp LIBRARIES phlex::core_internal) cet_test( adjust_config diff --git a/test/accumulator_test.cpp b/test/accumulator_test.cpp new file mode 100644 index 000000000..fe8a28723 --- /dev/null +++ b/test/accumulator_test.cpp @@ -0,0 +1,250 @@ +#include "phlex/core/detail/accumulator_node.hpp" +#include "phlex/model/data_cell_index.hpp" +#include "phlex/model/product_specification.hpp" + +#include "catch2/catch_test_macros.hpp" +#include "catch2/matchers/catch_matchers_string.hpp" +#include "oneapi/tbb/flow_graph.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/spdlog.h" + +#include +#include +#include +#include +#include + +using namespace oneapi; +using namespace phlex::experimental; + +namespace { + auto make_run_index(int run_number) + { + return phlex::data_cell_index::job()->make_child("run", run_number); + } + + class message_collector : public tbb::flow::function_node { + public: + explicit message_collector(tbb::flow::graph& g) : + tbb::flow::function_node{g, tbb::flow::serial, [this](message const& msg) { + messages_.emplace(msg.store->index()->hash(), msg); + }} + { + } + + std::size_t received_result_count() const { return messages_.size(); } + + std::pair fold_result(phlex::data_cell_index_ptr const& idx) const + { + auto it = messages_.find(idx->hash()); + assert(it != messages_.end()); + return {it->second.store->get_product(product_specification{}), it->second.id}; + } + + private: + std::map messages_; + }; + + void use_ostream_logger(std::ostringstream& oss) + { + auto ostream_sink = std::make_shared(oss); + auto ostream_logger = std::make_shared("my_logger", ostream_sink); + spdlog::set_default_logger(ostream_logger); + } + + // Fold function: increments the accumulator by 1 per call. The accumulated value thus equals + // the number of data cells processed. + auto count_fold(detail::accumulator_message const& msg) -> std::size_t + { + msg.partial_result->call([](int& acc) { acc += 1; }); + return msg.index->hash(); + } + + class accumulator_test_fixture { + public: + explicit accumulator_test_fixture(std::string node_name) : + accumulator_{g_, + std::move(node_name), + "run"_id, + product_specifications(1), + [](phlex::data_cell_index const&) { return std::make_unique(0); }}, + fold_op_{g_, tbb::flow::unlimited, count_fold}, + result_consumer_{g_} + { + make_edge(output_port<0>(accumulator_), result_consumer_); + make_edge(output_port<1>(accumulator_), fold_op_); + make_edge(fold_op_, input_port<3>(accumulator_)); + } + + void put_partition(index_message const& msg) { accumulator_.partition_port().try_put(msg); } + void put_index_message(index_message const& msg) { accumulator_.index_port().try_put(msg); } + void put_flush_token(indexed_end_token const& t) { accumulator_.flush_port().try_put(t); } + + void wait_for_all() { g_.wait_for_all(); } + + bool cache_is_empty() const { return accumulator_.cache_is_empty(); } + std::size_t cache_size() const { return accumulator_.cache_size(); } + std::size_t result_count() const { return result_consumer_.received_result_count(); } + + std::pair fold_result(phlex::data_cell_index_ptr const& idx) const + { + return result_consumer_.fold_result(idx); + } + + private: + tbb::flow::graph g_; + detail::accumulator_node accumulator_; + tbb::flow::function_node, std::size_t> fold_op_; + message_collector result_consumer_; + }; +} + +TEST_CASE("Test accumulator basic functionality", "[multithreading]") +{ + auto partition_index = make_run_index(1); + accumulator_test_fixture fixture{"test_accumulator_multiple_cells"}; + + fixture.put_partition({.index = partition_index, .msg_id = 1}); + auto const num_data_cells = 5; + for (std::size_t i = 0; i < num_data_cells; ++i) { + fixture.put_index_message({.index = partition_index, .msg_id = 2 + i, .cache = true}); + } + fixture.wait_for_all(); + + // Fold is complete but no flush received — result should not be emitted yet + CHECK(fixture.result_count() == 0); + CHECK(fixture.cache_size() == 1); + + fixture.put_flush_token({.index = partition_index, .count = num_data_cells}); + fixture.wait_for_all(); + + // Exactly one result carrying the accumulated count + REQUIRE(fixture.result_count() == 1); + CHECK( + fixture.fold_result(partition_index) == + std::pair{num_data_cells, 1}); // Fold value equals number of data cells, original message ID 1 + CHECK(fixture.cache_is_empty()); +} + +TEST_CASE("Test accumulator with multiple partitions", "[multithreading]") +{ + auto run1 = make_run_index(1); + auto run2 = make_run_index(2); + accumulator_test_fixture fixture{"test_accumulator_multiple_partitions"}; + + // Feed both partitions interleaved + fixture.put_partition({.index = run1, .msg_id = 1}); + fixture.put_partition({.index = run2, .msg_id = 2}); + + // 3 data cells for run1, 2 for run2, interleaved + fixture.put_index_message({.index = run1, .msg_id = 3, .cache = true}); + fixture.put_index_message({.index = run2, .msg_id = 4, .cache = true}); + fixture.put_index_message({.index = run1, .msg_id = 5, .cache = true}); + fixture.put_index_message({.index = run2, .msg_id = 6, .cache = true}); + fixture.put_index_message({.index = run1, .msg_id = 7, .cache = true}); + + fixture.put_flush_token({.index = run1, .count = 3}); + fixture.put_flush_token({.index = run2, .count = 2}); + fixture.wait_for_all(); + + // One result per partition: run1 accumulates 3 calls, run2 accumulates 2 calls + REQUIRE(fixture.result_count() == 2); + CHECK(fixture.fold_result(run1) == std::pair{3, 1}); // 3 fold operations, original message ID 1 + CHECK(fixture.fold_result(run2) == std::pair{2, 2}); // 2 fold operations, original message ID 2 + CHECK(fixture.cache_is_empty()); +} + +TEST_CASE("Test accumulator with index messages before partition", "[multithreading]") +{ + auto partition_index = make_run_index(1); + accumulator_test_fixture fixture{"test_accumulator_index_before_partition"}; + + // Send index messages before the partition message — they should be queued + fixture.put_index_message({.index = partition_index, .msg_id = 1, .cache = true}); + fixture.put_index_message({.index = partition_index, .msg_id = 2, .cache = true}); + fixture.wait_for_all(); + + // Cache entry exists but accumulator is not yet created, so no fold operations have run + CHECK(fixture.result_count() == 0); + CHECK(fixture.cache_size() == 1); + + // Partition arrives — pending index messages are dispatched and folds run + fixture.put_partition({.index = partition_index, .msg_id = 3}); + fixture.wait_for_all(); + + // Fold operations are done but no flush received yet + CHECK(fixture.result_count() == 0); + + fixture.put_flush_token({.index = partition_index, .count = 2}); + fixture.wait_for_all(); + + REQUIRE(fixture.result_count() == 1); + CHECK(fixture.fold_result(partition_index) == + std::pair{2, 3}); // 2 fold operations, original message ID 3 + CHECK(fixture.cache_is_empty()); +} + +TEST_CASE("Test accumulator warning message if cache is not flushed", "[multithreading]") +{ + std::ostringstream oss; + use_ostream_logger(oss); + + tbb::flow::graph g; + auto accumulator = std::make_unique>( + g, + "test_accumulator_warning", + "run"_id, + product_specifications(1), + [](phlex::data_cell_index const&) { return std::make_unique(0); }); + + SECTION("Partition received, no flush") + { + accumulator->partition_port().try_put({.index = make_run_index(1), .msg_id = 1}); + g.wait_for_all(); + + CHECK(accumulator->cache_size() == 1); + + accumulator.reset(); // Invoke destructor to trigger warning + + auto const warning = oss.str(); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Cached accumulators: 1")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Partition [run:1]")); + } + + SECTION("Index message received, no partition") + { + accumulator->index_port().try_put({.index = make_run_index(1), .msg_id = 1, .cache = true}); + g.wait_for_all(); + + CHECK(accumulator->cache_size() == 1); + + accumulator.reset(); // Invoke destructor to trigger warning + + auto const warning = oss.str(); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Cached accumulators: 1")); + CHECK_THAT(warning, Catch::Matchers::ContainsSubstring("Partition index not yet received")); + } +} + +TEST_CASE("Test accumulator with zero data cells emits initial value", "[multithreading]") +{ + // This exercises the case where a partition exists but no data cells fall within it + // (e.g. all events were filtered out). A flush with count=0 should immediately emit the + // initial accumulator value without waiting for any fold operations. + auto partition_index = make_run_index(1); + accumulator_test_fixture fixture{"test_accumulator_zero_cells"}; + + fixture.put_partition({.index = partition_index, .msg_id = 1}); + fixture.wait_for_all(); + + CHECK(fixture.result_count() == 0); + CHECK(fixture.cache_size() == 1); + + fixture.put_flush_token({.index = partition_index, .count = 0}); + fixture.wait_for_all(); + + REQUIRE(fixture.result_count() == 1); + CHECK(fixture.fold_result(partition_index) == + std::pair{0, 1}); // Initial value, original message ID 1 + CHECK(fixture.cache_is_empty()); +}