Skip to content

Commit acd119d

Browse files
committed
improved detection and monitoring
1 parent 66de5c5 commit acd119d

4 files changed

Lines changed: 340 additions & 21 deletions

File tree

core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class DistinctIteration<E> extends FilterIteration<E> {
3131
/**
3232
* The elements that have already been returned.
3333
*/
34-
private final Set<E> excludeSet;
34+
private Set<E> excludeSet;
3535

3636
/*--------------*
3737
* Constructors *
@@ -80,8 +80,14 @@ public DistinctIteration(CloseableIteration<? extends E> iter, Supplier<Set<E>>
8080
*/
8181
@Override
8282
protected boolean accept(E object) {
83-
QueryExecutionContextBridge.markHeavy(OPERATOR_NAME);
84-
QueryExecutionContextBridge.checkpoint(OPERATOR_NAME);
83+
try {
84+
QueryExecutionContextBridge.markHeavy(OPERATOR_NAME);
85+
QueryExecutionContextBridge.checkpoint(OPERATOR_NAME);
86+
}catch (Throwable t) {
87+
excludeSet = null;
88+
throw t;
89+
}
90+
8591
if (inExcludeSet(object)) {
8692
// object has already been returned
8793
return false;
@@ -93,7 +99,7 @@ protected boolean accept(E object) {
9399

94100
@Override
95101
protected void handleClose() {
96-
102+
excludeSet = null;
97103
}
98104

99105
/**

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryCircuitBreaker.java

Lines changed: 140 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,21 @@ public final class QueryCircuitBreaker {
4646

4747
private static final Logger LOGGER = LoggerFactory.getLogger(QueryCircuitBreaker.class);
4848
private static final long DEFAULT_RECOVERY_COOLDOWN_MS = 1000;
49+
private static final long DEFAULT_GC_MONITOR_POLL_MS = 250;
50+
private static final long HIGH_MEMORY_GC_INTERVAL_MS = 5000;
51+
private static final long CRITICAL_MEMORY_GC_INTERVAL_MS = 1000;
52+
private static final String GC_MONITOR_THREAD_NAME = "rdf4j-query-breaker-gc-monitor";
4953
private static final QueryCircuitBreaker INSTANCE = new QueryCircuitBreaker(new QueryPressureMonitor(),
5054
Configuration::fromSystemProperties, System::currentTimeMillis, Thread::sleep,
51-
DEFAULT_RECOVERY_COOLDOWN_MS);
55+
System::gc, DEFAULT_RECOVERY_COOLDOWN_MS, DEFAULT_GC_MONITOR_POLL_MS, true);
5256

5357
private final QueryPressureMonitor pressureMonitor;
5458
private final Supplier<Configuration> configurationSupplier;
5559
private final LongSupplier clock;
5660
private final Sleeper sleeper;
61+
private final GcInvoker gcInvoker;
5762
private final long recoveryCooldownMs;
63+
private final long gcMonitorPollMs;
5864
private final ConcurrentMap<String, QueryCircuitBreakerHandle> activeHandles = new ConcurrentHashMap<>();
5965
private final AtomicLong handleSequence = new AtomicLong();
6066
private final AtomicLong rejectCount = new AtomicLong();
@@ -63,23 +69,37 @@ public final class QueryCircuitBreaker {
6369
private volatile QueryPressureState currentState = QueryPressureState.NORMAL;
6470
private volatile Transition lastTransition = Transition.initial();
6571
private volatile long lastCancelAt = Long.MIN_VALUE;
72+
private volatile QueryPressureState lastMonitorMemoryState = QueryPressureState.NORMAL;
73+
private volatile long lastMonitorGcAt = Long.MIN_VALUE;
6674

6775
public static QueryCircuitBreaker getInstance() {
6876
return INSTANCE;
6977
}
7078

7179
public QueryCircuitBreaker(QueryPressureMonitor pressureMonitor) {
7280
this(pressureMonitor, Configuration::fromSystemProperties, System::currentTimeMillis, Thread::sleep,
73-
DEFAULT_RECOVERY_COOLDOWN_MS);
81+
System::gc, DEFAULT_RECOVERY_COOLDOWN_MS, DEFAULT_GC_MONITOR_POLL_MS, true);
7482
}
7583

7684
/**
 * Package-private constructor that delegates to the full constructor with {@code System::gc} as the GC
 * invoker, the default GC monitor poll interval, and the background GC monitor thread switched off.
 *
 * @param pressureMonitor       source of pressure snapshots
 * @param configurationSupplier supplies the configuration to use
 * @param clock                 supplies the current time in milliseconds
 * @param sleeper               used to perform delays
 * @param recoveryCooldownMs    recovery cooldown in milliseconds
 */
QueryCircuitBreaker(QueryPressureMonitor pressureMonitor, Supplier<Configuration> configurationSupplier,
		LongSupplier clock, Sleeper sleeper, long recoveryCooldownMs) {
	this(
			pressureMonitor,
			configurationSupplier,
			clock,
			sleeper,
			System::gc,
			recoveryCooldownMs,
			DEFAULT_GC_MONITOR_POLL_MS,
			false);
}
89+
90+
/**
 * Full constructor. Every collaborator is null-checked in declaration order, the two timing values are
 * stored as given, and the background GC monitor thread is started last, only when requested.
 *
 * @param pressureMonitor       source of pressure snapshots; must not be {@code null}
 * @param configurationSupplier supplies the configuration to use; must not be {@code null}
 * @param clock                 supplies the current time in milliseconds; must not be {@code null}
 * @param sleeper               used to perform delays; must not be {@code null}
 * @param gcInvoker             callback used by the GC monitor to request a collection; must not be
 *                              {@code null}
 * @param recoveryCooldownMs    recovery cooldown in milliseconds
 * @param gcMonitorPollMs       pause between two GC monitor cycles, in milliseconds
 * @param startGcMonitorThread  {@code true} to start the background GC monitor thread
 * @throws NullPointerException if any of the collaborators is {@code null}
 */
QueryCircuitBreaker(QueryPressureMonitor pressureMonitor, Supplier<Configuration> configurationSupplier,
		LongSupplier clock, Sleeper sleeper, GcInvoker gcInvoker, long recoveryCooldownMs, long gcMonitorPollMs,
		boolean startGcMonitorThread) {
	// collaborators (validated in the same order as they are declared)
	this.pressureMonitor = Objects.requireNonNull(pressureMonitor, "Pressure monitor was null");
	this.configurationSupplier = Objects.requireNonNull(configurationSupplier, "Configuration supplier was null");
	this.clock = Objects.requireNonNull(clock, "Clock was null");
	this.sleeper = Objects.requireNonNull(sleeper, "Sleeper was null");
	this.gcInvoker = Objects.requireNonNull(gcInvoker, "GC invoker was null");

	// timing settings
	this.recoveryCooldownMs = recoveryCooldownMs;
	this.gcMonitorPollMs = gcMonitorPollMs;

	// must stay last: the monitor thread reads the fields assigned above
	if (startGcMonitorThread) {
		startGcMonitorThread();
	}
}
84104

85105
public QueryCircuitBreakerHandle register(QueryCircuitBreakerHandle.Source source, String repositoryId,
@@ -118,14 +138,17 @@ public void beforeExecution(QueryCircuitBreakerHandle handle) throws QueryInterr
118138
}
119139

120140
if (state == QueryPressureState.WARN && configuration.getWarnAdmissionDelayMs() > 0) {
141+
System.gc();
121142
delay(configuration.getWarnAdmissionDelayMs(), configuration, false);
122143
}
123144

124145
if (state.rejectsNewQueries()) {
125146
rejectCount.incrementAndGet();
126147
if (state == QueryPressureState.CRITICAL) {
148+
System.gc();
127149
cancelOneHeavyQueryIfNeeded(configuration, snapshot, "critical-admission");
128150
}
151+
System.gc();
129152
throw CircuitBreakerException.rejected(state, configuration.getRetryAfterSeconds(),
130153
buildPressureMessage("Query rejected by global memory circuit breaker", snapshot, state));
131154
}
@@ -152,6 +175,7 @@ public void checkpoint(QueryCircuitBreakerHandle handle, String operator) throws
152175
QueryPressureMonitor.Snapshot snapshot = pressureMonitor.sample();
153176
QueryPressureState state = refreshState("checkpoint:" + operator, configuration, snapshot);
154177
if (state == QueryPressureState.CRITICAL) {
178+
System.gc();
155179
cancelOneHeavyQueryIfNeeded(configuration, snapshot, "critical-checkpoint:" + operator);
156180
}
157181
if (handle.isCancelRequested()) {
@@ -231,20 +255,115 @@ private QueryPressureState refreshState(String reason, Configuration configurati
231255
}
232256

233257
private QueryPressureState determineState(Configuration configuration, QueryPressureMonitor.Snapshot snapshot) {
234-
if (matches(snapshot, configuration.getCriticalGcMs(), configuration.getCriticalFreeMb())) {
258+
QueryPressureState gcState = determineGcState(configuration, snapshot.getRollingGcMs());
259+
QueryPressureState freeMemoryState = determineFreeMemoryState(configuration, snapshot.getFreeMemoryMb());
260+
return max(freeMemoryState, capGcState(gcState, freeMemoryState));
261+
}
262+
263+
void runGcMonitorCycle() {
264+
Configuration configuration = configurationSupplier.get();
265+
QueryPressureMonitor.Snapshot snapshot = pressureMonitor.sample();
266+
refreshState("monitor", configuration, snapshot);
267+
268+
QueryPressureState freeMemoryState = configuration.isEnabled()
269+
? determineFreeMemoryState(configuration, snapshot.getFreeMemoryMb())
270+
: QueryPressureState.NORMAL;
271+
if (freeMemoryState != lastMonitorMemoryState) {
272+
lastMonitorMemoryState = freeMemoryState;
273+
lastMonitorGcAt = Long.MIN_VALUE;
274+
}
275+
276+
long intervalMs = gcMonitorIntervalMs(freeMemoryState);
277+
if (intervalMs < 0) {
278+
return;
279+
}
280+
281+
long now = clock.getAsLong();
282+
if (lastMonitorGcAt != Long.MIN_VALUE && now - lastMonitorGcAt < intervalMs) {
283+
return;
284+
}
285+
286+
lastMonitorGcAt = now;
287+
LOGGER.info("Query circuit breaker requesting System.gc() monitorState={} freeMb={} rollingGcMs={}",
288+
freeMemoryState, snapshot.getFreeMemoryMb(), snapshot.getRollingGcMs());
289+
gcInvoker.runGc();
290+
}
291+
292+
private void startGcMonitorThread() {
293+
Thread gcMonitorThread = new Thread(this::runGcMonitorLoop, GC_MONITOR_THREAD_NAME);
294+
gcMonitorThread.setDaemon(true);
295+
gcMonitorThread.start();
296+
}
297+
298+
private void runGcMonitorLoop() {
299+
while (!Thread.currentThread().isInterrupted()) {
300+
try {
301+
runGcMonitorCycle();
302+
Thread.sleep(gcMonitorPollMs);
303+
} catch (InterruptedException e) {
304+
Thread.currentThread().interrupt();
305+
return;
306+
} catch (RuntimeException e) {
307+
LOGGER.warn("Query circuit breaker GC monitor loop failed", e);
308+
}
309+
}
310+
}
311+
312+
private long gcMonitorIntervalMs(QueryPressureState freeMemoryState) {
313+
if (freeMemoryState == QueryPressureState.CRITICAL) {
314+
return CRITICAL_MEMORY_GC_INTERVAL_MS;
315+
}
316+
if (freeMemoryState == QueryPressureState.HIGH) {
317+
return HIGH_MEMORY_GC_INTERVAL_MS;
318+
}
319+
return -1;
320+
}
321+
322+
private QueryPressureState determineGcState(Configuration configuration, int rollingGcMs) {
323+
if (rollingGcMs >= configuration.getCriticalGcMs()) {
324+
return QueryPressureState.CRITICAL;
325+
}
326+
if (rollingGcMs >= configuration.getHighGcMs()) {
327+
return QueryPressureState.HIGH;
328+
}
329+
if (rollingGcMs >= configuration.getWarnGcMs()) {
330+
return QueryPressureState.WARN;
331+
}
332+
return QueryPressureState.NORMAL;
333+
}
334+
335+
private QueryPressureState determineFreeMemoryState(Configuration configuration, int freeMemoryMb) {
336+
if (freeMemoryMb <= configuration.getCriticalFreeMb()) {
235337
return QueryPressureState.CRITICAL;
236338
}
237-
if (matches(snapshot, configuration.getHighGcMs(), configuration.getHighFreeMb())) {
339+
if (freeMemoryMb <= configuration.getHighFreeMb()) {
238340
return QueryPressureState.HIGH;
239341
}
240-
if (matches(snapshot, configuration.getWarnGcMs(), configuration.getWarnFreeMb())) {
342+
if (freeMemoryMb <= configuration.getWarnFreeMb()) {
241343
return QueryPressureState.WARN;
242344
}
243345
return QueryPressureState.NORMAL;
244346
}
245347

246-
private boolean matches(QueryPressureMonitor.Snapshot snapshot, int gcThresholdMs, int freeThresholdMb) {
247-
return snapshot.getRollingGcMs() >= gcThresholdMs || snapshot.getFreeMemoryMb() <= freeThresholdMb;
348+
private QueryPressureState capGcState(QueryPressureState gcState, QueryPressureState freeMemoryState) {
349+
switch (freeMemoryState) {
350+
case NORMAL:
351+
return min(gcState, QueryPressureState.WARN);
352+
case WARN:
353+
return min(gcState, QueryPressureState.HIGH);
354+
case HIGH:
355+
case CRITICAL:
356+
default:
357+
return gcState;
358+
}
359+
}
360+
361+
private QueryPressureState min(QueryPressureState left, QueryPressureState right) {
362+
return left.ordinal() <= right.ordinal() ? left : right;
363+
}
364+
365+
private QueryPressureState max(QueryPressureState left, QueryPressureState right) {
366+
return left.ordinal() >= right.ordinal() ? left : right;
248367
}
249368

250369
private synchronized void cancelOneHeavyQueryIfNeeded(Configuration configuration,
@@ -312,6 +431,11 @@ interface Sleeper {
312431
void sleep(long millis) throws InterruptedException;
313432
}
314433

434+
/**
 * Callback through which the circuit breaker requests a garbage collection. The production wiring in
 * this class passes {@code System::gc}; being an interface, it lets callers of the package-private
 * constructor supply a different implementation.
 */
@FunctionalInterface
interface GcInvoker {

	/**
	 * Requests a garbage collection.
	 */
	void runGc();
}
438+
315439
public static final class CircuitBreakerException extends QueryInterruptedException {
316440
private static final long serialVersionUID = -5525396489058918736L;
317441

@@ -518,13 +642,16 @@ private Configuration(boolean enabled, int warnGcMs, int highGcMs, int criticalG
518642

519643
static Configuration fromSystemProperties() {
520644
return new Configuration(Boolean.parseBoolean(System.getProperty(ENABLED_PROPERTY, "false")),
521-
getIntProperty(WARN_GC_MS_PROPERTY, 100), getIntProperty(HIGH_GC_MS_PROPERTY, 200),
522-
getIntProperty(CRITICAL_GC_MS_PROPERTY, 300), getIntProperty(WARN_FREE_MB_PROPERTY, 256),
523-
getIntProperty(HIGH_FREE_MB_PROPERTY, 128), getIntProperty(CRITICAL_FREE_MB_PROPERTY, 64),
645+
getIntProperty(WARN_GC_MS_PROPERTY, 100),
646+
getIntProperty(HIGH_GC_MS_PROPERTY, 250),
647+
getIntProperty(CRITICAL_GC_MS_PROPERTY, 400),
648+
getIntProperty(WARN_FREE_MB_PROPERTY, 256),
649+
getIntProperty(HIGH_FREE_MB_PROPERTY, 128),
650+
getIntProperty(CRITICAL_FREE_MB_PROPERTY, 96),
524651
getIntProperty(WARN_ADMISSION_DELAY_MS_PROPERTY, 50),
525-
getIntProperty(CHECKPOINT_DELAY_MS_PROPERTY, 25),
526-
getIntProperty(CANCEL_COOLDOWN_MS_PROPERTY, 5000),
527-
getIntProperty(RETRY_AFTER_SECONDS_PROPERTY, 5));
652+
getIntProperty(CHECKPOINT_DELAY_MS_PROPERTY, 10),
653+
getIntProperty(CANCEL_COOLDOWN_MS_PROPERTY, 1000),
654+
getIntProperty(RETRY_AFTER_SECONDS_PROPERTY, 3));
528655
}
529656

530657
private static int getIntProperty(String propertyName, int defaultValue) {

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/QueryExecutionContext.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
*/
2121
public final class QueryExecutionContext {
2222

23+
private static final int CHECKPOINT_STRIDE = 16384;
24+
private static final int CHECKPOINT_MASK = CHECKPOINT_STRIDE - 1;
2325
private static final ThreadLocal<State> CURRENT = new ThreadLocal<>();
2426

2527
private QueryExecutionContext() {
@@ -57,9 +59,9 @@ public static void markHeavy(String operator) {
5759
}
5860

5961
public static void checkpoint(String operator) throws QueryInterruptedException {
60-
QueryCircuitBreakerHandle handle = getHandle();
61-
if (handle != null) {
62-
QueryCircuitBreaker.getInstance().checkpoint(handle, operator);
62+
State state = CURRENT.get();
63+
if (state != null && state.shouldCheckpoint()) {
64+
QueryCircuitBreaker.getInstance().checkpoint(state.handle, operator);
6365
}
6466
}
6567

@@ -71,10 +73,15 @@ public interface Activation extends AutoCloseable {
7173
private static final class State {
7274
private final QueryCircuitBreakerHandle handle;
7375
private final State previous;
76+
private int checkpointCalls;
7477

7578
private State(QueryCircuitBreakerHandle handle, State previous) {
7679
this.handle = handle;
7780
this.previous = previous;
7881
}
82+
83+
private boolean shouldCheckpoint() {
84+
return ((++checkpointCalls) & CHECKPOINT_MASK) == 0;
85+
}
7986
}
8087
}

0 commit comments

Comments
 (0)