Skip to content

Commit ceb8156

Browse files
authored
GH-4962: invalidation support for FedX source selection cache + factory (#4966)
2 parents 03aefb9 + 2f15974 commit ceb8156

6 files changed

Lines changed: 83 additions & 1 deletion

File tree

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Optional;
1414

1515
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
16+
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
1617
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;
1718
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
1819
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
@@ -57,6 +58,8 @@ public class FedXConfig {
5758

5859
private String sourceSelectionCacheSpec = null;
5960

61+
private SourceSelectionCacheFactory sourceSelectionCacheFactory = null;
62+
6063
private TaskWrapper taskWrapper = null;
6164

6265
private String prefixDeclarations = null;
@@ -251,6 +254,18 @@ public FedXConfig withSourceSelectionCacheSpec(String cacheSpec) {
251254
return this;
252255
}
253256

257+
/**
258+
* The {@link SourceSelectionCacheFactory} to be used. If not set explicitly, the default in memory implementation
259+
* is used with the configued {@link #getSourceSelectionCacheSpec()}.
260+
*
261+
* @param factory the {@link SourceSelectionCacheFactory}
262+
* @return the current config
263+
*/
264+
public FedXConfig withSourceSelectionCacheFactory(SourceSelectionCacheFactory factory) {
265+
this.sourceSelectionCacheFactory = factory;
266+
return this;
267+
}
268+
254269
/**
255270
* Sets a {@link TaskWrapper} which may be used for wrapping any background {@link Runnable}s. If no such wrapper is
256271
* explicitly configured, the unmodified task is returned. See {@link TaskWrapper} for more information.
@@ -398,12 +413,24 @@ public String getPrefixDeclarations() {
398413
* Returns the configured {@link CacheBuilderSpec} (if any) for the {@link SourceSelectionMemoryCache}. If not
399414
* defined, the {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used.
400415
*
416+
* If {@link #getSourceSelectionCacheFactory()} is configured, this setting is ignored.
417+
*
401418
* @return the {@link CacheBuilderSpec} or <code>null</code>
402419
*/
403420
public String getSourceSelectionCacheSpec() {
404421
return this.sourceSelectionCacheSpec;
405422
}
406423

424+
/**
425+
* Returns the {@link SourceSelectionCacheFactory} (if any). If not defined, the {@link SourceSelectionCache} is
426+
* instantiated using the default implementation and respects {@link #getSourceSelectionCacheSpec()}.
427+
*
428+
* @return {@link SourceSelectionCacheFactory}
429+
*/
430+
public SourceSelectionCacheFactory getSourceSelectionCacheFactory() {
431+
return this.sourceSelectionCacheFactory;
432+
}
433+
407434
/**
408435
* The debug mode for query plan. If enabled, the query execution plan is printed to stdout
409436
*

tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationContext.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,17 @@ public FederationEvalStrategy createStrategy(Dataset dataset) {
9797
}
9898

9999
/**
100-
* Create the {@link SourceSelectionCache}
100+
* Create the {@link SourceSelectionCache}.
101101
*
102102
* @return the {@link SourceSelectionCache}
103103
* @see FedXConfig#getSourceSelectionCacheSpec()
104+
* @see FedXConfig#getSourceSelectionCacheFactory()
104105
*/
105106
private SourceSelectionCache createSourceSelectionCache() {
107+
var factory = getConfig().getSourceSelectionCacheFactory();
108+
if (factory != null) {
109+
return factory.create();
110+
}
106111
String cacheSpec = getConfig().getSourceSelectionCacheSpec();
107112
return new SourceSelectionMemoryCache(cacheSpec);
108113
}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionCache.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,10 @@ enum StatementSourceAssurance {
6666
* @param hasStatements
6767
*/
6868
void updateInformation(SubQuery subQuery, Endpoint endpoint, boolean hasStatements);
69+
70+
/**
71+
* Invalidate the underlying cache
72+
*/
73+
void invalidate();
74+
6975
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2024 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.federated.cache;
12+
13+
/**
14+
* A factory for {@link SourceSelectionCache}.
15+
*
16+
* @author Andreas Schwarte
17+
*/
18+
public interface SourceSelectionCacheFactory {
19+
20+
/**
21+
* Create the {@link SourceSelectionCache}
22+
*
23+
* @return
24+
*/
25+
SourceSelectionCache create();
26+
}

tools/federation/src/main/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCache.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public void updateInformation(SubQuery subQuery, Endpoint endpoint, boolean hasS
8383
updateInferredInformation(subQuery, endpoint, hasStatements);
8484
}
8585

86+
@Override
87+
public void invalidate() {
88+
cache.invalidateAll(); // invalidate the entire cache
89+
}
90+
8691
private void updateCacheEntry(SubQuery subQuery, Endpoint endpoint, boolean hasStatements) {
8792
Entry entry;
8893
try {

tools/federation/src/test/java/org/eclipse/rdf4j/federated/cache/SourceSelectionMemoryCacheTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,19 @@ public void testCache_Integration() throws Exception {
148148
// source selection is cached, only from fetching data
149149
Assertions.assertEquals(1, requestsForEndpoint(endpoints.get(0)));
150150
Assertions.assertEquals(0, requestsForEndpoint(endpoints.get(1)));
151+
152+
// invalidate source selection cache
153+
federationContext().getSourceSelectionCache().invalidate();
154+
155+
// re-run requests
156+
monitoring().resetMonitoringInformation();
157+
try (TupleQueryResult tqr = federationContext().getQueryManager().prepareTupleQuery(query).evaluate()) {
158+
Assertions.assertEquals(2, Iterations.asList(tqr).size());
159+
}
160+
161+
// source selection is non longer cached,
162+
Assertions.assertEquals(2, requestsForEndpoint(endpoints.get(0)));
163+
Assertions.assertEquals(1, requestsForEndpoint(endpoints.get(1)));
151164
}
152165

153166
@Test

0 commit comments

Comments
 (0)