Skip to content

Commit 1339109

Browse files
authored
GH-5121: support empty left bind join (OPTIONAL) in FedX (#5200)
2 parents ab02413 + e6988bf commit 1339109

3 files changed

Lines changed: 171 additions & 3 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.List;
1414

1515
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
1617
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
1718
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
1819
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
@@ -42,7 +43,9 @@ protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
4243
if (expr instanceof StatementTupleExpr) {
4344
StatementTupleExpr stmt = (StatementTupleExpr) expr;
4445
taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt);
45-
46+
} else if (expr instanceof EmptyStatementPattern) {
47+
EmptyStatementPattern stmt = (EmptyStatementPattern) expr;
48+
taskCreator = new EmptyLeftBoundJoinTaskCreator(strategy, stmt);
4649
} else {
4750
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
4851
+ ". Please report this problem.");
@@ -67,4 +70,20 @@ public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, Li
6770
}
6871
}
6972

73+
static protected class EmptyLeftBoundJoinTaskCreator implements TaskCreator {
74+
protected final FederationEvalStrategy _strategy;
75+
protected final EmptyStatementPattern _expr;
76+
77+
public EmptyLeftBoundJoinTaskCreator(
78+
FederationEvalStrategy strategy, EmptyStatementPattern expr) {
79+
super();
80+
_strategy = strategy;
81+
_expr = expr;
82+
}
83+
84+
@Override
85+
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
86+
return new ParallelEmptyBindLeftJoinTask(control, _strategy, _expr, bindings);
87+
}
88+
}
7089
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.join;
12+
13+
import java.util.List;
14+
15+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
17+
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
18+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
19+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
20+
import org.eclipse.rdf4j.query.BindingSet;
21+
import org.eclipse.rdf4j.repository.sparql.federation.CollectionIteration;
22+
23+
/**
24+
* A {@link ParallelTaskBase} for executing bind left joins, where the join argument is an
25+
* {@link EmptyStatementPattern}. The effective result is that the input bindings from the left operand are passed
26+
* through.
27+
*
28+
* @author Andreas Schwarte
29+
*/
30+
public class ParallelEmptyBindLeftJoinTask extends ParallelTaskBase<BindingSet> {
31+
32+
protected final FederationEvalStrategy strategy;
33+
protected final EmptyStatementPattern rightArg;
34+
protected final List<BindingSet> bindings;
35+
protected final ParallelExecutor<BindingSet> joinControl;
36+
37+
public ParallelEmptyBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
38+
EmptyStatementPattern expr, List<BindingSet> bindings) {
39+
this.strategy = strategy;
40+
this.rightArg = expr;
41+
this.bindings = bindings;
42+
this.joinControl = joinControl;
43+
}
44+
45+
@Override
46+
public ParallelExecutor<BindingSet> getControl() {
47+
return joinControl;
48+
}
49+
50+
@Override
51+
protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
52+
// simply return the input bindings (=> the empty statement pattern cannot add results)
53+
return new CollectionIteration<BindingSet>(bindings);
54+
}
55+
56+
}

tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,6 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
192192
conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male"));
193193
}
194194

195-
fedxRule.enableDebug();
196-
197195
try {
198196
// run query which joins results from multiple repos
199197
// for a subset of persons there exist names
@@ -246,4 +244,99 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinO
246244
}
247245
}
248246

247+
@ParameterizedTest
248+
@ValueSource(booleans = { true, false })
249+
public void test_leftBindJoin_emptyOptional(boolean bindLeftJoinOptimizationEnabled) throws Exception {
250+
251+
prepareTest(
252+
Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl",
253+
"/tests/basic/data_emptyStore.ttl"));
254+
255+
Repository repo1 = getRepository(1);
256+
Repository repo2 = getRepository(2);
257+
Repository repo3 = getRepository(3);
258+
259+
Repository fedxRepo = fedxRule.getRepository();
260+
261+
fedxRule.setConfig(config -> {
262+
config.withBoundJoinBlockSize(10);
263+
config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
264+
});
265+
266+
// add some persons
267+
try (RepositoryConnection conn = repo1.getConnection()) {
268+
269+
for (int i = 1; i <= 30; i++) {
270+
var p = Values.iri("http://ex.com/p" + i);
271+
var otherP = Values.iri("http://other.com/p" + i);
272+
conn.add(p, OWL.SAMEAS, otherP);
273+
}
274+
}
275+
276+
// add names for person 1, 4, 7, ...
277+
try (RepositoryConnection conn = repo2.getConnection()) {
278+
279+
for (int i = 1; i <= 30; i += 3) {
280+
var otherP = Values.iri("http://other.com/p" + i);
281+
conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
282+
}
283+
}
284+
285+
// add names for person 2, 5, 8, ...
286+
try (RepositoryConnection conn = repo3.getConnection()) {
287+
288+
for (int i = 2; i <= 30; i += 3) {
289+
var otherP = Values.iri("http://other.com/p" + i);
290+
conn.add(otherP, FOAF.NAME, Values.literal("Person " + i));
291+
}
292+
}
293+
294+
try {
295+
// run query which joins results from multiple repos
296+
// for a subset of persons there exist names
297+
// the age does not exist for any person
298+
try (RepositoryConnection conn = fedxRepo.getConnection()) {
299+
String query = "PREFIX foaf: <http://xmlns.com/foaf/0.1/> " +
300+
"SELECT * WHERE { "
301+
+ " ?person owl:sameAs ?otherPerson . "
302+
+ " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3
303+
+ " OPTIONAL { ?otherPerson foaf:age ?age . } " // # does not exist
304+
+ "}";
305+
306+
TupleQuery tupleQuery = conn.prepareTupleQuery(query);
307+
try (TupleQueryResult tqr = tupleQuery.evaluate()) {
308+
var bindings = Iterations.asList(tqr);
309+
310+
Assertions.assertEquals(30, bindings.size());
311+
312+
for (int i = 1; i <= 30; i++) {
313+
var p = Values.iri("http://ex.com/p" + i);
314+
var otherP = Values.iri("http://other.com/p" + i);
315+
316+
// find the bindingset for the person in the unordered result
317+
BindingSet bs = bindings.stream()
318+
.filter(b -> b.getValue("person").equals(p))
319+
.findFirst()
320+
.orElseThrow();
321+
322+
Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
323+
if (i % 3 == 1 || i % 3 == 2) {
324+
// names from repo 2 or 3
325+
Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue());
326+
} else {
327+
// no name for others
328+
Assertions.assertFalse(bs.hasBinding("name"));
329+
}
330+
331+
Assertions.assertEquals(otherP, bs.getValue("otherPerson"));
332+
Assertions.assertFalse(bs.hasBinding("age"));
333+
}
334+
}
335+
}
336+
337+
} finally {
338+
fedxRepo.shutDown();
339+
}
340+
}
341+
249342
}

0 commit comments

Comments
 (0)