Skip to content

Commit 2d86cd3

Browse files
committed
GH-5121: prepare execution of left joins in the federation strategy
Prepare to execute a specific implementation of a left join implementation through the federation strategy.
1 parent 4eb1479 commit 2d86cd3

3 files changed

Lines changed: 41 additions & 5 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
5858
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
5959
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
60-
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
6160
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousBoundJoin;
6261
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousJoin;
6362
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
@@ -96,6 +95,7 @@
9695
import org.eclipse.rdf4j.query.QueryEvaluationException;
9796
import org.eclipse.rdf4j.query.algebra.DescribeOperator;
9897
import org.eclipse.rdf4j.query.algebra.Join;
98+
import org.eclipse.rdf4j.query.algebra.LeftJoin;
9999
import org.eclipse.rdf4j.query.algebra.QueryRoot;
100100
import org.eclipse.rdf4j.query.algebra.Service;
101101
import org.eclipse.rdf4j.query.algebra.StatementPattern;
@@ -745,10 +745,7 @@ public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {
745745

746746
if (problemVars.containsAll(bindings.getBindingNames())) {
747747
var leftIter = leftPrepared.evaluate(bindings);
748-
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(scheduler, FederationEvalStrategy.this,
749-
leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
750-
executor.execute(join);
751-
return join;
748+
return executeLeftJoin(scheduler, leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
752749
} else {
753750
Set<String> problemVarsClone = new HashSet<>(problemVars);
754751
problemVarsClone.retainAll(bindings.getBindingNames());
@@ -839,6 +836,21 @@ protected abstract CloseableIteration<BindingSet> executeJoin(
839836
CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
840837
Set<String> joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;
841838

839+
/**
840+
* Execute the left join in a separate thread using some join executor.
841+
*
842+
* @param joinScheduler
843+
* @param leftIter
844+
* @param leftJoin
845+
* @param bindings
846+
* @return the result
847+
* @throws QueryEvaluationException
848+
*/
849+
protected abstract CloseableIteration<BindingSet> executeLeftJoin(
850+
ControlledWorkerScheduler<BindingSet> joinScheduler,
851+
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin,
852+
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;
853+
842854
public abstract CloseableIteration<BindingSet> evaluateExclusiveGroup(
843855
ExclusiveGroup group, BindingSet bindings)
844856
throws RepositoryException, MalformedQueryException, QueryEvaluationException;

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
2828
import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration;
2929
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
30+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
3031
import org.eclipse.rdf4j.federated.structures.QueryInfo;
3132
import org.eclipse.rdf4j.federated.util.QueryAlgebraUtil;
3233
import org.eclipse.rdf4j.query.BindingSet;
3334
import org.eclipse.rdf4j.query.MalformedQueryException;
3435
import org.eclipse.rdf4j.query.QueryEvaluationException;
36+
import org.eclipse.rdf4j.query.algebra.LeftJoin;
3537
import org.eclipse.rdf4j.query.algebra.StatementPattern;
3638
import org.eclipse.rdf4j.query.algebra.TupleExpr;
3739
import org.eclipse.rdf4j.repository.RepositoryException;
@@ -119,6 +121,16 @@ public CloseableIteration<BindingSet> executeJoin(
119121
return join;
120122
}
121123

124+
@Override
125+
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
126+
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
127+
throws QueryEvaluationException {
128+
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this,
129+
leftIter, leftJoin, bindings, queryInfo);
130+
executor.execute(join);
131+
return join;
132+
}
133+
122134
@Override
123135
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
124136
ExclusiveGroup group, BindingSet bindings)

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
3434
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
3535
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
36+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
3637
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
3738
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
3839
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
@@ -41,6 +42,7 @@
4142
import org.eclipse.rdf4j.query.BindingSet;
4243
import org.eclipse.rdf4j.query.MalformedQueryException;
4344
import org.eclipse.rdf4j.query.QueryEvaluationException;
45+
import org.eclipse.rdf4j.query.algebra.LeftJoin;
4446
import org.eclipse.rdf4j.query.algebra.StatementPattern;
4547
import org.eclipse.rdf4j.query.algebra.TupleExpr;
4648
import org.eclipse.rdf4j.repository.RepositoryException;
@@ -201,6 +203,16 @@ public CloseableIteration<BindingSet> executeJoin(
201203
return join;
202204
}
203205

206+
@Override
207+
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
208+
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
209+
throws QueryEvaluationException {
210+
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this,
211+
leftIter, leftJoin, bindings, queryInfo);
212+
executor.execute(join);
213+
return join;
214+
}
215+
204216
@Override
205217
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
206218
ExclusiveGroup group, BindingSet bindings) throws RepositoryException,

0 commit comments

Comments
 (0)