Skip to content

Commit eefa338

Browse files
authored
Gh 5121 - bind left join support in FedX
2 parents 5d25e15 + 01bc075 commit eefa338

19 files changed

Lines changed: 1002 additions & 39 deletions

site/content/documentation/programming/federation.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ FedX provides various means for configuration. Configuration settings can be def
305305
|leftJoinWorkerThreads | The number of left join worker threads for parallelization, default _10_ |
306306
|boundJoinBlockSize | Block size for bound joins, default _25_ |
307307
|enforceMaxQueryTime | Max query time in seconds, 0 to disable, default _30_ |
308-
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. For today's endpoints it is more efficient to disable vectored evaluation of SERVICE |
308+
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. |
309+
|enableOptionalAsBindJoin | Flag for evaluating an OPTIONAL expression using bind join, default _true_. |
309310
|includeInferredDefault | Whether inferred statements should be considered, default _true_ |
310311
|consumingIterationMax | the max number of results to be consumed by `ConsumingIteration`, default _1000_ |
311312
|debugQueryPlan | Print the optimized query execution plan to stdout, default _false_ |

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.util.Optional;
1414

1515
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
16-
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
1716
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
1817
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
1918
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;
@@ -48,6 +47,8 @@ public class FedXConfig {
4847

4948
private boolean enableServiceAsBoundJoin = true;
5049

50+
private boolean enableOptionalAsBindJoin = true;
51+
5152
private boolean enableMonitoring = false;
5253

5354
private boolean isLogQueryPlan = false;
@@ -68,7 +69,6 @@ public class FedXConfig {
6869

6970
private int consumingIterationMax = 1000;
7071

71-
private CollectionFactory cf = new DefaultCollectionFactory();
7272
/* factory like setters */
7373

7474
/**
@@ -244,6 +244,17 @@ public FedXConfig withEnableServiceAsBoundJoin(boolean flag) {
244244
return this;
245245
}
246246

247+
/**
248+
* Whether OPTIONAL clauses are evaluated using bind join (i.e. with the VALUES clause). Default <i>true</i>
249+
*
250+
* @param flag
251+
* @return the current config.
252+
*/
253+
public FedXConfig withEnableOptionalAsBindJoin(boolean flag) {
254+
this.enableOptionalAsBindJoin = flag;
255+
return this;
256+
}
257+
247258
/**
248259
* The cache specification for the {@link SourceSelectionMemoryCache}. If not set explicitly, the
249260
* {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used.
@@ -326,16 +337,26 @@ public int getBoundJoinBlockSize() {
326337
* Returns a flag indicating whether vectored evaluation using the VALUES clause shall be applied for SERVICE
327338
* expressions.
328339
*
329-
* Default: false
340+
* Default: true
330341
*
331-
* Note: for todays endpoints it is more efficient to disable vectored evaluation of SERVICE.
332-
*
333-
* @return whether SERVICE expressions are evaluated using bound joins
342+
* @return whether SERVICE expressions are evaluated using bind joins
334343
*/
335344
public boolean getEnableServiceAsBoundJoin() {
336345
return enableServiceAsBoundJoin;
337346
}
338347

348+
/**
349+
* Returns a flag indicating whether bind join evaluation using the VALUES clause shall be applied for OPTIONAL
350+
* expressions.
351+
*
352+
* Default: true
353+
*
354+
* @return whether OPTIONAL expressions are evaluated using bind joins
355+
*/
356+
public boolean isEnableOptionalAsBindJoin() {
357+
return enableOptionalAsBindJoin;
358+
}
359+
339360
/**
340361
* Get the maximum query time in seconds used for query evaluation. Applied if {@link QueryManager} is used to
341362
* create queries.
@@ -485,9 +506,10 @@ public int getConsumingIterationMax() {
485506
*
486507
* @param cf
487508
* @return the current config
509+
* @deprecated unused
488510
*/
511+
@Deprecated(forRemoval = true)
489512
public FedXConfig withCollectionFactory(CollectionFactory cf) {
490-
this.cf = cf;
491513
return this;
492514
}
493515
}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.federated.algebra;
1212

13-
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
13+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
1414

1515
/**
1616
* Marker interface indicating that instances are applicable for bound join processing (see
17-
* {@link ControlledWorkerBoundJoin}
17+
* {@link ControlledWorkerBindJoin})
1818
*
1919
* @author Andreas Schwarte
20-
* @see ControlledWorkerBoundJoin
20+
* @see ControlledWorkerBindJoin
2121
*/
2222
public interface BoundJoinTupleExpr {
2323

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
import java.util.Set;
1616
import java.util.concurrent.Executor;
1717
import java.util.concurrent.atomic.AtomicBoolean;
18-
import java.util.function.Supplier;
1918
import java.util.stream.Collectors;
2019

21-
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
2220
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
2321
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
2422
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
@@ -39,6 +37,7 @@
3937
import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath;
4038
import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator;
4139
import org.eclipse.rdf4j.federated.algebra.FilterExpr;
40+
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
4241
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
4342
import org.eclipse.rdf4j.federated.algebra.HolderNode;
4443
import org.eclipse.rdf4j.federated.algebra.NJoin;
@@ -53,12 +52,14 @@
5352
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
5453
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
5554
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor;
55+
import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration;
5656
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
5757
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
58+
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
5859
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
60+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
5961
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
6062
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
61-
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
6263
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousBoundJoin;
6364
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousJoin;
6465
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
@@ -68,6 +69,7 @@
6869
import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask;
6970
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
7071
import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase;
72+
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
7173
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
7274
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
7375
import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel;
@@ -97,6 +99,7 @@
9799
import org.eclipse.rdf4j.query.QueryEvaluationException;
98100
import org.eclipse.rdf4j.query.algebra.DescribeOperator;
99101
import org.eclipse.rdf4j.query.algebra.Join;
102+
import org.eclipse.rdf4j.query.algebra.LeftJoin;
100103
import org.eclipse.rdf4j.query.algebra.QueryRoot;
101104
import org.eclipse.rdf4j.query.algebra.Service;
102105
import org.eclipse.rdf4j.query.algebra.StatementPattern;
@@ -108,12 +111,10 @@
108111
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
109112
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
110113
import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator;
111-
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy;
112114
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
113115
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
114116
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
115117
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.BadlyDesignedLeftJoinIterator;
116-
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.DescribeIteration;
117118
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration;
118119
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ConstantOptimizer;
119120
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.DisjunctiveConstraintOptimizer;
@@ -748,10 +749,7 @@ public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {
748749

749750
if (problemVars.containsAll(bindings.getBindingNames())) {
750751
var leftIter = leftPrepared.evaluate(bindings);
751-
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(scheduler, FederationEvalStrategy.this,
752-
leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
753-
executor.execute(join);
754-
return join;
752+
return executeLeftJoin(scheduler, leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
755753
} else {
756754
Set<String> problemVarsClone = new HashSet<>(problemVars);
757755
problemVarsClone.retainAll(bindings.getBindingNames());
@@ -815,8 +813,14 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext
815813
/**
816814
* Execute the join in a separate thread using some join executor.
817815
*
818-
* Join executors are for instance: - {@link SynchronousJoin} - {@link SynchronousBoundJoin} -
819-
* {@link ControlledWorkerJoin} - {@link ControlledWorkerBoundJoin}
816+
* Join executors are for instance:
817+
*
818+
* <ul>
819+
* <li>{@link SynchronousJoin}</li>
820+
* <li>{@link SynchronousBoundJoin}</li>
821+
* <li>{@link ControlledWorkerJoin}</li>
822+
* <li>{@link ControlledWorkerBindJoin}</li>
823+
* </ul>
820824
*
821825
* For endpoint federation use controlled worker bound join, for local federation use controlled worker join. The
822826
* other operators are there for completeness.
@@ -836,6 +840,21 @@ protected abstract CloseableIteration<BindingSet> executeJoin(
836840
CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
837841
Set<String> joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;
838842

843+
/**
844+
* Execute the left join in a separate thread using some join executor.
845+
*
846+
* @param joinScheduler
847+
* @param leftIter
848+
* @param leftJoin
849+
* @param bindings
850+
* @return the result
851+
* @throws QueryEvaluationException
852+
*/
853+
protected abstract CloseableIteration<BindingSet> executeLeftJoin(
854+
ControlledWorkerScheduler<BindingSet> joinScheduler,
855+
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin,
856+
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;
857+
839858
public abstract CloseableIteration<BindingSet> evaluateExclusiveGroup(
840859
ExclusiveGroup group, BindingSet bindings)
841860
throws RepositoryException, MalformedQueryException, QueryEvaluationException;
@@ -920,10 +939,63 @@ public abstract CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern
920939
public abstract CloseableIteration<BindingSet> evaluateGroupedCheck(
921940
CheckStatementPattern stmt, final List<BindingSet> bindings) throws QueryEvaluationException;
922941

942+
/**
943+
* Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.
944+
*
945+
* @param stmt
946+
* @param bindings
947+
* @return the result iteration
948+
* @throws QueryEvaluationException
949+
* @see BindLeftJoinIteration
950+
*/
951+
public CloseableIteration<BindingSet> evaluateLeftBoundJoinStatementPattern(
952+
StatementTupleExpr stmt, final List<BindingSet> bindings) throws QueryEvaluationException {
953+
// we can omit the bound join handling
954+
if (bindings.size() == 1) {
955+
return evaluate(stmt, bindings.get(0));
956+
}
957+
958+
FilterValueExpr filterExpr = null;
959+
if (stmt instanceof FilterTuple) {
960+
filterExpr = ((FilterTuple) stmt).getFilterExpr();
961+
}
962+
963+
AtomicBoolean isEvaluated = new AtomicBoolean(false);
964+
String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings,
965+
filterExpr, isEvaluated, stmt.getQueryInfo().getDataset());
966+
967+
CloseableIteration<BindingSet> result = null;
968+
try {
969+
result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo());
970+
971+
// apply filter and/or convert to original bindings
972+
if (filterExpr != null && !isEvaluated.get()) {
973+
result = new BindLeftJoinIteration(result, bindings); // apply conversion
974+
result = new FilteringIteration(filterExpr, result, this); // apply filter
975+
if (!result.hasNext()) {
976+
result.close();
977+
return new EmptyIteration<>();
978+
}
979+
} else {
980+
result = new BindLeftJoinIteration(result, bindings);
981+
}
982+
983+
return result;
984+
} catch (Throwable t) {
985+
if (result != null) {
986+
result.close();
987+
}
988+
if (t instanceof InterruptedException) {
989+
Thread.currentThread().interrupt();
990+
}
991+
throw ExceptionUtil.toQueryEvaluationException(t);
992+
}
993+
}
994+
923995
/**
924996
* Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
925997
*
926-
* See {@link ControlledWorkerBoundJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()}
998+
* See {@link ControlledWorkerBindJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()}
927999
*
9281000
* @param service
9291001
* @param bindings

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
2828
import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration;
2929
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
30+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
3031
import org.eclipse.rdf4j.federated.structures.QueryInfo;
3132
import org.eclipse.rdf4j.federated.util.QueryAlgebraUtil;
3233
import org.eclipse.rdf4j.query.BindingSet;
3334
import org.eclipse.rdf4j.query.MalformedQueryException;
3435
import org.eclipse.rdf4j.query.QueryEvaluationException;
36+
import org.eclipse.rdf4j.query.algebra.LeftJoin;
3537
import org.eclipse.rdf4j.query.algebra.StatementPattern;
3638
import org.eclipse.rdf4j.query.algebra.TupleExpr;
3739
import org.eclipse.rdf4j.repository.RepositoryException;
@@ -119,6 +121,16 @@ public CloseableIteration<BindingSet> executeJoin(
119121
return join;
120122
}
121123

124+
@Override
125+
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
126+
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
127+
throws QueryEvaluationException {
128+
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this,
129+
leftIter, leftJoin, bindings, queryInfo);
130+
executor.execute(join);
131+
return join;
132+
}
133+
122134
@Override
123135
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
124136
ExclusiveGroup group, BindingSet bindings)

0 commit comments

Comments
 (0)