Skip to content

Commit e6988bf

Browse files
committed
GH-5121: support empty left bind join (OPTIONAL) in FedX
Previously we introduced support for left bind joins in FedX. The case of empty left bind joins (i.e. where the clause inside the OPTIONAL does not provide any statements) was not handled and resulted in an exception This change now adds support for empty optional joins and passes the results from the left-handside through.
1 parent ab02413 commit e6988bf

3 files changed

Lines changed: 171 additions & 3 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.List;
1414

1515
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
1617
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
1718
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
1819
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
@@ -42,7 +43,9 @@ protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
4243
if (expr instanceof StatementTupleExpr) {
4344
StatementTupleExpr stmt = (StatementTupleExpr) expr;
4445
taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt);
45-
46+
} else if (expr instanceof EmptyStatementPattern) {
47+
EmptyStatementPattern stmt = (EmptyStatementPattern) expr;
48+
taskCreator = new EmptyLeftBoundJoinTaskCreator(strategy, stmt);
4649
} else {
4750
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
4851
+ ". Please report this problem.");
@@ -67,4 +70,20 @@ public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, Li
6770
}
6871
}
6972

73+
static protected class EmptyLeftBoundJoinTaskCreator implements TaskCreator {
74+
protected final FederationEvalStrategy _strategy;
75+
protected final EmptyStatementPattern _expr;
76+
77+
public EmptyLeftBoundJoinTaskCreator(
78+
FederationEvalStrategy strategy, EmptyStatementPattern expr) {
79+
super();
80+
_strategy = strategy;
81+
_expr = expr;
82+
}
83+
84+
@Override
85+
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
86+
return new ParallelEmptyBindLeftJoinTask(control, _strategy, _expr, bindings);
87+
}
88+
}
7089
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.join;
12+
13+
import java.util.List;
14+
15+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
17+
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
18+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
19+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
20+
import org.eclipse.rdf4j.query.BindingSet;
21+
import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;
22+
23+
/**
24+
* A {@link ParallelTaskBase} for executing bind left joins, where the join argument is an
25+
* {@link EmptyStatementPattern}. The effective result is that the input bindings from the left operand are passed
26+
* through.
27+
*
28+
* @author Andreas Schwarte
29+
*/
30+
public class ParallelEmptyBindLeftJoinTask extends ParallelTaskBase<BindingSet> {
31+
32+
protected final FederationEvalStrategy strategy;
33+
protected final EmptyStatementPattern rightArg;
34+
protected final List<BindingSet> bindings;
35+
protected final ParallelExecutor<BindingSet> joinControl;
36+
37+
public ParallelEmptyBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
38+
EmptyStatementPattern expr, List<BindingSet> bindings) {
39+
this.strategy = strategy;
40+
this.rightArg = expr;
41+
this.bindings = bindings;
42+
this.joinControl = joinControl;
43+
}
44+
45+
@Override
46+
public ParallelExecutor<BindingSet> getControl() {
47+
return joinControl;
48+
}
49+
50+
@Override
51+
protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
52+
// simply return the input bindings (=> the empty statement pattern cannot add results)
53+
return new CollectionIteration<BindingSet>(bindings);
54+
}
55+
56+
}

tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,6 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
192192
conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male"));
193193
}
194194

195-
fedxRule.enableDebug();
196-
197195
try {
198196
// run query which joins results from multiple repos
199197
// for a subset of persons there exist names
@@ -246,4 +244,99 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
246244
}
247245
}
248246

247+
@ParameterizedTest
248+
@ValueSource(booleans = { true, false })
249+
public void test_leftBindJoin_emptyOptional(boolean bindLeftJoinOptimizationEnabled) throws Exception {
250+
251+
prepareTest(
252+
Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl",
253+
"/tests/basic/data_emptyStore.ttl"));
254+
255+
Repository repo1 = getRepository(1);
256+
Repository repo2 = getRepository(2);
257+
Repository repo3 = getRepository(3);
258+
259+
Repository fedxRepo = fedxRule.getRepository();
260+
261+
fedxRule.setConfig(config -> {
262+
config.withBoundJoinBlockSize(10);
263+
config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
264+
});
265+
266+
// add some persons
267+
try (RepositoryConnection conn = repo1.getConnection()) {
268+
269+
for (int i = 1; i <= 30; i++) {
270+
var p = Values.iri("http://ex.com/p" + i);
271+
var otherP = Values.iri("http://other.com/p" + i);
272+
conn.add(p, OWL.SAMEAS, otherP);
273+
}
274+
}
275+
276+
// add names for person 1, 4, 7, ...
277+
try (RepositoryConnection conn = repo2.getConnection()) {
278+
279+
for (int i = 1; i <= 30; i += 3) {
280+
var otherP = Values.iri("http://other.com/p" + i);
281+
conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
282+
}
283+
}
284+
285+
// add names for person 2, 5, 8, ...
286+
try (RepositoryConnection conn = repo3.getConnection()) {
287+
288+
for (int i = 2; i <= 30; i += 3) {
289+
var otherP = Values.iri("http://other.com/p" + i);
290+
conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
291+
}
292+
}
293+
294+
try {
295+
// run query which joins results from multiple repos
296+
// for a subset of persons there exist names
297+
// the age does not exist for any person
298+
try (RepositoryConnection conn = fedxRepo.getConnection()) {
299+
String query = "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
300+
"SELECT * WHERE { "
301+
+ " ?person owl:sameAs ?otherPerson . "
302+
+ " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3
303+
+ " OPTIONAL { ?otherPerson foaf:age ?age . } " // # does not exist
304+
+ "}";
305+
306+
TupleQuery tupleQuery = conn.prepareTupleQuery(query);
307+
try (TupleQueryResult tqr = tupleQuery.evaluate()) {
308+
var bindings = Iterations.asList(tqr);
309+
310+
Assertions.assertEquals(30, bindings.size());
311+
312+
for (int i = 1; i <= 30; i++) {
313+
var p = Values.iri("http://ex.com/p" + i);
314+
var otherP = Values.iri("http://other.com/p" + i);
315+
316+
// find the bindingset for the person in the unordered result
317+
BindingSet bs = bindings.stream()
318+
.filter(b -> b.getValue("person").equals(p))
319+
.findFirst()
320+
.orElseThrow();
321+
322+
Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
323+
if (i % 3 == 1 || i % 3 == 2) {
324+
// names from repo 2 or 3
325+
Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue());
326+
} else {
327+
// no name for others
328+
Assertions.assertFalse(bs.hasBinding("name"));
329+
}
330+
331+
Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
332+
Assertions.assertFalse(bs.hasBinding("age"));
333+
}
334+
}
335+
}
336+
337+
} finally {
338+
fedxRepo.shutDown();
339+
}
340+
}
341+
249342
}

0 commit comments

Comments
 (0)