Skip to content

Commit f23bccb

Browse files
authored
GH-5723: propagate maxExecutionTime as per-request HTTP response timeout (#5771)
2 parents d2faa95 + fe55058 commit f23bccb

8 files changed

Lines changed: 132 additions & 31 deletions

File tree

core/http/client-apache5/src/main/java/org/eclipse/rdf4j/http/client/apache5/ApacheHC5RDF4JHttpClient.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import org.apache.hc.client5.http.classic.methods.HttpPost;
1919
import org.apache.hc.client5.http.classic.methods.HttpPut;
2020
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
21+
import org.apache.hc.client5.http.config.RequestConfig;
2122
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
2223
import org.apache.hc.core5.http.ClassicHttpResponse;
2324
import org.apache.hc.core5.http.ContentType;
2425
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
2526
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
27+
import org.apache.hc.core5.util.Timeout;
2628
import org.eclipse.rdf4j.http.client.spi.HttpHeader;
2729
import org.eclipse.rdf4j.http.client.spi.HttpRequest;
2830
import org.eclipse.rdf4j.http.client.spi.HttpRequestBody;
@@ -45,9 +47,17 @@ public class ApacheHC5RDF4JHttpClient implements RDF4JHttpClient {
4547
*/
4648
private final int maxRetries408;
4749

48-
public ApacheHC5RDF4JHttpClient(CloseableHttpClient httpClient, int maxConnectionsPerRoute) {
50+
/**
51+
* Default request config used as the base when building per-request overrides, so that factory-level settings
52+
* (connectionRequestTimeout, redirectsEnabled, cookieSpec) are preserved.
53+
*/
54+
private final RequestConfig defaultRequestConfig;
55+
56+
public ApacheHC5RDF4JHttpClient(CloseableHttpClient httpClient, int maxConnectionsPerRoute,
57+
RequestConfig defaultRequestConfig) {
4958
this.httpClient = httpClient;
5059
this.maxRetries408 = maxConnectionsPerRoute + 1;
60+
this.defaultRequestConfig = defaultRequestConfig;
5161
}
5262

5363
@Override
@@ -101,6 +111,14 @@ private HttpUriRequestBase buildRequest(HttpRequest request) throws IOException
101111
hcRequest.addHeader(header.getName(), header.getValue());
102112
}
103113

114+
// Apply per-request response timeout if present, copying from the default config so that
115+
// factory-level settings (connectionRequestTimeout, redirectsEnabled, cookieSpec) are preserved.
116+
request.getResponseTimeout()
117+
.ifPresent(timeout -> hcRequest
118+
.setConfig(RequestConfig.copy(defaultRequestConfig)
119+
.setResponseTimeout(Timeout.of(timeout))
120+
.build()));
121+
104122
// Set body. Repeatable (byte-backed) bodies use ByteArrayEntity so that Apache HC5's
105123
// RedirectExec can follow redirects and the 408 retry loop can re-send the request.
106124
// Single-use stream bodies use InputStreamEntity (non-repeatable); RedirectExec will

core/http/client-apache5/src/main/java/org/eclipse/rdf4j/http/client/apache5/ApacheHC5RDF4JHttpClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public RDF4JHttpClient create(RDF4JHttpClientConfig config) {
136136
}
137137

138138
CloseableHttpClient httpClient = buildHttpClient(builder, config);
139-
return new ApacheHC5RDF4JHttpClient(httpClient, config.getMaxConnectionsPerRoute());
139+
return new ApacheHC5RDF4JHttpClient(httpClient, config.getMaxConnectionsPerRoute(), requestConfig);
140140
}
141141

142142
/**

core/http/client-api/src/main/java/org/eclipse/rdf4j/http/client/spi/HttpRequest.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package org.eclipse.rdf4j.http.client.spi;
1212

1313
import java.net.URI;
14+
import java.time.Duration;
1415
import java.util.ArrayList;
1516
import java.util.List;
1617
import java.util.Map;
@@ -48,12 +49,14 @@ public class HttpRequest {
4849
private final URI uri;
4950
private final List<HttpHeader> headers;
5051
private final HttpRequestBody body;
52+
private final Duration responseTimeout;
5153

5254
private HttpRequest(Builder builder) {
5355
this.method = builder.method;
5456
this.uri = builder.uri;
5557
this.headers = new ArrayList<>(builder.headers);
5658
this.body = builder.body;
59+
this.responseTimeout = builder.responseTimeout;
5760
}
5861

5962
/**
@@ -142,6 +145,17 @@ public Optional<HttpRequestBody> getBody() {
142145
return Optional.ofNullable(body);
143146
}
144147

148+
/**
149+
* Returns the optional response timeout for this request. When present, the HTTP client must abort the request if
150+
* no response data arrives within this duration. Overrides any client-level socket timeout for this specific
151+
* request.
152+
*
153+
* @return an {@link Optional} containing the response timeout, or empty if no per-request timeout is set
154+
*/
155+
public Optional<Duration> getResponseTimeout() {
156+
return Optional.ofNullable(responseTimeout);
157+
}
158+
145159
/**
146160
* Creates a new {@link Builder} for the given HTTP method and target URI.
147161
*
@@ -165,7 +179,8 @@ public static Builder newBuilder(String method, URI uri) {
165179
public static Builder copyOf(HttpRequest original, URI uri) {
166180
return new Builder(original.method, uri)
167181
.headers(original.headers)
168-
.body(original.body);
182+
.body(original.body)
183+
.responseTimeout(original.responseTimeout);
169184
}
170185

171186
/**
@@ -181,6 +196,7 @@ public static final class Builder {
181196
private final URI uri;
182197
private final List<HttpHeader> headers = new ArrayList<>();
183198
private HttpRequestBody body;
199+
private Duration responseTimeout;
184200

185201
private Builder(String method, URI uri) {
186202
this.method = method;
@@ -236,6 +252,19 @@ public Builder body(HttpRequestBody body) {
236252
return this;
237253
}
238254

255+
/**
256+
* Sets the response timeout for this request. When set, the HTTP client must abort the request if no response
257+
* data arrives within this duration, overriding any client-level socket timeout. Pass {@code null} to clear a
258+
* previously set timeout.
259+
*
260+
* @param timeout the response timeout, or {@code null} for no per-request timeout
261+
* @return this builder
262+
*/
263+
public Builder responseTimeout(Duration timeout) {
264+
this.responseTimeout = timeout;
265+
return this;
266+
}
267+
239268
/**
240269
* Builds and returns the immutable {@link HttpRequest}.
241270
*

core/http/client-jdk/src/main/java/org/eclipse/rdf4j/http/client/jdk/JdkRDF4JHttpClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,10 @@ public org.eclipse.rdf4j.http.client.spi.HttpResponse execute(org.eclipse.rdf4j.
5353
builder.header(header.getName(), header.getValue());
5454
}
5555

56-
// Set socket timeout if configured
57-
if (config.getSocketTimeoutMs() > 0) {
56+
// Per-request response timeout takes precedence over the global socket timeout from config
57+
if (request.getResponseTimeout().isPresent()) {
58+
builder.timeout(request.getResponseTimeout().get());
59+
} else if (config.getSocketTimeoutMs() > 0) {
5860
builder.timeout(Duration.ofMillis(config.getSocketTimeoutMs()));
5961
}
6062

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/RDF4JProtocolSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.URI;
2323
import java.nio.charset.Charset;
2424
import java.nio.charset.StandardCharsets;
25+
import java.time.Duration;
2526
import java.util.ArrayList;
2627
import java.util.List;
2728
import java.util.Locale;
@@ -855,6 +856,9 @@ protected HttpRequest getQueryMethod(QueryLanguage ql, String query, String base
855856
for (Map.Entry<String, String> additionalHeader : getAdditionalHttpHeaders().entrySet()) {
856857
builder.header(additionalHeader.getKey(), additionalHeader.getValue());
857858
}
859+
if (maxQueryTime > 0) {
860+
builder.responseTimeout(Duration.ofSeconds(maxQueryTime));
861+
}
858862
return builder.build();
859863
}
860864

@@ -918,6 +922,9 @@ protected HttpRequest getUpdateMethod(QueryLanguage ql, String update, String ba
918922
for (Map.Entry<String, String> additionalHeader : getAdditionalHttpHeaders().entrySet()) {
919923
builder.header(additionalHeader.getKey(), additionalHeader.getValue());
920924
}
925+
if (maxExecutionTime > 0) {
926+
builder.responseTimeout(Duration.ofSeconds(maxExecutionTime));
927+
}
921928
return builder.build();
922929
}
923930

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SPARQLProtocolSession.java

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.net.HttpURLConnection;
2323
import java.nio.charset.Charset;
2424
import java.nio.charset.StandardCharsets;
25+
import java.time.Duration;
2526
import java.util.ArrayList;
2627
import java.util.Collections;
2728
import java.util.List;
@@ -498,6 +499,9 @@ protected HttpRequest getQueryMethod(QueryLanguage ql, String query, String base
498499
for (Map.Entry<String, String> additionalHeader : additionalHttpHeaders.entrySet()) {
499500
builder.header(additionalHeader.getKey(), additionalHeader.getValue());
500501
}
502+
if (maxQueryTime > 0) {
503+
builder.responseTimeout(Duration.ofSeconds(maxQueryTime));
504+
}
501505
return builder.build();
502506
}
503507

@@ -529,6 +533,9 @@ protected HttpRequest getUpdateMethod(QueryLanguage ql, String update, String ba
529533
builder.header(additionalHeader.getKey(), additionalHeader.getValue());
530534
}
531535
}
536+
if (maxQueryTime > 0) {
537+
builder.responseTimeout(Duration.ofSeconds(maxQueryTime));
538+
}
532539
return builder.build();
533540
}
534541

@@ -745,11 +752,10 @@ private HttpResponse sendTupleQueryViaHttp(HttpRequest method, Set<QueryResultFo
745752
}
746753
}
747754

748-
HttpRequest methodWithAccept = withHeader(method, ACCEPT_PARAM_NAME,
749-
String.join(", ", acceptValues));
755+
method.addHeader(ACCEPT_PARAM_NAME, String.join(", ", acceptValues));
750756

751757
try {
752-
return executeOK(methodWithAccept);
758+
return executeOK(method);
753759
} catch (RepositoryException | MalformedQueryException | QueryInterruptedException e) {
754760
throw e;
755761
} catch (RDF4JException e) {
@@ -894,11 +900,10 @@ private HttpResponse sendGraphQueryViaHttp(HttpRequest method, boolean requireCo
894900

895901
List<String> acceptParams = RDFFormat.getAcceptParams(rdfFormats, requireContext, getPreferredRDFFormat());
896902

897-
HttpRequest methodWithAccept = withHeader(method, ACCEPT_PARAM_NAME,
898-
String.join(", ", acceptParams));
903+
method.addHeader(ACCEPT_PARAM_NAME, String.join(", ", acceptParams));
899904

900905
try {
901-
return executeOK(methodWithAccept);
906+
return executeOK(method);
902907
} catch (RepositoryException | MalformedQueryException | QueryInterruptedException e) {
903908
throw e;
904909
} catch (RDF4JException e) {
@@ -965,10 +970,9 @@ private HttpResponse sendBooleanQueryViaHttp(HttpRequest method, Set<QueryResult
965970
}
966971
}
967972

968-
HttpRequest methodWithAccept = withHeader(method, ACCEPT_PARAM_NAME,
969-
String.join(", ", acceptValues));
973+
method.addHeader(ACCEPT_PARAM_NAME, String.join(", ", acceptValues));
970974

971-
return executeOK(methodWithAccept);
975+
return executeOK(method);
972976
}
973977

974978
/**
@@ -1243,19 +1247,4 @@ public void setPassThroughEnabled(boolean passThroughEnabled) {
12431247
this.passThroughEnabled = passThroughEnabled;
12441248
}
12451249

1246-
/**
1247-
* Creates a new {@link HttpRequest} with an additional header added to the existing request.
1248-
*
1249-
* @param request the original request
1250-
* @param name the header name
1251-
* @param value the header value
1252-
* @return a new request with the additional header
1253-
*/
1254-
private static HttpRequest withHeader(HttpRequest request, String name, String value) {
1255-
HttpRequest.Builder builder = HttpRequest.newBuilder(request.getMethod(), request.getUri())
1256-
.headers(request.getHeaders())
1257-
.header(name, value);
1258-
request.getBody().ifPresent(builder::body);
1259-
return builder.build();
1260-
}
12611250
}

core/http/client/src/main/java/org/eclipse/rdf4j/http/client/query/AbstractHTTPQuery.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ public Binding[] getBindingsArray() {
7070
@Override
7171
public void setMaxExecutionTime(int maxExecutionTimeSeconds) {
7272
super.setMaxExecutionTime(maxExecutionTimeSeconds);
73-
// TODO allow per query timeouts on the http connection used
74-
// Note: connection timeout is now configured via HttpClientConfig when building the client.
73+
// maxExecutionTimeSeconds is propagated as a per-request response timeout on the HTTP connection:
74+
// concrete subclasses pass getMaxExecutionTime() to SPARQLProtocolSession, which sets it on the
75+
// HttpRequest via HttpRequest.Builder#responseTimeout(Duration). The ApacheHC5RDF4JHttpClient
76+
// implementation applies it as a per-request RequestConfig#setResponseTimeout override.
7577
}
7678

7779
@Override

core/http/client/src/test/java/org/eclipse/rdf4j/http/client/SPARQLProtocolSessionTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import java.io.IOException;
2323
import java.io.InputStream;
2424
import java.nio.charset.StandardCharsets;
25+
import java.time.Duration;
2526
import java.util.List;
2627
import java.util.concurrent.Executors;
2728
import java.util.stream.Stream;
2829

2930
import org.apache.commons.io.IOUtils;
3031
import org.eclipse.rdf4j.http.client.spi.HttpHeader;
32+
import org.eclipse.rdf4j.http.client.spi.HttpRequest;
3133
import org.eclipse.rdf4j.http.client.spi.HttpResponse;
3234
import org.eclipse.rdf4j.http.client.spi.RDF4JHttpClient;
3335
import org.eclipse.rdf4j.http.client.spi.RDF4JHttpClients;
@@ -299,6 +301,58 @@ public void testTupleQuery_Passthrough_ConfiguredFalse(String factoryName, MockS
299301
assertThat(out.toString()).startsWith("<");
300302
}
301303

304+
@ParameterizedTest(name = "[{0}]")
305+
@MethodSource("httpClientFactories")
306+
public void getQueryMethod_setsResponseTimeout_whenMaxQueryTimeIsPositive(String factoryName) {
307+
this.factoryName = factoryName;
308+
sparqlSession = createProtocolSession();
309+
310+
HttpRequest request = sparqlSession.getQueryMethod(
311+
QueryLanguage.SPARQL, "SELECT * WHERE { ?s ?p ?o }", null, null, true, 30);
312+
313+
assertThat(request.getResponseTimeout())
314+
.isPresent()
315+
.hasValue(Duration.ofSeconds(30));
316+
}
317+
318+
@ParameterizedTest(name = "[{0}]")
319+
@MethodSource("httpClientFactories")
320+
public void getQueryMethod_noResponseTimeout_whenMaxQueryTimeIsZero(String factoryName) {
321+
this.factoryName = factoryName;
322+
sparqlSession = createProtocolSession();
323+
324+
HttpRequest request = sparqlSession.getQueryMethod(
325+
QueryLanguage.SPARQL, "SELECT * WHERE { ?s ?p ?o }", null, null, true, 0);
326+
327+
assertThat(request.getResponseTimeout()).isEmpty();
328+
}
329+
330+
@ParameterizedTest(name = "[{0}]")
331+
@MethodSource("httpClientFactories")
332+
public void getUpdateMethod_setsResponseTimeout_whenMaxQueryTimeIsPositive(String factoryName) {
333+
this.factoryName = factoryName;
334+
sparqlSession = createProtocolSession();
335+
336+
HttpRequest request = sparqlSession.getUpdateMethod(
337+
QueryLanguage.SPARQL, "INSERT DATA { <urn:s> <urn:p> <urn:o> }", null, null, true, 30);
338+
339+
assertThat(request.getResponseTimeout())
340+
.isPresent()
341+
.hasValue(Duration.ofSeconds(30));
342+
}
343+
344+
@ParameterizedTest(name = "[{0}]")
345+
@MethodSource("httpClientFactories")
346+
public void getUpdateMethod_noResponseTimeout_whenMaxQueryTimeIsZero(String factoryName) {
347+
this.factoryName = factoryName;
348+
sparqlSession = createProtocolSession();
349+
350+
HttpRequest request = sparqlSession.getUpdateMethod(
351+
QueryLanguage.SPARQL, "INSERT DATA { <urn:s> <urn:p> <urn:o> }", null, null, true, 0);
352+
353+
assertThat(request.getResponseTimeout()).isEmpty();
354+
}
355+
302356
@Test
303357
public void getContentTypeSerialisationTest() {
304358
{

0 commit comments

Comments
 (0)