Skip to content

Commit 889b91b

Browse files
committed
improved detection and monitoring
1 parent acd119d commit 889b91b

11 files changed

Lines changed: 184 additions & 18 deletions

File tree

core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,10 @@ public DistinctIteration(CloseableIteration<? extends E> iter, Supplier<Set<E>>
8181
@Override
8282
protected boolean accept(E object) {
8383
try {
84+
QueryExecutionContextBridge.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
8485
QueryExecutionContextBridge.markHeavy(OPERATOR_NAME);
8586
QueryExecutionContextBridge.checkpoint(OPERATOR_NAME);
86-
}catch (Throwable t) {
87+
} catch (Throwable t) {
8788
excludeSet = null;
8889
throw t;
8990
}
@@ -124,6 +125,7 @@ private static final class QueryExecutionContextBridge {
124125
private static volatile boolean initialized;
125126
private static volatile Method markHeavyMethod;
126127
private static volatile Method checkpointMethod;
128+
private static volatile Method throwIfHeavyOperatorExecutionDisabledMethod;
127129

128130
private QueryExecutionContextBridge() {
129131
}
@@ -142,6 +144,15 @@ private static void checkpoint(String operator) {
142144
}
143145
}
144146

147+
private static void throwIfHeavyOperatorExecutionDisabled(String operator) {
148+
if (!initialized) {
149+
initialize();
150+
}
151+
if (throwIfHeavyOperatorExecutionDisabledMethod != null) {
152+
invoke(throwIfHeavyOperatorExecutionDisabledMethod, operator);
153+
}
154+
}
155+
145156
private static Method getMethod(boolean markHeavy) {
146157
if (!initialized) {
147158
initialize();
@@ -157,9 +168,12 @@ private static synchronized void initialize() {
157168
Class<?> contextType = Class.forName(QUERY_EXECUTION_CONTEXT_CLASS);
158169
markHeavyMethod = contextType.getMethod("markHeavy", String.class);
159170
checkpointMethod = contextType.getMethod("checkpoint", String.class);
171+
throwIfHeavyOperatorExecutionDisabledMethod = contextType.getMethod(
172+
"throwIfHeavyOperatorExecutionDisabled", String.class);
160173
} catch (ClassNotFoundException | NoSuchMethodException e) {
161174
markHeavyMethod = null;
162175
checkpointMethod = null;
176+
throwIfHeavyOperatorExecutionDisabledMethod = null;
163177
}
164178
initialized = true;
165179
}

core/common/iterator/src/test/java/org/eclipse/rdf4j/common/iteration/DistinctIterationTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,17 @@ void shouldConsultQueryExecutionContextWhenFilteringDistinctValues() {
4040
assertTrue(QueryExecutionContext.getMarkHeavyCalls() > 0);
4141
assertTrue(QueryExecutionContext.getCheckpointCalls() > 0);
4242
}
43+
44+
@Test
45+
void shouldAbortDistinctIterationWhenHeavyOperatorExecutionIsDisabled() {
46+
QueryExecutionContext.disableHeavyOperatorExecution(new RuntimeException("critical-breaker-stop"));
47+
48+
DistinctIteration<Integer> iteration = new DistinctIteration<>(
49+
new CloseableIteratorIteration<>(List.of(1, 2, 3).iterator()), java.util.HashSet::new);
50+
51+
RuntimeException exception = assertThrows(RuntimeException.class, iteration::hasNext);
52+
assertEquals("critical-breaker-stop", exception.getMessage());
53+
assertEquals(0, QueryExecutionContext.getMarkHeavyCalls());
54+
assertEquals(0, QueryExecutionContext.getCheckpointCalls());
55+
}
4356
}

core/common/iterator/src/test/java/org/eclipse/rdf4j/http/client/QueryExecutionContext.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ public final class QueryExecutionContext {
1818
private static final AtomicInteger MARK_HEAVY_CALLS = new AtomicInteger();
1919
private static final AtomicInteger CHECKPOINT_CALLS = new AtomicInteger();
2020

21+
private static volatile boolean heavyOperatorExecutionEnabled = true;
2122
private static volatile RuntimeException checkpointFailure;
23+
private static volatile RuntimeException heavyOperatorExecutionFailure;
2224

2325
private QueryExecutionContext() {
2426
}
@@ -34,10 +36,27 @@ public static void checkpoint(String operator) {
3436
}
3537
}
3638

39+
public static void throwIfHeavyOperatorExecutionDisabled(String operator) {
40+
if (!heavyOperatorExecutionEnabled) {
41+
throw heavyOperatorExecutionFailure != null ? heavyOperatorExecutionFailure
42+
: new RuntimeException("heavy operator execution disabled: " + operator);
43+
}
44+
}
45+
3746
public static void failOnCheckpoint(RuntimeException runtimeException) {
3847
checkpointFailure = runtimeException;
3948
}
4049

50+
public static void disableHeavyOperatorExecution(RuntimeException runtimeException) {
51+
heavyOperatorExecutionEnabled = false;
52+
heavyOperatorExecutionFailure = runtimeException;
53+
}
54+
55+
public static void enableHeavyOperatorExecution() {
56+
heavyOperatorExecutionEnabled = true;
57+
heavyOperatorExecutionFailure = null;
58+
}
59+
4160
public static int getMarkHeavyCalls() {
4261
return MARK_HEAVY_CALLS.get();
4362
}
@@ -49,6 +68,8 @@ public static int getCheckpointCalls() {
4968
public static void reset() {
5069
MARK_HEAVY_CALLS.set(0);
5170
CHECKPOINT_CALLS.set(0);
71+
heavyOperatorExecutionEnabled = true;
5272
checkpointFailure = null;
73+
heavyOperatorExecutionFailure = null;
5374
}
5475
}

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryCircuitBreaker.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,15 @@ public void beforeExecution(QueryCircuitBreakerHandle handle) throws QueryInterr
138138
}
139139

140140
if (state == QueryPressureState.WARN && configuration.getWarnAdmissionDelayMs() > 0) {
141-
System.gc();
142141
delay(configuration.getWarnAdmissionDelayMs(), configuration, false);
143142
}
144143

145144
if (state.rejectsNewQueries()) {
145+
System.gc();
146146
rejectCount.incrementAndGet();
147147
if (state == QueryPressureState.CRITICAL) {
148-
System.gc();
149148
cancelOneHeavyQueryIfNeeded(configuration, snapshot, "critical-admission");
150149
}
151-
System.gc();
152150
throw CircuitBreakerException.rejected(state, configuration.getRetryAfterSeconds(),
153151
buildPressureMessage("Query rejected by global memory circuit breaker", snapshot, state));
154152
}
@@ -213,6 +211,12 @@ public StatusSnapshot snapshotStatus() {
213211
transition.getTimestampMillis(), transition.getReason());
214212
}
215213

214+
void refreshHeavyOperatorExecutionState() {
215+
Configuration configuration = configurationSupplier.get();
216+
QueryPressureMonitor.Snapshot snapshot = pressureMonitor.sample();
217+
refreshState("activate", configuration, snapshot);
218+
}
219+
216220
public static CircuitBreakerException asCircuitBreakerException(Throwable throwable) {
217221
if (throwable instanceof CircuitBreakerException) {
218222
return (CircuitBreakerException) throwable;
@@ -250,7 +254,11 @@ private QueryPressureState refreshState(String reason, Configuration configurati
250254
LOGGER.info(
251255
"Query circuit breaker transition previous={} current={} freeMb={} rollingGcMs={} reason={}",
252256
previous, next, snapshot.getFreeMemoryMb(), snapshot.getRollingGcMs(), reason);
257+
if (currentState == QueryPressureState.HIGH || (currentState == QueryPressureState.WARN && determineFreeMemoryState(configuration, snapshot.getFreeMemoryMb()) == QueryPressureState.WARN)) {
258+
System.gc();
259+
}
253260
}
261+
QueryExecutionContext.setHeavyOperatorExecutionEnabled(currentState != QueryPressureState.CRITICAL);
254262
return currentState;
255263
}
256264

@@ -404,6 +412,27 @@ private synchronized void cancelOneHeavyQueryIfNeeded(Configuration configuratio
404412
candidate.getLastHeavyOperator(), reason);
405413
}
406414

415+
void interruptForCriticalHeavyOperator(QueryCircuitBreakerHandle handle, String operator)
416+
throws QueryInterruptedException {
417+
if (handle == null) {
418+
return;
419+
}
420+
421+
Configuration configuration = configurationSupplier.get();
422+
QueryPressureMonitor.Snapshot snapshot = pressureMonitor.sample();
423+
handle.markHeavy(operator, clock.getAsLong());
424+
425+
String cancelReason = buildPressureMessage("Query cancelled by global memory circuit breaker", snapshot,
426+
QueryPressureState.CRITICAL);
427+
if (handle.requestCancel(QueryPressureState.CRITICAL, cancelReason)) {
428+
cancelCount.incrementAndGet();
429+
}
430+
431+
String effectiveReason = handle.getCancellationReason() != null ? handle.getCancellationReason() : cancelReason;
432+
throw CircuitBreakerException.cancelled(handle.getCancellationState(), configuration.getRetryAfterSeconds(),
433+
effectiveReason);
434+
}
435+
407436
private String buildPressureMessage(String prefix, QueryPressureMonitor.Snapshot snapshot,
408437
QueryPressureState state) {
409438
return prefix + " (state=" + state + ", freeMb=" + snapshot.getFreeMemoryMb() + ", rollingGcMs="
@@ -641,10 +670,10 @@ private Configuration(boolean enabled, int warnGcMs, int highGcMs, int criticalG
641670
}
642671

643672
static Configuration fromSystemProperties() {
644-
return new Configuration(Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "false")),
645-
getIntProperty(WARN_GC_MS_PROPERTY, 100),
646-
getIntProperty(HIGH_GC_MS_PROPERTY, 250),
647-
getIntProperty(CRITICAL_GC_MS_PROPERTY, 400),
673+
return new Configuration(Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "true")),
674+
getIntProperty(WARN_GC_MS_PROPERTY, 200),
675+
getIntProperty(HIGH_GC_MS_PROPERTY, 400),
676+
getIntProperty(CRITICAL_GC_MS_PROPERTY, 800),
648677
getIntProperty(WARN_FREE_MB_PROPERTY, 256),
649678
getIntProperty(HIGH_FREE_MB_PROPERTY, 128),
650679
getIntProperty(CRITICAL_FREE_MB_PROPERTY, 96),

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryExecutionContext.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020
*/
2121
public final class QueryExecutionContext {
2222

23-
private static final int CHECKPOINT_STRIDE = 16384;
24-
private static final int CHECKPOINT_MASK = CHECKPOINT_STRIDE - 1;
23+
private static final int CHECKPOINT_STRIDE_DEFAULT = 1024;
24+
private static int CHECKPOINT_STRIDE = CHECKPOINT_STRIDE_DEFAULT;
25+
private static int CHECKPOINT_MASK = CHECKPOINT_STRIDE - 1;
2526
private static final ThreadLocal<State> CURRENT = new ThreadLocal<>();
27+
private static boolean heavyOperatorExecutionEnabled = true;
28+
private static int checkpointCalls;
29+
30+
2631

2732
private QueryExecutionContext() {
2833
}
@@ -33,6 +38,7 @@ public static Activation activate(QueryCircuitBreakerHandle handle) {
3338
State next = new State(normalizedHandle, previous);
3439
normalizedHandle.attachCurrentThread(null);
3540
CURRENT.set(next);
41+
QueryCircuitBreaker.getInstance().refreshHeavyOperatorExecutionState();
3642
return () -> {
3743
State current = CURRENT.get();
3844
if (current != next) {
@@ -59,29 +65,49 @@ public static void markHeavy(String operator) {
5965
}
6066

6167
public static void checkpoint(String operator) throws QueryInterruptedException {
68+
if(!shouldCheckpoint()) return;
6269
State state = CURRENT.get();
63-
if (state != null && state.shouldCheckpoint()) {
70+
if (state != null) {
6471
QueryCircuitBreaker.getInstance().checkpoint(state.handle, operator);
6572
}
6673
}
6774

75+
public static void throwIfHeavyOperatorExecutionDisabled(String operator) throws QueryInterruptedException {
76+
if (heavyOperatorExecutionEnabled) {
77+
return;
78+
}
79+
80+
State state = CURRENT.get();
81+
82+
if (state == null) {
83+
return;
84+
}
85+
86+
QueryCircuitBreaker.getInstance().interruptForCriticalHeavyOperator(state.handle, operator);
87+
}
88+
89+
static void setHeavyOperatorExecutionEnabled(boolean heavyOperatorExecutionEnabled) {
90+
QueryExecutionContext.heavyOperatorExecutionEnabled = heavyOperatorExecutionEnabled;
91+
}
92+
6893
public interface Activation extends AutoCloseable {
6994
@Override
7095
void close();
7196
}
7297

98+
private static boolean shouldCheckpoint() {
99+
return ((++checkpointCalls) & CHECKPOINT_MASK) == 0;
100+
}
101+
73102
private static final class State {
74103
private final QueryCircuitBreakerHandle handle;
75104
private final State previous;
76-
private int checkpointCalls;
77105

78106
private State(QueryCircuitBreakerHandle handle, State previous) {
79107
this.handle = handle;
80108
this.previous = previous;
81109
}
82110

83-
private boolean shouldCheckpoint() {
84-
return ((++checkpointCalls) & CHECKPOINT_MASK) == 0;
85-
}
111+
86112
}
87113
}

core/http/client/src/test/java/org/eclipse/rdf4j/http/client/QueryCircuitBreakerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,23 @@ void shouldThrottleExecutionContextCheckpointToEvery16384Calls() throws Exceptio
9090
}
9191
}
9292

93+
@Test
94+
void shouldDisableHeavyOperatorExecutionOnlyWhileCritical() throws Exception {
95+
Fixture fixture = new Fixture();
96+
QueryCircuitBreaker breaker = fixture.breaker(configuration(true, 100, 200, 300, 400, 300, 200, 25, 10, 0,
97+
7), 0);
98+
99+
assertTrue(readHeavyOperatorExecutionEnabled());
100+
101+
fixture.freeMemoryMb.set(150);
102+
assertEquals("CRITICAL", breaker.snapshotStatus().getState());
103+
assertFalse(readHeavyOperatorExecutionEnabled());
104+
105+
fixture.freeMemoryMb.set(1024);
106+
assertEquals("NORMAL", breaker.snapshotStatus().getState());
107+
assertTrue(readHeavyOperatorExecutionEnabled());
108+
}
109+
93110
@Test
94111
void shouldTransitionAcrossPressureLevelsUsingFreeMemoryThresholds() {
95112
Fixture fixture = new Fixture();
@@ -305,6 +322,12 @@ private static Constructor<QueryCircuitBreaker.Configuration> configurationCtor(
305322
}
306323
}
307324

325+
private static boolean readHeavyOperatorExecutionEnabled() throws Exception {
326+
Field field = QueryExecutionContext.class.getDeclaredField("heavyOperatorExecutionEnabled");
327+
field.setAccessible(true);
328+
return field.getBoolean(null);
329+
}
330+
308331
private static Set<Long> threadIds(String threadName) {
309332
Set<Long> ids = new HashSet<>();
310333
for (Thread thread : Thread.getAllStackTraces().keySet()) {

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/GroupIterator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ private BiConsumer<Entry, MutableBindingSet> makeBindSolution(
278278

279279
private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<?, ?>> aggregates)
280280
throws QueryEvaluationException {
281+
QueryExecutionContext.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
281282
QueryExecutionContext.markHeavy(OPERATOR_NAME);
282283
QueryExecutionContext.checkpoint(OPERATOR_NAME + "_START");
283284
// store the arguments' iterator so it can be closed while building entries
@@ -303,7 +304,11 @@ private Collection<Entry> buildEntries(List<AggregatePredicateCollectorSupplier<
303304
Map<BindingSetKey, Entry> entries = cf.createGroupByMap();
304305
// Make an optimized hash function valid during this query evaluation step.
305306
ToIntFunction<BindingSet> hashMaker = cf.hashOfBindingSetFuntion(getValues);
306-
while (!isClosed() && iter.hasNext()) {
307+
while (!isClosed()) {
308+
QueryExecutionContext.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
309+
if (!iter.hasNext()) {
310+
break;
311+
}
307312
BindingSet sol = iter.next();
308313
inputRows++;
309314
if ((inputRows & (BUILD_CHECKPOINT_INTERVAL - 1)) == 0) {

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/OrderIterator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public OrderIterator(CloseableIteration<BindingSet> iter, Comparator<BindingSet>
9898

9999
@Override
100100
protected CloseableIteration<BindingSet> createIteration() throws QueryEvaluationException {
101+
QueryExecutionContext.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
101102
QueryExecutionContext.markHeavy(OPERATOR_NAME);
102103
QueryExecutionContext.checkpoint(OPERATOR_NAME + "_START");
103104
BindingSet threshold = null;
@@ -109,6 +110,7 @@ protected CloseableIteration<BindingSet> createIteration() throws QueryEvaluatio
109110
int syncThreshold = (int) Math.min(iterationSyncThreshold, Integer.MAX_VALUE);
110111
try {
111112
while (iter.hasNext()) {
113+
QueryExecutionContext.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
112114
if (list.size() >= syncThreshold && list.size() < limit) {
113115
QueryExecutionContext.checkpoint(OPERATOR_NAME + "_SPILL");
114116
SerializedQueue<BindingSet> queue = new SerializedQueue<>("orderiter");
@@ -185,6 +187,7 @@ protected void decrement(int amount) throws QueryEvaluationException {
185187
}
186188

187189
private Stream<BindingSet> sort(Collection<BindingSet> collection) {
190+
QueryExecutionContext.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
188191
QueryExecutionContext.checkpoint(OPERATOR_NAME + "_SORT");
189192
BindingSet[] array = collection.toArray(new BindingSet[collection.size()]);
190193
Arrays.parallelSort(array, comparator);

0 commit comments

Comments
 (0)