Skip to content

Commit 9653a02

Browse files
committed
GH-4983: fine-tune bind join implementation in FedX engine
This change applies fine-tuning to the bind join implementation on certain parameters. Note that the original implementations and benchmarks were done when there was no SPARQL 1.1 around, and FedX was using UNION constructs for the bind join implementation. Since a few year back it is using VALUES clauses. Changes: - default bind join size is set to 25 (also reflected in documentation, some unit tests require adjustments) - no longer send small hard-coded sub-queries of size 3 for the first requests and instead rely on configured bind join size - no longer send the first request in a non-bound way by default (Note: kept as internal configuration parameter in the implementation to allow modifications in extensions) - support to influence the bind join size in customizations
1 parent a7a3171 commit 9653a02

6 files changed

Lines changed: 80 additions & 45 deletions

File tree

site/content/documentation/programming/federation.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ FedX provides various means for configuration. Configuration settings can be def
303303
|joinWorkerThreads | The number of join worker threads for parallelization, default _20_ |
304304
|unionWorkerThreads | The number of union worker threads for parallelization, default _20_ |
305305
|leftJoinWorkerThreads | The number of left join worker threads for parallelization, default _10_ |
306-
|boundJoinBlockSize | Block size for bound joins, default _15_ |
306+
|boundJoinBlockSize | Block size for bound joins, default _25_ |
307307
|enforceMaxQueryTime | Max query time in seconds, 0 to disable, default _30_ |
308308
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. For today's endpoints it is more efficient to disable vectored evaluation of SERVICE |
309309
|includeInferredDefault | whether include inferred statements should be considered, default _true_ |

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class FedXConfig {
4040

4141
private int leftJoinWorkerThreads = 10;
4242

43-
private int boundJoinBlockSize = 15;
43+
private int boundJoinBlockSize = 25;
4444

4545
private int enforceMaxQueryTime = 30;
4646

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
/**
3636
* Execute the nested loop join in an asynchronous fashion, using grouped requests, i.e. group bindings into one SPARQL
37-
* request using the UNION operator.
37+
* request using a VALUES clause.
3838
*
3939
* The number of concurrent threads is controlled by a {@link ControlledWorkerScheduler} which works according to the
4040
* FIFO principle and uses worker threads.
@@ -49,13 +49,22 @@ public class ControlledWorkerBoundJoin extends ControlledWorkerJoin {
4949

5050
private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBoundJoin.class);
5151

52+
/**
53+
* Whether to submit the first intermediate result immediately in a non-bound way
54+
*/
55+
private boolean submitFirstResultImmediately = false;
56+
5257
public ControlledWorkerBoundJoin(ControlledWorkerScheduler<BindingSet> scheduler, FederationEvalStrategy strategy,
5358
CloseableIteration<BindingSet> leftIter,
5459
TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo)
5560
throws QueryEvaluationException {
5661
super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo);
5762
}
5863

64+
protected void setSubmitFirstResultImmediately(boolean flag) {
65+
this.submitFirstResultImmediately = flag;
66+
}
67+
5968
@Override
6069
protected void handleBindings() throws Exception {
6170
if (!(canApplyVectoredEvaluation(rightArg))) {
@@ -73,27 +82,17 @@ protected void handleBindings() throws Exception {
7382
TaskCreator taskCreator = null;
7483
Phaser currentPhaser = phaser;
7584

76-
// first item is always sent in a non-bound way
77-
if (!isClosed() && leftIter.hasNext()) {
78-
BindingSet b = leftIter.next();
79-
totalBindings++;
80-
if (expr instanceof StatementTupleExpr) {
81-
StatementTupleExpr stmt = (StatementTupleExpr) expr;
82-
if (stmt.hasFreeVarsFor(b)) {
83-
taskCreator = new BoundJoinTaskCreator(strategy, stmt);
84-
} else {
85-
expr = new CheckStatementPattern(stmt, queryInfo);
86-
taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr);
87-
}
88-
} else if (expr instanceof FedXService) {
89-
taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr);
90-
} else {
91-
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
92-
+ ". Please report this problem.");
85+
if (submitFirstResultImmediately) {
86+
// first item is always sent in a non-bound way
87+
if (!isClosed() && leftIter.hasNext()) {
88+
BindingSet b = leftIter.next();
89+
totalBindings++;
90+
taskCreator = determineTaskCreator(expr, b);
91+
currentPhaser.register();
92+
scheduler.schedule(
93+
new ParallelJoinTask(new PhaserHandlingParallelExecutor(this, currentPhaser), strategy, expr,
94+
b));
9395
}
94-
currentPhaser.register();
95-
scheduler.schedule(
96-
new ParallelJoinTask(new PhaserHandlingParallelExecutor(this, currentPhaser), strategy, expr, b));
9796
}
9897

9998
int nBindings;
@@ -106,27 +105,18 @@ protected void handleBindings() throws Exception {
106105
currentPhaser = new Phaser(currentPhaser);
107106
}
108107

109-
/*
110-
* XXX idea:
111-
*
112-
* make nBindings dependent on the number of intermediate results of the left argument.
113-
*
114-
* If many intermediate results, increase the number of bindings. This will result in less remote SPARQL
115-
* requests.
116-
*
117-
*/
118-
119-
if (totalBindings > 10) {
120-
nBindings = nBindingsCfg;
121-
} else {
122-
nBindings = 3;
123-
}
108+
// determine the bind join block size
109+
nBindings = getNextBindJoinSize(nBindingsCfg, totalBindings);
124110

125111
bindings = new ArrayList<>(nBindings);
126112

127113
int count = 0;
128114
while (!isClosed() && count < nBindings && leftIter.hasNext()) {
129-
bindings.add(leftIter.next());
115+
var bs = leftIter.next();
116+
if (taskCreator == null) {
117+
taskCreator = determineTaskCreator(expr, bs);
118+
}
119+
bindings.add(bs);
130120
count++;
131121
}
132122

@@ -157,6 +147,47 @@ public void handleClose() throws QueryEvaluationException {
157147
}
158148
}
159149

150+
protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
151+
final TaskCreator taskCreator;
152+
if (expr instanceof StatementTupleExpr) {
153+
StatementTupleExpr stmt = (StatementTupleExpr) expr;
154+
if (stmt.hasFreeVarsFor(bs)) {
155+
taskCreator = new BoundJoinTaskCreator(strategy, stmt);
156+
} else {
157+
expr = new CheckStatementPattern(stmt, queryInfo);
158+
taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr);
159+
}
160+
} else if (expr instanceof FedXService) {
161+
taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr);
162+
} else {
163+
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
164+
+ ". Please report this problem.");
165+
}
166+
return taskCreator;
167+
}
168+
169+
/**
170+
* Return the size of the next bind join block.
171+
*
172+
* @param configuredBindJoinSize the configured bind join size
173+
* @param totalBindings the current process bindings from the intermediate result set
174+
* @return
175+
*/
176+
protected int getNextBindJoinSize(int configuredBindJoinSize, int totalBindings) {
177+
178+
/*
179+
* XXX idea:
180+
*
181+
* make nBindings dependent on the number of intermediate results of the left argument.
182+
*
183+
* If many intermediate results, increase the number of bindings. This will result in less remote SPARQL
184+
* requests.
185+
*
186+
*/
187+
188+
return configuredBindJoinSize;
189+
}
190+
160191
/**
161192
* Returns true if the vectored evaluation can be applied for the join argument, i.e. there is no fallback to
162193
* {@link ControlledWorkerJoin#handleBindings()}. This is

tools/federation/src/test/java/org/eclipse/rdf4j/federated/BoundJoinTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public void testBoundJoin_FailingEndpoint() throws Exception {
3737
/* test a simple bound join */
3838
prepareTest(Arrays.asList("/tests/data/data1.ttl", "/tests/data/data2.ttl"));
3939

40-
repoSettings(2).setFailAfter(5);
40+
// specifically reduce the bind join size for this test to run into "fail After"
41+
fedxRule.getFederationContext().getConfig().withBoundJoinBlockSize(2);
42+
43+
repoSettings(2).setFailAfter(5); // fail after 5 remote requests on repo 2
4144
Assertions.assertThrows(QueryEvaluationException.class, () -> {
4245
execute("/tests/boundjoin/query01.rq", "/tests/boundjoin/query01.srx", false, true);
4346
});

tools/federation/src/test/java/org/eclipse/rdf4j/federated/ServiceTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ protected FederatedService createService(String serviceUrl) throws QueryEvaluati
241241
repo.setFederatedServiceResolver(serviceResolver);
242242
repo.init();
243243

244+
fedxRule.getFederationContext().getConfig().withBoundJoinBlockSize(5);
245+
244246
/*
245247
* test select query retrieving all persons from endpoint 1 (SERVICE), endpoint not part of federation =>
246248
* evaluate using externally provided service resolver endpoint1 is reachable as
@@ -271,12 +273,11 @@ protected FederatedService createService(String serviceUrl) throws QueryEvaluati
271273
Assertions.assertEquals(expected, res.stream().map(b -> b.getValue("output")).collect(Collectors.toSet()));
272274
}
273275

274-
// first binding is evaluated using regular service, then we have groups of 4 groups of three bindings and 3
275-
// groups with 15
276+
// all requests are executed in bind-join with constant size
277+
// for this test bind join size is set to 5, hence we see 10 bind join requests
276278
TestSparqlFederatedService tfs = ((TestSparqlFederatedService) serviceResolver
277279
.getService("http://localhost:18080/repositories/endpoint1"));
278-
Assertions.assertEquals(1, tfs.serviceRequestCount.get());
279-
Assertions.assertEquals(7, tfs.boundJoinRequestCount.get());
280+
Assertions.assertEquals(10, tfs.boundJoinRequestCount.get());
280281
}
281282

282283
@Test

tools/federation/src/test/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public void testExportWithEmptyConfig() throws Exception {
262262
assertThat(
263263
Models.objectLiteral(
264264
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_BOUND_JOIN_BLOCK_SIZE, null)))
265-
.hasValueSatisfying(v -> assertThat(v.intValue()).isEqualTo(15));
265+
.hasValueSatisfying(v -> assertThat(v.intValue()).isEqualTo(25));
266266
assertThat(
267267
Models.objectLiteral(
268268
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENFORCE_MAX_QUERY_TIME, null)))

0 commit comments

Comments
 (0)