Skip to content

Commit f687e85

Browse files
authored
GH-5197: preparation for supporting fair sub-query execution in FedX (#5198)
2 parents 1339109 + 721b3b6 commit f687e85

5 files changed

Lines changed: 162 additions & 10 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
2323
import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint;
2424
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory;
25+
import org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory;
26+
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
2527
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
2628
import org.eclipse.rdf4j.federated.exception.FedXException;
2729
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
@@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {
6466

6567
private FederationEvaluationStrategyFactory strategyFactory;
6668

69+
private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE;
70+
6771
private WriteStrategyFactory writeStrategyFactory;
6872

6973
private File dataDir;
@@ -96,6 +100,19 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
96100
this.strategyFactory = strategyFactory;
97101
}
98102

103+
/* package */ SchedulerFactory getSchedulerFactory() {
104+
return schedulerFactory;
105+
}
106+
107+
/**
108+
* Set the {@link SchedulerFactory}. Can only be done before initialization of the federation
109+
*
110+
* @param schedulerFactory the {@link SchedulerFactory}
111+
*/
112+
public void setSchedulerFactory(SchedulerFactory schedulerFactory) {
113+
this.schedulerFactory = schedulerFactory;
114+
}
115+
99116
/**
100117
*
101118
* @param writeStrategyFactory the {@link WriteStrategyFactory}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
2727
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
2828
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
29+
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
2930
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
3031
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
3132
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
@@ -118,26 +119,28 @@ public void reset() {
118119
log.debug("Scheduler for join and union are reset.");
119120
}
120121

122+
SchedulerFactory schedulerFactory = federation.getSchedulerFactory();
123+
121124
Optional<TaskWrapper> taskWrapper = federationContext.getConfig().getTaskWrapper();
122125
if (joinScheduler != null) {
123126
joinScheduler.abort();
124127
}
125-
joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(),
126-
"Join Scheduler");
128+
joinScheduler = schedulerFactory.createJoinScheduler(federationContext,
129+
federationContext.getConfig().getJoinWorkerThreads());
127130
taskWrapper.ifPresent(joinScheduler::setTaskWrapper);
128131

129132
if (unionScheduler != null) {
130133
unionScheduler.abort();
131134
}
132-
unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(),
133-
"Union Scheduler");
135+
unionScheduler = schedulerFactory.createUnionScheduler(federationContext,
136+
federationContext.getConfig().getUnionWorkerThreads());
134137
taskWrapper.ifPresent(unionScheduler::setTaskWrapper);
135138

136139
if (leftJoinScheduler != null) {
137140
leftJoinScheduler.abort();
138141
}
139-
leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(),
140-
"Left Join Scheduler");
142+
leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext,
143+
federationContext.getConfig().getLeftJoinWorkerThreads());
141144
taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper);
142145

143146
}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
package org.eclipse.rdf4j.federated.evaluation.concurrent;
1212

1313
import java.util.List;
14+
import java.util.concurrent.BlockingQueue;
1415
import java.util.concurrent.ExecutorService;
1516
import java.util.concurrent.Future;
1617
import java.util.concurrent.LinkedBlockingQueue;
1718
import java.util.concurrent.ThreadPoolExecutor;
1819
import java.util.concurrent.TimeUnit;
1920

21+
import org.eclipse.rdf4j.common.annotation.Experimental;
2022
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
2123
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
2224
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
@@ -42,7 +44,9 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
4244

4345
private final ExecutorService executor;
4446

45-
private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
47+
// TODO: in the next major version of RDF4J this final field should be removed.
48+
// Initialization of the executor service should managed the details
49+
private final BlockingQueue<Runnable> _taskQueue;
4650

4751
private final int nWorkers;
4852
private final String name;
@@ -57,7 +61,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
5761
public ControlledWorkerScheduler(int nWorkers, String name) {
5862
this.nWorkers = nWorkers;
5963
this.name = name;
60-
this.executor = createExecutorService();
64+
this._taskQueue = createBlockingQueue();
65+
this.executor = createExecutorService(nWorkers, name);
6166
}
6267

6368
/**
@@ -112,13 +117,36 @@ public int getTotalNumberOfWorkers() {
112117
return nWorkers;
113118
}
114119

120+
@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal
115121
public int getNumberOfTasks() {
116122
return _taskQueue.size();
117123
}
118124

119-
private ExecutorService createExecutorService() {
125+
/**
126+
* Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a
127+
* {@link LinkedBlockingQueue}.
128+
*
129+
* @return
130+
*/
131+
@Experimental
132+
protected BlockingQueue<Runnable> createBlockingQueue() {
133+
return new LinkedBlockingQueue<>();
134+
}
135+
136+
/**
137+
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
138+
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
139+
*
140+
* The thread pool should be configured to terminate idle threads after a period of time (default: 60s)
141+
*
142+
* @param nWorkers the number of workers in the thread pool
143+
* @param name the base name for threads in the pool
144+
* @return
145+
*/
146+
@Experimental
147+
protected ExecutorService createExecutorService(int nWorkers, String name) {
120148

121-
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue,
149+
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
122150
new NamingThreadFactory(name));
123151
executor.allowCoreThreadTimeOut(true);
124152
return executor;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.concurrent;
12+
13+
import org.eclipse.rdf4j.federated.FederationContext;
14+
import org.eclipse.rdf4j.query.BindingSet;
15+
16+
/**
17+
* The default {@link SchedulerFactory}
18+
*/
19+
public class DefaultSchedulerFactory implements SchedulerFactory {
20+
21+
public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory();
22+
23+
@Override
24+
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext,
25+
int nWorkers) {
26+
return new ControlledWorkerScheduler<>(nWorkers,
27+
"Join Scheduler");
28+
}
29+
30+
@Override
31+
public ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext,
32+
int nWorkers) {
33+
return new ControlledWorkerScheduler<>(nWorkers,
34+
"Union Scheduler");
35+
}
36+
37+
@Override
38+
public ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext,
39+
int nWorkers) {
40+
return new ControlledWorkerScheduler<>(nWorkers,
41+
"Left Join Scheduler");
42+
}
43+
44+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.concurrent;
12+
13+
import org.eclipse.rdf4j.federated.FederationContext;
14+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
15+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
16+
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask;
17+
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask;
18+
import org.eclipse.rdf4j.query.BindingSet;
19+
20+
/**
21+
* Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background
22+
*
23+
* @see DefaultSchedulerFactory
24+
* @author Andreas Schwarte
25+
*/
26+
public interface SchedulerFactory {
27+
28+
/**
29+
* Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind
30+
* joins)
31+
*
32+
* @param federationContext
33+
* @param nWorkers
34+
* @return
35+
* @see ControlledWorkerBindJoin
36+
* @see ParallelBoundJoinTask
37+
*/
38+
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);
39+
40+
/**
41+
* Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel)
42+
*
43+
* @param federationContext
44+
* @param nWorkers
45+
* @return
46+
*/
47+
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);
48+
49+
/**
50+
* Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind
51+
* joins, i.e. OPTIONAL)
52+
*
53+
* @param federationContext
54+
* @param nWorkers
55+
* @return
56+
* @see ControlledWorkerBindLeftJoin
57+
* @see ParallelBindLeftJoinTask
58+
*/
59+
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
60+
}

0 commit comments

Comments
 (0)