Skip to content

Commit 92e89e3

Browse files
committed
GH-5718: merge FedX FederationEvalStrategy variants
There is no need to maintain separate federation evaluation strategy implementations, as the two existing ones are nearly identical. We now use the logic of the previous SPARQL evaluation strategy as the default and incorporate it directly into FederationEvalStrategy.
1 parent 867566f commit 92e89e3

10 files changed

Lines changed: 150 additions & 392 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import java.util.Optional;
1414

15-
import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
1615
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
1716
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
1817
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
2222
import org.eclipse.rdf4j.federated.endpoint.EndpointClassification;
2323
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory;
24-
import org.eclipse.rdf4j.federated.evaluation.SailFederationEvalStrategy;
25-
import org.eclipse.rdf4j.federated.evaluation.SparqlFederationEvalStrategy;
2624
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
2725
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
2826
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
@@ -107,7 +105,6 @@ public void init(FedX federation, FederationContext federationContext) {
107105
FederationEvaluationStrategyFactory strategyFactory = federation.getFederationEvaluationStrategyFactory();
108106
strategyFactory.setFederationType(federationType);
109107
strategyFactory.setFederationContext(federationContext);
110-
strategyFactory.setCollectionFactory(federation.getCollectionFactory());
111108
return strategyFactory;
112109
}
113110

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java

Lines changed: 145 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.eclipse.rdf4j.federated.FedX;
2626
import org.eclipse.rdf4j.federated.FedXConfig;
2727
import org.eclipse.rdf4j.federated.FederationContext;
28-
import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern;
28+
import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr;
2929
import org.eclipse.rdf4j.federated.algebra.ConjunctiveFilterExpr;
3030
import org.eclipse.rdf4j.federated.algebra.EmptyResult;
3131
import org.eclipse.rdf4j.federated.algebra.EmptyStatementPattern;
@@ -55,12 +55,17 @@
5555
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
5656
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor;
5757
import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration;
58+
import org.eclipse.rdf4j.federated.evaluation.iterator.BoundJoinVALUESConversionIteration;
5859
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
5960
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
6061
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
62+
import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration;
6163
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
6264
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
65+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
6366
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
67+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
68+
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
6469
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
6570
import org.eclipse.rdf4j.federated.evaluation.union.ParallelGetStatementsTask;
6671
import org.eclipse.rdf4j.federated.evaluation.union.ParallelPreparedAlgebraUnionTask;
@@ -131,15 +136,20 @@
131136
import org.slf4j.LoggerFactory;
132137

133138
/**
134-
* Base class for the Evaluation strategies.
139+
* Implementation of a federation evaluation strategy which provides some special optimizations for SPARQL (remote)
140+
* endpoints. The most important optimization is to use prepared SPARQL queries that are already created as Strings.
141+
* <p>
142+
* Joins are executed using {@link ControlledWorkerBindJoin}.
143+
* </p>
144+
* <p>
145+
* This implementation uses the SPARQL 1.1 VALUES operator for the bound-join evaluation.
146+
* </p>
147+
*
135148
*
136149
* @author Andreas Schwarte
137150
*
138-
* @see SailFederationEvalStrategy
139-
* @see SparqlFederationEvalStrategy
140-
*
141151
*/
142-
public abstract class FederationEvalStrategy extends StrictEvaluationStrategy {
152+
public class FederationEvalStrategy extends StrictEvaluationStrategy {
143153

144154
private static final Logger log = LoggerFactory.getLogger(FederationEvalStrategy.class);
145155

@@ -904,10 +914,35 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext
904914
* @return the result
905915
* @throws QueryEvaluationException
906916
*/
907-
protected abstract CloseableIteration<BindingSet> executeJoin(
917+
public CloseableIteration<BindingSet> executeJoin(
908918
ControlledWorkerScheduler<BindingSet> joinScheduler,
909-
CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
910-
Set<String> joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;
919+
CloseableIteration<BindingSet> leftIter,
920+
TupleExpr rightArg, Set<String> joinVars, BindingSet bindings, QueryInfo queryInfo)
921+
throws QueryEvaluationException {
922+
923+
// determine if we can execute the expr as bind join
924+
boolean executeAsBindJoin = false;
925+
if (rightArg instanceof BoundJoinTupleExpr) {
926+
if (rightArg instanceof FedXService) {
927+
executeAsBindJoin = queryInfo.getFederationContext().getConfig().getEnableServiceAsBoundJoin();
928+
} else {
929+
executeAsBindJoin = true;
930+
}
931+
}
932+
933+
JoinExecutorBase<BindingSet> join;
934+
if (executeAsBindJoin) {
935+
join = new ControlledWorkerBindJoin(joinScheduler, this, leftIter, rightArg,
936+
bindings, queryInfo);
937+
} else {
938+
join = new ControlledWorkerJoin(joinScheduler, this, leftIter, rightArg, bindings,
939+
queryInfo);
940+
}
941+
942+
join.setJoinVars(joinVars);
943+
executor.execute(join);
944+
return join;
945+
}
911946

912947
/**
913948
* Execute the left join in a separate thread using some join executor.
@@ -919,14 +954,63 @@ protected abstract CloseableIteration<BindingSet> executeJoin(
919954
* @return the result
920955
* @throws QueryEvaluationException
921956
*/
922-
protected abstract CloseableIteration<BindingSet> executeLeftJoin(
923-
ControlledWorkerScheduler<BindingSet> joinScheduler,
924-
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin,
925-
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;
957+
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
958+
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
959+
throws QueryEvaluationException {
960+
961+
var rightArg = leftJoin.getRightArg();
962+
var fedxConfig = queryInfo.getFederationContext().getConfig();
963+
964+
// determine if we can execute the expr as bind join
965+
boolean executeAsBindJoin = false;
966+
if (fedxConfig.isEnableOptionalAsBindJoin() && rightArg instanceof BoundJoinTupleExpr) {
967+
if (rightArg instanceof FedXService) {
968+
executeAsBindJoin = false;
969+
} else {
970+
executeAsBindJoin = true;
971+
}
972+
}
926973

927-
public abstract CloseableIteration<BindingSet> evaluateExclusiveGroup(
928-
ExclusiveGroup group, BindingSet bindings)
929-
throws RepositoryException, MalformedQueryException, QueryEvaluationException;
974+
JoinExecutorBase<BindingSet> join;
975+
if (executeAsBindJoin) {
976+
join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg,
977+
bindings, queryInfo);
978+
} else {
979+
join = new ControlledWorkerLeftJoin(joinScheduler, this,
980+
leftIter, leftJoin, bindings, queryInfo);
981+
}
982+
983+
executor.execute(join);
984+
return join;
985+
}
986+
987+
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
988+
ExclusiveGroup group, BindingSet bindings) throws RepositoryException,
989+
MalformedQueryException, QueryEvaluationException {
990+
991+
TripleSource tripleSource = group.getOwnedEndpoint().getTripleSource();
992+
AtomicBoolean isEvaluated = new AtomicBoolean(false);
993+
994+
try {
995+
String preparedQuery = QueryStringUtil.selectQueryString(group, bindings, group.getFilterExpr(),
996+
isEvaluated, group.getQueryInfo().getDataset());
997+
return tripleSource.getStatements(preparedQuery, bindings,
998+
(isEvaluated.get() ? null : group.getFilterExpr()), group.getQueryInfo());
999+
} catch (IllegalQueryException e) {
1000+
/* no projection vars, e.g. local vars only, can occur in joins */
1001+
if (tripleSource.hasStatements(group, bindings)) {
1002+
CloseableIteration<BindingSet> res = new SingleBindingSetIteration(bindings);
1003+
if (group.getBoundFilters() != null) {
1004+
// make sure to insert any values from FILTER expressions that are directly
1005+
// bound in this expression
1006+
res = new InsertBindingsIteration(res, group.getBoundFilters());
1007+
}
1008+
return res;
1009+
}
1010+
return new EmptyIteration<>();
1011+
}
1012+
1013+
}
9301014

9311015
/**
9321016
* Evaluate an {@link ExclusiveTupleExpr}. The default implementation converts the given expression to a SELECT
@@ -993,8 +1077,51 @@ protected QueryEvaluationStep prepareExclusiveTupleExpr(
9931077
* @return the result iteration
9941078
* @throws QueryEvaluationException
9951079
*/
996-
public abstract CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern(
997-
StatementTupleExpr stmt, final List<BindingSet> bindings) throws QueryEvaluationException;
1080+
public CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern(
1081+
StatementTupleExpr stmt, List<BindingSet> bindings)
1082+
throws QueryEvaluationException {
1083+
1084+
// we can omit the bound join handling
1085+
if (bindings.size() == 1) {
1086+
return evaluate(stmt, bindings.get(0));
1087+
}
1088+
1089+
FilterValueExpr filterExpr = null;
1090+
if (stmt instanceof FilterTuple) {
1091+
filterExpr = ((FilterTuple) stmt).getFilterExpr();
1092+
}
1093+
1094+
AtomicBoolean isEvaluated = new AtomicBoolean(false);
1095+
String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings,
1096+
filterExpr, isEvaluated, stmt.getQueryInfo().getDataset());
1097+
1098+
CloseableIteration<BindingSet> result = null;
1099+
try {
1100+
result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo());
1101+
1102+
// apply filter and/or convert to original bindings
1103+
if (filterExpr != null && !isEvaluated.get()) {
1104+
result = new BoundJoinVALUESConversionIteration(result, bindings); // apply conversion
1105+
result = new FilteringIteration(filterExpr, result, this); // apply filter
1106+
if (!result.hasNext()) {
1107+
result.close();
1108+
return new EmptyIteration<>();
1109+
}
1110+
} else {
1111+
result = new BoundJoinVALUESConversionIteration(result, bindings);
1112+
}
1113+
1114+
return result;
1115+
} catch (Throwable t) {
1116+
if (result != null) {
1117+
result.close();
1118+
}
1119+
if (t instanceof InterruptedException) {
1120+
Thread.currentThread().interrupt();
1121+
}
1122+
throw ExceptionUtil.toQueryEvaluationException(t);
1123+
}
1124+
}
9981125

9991126
/**
10001127
* Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvaluationStrategyFactory.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,6 @@ public FederationEvalStrategy createEvaluationStrategy(Dataset dataset, TripleSo
7070
// Note: currently dataset, triplesource and statistics are explicitly ignored
7171
// in the federation
7272

73-
switch (federationType) {
74-
case LOCAL: {
75-
SailFederationEvalStrategy evalStrategy = new SailFederationEvalStrategy(federationContext);
76-
evalStrategy.setCollectionFactory(collectionFactorySupplier);
77-
return evalStrategy;
78-
}
79-
case REMOTE:
80-
case HYBRID:
81-
default: {
82-
SparqlFederationEvalStrategy evalStrategy = new SparqlFederationEvalStrategy(federationContext);
83-
evalStrategy.setCollectionFactory(collectionFactorySupplier);
84-
return evalStrategy;
85-
}
86-
}
73+
return new FederationEvalStrategy(federationContext);
8774
}
8875
}

0 commit comments

Comments
 (0)