Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion phlex/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ cet_make_library(
message.cpp
node_catalog.cpp
index_router.cpp
product_query.cpp
product_registry.cpp
products_consumer.cpp
registrar.cpp
registration_api.cpp
product_query.cpp
store_counters.cpp
LIBRARIES
PUBLIC
Expand Down Expand Up @@ -62,6 +63,7 @@ install(
multilayer_join_node.hpp
index_router.hpp
node_catalog.hpp
product_registry.hpp
product_query.hpp
products_consumer.hpp
registrar.hpp
Expand Down
5 changes: 3 additions & 2 deletions phlex/core/declared_fold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
namespace phlex::experimental {
declared_fold::declared_fold(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)}
product_queries input_products,
product_registry const& registry) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry}
{
}

Expand Down
29 changes: 17 additions & 12 deletions phlex/core/declared_fold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ namespace phlex::experimental {
public:
declared_fold(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products);
product_queries input_products,
product_registry const& registry);
virtual ~declared_fold();

virtual tbb::flow::sender<message>& output_port() = 0;
virtual tbb::flow::receiver<flush_message>& flush_port() = 0;
virtual product_specifications const& output() const = 0;
virtual identifier const& layer() const = 0;
virtual std::size_t product_count() const = 0;
};

Expand Down Expand Up @@ -73,8 +75,9 @@ namespace phlex::experimental {
InitTuple initializer,
product_queries input_products,
std::vector<std::string> output,
std::string partition) :
declared_fold{std::move(name), std::move(predicates), std::move(input_products)},
std::string partition,
product_registry const& registry) :
declared_fold{std::move(name), std::move(predicates), std::move(input_products), registry},
initializer_{std::move(initializer)},
output_{
to_product_specifications(full_name(), std::move(output), make_type_ids<result_type>())},
Expand Down Expand Up @@ -122,6 +125,17 @@ namespace phlex::experimental {
}
}

product_specifications const& output() const override { return output_; }
identifier const& layer() const override { return partition_; }

tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), input_product, fold_);
}

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& output_port() override { return tbb::flow::output_port<0>(fold_); }

private:
void emit_and_evict_if_done(data_cell_index_ptr const& fold_index)
{
Expand All @@ -134,20 +148,11 @@ namespace phlex::experimental {
}
}

tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
return receiver_for<num_inputs>(join_, input(), input_product, fold_);
}

std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<num_inputs>(join_, fold_);
}

tbb::flow::receiver<flush_message>& flush_port() override { return flush_receiver_; }
tbb::flow::sender<message>& output_port() override { return tbb::flow::output_port<0>(fold_); }
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
void call(function_t const& ft,
messages_t<num_inputs> const& messages,
Expand Down
5 changes: 3 additions & 2 deletions phlex/core/declared_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
namespace phlex::experimental {
declared_observer::declared_observer(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)}
product_queries input_products,
product_registry const& registry) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry}
{
}

Expand Down
9 changes: 6 additions & 3 deletions phlex/core/declared_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace phlex::experimental {
public:
declared_observer(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products);
product_queries input_products,
product_registry const& registry);
virtual ~declared_observer();
};

Expand All @@ -60,8 +61,10 @@ namespace phlex::experimental {
std::vector<std::string> predicates,
tbb::flow::graph& g,
AlgorithmBits alg,
product_queries input_products) :
declared_observer{std::move(name), std::move(predicates), std::move(input_products)},
product_queries input_products,
product_registry const& registry) :
declared_observer{
std::move(name), std::move(predicates), std::move(input_products), registry},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
observer_{g,
concurrency,
Expand Down
5 changes: 3 additions & 2 deletions phlex/core/declared_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
namespace phlex::experimental {
declared_predicate::declared_predicate(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)}
product_queries input_products,
product_registry const& registry) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry}
{
}

Expand Down
9 changes: 6 additions & 3 deletions phlex/core/declared_predicate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ namespace phlex::experimental {
public:
declared_predicate(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products);
product_queries input_products,
product_registry const& registry);
virtual ~declared_predicate();

virtual tbb::flow::sender<predicate_result>& sender() = 0;
Expand All @@ -64,8 +65,10 @@ namespace phlex::experimental {
std::vector<std::string> predicates,
tbb::flow::graph& g,
AlgorithmBits alg,
product_queries input_products) :
declared_predicate{std::move(name), std::move(predicates), std::move(input_products)},
product_queries input_products,
product_registry const& registry) :
declared_predicate{
std::move(name), std::move(predicates), std::move(input_products), registry},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
predicate_{g,
concurrency,
Expand Down
2 changes: 1 addition & 1 deletion phlex/core/declared_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ namespace phlex::experimental {
return output_product_;
}

identifier const& declared_provider::layer() const noexcept { return output_product_.layer; }
identifier const& declared_provider::layer() const noexcept { return *output_product_.layer; }
}
5 changes: 3 additions & 2 deletions phlex/core/declared_provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ namespace phlex::experimental {
std::size_t concurrency,
tbb::flow::graph& g,
AlgorithmBits alg,
product_query output) :
product_query output,
product_registry const& /* unused */) :
declared_provider{std::move(name), output},
output_{algorithm_name::create(std::string_view(identifier(output.creator))),
output_{algorithm_name::create(std::string_view(*output.creator)),
output.suffix.value_or(identifier("")),
output.type},
provider_{g,
Expand Down
5 changes: 3 additions & 2 deletions phlex/core/declared_transform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
namespace phlex::experimental {
declared_transform::declared_transform(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)}
product_queries input_products,
product_registry const& registry) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry}
{
}

Expand Down
12 changes: 8 additions & 4 deletions phlex/core/declared_transform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ namespace phlex::experimental {
public:
declared_transform(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products);
product_queries input_products,
product_registry const& registry);
virtual ~declared_transform();

virtual tbb::flow::sender<message>& output_port() = 0;
Expand Down Expand Up @@ -73,8 +74,10 @@ namespace phlex::experimental {
tbb::flow::graph& g,
AlgorithmBits alg,
product_queries input_products,
std::vector<std::string> output) :
declared_transform{std::move(name), std::move(predicates), std::move(input_products)},
std::vector<std::string> output,
product_registry const& registry) :
declared_transform{
std::move(name), std::move(predicates), std::move(input_products), registry},
output_{to_product_specifications(
full_name(), std::move(output), make_output_type_ids<function_t>())},
join_{make_join_or_none<num_inputs>(g, full_name(), layers())},
Expand Down Expand Up @@ -102,6 +105,8 @@ namespace phlex::experimental {
}
}

product_specifications const& output() const override { return output_; }

private:
tbb::flow::receiver<message>& port_for(product_query const& input_product) override
{
Expand All @@ -117,7 +122,6 @@ namespace phlex::experimental {
{
return tbb::flow::output_port<0>(transform_);
}
product_specifications const& output() const override { return output_; }

template <std::size_t... Is>
auto call(function_t const& ft,
Expand Down
5 changes: 3 additions & 2 deletions phlex/core/declared_unfold.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ namespace phlex::experimental {
declared_unfold::declared_unfold(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products,
std::string child_layer) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products)},
std::string child_layer,
product_registry const& registry) :
products_consumer{std::move(name), std::move(predicates), std::move(input_products), registry},
child_layer_{std::move(child_layer)}
{
}
Expand Down
19 changes: 12 additions & 7 deletions phlex/core/declared_unfold.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ namespace phlex::experimental {
declared_unfold(algorithm_name name,
std::vector<std::string> predicates,
product_queries input_products,
std::string child_layer);
std::string child_layer,
product_registry const& registry);
virtual ~declared_unfold();

virtual tbb::flow::sender<message>& output_port() = 0;
Expand All @@ -69,6 +70,8 @@ namespace phlex::experimental {
virtual std::size_t product_count() const = 0;
virtual flusher_t& flusher() = 0;

// Looks weird but this reduces the amount of special code in product registration
identifier layer() const noexcept { return identifier(child_layer_); }
std::string const& child_layer() const noexcept { return child_layer_; }

private:
Expand All @@ -95,11 +98,13 @@ namespace phlex::experimental {
Unfold&& unfold,
product_queries input_products,
std::vector<std::string> output_product_suffixes,
std::string child_layer_name) :
std::string child_layer_name,
product_registry const& registry) :
declared_unfold{std::move(name),
std::move(predicates),
std::move(input_products),
std::move(child_layer_name)},
std::move(child_layer_name),
registry},
output_{to_product_specifications(full_name(),
std::move(output_product_suffixes),
make_type_ids<skip_first_type<return_type<Unfold>>>())},
Expand Down Expand Up @@ -131,10 +136,6 @@ namespace phlex::experimental {
{
return receiver_for<num_inputs>(join_, input(), input_product, unfold_);
}
std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<num_inputs>(join_, unfold_);
}

tbb::flow::sender<message>& output_port() override
{
Expand All @@ -146,6 +147,10 @@ namespace phlex::experimental {
}
product_specifications const& output() const override { return output_; }
flusher_t& flusher() override { return flusher_; }
std::vector<tbb::flow::receiver<message>*> ports() override
{
return input_ports<num_inputs>(join_, unfold_);
}

template <std::size_t... Is>
void call(Predicate const& predicate,
Expand Down
6 changes: 5 additions & 1 deletion phlex/core/framework_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ namespace phlex::experimental {
std::set<flusher_t*> flushers;
// For providers
for (product_query const& pq : n->input()) {
if (auto it = unfold_flushers.find(pq.layer); it != unfold_flushers.end()) {
if (!pq.layer) {
spdlog::error("The input product query {} has no layer specified", pq);
continue;
}
if (auto it = unfold_flushers.find(*pq.layer); it != unfold_flushers.end()) {
flushers.insert(it->second);
} else {
flushers.insert(&index_router_.flusher());
Expand Down
1 change: 1 addition & 0 deletions phlex/core/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace phlex::experimental {
struct message;
class index_router;
class products_consumer;
class product_registry;

using flusher_t = tbb::flow::broadcast_node<flush_message>;
}
Expand Down
8 changes: 6 additions & 2 deletions phlex/core/index_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ namespace phlex::experimental {

// Create the index-set broadcast nodes for providers
for (auto& [pq, provider_port] : provider_input_ports_ | std::views::values) {
auto [it, _] = broadcasters_.try_emplace(static_cast<identifier const&>(pq.layer),
std::make_shared<detail::index_set_node>(g));
if (!pq.layer) {
spdlog::error("The input product query {} has not layer specified", pq);
continue;
}
auto [it, _] =
broadcasters_.try_emplace(*pq.layer, std::make_shared<detail::index_set_node>(g));
make_edge(*it->second, *provider_port);
}

Expand Down
5 changes: 4 additions & 1 deletion phlex/core/node_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "phlex/core/declared_provider.hpp"
#include "phlex/core/declared_transform.hpp"
#include "phlex/core/declared_unfold.hpp"
#include "phlex/core/product_registry.hpp"
#include "phlex/core/registrar.hpp"
#include "phlex/utilities/simple_ptr_map.hpp"

Expand All @@ -23,7 +24,7 @@ namespace phlex::experimental {
template <typename Ptr>
auto registrar_for(std::vector<std::string>& errors)
{
return registrar{boost::pfr::get<simple_ptr_map<Ptr>>(*this), errors};
return registrar{boost::pfr::get<simple_ptr_map<Ptr>>(*this), errors, registry};
}

std::size_t execution_count(std::string const& node_name) const;
Expand All @@ -35,6 +36,8 @@ namespace phlex::experimental {
simple_ptr_map<declared_unfold_ptr> unfolds{};
simple_ptr_map<declared_transform_ptr> transforms{};
simple_ptr_map<declared_provider_ptr> providers{};

product_registry registry{};
};
}

Expand Down
Loading
Loading