Skip to content

Commit 6be83a2

Browse files
authored
GH-4983: fine-tune bind join implementation in FedX engine (#4984)
2 parents d41666b + 9653a02 commit 6be83a2

6 files changed

Lines changed: 80 additions & 45 deletions

File tree

site/content/documentation/programming/federation.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ FedX provides various means for configuration. Configuration settings can be def
303303
|joinWorkerThreads | The number of join worker threads for parallelization, default _20_ |
304304
|unionWorkerThreads | The number of union worker threads for parallelization, default _20_ |
305305
|leftJoinWorkerThreads | The number of left join worker threads for parallelization, default _10_ |
306-
|boundJoinBlockSize | Block size for bound joins, default _15_ |
306+
|boundJoinBlockSize | Block size for bound joins, default _25_ |
307307
|enforceMaxQueryTime | Max query time in seconds, 0 to disable, default _30_ |
308308
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. For today's endpoints it is more efficient to disable vectored evaluation of SERVICE |
309309
|includeInferredDefault | whether include inferred statements should be considered, default _true_ |

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class FedXConfig {
4040

4141
private int leftJoinWorkerThreads = 10;
4242

43-
private int boundJoinBlockSize = 15;
43+
private int boundJoinBlockSize = 25;
4444

4545
private int enforceMaxQueryTime = 30;
4646

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java

Lines changed: 68 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
/**
3636
* Execute the nested loop join in an asynchronous fashion, using grouped requests, i.e. group bindings into one SPARQL
37-
* request using the UNION operator.
37+
* request using a VALUES clause.
3838
*
3939
* The number of concurrent threads is controlled by a {@link ControlledWorkerScheduler} which works according to the
4040
* FIFO principle and uses worker threads.
@@ -49,13 +49,22 @@ public class ControlledWorkerBoundJoin extends ControlledWorkerJoin {
4949

5050
private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBoundJoin.class);
5151

52+
/**
53+
* Whether to submit the first intermediate result immediately in a non-bound way
54+
*/
55+
private boolean submitFirstResultImmediately = false;
56+
5257
public ControlledWorkerBoundJoin(ControlledWorkerScheduler<BindingSet> scheduler, FederationEvalStrategy strategy,
5358
CloseableIteration<BindingSet> leftIter,
5459
TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo)
5560
throws QueryEvaluationException {
5661
super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo);
5762
}
5863

64+
protected void setSubmitFirstResultImmediately(boolean flag) {
65+
this.submitFirstResultImmediately = flag;
66+
}
67+
5968
@Override
6069
protected void handleBindings() throws Exception {
6170
if (!(canApplyVectoredEvaluation(rightArg))) {
@@ -73,27 +82,17 @@ protected void handleBindings() throws Exception {
7382
TaskCreator taskCreator = null;
7483
Phaser currentPhaser = phaser;
7584

76-
// first item is always sent in a non-bound way
77-
if (!isClosed() && leftIter.hasNext()) {
78-
BindingSet b = leftIter.next();
79-
totalBindings++;
80-
if (expr instanceof StatementTupleExpr) {
81-
StatementTupleExpr stmt = (StatementTupleExpr) expr;
82-
if (stmt.hasFreeVarsFor(b)) {
83-
taskCreator = new BoundJoinTaskCreator(strategy, stmt);
84-
} else {
85-
expr = new CheckStatementPattern(stmt, queryInfo);
86-
taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr);
87-
}
88-
} else if (expr instanceof FedXService) {
89-
taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr);
90-
} else {
91-
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
92-
+ ". Please report this problem.");
85+
if (submitFirstResultImmediately) {
86+
// first item is always sent in a non-bound way
87+
if (!isClosed() && leftIter.hasNext()) {
88+
BindingSet b = leftIter.next();
89+
totalBindings++;
90+
taskCreator = determineTaskCreator(expr, b);
91+
currentPhaser.register();
92+
scheduler.schedule(
93+
new ParallelJoinTask(new PhaserHandlingParallelExecutor(this, currentPhaser), strategy, expr,
94+
b));
9395
}
94-
currentPhaser.register();
95-
scheduler.schedule(
96-
new ParallelJoinTask(new PhaserHandlingParallelExecutor(this, currentPhaser), strategy, expr, b));
9796
}
9897

9998
int nBindings;
@@ -106,27 +105,18 @@ protected void handleBindings() throws Exception {
106105
currentPhaser = new Phaser(currentPhaser);
107106
}
108107

109-
/*
110-
* XXX idea:
111-
*
112-
* make nBindings dependent on the number of intermediate results of the left argument.
113-
*
114-
* If many intermediate results, increase the number of bindings. This will result in less remote SPARQL
115-
* requests.
116-
*
117-
*/
118-
119-
if (totalBindings > 10) {
120-
nBindings = nBindingsCfg;
121-
} else {
122-
nBindings = 3;
123-
}
108+
// determine the bind join block size
109+
nBindings = getNextBindJoinSize(nBindingsCfg, totalBindings);
124110

125111
bindings = new ArrayList<>(nBindings);
126112

127113
int count = 0;
128114
while (!isClosed() && count < nBindings && leftIter.hasNext()) {
129-
bindings.add(leftIter.next());
115+
var bs = leftIter.next();
116+
if (taskCreator == null) {
117+
taskCreator = determineTaskCreator(expr, bs);
118+
}
119+
bindings.add(bs);
130120
count++;
131121
}
132122

@@ -157,6 +147,47 @@ public void handleClose() throws QueryEvaluationException {
157147
}
158148
}
159149

150+
protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
151+
final TaskCreator taskCreator;
152+
if (expr instanceof StatementTupleExpr) {
153+
StatementTupleExpr stmt = (StatementTupleExpr) expr;
154+
if (stmt.hasFreeVarsFor(bs)) {
155+
taskCreator = new BoundJoinTaskCreator(strategy, stmt);
156+
} else {
157+
expr = new CheckStatementPattern(stmt, queryInfo);
158+
taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr);
159+
}
160+
} else if (expr instanceof FedXService) {
161+
taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr);
162+
} else {
163+
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
164+
+ ". Please report this problem.");
165+
}
166+
return taskCreator;
167+
}
168+
169+
/**
170+
* Return the size of the next bind join block.
171+
*
172+
* @param configuredBindJoinSize the configured bind join size
173+
* @param totalBindings the current process bindings from the intermediate result set
174+
* @return
175+
*/
176+
protected int getNextBindJoinSize(int configuredBindJoinSize, int totalBindings) {
177+
178+
/*
179+
* XXX idea:
180+
*
181+
* make nBindings dependent on the number of intermediate results of the left argument.
182+
*
183+
* If many intermediate results, increase the number of bindings. This will result in less remote SPARQL
184+
* requests.
185+
*
186+
*/
187+
188+
return configuredBindJoinSize;
189+
}
190+
160191
/**
161192
* Returns true if the vectored evaluation can be applied for the join argument, i.e. there is no fallback to
162193
* {@link ControlledWorkerJoin#handleBindings()}. This is

tools/federation/src/test/java/org/eclipse/rdf4j/federated/BoundJoinTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public void testBoundJoin_FailingEndpoint() throws Exception {
3737
/* test a simple bound join */
3838
prepareTest(Arrays.asList("/tests/data/data1.ttl", "/tests/data/data2.ttl"));
3939

40-
repoSettings(2).setFailAfter(5);
40+
// specifically reduce the bind join size for this test to run into "fail After"
41+
fedxRule.getFederationContext().getConfig().withBoundJoinBlockSize(2);
42+
43+
repoSettings(2).setFailAfter(5); // fail after 5 remote requests on repo 2
4144
Assertions.assertThrows(QueryEvaluationException.class, () -> {
4245
execute("/tests/boundjoin/query01.rq", "/tests/boundjoin/query01.srx", false, true);
4346
});

tools/federation/src/test/java/org/eclipse/rdf4j/federated/ServiceTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ protected FederatedService createService(String serviceUrl) throws QueryEvaluati
241241
repo.setFederatedServiceResolver(serviceResolver);
242242
repo.init();
243243

244+
fedxRule.getFederationContext().getConfig().withBoundJoinBlockSize(5);
245+
244246
/*
245247
* test select query retrieving all persons from endpoint 1 (SERVICE), endpoint not part of federation =>
246248
* evaluate using externally provided service resolver endpoint1 is reachable as
@@ -271,12 +273,11 @@ protected FederatedService createService(String serviceUrl) throws QueryEvaluati
271273
Assertions.assertEquals(expected, res.stream().map(b -> b.getValue("output")).collect(Collectors.toSet()));
272274
}
273275

274-
// first binding is evaluated using regular service, then we have groups of 4 groups of three bindings and 3
275-
// groups with 15
276+
// all requests are executed in bind-join with constant size
277+
// for this test bind join size is set to 5, hence we see 10 bind join requests
276278
TestSparqlFederatedService tfs = ((TestSparqlFederatedService) serviceResolver
277279
.getService("http://localhost:18080/repositories/endpoint1"));
278-
Assertions.assertEquals(1, tfs.serviceRequestCount.get());
279-
Assertions.assertEquals(7, tfs.boundJoinRequestCount.get());
280+
Assertions.assertEquals(10, tfs.boundJoinRequestCount.get());
280281
}
281282

282283
@Test

tools/federation/src/test/java/org/eclipse/rdf4j/federated/repository/FedXRepositoryConfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public void testExportWithEmptyConfig() throws Exception {
262262
assertThat(
263263
Models.objectLiteral(
264264
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_BOUND_JOIN_BLOCK_SIZE, null)))
265-
.hasValueSatisfying(v -> assertThat(v.intValue()).isEqualTo(15));
265+
.hasValueSatisfying(v -> assertThat(v.intValue()).isEqualTo(25));
266266
assertThat(
267267
Models.objectLiteral(
268268
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENFORCE_MAX_QUERY_TIME, null)))

0 commit comments

Comments
 (0)