5151import java .io .IOException ;
5252import java .lang .invoke .MethodHandles ;
5353import java .lang .invoke .VarHandle ;
54+ import java .lang .ref .Cleaner ;
5455import java .nio .ByteBuffer ;
5556import java .nio .charset .StandardCharsets ;
5657import java .util .Arrays ;
5960import java .util .HashSet ;
6061import java .util .Optional ;
6162import java .util .Set ;
63+ import java .util .WeakHashMap ;
6264import java .util .concurrent .ConcurrentHashMap ;
6365import java .util .concurrent .locks .ReadWriteLock ;
6466import java .util .concurrent .locks .ReentrantReadWriteLock ;
7577import org .eclipse .rdf4j .model .base .AbstractValueFactory ;
7678import org .eclipse .rdf4j .model .base .CoreDatatype ;
7779import org .eclipse .rdf4j .model .util .Literals ;
80+ import org .eclipse .rdf4j .sail .SailException ;
7881import org .eclipse .rdf4j .sail .lmdb .LmdbUtil .Transaction ;
7982import org .eclipse .rdf4j .sail .lmdb .config .LmdbStoreConfig ;
8083import org .eclipse .rdf4j .sail .lmdb .model .LmdbBNode ;
@@ -203,14 +206,16 @@ class ValueStore extends AbstractValueFactory {
203206 final Set <Long > unusedRevisionIds = new HashSet <>();
204207
205208 private final ConcurrentCleaner cleaner = new ConcurrentCleaner ();
206- private final Set <ReadTxn > readTransactions = Collections .newSetFromMap (new ConcurrentHashMap <>());
209+ private final Set <ReadTxn > readTransactions = Collections
210+ .synchronizedSet (Collections .newSetFromMap (new WeakHashMap <>()));
207211 private final ThreadLocal <ReadTxn > threadLocalReadTxn = ThreadLocal .withInitial (() -> {
208- ReadTxn readTxn = new ReadTxn ();
212+ ReadTxn readTxn = new ReadTxn (readTransactions );
209213 readTxn .registerIfNeeded ();
210- cleaner . register ( readTxn , readTxn :: close );
214+ readTxn . cleaner ( cleaner );
211215 return readTxn ;
212216 });
213217
218+ @ SuppressWarnings ("unused" )
214219 private Object [] previousNamespaceEntry ;
215220
216221 ValueStore (File dir , LmdbStoreConfig config ) throws IOException {
@@ -591,7 +596,7 @@ public boolean resolveValue(long id, LmdbValue value) {
591596 return true ;
592597 }
593598 } catch (IOException e ) {
594- // should not happen
599+ throw new SailException ( e );
595600 }
596601 return false ;
597602 }
@@ -854,7 +859,7 @@ <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {
854859 if (writeTxn != 0 ) {
855860 return LmdbUtil .readTransaction (env , writeTxn , transaction );
856861 }
857- return threadLocalReadTxn .get ().execute (transaction );
862+ return threadLocalReadTxn .get ().execute (transaction , env );
858863 } finally {
859864 txnLock .readLock ().unlock ();
860865 }
@@ -1335,59 +1340,119 @@ public void close() throws IOException {
13351340 }
13361341 }
13371342
1338- private final class ReadTxn {
1343+ private static final class ReadTxn {
13391344
1340- private long txn ;
1341- private boolean initialized ;
1342- private int depth ;
1345+ private final State state = new State (-1 );
13431346 private boolean registered ;
13441347
1345- <T > T execute (Transaction <T > transaction ) throws IOException {
1348+ private final Set <ReadTxn > readTransactions ;
1349+ private Cleaner .Cleanable cleaner ;
1350+
1351+ static class State implements Runnable {
1352+ public long txn ;
1353+ public long depth ;
1354+ private boolean initialized ;
1355+
1356+ public State (long txn ) {
1357+ this .txn = txn ;
1358+ }
1359+
1360+ @ Override
1361+ public void run () {
1362+ if (initialized ) {
1363+ var txn = this .txn ;
1364+ this .txn = -1 ;
1365+ if (txn != -1 ) {
1366+ try {
1367+ if (depth > 0 ) {
1368+ mdb_txn_reset (txn );
1369+ depth = 0 ;
1370+ }
1371+ } finally {
1372+ mdb_txn_abort (txn );
1373+ }
1374+ }
1375+ initialized = false ;
1376+ }
1377+ }
1378+ }
1379+
1380+ public ReadTxn (Set <ReadTxn > readTransactions ) {
1381+ this .readTransactions = readTransactions ;
1382+ }
1383+
1384+ public void cleaner (ConcurrentCleaner cleaner ) {
1385+ this .cleaner = cleaner .register (this , state );
1386+ }
1387+
1388+ synchronized <T > T execute (Transaction <T > transaction , long env ) throws IOException {
13461389 try (MemoryStack stack = MemoryStack .stackPush ()) {
1347- long handle = ensureTxn ();
1348- depth ++;
13491390 try {
1350- return transaction .exec (stack , handle );
1351- } finally {
1352- releaseTxn ();
1391+ ensureTxn (env );
1392+ state .depth ++;
1393+ try {
1394+ return transaction .exec (stack , state .txn );
1395+ } finally {
1396+ releaseTxn ();
1397+ }
1398+ } catch (Exception e ) {
1399+ // Retry once
1400+ try {
1401+ System .gc ();
1402+ Thread .sleep (1 );
1403+ System .gc ();
1404+ Thread .sleep (1 );
1405+ } catch (InterruptedException ie ) {
1406+ Thread .currentThread ().interrupt ();
1407+ }
1408+
1409+ ensureTxn (env );
1410+ state .depth ++;
1411+ try {
1412+ return transaction .exec (stack , state .txn );
1413+ } finally {
1414+ releaseTxn ();
1415+ }
13531416 }
13541417 }
13551418 }
13561419
1357- private long ensureTxn () throws IOException {
1420+ private void ensureTxn (long env ) throws IOException {
13581421 registerIfNeeded ();
1359- if (!initialized ) {
1360- txn = startTxn ();
1361- initialized = true ;
1362- return txn ;
1422+
1423+ if (!state .initialized ) {
1424+ startTxn (env );
1425+ state .initialized = true ;
1426+ return ;
13631427 }
1364- if (depth == 0 ) {
1428+
1429+ if (state .depth == 0 ) {
13651430 try {
1366- E (mdb_txn_renew (txn ));
1431+ E (mdb_txn_renew (state . txn ));
13671432 } catch (IOException e ) {
13681433 closeInternal ();
1369- txn = startTxn ();
1370- initialized = true ;
1434+ startTxn (env );
1435+ state . initialized = true ;
13711436 }
13721437 }
1373- return txn ;
1438+
13741439 }
13751440
1376- private long startTxn () throws IOException {
1441+ private void startTxn (long env ) throws IOException {
13771442 try (MemoryStack stack = MemoryStack .stackPush ()) {
13781443 PointerBuffer pp = stack .mallocPointer (1 );
13791444 E (mdb_txn_begin (env , NULL , MDB_RDONLY , pp ));
1380- return pp .get (0 );
1445+ state . txn = pp .get (0 );
13811446 }
13821447 }
13831448
13841449 private void releaseTxn () {
1385- if (depth == 0 ) {
1450+ if (state . depth == 0 ) {
13861451 return ;
13871452 }
1388- depth --;
1389- if (depth == 0 && initialized ) {
1390- mdb_txn_reset (txn );
1453+ state . depth --;
1454+ if (state . depth == 0 && state . initialized ) {
1455+ mdb_txn_reset (state . txn );
13911456 }
13921457 }
13931458
@@ -1403,13 +1468,8 @@ private void unregister() {
14031468 }
14041469
14051470 private void closeInternal () {
1406- if (initialized ) {
1407- if (depth > 0 ) {
1408- mdb_txn_reset (txn );
1409- depth = 0 ;
1410- }
1411- mdb_txn_abort (txn );
1412- initialized = false ;
1471+ if (state .initialized ) {
1472+ cleaner .clean ();
14131473 }
14141474 }
14151475
0 commit comments