Skip to content

Commit 5327791

Browse files
committed
improved detection and monitoring
1 parent ee5e7f3 commit 5327791

2 files changed

Lines changed: 101 additions & 6 deletions

File tree

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryCircuitBreaker.java

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ public final class QueryCircuitBreaker {
4747
private static final Logger LOGGER = LoggerFactory.getLogger(QueryCircuitBreaker.class);
4848
private static final long DEFAULT_RECOVERY_COOLDOWN_MS = 1000;
4949
private static final long DEFAULT_GC_MONITOR_POLL_MS = 250;
50+
private static final long CHECKPOINT_GC_BASE_INTERVAL_MS = 1000;
51+
private static final int CHECKPOINT_GC_MAX_INTERVAL_STEPS = 10;
52+
private static final long CHECKPOINT_GC_RESET_AFTER_MS = 30_000;
5053
private static final long HIGH_MEMORY_GC_INTERVAL_MS = 5000;
5154
private static final long CRITICAL_MEMORY_GC_INTERVAL_MS = 1000;
5255
private static final long THROTTLE_WARNING_INTERVAL_MS = 60_000;
@@ -74,6 +77,8 @@ public final class QueryCircuitBreaker {
7477
private volatile long lastCancelAt = Long.MIN_VALUE;
7578
private volatile QueryPressureState lastMonitorMemoryState = QueryPressureState.NORMAL;
7679
private volatile long lastMonitorGcAt = Long.MIN_VALUE;
80+
private long lastCheckpointGcAt = Long.MIN_VALUE;
81+
private int checkpointGcIntervalStep;
7782

7883
public static QueryCircuitBreaker getInstance() {
7984
return INSTANCE;
@@ -180,15 +185,18 @@ public void checkpoint(QueryCircuitBreakerHandle handle, String operator) throws
180185
QueryPressureMonitor.Snapshot snapshot = pressureMonitor.sample();
181186
QueryPressureState state = refreshState("checkpoint:" + operator, configuration, snapshot);
182187
warnAboutRunningQueryThrottlingIfNeeded(state, snapshot);
188+
if (shouldRequestCheckpointGc(state, configuration, snapshot)) {
189+
maybeRunCheckpointGc();
190+
}
183191
if (state == QueryPressureState.CRITICAL) {
184-
System.gc();
185192
cancelOneHeavyQueryIfNeeded(configuration, snapshot, "critical-checkpoint:" + operator);
186193
}
187194
if (handle.isCancelRequested()) {
188195
throw CircuitBreakerException.cancelled(handle.getCancellationState(), configuration.getRetryAfterSeconds(),
189196
handle.getCancellationReason());
190197
}
191-
if (state.throttlesRunningQueries() && configuration.getCheckpointDelayMs() > 0 && (throttleCount++)%256==0) {
198+
if (state.throttlesRunningQueries() && configuration.getCheckpointDelayMs() > 0
199+
&& (throttleCount++) % 256 == 0) {
192200
delay(configuration.getCheckpointDelayMs(), state, configuration, true);
193201
}
194202
}
@@ -262,9 +270,7 @@ private QueryPressureState refreshState(String reason, Configuration configurati
262270
LOGGER.info(
263271
"Query circuit breaker transition previous={} current={} freeMb={} rollingGcMs={} reason={}",
264272
previous, next, snapshot.getFreeMemoryMb(), snapshot.getRollingGcMs(), reason);
265-
if (currentState == QueryPressureState.HIGH
266-
|| (currentState == QueryPressureState.WARN && determineFreeMemoryState(configuration,
267-
snapshot.getFreeMemoryMb()) == QueryPressureState.WARN)) {
273+
if (!isCheckpointReason(reason) && shouldRequestTransitionGc(currentState, configuration, snapshot)) {
268274
System.gc();
269275
}
270276
}
@@ -305,6 +311,48 @@ private boolean throttlesNewQueriesAtEntry(QueryPressureState state, Configurati
305311
|| state.rejectsNewQueries();
306312
}
307313

314+
private boolean isCheckpointReason(String reason) {
315+
return reason.startsWith("checkpoint:");
316+
}
317+
318+
private boolean shouldRequestTransitionGc(QueryPressureState state, Configuration configuration,
319+
QueryPressureMonitor.Snapshot snapshot) {
320+
return state == QueryPressureState.HIGH
321+
|| (state == QueryPressureState.WARN
322+
&& determineFreeMemoryState(configuration,
323+
snapshot.getFreeMemoryMb()) == QueryPressureState.WARN);
324+
}
325+
326+
private boolean shouldRequestCheckpointGc(QueryPressureState state, Configuration configuration,
327+
QueryPressureMonitor.Snapshot snapshot) {
328+
return state == QueryPressureState.CRITICAL || shouldRequestTransitionGc(state, configuration, snapshot);
329+
}
330+
331+
private void maybeRunCheckpointGc() {
332+
long now = clock.getAsLong();
333+
if (!shouldRunCheckpointGc(now)) {
334+
return;
335+
}
336+
gcInvoker.runGc();
337+
}
338+
339+
private synchronized boolean shouldRunCheckpointGc(long now) {
340+
if (lastCheckpointGcAt == Long.MIN_VALUE || now - lastCheckpointGcAt > CHECKPOINT_GC_RESET_AFTER_MS) {
341+
lastCheckpointGcAt = now;
342+
checkpointGcIntervalStep = 1;
343+
return true;
344+
}
345+
346+
long requiredDelayMs = checkpointGcIntervalStep * CHECKPOINT_GC_BASE_INTERVAL_MS;
347+
if (now - lastCheckpointGcAt < requiredDelayMs) {
348+
return false;
349+
}
350+
351+
lastCheckpointGcAt = now;
352+
checkpointGcIntervalStep = Math.min(checkpointGcIntervalStep + 1, CHECKPOINT_GC_MAX_INTERVAL_STEPS);
353+
return true;
354+
}
355+
308356
private boolean shouldLogThrottleWarning(AtomicLong lastWarningAt) {
309357
long now = clock.getAsLong();
310358
while (true) {
@@ -728,7 +776,7 @@ private Configuration(boolean enabled, int warnGcMs, int highGcMs, int criticalG
728776
}
729777

730778
static Configuration fromSystemProperties() {
731-
return new Configuration(Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "true")),
779+
return new Configuration(Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "false")),
732780
getIntProperty(WARN_GC_MS_PROPERTY, 400),
733781
getIntProperty(HIGH_GC_MS_PROPERTY, 600),
734782
getIntProperty(CRITICAL_GC_MS_PROPERTY, 800),

core/http/client/src/test/java/org/eclipse/rdf4j/http/client/QueryCircuitBreakerTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,53 @@ void shouldRunMonitorGcEverySecondInCriticalMemoryMode() {
246246
assertEquals(List.of(0L, 1000L), fixture.gcInvocations);
247247
}
248248

249+
@Test
250+
void shouldBackOffCheckpointGcRequestsAndResetAfterLongPause() {
251+
Fixture fixture = new Fixture();
252+
QueryCircuitBreaker breaker = fixture.breaker(configuration(true, 100, 200, 300, 400, 300, 200, 25, 0, 1000,
253+
7), 0, () -> fixture.gcInvocations.add(fixture.clock.get()));
254+
QueryCircuitBreakerHandle handle = breaker.register(QueryCircuitBreakerHandle.Source.SERVER, "repo",
255+
"checkpoint-gc-backoff");
256+
257+
fixture.freeMemoryMb.set(250);
258+
breaker.checkpoint(handle, "JOIN");
259+
assertEquals(List.of(0L), fixture.gcInvocations);
260+
261+
fixture.clock.set(999);
262+
breaker.checkpoint(handle, "JOIN");
263+
assertEquals(List.of(0L), fixture.gcInvocations);
264+
265+
fixture.clock.set(1000);
266+
breaker.checkpoint(handle, "JOIN");
267+
assertEquals(List.of(0L, 1000L), fixture.gcInvocations);
268+
269+
fixture.clock.set(2999);
270+
breaker.checkpoint(handle, "JOIN");
271+
assertEquals(List.of(0L, 1000L), fixture.gcInvocations);
272+
273+
fixture.clock.set(3000);
274+
breaker.checkpoint(handle, "JOIN");
275+
assertEquals(List.of(0L, 1000L, 3000L), fixture.gcInvocations);
276+
277+
fixture.clock.set(6000);
278+
breaker.checkpoint(handle, "JOIN");
279+
assertEquals(List.of(0L, 1000L, 3000L, 6000L), fixture.gcInvocations);
280+
281+
fixture.clock.set(36001);
282+
breaker.checkpoint(handle, "JOIN");
283+
assertEquals(List.of(0L, 1000L, 3000L, 6000L, 36001L), fixture.gcInvocations);
284+
285+
fixture.clock.set(37000);
286+
breaker.checkpoint(handle, "JOIN");
287+
assertEquals(List.of(0L, 1000L, 3000L, 6000L, 36001L), fixture.gcInvocations);
288+
289+
fixture.clock.set(37001);
290+
breaker.checkpoint(handle, "JOIN");
291+
assertEquals(List.of(0L, 1000L, 3000L, 6000L, 36001L, 37001L), fixture.gcInvocations);
292+
293+
breaker.complete(handle);
294+
}
295+
249296
@Test
250297
void shouldHoldCriticalStateUntilRecoveryCooldownExpires() {
251298
Fixture fixture = new Fixture();

0 commit comments

Comments
 (0)