Skip to content

Commit d74e6be

Browse files
committed
GH-5197: preparation for supporting fair sub-query execution in FedX
This change adds preparational infrastructure for having different implementations of schedulers. Configuration is here prepared by means of defining a "SchedulerFactory" interface with a default implementation aside (which essentially mimics the current behavior). Note that for ease of development some aspects of ControlledWorkerScheduler are made accessible to sub-classes. The idea is that in the end version there is an abstract scheduler class providing shared functionality and different implementation (e.g. the current FIFO one and a fair implementation)
1 parent ab02413 commit d74e6be

5 files changed

Lines changed: 141 additions & 10 deletions

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
2323
import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint;
2424
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory;
25+
import org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory;
26+
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
2527
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
2628
import org.eclipse.rdf4j.federated.exception.FedXException;
2729
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
@@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {
6466

6567
private FederationEvaluationStrategyFactory strategyFactory;
6668

69+
private SchedulerFactory schedulerFactory;
70+
6771
private WriteStrategyFactory writeStrategyFactory;
6872

6973
private File dataDir;
@@ -96,6 +100,22 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
96100
this.strategyFactory = strategyFactory;
97101
}
98102

103+
/* package */ SchedulerFactory getSchedulerFactory() {
104+
if (schedulerFactory == null) {
105+
schedulerFactory = DefaultSchedulerFactory.INSTANCE;
106+
}
107+
return schedulerFactory;
108+
}
109+
110+
/**
111+
* Set the {@link SchedulerFactory}. Can only be done before initialization of the federation
112+
*
113+
* @param schedulerFactory the {@link SchedulerFactory}
114+
*/
115+
public void setSchedulerFactory(SchedulerFactory schedulerFactory) {
116+
this.schedulerFactory = schedulerFactory;
117+
}
118+
99119
/**
100120
*
101121
* @param writeStrategyFactory the {@link WriteStrategyFactory}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
2727
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
2828
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
29+
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
2930
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
3031
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
3132
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
@@ -118,26 +119,28 @@ public void reset() {
118119
log.debug("Scheduler for join and union are reset.");
119120
}
120121

122+
SchedulerFactory schedulerFactory = federation.getSchedulerFactory();
123+
121124
Optional<TaskWrapper> taskWrapper = federationContext.getConfig().getTaskWrapper();
122125
if (joinScheduler != null) {
123126
joinScheduler.abort();
124127
}
125-
joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(),
126-
"Join Scheduler");
128+
joinScheduler = schedulerFactory.createJoinScheduler(federationContext,
129+
federationContext.getConfig().getJoinWorkerThreads());
127130
taskWrapper.ifPresent(joinScheduler::setTaskWrapper);
128131

129132
if (unionScheduler != null) {
130133
unionScheduler.abort();
131134
}
132-
unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(),
133-
"Union Scheduler");
135+
unionScheduler = schedulerFactory.createUnionScheduler(federationContext,
136+
federationContext.getConfig().getUnionWorkerThreads());
134137
taskWrapper.ifPresent(unionScheduler::setTaskWrapper);
135138

136139
if (leftJoinScheduler != null) {
137140
leftJoinScheduler.abort();
138141
}
139-
leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(),
140-
"Left Join Scheduler");
142+
leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext,
143+
federationContext.getConfig().getLeftJoinWorkerThreads());
141144
taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper);
142145

143146
}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package org.eclipse.rdf4j.federated.evaluation.concurrent;
1212

1313
import java.util.List;
14+
import java.util.concurrent.BlockingQueue;
1415
import java.util.concurrent.ExecutorService;
1516
import java.util.concurrent.Future;
1617
import java.util.concurrent.LinkedBlockingQueue;
@@ -42,7 +43,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
4243

4344
private final ExecutorService executor;
4445

45-
private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
46+
// Note: initialized in #createExecutorService
47+
protected BlockingQueue<Runnable> _taskQueue;
4648

4749
private final int nWorkers;
4850
private final String name;
@@ -57,7 +59,7 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
5759
public ControlledWorkerScheduler(int nWorkers, String name) {
5860
this.nWorkers = nWorkers;
5961
this.name = name;
60-
this.executor = createExecutorService();
62+
this.executor = createExecutorService(nWorkers, name);
6163
}
6264

6365
/**
@@ -112,13 +114,25 @@ public int getTotalNumberOfWorkers() {
112114
return nWorkers;
113115
}
114116

117+
@Deprecated(forRemoval = true) // currently unused and this class is internal
115118
public int getNumberOfTasks() {
116119
return _taskQueue.size();
117120
}
118121

119-
private ExecutorService createExecutorService() {
122+
/**
123+
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
124+
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
125+
*
126+
* @param nWorkers the number of workers in the thread pool
127+
* @param name the base name for threads in the pool
128+
* @return
129+
*/
130+
protected ExecutorService createExecutorService(int nWorkers, String name) {
131+
132+
// use a LinkedBlockingQueue by default
133+
this._taskQueue = new LinkedBlockingQueue<>();
120134

121-
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue,
135+
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
122136
new NamingThreadFactory(name));
123137
executor.allowCoreThreadTimeOut(true);
124138
return executor;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.concurrent;
12+
13+
import org.eclipse.rdf4j.federated.FederationContext;
14+
import org.eclipse.rdf4j.query.BindingSet;
15+
16+
/**
17+
* The default {@link SchedulerFactory}
18+
*/
19+
public class DefaultSchedulerFactory implements SchedulerFactory {
20+
21+
public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory();
22+
23+
@Override
24+
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext,
25+
int nWorkers) {
26+
return new ControlledWorkerScheduler<>(nWorkers,
27+
"Join Scheduler");
28+
}
29+
30+
@Override
31+
public ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext,
32+
int nWorkers) {
33+
return new ControlledWorkerScheduler<>(nWorkers,
34+
"Union Scheduler");
35+
}
36+
37+
@Override
38+
public ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext,
39+
int nWorkers) {
40+
return new ControlledWorkerScheduler<>(nWorkers,
41+
"Left Join Scheduler");
42+
}
43+
44+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.evaluation.concurrent;
12+
13+
import org.eclipse.rdf4j.federated.FederationContext;
14+
import org.eclipse.rdf4j.query.BindingSet;
15+
16+
/**
17+
* Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background
18+
*
19+
* @see DefaultSchedulerFactory
20+
* @author Andreas Schwarte
21+
*/
22+
public interface SchedulerFactory {
23+
24+
/**
25+
* Create a {@link ControlledWorkerScheduler} for joins
26+
*
27+
* @param federationContext
28+
* @param nWorkers
29+
* @return
30+
*/
31+
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);
32+
33+
/**
34+
* Create a {@link ControlledWorkerScheduler} for unions
35+
*
36+
* @param federationContext
37+
* @param nWorkers
38+
* @return
39+
*/
40+
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);
41+
42+
/**
43+
* Create a {@link ControlledWorkerScheduler} for left joins
44+
*
45+
* @param federationContext
46+
* @param nWorkers
47+
* @return
48+
*/
49+
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
50+
}

0 commit comments

Comments
 (0)