@@ -8,13 +8,14 @@ use crate::dsl::exec_helpers::{
88 build_field_names, cell_data_to_json_map, distributed_directions_for_edge_schemas,
99 id_to_base58, project_fields, rrf_rerank_ids,
1010} ;
11- use crate :: dsl:: graph:: json:: { SubgraphEdge , SubgraphResult , SubgraphVertex } ;
11+ use crate :: dsl:: graph:: json:: { SubgraphResult , SubgraphVertex } ;
12+ use crate :: dsl:: graph:: path_materialize:: { expand_edge_report_key, materialize_expand_edges} ;
1213use crate :: dsl:: query:: bind:: { extract_root_seed_ids, BoundPredicate , BoundSubgraphQuery } ;
1314use crate :: graph:: edge:: EdgeType ;
1415use crate :: server:: schema:: GraphSchema ;
1516use crate :: server:: MorpheusRuntime ;
1617use crate :: traversal:: dist:: adjacency:: { AdjacencyConfig , WeightSpec } ;
17- use crate :: traversal:: expand:: distributed:: { ExpandJobOptions , ExpandRunConfig } ;
18+ use crate :: traversal:: expand:: distributed:: { ExpandEdge , ExpandJobOptions , ExpandRunConfig } ;
1819
1920pub async fn execute_subgraph (
2021 runtime : & Arc < MorpheusRuntime > ,
@@ -37,7 +38,7 @@ pub async fn execute_subgraph(
3738 // Also include the seed vertices themselves
3839 all_vertex_ids. extend ( seeds. iter ( ) . copied ( ) ) ;
3940
40- let mut all_edges: Vec < ( Id , Id , u32 , Option < Id > ) > = Vec :: new ( ) ;
41+ let mut all_edges: Vec < ExpandEdge > = Vec :: new ( ) ;
4142 let directions = distributed_directions_for_edge_schemas (
4243 runtime,
4344 & query. expand . edge_schema_ids ,
@@ -70,9 +71,7 @@ pub async fn execute_subgraph(
7071 . map_err ( |e| format ! ( "subgraph expand failed: {e}" ) ) ?;
7172
7273 all_vertex_ids. extend ( result. vertices . iter ( ) . copied ( ) ) ;
73- for e in result. edges {
74- all_edges. push ( ( e. from , e. to , e. schema_id , e. edge_id ) ) ;
75- }
74+ all_edges. extend ( result. edges ) ;
7675 }
7776
7877 // 3. Fetch vertex cells and serialize to SubgraphVertex
@@ -116,66 +115,22 @@ pub async fn execute_subgraph(
116115 // Deduplicate by canonical (from, to, schema_id).
117116 // For undirected schemas, (A,B) and (B,A) represent the same edge; normalise
118117 // them so that the smaller Id comes first before inserting into the seen set.
119- let mut seen_edges: HashSet < ( Id , Id , u32 ) > = HashSet :: new ( ) ;
118+ let mut seen_edges = HashSet :: new ( ) ;
120119 let deduped_edges: Vec < _ > = all_edges
121120 . into_iter ( )
122- . filter ( |( from , to , schema_id , _ ) | {
121+ . filter ( |edge | {
123122 let edge_type = runtime
124123 . schema_container ( )
125- . schema_type ( * schema_id)
124+ . schema_type ( edge . schema_id )
126125 . and_then ( |gs| match gs {
127126 GraphSchema :: Edge ( attrs) => Some ( attrs. edge_type ) ,
128127 _ => None ,
129128 } ) ;
130- let key = match edge_type {
131- Some ( EdgeType :: Undirected ) => {
132- let ( a, b) = if from. higher < to. higher
133- || ( from. higher == to. higher && from. lower <= to. lower )
134- {
135- ( * from, * to)
136- } else {
137- ( * to, * from)
138- } ;
139- ( a, b, * schema_id)
140- }
141- _ => ( * from, * to, * schema_id) ,
142- } ;
143- seen_edges. insert ( key)
129+ seen_edges. insert ( expand_edge_report_key ( edge, edge_type) )
144130 } )
145131 . collect ( ) ;
146- let mut edges = Vec :: new ( ) ;
147- for ( from, to, schema_id, edge_body_id) in deduped_edges {
148- let schema_name = runtime
149- . schema_container ( )
150- . get_neb_schema ( schema_id)
151- . map ( |s| s. name . clone ( ) )
152- . unwrap_or_default ( ) ;
153-
154- let fields = if let Some ( cell_id) = edge_body_id {
155- match runtime. neb_client ( ) . read_cell ( cell_id) . await {
156- Ok ( Ok ( cell) ) => {
157- let field_names = runtime
158- . schema_container ( )
159- . get_neb_schema ( cell. header . schema )
160- . map ( |s| build_field_names ( & s. fields ) ) ;
161- project_fields (
162- cell_data_to_json_map ( & cell. data , field_names. as_ref ( ) ) ,
163- query. edge_select . as_deref ( ) ,
164- )
165- }
166- _ => serde_json:: Map :: new ( ) ,
167- }
168- } else {
169- serde_json:: Map :: new ( )
170- } ;
171-
172- edges. push ( SubgraphEdge {
173- from : id_to_base58 ( & from) ,
174- to : id_to_base58 ( & to) ,
175- schema : schema_name,
176- fields,
177- } ) ;
178- }
132+ let edges = materialize_expand_edges ( runtime, & deduped_edges, query. edge_select . as_deref ( ) )
133+ . await ?;
179134
180135 Ok ( SubgraphResult { vertices, edges } )
181136}
0 commit comments