@@ -102,12 +102,11 @@ public void setMergeInfo(SortInfo info) {
102102
103103 @Override
104104 protected void toThrift(TPlanNode msg) {
105- // Exchange serial status should depend only on isSerialOperator() (UNPARTITIONED or
106- // use_serial_exchange), NOT on fragment.hasSerialScanNode(). With FE-planned local
107- // exchanges, serial scans are already handled by PASSTHROUGH fan-out. Making
108- // non-UNPARTITIONED exchanges (e.g. BUCKET_SHUFFLE) serial would reduce the build-side
109- // pipeline to num_tasks=1, preventing shared state propagation to other instances.
110- msg .setIsSerialOperator (isSerialOperator ()
105+ // BE local shuffle only supports two modes: 1 instance or all instances receiving data.
106+ // hasSerialScanNode() ensures Exchange is serial when sibling scan is pooling,
107+ // avoiding a middle-ground where some but not all instances receive data
108+ // (EOS count mismatch → hang).
109+ msg .setIsSerialOperator ((isSerialOperator () || fragment .hasSerialScanNode ())
111110 && fragment .useSerialSource (ConnectContext .get ()));
112111 msg .node_type = TPlanNodeType .EXCHANGE_NODE ;
113112 msg .exchange_node = new TExchangeNode ();
@@ -182,7 +181,9 @@ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTrans
182181 // Without useSerialSource() check, we'd insert PASSTHROUGH in non-pooling fragments
183182 // where the exchange has N tasks, corrupting broadcast join data distribution
184183 // (PASSTHROUGH round-robin splits complete-dataset sinks into 1/N subsets per source).
185- boolean willBeSerialOnBe = isSerialOperator ()
184+ // Must match toThrift: include hasSerialScanNode() — when sibling scan is pooling,
185+ // Exchange becomes serial on BE even if Exchange itself isn't inherently serial.
186+ boolean willBeSerialOnBe = (isSerialOperator () || fragment .hasSerialScanNode ())
186187 && fragment != null
187188 && fragment .useSerialSource (ConnectContext .get ());
188189 if (willBeSerialOnBe ) {
@@ -195,23 +196,35 @@ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTrans
195196 if (translatorContext .hasSerialAncestorInPipeline (this )) {
196197 return Pair .of (this , LocalExchangeType .NOOP );
197198 }
198- // Serial exchange → 1 task. Must fan out to N tasks for downstream operators.
199- // For HASH/BUCKET exchanges: return NOOP and let parent insert the appropriate
200- // redistribution (HASH_SHUFFLE or BUCKET_HASH_SHUFFLE). PASSTHROUGH round-robin
201- // would corrupt the hash/bucket distribution, and PASS_TO_ONE doesn't work for
202- // BUCKET_SHUFFLE joins (no shared hash table mechanism unlike BROADCAST).
203- // The heavy-ops bottleneck avoidance in enforceChildExchange() will automatically
204- // insert a PASSTHROUGH fan-out before the hash/bucket shuffle if needed.
199+ // Serial HASH/BUCKET exchange:
200+ // In pooling fragments, return NOOP so parent inserts hash/bucket LE with
201+ // PASSTHROUGH fan-out (heavy-ops avoidance). Serial exchange has 1 task,
202+ // LE fans out to _num_instances tasks.
203+ // In non-pooling fragments, report the actual distribution type so parent's
204+ // requirement is satisfied without inserting LE. The serial exchange reduces
205+ // pipeline num_tasks to 1, matching BE-native behavior. Inserting LE in
206+ // non-pooling fragments creates a pipeline split where downstream has
207+ // _num_instances tasks but only 1 sender, causing shared-state mismatch.
205208 if (partitionType == TPartitionType .HASH_PARTITIONED
206209 || partitionType == TPartitionType .BUCKET_SHFFULE_HASH_PARTITIONED ) {
207- return Pair .of (this , LocalExchangeType .NOOP );
210+ if (translatorContext .isLocalShuffleFragment ()) {
211+ return Pair .of (this , LocalExchangeType .NOOP );
212+ }
213+ LocalExchangeType outputType = partitionType == TPartitionType .HASH_PARTITIONED
214+ ? LocalExchangeType .GLOBAL_EXECUTION_HASH_SHUFFLE
215+ : LocalExchangeType .BUCKET_HASH_SHUFFLE ;
216+ return Pair .of (this , outputType );
217+ }
218+ // For UNPARTITIONED (broadcast): in pooling fragments, PASSTHROUGH fan-out is
219+ // needed because the exchange has 1 task but downstream needs N tasks.
220+ // In non-pooling fragments, don't insert PASSTHROUGH — BE-native handles
221+ // this via _plan_local_exchange which checks serial operators in the pipeline.
222+ if (translatorContext .isLocalShuffleFragment ()) {
223+ PlanNode pt = new LocalExchangeNode (translatorContext .nextPlanNodeId (),
224+ this , LocalExchangeType .PASSTHROUGH , null );
225+ return Pair .of (pt , LocalExchangeType .PASSTHROUGH );
208226 }
209- // For UNPARTITIONED (broadcast): PASSTHROUGH fan-out is safe because the
210- // exchange has the complete dataset. Parent nodes (HashJoin) may add PASS_TO_ONE
211- // on top for single-builder semantics (broadcast join build side).
212- PlanNode pt = new LocalExchangeNode (translatorContext .nextPlanNodeId (),
213- this , LocalExchangeType .PASSTHROUGH , null );
214- return Pair .of (pt , LocalExchangeType .PASSTHROUGH );
227+ return Pair .of (this , LocalExchangeType .NOOP );
215228 } else if (partitionType == TPartitionType .HASH_PARTITIONED ) {
216229 return Pair .of (this , LocalExchangeType .GLOBAL_EXECUTION_HASH_SHUFFLE );
217230 } else if (partitionType == TPartitionType .BUCKET_SHFFULE_HASH_PARTITIONED ) {
0 commit comments