Skip to content

Commit 602e6b1

Browse files
committed
[fix](local shuffle) Fix COREDUMP: skip LE insertion when child is serial in enforceChild
When a streaming AGG requests PASSTHROUGH from a serial UNPARTITIONED Exchange child, enforceChild would insert a PASSTHROUGH local exchange creating a pipeline split. This disconnects the AggSink↔AggSource shared state pairing because the LE boundary puts them in separate pipelines with mismatched task counts, causing source_deps to be empty → DCHECK (ASAN) or SIGSEGV (Release) in set_ready_to_read(). Fix: add childOutput.first.isSerialOperator() check in enforceChild, mirroring BE's need_to_local_exchange which checks any_of(operators[idx..end], is_serial) to skip LE insertion when a serial operator is in the pipeline. Reproducer: multi-distinct COUNT query on table with many buckets (non-pooling) where MultiCastDataSinks feed serial UNPARTITIONED Exchanges → streaming AGG fragments.
1 parent 1034371 commit 602e6b1

1 file changed

Lines changed: 7 additions & 1 deletion

File tree

fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -979,7 +979,13 @@ protected Pair<PlanNode, LocalExchangeType> enforceChild(
979979
Pair<PlanNode, LocalExchangeType> childOutput = deriveAndEnforceChildLocalExchange(
980980
translatorContext, originChild, requireChild, childIndex);
981981
if (!requireChild.satisfy(childOutput.second)) {
982-
if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialOperator()) {
982+
// Mirrors BE need_to_local_exchange: any_of(operators[idx..end], is_serial) → skip.
983+
// Check ancestors (hasSerialAncestorInPipeline), self (isSerialOperator), AND child.
984+
// When the child is serial (e.g. UNPARTITIONED Exchange), inserting a PASSTHROUGH LE
985+
// creates a pipeline split where AggSink↔AggSource shared state becomes disconnected
986+
// because the LE boundary puts them in separate pipelines with mismatched task counts.
987+
if (translatorContext.hasSerialAncestorInPipeline(this) || isSerialOperator()
988+
|| childOutput.first.isSerialOperator()) {
983989
return childOutput;
984990
}
985991
LocalExchangeType preferType = AddLocalExchange.resolveExchangeType(

0 commit comments

Comments
 (0)