Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/

// Some portions generated by Codex
package org.eclipse.rdf4j.common.iteration;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
Expand All @@ -20,14 +22,16 @@
*/
public class DistinctIteration<E> extends FilterIteration<E> {

private static final String OPERATOR_NAME = "DISTINCT";

/*-----------*
* Variables *
*-----------*/

/**
* The elements that have already been returned.
*/
private final Set<E> excludeSet;
private Set<E> excludeSet;

/*--------------*
* Constructors *
Expand Down Expand Up @@ -76,6 +80,15 @@ public DistinctIteration(CloseableIteration<? extends E> iter, Supplier<Set<E>>
*/
@Override
protected boolean accept(E object) {
try {
QueryExecutionContextBridge.throwIfHeavyOperatorExecutionDisabled(OPERATOR_NAME);
QueryExecutionContextBridge.markHeavy(OPERATOR_NAME);
QueryExecutionContextBridge.checkpoint(OPERATOR_NAME);
} catch (Throwable t) {
excludeSet = null;
throw t;
}

if (inExcludeSet(object)) {
// object has already been returned
return false;
Expand All @@ -87,7 +100,7 @@ protected boolean accept(E object) {

@Override
protected void handleClose() {

excludeSet = null;
}

/**
Expand All @@ -104,4 +117,86 @@ private boolean inExcludeSet(E object) {
protected boolean add(E object) {
return excludeSet.add(object);
}

private static final class QueryExecutionContextBridge {

private static final String QUERY_EXECUTION_CONTEXT_CLASS = "org.eclipse.rdf4j.http.client.QueryExecutionContext";

private static volatile boolean initialized;
private static volatile Method markHeavyMethod;
private static volatile Method checkpointMethod;
private static volatile Method throwIfHeavyOperatorExecutionDisabledMethod;

private QueryExecutionContextBridge() {
}

private static void markHeavy(String operator) {
Method method = getMethod(true);
if (method != null) {
invoke(method, operator);
}
}

private static void checkpoint(String operator) {
Method method = getMethod(false);
if (method != null) {
invoke(method, operator);
}
}

private static void throwIfHeavyOperatorExecutionDisabled(String operator) {
if (!initialized) {
initialize();
}
if (throwIfHeavyOperatorExecutionDisabledMethod != null) {
invoke(throwIfHeavyOperatorExecutionDisabledMethod, operator);
}
}

private static Method getMethod(boolean markHeavy) {
if (!initialized) {
initialize();
}
return markHeavy ? markHeavyMethod : checkpointMethod;
}

private static synchronized void initialize() {
if (initialized) {
return;
}
try {
Class<?> contextType = Class.forName(QUERY_EXECUTION_CONTEXT_CLASS);
markHeavyMethod = contextType.getMethod("markHeavy", String.class);
checkpointMethod = contextType.getMethod("checkpoint", String.class);
throwIfHeavyOperatorExecutionDisabledMethod = contextType.getMethod(
"throwIfHeavyOperatorExecutionDisabled", String.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
markHeavyMethod = null;
checkpointMethod = null;
throwIfHeavyOperatorExecutionDisabledMethod = null;
}
initialized = true;
}

private static void invoke(Method method, String operator) {
try {
method.invoke(null, operator);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unable to access query execution context bridge", e);
} catch (InvocationTargetException e) {
throw propagate(e.getCause());
}
}

private static RuntimeException propagate(Throwable throwable) {
if (throwable instanceof RuntimeException) {
throw (RuntimeException) throwable;
}
if (throwable instanceof Error) {
throw (Error) throwable;
}
throw new IllegalStateException("Unexpected checked exception from query execution context bridge",
throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*******************************************************************************
* Copyright (c) 2026 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.common.iteration;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;

import org.eclipse.rdf4j.http.client.QueryExecutionContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

class DistinctIterationTest {

@AfterEach
void tearDown() {
QueryExecutionContext.reset();
}

@Test
void shouldConsultQueryExecutionContextWhenFilteringDistinctValues() {
QueryExecutionContext.failOnCheckpoint(new RuntimeException("breaker checkpoint"));

DistinctIteration<Integer> iteration = new DistinctIteration<>(
new CloseableIteratorIteration<>(List.of(1, 2, 3).iterator()), java.util.HashSet::new);

RuntimeException exception = assertThrows(RuntimeException.class, iteration::hasNext);
assertEquals("breaker checkpoint", exception.getMessage());
assertTrue(QueryExecutionContext.getMarkHeavyCalls() > 0);
assertTrue(QueryExecutionContext.getCheckpointCalls() > 0);
}

@Test
void shouldAbortDistinctIterationWhenHeavyOperatorExecutionIsDisabled() {
QueryExecutionContext.disableHeavyOperatorExecution(new RuntimeException("critical-breaker-stop"));

DistinctIteration<Integer> iteration = new DistinctIteration<>(
new CloseableIteratorIteration<>(List.of(1, 2, 3).iterator()), java.util.HashSet::new);

RuntimeException exception = assertThrows(RuntimeException.class, iteration::hasNext);
assertEquals("critical-breaker-stop", exception.getMessage());
assertEquals(0, QueryExecutionContext.getMarkHeavyCalls());
assertEquals(0, QueryExecutionContext.getCheckpointCalls());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*******************************************************************************
* Copyright (c) 2026 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
// Some portions generated by Codex
package org.eclipse.rdf4j.http.client;

import java.util.concurrent.atomic.AtomicInteger;

public final class QueryExecutionContext {

private static final AtomicInteger MARK_HEAVY_CALLS = new AtomicInteger();
private static final AtomicInteger CHECKPOINT_CALLS = new AtomicInteger();

private static volatile boolean heavyOperatorExecutionEnabled = true;
private static volatile RuntimeException checkpointFailure;
private static volatile RuntimeException heavyOperatorExecutionFailure;

private QueryExecutionContext() {
}

public static void markHeavy(String operator) {
MARK_HEAVY_CALLS.incrementAndGet();
}

public static void checkpoint(String operator) {
CHECKPOINT_CALLS.incrementAndGet();
if (checkpointFailure != null) {
throw checkpointFailure;
}
}

public static void throwIfHeavyOperatorExecutionDisabled(String operator) {
if (!heavyOperatorExecutionEnabled) {
throw heavyOperatorExecutionFailure != null ? heavyOperatorExecutionFailure
: new RuntimeException("heavy operator execution disabled: " + operator);
}
}

public static void failOnCheckpoint(RuntimeException runtimeException) {
checkpointFailure = runtimeException;
}

public static void disableHeavyOperatorExecution(RuntimeException runtimeException) {
heavyOperatorExecutionEnabled = false;
heavyOperatorExecutionFailure = runtimeException;
}

public static void enableHeavyOperatorExecution() {
heavyOperatorExecutionEnabled = true;
heavyOperatorExecutionFailure = null;
}

public static int getMarkHeavyCalls() {
return MARK_HEAVY_CALLS.get();
}

public static int getCheckpointCalls() {
return CHECKPOINT_CALLS.get();
}

public static void reset() {
MARK_HEAVY_CALLS.set(0);
CHECKPOINT_CALLS.set(0);
heavyOperatorExecutionEnabled = true;
checkpointFailure = null;
heavyOperatorExecutionFailure = null;
}
}
Loading
Loading