Skip to content

Commit ee5e7f3

Browse files
committed
improved detection and monitoring
1 parent 7e10c8c commit ee5e7f3

4 files changed

Lines changed: 273 additions & 17 deletions

File tree

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryCircuitBreaker.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public final class QueryCircuitBreaker {
4949
private static final long DEFAULT_GC_MONITOR_POLL_MS = 250;
5050
private static final long HIGH_MEMORY_GC_INTERVAL_MS = 5000;
5151
private static final long CRITICAL_MEMORY_GC_INTERVAL_MS = 1000;
52+
private static final long THROTTLE_WARNING_INTERVAL_MS = 60_000;
5253
private static final String GC_MONITOR_THREAD_NAME = "rdf4j-query-breaker-gc-monitor";
5354
private static final QueryCircuitBreaker INSTANCE = new QueryCircuitBreaker(new QueryPressureMonitor(),
5455
Configuration::fromSystemProperties, System::currentTimeMillis, Thread::sleep,
@@ -65,6 +66,8 @@ public final class QueryCircuitBreaker {
6566
private final AtomicLong handleSequence = new AtomicLong();
6667
private final AtomicLong rejectCount = new AtomicLong();
6768
private final AtomicLong cancelCount = new AtomicLong();
69+
private final AtomicLong lastEntryThrottleWarningAt = new AtomicLong(Long.MIN_VALUE);
70+
private final AtomicLong lastRunningThrottleWarningAt = new AtomicLong(Long.MIN_VALUE);
6871

6972
private volatile QueryPressureState currentState = QueryPressureState.NORMAL;
7073
private volatile Transition lastTransition = Transition.initial();
@@ -137,6 +140,8 @@ public void beforeExecution(QueryCircuitBreakerHandle handle) throws QueryInterr
137140
return;
138141
}
139142

143+
warnAboutNewQueryThrottlingIfNeeded(state, configuration, snapshot);
144+
140145
if (state == QueryPressureState.WARN && configuration.getWarnAdmissionDelayMs() > 0) {
141146
delay(configuration.getWarnAdmissionDelayMs(), state, configuration, false);
142147
}
@@ -159,6 +164,8 @@ public void markHeavy(QueryCircuitBreakerHandle handle, String operator) {
159164
handle.markHeavy(operator, clock.getAsLong());
160165
}
161166

167+
int throttleCount = 0;
168+
162169
public void checkpoint(QueryCircuitBreakerHandle handle, String operator) throws QueryInterruptedException {
163170
if (handle == null) {
164171
return;
@@ -172,6 +179,7 @@ public void checkpoint(QueryCircuitBreakerHandle handle, String operator) throws
172179

173180
QueryPressureMonitor.Snapshot snapshot = pressureMonitor.sample();
174181
QueryPressureState state = refreshState("checkpoint:" + operator, configuration, snapshot);
182+
warnAboutRunningQueryThrottlingIfNeeded(state, snapshot);
175183
if (state == QueryPressureState.CRITICAL) {
176184
System.gc();
177185
cancelOneHeavyQueryIfNeeded(configuration, snapshot, "critical-checkpoint:" + operator);
@@ -180,7 +188,7 @@ public void checkpoint(QueryCircuitBreakerHandle handle, String operator) throws
180188
throw CircuitBreakerException.cancelled(handle.getCancellationState(), configuration.getRetryAfterSeconds(),
181189
handle.getCancellationReason());
182190
}
183-
if (state.ordinal() >= QueryPressureState.HIGH.ordinal() && configuration.getCheckpointDelayMs() > 0) {
191+
if (state.throttlesRunningQueries() && configuration.getCheckpointDelayMs() > 0 && (throttleCount++)%256==0) {
184192
delay(configuration.getCheckpointDelayMs(), state, configuration, true);
185193
}
186194
}
@@ -260,11 +268,56 @@ private QueryPressureState refreshState(String reason, Configuration configurati
260268
System.gc();
261269
}
262270
}
263-
QueryExecutionContext.setIgnoreCheckpointStride(currentState != QueryPressureState.NORMAL);
271+
if (!throttlesNewQueriesAtEntry(currentState, configuration)) {
272+
lastEntryThrottleWarningAt.set(Long.MIN_VALUE);
273+
}
274+
if (!currentState.throttlesRunningQueries()) {
275+
lastRunningThrottleWarningAt.set(Long.MIN_VALUE);
276+
}
277+
QueryExecutionContext.setIgnoreCheckpointStride(currentState.throttlesRunningQueries());
264278
QueryExecutionContext.setHeavyOperatorExecutionEnabled(currentState != QueryPressureState.CRITICAL);
265279
return currentState;
266280
}
267281

282+
private void warnAboutNewQueryThrottlingIfNeeded(QueryPressureState state, Configuration configuration,
283+
QueryPressureMonitor.Snapshot snapshot) {
284+
if (!throttlesNewQueriesAtEntry(state, configuration)
285+
|| !shouldLogThrottleWarning(lastEntryThrottleWarningAt)) {
286+
return;
287+
}
288+
LOGGER.warn(
289+
"Query circuit breaker is throttling new queries at entry state={} action={} freeMb={} rollingGcMs={}",
290+
state, state.rejectsNewQueries() ? "reject" : "delay", snapshot.getFreeMemoryMb(),
291+
snapshot.getRollingGcMs());
292+
}
293+
294+
private void warnAboutRunningQueryThrottlingIfNeeded(QueryPressureState state,
295+
QueryPressureMonitor.Snapshot snapshot) {
296+
if (!state.throttlesRunningQueries() || !shouldLogThrottleWarning(lastRunningThrottleWarningAt)) {
297+
return;
298+
}
299+
LOGGER.warn("Query circuit breaker is throttling running queries state={} freeMb={} rollingGcMs={}", state,
300+
snapshot.getFreeMemoryMb(), snapshot.getRollingGcMs());
301+
}
302+
303+
private boolean throttlesNewQueriesAtEntry(QueryPressureState state, Configuration configuration) {
304+
return (state == QueryPressureState.WARN && configuration.getWarnAdmissionDelayMs() > 0)
305+
|| state.rejectsNewQueries();
306+
}
307+
308+
private boolean shouldLogThrottleWarning(AtomicLong lastWarningAt) {
309+
long now = clock.getAsLong();
310+
while (true) {
311+
long previous = lastWarningAt.get();
312+
if (previous != Long.MIN_VALUE && now - previous < THROTTLE_WARNING_INTERVAL_MS) {
313+
return false;
314+
}
315+
if (lastWarningAt.compareAndSet(previous, now)) {
316+
return true;
317+
}
318+
}
319+
}
320+
268321
private QueryPressureState determineState(Configuration configuration, QueryPressureMonitor.Snapshot snapshot) {
269322
QueryPressureState gcState = determineGcState(configuration, snapshot.getRollingGcMs());
270323
QueryPressureState freeMemoryState = determineFreeMemoryState(configuration, snapshot.getFreeMemoryMb());
@@ -676,8 +729,8 @@ private Configuration(boolean enabled, int warnGcMs, int highGcMs, int criticalG
676729

677730
static Configuration fromSystemProperties() {
678731
return new Configuration(Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "true")),
679-
getIntProperty(WARN_GC_MS_PROPERTY, 200),
680-
getIntProperty(HIGH_GC_MS_PROPERTY, 400),
732+
getIntProperty(WARN_GC_MS_PROPERTY, 400),
733+
getIntProperty(HIGH_GC_MS_PROPERTY, 600),
681734
getIntProperty(CRITICAL_GC_MS_PROPERTY, 800),
682735
getIntProperty(WARN_FREE_MB_PROPERTY, defaultWarnFreeMb()),
683736
getIntProperty(HIGH_FREE_MB_PROPERTY, defaultHighFreeMb()),

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryPressureState.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@ public enum QueryPressureState {
2020
HIGH,
2121
CRITICAL;
2222

23-
public boolean rejectsNewQueries() {
23+
public boolean throttlesRunningQueries() {
2424
return this == HIGH || this == CRITICAL;
2525
}
26+
27+
public boolean rejectsNewQueries() {
28+
return throttlesRunningQueries();
29+
}
2630
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
package org.eclipse.rdf4j.http.client;
13+
14+
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
import static org.junit.jupiter.api.Assertions.assertThrows;
16+
import static org.junit.jupiter.api.Assertions.assertTrue;
17+
18+
import java.lang.reflect.Constructor;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
import org.junit.jupiter.api.AfterEach;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.slf4j.LoggerFactory;
27+
28+
import ch.qos.logback.classic.Level;
29+
import ch.qos.logback.classic.Logger;
30+
import ch.qos.logback.classic.spi.ILoggingEvent;
31+
import ch.qos.logback.core.read.ListAppender;
32+
33+
class QueryCircuitBreakerLoggingTest {
34+
35+
private static final Constructor<QueryCircuitBreaker.Configuration> CONFIGURATION_CONSTRUCTOR = configurationCtor();
36+
private static final Logger LOGGER = (Logger) LoggerFactory.getLogger(QueryCircuitBreaker.class);
37+
private static final String ENTRY_THROTTLE_WARNING = "Query circuit breaker is throttling new queries at entry";
38+
private static final String RUNNING_THROTTLE_WARNING = "Query circuit breaker is throttling running queries";
39+
40+
private ListAppender<ILoggingEvent> listAppender;
41+
42+
@BeforeEach
43+
void attachAppender() {
44+
listAppender = new ListAppender<>();
45+
listAppender.start();
46+
LOGGER.addAppender(listAppender);
47+
}
48+
49+
@AfterEach
50+
void detachAppender() {
51+
LOGGER.detachAppender(listAppender);
52+
listAppender.stop();
53+
}
54+
55+
@Test
56+
void shouldWarnWhenThrottlingNewQueriesAtEntryOncePerMinute() {
57+
Fixture fixture = new Fixture();
58+
QueryCircuitBreaker breaker = fixture.breaker(configuration(true, 100, 200, 300, 400, 300, 200, 25, 10, 1000,
59+
7), 1000);
60+
List<QueryCircuitBreakerHandle> handles = new ArrayList<>();
61+
62+
try {
63+
fixture.freeMemoryMb.set(350);
64+
QueryCircuitBreakerHandle warnHandle = registerHandle(breaker, handles, "warn-entry-1");
65+
breaker.beforeExecution(warnHandle);
66+
67+
assertEquals(1, warningCount(ENTRY_THROTTLE_WARNING));
68+
assertTrue(lastWarning(ENTRY_THROTTLE_WARNING).contains("action=delay"));
69+
70+
QueryCircuitBreakerHandle secondWarnHandle = registerHandle(breaker, handles, "warn-entry-2");
71+
breaker.beforeExecution(secondWarnHandle);
72+
assertEquals(1, warningCount(ENTRY_THROTTLE_WARNING));
73+
74+
fixture.clock.addAndGet(60_000);
75+
QueryCircuitBreakerHandle thirdWarnHandle = registerHandle(breaker, handles, "warn-entry-3");
76+
breaker.beforeExecution(thirdWarnHandle);
77+
assertEquals(2, warningCount(ENTRY_THROTTLE_WARNING));
78+
79+
fixture.clock.addAndGet(60_000);
80+
fixture.freeMemoryMb.set(250);
81+
QueryCircuitBreakerHandle highHandle = registerHandle(breaker, handles, "high-entry");
82+
QueryCircuitBreaker.CircuitBreakerException exception = assertThrows(
83+
QueryCircuitBreaker.CircuitBreakerException.class, () -> breaker.beforeExecution(highHandle));
84+
85+
assertTrue(exception.isRejected());
86+
assertEquals(3, warningCount(ENTRY_THROTTLE_WARNING));
87+
assertTrue(lastWarning(ENTRY_THROTTLE_WARNING).contains("action=reject"));
88+
} finally {
89+
handles.forEach(breaker::complete);
90+
}
91+
}
92+
93+
@Test
94+
void shouldWarnWhenThrottlingRunningQueriesOncePerMinute() {
95+
Fixture fixture = new Fixture();
96+
QueryCircuitBreaker breaker = fixture.breaker(configuration(true, 100, 200, 300, 400, 300, 200, 25, 10, 1000,
97+
7), 1000);
98+
QueryCircuitBreakerHandle handle = breaker.register(QueryCircuitBreakerHandle.Source.SERVER, "repo",
99+
"running-query");
100+
101+
try {
102+
fixture.freeMemoryMb.set(250);
103+
breaker.checkpoint(handle, "JOIN");
104+
assertEquals(1, warningCount(RUNNING_THROTTLE_WARNING));
105+
106+
breaker.checkpoint(handle, "JOIN");
107+
assertEquals(1, warningCount(RUNNING_THROTTLE_WARNING));
108+
109+
fixture.clock.addAndGet(60_000);
110+
breaker.checkpoint(handle, "JOIN");
111+
assertEquals(2, warningCount(RUNNING_THROTTLE_WARNING));
112+
} finally {
113+
breaker.complete(handle);
114+
}
115+
}
116+
117+
private int warningCount(String messageFragment) {
118+
return (int) listAppender.list.stream()
119+
.filter(event -> event.getLevel() == Level.WARN)
120+
.map(ILoggingEvent::getFormattedMessage)
121+
.filter(message -> message.contains(messageFragment))
122+
.count();
123+
}
124+
125+
private String lastWarning(String messageFragment) {
126+
return listAppender.list.stream()
127+
.filter(event -> event.getLevel() == Level.WARN)
128+
.map(ILoggingEvent::getFormattedMessage)
129+
.filter(message -> message.contains(messageFragment))
130+
.reduce((first, second) -> second)
131+
.orElseThrow();
132+
}
133+
134+
private QueryCircuitBreakerHandle registerHandle(QueryCircuitBreaker breaker,
135+
List<QueryCircuitBreakerHandle> handles, String queryText) {
136+
QueryCircuitBreakerHandle handle = breaker.register(QueryCircuitBreakerHandle.Source.SERVER, "repo", queryText);
137+
handles.add(handle);
138+
return handle;
139+
}
140+
141+
private static QueryCircuitBreaker.Configuration configuration(boolean enabled, int warnGcMs, int highGcMs,
142+
int criticalGcMs, int warnFreeMb, int highFreeMb, int criticalFreeMb, int warnAdmissionDelayMs,
143+
int checkpointDelayMs, int cancelCooldownMs, int retryAfterSeconds) {
144+
try {
145+
return CONFIGURATION_CONSTRUCTOR.newInstance(enabled, warnGcMs, highGcMs, criticalGcMs, warnFreeMb,
146+
highFreeMb, criticalFreeMb, warnAdmissionDelayMs, checkpointDelayMs, cancelCooldownMs,
147+
retryAfterSeconds);
148+
} catch (Exception e) {
149+
throw new IllegalStateException("Unable to construct query breaker configuration", e);
150+
}
151+
}
152+
153+
private static Constructor<QueryCircuitBreaker.Configuration> configurationCtor() {
154+
try {
155+
Constructor<QueryCircuitBreaker.Configuration> constructor = QueryCircuitBreaker.Configuration.class
156+
.getDeclaredConstructor(boolean.class, int.class, int.class, int.class, int.class, int.class,
157+
int.class, int.class, int.class, int.class, int.class);
158+
constructor.setAccessible(true);
159+
return constructor;
160+
} catch (Exception e) {
161+
throw new IllegalStateException("Unable to access query breaker configuration constructor", e);
162+
}
163+
}
164+
165+
private static final class Fixture {
166+
private final AtomicLong clock = new AtomicLong();
167+
private final AtomicLong freeMemoryMb = new AtomicLong(1024);
168+
private final QueryPressureMonitor monitor = new QueryPressureMonitor(freeMemoryMb::get, clock::get, false);
169+
private final List<Long> sleeps = new ArrayList<>();
170+
171+
private QueryCircuitBreaker breaker(QueryCircuitBreaker.Configuration configuration, long recoveryCooldownMs) {
172+
return new QueryCircuitBreaker(monitor, () -> configuration, clock::get, sleeps::add,
173+
recoveryCooldownMs);
174+
}
175+
}
176+
}

core/http/client/src/test/java/org/eclipse/rdf4j/http/client/QueryCircuitBreakerTest.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,20 +118,37 @@ void shouldDisableHeavyOperatorExecutionOnlyWhileCritical() throws Exception {
118118
}
119119

120120
@Test
121-
void shouldIgnoreCheckpointStrideOutsideNormalState() throws Exception {
122-
Fixture fixture = new Fixture();
123-
QueryCircuitBreaker breaker = fixture.breaker(configuration(true, 100, 200, 300, 400, 300, 200, 25, 10, 0,
124-
7), 0);
125-
126-
retryAssertion(() -> assertFalse(readIgnoreCheckpointStride()));
121+
void shouldIgnoreCheckpointStrideOnlyAtHighOrAbove() throws Exception {
122+
PropertiesScope properties = new PropertiesScope()
123+
.with(QueryCircuitBreaker.ENABLED_PROPERTY, "true")
124+
.with(QueryCircuitBreaker.WARN_GC_MS_PROPERTY, Integer.toString(Integer.MAX_VALUE))
125+
.with(QueryCircuitBreaker.HIGH_GC_MS_PROPERTY, Integer.toString(Integer.MAX_VALUE))
126+
.with(QueryCircuitBreaker.CRITICAL_GC_MS_PROPERTY, Integer.toString(Integer.MAX_VALUE))
127+
.with(QueryCircuitBreaker.WARN_FREE_MB_PROPERTY, Integer.toString(Integer.MAX_VALUE))
128+
.with(QueryCircuitBreaker.HIGH_FREE_MB_PROPERTY, "0")
129+
.with(QueryCircuitBreaker.CRITICAL_FREE_MB_PROPERTY, "0")
130+
.with(QueryCircuitBreaker.CHECKPOINT_DELAY_MS_PROPERTY, "0");
131+
QueryCircuitBreaker breaker = QueryCircuitBreaker.getInstance();
132+
QueryCircuitBreakerHandle handle = breaker.register(QueryCircuitBreakerHandle.Source.SERVER, "repo",
133+
"warn-checkpoint-stride");
127134

128-
fixture.freeMemoryMb.set(350);
129-
assertEquals("WARN", breaker.snapshotStatus().getState());
130-
retryAssertion(() -> assertTrue(readIgnoreCheckpointStride()));
135+
try {
136+
properties.apply();
137+
resetCheckpointCalls();
138+
try (QueryExecutionContext.Activation ignored = QueryExecutionContext.activate(handle)) {
139+
for (int i = 0; i < 1023; i++) {
140+
QueryExecutionContext.checkpoint("JOIN");
141+
}
142+
assertEquals(-1L, handle.getLastHeavyCheckpointMillis());
131143

132-
fixture.freeMemoryMb.set(1024);
133-
assertEquals("NORMAL", breaker.snapshotStatus().getState());
134-
retryAssertion(() -> assertFalse(readIgnoreCheckpointStride()));
144+
QueryExecutionContext.checkpoint("JOIN");
145+
assertTrue(handle.getLastHeavyCheckpointMillis() > 0);
146+
}
147+
} finally {
148+
properties.restore();
149+
breaker.complete(handle);
150+
resetCheckpointCalls();
151+
}
135152
}
136153

137154
private static void retryAssertion(Runnable assertion) throws InterruptedException {
@@ -404,6 +421,12 @@ private static boolean readIgnoreCheckpointStride() {
404421
}
405422
}
406423

424+
private static void resetCheckpointCalls() throws Exception {
425+
Field field = QueryExecutionContext.class.getDeclaredField("checkpointCalls");
426+
field.setAccessible(true);
427+
field.setInt(null, 0);
428+
}
429+
407430
private static Set<Long> threadIds(String threadName) {
408431
Set<Long> ids = new HashSet<>();
409432
for (Thread thread : Thread.getAllStackTraces().keySet()) {

0 commit comments

Comments
 (0)