Skip to content

Commit 675c063

Browse files
committed
working on new ID based join iterator
1 parent 4ad6a3f commit 675c063

4 files changed

Lines changed: 86 additions & 17 deletions

File tree

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2025 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
package org.eclipse.rdf4j.sail.lmdb.join;
12+
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
import static org.mockito.ArgumentMatchers.anyLong;
15+
import static org.mockito.Mockito.mock;
16+
import static org.mockito.Mockito.never;
17+
import static org.mockito.Mockito.times;
18+
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Mockito.when;
20+
21+
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
22+
import org.eclipse.rdf4j.query.MutableBindingSet;
23+
import org.eclipse.rdf4j.query.algebra.StatementPattern;
24+
import org.eclipse.rdf4j.query.algebra.Var;
25+
import org.eclipse.rdf4j.query.algebra.evaluation.ArrayBindingSet;
26+
import org.eclipse.rdf4j.sail.lmdb.IdBindingInfo;
27+
import org.eclipse.rdf4j.sail.lmdb.ValueStore;
28+
import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.parallel.Isolated;
31+
32+
@Isolated
33+
class LmdbIdJoinLazyMaterializationTest {
34+
35+
private static final String PROPERTY = "rdf4j.lmdb.idJoin.lazyMaterialization";
36+
37+
@Test
38+
void eagerMaterializationWhenDisabled() throws Exception {
39+
System.setProperty(PROPERTY, "false");
40+
try {
41+
ValueStore valueStore = mock(ValueStore.class);
42+
LmdbValue materialized = mock(LmdbValue.class);
43+
when(valueStore.getLazyValue(1L)).thenReturn(materialized);
44+
when(valueStore.getLazyValue(2L)).thenReturn(materialized);
45+
46+
StatementPattern pattern = new StatementPattern(
47+
new Var("person"),
48+
new Var("predicate", SimpleValueFactory.getInstance().createIRI("http://example.com/p")),
49+
new Var("item"));
50+
LmdbIdJoinIterator.PatternInfo patternInfo = LmdbIdJoinIterator.PatternInfo.create(pattern);
51+
IdBindingInfo info = IdBindingInfo.fromFirstPattern(patternInfo);
52+
53+
long[] record = new long[] { 1L, 2L };
54+
MutableBindingSet target = new ArrayBindingSet(new String[] { "person", "item" });
55+
boolean result = info.applyRecord(record, target, valueStore);
56+
57+
assertThat(result).isTrue();
58+
verify(valueStore).getLazyValue(1L);
59+
verify(valueStore).getLazyValue(2L);
60+
verify(materialized, times(2)).init();
61+
verify(valueStore, never()).getValue(anyLong());
62+
assertThat(target.getValue("person")).isSameAs(materialized);
63+
assertThat(target.getValue("item")).isSameAs(materialized);
64+
} finally {
65+
System.clearProperty(PROPERTY);
66+
}
67+
}
68+
}

testsuites/repository/src/main/java/org/eclipse/rdf4j/testsuite/repository/optimistic/DeadLockTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import static org.junit.Assert.assertNull;
1414

1515
import java.util.concurrent.CountDownLatch;
16+
import java.util.concurrent.TimeUnit;
1617

1718
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
1819
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
@@ -91,7 +92,7 @@ public void test() throws Exception {
9192
start.countDown();
9293
a.begin(level);
9394
a.add(PICASSO, RDF.TYPE, PAINTER);
94-
commit.await();
95+
commit.await(10, TimeUnit.SECONDS);
9596
a.commit();
9697
} catch (Exception e) {
9798
e1.initCause(e);
@@ -106,7 +107,7 @@ public void test() throws Exception {
106107
start.countDown();
107108
b.begin(level);
108109
b.add(REMBRANDT, RDF.TYPE, PAINTER);
109-
commit.await();
110+
commit.await(10, TimeUnit.SECONDS);
110111
b.commit();
111112
} catch (Exception e) {
112113
e2.initCause(e);
@@ -115,10 +116,10 @@ public void test() throws Exception {
115116
end.countDown();
116117
}
117118
}).start();
118-
start.await();
119+
start.await(10, TimeUnit.SECONDS);
119120
commit.countDown();
120121
Thread.sleep(500);
121-
end.await();
122+
end.await(10, TimeUnit.SECONDS);
122123
assertNull(e1.getCause());
123124
assertNull(e2.getCause());
124125
}

testsuites/repository/src/main/java/org/eclipse/rdf4j/testsuite/repository/optimistic/DeleteInsertTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void tearDown() {
6666
}
6767
}
6868

69-
@Test
69+
@Test(timeout = 10000)
7070
public void test() throws Exception {
7171
String load = IOUtil.readString(cl.getResource("test/insert-data.ru"));
7272
con.prepareUpdate(QueryLanguage.SPARQL, load, NS).execute();

testsuites/repository/src/main/java/org/eclipse/rdf4j/testsuite/repository/optimistic/IsolationLevelTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void testReadUncommitted() {
107107
readPending(IsolationLevels.READ_UNCOMMITTED);
108108
}
109109

110-
@Test
110+
@Test(timeout = 5000)
111111
public void testReadCommitted() throws Exception {
112112
readCommitted(IsolationLevels.READ_COMMITTED);
113113
rollbackTriple(IsolationLevels.READ_COMMITTED);
@@ -194,7 +194,7 @@ private void readCommitted(final IsolationLevel level) throws Exception {
194194
Thread writer = new Thread(() -> {
195195
try (RepositoryConnection write = store.getConnection()) {
196196
start.countDown();
197-
start.await();
197+
start.await(10, TimeUnit.SECONDS);
198198
write.begin(level);
199199
write.add(RDF.NIL, RDF.TYPE, RDF.LIST);
200200
begin.countDown();
@@ -207,8 +207,8 @@ private void readCommitted(final IsolationLevel level) throws Exception {
207207
Thread reader = new Thread(() -> {
208208
try (RepositoryConnection read = store.getConnection()) {
209209
start.countDown();
210-
start.await();
211-
begin.await();
210+
start.await(10, TimeUnit.SECONDS);
211+
begin.await(10, TimeUnit.SECONDS);
212212
read.begin(level);
213213
// must not read uncommitted changes
214214
long counted = count(read, RDF.NIL, RDF.TYPE, RDF.LIST, false);
@@ -245,13 +245,13 @@ private void repeatableRead(final IsolationLevels level) throws Exception {
245245
Thread writer = new Thread(() -> {
246246
try (RepositoryConnection write = store.getConnection()) {
247247
start.countDown();
248-
start.await();
248+
start.await(10, TimeUnit.SECONDS);
249249
write.begin(level);
250250
write.add(RDF.NIL, RDF.TYPE, RDF.LIST);
251251
write.commit();
252252

253253
begin.countDown();
254-
observed.await();
254+
observed.await(10, TimeUnit.SECONDS);
255255

256256
write.begin(level);
257257
write.remove(RDF.NIL, RDF.TYPE, RDF.LIST);
@@ -264,8 +264,8 @@ private void repeatableRead(final IsolationLevels level) throws Exception {
264264
Thread reader = new Thread(() -> {
265265
try (RepositoryConnection read = store.getConnection()) {
266266
start.countDown();
267-
start.await();
268-
begin.await();
267+
start.await(10, TimeUnit.SECONDS);
268+
begin.await(10, TimeUnit.SECONDS);
269269
read.begin(level);
270270
long first = count(read, RDF.NIL, RDF.TYPE, RDF.LIST, false);
271271
assertEquals(1, first);
@@ -343,7 +343,7 @@ private void snapshot(final IsolationLevels level) throws Exception {
343343
try {
344344
try (RepositoryConnection write = store.getConnection()) {
345345
start.countDown();
346-
start.await();
346+
start.await(10, TimeUnit.SECONDS);
347347
write.begin(level);
348348
insertTestStatement(write, 1);
349349
write.commit();
@@ -363,8 +363,8 @@ private void snapshot(final IsolationLevels level) throws Exception {
363363
Thread reader = new Thread(() -> {
364364
try (RepositoryConnection read = store.getConnection()) {
365365
start.countDown();
366-
start.await();
367-
begin.await();
366+
start.await(10, TimeUnit.SECONDS);
367+
begin.await(10, TimeUnit.SECONDS);
368368
read.begin(level);
369369
long first = count(read, null, null, null, false);
370370
observed.countDown();
@@ -431,7 +431,7 @@ protected Thread incrementBy(final CountDownLatch start, final CountDownLatch ob
431431
return new Thread(() -> {
432432
try (RepositoryConnection con = store.getConnection()) {
433433
start.countDown();
434-
start.await();
434+
start.await(10, TimeUnit.SECONDS);
435435
con.begin(level);
436436
Literal o1 = readLiteral(con, subj, pred);
437437
observed.countDown();

0 commit comments

Comments
 (0)