Commit a2bfd92

[refactor](local shuffle) Refactor FE local exchange planning: extract helper, fix root PASSTHROUGH, fix ScanNode output
1. Extract the duplicated heavy-ops bottleneck-avoidance code from enforceChild/enforceChildExchange/forceEnforceChildExchange into insertLocalExchange().
2. Fix root-level PASSTHROUGH insertion: check only newRoot.isSerialOperator() instead of the recursive hasSerialChildren(); after enforceAndDeriveLocalExchange(), serial children are already handled by the LocalExchangeNodes inserted during the tree walk. This fixes 6 FE/BE consistency mismatches where the FE inserted roughly twice as many PASSTHROUGH exchanges as the BE.
3. Fix the ScanNode base class: return NOOP instead of BUCKET_HASH_SHUFFLE (only OlapScanNode should report bucket distribution, and only for non-pooling scans).
4. Remove the ExchangeNode.toThrift() dependency on hasSerialScanNode(); with FE-planned local exchanges, serial scans are handled by PASSTHROUGH fan-out.
5. Fix DistributePlanner: BUCKET_SHUFFLE with local shuffle returns all instances (not just the first instance per worker); use assignedJoinBucketIndexes for the bucket mapping.
6. Cap parallelism in UnassignedScanBucketOlapTableJob when the fragment has a non-serial exchange, to avoid padding instances that hang.
7. Remove the dead DataStreamSink.getLocalExchangeTypeRequire() override.
8. Inline the shouldUseLocalExecutionHash() constant.
9. Fix test Bug 18/19: use col_int_undef_signed (an existing column) instead of col_varchar_10__undef_signed (which does not exist).
1 parent 6e225bb commit a2bfd92

9 files changed

Lines changed: 123 additions & 115 deletions

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java

Lines changed: 22 additions & 2 deletions
@@ -31,6 +31,7 @@
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.StaticAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedJobBuilder;

@@ -244,6 +245,13 @@ private List<AssignedJob> filterInstancesWhichCanReceiveDataFromRemote(
         boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
                 .anyMatch(LocalShuffleAssignedJob.class::isInstance);
         if (useLocalShuffle) {
+            // For BUCKET_SHUFFLE with local shuffle, each instance handles specific buckets.
+            // The exchange must route data to each instance individually, not just the first
+            // per worker. Otherwise instances 1..N-1 create ExchangeSource tasks that never
+            // receive data or EOS, causing a hang.
+            if (linkNode.getPartitionType() == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
+                return receiverPlan.getInstanceJobs();
+            }
             return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
         } else if (enableShareHashTableForBroadcastJoin && linkNode.isRightChildOfBroadcastHashJoin()) {
             return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());

@@ -256,8 +264,20 @@ private List<AssignedJob> sortDestinationInstancesByBuckets(
             PipelineDistributedPlan plan, List<AssignedJob> unsorted, int bucketNum) {
         AssignedJob[] instances = new AssignedJob[bucketNum];
         for (AssignedJob instanceJob : unsorted) {
-            BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
-            for (Integer bucketIndex : bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
+            // For LocalShuffleBucketJoinAssignedJob, use assignedJoinBucketIndexes which
+            // gives the specific join bucket(s) this instance handles. The scanSource
+            // buckets include ALL buckets on this worker (shared via pooling scan),
+            // which would cause collisions when multiple instances on the same worker
+            // each handle different join buckets.
+            Iterable<Integer> bucketIndices;
+            if (instanceJob instanceof LocalShuffleBucketJoinAssignedJob) {
+                bucketIndices = ((LocalShuffleBucketJoinAssignedJob) instanceJob)
+                        .getAssignedJoinBucketIndexes();
+            } else {
+                BucketScanSource bucketScanSource = (BucketScanSource) instanceJob.getScanSource();
+                bucketIndices = bucketScanSource.bucketIndexToScanNodeToTablets.keySet();
+            }
+            for (Integer bucketIndex : bucketIndices) {
                 if (instances[bucketIndex] != null) {
                     throw new IllegalStateException(
                             "Multi instances scan same buckets: " + instances[bucketIndex] + " and " + instanceJob

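As a side note on the bucket-mapping change above, here is a standalone sketch (the Instance record, mapBucketsToInstances, and the bucket sets are invented for illustration; these are not the Doris classes): two instances on the same worker share a pooled scan source covering all local buckets, so keying the destination array by scan-source buckets collides, while keying it by each instance's assigned join buckets does not.

import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class BucketMappingSketch {
    // Hypothetical stand-in for an instance of LocalShuffleBucketJoinAssignedJob.
    record Instance(String name, Set<Integer> scanSourceBuckets, Set<Integer> assignedJoinBuckets) {}

    // Mirror of the destination-sorting loop: one instance per bucket slot, collisions are fatal.
    static String[] mapBucketsToInstances(List<Instance> instances, int bucketNum, boolean useAssigned) {
        String[] slots = new String[bucketNum];
        for (Instance inst : instances) {
            Iterable<Integer> buckets = useAssigned ? inst.assignedJoinBuckets() : inst.scanSourceBuckets();
            for (int bucket : buckets) {
                if (slots[bucket] != null) {
                    throw new IllegalStateException("Multi instances scan same bucket " + bucket
                            + ": " + slots[bucket] + " and " + inst.name());
                }
                slots[bucket] = inst.name();
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        // Two instances on one worker: the pooled scan covers buckets {0,1,2,3} for both,
        // but each instance is responsible for a disjoint half of the join buckets.
        List<Instance> instances = List.of(
                new Instance("instance-0", Set.of(0, 1, 2, 3), Set.of(0, 1)),
                new Instance("instance-1", Set.of(0, 1, 2, 3), Set.of(2, 3)));

        System.out.println(Arrays.toString(mapBucketsToInstances(instances, 4, true)));
        try {
            mapBucketsToInstances(instances, 4, false);
        } catch (IllegalStateException e) {
            System.out.println("scan-source keying collides: " + e.getMessage());
        }
    }
}
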
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java

Lines changed: 22 additions & 1 deletion
@@ -539,6 +539,27 @@ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddP
         }

         int maxParallelism = (int) Math.max(tabletNum, fragment.getParallelExecNum());
-        return Math.min(maxParallelism, colocateMaxParallelNum);
+        int result = Math.min(maxParallelism, colocateMaxParallelNum);
+
+        // When the fragment has a non-serial exchange, the BE won't insert local exchange
+        // for the exchange pipeline (distribution matches BUCKET_HASH_SHUFFLE). Instances
+        // beyond per-BE bucket count are "padding" with no bucket assignment — they create
+        // VDataStreamRecvrs that never receive data and hang. Cap at per-BE bucket count.
+        if (useLocalShuffleToAddParallel && hasNonSerialExchangeInFragment()) {
+            result = Math.min(result, maxParallel);
+        }
+
+        return result;
+    }
+
+    private boolean hasNonSerialExchangeInFragment() {
+        List<ExchangeNode> exchanges = fragment.getPlanRoot()
+                .collectInCurrentFragment(ExchangeNode.class::isInstance);
+        for (ExchangeNode exchange : exchanges) {
+            if (!exchange.isSerialOperator()) {
+                return true;
+            }
+        }
+        return false;
     }
 }

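A minimal sketch of the new cap's arithmetic (a free-standing method with illustrative numbers, not the actual job class): with 3 buckets on a BE (maxParallel = 3) but getParallelExecNum() = 8, the uncapped result pads to 8 instances, 5 of which never receive a bucket; with the cap the job stays at the per-BE bucket count.

public class ParallelismCapSketch {
    static int degreeOfParallelism(long tabletNum, int parallelExecNum, int colocateMaxParallelNum,
                                   int maxParallel, boolean useLocalShuffle, boolean hasNonSerialExchange) {
        int maxParallelism = (int) Math.max(tabletNum, parallelExecNum);
        int result = Math.min(maxParallelism, colocateMaxParallelNum);
        // Cap at the per-BE bucket count so no padding instance is created that would
        // open a receiver and wait forever for data that never arrives.
        if (useLocalShuffle && hasNonSerialExchange) {
            result = Math.min(result, maxParallel);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(degreeOfParallelism(3, 8, 16, 3, true, true));   // 3: capped
        System.out.println(degreeOfParallelism(3, 8, 16, 3, true, false));  // 8: no non-serial exchange
    }
}
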
fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java

Lines changed: 15 additions & 21 deletions
@@ -70,17 +70,16 @@ private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorCo
         Pair<PlanNode, LocalExchangeType> output = root
                 .enforceAndDeriveLocalExchange(context, null, require);
         PlanNode newRoot = output.first;
-        // Mirror BE OperatorBase base class required_data_distribution():
-        // when any operator in the fragment plan is serial AND the fragment uses pooling scan
-        // (LocalShuffleAssignedJob → ignoreDataDistribution → _parallel_instances=1),
-        // serial operators reduce pipeline num_tasks to 1, and this propagates via
-        // add_pipeline() to all downstream pipelines (including the DataStreamSink pipeline).
-        // Only instance 0 creates tasks and sends EOS. Downstream receivers expect
-        // _num_instances EOSes → hang.
+        // The DataStreamSink runs in the same pipeline as the fragment root.
+        // If the root is serial in a pooling-scan fragment, the sink pipeline has 1 task
+        // and only instance 0 sends EOS → downstream receivers hang.
         // Insert PASSTHROUGH fan-out to create _num_instances sink tasks.
-        // Non-pooling fragments (regular bucket distribution) have _num_instances ==
-        // bucket_count and every instance gets a scan range, so all sink tasks run.
-        if (isLocalShuffle && (newRoot.isSerialOperator() || newRoot.hasSerialChildren())) {
+        //
+        // Only check the root node itself (not recursive hasSerialChildren), because
+        // after enforceAndDeriveLocalExchange(), any serial children deeper in the tree
+        // are already handled by LocalExchangeNodes inserted during the tree walk.
+        // Those LocalExchangeNodes create pipeline boundaries with _num_instances tasks.
+        if (isLocalShuffle && newRoot.isSerialOperator()) {
             newRoot = new LocalExchangeNode(context.nextPlanNodeId(), newRoot,
                     LocalExchangeType.PASSTHROUGH, null);
         }

@@ -116,22 +115,17 @@ public static boolean isColocated(PlanNode plan) {

     public static LocalExchangeType resolveExchangeType(LocalExchangeTypeRequire require,
             PlanTranslatorContext translatorContext, PlanNode parent, PlanNode child) {
-        // Only generic RequireHash adapts to LOCAL_EXECUTION_HASH_SHUFFLE based on context.
+        // Only generic RequireHash adapts to LOCAL_EXECUTION_HASH_SHUFFLE.
         // Explicit RequireSpecific (GLOBAL_EXECUTION_HASH_SHUFFLE, BUCKET_HASH_SHUFFLE, etc.)
         // must never be degraded — if they appear in an invalid context, the plan is wrong.
-        if (require instanceof RequireHash
-                && shouldUseLocalExecutionHash(translatorContext, parent, child)) {
-            return LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE;
-        }
-        return require.preferType();
-    }
-
-    private static boolean shouldUseLocalExecutionHash(
-            PlanTranslatorContext translatorContext, PlanNode parent, PlanNode child) {
+        //
         // Always prefer LOCAL_EXECUTION_HASH_SHUFFLE for FE-planned intra-fragment hash exchanges.
        // GLOBAL_EXECUTION_HASH_SHUFFLE requires shuffle_idx_to_instance_idx which may be empty
         // for fragments with non-hash sinks (UNPARTITIONED/MERGE). LOCAL_HASH is always safe
         // since it partitions by local instance count without needing external shuffle maps.
-        return true;
+        if (require instanceof RequireHash) {
+            return LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE;
+        }
+        return require.preferType();
     }
 }

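A toy sketch of the root-only check (the Node class and operator names are invented; this is not the planner API): a serial operator deep in the tree was already wrapped by a local exchange during the enforce walk, so a recursive hasSerialChildren-style check would add a second, redundant PASSTHROUGH at the root, while checking only the root matches what the BE plans.

import java.util.List;

public class RootPassthroughSketch {
    static class Node {
        final String name;
        final boolean serial;
        final List<Node> children;

        Node(String name, boolean serial, Node... children) {
            this.name = name;
            this.serial = serial;
            this.children = List.of(children);
        }
    }

    // Old behaviour: recurse into children, double-counting serial operators that the
    // enforce pass already fanned out with a LocalExchangeNode.
    static boolean hasSerialAnywhere(Node node) {
        return node.serial || node.children.stream().anyMatch(RootPassthroughSketch::hasSerialAnywhere);
    }

    public static void main(String[] args) {
        Node serialChild = new Node("SerialOperator", true);
        Node localExchange = new Node("LocalExchange(PASSTHROUGH)", false, serialChild);
        Node root = new Node("HashJoin", false, localExchange);

        boolean isLocalShuffle = true;
        System.out.println("recursive check adds root PASSTHROUGH: "
                + (isLocalShuffle && hasSerialAnywhere(root)));   // true: one exchange too many
        System.out.println("root-only check adds root PASSTHROUGH: "
                + (isLocalShuffle && root.serial));               // false: child already handled
    }
}
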
fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java

Lines changed: 0 additions & 5 deletions
@@ -25,7 +25,6 @@
 import org.apache.doris.analysis.ExprToThriftVisitor;
 import org.apache.doris.analysis.ToSqlParams;
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDataStreamSink;

@@ -241,8 +240,4 @@ protected TDataSink toThrift() {
         return result;
     }

-    @Override
-    public LocalExchangeTypeRequire getLocalExchangeTypeRequire() {
-        return LocalExchangeTypeRequire.noRequire();
-    }
 }

fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java

Lines changed: 6 additions & 9 deletions
@@ -102,15 +102,12 @@ public void setMergeInfo(SortInfo info) {

     @Override
     protected void toThrift(TPlanNode msg) {
-        // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan
-        // node.
-        // When FE-planned local shuffle is enabled, ExchangeNode's serial flag is preserved
-        // (same as BE-planned path). The ExchangeNode pipeline will have num_tasks=1 when serial,
-        // meaning only instance 0 creates a task. This is correct because FE's
-        // filterInstancesWhichCanReceiveDataFromRemote() ensures only the first instance per
-        // worker receives remote data. The deferred exchanger uses upstream_pipe->num_tasks()
-        // to get the correct sender_count.
-        msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode())
+        // Exchange serial status should depend only on isSerialOperator() (UNPARTITIONED or
+        // use_serial_exchange), NOT on fragment.hasSerialScanNode(). With FE-planned local
+        // exchanges, serial scans are already handled by PASSTHROUGH fan-out. Making
+        // non-UNPARTITIONED exchanges (e.g. BUCKET_SHUFFLE) serial would reduce the build-side
+        // pipeline to num_tasks=1, preventing shared state propagation to other instances.
+        msg.setIsSerialOperator(isSerialOperator()
                 && fragment.useSerialSource(ConnectContext.get()));
         msg.node_type = TPlanNodeType.EXCHANGE_NODE;
         msg.exchange_node = new TExchangeNode();

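A minimal sketch of just the flag computation (plain booleans, not the planner API), comparing the old and new expressions for a non-UNPARTITIONED exchange in a fragment that also contains a serial scan node:

public class ExchangeSerialFlagSketch {
    public static void main(String[] args) {
        boolean exchangeIsSerialOperator = false;  // e.g. a BUCKET_SHUFFLE exchange, not UNPARTITIONED
        boolean fragmentHasSerialScanNode = true;  // some serial scan elsewhere in the fragment
        boolean fragmentUsesSerialSource = true;

        boolean oldFlag = (exchangeIsSerialOperator || fragmentHasSerialScanNode) && fragmentUsesSerialSource;
        boolean newFlag = exchangeIsSerialOperator && fragmentUsesSerialSource;

        System.out.println("old is_serial_operator = " + oldFlag); // true: build pipeline shrinks to 1 task
        System.out.println("new is_serial_operator = " + newFlag); // false: every instance keeps its task
    }
}
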
fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java

Lines changed: 7 additions & 1 deletion
@@ -1419,6 +1419,12 @@ public long getCatalogId() {
     public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
             PlanTranslatorContext translatorContext, PlanNode parent,
             LocalExchangeTypeRequire parentRequire) {
-        return super.enforceAndDeriveLocalExchange(translatorContext, parent, parentRequire);
+        boolean useSerialSource = fragment != null
+                && fragment.useSerialSource(translatorContext.getConnectContext());
+        if (useSerialSource) {
+            return Pair.of(this, LocalExchangeType.NOOP);
+        }
+        // Non-pooling OlapScan has bucket distribution — each instance scans specific buckets
+        return Pair.of(this, LocalExchangeType.BUCKET_HASH_SHUFFLE);
     }
 }

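And a tiny sketch of the scan-output rule above (hypothetical method and enum names mirroring the diff, not the real ScanNode API): a pooling, serial-source scan reports no local-exchange requirement, while a regular OlapScan reports bucket distribution because each of its instances reads specific buckets.

public class ScanOutputDistributionSketch {
    enum LocalExchangeType { NOOP, BUCKET_HASH_SHUFFLE }

    static LocalExchangeType scanOutputDistribution(boolean useSerialSource) {
        // Pooling scan: instances share one scan pool, no fixed bucket ownership -> NOOP.
        // Regular scan: each instance owns specific buckets -> BUCKET_HASH_SHUFFLE.
        return useSerialSource ? LocalExchangeType.NOOP : LocalExchangeType.BUCKET_HASH_SHUFFLE;
    }

    public static void main(String[] args) {
        System.out.println(scanOutputDistribution(true));   // NOOP
        System.out.println(scanOutputDistribution(false));  // BUCKET_HASH_SHUFFLE
    }
}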