diff --git a/phlex/core/CMakeLists.txt b/phlex/core/CMakeLists.txt index 660f5a518..c9672508e 100644 --- a/phlex/core/CMakeLists.txt +++ b/phlex/core/CMakeLists.txt @@ -24,10 +24,11 @@ cet_make_library( message.cpp node_catalog.cpp index_router.cpp + product_query.cpp + product_registry.cpp products_consumer.cpp registrar.cpp registration_api.cpp - product_query.cpp store_counters.cpp LIBRARIES PUBLIC @@ -62,6 +63,7 @@ install( multilayer_join_node.hpp index_router.hpp node_catalog.hpp + product_registry.hpp product_query.hpp products_consumer.hpp registrar.hpp diff --git a/phlex/core/declared_fold.cpp b/phlex/core/declared_fold.cpp index 54cd24018..7f6aaba76 100644 --- a/phlex/core/declared_fold.cpp +++ b/phlex/core/declared_fold.cpp @@ -3,8 +3,9 @@ namespace phlex::experimental { declared_fold::declared_fold(algorithm_name name, std::vector predicates, - product_queries input_products) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)} + product_queries input_products, + product_registry const& registry) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry} { } diff --git a/phlex/core/declared_fold.hpp b/phlex/core/declared_fold.hpp index 44d0ae8b9..5a64ec1c8 100644 --- a/phlex/core/declared_fold.hpp +++ b/phlex/core/declared_fold.hpp @@ -40,12 +40,14 @@ namespace phlex::experimental { public: declared_fold(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + product_registry const& registry); virtual ~declared_fold(); virtual tbb::flow::sender& output_port() = 0; virtual tbb::flow::receiver& flush_port() = 0; virtual product_specifications const& output() const = 0; + virtual identifier const& layer() const = 0; virtual std::size_t product_count() const = 0; }; @@ -73,8 +75,9 @@ namespace phlex::experimental { InitTuple initializer, product_queries input_products, std::vector output, - std::string partition) : - declared_fold{std::move(name), std::move(predicates), std::move(input_products)}, + std::string partition, + product_registry const& registry) : + declared_fold{std::move(name), std::move(predicates), std::move(input_products), registry}, initializer_{std::move(initializer)}, output_{ to_product_specifications(full_name(), std::move(output), make_type_ids())}, @@ -122,6 +125,17 @@ namespace phlex::experimental { } } + product_specifications const& output() const override { return output_; } + identifier const& layer() const override { return partition_; } + + tbb::flow::receiver& port_for(product_query const& input_product) override + { + return receiver_for(join_, input(), input_product, fold_); + } + + tbb::flow::receiver& flush_port() override { return flush_receiver_; } + tbb::flow::sender& output_port() override { return tbb::flow::output_port<0>(fold_); } + private: void emit_and_evict_if_done(data_cell_index_ptr const& fold_index) { @@ -134,20 +148,11 @@ namespace phlex::experimental { } } - tbb::flow::receiver& port_for(product_query const& input_product) override - { - return receiver_for(join_, input(), input_product, fold_); - } - std::vector*> ports() override { return input_ports(join_, fold_); } - tbb::flow::receiver& flush_port() override { return flush_receiver_; } - tbb::flow::sender& output_port() override { return tbb::flow::output_port<0>(fold_); } - product_specifications const& output() const override { return output_; } - template void call(function_t const& ft, messages_t const& messages, diff --git a/phlex/core/declared_observer.cpp b/phlex/core/declared_observer.cpp index 9661cfeb4..e56c1d118 100644 --- a/phlex/core/declared_observer.cpp +++ b/phlex/core/declared_observer.cpp @@ -3,8 +3,9 @@ namespace phlex::experimental { declared_observer::declared_observer(algorithm_name name, std::vector predicates, - product_queries input_products) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)} + product_queries input_products, + product_registry const& registry) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry} { } diff --git a/phlex/core/declared_observer.hpp b/phlex/core/declared_observer.hpp index 4a70f26f4..4fa423618 100644 --- a/phlex/core/declared_observer.hpp +++ b/phlex/core/declared_observer.hpp @@ -36,7 +36,8 @@ namespace phlex::experimental { public: declared_observer(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + product_registry const& registry); virtual ~declared_observer(); }; @@ -60,8 +61,10 @@ namespace phlex::experimental { std::vector predicates, tbb::flow::graph& g, AlgorithmBits alg, - product_queries input_products) : - declared_observer{std::move(name), std::move(predicates), std::move(input_products)}, + product_queries input_products, + product_registry const& registry) : + declared_observer{ + std::move(name), std::move(predicates), std::move(input_products), registry}, join_{make_join_or_none(g, full_name(), layers())}, observer_{g, concurrency, diff --git a/phlex/core/declared_predicate.cpp b/phlex/core/declared_predicate.cpp index 056d8ea9e..d5f8f1897 100644 --- a/phlex/core/declared_predicate.cpp +++ b/phlex/core/declared_predicate.cpp @@ -3,8 +3,9 @@ namespace phlex::experimental { declared_predicate::declared_predicate(algorithm_name name, std::vector predicates, - product_queries input_products) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)} + product_queries input_products, + product_registry const& registry) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry} { } diff --git a/phlex/core/declared_predicate.hpp b/phlex/core/declared_predicate.hpp index 82334667b..14d55cc4f 100644 --- a/phlex/core/declared_predicate.hpp +++ b/phlex/core/declared_predicate.hpp @@ -38,7 +38,8 @@ namespace phlex::experimental { public: declared_predicate(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + product_registry const& registry); virtual ~declared_predicate(); virtual tbb::flow::sender& sender() = 0; @@ -64,8 +65,10 @@ namespace phlex::experimental { std::vector predicates, tbb::flow::graph& g, AlgorithmBits alg, - product_queries input_products) : - declared_predicate{std::move(name), std::move(predicates), std::move(input_products)}, + product_queries input_products, + product_registry const& registry) : + declared_predicate{ + std::move(name), std::move(predicates), std::move(input_products), registry}, join_{make_join_or_none(g, full_name(), layers())}, predicate_{g, concurrency, diff --git a/phlex/core/declared_provider.cpp b/phlex/core/declared_provider.cpp index da4e98209..23537f1fc 100644 --- a/phlex/core/declared_provider.cpp +++ b/phlex/core/declared_provider.cpp @@ -15,5 +15,5 @@ namespace phlex::experimental { return output_product_; } - identifier const& declared_provider::layer() const noexcept { return output_product_.layer; } + identifier const& declared_provider::layer() const noexcept { return *output_product_.layer; } } diff --git a/phlex/core/declared_provider.hpp b/phlex/core/declared_provider.hpp index 0d1fbd5c7..333cfbaee 100644 --- a/phlex/core/declared_provider.hpp +++ b/phlex/core/declared_provider.hpp @@ -56,9 +56,10 @@ namespace phlex::experimental { std::size_t concurrency, tbb::flow::graph& g, AlgorithmBits alg, - product_query output) : + product_query output, + product_registry const& /* unused */) : declared_provider{std::move(name), output}, - output_{algorithm_name::create(std::string_view(identifier(output.creator))), + output_{algorithm_name::create(std::string_view(*output.creator)), output.suffix.value_or(identifier("")), output.type}, provider_{g, diff --git a/phlex/core/declared_transform.cpp b/phlex/core/declared_transform.cpp index 52bdd5197..32c0d8f42 100644 --- a/phlex/core/declared_transform.cpp +++ b/phlex/core/declared_transform.cpp @@ -3,8 +3,9 @@ namespace phlex::experimental { declared_transform::declared_transform(algorithm_name name, std::vector predicates, - product_queries input_products) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)} + product_queries input_products, + product_registry const& registry) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry} { } diff --git a/phlex/core/declared_transform.hpp b/phlex/core/declared_transform.hpp index 7e64a4f33..81156f6f3 100644 --- a/phlex/core/declared_transform.hpp +++ b/phlex/core/declared_transform.hpp @@ -42,7 +42,8 @@ namespace phlex::experimental { public: declared_transform(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + product_registry const& registry); virtual ~declared_transform(); virtual tbb::flow::sender& output_port() = 0; @@ -73,8 +74,10 @@ namespace phlex::experimental { tbb::flow::graph& g, AlgorithmBits alg, product_queries input_products, - std::vector output) : - declared_transform{std::move(name), std::move(predicates), std::move(input_products)}, + std::vector output, + product_registry const& registry) : + declared_transform{ + std::move(name), std::move(predicates), std::move(input_products), registry}, output_{to_product_specifications( full_name(), std::move(output), make_output_type_ids())}, join_{make_join_or_none(g, full_name(), layers())}, @@ -102,6 +105,8 @@ namespace phlex::experimental { } } + product_specifications const& output() const override { return output_; } + private: tbb::flow::receiver& port_for(product_query const& input_product) override { @@ -117,7 +122,6 @@ namespace phlex::experimental { { return tbb::flow::output_port<0>(transform_); } - product_specifications const& output() const override { return output_; } template auto call(function_t const& ft, diff --git a/phlex/core/declared_unfold.cpp b/phlex/core/declared_unfold.cpp index 640a92433..77cd0703a 100644 --- a/phlex/core/declared_unfold.cpp +++ b/phlex/core/declared_unfold.cpp @@ -34,8 +34,9 @@ namespace phlex::experimental { declared_unfold::declared_unfold(algorithm_name name, std::vector predicates, product_queries input_products, - std::string child_layer) : - products_consumer{std::move(name), std::move(predicates), std::move(input_products)}, + std::string child_layer, + product_registry const& registry) : + products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry}, child_layer_{std::move(child_layer)} { } diff --git a/phlex/core/declared_unfold.hpp b/phlex/core/declared_unfold.hpp index b9d2c5eff..f32847ae3 100644 --- a/phlex/core/declared_unfold.hpp +++ b/phlex/core/declared_unfold.hpp @@ -60,7 +60,8 @@ namespace phlex::experimental { declared_unfold(algorithm_name name, std::vector predicates, product_queries input_products, - std::string child_layer); + std::string child_layer, + product_registry const& registry); virtual ~declared_unfold(); virtual tbb::flow::sender& output_port() = 0; @@ -69,6 +70,8 @@ namespace phlex::experimental { virtual std::size_t product_count() const = 0; virtual flusher_t& flusher() = 0; + // Looks weird but this reduces the amount of special code in product registration + identifier layer() const noexcept { return identifier(child_layer_); } std::string const& child_layer() const noexcept { return child_layer_; } private: @@ -95,11 +98,13 @@ namespace phlex::experimental { Unfold&& unfold, product_queries input_products, std::vector output_product_suffixes, - std::string child_layer_name) : + std::string child_layer_name, + product_registry const& registry) : declared_unfold{std::move(name), std::move(predicates), std::move(input_products), - std::move(child_layer_name)}, + std::move(child_layer_name), + registry}, output_{to_product_specifications(full_name(), std::move(output_product_suffixes), make_type_ids>>())}, @@ -131,10 +136,6 @@ namespace phlex::experimental { { return receiver_for(join_, input(), input_product, unfold_); } - std::vector*> ports() override - { - return input_ports(join_, unfold_); - } tbb::flow::sender& output_port() override { @@ -146,6 +147,10 @@ namespace phlex::experimental { } product_specifications const& output() const override { return output_; } flusher_t& flusher() override { return flusher_; } + std::vector*> ports() override + { + return input_ports(join_, unfold_); + } template void call(Predicate const& predicate, diff --git a/phlex/core/framework_graph.cpp b/phlex/core/framework_graph.cpp index 3e32feb69..d21a52e86 100644 --- a/phlex/core/framework_graph.cpp +++ b/phlex/core/framework_graph.cpp @@ -159,7 +159,11 @@ namespace phlex::experimental { std::set flushers; // For providers for (product_query const& pq : n->input()) { - if (auto it = unfold_flushers.find(pq.layer); it != unfold_flushers.end()) { + if (!pq.layer) { + spdlog::error("The input product query {} has no layer specified", pq); + continue; + } + if (auto it = unfold_flushers.find(*pq.layer); it != unfold_flushers.end()) { flushers.insert(it->second); } else { flushers.insert(&index_router_.flusher()); diff --git a/phlex/core/fwd.hpp b/phlex/core/fwd.hpp index 076f4073f..7de6f3672 100644 --- a/phlex/core/fwd.hpp +++ b/phlex/core/fwd.hpp @@ -14,6 +14,7 @@ namespace phlex::experimental { struct message; class index_router; class products_consumer; + class product_registry; using flusher_t = tbb::flow::broadcast_node; } diff --git a/phlex/core/index_router.cpp b/phlex/core/index_router.cpp index 7d81abb0f..bafe1ac74 100644 --- a/phlex/core/index_router.cpp +++ b/phlex/core/index_router.cpp @@ -136,8 +136,12 @@ namespace phlex::experimental { // Create the index-set broadcast nodes for providers for (auto& [pq, provider_port] : provider_input_ports_ | std::views::values) { - auto [it, _] = broadcasters_.try_emplace(static_cast(pq.layer), - std::make_shared(g)); + if (!pq.layer) { + spdlog::error("The input product query {} has not layer specified", pq); + continue; + } + auto [it, _] = + broadcasters_.try_emplace(*pq.layer, std::make_shared(g)); make_edge(*it->second, *provider_port); } diff --git a/phlex/core/node_catalog.hpp b/phlex/core/node_catalog.hpp index 0d661c3fb..6351824b9 100644 --- a/phlex/core/node_catalog.hpp +++ b/phlex/core/node_catalog.hpp @@ -10,6 +10,7 @@ #include "phlex/core/declared_provider.hpp" #include "phlex/core/declared_transform.hpp" #include "phlex/core/declared_unfold.hpp" +#include "phlex/core/product_registry.hpp" #include "phlex/core/registrar.hpp" #include "phlex/utilities/simple_ptr_map.hpp" @@ -23,7 +24,7 @@ namespace phlex::experimental { template auto registrar_for(std::vector& errors) { - return registrar{boost::pfr::get>(*this), errors}; + return registrar{boost::pfr::get>(*this), errors, registry}; } std::size_t execution_count(std::string const& node_name) const; @@ -35,6 +36,8 @@ namespace phlex::experimental { simple_ptr_map unfolds{}; simple_ptr_map transforms{}; simple_ptr_map providers{}; + + product_registry registry{}; }; } diff --git a/phlex/core/product_query.cpp b/phlex/core/product_query.cpp index e91422f3e..f335cfae1 100644 --- a/phlex/core/product_query.cpp +++ b/phlex/core/product_query.cpp @@ -7,10 +7,10 @@ namespace phlex { bool product_query::match(product_query const& other) const { using experimental::identifier; - if (identifier(creator) != identifier(other.creator)) { + if (creator && creator != other.creator) { return false; } - if (identifier(layer) != identifier(other.layer)) { + if (layer && layer != other.layer) { return false; } if (suffix && suffix != other.suffix) { @@ -29,48 +29,37 @@ namespace phlex { // Check if a product_specification satisfies this query bool product_query::match(experimental::product_specification const& spec) const { - experimental::identifier tmp_creator{this->creator}; - if (tmp_creator != spec.algorithm() && tmp_creator != spec.plugin()) { + if (creator && creator != spec.algorithm() && creator != spec.plugin()) { return false; } if (type != spec.type()) { return false; } - if (suffix) { - if (*suffix != spec.suffix()) { - return false; - } + if (suffix && suffix != spec.suffix()) { + return false; } return true; } std::string product_query::to_string() const { - if (suffix) { - return fmt::format("{}/{} ϵ {}", creator, *suffix, layer); - } - return fmt::format("{} ϵ {}", creator, layer); + std::string_view creator_sv = creator ? std::string_view(*creator) : "[ANY]"; + std::string_view layer_sv = layer ? std::string_view(*layer) : "[ANY]"; + std::string_view suffix_sv = suffix ? std::string_view(*suffix) : "[ANY]"; + return fmt::format("{}/{} ϵ {}", creator_sv, suffix_sv, layer_sv); } bool product_query::operator==(product_query const& rhs) const { using experimental::identifier; - return (type == rhs.type) && (identifier(creator) == identifier(rhs.creator)) && - (identifier(layer) == identifier(rhs.layer)) && (suffix == rhs.suffix) && - (stage == rhs.stage); + return (type == rhs.type) && (creator == rhs.creator) && (layer == rhs.layer) && + (suffix == rhs.suffix) && (stage == rhs.stage); } std::strong_ordering product_query::operator<=>(product_query const& rhs) const { using experimental::identifier; - return std::tie(type, - static_cast(creator), - static_cast(layer), - suffix, - stage) <=> std::tie(rhs.type, - static_cast(rhs.creator), - static_cast(rhs.layer), - rhs.suffix, - rhs.stage); + return std::tie(type, creator, layer, suffix, stage) <=> + std::tie(rhs.type, rhs.creator, rhs.layer, rhs.suffix, rhs.stage); } experimental::product_specification const* resolve_in_store( diff --git a/phlex/core/product_query.hpp b/phlex/core/product_query.hpp index 1dfa2c8b5..104fbdd2f 100644 --- a/phlex/core/product_query.hpp +++ b/phlex/core/product_query.hpp @@ -19,57 +19,9 @@ using namespace phlex::experimental::literals; namespace phlex { - namespace detail { - // The required_creator_name has to be a template for static_assert(false) - template T> - class required_creator_name { - public: - required_creator_name() - { - static_assert(false, "The creator name has not been set in this product_query."); - } - template - requires std::constructible_from - required_creator_name(U&& rhs) : content_(std::forward_like(rhs)) - { - if (content_.empty()) { - throw std::runtime_error("Cannot specify product with empty creator name."); - } - } - - operator T const&() const noexcept { return content_; } - - private: - experimental::identifier content_; - }; - - // The required_layer_name has to be a template for static_assert(false) - template T> - class required_layer_name { - public: - required_layer_name() - { - static_assert(false, "The layer name has not been set in this product_query."); - } - template - requires std::constructible_from - required_layer_name(U&& rhs) : content_(std::forward_like(rhs)) - { - if (content_.empty()) { - throw std::runtime_error("Cannot specify the empty string as a data layer."); - } - } - - operator T const&() const noexcept { return content_; } - - private: - experimental::identifier content_; - }; - } - struct PHLEX_CORE_EXPORT product_query { - detail::required_creator_name creator; - detail::required_layer_name layer; + std::optional creator; + std::optional layer; std::optional suffix; std::optional stage; experimental::type_id type; diff --git a/phlex/core/product_registry.cpp b/phlex/core/product_registry.cpp new file mode 100644 index 000000000..9629b462e --- /dev/null +++ b/phlex/core/product_registry.cpp @@ -0,0 +1,161 @@ +#include "phlex/core/product_registry.hpp" + +#include "spdlog/spdlog.h" + +#include +#include + +namespace phlex::experimental { + namespace { + template + using set_t = std::set; + std::string store_listing_for_error(product_registry::store_t const& store) + { + namespace views = std::ranges::views; + auto list = store | views::transform([](detail::full_product_spec const& spec) { + return fmt::format(" - {}:{}/{} [{}] ∈ {} FROM {}", + spec.plugin(), + spec.algorithm(), + spec.suffix(), + spec.type(), + spec.layer, + spec.stage); + }); + return fmt::format("{}", fmt::join(list, "\n")); + } + + // convert the pair of iterators returned from equal_range to a set + template + auto make_set(std::pair const& its) + { + using ret_t = set_t; + auto [b, e] = its; + return std::ranges::subrange(b, e) | std::ranges::views::values | std::ranges::to(); + } + + template + set_t intersect(set_t const& lhs, set_t const& rhs) + { + set_t result; + std::ranges::set_intersection(lhs, rhs, std::inserter(result, result.begin())); + return result; + } + } + // Add the product to store_ and add entries to each map + void product_registry::add_product(product_specification spec, identifier layer, identifier stage) + { + auto* ptr = &store_.emplace_front(std::move(spec), std::move(layer), std::move(stage)); + spdlog::debug("Adding product {}:{}/{} [{}] ∈ {}", + ptr->plugin(), + ptr->algorithm(), + ptr->suffix(), + ptr->type(), + ptr->layer); + plugin_map_.emplace(std::cref(ptr->plugin()), ptr); + algorithm_map_.emplace(std::cref(ptr->algorithm()), ptr); + suffix_map_.emplace(std::cref(ptr->suffix()), ptr); + layer_map_.emplace(std::cref(ptr->layer), ptr); + type_map_.emplace(ptr->type(), ptr); + } + + // Lookup a product query by checking each map and finding the intersection of the results + detail::full_product_spec const& product_registry::lookup(product_query const& query) const + { + spdlog::debug("Looking up product {}", query); + std::forward_list result_sets; + if (query.type.valid()) { + auto res = make_set(type_map_.equal_range(query.type)); + if (res.empty()) { + throw std::runtime_error(fmt::format( + "Could not find {} in product registry (no matching type). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(res)); + } + if (query.creator) { + auto creator = std::string_view(*query.creator); + // If the creator name contains a `:` we have both a plugin name and an algorithm name. + // Otherwise the creator name can match the plugin OR the algorithm + if (creator.contains(':')) { + auto plugin = identifier(creator.substr(0, creator.find(':'))); + auto algorithm = identifier(creator.substr(creator.find(':') + 1)); + auto plugin_res = make_set(plugin_map_.equal_range(plugin)); + if (plugin_res.empty()) { + throw std::runtime_error(fmt::format("Could not find {} in product registry (no matching " + "creator plugin). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(plugin_res)); + auto algo_res = make_set(algorithm_map_.equal_range(algorithm)); + if (algo_res.empty()) { + throw std::runtime_error(fmt::format("Could not find {} in product registry (no matching " + "creator algorithm). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(algo_res)); + } else { + auto plugin_res = make_set(plugin_map_.equal_range(*query.creator)); + auto algo_res = make_set(algorithm_map_.equal_range(*query.creator)); + plugin_res.merge(std::move(algo_res)); + if (plugin_res.empty()) { + throw std::runtime_error(fmt::format( + "Could not find {} in product registry (no matching creator). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(plugin_res)); + } + } + if (query.suffix) { + auto res = make_set(suffix_map_.equal_range(*query.suffix)); + if (res.empty()) { + throw std::runtime_error(fmt::format( + "Could not find {} in product registry (no matching suffix). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(res)); + } + if (query.layer) { + auto res = make_set(layer_map_.equal_range(*query.layer)); + if (res.empty()) { + throw std::runtime_error(fmt::format( + "Could not find {} in product registry (no matching layer). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(res)); + } + if (query.stage) { + auto res = make_set(stage_map_.equal_range(*query.stage)); + if (res.empty()) { + throw std::runtime_error(fmt::format( + "Could not find {} in product registry (no matching stage). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + result_sets.push_front(std::move(res)); + } + + if (result_sets.empty()) { + throw std::runtime_error(fmt::format("Query {} is empty!", query)); + } + auto result = *std::ranges::fold_left_first(result_sets, intersect); + if (result.empty()) { + throw std::runtime_error(fmt::format("Could not find {} in product registry (nothing matches " + "all constraints). Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + if (result.size() > 1) { + throw std::runtime_error( + fmt::format("Multiple matches for {} in product registry. Contents are:\n{}\n", + query, + store_listing_for_error(store_))); + } + return **result.begin(); + } +} diff --git a/phlex/core/product_registry.hpp b/phlex/core/product_registry.hpp new file mode 100644 index 000000000..a085ee3b9 --- /dev/null +++ b/phlex/core/product_registry.hpp @@ -0,0 +1,54 @@ +#ifndef PHLEX_CORE_PRODUCT_REGISTRY_HPP +#define PHLEX_CORE_PRODUCT_REGISTRY_HPP + +#include "phlex/core/product_query.hpp" +#include "phlex/model/product_specification.hpp" + +#include +#include +#include + +namespace phlex::experimental { + namespace detail { + struct full_product_spec { + product_specification spec; + identifier layer; + identifier stage; + + identifier const& plugin() const { return spec.plugin(); } + identifier const& algorithm() const { return spec.algorithm(); } + identifier const& suffix() const { return spec.suffix(); } + type_id type() const { return spec.type(); } + }; + } + class product_registry { + public: + // We just need a container with cheap insertion (somewhere) that doesn't + // invalidate iterators on insertion. Nothing gets accessed through the container, + // that all happens through a set of multimaps. + using store_t = std::forward_list; + // NB: clang-tidy's suggestion to use std::less<> causes a compile error + using map_t = std::multimap, + store_t::const_pointer, + std::less>; + using type_map_t = std::multimap; + using result_set_t = std::set; + + void add_product(product_specification spec, identifier layer, identifier stage); + + // A failed lookup in the registry is an error (an input product hasn't been registered yet, + // so we can't determine the layer for a calculated output) + detail::full_product_spec const& lookup(product_query const& query) const; + + private: + store_t store_; + map_t plugin_map_; + map_t algorithm_map_; + map_t suffix_map_; + map_t layer_map_; + map_t stage_map_; + type_map_t type_map_; + }; +} + +#endif // PHLEX_CORE_PRODUCT_REGISTRY_HPP diff --git a/phlex/core/products_consumer.cpp b/phlex/core/products_consumer.cpp index 6c9a920ff..cd894950e 100644 --- a/phlex/core/products_consumer.cpp +++ b/phlex/core/products_consumer.cpp @@ -1,12 +1,21 @@ #include "phlex/core/products_consumer.hpp" +#include "phlex/core/product_registry.hpp" + +#include "spdlog/spdlog.h" namespace { - std::vector layers_from(phlex::product_queries const& queries) + using phlex::experimental::identifier; + std::vector layers_from(identifier const& plugin, + identifier const& algorithm, + phlex::product_queries const& queries, + phlex::experimental::product_registry const& registry) { - std::vector result; + spdlog::debug( + "Determining layers for {}:{} from input queries [{}]", plugin, algorithm, queries); + std::vector result; result.reserve(queries.size()); for (auto const& query : queries) { - result.push_back(query.layer); + result.push_back(registry.lookup(query).layer); } return result; } @@ -16,10 +25,11 @@ namespace phlex::experimental { products_consumer::products_consumer(algorithm_name name, std::vector predicates, - product_queries input_products) : + product_queries input_products, + product_registry const& registry) : consumer{std::move(name), std::move(predicates)}, input_products_{std::move(input_products)}, - layers_{layers_from(input_products_)} + registry_{®istry} { } @@ -33,5 +43,11 @@ namespace phlex::experimental { } product_queries const& products_consumer::input() const noexcept { return input_products_; } - std::vector const& products_consumer::layers() const noexcept { return layers_; } + std::vector const& products_consumer::layers() const noexcept + { + if (layers_.empty() && !input_products_.empty()) { + layers_ = layers_from(plugin(), algorithm(), input_products_, *registry_); + } + return layers_; + } } diff --git a/phlex/core/products_consumer.hpp b/phlex/core/products_consumer.hpp index 7c91ead4b..f67f6b48e 100644 --- a/phlex/core/products_consumer.hpp +++ b/phlex/core/products_consumer.hpp @@ -21,7 +21,8 @@ namespace phlex::experimental { public: products_consumer(algorithm_name name, std::vector predicates, - product_queries input_products); + product_queries input_products, + product_registry const& registry); virtual ~products_consumer(); @@ -46,7 +47,8 @@ namespace phlex::experimental { virtual tbb::flow::receiver& port_for(product_query const& input_product) = 0; product_queries input_products_; - std::vector layers_; + mutable std::vector layers_; + product_registry const* registry_; }; } diff --git a/phlex/core/registrar.hpp b/phlex/core/registrar.hpp index 655c8887f..01a6eee89 100644 --- a/phlex/core/registrar.hpp +++ b/phlex/core/registrar.hpp @@ -49,11 +49,13 @@ // // ======================================================================================= +#include "phlex/core/product_registry.hpp" #include "phlex/utilities/simple_ptr_map.hpp" #include #include #include +#include #include #include @@ -67,18 +69,21 @@ namespace phlex::experimental { template class registrar { using nodes = simple_ptr_map; - using node_creator = std::function, std::vector)>; + using node_creator = std::function, std::vector)>; public: - explicit registrar(nodes& node_map, std::vector& errors) : - nodes_{&node_map}, errors_{&errors} + explicit registrar(nodes& node_map, + std::vector& errors, + product_registry& registry) : + nodes_{&node_map}, errors_{&errors}, registry_{®istry} { } registrar(registrar const&) = delete; - registrar& operator=(registrar const&) = delete; registrar(registrar&&) = default; + registrar& operator=(registrar const&) = delete; registrar& operator=(registrar&&) = default; bool has_predicates() const { return predicates_.has_value(); } @@ -110,9 +115,48 @@ namespace phlex::experimental { void create_node(std::vector output_product_suffixes) { + using namespace phlex::experimental::literals; assert(creator_); - auto ptr = creator_(release_predicates(), std::move(output_product_suffixes)); + auto ptr = creator_(*registry_, release_predicates(), std::move(output_product_suffixes)); auto name = ptr->full_name(); + spdlog::debug("Creating node for {}", name); + // + // Register output products + // Providers have a different interface + if constexpr (requires { + { ptr->output_product() } -> std::same_as; + }) { + auto const& prod = ptr->output_product(); + // The product had better have it's creator and layer defined + assert(prod.creator && prod.layer); + registry_->add_product( + product_specification(algorithm_name::create(std::string_view(*prod.creator)), + prod.suffix.value_or(""_id), + prod.type), + ptr->layer(), + prod.stage.value_or(""_id)); + } + // Now deal with everything else that produces outputs + else if constexpr (requires { + { ptr->output() } -> std::same_as; + }) { + auto spec = ptr->output(); + // FIXME: Once we implement stage names this should be replaced with the current stage name + identifier stage = "current"_id; + identifier layer{}; + if constexpr (requires { ptr->layer(); }) { + layer = ptr->layer(); + } else { + std::set layer_set{ptr->layers()}; + if (layer_set.size() != 1) { + throw std::runtime_error( + "Transforms with inputs from more than one layer are unsupported!!"); + } + layer = *layer_set.begin(); + } + + registry_->add_product(spec, layer, stage); + } auto [_, inserted] = nodes_->try_emplace(name, std::move(ptr)); if (not inserted) { detail::add_to_error_messages(*errors_, name); @@ -124,6 +168,7 @@ namespace phlex::experimental { node_creator creator_{}; std::optional> predicates_; std::vector output_product_suffixes_{}; + product_registry* registry_; }; } diff --git a/phlex/core/registration_api.cpp b/phlex/core/registration_api.cpp index 8b28e89d6..cc04ffed5 100644 --- a/phlex/core/registration_api.cpp +++ b/phlex/core/registration_api.cpp @@ -18,7 +18,7 @@ namespace phlex::experimental { if (config) { reg_.set_predicates(detail::maybe_predicates(config)); } - reg_.set_creator([this](auto predicates, auto) { + reg_.set_creator([this](product_registry const& /* unused */, auto predicates, auto) { return std::make_unique( std::move(name_), concurrency_.value, std::move(predicates), graph_, std::move(ft_)); }); diff --git a/phlex/core/registration_api.hpp b/phlex/core/registration_api.hpp index e63632333..47caf67f4 100644 --- a/phlex/core/registration_api.hpp +++ b/phlex/core/registration_api.hpp @@ -55,27 +55,34 @@ namespace phlex::experimental { auto input_family(std::array input_args) { populate_types(input_args); + spdlog::debug("input_family: setting creator"); if constexpr (num_outputs == 0ull) { - registrar_.set_creator([this, inputs = std::move(input_args)]( - auto predicates, auto /* output_product_suffixes */) { - return std::make_unique(std::move(name_), - concurrency_.value, - std::move(predicates), - graph_, - std::move(alg_), - std::vector(inputs.begin(), inputs.end())); - }); + registrar_.set_creator( + [this, inputs = std::move(input_args)]( + product_registry const& registry, auto predicates, auto /* output_products */) { + spdlog::debug("In creator for {}", name_.full()); + return std::make_unique(std::move(name_), + concurrency_.value, + std::move(predicates), + graph_, + std::move(alg_), + std::vector(inputs.begin(), inputs.end()), + registry); + }); } else { registrar_.set_creator( - [this, inputs = std::move(input_args)](auto predicates, auto output_product_suffixes) { + [this, inputs = std::move(input_args)]( + product_registry const& registry, auto predicates, auto output_product_suffixes) { + spdlog::debug("In creator for {}", name_.full()); return std::make_unique(std::move(name_), concurrency_.value, std::move(predicates), graph_, std::move(alg_), std::vector(inputs.begin(), inputs.end()), - std::move(output_product_suffixes)); + std::move(output_product_suffixes), + registry); }); } return upstream_predicates{std::move(registrar_), config_}; @@ -140,11 +147,17 @@ namespace phlex::experimental { output.type = make_type_id(); - registrar_.set_creator([this, output = std::move(output)]( - auto /* predicates */, auto /* output_product_suffixes */) { - return std::make_unique( - std::move(name_), concurrency_.value, graph_, std::move(alg_), std::move(output)); - }); + registrar_.set_creator( + [this, output = std::move(output)](product_registry const& registry, + auto /* predicates */, + auto /* output_product_suffixes */) { + return std::make_unique(std::move(name_), + concurrency_.value, + graph_, + std::move(alg_), + std::move(output), + registry); + }); } private: @@ -193,7 +206,8 @@ namespace phlex::experimental { populate_types(input_args); registrar_.set_creator( - [this, inputs = std::move(input_args)](auto predicates, auto output_product_suffixes) { + [this, inputs = std::move(input_args)]( + product_registry const& registry, auto predicates, auto output_product_suffixes) { return std::make_unique>( std::move(name_), concurrency_.value, @@ -203,7 +217,8 @@ namespace phlex::experimental { std::move(init_), std::vector(inputs.begin(), inputs.end()), std::move(output_product_suffixes), - std::move(partition_)); + std::move(partition_), + registry); }); return upstream_predicates{std::move(registrar_), config_}; } @@ -268,19 +283,22 @@ namespace phlex::experimental { { populate_types(input_args); - registrar_.set_creator([this, inputs = std::move(input_args)](auto upstream_predicates, - auto output_product_suffixes) { - return std::make_unique>( - std::move(name_), - concurrency_, - std::move(upstream_predicates), - graph_, - std::move(predicate_), - std::move(unfold_), - std::vector(inputs.begin(), inputs.end()), - std::move(output_product_suffixes), - std::move(destination_layer_)); - }); + registrar_.set_creator( + [this, inputs = std::move(input_args)](product_registry const& registry, + auto upstream_predicates, + auto output_product_suffixes) { + return std::make_unique>( + std::move(name_), + concurrency_, + std::move(upstream_predicates), + graph_, + std::move(predicate_), + std::move(unfold_), + std::vector(inputs.begin(), inputs.end()), + std::move(output_product_suffixes), + std::move(destination_layer_), + registry); + }); return upstream_predicates{std::move(registrar_), config_}; } diff --git a/plugins/python/src/modulewrap.cpp b/plugins/python/src/modulewrap.cpp index a43f726b1..cdb8e8c2d 100644 --- a/plugins/python/src/modulewrap.cpp +++ b/plugins/python/src/modulewrap.cpp @@ -873,10 +873,11 @@ static PyObject* md_transform(py_phlex_module* mod, PyObject* args, PyObject* kw // TODO: it's not clear what the output layer will be if the input layers are not // all the same, so for now, simply raise an error if their is any ambiguity - auto output_layer = static_cast(input_queries[0].layer); + // FIXME: Just assuming the layer is there + auto output_layer = static_cast(*input_queries[0].layer); if (1 < input_queries.size()) { for (std::vector::size_type iq = 1; iq < input_queries.size(); ++iq) { - if (static_cast(input_queries[iq].layer) != output_layer) { + if (static_cast(*input_queries[iq].layer) != output_layer) { PyErr_Format(PyExc_ValueError, "transform %s output layer is ambiguous", cname.c_str()); Py_DECREF(callable); return nullptr;