Skip to content

Commit 63ae8d3

Browse files
committed
GH-5672 improved handling of drift and also enabled for Memory Store and Native Store
1 parent c17e4c6 commit 63ae8d3

17 files changed

Lines changed: 581 additions & 27 deletions

File tree

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/LearningEvaluationStrategyFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@
1515

1616
import org.eclipse.rdf4j.query.Dataset;
1717
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy;
18+
import org.eclipse.rdf4j.query.algebra.evaluation.RDFStarTripleSource;
1819
import org.eclipse.rdf4j.query.algebra.evaluation.TripleSource;
1920
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
2021
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.JoinStatsProvider;
2122
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.LearningQueryOptimizerPipeline;
23+
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.LearningRdfStarTripleSource;
2224
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.LearningTripleSource;
2325
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.MemoryJoinStats;
2426

@@ -60,7 +62,9 @@ public JoinStatsProvider getStatsProvider() {
6062
@Override
6163
public EvaluationStrategy createEvaluationStrategy(Dataset dataset, TripleSource tripleSource,
6264
EvaluationStatistics evaluationStatistics) {
63-
TripleSource learningTripleSource = new LearningTripleSource(tripleSource, statsProvider);
65+
TripleSource learningTripleSource = tripleSource instanceof RDFStarTripleSource
66+
? new LearningRdfStarTripleSource((RDFStarTripleSource) tripleSource, statsProvider)
67+
: new LearningTripleSource(tripleSource, statsProvider);
6468
EvaluationStrategy strategy = super.createEvaluationStrategy(dataset, learningTripleSource,
6569
evaluationStatistics);
6670
EvaluationStatistics optimizerStatistics = optimizerStatisticsOverride != null

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/JoinStatsProvider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,23 @@ public interface JoinStatsProvider {
2222

2323
void recordResults(PatternKey key, long resultCount);
2424

25+
/**
26+
* Seeds statistics for the given key. Implementations may also invalidate or refresh existing entries if the
27+
* supplied default cardinality has drifted significantly from the stored baseline.
28+
*/
2529
void seedIfAbsent(PatternKey key, double defaultCardinality, long priorCalls);
2630

2731
double getAverageResults(PatternKey key);
2832

2933
boolean hasStats(PatternKey key);
3034

3135
long getTotalCalls();
36+
37+
/**
38+
* Records that statements have been added to the store. Implementations may use this to invalidate statistics when
39+
* a write threshold is exceeded in a time window.
* <p>
* The default implementation does nothing, so providers that do not track writes need not override it.
*
* @param statementCount the number of statements that were added
40+
*/
41+
default void recordStatementsAdded(long statementCount) {
42+
// no-op by default
43+
}
3244
}

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/LearnedQueryJoinOptimizer.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,11 @@ protected double getTupleExprCost(TupleExpr tupleExpr, Map<TupleExpr, Double> ca
7171

7272
private double estimateCardinality(StatementPattern node) {
7373
PatternKey key = buildKey(node);
74-
if (!statsProvider.hasStats(key)) {
75-
double defaultEstimate = statistics.getCardinality(node);
76-
statsProvider.seedIfAbsent(key, defaultEstimate, DEFAULT_PRIOR_CALLS);
77-
}
74+
double defaultEstimate = statistics.getCardinality(node);
75+
statsProvider.seedIfAbsent(key, defaultEstimate, DEFAULT_PRIOR_CALLS);
7876
double estimate = statsProvider.getAverageResults(key);
7977
if (estimate <= 0.0d) {
80-
estimate = statistics.getCardinality(node);
78+
estimate = defaultEstimate;
8179
}
8280
return estimate;
8381
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
package org.eclipse.rdf4j.query.algebra.evaluation.optimizer;
13+
14+
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
15+
import org.eclipse.rdf4j.model.IRI;
16+
import org.eclipse.rdf4j.model.Resource;
17+
import org.eclipse.rdf4j.model.Triple;
18+
import org.eclipse.rdf4j.model.Value;
19+
import org.eclipse.rdf4j.query.QueryEvaluationException;
20+
import org.eclipse.rdf4j.query.algebra.evaluation.RDFStarTripleSource;
21+
22+
/**
23+
* RDF-star aware wrapper that records call counts and result sizes.
24+
*/
25+
public class LearningRdfStarTripleSource extends LearningTripleSource implements RDFStarTripleSource {
26+
27+
private final RDFStarTripleSource rdfStarDelegate;
28+
29+
public LearningRdfStarTripleSource(RDFStarTripleSource delegate, JoinStatsProvider statsProvider) {
30+
super(delegate, statsProvider);
31+
this.rdfStarDelegate = delegate;
32+
}
33+
34+
@Override
35+
public CloseableIteration<? extends Triple> getRdfStarTriples(Resource subj, IRI pred, Value obj)
36+
throws QueryEvaluationException {
37+
PatternKey key = buildKey(subj, pred, obj);
38+
statsProvider.recordCall(key);
39+
CloseableIteration<? extends Triple> base = rdfStarDelegate.getRdfStarTriples(subj, pred, obj);
40+
return new CountingIteration<>(base, statsProvider, key);
41+
}
42+
}

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/LearningTripleSource.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
*/
3232
public class LearningTripleSource implements TripleSource {
3333

34-
private final TripleSource delegate;
35-
private final JoinStatsProvider statsProvider;
34+
protected final TripleSource delegate;
35+
protected final JoinStatsProvider statsProvider;
3636

3737
public LearningTripleSource(TripleSource delegate, JoinStatsProvider statsProvider) {
3838
this.delegate = Objects.requireNonNull(delegate, "delegate");
@@ -72,7 +72,7 @@ public ValueFactory getValueFactory() {
7272
return delegate.getValueFactory();
7373
}
7474

75-
private static PatternKey buildKey(Resource subj, IRI pred, Value obj) {
75+
protected static PatternKey buildKey(Resource subj, IRI pred, Value obj) {
7676
int mask = 0;
7777
if (subj != null) {
7878
mask |= PatternKey.SUBJECT_BOUND;
@@ -86,14 +86,14 @@ private static PatternKey buildKey(Resource subj, IRI pred, Value obj) {
8686
return new PatternKey(pred, mask);
8787
}
8888

89-
private static final class CountingIteration extends AbstractCloseableIteration<Statement> {
89+
protected static final class CountingIteration<T> extends AbstractCloseableIteration<T> {
9090

91-
private final CloseableIteration<? extends Statement> delegate;
91+
private final CloseableIteration<? extends T> delegate;
9292
private final JoinStatsProvider statsProvider;
9393
private final PatternKey key;
9494
private long count;
9595

96-
private CountingIteration(CloseableIteration<? extends Statement> delegate, JoinStatsProvider statsProvider,
96+
protected CountingIteration(CloseableIteration<? extends T> delegate, JoinStatsProvider statsProvider,
9797
PatternKey key) {
9898
this.delegate = delegate;
9999
this.statsProvider = statsProvider;
@@ -116,15 +116,15 @@ public boolean hasNext() {
116116
}
117117

118118
@Override
119-
public Statement next() {
119+
public T next() {
120120
if (isClosed()) {
121121
throw new NoSuchElementException("The iteration has been closed.");
122122
} else if (Thread.currentThread().isInterrupted()) {
123123
close();
124124
throw new NoSuchElementException("The iteration has been interrupted.");
125125
}
126126
try {
127-
Statement statement = delegate.next();
127+
T statement = delegate.next();
128128
count++;
129129
return statement;
130130
} catch (NoSuchElementException e) {

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/MemoryJoinStats.java

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,84 @@
1111
// Some portions generated by Codex
1212
package org.eclipse.rdf4j.query.algebra.evaluation.optimizer;
1313

14+
import java.time.Clock;
15+
import java.time.Duration;
1416
import java.util.Map;
17+
import java.util.Objects;
1518
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.atomic.AtomicLong;
1620
import java.util.concurrent.atomic.LongAdder;
1721

1822
/**
1923
* In-memory statistics store for join pattern fanout metrics.
2024
*/
2125
public class MemoryJoinStats implements JoinStatsProvider {
2226

27+
public static final Duration DEFAULT_INVALIDATION_WINDOW = Duration.ofMinutes(10);
28+
public static final long DEFAULT_INVALIDATION_STATEMENT_THRESHOLD = 100_000L;
29+
public static final double DEFAULT_BASELINE_DRIFT_RATIO = 0.5d;
30+
31+
public static final class InvalidationSettings {
32+
private final Duration window;
33+
private final long statementThreshold;
34+
private final double baselineDriftRatio;
35+
36+
private InvalidationSettings(Duration window, long statementThreshold, double baselineDriftRatio) {
37+
this.window = window;
38+
this.statementThreshold = statementThreshold;
39+
this.baselineDriftRatio = baselineDriftRatio;
40+
}
41+
42+
public static InvalidationSettings disabled() {
43+
return new InvalidationSettings(Duration.ZERO, 0, 0.0d);
44+
}
45+
46+
public static InvalidationSettings of(Duration window, long statementThreshold) {
47+
return of(window, statementThreshold, DEFAULT_BASELINE_DRIFT_RATIO);
48+
}
49+
50+
public static InvalidationSettings of(Duration window, long statementThreshold, double baselineDriftRatio) {
51+
Objects.requireNonNull(window, "window");
52+
if (window.isNegative() || window.isZero()) {
53+
throw new IllegalArgumentException("window must be positive");
54+
}
55+
if (statementThreshold <= 0) {
56+
throw new IllegalArgumentException("statementThreshold must be positive");
57+
}
58+
if (baselineDriftRatio < 0.0d) {
59+
throw new IllegalArgumentException("baselineDriftRatio must be >= 0");
60+
}
61+
return new InvalidationSettings(window, statementThreshold, baselineDriftRatio);
62+
}
63+
64+
public Duration getWindow() {
65+
return window;
66+
}
67+
68+
public long getStatementThreshold() {
69+
return statementThreshold;
70+
}
71+
72+
public double getBaselineDriftRatio() {
73+
return baselineDriftRatio;
74+
}
75+
76+
private boolean enabled() {
77+
return statementThreshold > 0 && !window.isZero();
78+
}
79+
}
80+
2381
private static final class Stats {
2482
private final LongAdder calls = new LongAdder();
2583
private final LongAdder results = new LongAdder();
2684
private final long priorCalls;
2785
private final double priorResults;
86+
private final double baselineCardinality;
2887

29-
private Stats(long priorCalls, double priorResults) {
88+
private Stats(long priorCalls, double priorResults, double baselineCardinality) {
3089
this.priorCalls = priorCalls;
3190
this.priorResults = priorResults;
91+
this.baselineCardinality = baselineCardinality;
3292
}
3393

3494
private double averageResults() {
@@ -43,9 +103,34 @@ private double averageResults() {
43103
private long actualCalls() {
44104
return calls.sum();
45105
}
106+
107+
private double baselineCardinality() {
108+
return baselineCardinality;
109+
}
46110
}
47111

48112
private final Map<PatternKey, Stats> stats = new ConcurrentHashMap<>();
113+
private final InvalidationSettings invalidationSettings;
114+
private final Clock clock;
115+
private final Object invalidationLock = new Object();
116+
private final AtomicLong windowStartMillis;
117+
private final LongAdder statementsAddedInWindow = new LongAdder();
118+
119+
public MemoryJoinStats() {
120+
this(InvalidationSettings.of(DEFAULT_INVALIDATION_WINDOW, DEFAULT_INVALIDATION_STATEMENT_THRESHOLD),
121+
Clock.systemUTC());
122+
}
123+
124+
public MemoryJoinStats(InvalidationSettings invalidationSettings) {
125+
this(invalidationSettings, Clock.systemUTC());
126+
}
127+
128+
public MemoryJoinStats(InvalidationSettings invalidationSettings, Clock clock) {
129+
this.invalidationSettings = Objects.requireNonNull(invalidationSettings, "invalidationSettings");
130+
this.clock = Objects.requireNonNull(clock, "clock");
131+
long start = invalidationSettings.enabled() ? clock.millis() : 0L;
132+
this.windowStartMillis = new AtomicLong(start);
133+
}
49134

50135
@Override
51136
public void reset() {
@@ -54,22 +139,30 @@ public void reset() {
54139

55140
@Override
56141
public void recordCall(PatternKey key) {
57-
stats.computeIfAbsent(key, ignored -> new Stats(0, 0)).calls.increment();
142+
stats.computeIfAbsent(key, ignored -> new Stats(0, 0, 0.0d)).calls.increment();
58143
}
59144

60145
@Override
61146
public void recordResults(PatternKey key, long resultCount) {
62147
if (resultCount < 0) {
63148
resultCount = 0;
64149
}
65-
stats.computeIfAbsent(key, ignored -> new Stats(0, 0)).results.add(resultCount);
150+
stats.computeIfAbsent(key, ignored -> new Stats(0, 0, 0.0d)).results.add(resultCount);
66151
}
67152

68153
@Override
69154
public void seedIfAbsent(PatternKey key, double defaultCardinality, long priorCalls) {
70155
long seedCalls = Math.max(0, priorCalls);
71156
double priorResults = Math.max(0.0d, defaultCardinality) * seedCalls;
72-
stats.computeIfAbsent(key, ignored -> new Stats(seedCalls, priorResults));
157+
stats.compute(key, (ignored, existing) -> {
158+
if (existing == null) {
159+
return new Stats(seedCalls, priorResults, defaultCardinality);
160+
}
161+
if (baselineDrifted(existing, defaultCardinality)) {
162+
return new Stats(seedCalls, priorResults, defaultCardinality);
163+
}
164+
return existing;
165+
});
73166
}
74167

75168
@Override
@@ -91,4 +184,45 @@ public long getTotalCalls() {
91184
}
92185
return total;
93186
}
187+
188+
@Override
189+
public void recordStatementsAdded(long statementCount) {
190+
if (!invalidationSettings.enabled() || statementCount <= 0) {
191+
return;
192+
}
193+
synchronized (invalidationLock) {
194+
long nowMillis = clock.millis();
195+
long windowStart = windowStartMillis.get();
196+
long windowMillis = invalidationSettings.getWindow().toMillis();
197+
if (windowMillis <= 0) {
198+
return;
199+
}
200+
if (nowMillis - windowStart >= windowMillis) {
201+
windowStartMillis.set(nowMillis);
202+
statementsAddedInWindow.reset();
203+
}
204+
statementsAddedInWindow.add(statementCount);
205+
if (statementsAddedInWindow.sum() >= invalidationSettings.getStatementThreshold()) {
206+
reset();
207+
windowStartMillis.set(nowMillis);
208+
statementsAddedInWindow.reset();
209+
}
210+
}
211+
}
212+
213+
private boolean baselineDrifted(Stats existing, double newBaseline) {
214+
double driftRatio = invalidationSettings.getBaselineDriftRatio();
215+
if (driftRatio <= 0.0d) {
216+
return false;
217+
}
218+
double baseline = existing.baselineCardinality();
219+
if (baseline <= 0.0d) {
220+
return false;
221+
}
222+
if (newBaseline <= 0.0d) {
223+
return baseline > 0.0d;
224+
}
225+
double relativeChange = Math.abs(newBaseline - baseline) / baseline;
226+
return relativeChange >= driftRatio;
227+
}
94228
}

0 commit comments

Comments
 (0)