Skip to content

Commit 4eb1479

Browse files
committed
GH-5121: refactor the bind join logic into a reusable base class
Refactor the existing logic for executing bind joins into a reusable base class. This change mostly moves the implementation logic from the existing ControlledWorkerBoundJoin class into a new intermediate implementation (with the goal of making it reusable for left joins in a second step). Note that the new bind join implementation no longer uses ControlledWorkerJoin as its base class, i.e., the decision of which join implementation to use is moved to the strategy. For backwards code compatibility, ControlledWorkerBoundJoin is kept but is no longer used; the new code lives in ControlledWorkerBindJoin instead.
1 parent 7efff29 commit 4eb1479

7 files changed

Lines changed: 319 additions & 16 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.federated.algebra;
1212

13-
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
13+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
1414

1515
/**
1616
* Marker interface indicating that instances are applicable for bound join processing (see
17-
* {@link ControlledWorkerBoundJoin}
17+
* {@link ControlledWorkerBindJoin}).
1818
*
1919
* @author Andreas Schwarte
20-
* @see ControlledWorkerBoundJoin
20+
* @see ControlledWorkerBindJoin
2121
*/
2222
public interface BoundJoinTupleExpr {
2323

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
import java.util.Set;
1616
import java.util.concurrent.Executor;
1717
import java.util.concurrent.atomic.AtomicBoolean;
18-
import java.util.function.Supplier;
1918
import java.util.stream.Collectors;
2019

21-
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
2220
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
2321
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
2422
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
@@ -56,6 +54,7 @@
5654
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
5755
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
5856
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
57+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
5958
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
6059
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
6160
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
@@ -108,12 +107,10 @@
108107
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
109108
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
110109
import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator;
111-
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy;
112110
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
113111
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
114112
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
115113
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.BadlyDesignedLeftJoinIterator;
116-
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.DescribeIteration;
117114
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration;
118115
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ConstantOptimizer;
119116
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.DisjunctiveConstraintOptimizer;
@@ -815,8 +812,14 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext
815812
/**
816813
* Execute the join in a separate thread using some join executor.
817814
*
818-
* Join executors are for instance: - {@link SynchronousJoin} - {@link SynchronousBoundJoin} -
819-
* {@link ControlledWorkerJoin} - {@link ControlledWorkerBoundJoin}
815+
* Join executors are for instance:
816+
*
817+
* <ul>
818+
* <li>{@link SynchronousJoin}</li>
819+
* <li>{@link SynchronousBoundJoin}</li>
820+
* <li>{@link ControlledWorkerJoin}</li>
821+
* <li>{@link ControlledWorkerBindJoin}</li>
822+
* </ul>
820823
*
821824
* For endpoint federation use controlled worker bound join, for local federation use controlled worker join. The
822825
* other operators are there for completeness.
@@ -923,7 +926,7 @@ public abstract CloseableIteration<BindingSet> evaluateGroupedCheck(
923926
/**
924927
* Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
925928
*
926-
* See {@link ControlledWorkerBoundJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()}
929+
* See {@link ControlledWorkerBindJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()}
927930
*
928931
* @param service
929932
* @param bindings

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
1818
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
1919
import org.eclipse.rdf4j.federated.FederationContext;
20+
import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr;
2021
import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern;
2122
import org.eclipse.rdf4j.federated.algebra.ExclusiveGroup;
23+
import org.eclipse.rdf4j.federated.algebra.FedXService;
2224
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
2325
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
2426
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
@@ -29,7 +31,9 @@
2931
import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration;
3032
import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration;
3133
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
32-
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
34+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
35+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
36+
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
3337
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
3438
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
3539
import org.eclipse.rdf4j.federated.structures.QueryInfo;
@@ -45,7 +49,7 @@
4549
* Implementation of a federation evaluation strategy which provides some special optimizations for SPARQL (remote)
4650
* endpoints. The most important optimization is to use prepared SPARQL queries that are already created using Strings.
4751
* <p>
48-
* Joins are executed using {@link ControlledWorkerBoundJoin}.
52+
* Joins are executed using {@link ControlledWorkerBindJoin}.
4953
* </p>
5054
* <p>
5155
* This implementation uses the SPARQL 1.1 VALUES operator for the bound-join evaluation
@@ -173,8 +177,25 @@ public CloseableIteration<BindingSet> executeJoin(
173177
TupleExpr rightArg, Set<String> joinVars, BindingSet bindings, QueryInfo queryInfo)
174178
throws QueryEvaluationException {
175179

176-
ControlledWorkerBoundJoin join = new ControlledWorkerBoundJoin(joinScheduler, this, leftIter, rightArg,
177-
bindings, queryInfo);
180+
// determine if we can execute the expr as bind join
181+
boolean executeAsBindJoin = false;
182+
if (rightArg instanceof BoundJoinTupleExpr) {
183+
if (rightArg instanceof FedXService) {
184+
executeAsBindJoin = queryInfo.getFederationContext().getConfig().getEnableServiceAsBoundJoin();
185+
} else {
186+
executeAsBindJoin = true;
187+
}
188+
}
189+
190+
JoinExecutorBase<BindingSet> join;
191+
if (executeAsBindJoin) {
192+
join = new ControlledWorkerBindJoin(joinScheduler, this, leftIter, rightArg,
193+
bindings, queryInfo);
194+
} else {
195+
join = new ControlledWorkerJoin(joinScheduler, this, leftIter, rightArg, bindings,
196+
queryInfo);
197+
}
198+
178199
join.setJoinVars(joinVars);
179200
executor.execute(join);
180201
return join;

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.concurrent.TimeUnit;
1919

2020
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
21-
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
21+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
2222
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
2323
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
2424
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
@@ -34,7 +34,7 @@
3434
* @author Andreas Schwarte
3535
* @see ControlledWorkerUnion
3636
* @see ControlledWorkerJoin
37-
* @see ControlledWorkerBoundJoin
37+
* @see ControlledWorkerBindJoin
3838
*/
3939
public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAware {
4040

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2019 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.join;
12+
13+
import java.util.List;
14+
15+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
16+
import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern;
17+
import org.eclipse.rdf4j.federated.algebra.FedXService;
18+
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
19+
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
20+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
21+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
22+
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask;
23+
import org.eclipse.rdf4j.federated.structures.QueryInfo;
24+
import org.eclipse.rdf4j.query.BindingSet;
25+
import org.eclipse.rdf4j.query.QueryEvaluationException;
26+
import org.eclipse.rdf4j.query.algebra.TupleExpr;
27+
28+
/**
29+
* Execution of a regular join as bind join.
30+
*
31+
* @author Andreas Schwarte
32+
* @see ControlledWorkerBindJoinBase
33+
*/
34+
public class ControlledWorkerBindJoin extends ControlledWorkerBindJoinBase {
35+
36+
public ControlledWorkerBindJoin(ControlledWorkerScheduler<BindingSet> scheduler, FederationEvalStrategy strategy,
37+
CloseableIteration<BindingSet> leftIter,
38+
TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo)
39+
throws QueryEvaluationException {
40+
super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo);
41+
}
42+
43+
@Override
44+
protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
45+
final TaskCreator taskCreator;
46+
if (expr instanceof StatementTupleExpr) {
47+
StatementTupleExpr stmt = (StatementTupleExpr) expr;
48+
if (stmt.hasFreeVarsFor(bs)) {
49+
taskCreator = new BoundJoinTaskCreator(strategy, stmt);
50+
} else {
51+
expr = new CheckStatementPattern(stmt, queryInfo);
52+
taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr);
53+
}
54+
} else if (expr instanceof FedXService) {
55+
taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr);
56+
} else {
57+
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
58+
+ ". Please report this problem.");
59+
}
60+
return taskCreator;
61+
}
62+
63+
protected class BoundJoinTaskCreator implements TaskCreator {
64+
protected final FederationEvalStrategy _strategy;
65+
protected final StatementTupleExpr _expr;
66+
67+
public BoundJoinTaskCreator(
68+
FederationEvalStrategy strategy, StatementTupleExpr expr) {
69+
super();
70+
_strategy = strategy;
71+
_expr = expr;
72+
}
73+
74+
@Override
75+
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
76+
return new ParallelBoundJoinTask(control, _strategy, _expr, bindings);
77+
}
78+
}
79+
80+
protected class CheckJoinTaskCreator implements TaskCreator {
81+
protected final FederationEvalStrategy _strategy;
82+
protected final CheckStatementPattern _expr;
83+
84+
public CheckJoinTaskCreator(
85+
FederationEvalStrategy strategy, CheckStatementPattern expr) {
86+
super();
87+
_strategy = strategy;
88+
_expr = expr;
89+
}
90+
91+
@Override
92+
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
93+
return new ParallelCheckJoinTask(control, _strategy, _expr, bindings);
94+
}
95+
}
96+
97+
protected class FedXServiceJoinTaskCreator implements TaskCreator {
98+
protected final FederationEvalStrategy _strategy;
99+
protected final FedXService _expr;
100+
101+
public FedXServiceJoinTaskCreator(
102+
FederationEvalStrategy strategy, FedXService expr) {
103+
super();
104+
_strategy = strategy;
105+
_expr = expr;
106+
}
107+
108+
@Override
109+
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
110+
return new ParallelServiceJoinTask(control, _strategy, _expr, bindings);
111+
}
112+
}
113+
114+
}

0 commit comments

Comments
 (0)