Skip to content

Commit ddcadc0

Browse files
committed
GH-5197: javadoc refinements + smaller initialization changes
- for minor version compatibility the type of the "_taskQueue" field in the scheduler cannot be changed (to non-final). Hence, for now we use a dedicated protected initialization method. In the future (next major release) the idea is to leave the queue entirely managed by the executor service. - refinements and clarifications to the javadoc
1 parent d74e6be commit ddcadc0

3 files changed

Lines changed: 31 additions & 13 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {
6666

6767
private FederationEvaluationStrategyFactory strategyFactory;
6868

69-
private SchedulerFactory schedulerFactory;
69+
private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE;
7070

7171
private WriteStrategyFactory writeStrategyFactory;
7272

@@ -101,9 +101,6 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
101101
}
102102

103103
/* package */ SchedulerFactory getSchedulerFactory() {
104-
if (schedulerFactory == null) {
105-
schedulerFactory = DefaultSchedulerFactory.INSTANCE;
106-
}
107104
return schedulerFactory;
108105
}
109106

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
4343

4444
private final ExecutorService executor;
4545

46-
// Note: initialized in #createExecutorService
47-
protected BlockingQueue<Runnable> _taskQueue;
46+
// TODO: in the next major version of RDF4J this final field should be removed.
47+
// Initialization of the executor service should managed the details
48+
private final BlockingQueue<Runnable> _taskQueue;
4849

4950
private final int nWorkers;
5051
private final String name;
@@ -59,6 +60,7 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
5960
public ControlledWorkerScheduler(int nWorkers, String name) {
6061
this.nWorkers = nWorkers;
6162
this.name = name;
63+
this._taskQueue = createBlockingQueue();
6264
this.executor = createExecutorService(nWorkers, name);
6365
}
6466

@@ -114,24 +116,33 @@ public int getTotalNumberOfWorkers() {
114116
return nWorkers;
115117
}
116118

117-
@Deprecated(forRemoval = true) // currently unused and this class is internal
119+
@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal
118120
public int getNumberOfTasks() {
119121
return _taskQueue.size();
120122
}
121123

124+
/**
125+
* Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a
126+
* {@link LinkedBlockingQueue}.
127+
*
128+
* @return
129+
*/
130+
protected BlockingQueue<Runnable> createBlockingQueue() {
131+
return new LinkedBlockingQueue<>();
132+
}
133+
122134
/**
123135
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
124136
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
125137
*
138+
* The thread pool should be configured to terminate idle threads after a period of time (default: 60s)
139+
*
126140
* @param nWorkers the number of workers in the thread pool
127141
* @param name the base name for threads in the pool
128142
* @return
129143
*/
130144
protected ExecutorService createExecutorService(int nWorkers, String name) {
131145

132-
// use a LinkedBlockingQueue by default
133-
this._taskQueue = new LinkedBlockingQueue<>();
134-
135146
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
136147
new NamingThreadFactory(name));
137148
executor.allowCoreThreadTimeOut(true);

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
package org.eclipse.rdf4j.federated.evaluation.concurrent;
1212

1313
import org.eclipse.rdf4j.federated.FederationContext;
14+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
15+
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
16+
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask;
17+
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask;
1418
import org.eclipse.rdf4j.query.BindingSet;
1519

1620
/**
@@ -22,16 +26,19 @@
2226
public interface SchedulerFactory {
2327

2428
/**
25-
* Create a {@link ControlledWorkerScheduler} for joins
29+
* Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind
30+
* joins)
2631
*
2732
* @param federationContext
2833
* @param nWorkers
2934
* @return
35+
* @see ControlledWorkerBindJoin
36+
* @see ParallelBoundJoinTask
3037
*/
3138
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);
3239

3340
/**
34-
* Create a {@link ControlledWorkerScheduler} for unions
41+
* Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel)
3542
*
3643
* @param federationContext
3744
* @param nWorkers
@@ -40,11 +47,14 @@ public interface SchedulerFactory {
4047
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);
4148

4249
/**
43-
* Create a {@link ControlledWorkerScheduler} for left joins
50+
* Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind
51+
* joins, i.e. OPTIONAL)
4452
*
4553
* @param federationContext
4654
* @param nWorkers
4755
* @return
56+
* @see ControlledWorkerBindLeftJoin
57+
* @see ParallelBindLeftJoinTask
4858
*/
4959
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
5060
}

0 commit comments

Comments
 (0)