Skip to content

Support multi-layer consumer nodes with input data products from unfold nodes #550

@knoepfel

Description

@knoepfel

It is currently not possible to have a multi-layer (e.g.) transform where one of the data products comes from an unfold. For example, consider the following minimal reproducer of a provider + unfold + multi-layer transform:

  experimental::layer_generator gen;
  gen.add_layer("event", {"job", 2u});

  experimental::framework_graph g{driver_for_test(gen)};

  // Provide maximum number to be used by 'iota' unfold
  g.provide("provide_max_number", provide_max_number, concurrency::unlimited)
    .output_product(product_query{.creator = "input", .layer = "event", .suffix = "max_number"});

  // Unfold from 0 to that maximum number (exclusive)
  g.unfold<iota>("iota", &iota::predicate, &iota::unfold, concurrency::unlimited, "subevent")
    .input_family(product_query{.creator = "input", .layer = "event", .suffix = "max_number"})
    .output_product_suffixes("new_number");

  // Read the new_number from the unfold and max_number from the event 
  g.transform("add_max_and_new", [](unsigned int i, unsigned int j) { return i + j; }, concurrency::unlimited)
    .input_family(
      product_query{.creator = "iota", .layer = "subevent", .suffix = "new_number"},
      product_query{.creator = "input", .layer = "event", .suffix = "max_number"})
    .output_product_suffixes("result");

  g.execute();

  // event 0: max_number=10, new_number in [0,9]  -> 10 executions
  // event 1: max_number=20, new_number in [0,19] -> 20 executions
  CHECK(g.execution_count("iota") == 2u;
  CHECK(g.execution_count("add_max_and_new") == 30u);

When running this job, one sees:

/Users/kyleknoepfel/work/phlex-devel/srcs/phlex/test/unfold.cpp:184: FAILED:
  CHECK( g.execution_count("add_max_and_new") == 30u )
with expansion:
  0 == 30

[2026-04-23 11:54:31.581] [warning] [add_max_and_new/event] Cached messages: 2
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/event]   Product for [event:1]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/event]   Product for [event:0]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent] Cached messages: 30
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:16]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:4]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:2]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:3]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:0]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:10]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:0]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:1]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:5]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:7]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:6]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:3]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:13]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:9]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:14]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:9]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:8]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:4]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:2]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:17]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:5]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:8]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:1]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:7]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:18]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:15]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:11]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:19]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:0, subevent:6]
[2026-04-23 11:54:31.581] [warning] [add_max_and_new/subevent]   Product for [event:1, subevent:12]
[2026-04-23 11:54:31.581] [info] 

Seen layers:

  job
   │ 
   └ event: 2
      │ 
      └ subevent: 30

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions