Skip to content

Commit 41d8c2a

Browse files
committed
improved detection and monitoring
1 parent e4421e8 commit 41d8c2a

2 files changed

Lines changed: 101 additions & 8 deletions

File tree

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryCircuitBreaker.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public void beforeExecution(QueryCircuitBreakerHandle handle) throws QueryInterr
138138
}
139139

140140
if (state == QueryPressureState.WARN && configuration.getWarnAdmissionDelayMs() > 0) {
141-
delay(configuration.getWarnAdmissionDelayMs(), configuration, false);
141+
delay(configuration.getWarnAdmissionDelayMs(), state, configuration, false);
142142
}
143143

144144
if (state.rejectsNewQueries()) {
@@ -180,8 +180,8 @@ public void checkpoint(QueryCircuitBreakerHandle handle, String operator) throws
180180
throw CircuitBreakerException.cancelled(handle.getCancellationState(), configuration.getRetryAfterSeconds(),
181181
handle.getCancellationReason());
182182
}
183-
if (state == QueryPressureState.WARN && configuration.getCheckpointDelayMs() > 0) {
184-
delay(configuration.getCheckpointDelayMs(), configuration, true);
183+
if (state.ordinal() >= QueryPressureState.HIGH.ordinal() && configuration.getCheckpointDelayMs() > 0) {
184+
delay(configuration.getCheckpointDelayMs(), state, configuration, true);
185185
}
186186
}
187187

@@ -224,13 +224,13 @@ public static CircuitBreakerException asCircuitBreakerException(Throwable throwa
224224
return null;
225225
}
226226

227-
private void delay(long delayMs, Configuration configuration, boolean checkpointDelay)
227+
private void delay(long delayMs, QueryPressureState state, Configuration configuration, boolean checkpointDelay)
228228
throws QueryInterruptedException {
229229
try {
230230
sleeper.sleep(delayMs);
231231
} catch (InterruptedException e) {
232232
Thread.currentThread().interrupt();
233-
throw CircuitBreakerException.cancelled(QueryPressureState.WARN, configuration.getRetryAfterSeconds(),
233+
throw CircuitBreakerException.cancelled(state, configuration.getRetryAfterSeconds(),
234234
checkpointDelay ? "Query interrupted during breaker checkpoint delay"
235235
: "Query interrupted during breaker admission delay",
236236
e);
@@ -644,6 +644,8 @@ public boolean isCancelRequested() {
644644
}
645645

646646
public static final class Configuration {
647+
private static final long BYTES_PER_MB = 1024L * 1024L;
648+
647649
private final boolean enabled;
648650
private final int warnGcMs;
649651
private final int highGcMs;
@@ -677,15 +679,29 @@ static Configuration fromSystemProperties() {
677679
getIntProperty(WARN_GC_MS_PROPERTY, 200),
678680
getIntProperty(HIGH_GC_MS_PROPERTY, 400),
679681
getIntProperty(CRITICAL_GC_MS_PROPERTY, 800),
680-
getIntProperty(WARN_FREE_MB_PROPERTY, 256),
681-
getIntProperty(HIGH_FREE_MB_PROPERTY, 128),
682+
getIntProperty(WARN_FREE_MB_PROPERTY, defaultWarnFreeMb()),
683+
getIntProperty(HIGH_FREE_MB_PROPERTY, defaultHighFreeMb()),
682684
getIntProperty(CRITICAL_FREE_MB_PROPERTY, 96),
683685
getIntProperty(WARN_ADMISSION_DELAY_MS_PROPERTY, 50),
684686
getIntProperty(CHECKPOINT_DELAY_MS_PROPERTY, 1),
685687
getIntProperty(CANCEL_COOLDOWN_MS_PROPERTY, 1000),
686688
getIntProperty(RETRY_AFTER_SECONDS_PROPERTY, 3));
687689
}
688690

691+
private static int defaultWarnFreeMb() {
692+
return Math.max(256, percentOfMaxMemoryMb(2));
693+
}
694+
695+
private static int defaultHighFreeMb() {
696+
return Math.max(128, percentOfMaxMemoryMb(1));
697+
}
698+
699+
private static int percentOfMaxMemoryMb(int percent) {
700+
long scaledBytes = Runtime.getRuntime().maxMemory() * percent;
701+
long divisor = 100L * BYTES_PER_MB;
702+
return (int) ((scaledBytes + divisor - 1) / divisor);
703+
}
704+
689705
private static int getIntProperty(String propertyName, int defaultValue) {
690706
String rawValue = System.getProperty(propertyName);
691707
if (rawValue == null || rawValue.isBlank()) {

core/http/client/src/test/java/org/eclipse/rdf4j/http/client/QueryCircuitBreakerTest.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import java.lang.reflect.Constructor;
2323
import java.lang.reflect.Field;
24+
import java.nio.charset.StandardCharsets;
25+
import java.nio.file.Path;
2426
import java.util.ArrayList;
2527
import java.util.HashSet;
2628
import java.util.List;
@@ -66,6 +68,14 @@ void shouldStartBackgroundMonitorThreadForHighMemoryPressure() throws Exception
6668
}
6769
}
6870

71+
@Test
72+
void shouldDeriveDefaultFreeMemoryThresholdsFromMaxHeap() throws Exception {
73+
Thresholds thresholds = readDefaultThresholdsFromFreshJvm("12801m");
74+
75+
assertEquals(257, thresholds.warnFreeMb);
76+
assertEquals(129, thresholds.highFreeMb);
77+
}
78+
6979
@Test
7080
void shouldThrottleExecutionContextCheckpointToEvery16384Calls() throws Exception {
7181
PropertiesScope properties = new PropertiesScope().with(QueryCircuitBreaker.ENABLED_PROPERTY, "false");
@@ -221,6 +231,25 @@ void shouldHoldCriticalStateUntilRecoveryCooldownExpires() {
221231
assertEquals("NORMAL", breaker.snapshotStatus().getState());
222232
}
223233

234+
@Test
235+
void shouldThrottleCheckpointsOnlyAtHighOrAbove() {
236+
Fixture fixture = new Fixture();
237+
QueryCircuitBreaker breaker = fixture.breaker(configuration(true, 100, 200, 300, 400, 300, 200, 25, 10, 1000,
238+
7), 1000);
239+
QueryCircuitBreakerHandle handle = breaker.register(QueryCircuitBreakerHandle.Source.SERVER, "repo",
240+
"checkpoint-query");
241+
242+
fixture.freeMemoryMb.set(350);
243+
breaker.checkpoint(handle, "JOIN");
244+
assertEquals(List.of(), fixture.sleeps);
245+
246+
fixture.freeMemoryMb.set(250);
247+
breaker.checkpoint(handle, "JOIN");
248+
assertEquals(List.of(10L), fixture.sleeps);
249+
250+
breaker.complete(handle);
251+
}
252+
224253
@Test
225254
void shouldDelayWarnAdmissionAndRejectHighAdmission() {
226255
Fixture fixture = new Fixture();
@@ -388,6 +417,23 @@ private static QueryPressureState waitForCurrentState(QueryCircuitBreaker breake
388417
return state;
389418
}
390419

420+
private static Thresholds readDefaultThresholdsFromFreshJvm(String maxHeap) throws Exception {
421+
String classpath = System.getProperty("java.class.path");
422+
String javaExecutable = Path.of(System.getProperty("java.home"), "bin", "java").toString();
423+
Process process = new ProcessBuilder(javaExecutable, "-Xmx" + maxHeap,
424+
"-D" + QueryCircuitBreaker.WARN_FREE_MB_PROPERTY + "=",
425+
"-D" + QueryCircuitBreaker.HIGH_FREE_MB_PROPERTY + "=",
426+
"-cp", classpath, DefaultThresholdProbe.class.getName())
427+
.redirectErrorStream(true)
428+
.start();
429+
String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8).trim();
430+
int exitCode = process.waitFor();
431+
assertEquals(0, exitCode, output);
432+
433+
String[] parts = output.split(",");
434+
return new Thresholds(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
435+
}
436+
391437
private static final class PropertiesScope {
392438
private final List<String> keys = new ArrayList<>();
393439
private final List<String> previousValues = new ArrayList<>();
@@ -400,9 +446,21 @@ private PropertiesScope with(String key, String value) {
400446
return this;
401447
}
402448

449+
private PropertiesScope without(String key) {
450+
keys.add(key);
451+
previousValues.add(System.getProperty(key));
452+
nextValues.add(null);
453+
return this;
454+
}
455+
403456
private void apply() {
404457
for (int i = 0; i < keys.size(); i++) {
405-
System.setProperty(keys.get(i), nextValues.get(i));
458+
String nextValue = nextValues.get(i);
459+
if (nextValue == null) {
460+
System.clearProperty(keys.get(i));
461+
} else {
462+
System.setProperty(keys.get(i), nextValue);
463+
}
406464
}
407465
}
408466

@@ -436,4 +494,23 @@ private QueryCircuitBreaker breaker(QueryCircuitBreaker.Configuration configurat
436494
recoveryCooldownMs, 1, false);
437495
}
438496
}
497+
498+
public static final class DefaultThresholdProbe {
499+
public static void main(String[] args) {
500+
QueryCircuitBreaker.Configuration configuration = QueryCircuitBreaker.Configuration.fromSystemProperties();
501+
System.out.print(configuration.getWarnFreeMb());
502+
System.out.print(",");
503+
System.out.print(configuration.getHighFreeMb());
504+
}
505+
}
506+
507+
private static final class Thresholds {
508+
private final int warnFreeMb;
509+
private final int highFreeMb;
510+
511+
private Thresholds(int warnFreeMb, int highFreeMb) {
512+
this.warnFreeMb = warnFreeMb;
513+
this.highFreeMb = highFreeMb;
514+
}
515+
}
439516
}

0 commit comments

Comments
 (0)