Skip to content

Commit 558a595

Browse files
committed
GH-5121: implementation of left bind join operator
This change provides the implementation and activation for the left bind join operator. The algorithm is as follows: - execute left bind join using regular bound join query - process result iteration similar to BoundJoinVALUESConversionIteration - remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all non-seen bindings directly from the input Note that the terminology in literature has changed to "bind joins". Hence, for new classes and methods I try to follow that. Change is covered with some unit tests
1 parent 2d86cd3 commit 558a595

6 files changed

Lines changed: 527 additions & 2 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath;
3838
import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator;
3939
import org.eclipse.rdf4j.federated.algebra.FilterExpr;
40+
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
4041
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
4142
import org.eclipse.rdf4j.federated.algebra.HolderNode;
4243
import org.eclipse.rdf4j.federated.algebra.NJoin;
@@ -51,8 +52,10 @@
5152
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
5253
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
5354
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor;
55+
import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration;
5456
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
5557
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
58+
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
5659
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
5760
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
5861
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
@@ -66,6 +69,7 @@
6669
import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask;
6770
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
6871
import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase;
72+
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
6973
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
7074
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
7175
import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel;
@@ -935,6 +939,59 @@ public abstract CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern
935939
public abstract CloseableIteration<BindingSet> evaluateGroupedCheck(
936940
CheckStatementPattern stmt, final List<BindingSet> bindings) throws QueryEvaluationException;
937941

942+
/**
943+
* Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.
944+
*
945+
* @param stmt
946+
* @param bindings
947+
* @return the result iteration
948+
* @throws QueryEvaluationException
949+
* @see {@link BindLeftJoinIteration}
950+
*/
951+
public CloseableIteration<BindingSet> evaluateLeftBoundJoinStatementPattern(
952+
StatementTupleExpr stmt, final List<BindingSet> bindings) throws QueryEvaluationException {
953+
// we can omit the bound join handling
954+
if (bindings.size() == 1) {
955+
return evaluate(stmt, bindings.get(0));
956+
}
957+
958+
FilterValueExpr filterExpr = null;
959+
if (stmt instanceof FilterTuple) {
960+
filterExpr = ((FilterTuple) stmt).getFilterExpr();
961+
}
962+
963+
AtomicBoolean isEvaluated = new AtomicBoolean(false);
964+
String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings,
965+
filterExpr, isEvaluated, stmt.getQueryInfo().getDataset());
966+
967+
CloseableIteration<BindingSet> result = null;
968+
try {
969+
result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo());
970+
971+
// apply filter and/or convert to original bindings
972+
if (filterExpr != null && !isEvaluated.get()) {
973+
result = new BindLeftJoinIteration(result, bindings); // apply conversion
974+
result = new FilteringIteration(filterExpr, result, this); // apply filter
975+
if (!result.hasNext()) {
976+
result.close();
977+
return new EmptyIteration<>();
978+
}
979+
} else {
980+
result = new BindLeftJoinIteration(result, bindings);
981+
}
982+
983+
return result;
984+
} catch (Throwable t) {
985+
if (result != null) {
986+
result.close();
987+
}
988+
if (t instanceof InterruptedException) {
989+
Thread.currentThread().interrupt();
990+
}
991+
throw ExceptionUtil.toQueryEvaluationException(t);
992+
}
993+
}
994+
938995
/**
939996
* Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
940997
*

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration;
3333
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
3434
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
35+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
3536
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
3637
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
3738
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
@@ -207,8 +208,28 @@ public CloseableIteration<BindingSet> executeJoin(
207208
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
208209
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
209210
throws QueryEvaluationException {
210-
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this,
211-
leftIter, leftJoin, bindings, queryInfo);
211+
212+
var rightArg = leftJoin.getRightArg();
213+
214+
// determine if we can execute the expr as bind join
215+
boolean executeAsBindJoin = false;
216+
if (rightArg instanceof BoundJoinTupleExpr) {
217+
if (rightArg instanceof FedXService) {
218+
executeAsBindJoin = false;
219+
} else {
220+
executeAsBindJoin = true;
221+
}
222+
}
223+
224+
JoinExecutorBase<BindingSet> join;
225+
if (executeAsBindJoin) {
226+
join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg,
227+
bindings, queryInfo);
228+
} else {
229+
join = new ControlledWorkerLeftJoin(joinScheduler, this,
230+
leftIter, leftJoin, bindings, queryInfo);
231+
}
232+
212233
executor.execute(join);
213234
return join;
214235
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.iterator;
12+
13+
import java.util.HashSet;
14+
import java.util.Iterator;
15+
import java.util.List;
16+
import java.util.ListIterator;
17+
import java.util.Set;
18+
19+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
20+
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
21+
import org.eclipse.rdf4j.query.Binding;
22+
import org.eclipse.rdf4j.query.BindingSet;
23+
import org.eclipse.rdf4j.query.QueryEvaluationException;
24+
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
25+
26+
/**
27+
* A {@link LookAheadIteration} for processing bind left join results (i.e., result of joining OPTIONAL clauses)
28+
*
29+
* Algorithm:
30+
*
31+
* <ul>
32+
* <li>execute left bind join using regular bound join query</li>
33+
* <li>process result iteration similar to {@link BoundJoinVALUESConversionIteration}</li>
34+
* <li>remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all
35+
* non-seen bindings directly from the input</li>
36+
*
37+
*
38+
* @author Andreas Schwarte
39+
*/
40+
public class BindLeftJoinIteration extends LookAheadIteration<BindingSet> {
41+
42+
protected final CloseableIteration<BindingSet> iter;
43+
protected final List<BindingSet> bindings;
44+
45+
protected Set<Integer> seenBindingIndexes = new HashSet<>();
46+
protected final ListIterator<BindingSet> bindingsIterator;
47+
48+
public BindLeftJoinIteration(CloseableIteration<BindingSet> iter,
49+
List<BindingSet> bindings) {
50+
this.iter = iter;
51+
this.bindings = bindings;
52+
this.bindingsIterator = bindings.listIterator();
53+
}
54+
55+
@Override
56+
protected BindingSet getNextElement() {
57+
58+
if (iter.hasNext()) {
59+
var bIn = iter.next();
60+
int bIndex = Integer.parseInt(
61+
bIn.getBinding(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME).getValue().stringValue());
62+
seenBindingIndexes.add(bIndex);
63+
return convert(bIn, bIndex);
64+
}
65+
66+
while (bindingsIterator.hasNext()) {
67+
if (seenBindingIndexes.contains(bindingsIterator.nextIndex())) {
68+
// the binding was already processed as part of the optional
69+
bindingsIterator.next();
70+
continue;
71+
}
72+
return bindingsIterator.next();
73+
}
74+
75+
return null;
76+
}
77+
78+
@Override
79+
protected void handleClose() {
80+
iter.close();
81+
}
82+
83+
protected BindingSet convert(BindingSet bIn, int bIndex) throws QueryEvaluationException {
84+
QueryBindingSet res = new QueryBindingSet();
85+
Iterator<Binding> bIter = bIn.iterator();
86+
while (bIter.hasNext()) {
87+
Binding b = bIter.next();
88+
if (b.getName().equals(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)) {
89+
continue;
90+
}
91+
res.addBinding(b);
92+
}
93+
for (Binding bs : bindings.get(bIndex)) {
94+
res.setBinding(bs);
95+
}
96+
return res;
97+
}
98+
99+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.join;
12+
13+
import java.util.List;
14+
15+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
17+
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
18+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
19+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
20+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask;
21+
import org.eclipse.rdf4j.federated.structures.QueryInfo;
22+
import org.eclipse.rdf4j.query.BindingSet;
23+
import org.eclipse.rdf4j.query.QueryEvaluationException;
24+
import org.eclipse.rdf4j.query.algebra.TupleExpr;
25+
26+
/**
27+
* Bind join implementation for left joins (i.e., OPTIOAL clauses)
28+
*
29+
* @author Andreas Schwarte
30+
*/
31+
public class ControlledWorkerBindLeftJoin extends ControlledWorkerBindJoinBase {
32+
33+
public ControlledWorkerBindLeftJoin(ControlledWorkerScheduler<BindingSet> scheduler,
34+
FederationEvalStrategy strategy, CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
35+
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException {
36+
super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo);
37+
}
38+
39+
@Override
40+
protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
41+
final TaskCreator taskCreator;
42+
if (expr instanceof StatementTupleExpr) {
43+
StatementTupleExpr stmt = (StatementTupleExpr) expr;
44+
taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt);
45+
46+
} else {
47+
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
48+
+ ". Please report this problem.");
49+
}
50+
return taskCreator;
51+
}
52+
53+
static protected class LeftBoundJoinTaskCreator implements TaskCreator {
54+
protected final FederationEvalStrategy _strategy;
55+
protected final StatementTupleExpr _expr;
56+
57+
public LeftBoundJoinTaskCreator(
58+
FederationEvalStrategy strategy, StatementTupleExpr expr) {
59+
super();
60+
_strategy = strategy;
61+
_expr = expr;
62+
}
63+
64+
@Override
65+
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
66+
return new ParallelBindLeftJoinTask(control, _strategy, _expr, bindings);
67+
}
68+
}
69+
70+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.join;
12+
13+
import java.util.List;
14+
15+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
17+
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
18+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
19+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
20+
import org.eclipse.rdf4j.query.BindingSet;
21+
22+
/**
23+
* A {@link ParallelTaskBase} for executing bind left joins.
24+
*
25+
* @author Andreas Schwarte
26+
* @see FederationEvalStrategy#evaluateLeftBoundJoinStatementPattern(StatementTupleExpr, List)
27+
*/
28+
public class ParallelBindLeftJoinTask extends ParallelTaskBase<BindingSet> {
29+
30+
protected final FederationEvalStrategy strategy;
31+
protected final StatementTupleExpr rightArg;
32+
protected final List<BindingSet> bindings;
33+
protected final ParallelExecutor<BindingSet> joinControl;
34+
35+
public ParallelBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
36+
StatementTupleExpr expr, List<BindingSet> bindings) {
37+
this.strategy = strategy;
38+
this.rightArg = expr;
39+
this.bindings = bindings;
40+
this.joinControl = joinControl;
41+
}
42+
43+
@Override
44+
public ParallelExecutor<BindingSet> getControl() {
45+
return joinControl;
46+
}
47+
48+
@Override
49+
protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
50+
return strategy.evaluateLeftBoundJoinStatementPattern(rightArg, bindings);
51+
}
52+
53+
}

0 commit comments

Comments
 (0)