Skip to content

Commit 6e225bb

Browse files
924060929Copilot
andcommitted
[fix](fe) Fix NLJ build-side source_deps crash when serial NLJ gets PASSTHROUGH
### What problem does this PR solve? Issue Number: close #xxx Problem Summary: When NLJ is serial (RIGHT_OUTER/ANTI/SEMI/FULL_OUTER), shouldResetSerialFlagForChild(1) was unconditionally true, clearing the serial ancestor flag for the build side. This caused Exchange(UNPARTITIONED) on the build side to insert a PASSTHROUGH local exchange, restoring the build pipeline's num_tasks to _num_instances while the probe pipeline stays at 1. The extra build tasks had NLJ shared states with empty source_deps, crashing in Dependency::set_ready_to_read(). Fix: shouldResetSerialFlagForChild(1) now returns false when NLJ is serial, preserving the serial ancestor flag on the build side so Exchange returns NOOP instead of inserting PASSTHROUGH. ### Release note None ### Check List (For Author) - Test: Regression test added (Bug 19 in test_local_shuffle_rqg_bugs.groovy) - Behavior changed: No - Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b047529 commit 6e225bb

2 files changed

Lines changed: 59 additions & 1 deletion

File tree

fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,14 @@ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTrans
231231

232232
@Override
233233
protected boolean shouldResetSerialFlagForChild(int childIndex) {
234-
return childIndex == 1;
234+
// Build side (child 1) is a separate pipeline in BE. Normally,
235+
// the serial-ancestor flag should be reset across pipeline boundaries.
236+
// BUT when NLJ itself is serial (RIGHT_OUTER/ANTI/SEMI/FULL_OUTER),
237+
// the probe pipeline has num_tasks=1. If we reset the flag, the
238+
// build-side Exchange may insert PASSTHROUGH (restoring num_tasks to
239+
// _num_instances), creating more build tasks than probe tasks. The
240+
// extra build tasks have a NLJ shared state with empty source_deps,
241+
// crashing in set_ready_to_read().
242+
return childIndex == 1 && !isSerialOperator();
235243
}
236244
}

regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,5 +1041,55 @@ suite("test_local_shuffle_rqg_bugs") {
10411041
assertTrue(false, "Bug 18: Serial NLJ PASSTHROUGH LE crash: ${t.message}")
10421042
}
10431043

1044+
// Bug 19: source_deps.size()=0 crash in NLJ build sink.
1045+
// Root cause: serial NLJ (RIGHT_OUTER) resets serial ancestor flag for build side.
1046+
// Exchange(UNPARTITIONED) on build side sees hasSerialAncestorInPipeline=false and
1047+
// inserts PASSTHROUGH LE. This restores build pipeline num_tasks to _num_instances
1048+
// while probe pipeline stays at 1. The extra build tasks have NLJ shared state with
1049+
// empty source_deps → crash in set_ready_to_read().
1050+
// Fix: shouldResetSerialFlagForChild(1) returns false when NLJ is serial.
1051+
// Differs from Bug 18 in fuzzy vars: enable_share_hash_table=true, broadcast_passthrough=false.
1052+
try {
1053+
logger.info("Bug 19: Testing serial NLJ build-side source_deps crash")
1054+
def bug19_baseline = sql """
1055+
SELECT /*+SET_VAR(enable_local_shuffle_planner=false,
1056+
enable_sql_cache=false)*/
1057+
table1.col_varchar_10__undef_signed AS field1
1058+
FROM rqg_t1 AS table1
1059+
LEFT JOIN rqg_t1 AS table2
1060+
ON table2.pk = table2.pk
1061+
WHERE table1.pk BETWEEN 2 AND 11
1062+
GROUP BY field1
1063+
ORDER BY 1
1064+
"""
1065+
1066+
for (int ppt : [2, 4]) {
1067+
def bug19_result = sql """
1068+
SELECT /*+SET_VAR(use_serial_exchange=false,
1069+
parallel_pipeline_task_num=${ppt},
1070+
enable_local_shuffle_planner=true,
1071+
ignore_storage_data_distribution=true,
1072+
enable_sql_cache=false,
1073+
enable_share_hash_table_for_broadcast_join=true,
1074+
enable_broadcast_join_force_passthrough=false,
1075+
enable_parallel_scan=true,
1076+
disable_streaming_preaggregations=true)*/
1077+
table1.col_varchar_10__undef_signed AS field1
1078+
FROM rqg_t1 AS table1
1079+
LEFT JOIN rqg_t1 AS table2
1080+
ON table2.pk = table2.pk
1081+
WHERE table1.pk BETWEEN 2 AND 11
1082+
GROUP BY field1
1083+
ORDER BY 1
1084+
"""
1085+
assertEquals(bug19_baseline, bug19_result,
1086+
"Bug 19 pptn=${ppt}: result mismatch with serial NLJ build side crash")
1087+
}
1088+
logger.info("Bug 19: PASSED (no crash, correct results)")
1089+
} catch (Throwable t) {
1090+
logger.error("Bug 19 FAILED: ${t.message}")
1091+
assertTrue(false, "Bug 19: Serial NLJ build-side source_deps crash: ${t.message}")
1092+
}
1093+
10441094
logger.info("=== All RQG bug reproduction tests completed ===")
10451095
}

0 commit comments

Comments
 (0)