diff --git a/LMDB_ISOLATION_EXECPLAN.md b/LMDB_ISOLATION_EXECPLAN.md new file mode 100644 index 00000000000..fc7304eb0bd --- /dev/null +++ b/LMDB_ISOLATION_EXECPLAN.md @@ -0,0 +1,196 @@ +# LMDB-Backed Isolation Up To Snapshot + +This ExecPlan is a living document. The sections `Progress`, `Surprises & Discoveries`, `Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work proceeds. + +This plan must be maintained in accordance with `PLANS.md` at the repository root. + +## Purpose / Big Picture + +After this change, the LMDB Store will rely on LMDB's own transaction isolation for NONE, READ_COMMITTED, SNAPSHOT_READ, and SNAPSHOT. Snapshot and repeatable-read behavior will come from LMDB read transactions rather than the current in-memory SnapshotSailStore. Users will see the same externally observable isolation behavior (as proven by the existing isolation tests), but with simpler transaction mechanics and fewer in-memory overlays. + +## Progress + +- [x] (2026-01-02 09:45Z) Drafted ExecPlan and initial analysis. +- [x] (2026-01-02 13:30Z) Defined LMDB transaction context and APIs. +- [x] (2026-01-02 14:25Z) Implemented LMDB-backed SailSource/Sink/Dataset behavior. +- [x] (2026-01-02 14:25Z) Updated isolation level support and lock strategy. +- [x] (2026-01-02 13:59Z) Enforced thread-affine LMDB txn context use. +- [x] (2026-01-02 15:01Z) Ran LmdbStoreIsolationLevelTest and LmdbSailStoreTest. +- [x] (2026-01-02 15:18Z) Added commit read-lock + pinned ValueStore reads. +- [x] (2026-01-02 15:25Z) Added commit-window regression test and commit/read lock guard. +- [x] (2026-01-02 21:35Z) Added SNAPSHOT begin/commit regression test. +- [x] (2026-01-02 21:45Z) Guarded snapshot txn pinning with commit read lock. +- [x] (2026-01-03 11:39Z) Wired SNAPSHOT_READ pinning and deferred updates in LmdbTxnContext. +- [x] (2026-01-03 11:45Z) Aligned deprecated tracking with pending update buffering. 
+- [x] (2026-01-03 11:54Z) Treated SNAPSHOT reads as pinned + deferred to keep snapshotRead stable. +- [x] (2026-01-03 13:30Z) Added serializable conflict tracking + prepare checks for LMDB snapshots. +- [x] (2026-01-03 13:34Z) LmdbOptimisticIsolationTest green (mvnf core/sail/lmdb LmdbOptimisticIsolationTest). +- [ ] Run full LMDB module verify and isolation suite. + +## Surprises & Discoveries + +- Observation: LmdbStore wraps the backend with SnapshotSailStore and bypasses it only when isolation is disabled, so snapshot semantics are not coming from LMDB. + Evidence: + core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java + this.store = new SnapshotSailStore(backingStore, ...) + if (isIsolationDisabled()) { return backingStore.getExplicitSailSource(); } +- Observation: LmdbSailSource overrides fork() and does not use SailSourceBranch; prepare() on the source is a no-op unless we wire serializable conflict checks directly into LmdbSailSource/LmdbTxnContext. + Evidence: + core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java + LmdbSailSource.fork() returns a new LmdbSailSource (bypassing SailSourceBranch) + LmdbSailSource.prepare() now performs serializable conflict checks +- Observation: LmdbSailSink.flush() commits LMDB transactions immediately, which would prematurely commit changes if SnapshotSailStore were removed. + Evidence: + core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java + tripleStore.commit(); ... valueStore.commit(); +- Observation: Triple store writes can run on a background thread, so the write transaction is not on the caller thread; LMDB write transactions are thread-affine. + Evidence: + core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java + tripleStoreExecutor.submit(() -> { tripleStore.startTransaction(); ... }); +- Observation: SPARQL MODIFY updates must not see their own writes while iterating WHERE bindings. 
+ Evidence: + testsuites/repository/src/main/java/org/eclipse/rdf4j/testsuite/repository/optimistic/DeleteInsertTest.java + DeleteInsertTest initially failed until per-update changes were buffered. +- Observation: LMDB module verify appears to stall in OptimisticIsolationTest (IsolationLevelTest logging). + Evidence: + logs/mvnf/20260102-213127-verify.log (no completion after IsolationLevelTest stack trace). + core/sail/lmdb/target/surefire-reports/2026-01-02T22-31-54_127-jvmRun1.dumpstream + +## Decision Log + +- Decision: Implement LMDB-native isolation for NONE, READ_COMMITTED, SNAPSHOT_READ, SNAPSHOT and stop using SnapshotSailStore for those levels. + Rationale: The requirement is to rely solely on LMDB isolation up to SNAPSHOT, and the current SnapshotSailStore overlay prevents that. + Date/Author: 2026-01-02 / Codex +- Decision: Implement SERIALIZABLE via optimistic conflict detection over LMDB snapshots. + Rationale: Track observed statement patterns and compare pinned snapshot vs current state at prepare/commit; this preserves serializable semantics without a SnapshotSailStore overlay. + Date/Author: 2026-01-03 / Codex +- Decision: Disable async triple-store write threading for transactions that need read-your-writes. + Rationale: LMDB write transactions are thread-affine; read-your-writes requires reading from the same write transaction. + Date/Author: 2026-01-02 / Codex +- Decision: Buffer per-update changes and merge on endUpdate to keep MODIFY WHERE evaluation stable. + Rationale: SPARQL updates must not observe their own modifications while streaming bindings. + Date/Author: 2026-01-02 / Codex +- Decision: Bind LMDB transaction contexts to the thread that opens pinned/read-write txns; reject cross-thread access. + Rationale: LMDB read/write transactions are thread-affine, so snapshot or write transactions must stay on one thread. + Date/Author: 2026-01-02 / Codex +- Decision: Block dataset creation during commit and pin ValueStore reads per dataset. 
+ Rationale: Avoid a window where triple-store commits become visible before the ValueStore commit, leading to unresolved IDs. + Date/Author: 2026-01-02 / Codex +- Decision: Split SNAPSHOT_READ vs SNAPSHOT handling in LmdbTxnContext using explicit mode flags. + Rationale: Defer writes for SNAPSHOT_READ while pinning read snapshots, and eagerly start write transactions for SNAPSHOT to keep snapshot boundaries deterministic. + Date/Author: 2026-01-03 / Codex +- Decision: Track deprecated removals per pending update before merging into transaction-wide state. + Rationale: Aborted updates must not leak deprecations into later change visibility or connection listener notifications. + Date/Author: 2026-01-03 / Codex +- Decision: For SNAPSHOT, pin read snapshots and defer writes like SNAPSHOT_READ. + Rationale: SnapshotRead tests require stable iterators; deferring writes avoids mid-iteration visibility while preserving read-your-writes via overlays. + Date/Author: 2026-01-03 / Codex + +## Outcomes & Retrospective + +Implemented LMDB-backed transaction context, forkable sources, and snapshot handling without SnapshotSailStore. Added serializable conflict detection based on observed patterns against pinned snapshots, with prepare-time checks for LMDB-backed branches. Remaining work: run full LMDB module verify and isolation suite before final acceptance. + +## Context and Orientation + +The LMDB store implementation lives under core/sail/lmdb. LmdbStore is the Sail entry point. It currently wraps LmdbSailStore in SnapshotSailStore, which uses in-memory changesets to provide snapshot semantics. SailSourceConnection in core/sail/base uses SailSource.fork() to create transaction-scoped branches when isolation is not NONE, so the LMDB store must implement a forking SailSource if it is used directly. LmdbSailSink and LmdbSailDataset are the LMDB-specific write and read adapters. TxnManager and ValueStore manage LMDB read/write transactions. 
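The fork() requirement described above can be sketched with simplified stand-in types (this is an illustrative model only; `TxnContext` and `Source` are placeholders, not the real RDF4J `LmdbTxnContext`/`LmdbSailSource` classes):

```java
// Illustrative sketch: why a forkable SailSource is needed once the
// SnapshotSailStore overlay is removed. A forked source must bind to the
// SAME transaction context so its datasets and sinks share one LMDB txn.
import java.util.HashMap;
import java.util.Map;

public class ForkSketch {
    // Stand-in for LmdbTxnContext: per-transaction shared state.
    static final class TxnContext {
        final Map<String, String> writes = new HashMap<>();
    }

    // Stand-in for LmdbSailSource.
    static final class Source {
        final TxnContext ctx;
        Source(TxnContext ctx) { this.ctx = ctx; }
        // Previously fork() threw UnsupportedOperationException; now it
        // returns a branch bound to the same context.
        Source fork() { return new Source(ctx); }
        void sinkPut(String k, String v) { ctx.writes.put(k, v); }
        String datasetGet(String k) { return ctx.writes.get(k); }
    }

    public static void main(String[] args) {
        Source base = new Source(new TxnContext());
        Source branch = base.fork();          // transaction-scoped branch
        branch.sinkPut("s p o", "explicit");  // write via the branch's sink
        // Read-your-writes: both sources share one transaction context.
        System.out.println(base.datasetGet("s p o")); // prints "explicit"
    }
}
```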
+ +Definitions used here: +- LMDB environment means the LMDB database handle opened by mdb_env_open (the value store and triple store each use separate environments). +- Snapshot isolation means repeatable reads within a transaction and no visibility of concurrent commits after the transaction begins. +- Pinned read transaction means a long-lived LMDB read transaction held open for the duration of a Sail transaction. + +## Plan of Work + +First, define an LMDB transaction context that is created when a Sail transaction begins and closed on commit or rollback. This context must own the LMDB read transaction used for repeatable reads (SNAPSHOT, SNAPSHOT_READ) and, when writes occur, the LMDB write transactions for the value store and triple store. It must also provide a consistent story for NONE and READ_COMMITTED: reads use short-lived read transactions unless a write transaction is active, in which case reads must use the write transaction so that read-your-writes works. + +Next, make LmdbSailSource forkable and context-aware. A forked LmdbSailSource should bind to the transaction context so that datasets and sinks created from it use the same read/write transactions. LmdbSailSink.flush() must not commit for transaction-scoped sinks; commit should occur once per Sail transaction at SailSource.flush() or on connection commit. This removes the need for SnapshotSailStore to buffer changes in memory. + +Then, update the LMDB transaction machinery. TxnManager must support pinned read transactions that are not reset on every commit, and ValueStore must allow a pinned read transaction for snapshot transactions instead of its current per-call renew/reset behavior. Ensure map-resize logic does not invalidate pinned snapshot transactions; if it must, detect and fail those transactions with a clear conflict error. 
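The read-routing rule for NONE and READ_COMMITTED described above (short-lived read transactions unless a write transaction is active, in which case reads must go through it) can be sketched with an in-memory stand-in; none of these names are real LMDB or RDF4J APIs:

```java
// Illustrative model of the read-routing rule: route reads through the
// active write transaction so read-your-writes holds, otherwise read the
// committed state via a short-lived snapshot.
import java.util.HashMap;
import java.util.Map;

public class ReadRoutingSketch {
    static final class Store {
        final Map<String, String> committed = new HashMap<>();
        Map<String, String> writeTxn; // null when no write txn is active

        void beginWrite() { writeTxn = new HashMap<>(committed); }
        void put(String k, String v) { writeTxn.put(k, v); }
        void commit() { committed.clear(); committed.putAll(writeTxn); writeTxn = null; }

        // The rule from the plan: prefer the write txn when one is open.
        String read(String k) {
            if (writeTxn != null) {
                return writeTxn.get(k); // read-your-writes
            }
            return committed.get(k);    // short-lived read snapshot
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.beginWrite();
        store.put("ex:s", "ex:o");
        System.out.println(store.read("ex:s")); // visible before commit
        store.commit();
        System.out.println(store.read("ex:s")); // still visible after commit
    }
}
```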
+ +Finally, update LmdbStore to remove the SnapshotSailStore wrapper for SNAPSHOT and below, adjust supported isolation levels, and run the isolation and LMDB module tests to validate behavior. + +## Concrete Steps + +All commands are from the repository root. + +1) Baseline build (required before tests): + mvn -T 1C -o -Dmaven.repo.local=.m2_repo -Pquick clean install | tail -200 + +2) If adding or adjusting tests, run the smallest targeted test first: + python3 .codex/skills/mvnf/scripts/mvnf.py LmdbStoreIsolationLevelTest + + Expected tail excerpt contains "BUILD SUCCESS". + +3) Run LMDB module verification after implementation: + python3 .codex/skills/mvnf/scripts/mvnf.py core/sail/lmdb + +4) If module tests pass but isolation semantics are still in doubt, run the Sail isolation suite explicitly: + python3 .codex/skills/mvnf/scripts/mvnf.py SailIsolationLevelTest + +If any command fails because of missing offline artifacts, rerun the same command once without -o, then return to offline runs. + +## Validation and Acceptance + +Acceptance is met when: +- LmdbStoreIsolationLevelTest passes for NONE, READ_COMMITTED, SNAPSHOT_READ, and SNAPSHOT. +- Snapshot semantics are visible: repeated reads inside a transaction are stable; concurrent commits do not appear mid-transaction; read-your-writes works. +- The LMDB module test run core/sail/lmdb is green. +- LmdbStore no longer relies on SnapshotSailStore for the supported isolation levels, and LmdbSailSource.fork() is implemented. + +If SERIALIZABLE is removed, tests should skip it by reporting it as unsupported. + +## Idempotence and Recovery + +All steps are repeatable. If a change causes isolation tests to fail, revert only the LMDB module changes, keep any new tests, and iterate. If map resize conflicts with pinned read transactions, return a clear SailConflictException and add a targeted test to document the behavior. 
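The failure mode above (a map resize invalidating a pinned read transaction) can be modeled as follows; the `PinnedReadTxn` type and its flag are hypothetical stand-ins used only to show the intended SailConflictException behavior, not the real TxnManager mechanics:

```java
// Illustrative sketch: if a pinned snapshot is invalidated (e.g. by an LMDB
// map resize), the transaction should fail fast with a clear conflict error
// instead of returning stale or undefined results.
public class ResizeConflictSketch {
    static final class SailConflictException extends RuntimeException {
        SailConflictException(String msg) { super(msg); }
    }

    static final class PinnedReadTxn {
        boolean invalidatedByResize; // hypothetical flag set on resize

        String read(String key) {
            if (invalidatedByResize) {
                throw new SailConflictException(
                        "snapshot invalidated by LMDB map resize; retry the transaction");
            }
            return "value-of-" + key;
        }
    }

    public static void main(String[] args) {
        PinnedReadTxn txn = new PinnedReadTxn();
        txn.invalidatedByResize = true; // simulate a mid-transaction resize
        try {
            txn.read("ex:s");
        } catch (SailConflictException e) {
            System.out.println(e.getMessage());
        }
    }
}
```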
+ +## Artifacts and Notes + +Collect short snippets (no more than a few lines) from: +- core/sail/lmdb/target/surefire-reports/ showing passing isolation tests. +- Any new LMDB-specific tests added for snapshot or read-your-writes behavior. + +Example (placeholder): + Tests run: 12, Failures: 0, Errors: 0, Skipped: 0 + +## Interfaces and Dependencies + +New or adjusted LMDB-facing APIs should live in core/sail/lmdb: + +- New class org.eclipse.rdf4j.sail.lmdb.LmdbTxnContext + Responsibilities: track isolation level, hold pinned read transactions, and manage write transaction lifecycle across TripleStore and ValueStore. + Required methods (names can be adjusted as long as intent is preserved): + - void begin(IsolationLevel level) + - Txn acquireTripleReadTxn() (returns pinned or per-call based on level) + - long acquireTripleWriteTxn() (write transaction handle on the caller thread) + - ValueStore.ReadTxn acquireValueReadTxn() (pinned or per-call) + - void markWriteStarted() + - void commit() + - void rollback() + - void close() + +- LmdbSailStore.LmdbSailSource + Implement fork() and accept an optional LmdbTxnContext. + dataset(IsolationLevel) must use context-provided read transactions. + sink(IsolationLevel) must create sinks that write into context-managed LMDB write transactions. + +- LmdbSailStore.LmdbSailSink + Allow a transaction-scoped mode where flush() does not commit; commit is handled by the enclosing SailSource.flush() or connection commit. + +- TxnManager + Add a pinned read transaction path (not reset on commit) or a way to exclude active snapshot transactions from reset(). + +- ValueStore + Add a pinned read transaction API (similar semantics to TxnManager) and a method to run reads against that pinned transaction. + +- LmdbStore + Remove the SnapshotSailStore wrapper for supported isolation levels and update setSupportedIsolationLevels(...) to stop claiming SERIALIZABLE if it is not implemented. 
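The LmdbTxnContext method list above can be transcribed into a compilable sketch; `Txn` and `ReadTxn` are placeholder types here, not the real LMDB classes, and names may be adjusted as long as intent is preserved:

```java
// Compilable transcription of the proposed LmdbTxnContext surface.
public class TxnContextSketch {
    enum IsolationLevel { NONE, READ_COMMITTED, SNAPSHOT_READ, SNAPSHOT, SERIALIZABLE }

    static final class Txn {}     // placeholder for the triple-store txn type
    static final class ReadTxn {} // placeholder for ValueStore.ReadTxn

    interface LmdbTxnContext {
        void begin(IsolationLevel level);
        Txn acquireTripleReadTxn();      // pinned or per-call, based on level
        long acquireTripleWriteTxn();    // write txn handle on the caller thread
        ReadTxn acquireValueReadTxn();   // pinned or per-call
        void markWriteStarted();
        void commit();
        void rollback();
        void close();
    }

    public static void main(String[] args) {
        // Compile-time sketch only; print the highest LMDB-native level targeted.
        System.out.println(IsolationLevel.SNAPSHOT);
    }
}
```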
+ +Each new or changed API should be documented inline with a short comment explaining how it supports LMDB-backed snapshot isolation. + +## Plan Change Note + +(2026-01-02) Created the initial ExecPlan document from repository analysis so implementation can proceed with a living plan. +(2026-01-03) Updated progress and decisions to reflect snapshot-read pinning and update deferral work in LmdbTxnContext. +(2026-01-03) Recorded deprecated-tracking alignment in progress and decision log. +(2026-01-03) Documented SNAPSHOT deferral decision to satisfy snapshotRead stability. +(2026-01-03) Updated serializable approach and prepare-time conflict checks after LmdbOptimisticIsolationTest coverage. diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java index 2bdff8fc2b7..3e8049de52e 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java @@ -8,6 +8,7 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ +// Some portions generated by Codex package org.eclipse.rdf4j.sail.lmdb; import java.io.File; @@ -15,8 +16,12 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; +import java.util.IdentityHashMap; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -24,25 +29,33 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import 
org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration; import org.eclipse.rdf4j.common.iteration.ConvertingIteration; import org.eclipse.rdf4j.common.iteration.FilterIteration; +import org.eclipse.rdf4j.common.iteration.IterationWrapper; import org.eclipse.rdf4j.common.iteration.UnionIteration; import org.eclipse.rdf4j.common.order.StatementOrder; import org.eclipse.rdf4j.common.transaction.IsolationLevel; +import org.eclipse.rdf4j.common.transaction.IsolationLevels; import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Model; import org.eclipse.rdf4j.model.Namespace; import org.eclipse.rdf4j.model.Resource; import org.eclipse.rdf4j.model.Statement; import org.eclipse.rdf4j.model.Value; import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.LinkedHashModel; import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics; import org.eclipse.rdf4j.sail.InterruptedSailException; +import org.eclipse.rdf4j.sail.SailConflictException; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.base.BackingSailSource; +import org.eclipse.rdf4j.sail.base.Changeset; +import org.eclipse.rdf4j.sail.base.Changeset.SimpleStatementPattern; import org.eclipse.rdf4j.sail.base.SailDataset; import org.eclipse.rdf4j.sail.base.SailSink; import org.eclipse.rdf4j.sail.base.SailSource; @@ -173,9 +186,133 @@ abstract static class StatefulOperation implements Operation { private final ReentrantLock sinkStoreAccessLock = new ReentrantLock(); /** - * Boolean indicating whether any {@link LmdbSailSink} has started a transaction on the {@link TripleStore}. + * Global write lock to serialize LMDB write transactions. */ - private final AtomicBoolean storeTxnStarted = new AtomicBoolean(false); + private final ReentrantLock writeTxnLock = new ReentrantLock(); + /** + * Guard against readers starting mid-commit across LMDB environments. 
+ */ + private final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock(); + + private final Object defaultContextKey = new Object(); + private final Object contextLock = new Object(); + private final Map contexts = new IdentityHashMap<>(); + + private static final class ContextState { + private final LmdbTxnContext context; + private int refCount; + + private ContextState(LmdbTxnContext context) { + this.context = context; + this.refCount = 1; + } + } + + final class ConnectionSailStore implements SailStore { + private final Object contextKey = new Object(); + private LmdbTxnContext transactionContext; + private boolean transactionInitialized; + private boolean updateContextInitialized; + + void initTransaction(IsolationLevel level) throws SailException { + if (!transactionInitialized) { + transactionContext = acquireContext(contextKey); + transactionInitialized = true; + updateContextInitialized = false; + } + transactionContext.init(level); + transactionContext.markManagedTransaction(); + } + + void endTransaction(boolean commit) throws SailException { + if (!transactionInitialized) { + return; + } + try { + if (transactionContext != null) { + if (commit) { + transactionContext.commit(); + } else { + transactionContext.rollback(); + } + } + } finally { + try { + releaseContext(contextKey, transactionContext); + } finally { + transactionContext = null; + transactionInitialized = false; + updateContextInitialized = false; + } + } + } + + void beginUpdate() throws SailException { + if (!transactionInitialized) { + transactionContext = acquireContext(contextKey); + transactionInitialized = true; + updateContextInitialized = true; + } + if (transactionContext != null) { + transactionContext.beginUpdate(); + } + } + + void endUpdate() throws SailException { + if (!transactionInitialized || transactionContext == null) { + return; + } + transactionContext.endUpdate(); + if (updateContextInitialized) { + releaseContext(contextKey, transactionContext); + 
transactionContext = null; + transactionInitialized = false; + updateContextInitialized = false; + } + } + + void abortUpdate() throws SailException { + if (!transactionInitialized || transactionContext == null) { + return; + } + transactionContext.abortUpdate(); + if (updateContextInitialized) { + releaseContext(contextKey, transactionContext); + transactionContext = null; + transactionInitialized = false; + updateContextInitialized = false; + } + } + + @Override + public ValueFactory getValueFactory() { + return LmdbSailStore.this.getValueFactory(); + } + + @Override + public EvaluationStatistics getEvaluationStatistics() { + return LmdbSailStore.this.getEvaluationStatistics(); + } + + @Override + public SailSource getExplicitSailSource() { + return new LmdbSailSource(true, selectContextKey()); + } + + @Override + public SailSource getInferredSailSource() { + return new LmdbSailSource(false, selectContextKey()); + } + + private Object selectContextKey() { + return transactionInitialized ? contextKey : new Object(); + } + + @Override + public void close() throws SailException { + // no-op: backing store manages lifecycle + } + } /** * Creates a new {@link LmdbSailStore}. @@ -213,26 +350,13 @@ public ValueFactory getValueFactory() { void rollback() throws SailException { sinkStoreAccessLock.lock(); try { - try { - valueStore.rollback(); - } finally { - if (multiThreadingActive) { - while (!opQueue.add(ROLLBACK_TRANSACTION)) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } else { - Thread.yield(); - } - } - } else { - tripleStore.rollback(); - } - } + valueStore.rollback(); + tripleStore.rollback(); + clearUnusedIds(); } catch (Exception e) { logger.warn("Failed to rollback LMDB transaction", e); throw e instanceof SailException ? 
(SailException) e : new SailException(e); } finally { - tripleStoreException = null; sinkStoreAccessLock.unlock(); } } @@ -291,14 +415,178 @@ public EvaluationStatistics getEvaluationStatistics() { return new LmdbEvaluationStatistics(valueStore, tripleStore); } + ReentrantLock getWriteTxnLock() { + return writeTxnLock; + } + + void acquireCommitReadLock() { + commitLock.readLock().lock(); + } + + void releaseCommitReadLock() { + commitLock.readLock().unlock(); + } + + void acquireCommitWriteLock() { + commitLock.writeLock().lock(); + } + + void releaseCommitWriteLock() { + commitLock.writeLock().unlock(); + } + + ConnectionSailStore createConnectionStore() { + return new ConnectionSailStore(); + } + + private LmdbTxnContext acquireContext(Object key) { + synchronized (contextLock) { + ContextState state = contexts.get(key); + if (state == null) { + state = new ContextState(new LmdbTxnContext(this)); + contexts.put(key, state); + } else { + state.refCount++; + } + return state.context; + } + } + + private void releaseContext(Object key, LmdbTxnContext context) throws SailException { + LmdbTxnContext toClose = null; + synchronized (contextLock) { + ContextState state = contexts.get(key); + if (state == null || state.context != context) { + return; + } + state.refCount--; + if (state.refCount <= 0) { + contexts.remove(key); + toClose = state.context; + } + } + if (toClose != null) { + toClose.close(); + } + } + + private LmdbTxnContext peekContext(Object key) { + synchronized (contextLock) { + ContextState state = contexts.get(key); + return state == null ? 
null : state.context; + } + } + + NamespaceStore getNamespaceStore() { + return namespaceStore; + } + + TripleStore getTripleStore() { + return tripleStore; + } + + ValueStore getValueStore() { + return valueStore; + } + + void applyBufferedChanges(LmdbTxnContext context) throws IOException { + applyRemoved(context.getRemoved(true), true); + applyRemoved(context.getRemoved(false), false); + applyAdded(context.getAdded(true), true); + applyAdded(context.getAdded(false), false); + } + + private void applyAdded(Set added, boolean explicit) throws IOException { + if (added.isEmpty()) { + return; + } + for (LmdbTxnContext.Quad quad : added) { + long subjId = valueStore.getId(quad.subj, true); + long predId = valueStore.getId(quad.pred, true); + long objId = valueStore.getId(quad.obj, true); + long ctxId = quad.ctx == null ? 0L : valueStore.getId(quad.ctx, true); + if (!explicit) { + mayHaveInferred = true; + } + if (!unusedIds.isEmpty()) { + unusedIds.remove(subjId); + unusedIds.remove(predId); + unusedIds.remove(objId); + unusedIds.remove(ctxId); + } + tripleStore.storeTriple(subjId, predId, objId, ctxId, explicit); + } + } + + private void applyRemoved(Set removed, boolean explicit) throws IOException { + if (removed.isEmpty()) { + return; + } + for (LmdbTxnContext.Quad quad : removed) { + long subjId = valueStore.getId(quad.subj, false); + if (subjId == LmdbValue.UNKNOWN_ID) { + continue; + } + long predId = valueStore.getId(quad.pred, false); + if (predId == LmdbValue.UNKNOWN_ID) { + continue; + } + long objId = valueStore.getId(quad.obj, false); + if (objId == LmdbValue.UNKNOWN_ID) { + continue; + } + long ctxId = 0L; + if (quad.ctx != null) { + ctxId = valueStore.getId(quad.ctx, false); + if (ctxId == LmdbValue.UNKNOWN_ID) { + continue; + } + } + tripleStore.removeTriplesByContext(subjId, predId, objId, ctxId, explicit, quadIds -> { + for (long id : quadIds) { + if (id != 0L) { + unusedIds.add(id); + } + } + }); + } + } + + void clearUnusedIds() { + 
unusedIds.clear(); + nextUnusedIds.clear(); + } + + void filterUsedIdsInTripleStore() throws IOException { + if (!unusedIds.isEmpty()) { + tripleStore.filterUsedIds(unusedIds); + } + } + + void handleRemovedIdsInValueStore() throws IOException { + if (!unusedIds.isEmpty()) { + do { + valueStore.gcIds(unusedIds, nextUnusedIds); + unusedIds.clear(); + if (!nextUnusedIds.isEmpty()) { + // swap sets + PersistentSet ids = unusedIds; + unusedIds = nextUnusedIds; + nextUnusedIds = ids; + filterUsedIdsInTripleStore(); + } + } while (!unusedIds.isEmpty()); + } + } + @Override public SailSource getExplicitSailSource() { - return new LmdbSailSource(true); + return new LmdbSailSource(true, defaultContextKey); } @Override public SailSource getInferredSailSource() { - return new LmdbSailSource(false); + return new LmdbSailSource(false, defaultContextKey); } CloseableIteration getContexts() throws IOException { @@ -412,122 +700,250 @@ CloseableIteration createStatementIterator( } } + void ensureObservedPatternsUnchanged(Txn snapshotTxn, Txn currentTxn, + Set observed, boolean explicit) throws IOException { + if (observed == null || observed.isEmpty()) { + return; + } + for (SimpleStatementPattern pattern : observed) { + if (!recordsEqual(snapshotTxn, currentTxn, pattern, explicit)) { + throw new SailConflictException("Observed State has Changed"); + } + } + } + + private boolean recordsEqual(Txn snapshotTxn, Txn currentTxn, SimpleStatementPattern pattern, boolean explicit) + throws IOException { + Resource subj = pattern.getSubject(); + IRI pred = pattern.getPredicate(); + Value obj = pattern.getObject(); + Resource context = pattern.getContext(); + boolean allContexts = pattern.isAllContexts(); + + long snapshotSubjId = LmdbValue.UNKNOWN_ID; + long currentSubjId = LmdbValue.UNKNOWN_ID; + boolean snapshotEmpty = false; + boolean currentEmpty = false; + if (subj != null) { + snapshotSubjId = valueStore.getId(subj); + currentSubjId = valueStore.getIdUnpinned(subj); + if 
(snapshotSubjId == LmdbValue.UNKNOWN_ID) { + snapshotEmpty = true; + } + if (currentSubjId == LmdbValue.UNKNOWN_ID) { + currentEmpty = true; + } + } + + long snapshotPredId = LmdbValue.UNKNOWN_ID; + long currentPredId = LmdbValue.UNKNOWN_ID; + if (pred != null) { + snapshotPredId = valueStore.getId(pred); + currentPredId = valueStore.getIdUnpinned(pred); + if (snapshotPredId == LmdbValue.UNKNOWN_ID) { + snapshotEmpty = true; + } + if (currentPredId == LmdbValue.UNKNOWN_ID) { + currentEmpty = true; + } + } + + long snapshotObjId = LmdbValue.UNKNOWN_ID; + long currentObjId = LmdbValue.UNKNOWN_ID; + if (obj != null) { + snapshotObjId = valueStore.getId(obj); + currentObjId = valueStore.getIdUnpinned(obj); + if (snapshotObjId == LmdbValue.UNKNOWN_ID) { + snapshotEmpty = true; + } + if (currentObjId == LmdbValue.UNKNOWN_ID) { + currentEmpty = true; + } + } + + long snapshotContextId = LmdbValue.UNKNOWN_ID; + long currentContextId = LmdbValue.UNKNOWN_ID; + if (allContexts) { + snapshotContextId = LmdbValue.UNKNOWN_ID; + currentContextId = LmdbValue.UNKNOWN_ID; + } else if (context == null) { + snapshotContextId = 0L; + currentContextId = 0L; + } else if (context.isTriple()) { + snapshotEmpty = true; + currentEmpty = true; + } else { + snapshotContextId = valueStore.getId(context); + currentContextId = valueStore.getIdUnpinned(context); + if (snapshotContextId == LmdbValue.UNKNOWN_ID) { + snapshotEmpty = true; + } + if (currentContextId == LmdbValue.UNKNOWN_ID) { + currentEmpty = true; + } + } + + if (snapshotEmpty && currentEmpty) { + return true; + } + + RecordIterator snapshotIter = null; + RecordIterator currentIter = null; + try { + if (!snapshotEmpty) { + snapshotIter = tripleStore.getTriples(snapshotTxn, snapshotSubjId, snapshotPredId, snapshotObjId, + snapshotContextId, explicit); + } + if (!currentEmpty) { + currentIter = tripleStore.getTriples(currentTxn, currentSubjId, currentPredId, currentObjId, + currentContextId, explicit); + } + return 
recordsEqual(snapshotIter, currentIter); + } finally { + if (snapshotIter != null) { + snapshotIter.close(); + } + if (currentIter != null) { + currentIter.close(); + } + } + } + + private boolean recordsEqual(RecordIterator snapshotIter, RecordIterator currentIter) { + if (snapshotIter == null) { + return currentIter == null || currentIter.next() == null; + } + if (currentIter == null) { + return snapshotIter.next() == null; + } + while (true) { + long[] snapshotRecord = snapshotIter.next(); + long[] currentRecord = currentIter.next(); + if (snapshotRecord == null || currentRecord == null) { + return snapshotRecord == null && currentRecord == null; + } + if (!Arrays.equals(snapshotRecord, currentRecord)) { + return false; + } + } + } + private final class LmdbSailSource extends BackingSailSource { private final boolean explicit; + private final Object contextKey; - public LmdbSailSource(boolean explicit) { + public LmdbSailSource(boolean explicit, Object contextKey) { this.explicit = explicit; + this.contextKey = contextKey; } @Override public SailSource fork() { - throw new UnsupportedOperationException("This store does not support multiple datasets"); + return new LmdbSailSource(explicit, contextKey); + } + + @Override + public void prepare() throws SailException { + if (!explicit) { + return; + } + LmdbTxnContext context = peekContext(contextKey); + if (context != null) { + context.checkSerializableConflictsForPrepare(); + } } @Override public SailSink sink(IsolationLevel level) throws SailException { - return new LmdbSailSink(explicit); + LmdbTxnContext context = acquireContext(contextKey); + return new LmdbSailSink(explicit, contextKey, context, level); } @Override public LmdbSailDataset dataset(IsolationLevel level) throws SailException { - return new LmdbSailDataset(explicit); + LmdbTxnContext context = acquireContext(contextKey); + return new LmdbSailDataset(explicit, contextKey, context, level); + } + + @Override + public void flush() throws SailException 
{ + LmdbTxnContext context = peekContext(contextKey); + if (context != null) { + context.commit(); + } } + @Override + public void close() throws SailException { + LmdbTxnContext context = peekContext(contextKey); + if (context != null && !context.isManagedTransaction()) { + context.close(); + } + } } - private final class LmdbSailSink implements SailSink { + private final class LmdbSailSink extends Changeset { private final boolean explicit; + private final Object contextKey; + private final LmdbTxnContext txnContext; + private final IsolationLevel level; - public LmdbSailSink(boolean explicit) throws SailException { + public LmdbSailSink(boolean explicit, Object contextKey, LmdbTxnContext txnContext, IsolationLevel level) + throws SailException { this.explicit = explicit; + this.contextKey = contextKey; + this.txnContext = txnContext; + this.level = level; + this.txnContext.init(level); } @Override - public void close() { - // do nothing + public void close() throws SailException { + try { + if (!txnContext.isManagedTransaction()) { + txnContext.rollback(); + } + } finally { + releaseContext(contextKey, txnContext); + } } @Override public void prepare() throws SailException { - // serializable is not supported at this level - } - - protected void filterUsedIdsInTripleStore() throws IOException { - if (!unusedIds.isEmpty()) { - tripleStore.filterUsedIds(unusedIds); + super.prepare(); + if (level == null || !level.isCompatibleWith(IsolationLevels.SERIALIZABLE)) { + return; + } + Set observed = getObserved(); + if (observed != null && !observed.isEmpty()) { + txnContext.recordObserved(explicit, observed); + } + Set checkObserved = txnContext.getObserved(explicit); + if (checkObserved == null || checkObserved.isEmpty()) { + return; + } + LmdbSailStore.this.acquireCommitReadLock(); + try (Txn snapshotTxn = txnContext.acquirePinnedReadTxn(); + Txn currentTxn = tripleStore.getTxnManager().createReadTxn()) { + ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, 
checkObserved, explicit); + } catch (IOException e) { + throw new SailException(e); + } finally { + LmdbSailStore.this.releaseCommitReadLock(); } } - protected void handleRemovedIdsInValueStore() throws IOException { - if (!unusedIds.isEmpty()) { - do { - valueStore.gcIds(unusedIds, nextUnusedIds); - unusedIds.clear(); - if (!nextUnusedIds.isEmpty()) { - // swap sets - PersistentSet ids = unusedIds; - unusedIds = nextUnusedIds; - nextUnusedIds = ids; - filterUsedIdsInTripleStore(); - } - } while (!unusedIds.isEmpty()); - } + @Override + public Model createEmptyModel() { + return new LinkedHashModel(); } @Override public void flush() throws SailException { - sinkStoreAccessLock.lock(); - boolean activeTxn = storeTxnStarted.get(); - try { - if (multiThreadingActive) { - while (!opQueue.add(COMMIT_TRANSACTION)) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } else { - Thread.yield(); - } - } - } - - try { - namespaceStore.sync(); - } finally { - if (multiThreadingActive) { - while (!asyncTransactionFinished) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } else { - Thread.yield(); - } - } - } - if (activeTxn) { - if (!multiThreadingActive) { - tripleStore.commit(); - filterUsedIdsInTripleStore(); - } - handleRemovedIdsInValueStore(); - valueStore.commit(); - // do not set flag to false until _after_ commit is successfully completed. 
- storeTxnStarted.set(false); - } - } - } catch (IOException e) { - rollback(); - running.set(false); - logger.error("Encountered an unexpected problem while trying to commit", e); - throw new SailException(e); - } catch (RuntimeException e) { - rollback(); - running.set(false); - logger.error("Encountered an unexpected problem while trying to commit", e); - throw e; - } finally { - multiThreadingActive = false; - sinkStoreAccessLock.unlock(); + if (!txnContext.isManagedTransaction()) { + txnContext.commit(); } } @@ -535,8 +951,7 @@ public void flush() throws SailException { public void setNamespace(String prefix, String name) throws SailException { sinkStoreAccessLock.lock(); try { - startTransaction(true); - namespaceStore.setNamespace(prefix, name); + txnContext.setNamespace(prefix, name); } finally { sinkStoreAccessLock.unlock(); } @@ -546,8 +961,7 @@ public void setNamespace(String prefix, String name) throws SailException { public void removeNamespace(String prefix) throws SailException { sinkStoreAccessLock.lock(); try { - startTransaction(true); - namespaceStore.removeNamespace(prefix); + txnContext.removeNamespace(prefix); } finally { sinkStoreAccessLock.unlock(); } @@ -557,8 +971,7 @@ public void removeNamespace(String prefix) throws SailException { public void clearNamespaces() throws SailException { sinkStoreAccessLock.lock(); try { - startTransaction(true); - namespaceStore.clear(); + txnContext.clearNamespaces(); } finally { sinkStoreAccessLock.unlock(); } @@ -566,7 +979,46 @@ public void clearNamespaces() throws SailException { @Override public void observe(Resource subj, IRI pred, Value obj, Resource... 
contexts) throws SailException {
-			// serializable is not supported at this level
+			super.observe(subj, pred, obj, contexts);
+		}
+
+		@Override
+		public boolean hasDeprecated(Resource subj, IRI pred, Value obj, Resource[] contexts) {
+			Set<LmdbTxnContext.Quad> removed = txnContext.snapshotDeprecated(explicit);
+			if (removed.isEmpty()) {
+				return false;
+			}
+			boolean matchAllContexts = contexts == null || contexts.length == 0;
+			Set<Resource> contextSet = matchAllContexts ? Set.of() : normalizeContexts(contexts);
+			LmdbTxnContext.Quad probe = new LmdbTxnContext.Quad(subj, pred, obj, null);
+			for (LmdbTxnContext.Quad deprecated : removed) {
+				if (probe.subj != null && !Objects.equals(probe.subj, deprecated.subj)) {
+					continue;
+				}
+				if (probe.pred != null && !Objects.equals(probe.pred, deprecated.pred)) {
+					continue;
+				}
+				if (probe.obj != null && !Objects.equals(probe.obj, deprecated.obj)) {
+					continue;
+				}
+				if (matchAllContexts || contextSet.contains(deprecated.ctx)) {
+					return true;
+				}
+			}
+			return false;
+		}
+
+		private Set<Resource> normalizeContexts(Resource... contexts) {
+			Set<Resource> resolved = new LinkedHashSet<>();
+			for (Resource context : contexts) {
+				if (context == null) {
+					resolved.add(null);
+				} else if (!context.isTriple()) {
+					LmdbTxnContext.Quad quad = new LmdbTxnContext.Quad(null, null, null, context);
+					resolved.add(quad.ctx);
+				}
+			}
+			return resolved;
 		}

 		@Override
@@ -585,7 +1037,10 @@ public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts)
 			sinkStoreAccessLock.lock();
 			try {
-				startTransaction(true);
+				boolean deferWrites = txnContext.deferWrites();
+				if (!deferWrites) {
+					ensureWriteTxn();
+				}
 				for (Statement statement : approved) {
 					last = statement;
@@ -594,35 +1049,29 @@ public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts)
 					Value obj = statement.getObject();
 					Resource context = statement.getContext();
-					AddQuadOperation q = new AddQuadOperation();
-					q.s = valueStore.storeValue(subj);
-					q.p = valueStore.storeValue(pred);
-					q.o = valueStore.storeValue(obj);
-					q.c = context == null ?
0 : valueStore.storeValue(context); - q.explicit = explicit; - - if (multiThreadingActive) { - while (!opQueue.add(q)) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } - Thread.onSpinWait(); + if (deferWrites) { + if (!explicit) { + mayHaveInferred = true; } - + txnContext.recordAdd(subj, pred, obj, context, explicit); } else { + AddQuadOperation q = new AddQuadOperation(); + q.s = valueStore.storeValue(subj); + q.p = valueStore.storeValue(pred); + q.o = valueStore.storeValue(obj); + q.c = context == null ? 0 : valueStore.storeValue(context); + q.explicit = explicit; + q.execute(); + txnContext.recordAdd(subj, pred, obj, context, explicit); } } } catch (IOException | RuntimeException e) { - rollback(); - if (multiThreadingActive) { - logger.error("Encountered an unexpected problem while trying to add a statement.", e); - } else { - logger.error( - "Encountered an unexpected problem while trying to add a statement. Last statement that was attempted to be added: [ {} ]", - last, e); - } + txnContext.rollback(); + logger.error( + "Encountered an unexpected problem while trying to add a statement. Last statement that was attempted to be added: [ {} ]", + last, e); if (e instanceof RuntimeException) { throw (RuntimeException) e; @@ -635,107 +1084,33 @@ public void approveAll(Set approved, Set approvedContexts) @Override public void deprecate(Statement statement) throws SailException { + if (txnContext.deferWrites()) { + txnContext.recordRemove(statement.getSubject(), statement.getPredicate(), statement.getObject(), + statement.getContext(), explicit); + return; + } removeStatements(statement.getSubject(), statement.getPredicate(), statement.getObject(), explicit, statement.getContext()); } - /** - * Starts a transaction on the triplestore, if necessary. - * - * @throws SailException if a transaction could not be started. 
- */ - private void startTransaction(boolean preferThreading) throws SailException { - synchronized (storeTxnStarted) { - if (storeTxnStarted.compareAndSet(false, true)) { - multiThreadingActive = preferThreading && enableMultiThreading; - nextTransactionAsync = multiThreadingActive; - asyncTransactionFinished = false; - try { - if (multiThreadingActive) { - if (running.compareAndSet(false, true)) { - tripleStoreException = null; - tripleStoreExecutor.submit(() -> { - try { - while (running.get()) { - tripleStore.startTransaction(); - while (true) { - Operation op = opQueue.remove(); - if (op != null) { - if (op == COMMIT_TRANSACTION) { - tripleStore.commit(); - filterUsedIdsInTripleStore(); - - nextTransactionAsync = false; - asyncTransactionFinished = true; - break; - } else if (op == ROLLBACK_TRANSACTION) { - tripleStore.rollback(); - nextTransactionAsync = false; - asyncTransactionFinished = true; - break; - } else { - op.execute(); - } - } else { - if (!running.get()) { - logger.warn( - "LmdbSailStore was closed while active transaction was waiting for the next operation. 
Forcing a rollback!"); - rollback(); - } else if (Thread.interrupted()) { - throw new InterruptedException(); - } else { - Thread.yield(); - } - } - } - - // keep thread running for at least 2ms to lock-free wait for the next - // transaction - long start = 0; - while (running.get() && !nextTransactionAsync) { - if (start == 0) { - // System.currentTimeMillis() is expensive, so only call it when we - // are sure we need to wait - start = System.currentTimeMillis(); - } - - if (System.currentTimeMillis() - start > 2) { - synchronized (storeTxnStarted) { - if (!nextTransactionAsync) { - running.set(false); - return; - } - } - } else { - Thread.yield(); - } - } - } - } catch (Throwable e) { - tripleStoreException = e; - synchronized (storeTxnStarted) { - running.set(false); - } - } - }); - } - } else { - tripleStore.startTransaction(); - } - valueStore.startTransaction(true); - } catch (Exception e) { - storeTxnStarted.set(false); - throw new SailException(e); - } - } - } + private void ensureWriteTxn() throws SailException { + txnContext.ensureWriteTxn(level); } private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit, Resource context) throws SailException { sinkStoreAccessLock.lock(); try { - startTransaction(true); + boolean deferWrites = txnContext.deferWrites(); + if (deferWrites) { + if (!explicit) { + mayHaveInferred = true; + } + txnContext.recordAdd(subj, pred, obj, context, explicit); + return; + } + + ensureWriteTxn(); AddQuadOperation q = new AddQuadOperation(); q.s = valueStore.storeValue(subj); @@ -744,22 +1119,13 @@ private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit, q.c = context == null ? 
0 : valueStore.storeValue(context); q.explicit = explicit; - if (multiThreadingActive) { - while (!opQueue.add(q)) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } else { - Thread.onSpinWait(); - } - } - } else { - q.execute(); - } + q.execute(); + txnContext.recordAdd(subj, pred, obj, context, explicit); } catch (IOException e) { - rollback(); + txnContext.rollback(); throw new SailException(e); } catch (RuntimeException e) { - rollback(); + txnContext.rollback(); logger.error("Encountered an unexpected problem while trying to add a statement", e); throw e; } finally { @@ -773,6 +1139,15 @@ private long removeStatements(long subj, long pred, long obj, boolean explicit, for (long contextId : contexts) { tripleStore.removeTriplesByContext(subj, pred, obj, contextId, explicit, quad -> { removeCount[0]++; + try { + Resource subjValue = (Resource) valueStore.getLazyValue(quad[0]); + IRI predValue = (IRI) valueStore.getLazyValue(quad[1]); + Value objValue = valueStore.getLazyValue(quad[2]); + Resource ctxValue = quad[3] == 0 ? 
null : (Resource) valueStore.getLazyValue(quad[3]); + txnContext.recordRemove(subjValue, predValue, objValue, ctxValue, explicit); + } catch (IOException e) { + throw new RuntimeException(e); + } for (long id : quad) { if (id != 0L) { unusedIds.add(id); @@ -790,7 +1165,10 @@ private long removeStatements(Resource subj, IRI pred, Value obj, boolean explic sinkStoreAccessLock.lock(); try { - startTransaction(false); + if (txnContext.deferWrites()) { + return bufferRemoveStatements(subj, pred, obj, explicit, contexts); + } + ensureWriteTxn(); final long subjID; if (subj != null) { subjID = valueStore.getId(subj); @@ -836,43 +1214,12 @@ private long removeStatements(Resource subj, IRI pred, Value obj, boolean explic } } - if (multiThreadingActive) { - long[] removeCount = new long[1]; - StatefulOperation removeOp = new StatefulOperation() { - @Override - public void execute() throws Exception { - try { - removeCount[0] = removeStatements(subjID, predID, objID, explicit, contextIds); - } finally { - finished = true; - } - } - }; - - while (!opQueue.add(removeOp)) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } else { - Thread.yield(); - } - } - - while (!removeOp.finished) { - if (tripleStoreException != null) { - throw wrapTripleStoreException(); - } else { - Thread.yield(); - } - } - return removeCount[0]; - } else { - return removeStatements(subjID, predID, objID, explicit, contextIds); - } + return removeStatements(subjID, predID, objID, explicit, contextIds); } catch (IOException e) { - rollback(); + txnContext.rollback(); throw new SailException(e); } catch (RuntimeException e) { - rollback(); + txnContext.rollback(); logger.error("Encountered an unexpected problem while trying to remove statements", e); throw e; } finally { @@ -880,6 +1227,25 @@ public void execute() throws Exception { } } + private long bufferRemoveStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... 
contexts)
+			throws SailException {
+		Set<LmdbTxnContext.Quad> removed = new LinkedHashSet<>();
+		SailSource source = new LmdbSailSource(explicit, contextKey);
+		try (SailDataset dataset = source.dataset(level);
+				CloseableIteration<? extends Statement> iter = dataset.getStatements(subj, pred, obj, contexts)) {
+			while (iter.hasNext()) {
+				Statement statement = iter.next();
+				LmdbTxnContext.Quad quad = new LmdbTxnContext.Quad(statement.getSubject(),
+						statement.getPredicate(), statement.getObject(), statement.getContext());
+				if (removed.add(quad)) {
+					txnContext.recordRemove(statement.getSubject(), statement.getPredicate(), statement.getObject(),
+							statement.getContext(), explicit);
+				}
+			}
+		}
+		return removed.size();
+	}
+
 	@Override
 	public boolean deprecateByQuery(Resource subj, IRI pred, Value obj, Resource[] contexts) {
 		return removeStatements(subj, pred, obj, explicit, contexts) > 0;
@@ -894,37 +1260,87 @@ public boolean supportsDeprecateByQuery() {

 	private final class LmdbSailDataset implements SailDataset {

 		private final boolean explicit;
-		private final Txn txn;
+		private final Object contextKey;
+		private final LmdbTxnContext txnContext;
+		private final IsolationLevel level;
+		private final Txn sharedTxn;

-		public LmdbSailDataset(boolean explicit) throws SailException {
+		public LmdbSailDataset(boolean explicit, Object contextKey, LmdbTxnContext txnContext, IsolationLevel level)
+				throws SailException {
 			this.explicit = explicit;
+			this.contextKey = contextKey;
+			this.txnContext = txnContext;
+			this.level = level;
+			Txn localTxn = null;
+			LmdbSailStore.this.acquireCommitReadLock();
 			try {
-				this.txn = tripleStore.getTxnManager().createReadTxn();
+				if (txnContext != null) {
+					localTxn = txnContext.acquireReadTxn(level);
+				} else {
+					localTxn = tripleStore.getTxnManager().createReadTxn();
+				}
 			} catch (IOException e) {
+				if (localTxn != null) {
+					localTxn.close();
+				}
 				throw new SailException(e);
+			} finally {
+				LmdbSailStore.this.releaseCommitReadLock();
 			}
+			this.sharedTxn = localTxn;
 		}

 		@Override
public void close() { // close the associated txn - txn.close(); + try { + if (sharedTxn != null) { + sharedTxn.close(); + } + } finally { + if (txnContext != null) { + txnContext.releaseReadSnapshot(); + } + releaseContext(contextKey, txnContext); + } } @Override public String getNamespace(String prefix) throws SailException { + if (txnContext != null) { + return txnContext.getNamespace(prefix); + } return namespaceStore.getNamespace(prefix); } @Override public CloseableIteration getNamespaces() { + if (txnContext != null) { + return new CloseableIteratorIteration(txnContext.getNamespaces().iterator()); + } return new CloseableIteratorIteration(namespaceStore.iterator()); } @Override public CloseableIteration getContextIDs() throws SailException { + recordObservedPattern(null, null, null); + if (txnContext != null && txnContext.shouldOverlayChanges()) { + LmdbTxnContext.ChangeView changeView = txnContext.snapshotChanges(explicit); + if (!changeView.added.isEmpty() || !changeView.removed.isEmpty()) { + Set contexts = new LinkedHashSet<>(); + try (CloseableIteration iter = getStatements(null, null, null)) { + while (iter.hasNext()) { + Resource ctx = iter.next().getContext(); + if (ctx != null) { + contexts.add(ctx); + } + } + } + return new CloseableIteratorIteration<>(contexts.iterator()); + } + } try { - return new LmdbContextIterator(tripleStore.getContexts(txn), valueStore); + return new LmdbContextIterator(tripleStore.getContexts(sharedTxn), valueStore); } catch (IOException e) { throw new SailException("Unable to get contexts", e); } @@ -933,20 +1349,153 @@ public CloseableIteration getContextIDs() throws SailExcepti @Override public CloseableIteration getStatements(Resource subj, IRI pred, Value obj, Resource... 
contexts) throws SailException { + recordObservedPattern(subj, pred, obj, contexts); try { - return createStatementIterator(txn, subj, pred, obj, explicit, contexts); + CloseableIteration baseIter = createStatementIterator(sharedTxn, subj, pred, obj, + explicit, contexts); + if (txnContext == null || !txnContext.shouldOverlayChanges()) { + return baseIter; + } + LmdbTxnContext.ChangeView changeView = txnContext.snapshotChanges(explicit); + Set added = changeView.added; + Set removed = new LinkedHashSet<>(changeView.removed); + if (added.isEmpty() && removed.isEmpty()) { + return baseIter; + } + boolean matchAllContexts = contexts.length == 0; + Set contextSet = matchAllContexts ? Set.of() : resolveContexts(contexts); + if (!matchAllContexts && contextSet.isEmpty()) { + baseIter.close(); + return CloseableIteration.EMPTY_STATEMENT_ITERATION; + } + CloseableIteration filtered = new FilterIteration(baseIter) { + @Override + protected boolean accept(Statement st) throws SailException { + LmdbTxnContext.Quad quad = toQuad(st); + return !removed.contains(quad) && !added.contains(quad); + } + + @Override + protected void handleClose() { + // handled by base iteration + } + }; + List addedStatements = collectAddedStatements(added, removed, subj, pred, obj, contextSet, + matchAllContexts); + if (addedStatements.isEmpty()) { + return filtered; + } + CloseableIteration addedIter = new CloseableIteratorIteration<>(addedStatements.iterator()); + return new UnionIteration<>(List.of(filtered, addedIter)); } catch (IOException e) { try { logger.warn("Failed to get statements, retrying", e); // try once more before giving up Thread.yield(); - return createStatementIterator(txn, subj, pred, obj, explicit, contexts); + Txn retryTxn = txnContext != null ? 
txnContext.acquireReadTxn(level) + : tripleStore.getTxnManager().createReadTxn(); + CloseableIteration baseIter = createStatementIterator(retryTxn, subj, pred, + obj, + explicit, contexts); + return new TxnClosingIteration<>(baseIter, retryTxn); } catch (IOException e2) { throw new SailException("Unable to get statements", e); } } } + private Set resolveContexts(Resource... contexts) { + Set resolved = new LinkedHashSet<>(); + for (Resource context : contexts) { + if (context == null) { + resolved.add(null); + } else if (!context.isTriple()) { + resolved.add(context); + } + } + return resolved; + } + + private void recordObservedPattern(Resource subj, IRI pred, Value obj, Resource... contexts) { + if (txnContext == null || level == null || !level.isCompatibleWith(IsolationLevels.SERIALIZABLE)) { + return; + } + Set patterns = new LinkedHashSet<>(); + if (contexts == null) { + patterns.add(new SimpleStatementPattern(subj, pred, obj, null, false)); + } else if (contexts.length == 0) { + patterns.add(new SimpleStatementPattern(subj, pred, obj, null, true)); + } else { + for (Resource context : contexts) { + patterns.add(new SimpleStatementPattern(subj, pred, obj, context, false)); + } + } + txnContext.recordObserved(explicit, patterns); + } + + private final class TxnClosingIteration extends IterationWrapper { + private final Txn txn; + + private TxnClosingIteration(CloseableIteration iter, Txn txn) { + super(iter); + this.txn = txn; + } + + @Override + protected void handleClose() { + try { + super.handleClose(); + } finally { + txn.close(); + } + } + } + + private LmdbTxnContext.Quad toQuad(Statement statement) throws SailException { + return new LmdbTxnContext.Quad(statement.getSubject(), statement.getPredicate(), statement.getObject(), + statement.getContext()); + } + + private List collectAddedStatements(Set added, Set removed, + Resource subj, IRI pred, Value obj, Set contextSet, boolean matchAllContexts) + throws SailException { + if (added.isEmpty()) { + 
return List.of();
+			}
+			List<Statement> statements = new ArrayList<>();
+			for (LmdbTxnContext.Quad quad : added) {
+				if (removed.contains(quad)) {
+					continue;
+				}
+				if (!matches(quad, subj, pred, obj, contextSet, matchAllContexts)) {
+					continue;
+				}
+				statements.add(toStatement(quad));
+			}
+			return statements;
+		}
+
+		private boolean matches(LmdbTxnContext.Quad quad, Resource subj, IRI pred, Value obj,
+				Set<Resource> contextSet, boolean matchAllContexts) {
+			if (subj != null && !Objects.equals(subj, quad.subj)) {
+				return false;
+			}
+			if (pred != null && !Objects.equals(pred, quad.pred)) {
+				return false;
+			}
+			if (obj != null && !Objects.equals(obj, quad.obj)) {
+				return false;
+			}
+			if (matchAllContexts) {
+				return true;
+			}
+			return contextSet.contains(quad.ctx);
+		}
+
+		private Statement toStatement(LmdbTxnContext.Quad quad) throws SailException {
+			return valueStore.createStatement(quad.subj, quad.pred, quad.obj, quad.ctx);
+		}
+
 		@Override
@@ -933,20 +1349,153 @@ public CloseableIteration<? extends Resource> getContextIDs() throws SailException
 		@Override
 		public CloseableIteration<? extends Statement> getStatements(StatementOrder statementOrder, Resource subj, IRI pred, Value obj, Resource...
contexts) throws SailException { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java index 2dc7fd6e517..97d968807c5 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStore.java @@ -8,6 +8,7 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ +// Some portions generated by Codex package org.eclipse.rdf4j.sail.lmdb; import java.io.File; @@ -16,7 +17,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Comparator; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Stream; @@ -25,9 +25,7 @@ import org.eclipse.rdf4j.collection.factory.mapdb.MapDb3CollectionFactory; import org.eclipse.rdf4j.common.annotation.Experimental; import org.eclipse.rdf4j.common.concurrent.locks.Lock; -import org.eclipse.rdf4j.common.concurrent.locks.LockManager; import org.eclipse.rdf4j.common.io.MavenUtil; -import org.eclipse.rdf4j.common.transaction.IsolationLevel; import org.eclipse.rdf4j.common.transaction.IsolationLevels; import org.eclipse.rdf4j.model.ValueFactory; import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategy; @@ -36,12 +34,9 @@ import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolverClient; import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategyFactory; import org.eclipse.rdf4j.repository.sparql.federation.SPARQLServiceResolver; -import org.eclipse.rdf4j.sail.InterruptedSailException; import org.eclipse.rdf4j.sail.NotifyingSailConnection; import org.eclipse.rdf4j.sail.SailException; -import org.eclipse.rdf4j.sail.base.SailSource; import org.eclipse.rdf4j.sail.base.SailStore; -import org.eclipse.rdf4j.sail.base.SnapshotSailStore; import 
org.eclipse.rdf4j.sail.helpers.AbstractNotifyingSail; import org.eclipse.rdf4j.sail.helpers.DirectoryLockManager; import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig; @@ -97,21 +92,6 @@ public class LmdbStore extends AbstractNotifyingSail implements FederatedService */ private SPARQLServiceResolver dependentServiceResolver; - /** - * Lock manager used to prevent concurrent {@link #getTransactionLock(IsolationLevel)} calls. - */ - private final ReentrantLock txnLockManager = new ReentrantLock(); - - /** - * Holds locks for all isolated transactions. - */ - private final LockManager isolatedLockManager = new LockManager(debugEnabled()); - - /** - * Holds locks for all {@link IsolationLevels#NONE} isolation transactions. - */ - private final LockManager disabledIsolationLockManager = new LockManager(debugEnabled()); - /*--------------* * Constructors * *--------------*/ @@ -255,36 +235,7 @@ protected void initializeInternal() throws SailException { FileUtils.writeStringToFile(versionFile, VERSION, StandardCharsets.UTF_8); } backingStore = new LmdbSailStore(dataDir, config); - this.store = new SnapshotSailStore(backingStore, () -> new MemoryOverflowModel(false) { - @Override - protected LmdbSailStore createSailStore(File dataDir) throws IOException, SailException { - // Model can't fit into memory, use another LmdbSailStore to store delta - LmdbSailStore lmdbSailStore = new LmdbSailStore(dataDir, config); - lmdbSailStore.enableMultiThreading = false; - return lmdbSailStore; - } - }) { - - @Override - public SailSource getExplicitSailSource() { - if (isIsolationDisabled()) { - // no isolation, use LmdbSailStore directly - return backingStore.getExplicitSailSource(); - } else { - return super.getExplicitSailSource(); - } - } - - @Override - public SailSource getInferredSailSource() { - if (isIsolationDisabled()) { - // no isolation, use LmdbSailStore directly - return backingStore.getInferredSailSource(); - } else { - return super.getInferredSailSource(); - } - } - 
}; + this.store = backingStore; } catch (Throwable e) { // LmdbStore initialization failed, release any allocated files dirLock.release(); @@ -353,48 +304,6 @@ public ValueFactory getValueFactory() { return store.getValueFactory(); } - /** - * This call will block when {@link IsolationLevels#NONE} is provided when there are active transactions with a - * higher isolation and block when a higher isolation is provided when there are active transactions with - * {@link IsolationLevels#NONE} isolation. Store is either exclusively in {@link IsolationLevels#NONE} isolation - * with potentially zero or more transactions, or exclusively in higher isolation mode with potentially zero or more - * transactions. - * - * @param level indicating desired mode {@link IsolationLevels#NONE} or higher - * @return Lock used to prevent Store from switching isolation modes - * @throws SailException - */ - protected Lock getTransactionLock(IsolationLevel level) throws SailException { - txnLockManager.lock(); - try { - if (IsolationLevels.NONE.isCompatibleWith(level)) { - // make sure no isolated transaction are active - isolatedLockManager.waitForActiveLocks(); - // mark isolation as disabled - return disabledIsolationLockManager.createLock(level.toString()); - } else { - // make sure isolation is not disabled - disabledIsolationLockManager.waitForActiveLocks(); - // mark isolated transaction as active - return isolatedLockManager.createLock(level.toString()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InterruptedSailException(e); - } finally { - txnLockManager.unlock(); - } - } - - /** - * Checks if any {@link IsolationLevels#NONE} isolation transactions are active. 
- * - * @return true if at least one transaction has direct access to the indexes - */ - boolean isIsolationDisabled() { - return disabledIsolationLockManager.isActiveLock(); - } - SailStore getSailStore() { return store; } diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java index 9c0577e655a..47f26ac35e2 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbStoreConnection.java @@ -8,11 +8,12 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ +// Some portions generated by Codex package org.eclipse.rdf4j.sail.lmdb; -import org.eclipse.rdf4j.common.concurrent.locks.Lock; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.IterationWrapper; +import org.eclipse.rdf4j.common.transaction.IsolationLevel; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; import org.eclipse.rdf4j.model.Statement; @@ -23,6 +24,7 @@ import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.SailReadOnlyException; +import org.eclipse.rdf4j.sail.UpdateContext; import org.eclipse.rdf4j.sail.base.SailSourceConnection; import org.eclipse.rdf4j.sail.helpers.DefaultSailChangedEvent; import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue; @@ -32,11 +34,14 @@ */ public class LmdbStoreConnection extends SailSourceConnection { + private static final ThreadLocal STORE_HOLDER = new ThreadLocal<>(); + /*-----------* * Constants * *-----------*/ protected final LmdbStore lmdbStore; + private final LmdbSailStore.ConnectionSailStore connectionStore; /*-----------* * Variables * @@ -44,21 +49,24 @@ public class LmdbStoreConnection extends SailSourceConnection { 
private volatile DefaultSailChangedEvent sailChangedEvent; - /** - * The transaction lock held by this connection during transactions. - */ - private volatile Lock txnLock; - /*--------------* * Constructors * *--------------*/ protected LmdbStoreConnection(LmdbStore sail) { - super(sail, sail.getSailStore(), sail.getEvaluationStrategyFactory()); + super(sail, initStore(sail), sail.getEvaluationStrategyFactory()); this.lmdbStore = sail; + this.connectionStore = STORE_HOLDER.get(); + STORE_HOLDER.remove(); sailChangedEvent = new DefaultSailChangedEvent(sail); } + private static LmdbSailStore.ConnectionSailStore initStore(LmdbStore sail) { + LmdbSailStore.ConnectionSailStore store = sail.getBackingStore().createConnectionStore(); + STORE_HOLDER.set(store); + return store; + } + /*---------* * Methods * *---------*/ @@ -68,37 +76,68 @@ protected void startTransactionInternal() throws SailException { if (!lmdbStore.isWritable()) { throw new SailReadOnlyException("Unable to start transaction: data file is locked or read-only"); } - boolean releaseLock = true; + IsolationLevel level = getTransactionIsolation(); + connectionStore.initTransaction(level); + boolean started = false; try { - if (txnLock == null || !txnLock.isActive()) { - txnLock = lmdbStore.getTransactionLock(getTransactionIsolation()); - if (lmdbStore.isIsolationDisabled()) { - // if the transaction isn't isolated then we need to keep holding our exclusive lock until commit - releaseLock = false; - } - } super.startTransactionInternal(); + started = true; } finally { - if (releaseLock && txnLock != null && txnLock.isActive()) { - txnLock.release(); + if (!started) { + connectionStore.endTransaction(false); } } } @Override - protected void commitInternal() throws SailException { + public void startUpdate(UpdateContext op) throws SailException { + if (op != null) { + connectionStore.beginUpdate(); + boolean success = false; + try { + super.startUpdate(op); + success = true; + } finally { + if (!success) { + 
connectionStore.abortUpdate(); + } + } + } else { + super.startUpdate(op); + } + } + + @Override + protected void endUpdateInternal(UpdateContext op) throws SailException { + boolean success = false; try { - super.commitInternal(); + super.endUpdateInternal(op); + success = true; } finally { - if (txnLock != null && txnLock.isActive()) { - txnLock.release(); + if (op != null) { + if (success) { + connectionStore.endUpdate(); + } else { + connectionStore.abortUpdate(); + } } } + } + + @Override + protected void commitInternal() throws SailException { + boolean committed = false; + try { + super.commitInternal(); - lmdbStore.notifySailChanged(sailChangedEvent); + lmdbStore.notifySailChanged(sailChangedEvent); - // create a fresh event object. - sailChangedEvent = new DefaultSailChangedEvent(lmdbStore); + // create a fresh event object. + sailChangedEvent = new DefaultSailChangedEvent(lmdbStore); + committed = true; + } finally { + connectionStore.endTransaction(committed); + } } @Override @@ -106,12 +145,13 @@ protected void rollbackInternal() throws SailException { try { super.rollbackInternal(); } finally { - if (txnLock != null && txnLock.isActive()) { - txnLock.release(); + try { + connectionStore.endTransaction(false); + } finally { + // create a fresh event object. + sailChangedEvent = new DefaultSailChangedEvent(lmdbStore); } } - // create a fresh event object. - sailChangedEvent = new DefaultSailChangedEvent(lmdbStore); } @Override diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbTxnContext.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbTxnContext.java new file mode 100644 index 00000000000..c90de1c3b86 --- /dev/null +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbTxnContext.java @@ -0,0 +1,1095 @@ +/******************************************************************************* + * Copyright (c) 2026 Eclipse RDF4J contributors. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +// Some portions generated by Codex +package org.eclipse.rdf4j.sail.lmdb; + +import java.io.IOException; +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import org.eclipse.rdf4j.common.transaction.IsolationLevel; +import org.eclipse.rdf4j.common.transaction.IsolationLevels; +import org.eclipse.rdf4j.model.BNode; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Literal; +import org.eclipse.rdf4j.model.Namespace; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Triple; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleNamespace; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.sail.InterruptedSailException; +import org.eclipse.rdf4j.sail.SailException; +import org.eclipse.rdf4j.sail.base.Changeset.SimpleStatementPattern; +import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn; +import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue; + +class LmdbTxnContext { + + private final LmdbSailStore store; + private final ReentrantLock writeTxnLock; + private final NamespaceTxn namespaceTxn 
= new NamespaceTxn();
+	private final Set<Quad> addedExplicit = new LinkedHashSet<>();
+	private final Set<Quad> removedExplicit = new LinkedHashSet<>();
+	private final Set<Quad> addedInferred = new LinkedHashSet<>();
+	private final Set<Quad> removedInferred = new LinkedHashSet<>();
+	private final Set<Quad> deprecatedExplicit = new LinkedHashSet<>();
+	private final Set<Quad> deprecatedInferred = new LinkedHashSet<>();
+	private final Deque<PendingChanges> pendingUpdates = new ArrayDeque<>();
+
+	private IsolationLevel level;
+	private boolean initialized;
+	private boolean writeActive;
+	private boolean finished;
+	private boolean writeLockHeld;
+	private boolean managedTransaction;
+	private boolean deferWrites;
+	private boolean overlayChanges;
+	private boolean snapshotReadMode;
+	private boolean snapshotWriteMode;
+	private boolean pinnedSnapshotTransactionScoped;
+	private int pinnedSnapshotRefCount;
+	private long ownerThreadId = -1;
+
+	private Txn pinnedTripleReadTxn;
+	private ValueStore.PinnedReadTxn pinnedValueReadTxn;
+	private Set<SimpleStatementPattern> observedExplicit;
+	private Set<SimpleStatementPattern> observedInferred;
+
+	LmdbTxnContext(LmdbSailStore store) {
+		this.store = store;
+		this.writeTxnLock = store.getWriteTxnLock();
+	}
+
+	void init(IsolationLevel level) throws SailException {
+		if (finished) {
+			resetStateForNewTransaction();
+		}
+		if (initialized) {
+			return;
+		}
+		this.level = level;
+		initialized = true;
+		snapshotWriteMode = IsolationLevels.SNAPSHOT.equals(level);
+		boolean serializable = IsolationLevels.SERIALIZABLE.equals(level);
+		snapshotReadMode = snapshotWriteMode || IsolationLevels.SNAPSHOT_READ.equals(level) || serializable;
+		if (snapshotWriteMode || serializable) {
+			ensurePinnedReadSnapshot();
+			pinnedSnapshotTransactionScoped = true;
+		}
+		if (snapshotReadMode) {
+			deferWrites = true;
+			overlayChanges = true;
+		}
+	}
+
+	boolean deferWrites() {
+		return deferWrites;
+	}
+
+	boolean shouldOverlayChanges() {
+		return overlayChanges;
+	}
+
+	boolean hasStatementChanges() {
+		return !(addedExplicit.isEmpty() &&
removedExplicit.isEmpty() && addedInferred.isEmpty() + && removedInferred.isEmpty()); + } + + void markManagedTransaction() { + managedTransaction = true; + deferWrites = true; + overlayChanges = true; + } + + boolean isManagedTransaction() { + return managedTransaction; + } + + boolean isSerializable() { + return level != null && level.isCompatibleWith(IsolationLevels.SERIALIZABLE); + } + + boolean usesPinnedReadTxn() { + return pinnedTripleReadTxn != null; + } + + void recordObserved(boolean explicit, Set observed) { + if (observed == null || observed.isEmpty()) { + return; + } + Set target = explicit ? observedExplicit : observedInferred; + if (target == null) { + target = new LinkedHashSet<>(observed); + if (explicit) { + observedExplicit = target; + } else { + observedInferred = target; + } + } else { + target.addAll(observed); + } + } + + Set getObserved(boolean explicit) { + return explicit ? observedExplicit : observedInferred; + } + + Txn acquireReadTxn(IsolationLevel level) throws SailException { + init(level); + verifyThread(); + try { + if (snapshotReadMode && !pinnedSnapshotTransactionScoped) { + ensurePinnedReadSnapshot(); + pinnedSnapshotRefCount++; + } + if (pinnedTripleReadTxn != null) { + return store.getTripleStore().getTxnManager().createTxn(pinnedTripleReadTxn.get()); + } + return store.getTripleStore().createReadTxnForCurrentThread(); + } catch (IOException e) { + throw new SailException(e); + } + } + + Txn acquirePinnedReadTxn() throws SailException { + init(level); + verifyThread(); + try { + if (pinnedTripleReadTxn != null) { + return store.getTripleStore().getTxnManager().createTxn(pinnedTripleReadTxn.get()); + } + return store.getTripleStore().createReadTxnForCurrentThread(); + } catch (IOException e) { + throw new SailException(e); + } + } + + void ensureWriteTxn(IsolationLevel level) throws SailException { + init(level); + ensureWriteTxnInitialized(); + } + + private void ensurePinnedReadSnapshot() throws SailException { + if 
(pinnedTripleReadTxn != null) { + verifyThread(); + return; + } + bindToCurrentThread(); + store.acquireCommitReadLock(); + try { + if (pinnedTripleReadTxn == null) { + pinnedTripleReadTxn = store.getTripleStore().createPinnedReadTxn(); + } + if (pinnedValueReadTxn == null) { + pinnedValueReadTxn = store.getValueStore().pinReadTxn(); + } + } catch (IOException e) { + if (pinnedTripleReadTxn != null) { + pinnedTripleReadTxn.close(); + pinnedTripleReadTxn = null; + } + if (pinnedValueReadTxn != null) { + store.getValueStore().clearPinnedReadTxn(pinnedValueReadTxn); + pinnedValueReadTxn = null; + } + throw new SailException(e); + } finally { + store.releaseCommitReadLock(); + } + } + + private void ensureWriteTxnInitialized() throws SailException { + if (writeActive) { + verifyThread(); + return; + } + bindToCurrentThread(); + try { + boolean b = writeTxnLock.tryLock(10, TimeUnit.SECONDS); + int i = 0; + while (!b) { + i++; + System.err.println( + Thread.currentThread() + " - LMDB writeTxnLock wait took more than " + i * 10 + " seconds"); + // print lock diagnostics + printWriteTxnLockDiagnostics(i * 10, i == 1 || i % 6 == 0); + + b = writeTxnLock.tryLock(10, TimeUnit.SECONDS); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedSailException(); + } + } + + writeLockHeld = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedSailException(e); + } + try { + store.getTripleStore().startTransaction(); + store.getValueStore().startTransaction(true); + writeActive = true; + } catch (IOException e) { + releaseWriteLock(); + throw new SailException(e); + } + } + + void commit() throws SailException { + if (finished) { + return; + } + verifyThread(); + boolean commitLockHeld = false; + try { + boolean hasBufferedWrites = deferWrites && hasStatementChanges(); + if (writeActive || hasBufferedWrites) { + if (!writeActive) { + ensureWriteTxn(level); + } + store.acquireCommitWriteLock(); + commitLockHeld = true; 
+ checkSerializableConflicts(); + if (hasBufferedWrites) { + store.applyBufferedChanges(this); + } + store.getTripleStore().commit(); + store.filterUsedIdsInTripleStore(); + store.handleRemovedIdsInValueStore(); + store.getValueStore().commit(); + } + namespaceTxn.apply(store.getNamespaceStore()); + store.getNamespaceStore().sync(); + clearStatementChanges(); + } catch (org.eclipse.rdf4j.sail.SailConflictException e) { + rollbackSilently(); + throw e; + } catch (IOException e) { + rollbackSilently(); + throw new SailException(e); + } finally { + if (commitLockHeld) { + store.releaseCommitWriteLock(); + } + cleanup(); + finished = true; + } + } + + void checkSerializableConflictsForPrepare() throws SailException { + if (!isSerializable()) { + return; + } + if ((observedExplicit == null || observedExplicit.isEmpty()) + && (observedInferred == null || observedInferred.isEmpty())) { + return; + } + store.acquireCommitReadLock(); + try { + checkSerializableConflicts(); + } finally { + store.releaseCommitReadLock(); + } + } + + private void checkSerializableConflicts() throws SailException { + if ((observedExplicit == null || observedExplicit.isEmpty()) + && (observedInferred == null || observedInferred.isEmpty())) { + return; + } + try (Txn snapshotTxn = acquirePinnedReadTxn(); + Txn currentTxn = store.getTripleStore().getTxnManager().createReadTxn()) { + store.ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, observedExplicit, true); + store.ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, observedInferred, false); + } catch (IOException e) { + throw new SailException(e); + } + } + + void releaseReadSnapshot() { + if (!snapshotReadMode || pinnedSnapshotTransactionScoped) { + return; + } + if (pinnedSnapshotRefCount > 0) { + pinnedSnapshotRefCount--; + } + if (pinnedSnapshotRefCount == 0) { + closePinnedReadSnapshot(); + } + } + + void rollback() throws SailException { + if (finished) { + return; + } + verifyThread(); + try { + if (writeActive) { + 
store.getTripleStore().rollback(); + store.getValueStore().rollback(); + } + store.clearUnusedIds(); + namespaceTxn.clear(); + clearStatementChanges(); + } catch (IOException e) { + throw new SailException(e); + } finally { + cleanup(); + finished = true; + } + } + + void close() throws SailException { + if (!finished) { + rollback(); + } + } + + void setNamespace(String prefix, String name) { + namespaceTxn.set(prefix, name); + } + + void removeNamespace(String prefix) { + namespaceTxn.remove(prefix); + } + + void clearNamespaces() { + namespaceTxn.clearAll(); + } + + void beginUpdate() throws SailException { + if (finished) { + resetStateForNewTransaction(); + } + deferWrites = true; + overlayChanges = true; + if (!managedTransaction && !snapshotReadMode && level != null) { + ensureWriteTxn(level); + } + pendingUpdates.push(new PendingChanges()); + } + + void endUpdate() throws SailException { + if (pendingUpdates.isEmpty()) { + return; + } + PendingChanges completed = pendingUpdates.pop(); + PendingChanges parent = pendingUpdates.peek(); + if (parent != null) { + parent.mergeFrom(completed); + return; + } + if (deferWrites) { + mergeIntoMain(completed); + if (managedTransaction) { + return; + } + try { + if (!writeActive && level != null) { + ensureWriteTxn(level); + } + store.applyBufferedChanges(this); + } catch (IOException e) { + throw new SailException(e); + } finally { + clearStatementChanges(); + deferWrites = false; + overlayChanges = false; + } + } else { + mergeIntoMain(completed); + } + } + + void abortUpdate() { + if (!pendingUpdates.isEmpty()) { + pendingUpdates.pop(); + } + if (pendingUpdates.isEmpty() && deferWrites) { + if (managedTransaction) { + return; + } + clearStatementChanges(); + deferWrites = false; + overlayChanges = false; + } + } + + void recordAdd(Resource subj, IRI pred, Value obj, Resource ctx, boolean explicit) { + if (!deferWrites) { + return; + } + Quad quad = new Quad(subj, pred, obj, ctx); + PendingChanges pending = 
pendingUpdates.peek();
+		if (pending != null) {
+			pending.recordAdd(quad, explicit);
+			return;
+		}
+		// deferWrites is always true past the early return above, so buffer the add.
+		if (explicit) {
+			applyAddIfMissing(addedExplicit, removedExplicit, quad, true);
+		} else {
+			applyAddIfMissing(addedInferred, removedInferred, quad, false);
+		}
+	}
+
+	void recordRemove(Resource subj, IRI pred, Value obj, Resource ctx, boolean explicit) {
+		if (!deferWrites) {
+			return;
+		}
+		Quad quad = new Quad(subj, pred, obj, ctx);
+		PendingChanges pending = pendingUpdates.peek();
+		if (pending != null) {
+			pending.recordRemove(quad, explicit);
+			return;
+		}
+		if (explicit) {
+			deprecatedExplicit.add(quad);
+			applyRemove(addedExplicit, removedExplicit, quad);
+		} else {
+			deprecatedInferred.add(quad);
+			applyRemove(addedInferred, removedInferred, quad);
+		}
+	}
+
+	Set<Quad> getAdded(boolean explicit) {
+		return explicit ? addedExplicit : addedInferred;
+	}
+
+	Set<Quad> getRemoved(boolean explicit) {
+		return explicit ? removedExplicit : removedInferred;
+	}
+
+	ChangeView snapshotChanges(boolean explicit) {
+		Set<Quad> added = new LinkedHashSet<>(explicit ? addedExplicit : addedInferred);
+		Set<Quad> removed = new LinkedHashSet<>(explicit ? removedExplicit : removedInferred);
+		return new ChangeView(added, removed);
+	}
+
+	Set<Quad> snapshotDeprecated(boolean explicit) {
+		Set<Quad> deprecated = new LinkedHashSet<>(explicit ?
deprecatedExplicit : deprecatedInferred); + applyPendingDeprecatedTo(deprecated, explicit); + return deprecated; + } + + String getNamespace(String prefix) { + return namespaceTxn.get(prefix, store.getNamespaceStore()); + } + + List getNamespaces() { + return namespaceTxn.snapshot(store.getNamespaceStore()); + } + + private void bindToCurrentThread() throws SailException { + long currentThreadId = Thread.currentThread().getId(); + if (ownerThreadId == -1) { + ownerThreadId = currentThreadId; + } else if (ownerThreadId != currentThreadId) { + throw new SailException("LMDB transactions are thread-bound; use a single thread per transaction"); + } + } + + private void verifyThread() throws SailException { + if (ownerThreadId != -1 && ownerThreadId != Thread.currentThread().getId()) { + if (!managedTransaction && !writeActive) { + return; + } + throw new SailException("LMDB transactions are thread-bound; use a single thread per transaction"); + } + } + + private void cleanup() { + clearStatementChanges(); + observedExplicit = null; + observedInferred = null; + managedTransaction = false; + ownerThreadId = -1; + writeActive = false; + deferWrites = false; + overlayChanges = false; + snapshotReadMode = false; + snapshotWriteMode = false; + pinnedSnapshotTransactionScoped = false; + pinnedSnapshotRefCount = 0; + closePinnedReadSnapshot(); + releaseWriteLock(); + } + + private void resetStateForNewTransaction() { + finished = false; + initialized = false; + writeActive = false; + writeLockHeld = false; + managedTransaction = false; + deferWrites = false; + overlayChanges = false; + snapshotReadMode = false; + snapshotWriteMode = false; + pinnedSnapshotTransactionScoped = false; + pinnedSnapshotRefCount = 0; + observedExplicit = null; + observedInferred = null; + ownerThreadId = -1; + clearStatementChanges(); + closePinnedReadSnapshot(); + } + + private void releaseWriteLock() { + if (writeLockHeld) { + writeLockHeld = false; + writeTxnLock.unlock(); + } + } + + private void 
rollbackSilently() { + try { + if (writeActive) { + store.getTripleStore().rollback(); + store.getValueStore().rollback(); + } + } catch (IOException e) { + // ignore + } finally { + store.clearUnusedIds(); + } + } + + private void closePinnedReadSnapshot() { + if (pinnedTripleReadTxn != null) { + pinnedTripleReadTxn.close(); + pinnedTripleReadTxn = null; + } + if (pinnedValueReadTxn != null) { + store.getValueStore().clearPinnedReadTxn(pinnedValueReadTxn); + pinnedValueReadTxn = null; + } + } + + private void clearStatementChanges() { + addedExplicit.clear(); + removedExplicit.clear(); + addedInferred.clear(); + removedInferred.clear(); + deprecatedExplicit.clear(); + deprecatedInferred.clear(); + pendingUpdates.clear(); + } + + private void mergeIntoMain(PendingChanges changes) { + applyPendingRemovals(changes, true); + applyPendingRemovals(changes, false); + applyPendingAdds(changes, true); + applyPendingAdds(changes, false); + applyPendingDeprecated(changes, true); + applyPendingDeprecated(changes, false); + } + + private static void applyAdd(Set added, Set removed, Quad quad) { + removed.remove(quad); + added.add(quad); + } + + private void applyAddIfMissing(Set added, Set removed, Quad quad, boolean explicit) { + if (added.contains(quad)) { + return; + } + if (removed.remove(quad)) { + added.add(quad); + return; + } + if (existsInStore(quad, explicit)) { + return; + } + added.add(quad); + } + + private static void applyRemove(Set added, Set removed, Quad quad) { + if (added.remove(quad)) { + return; + } + removed.add(quad); + } + + private void applyPendingRemovals(PendingChanges changes, boolean explicit) { + Set removedSource = explicit ? changes.removedExplicit : changes.removedInferred; + Set added = explicit ? addedExplicit : addedInferred; + Set removed = explicit ? 
removedExplicit : removedInferred; + for (Quad quad : removedSource) { + applyRemove(added, removed, quad); + } + } + + private void applyPendingAdds(PendingChanges changes, boolean explicit) { + Set addedSource = explicit ? changes.addedExplicit : changes.addedInferred; + Set added = explicit ? addedExplicit : addedInferred; + Set removed = explicit ? removedExplicit : removedInferred; + for (Quad quad : addedSource) { + applyAddIfMissing(added, removed, quad, explicit); + } + } + + private void applyPendingDeprecated(PendingChanges changes, boolean explicit) { + Set deprecated = explicit ? deprecatedExplicit : deprecatedInferred; + changes.applyDeprecatedTo(deprecated, explicit); + } + + private void applyPendingTo(Set added, Set removed, boolean explicit) { + if (pendingUpdates.isEmpty()) { + return; + } + for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext();) { + PendingChanges pending = iterator.next(); + pending.applyTo(added, removed, explicit); + } + } + + private void applyPendingDeprecatedTo(Set deprecated, boolean explicit) { + if (pendingUpdates.isEmpty()) { + return; + } + for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext();) { + PendingChanges pending = iterator.next(); + pending.applyDeprecatedTo(deprecated, explicit); + } + } + + private boolean existsInStore(Quad quad, boolean explicit) { + store.acquireCommitReadLock(); + Txn txn = null; + boolean releaseSnapshot = false; + try { + txn = acquireReadTxn(level); + releaseSnapshot = snapshotReadMode && !pinnedSnapshotTransactionScoped; + ValueStore valueStore = store.getValueStore(); + long subjId = valueStore.getId(quad.subj, false); + if (subjId == LmdbValue.UNKNOWN_ID) { + return false; + } + long predId = valueStore.getId(quad.pred, false); + if (predId == LmdbValue.UNKNOWN_ID) { + return false; + } + long objId = valueStore.getId(quad.obj, false); + if (objId == LmdbValue.UNKNOWN_ID) { + return false; + } + long ctxId = 0L; + if (quad.ctx != null) { 
+ ctxId = valueStore.getId(quad.ctx, false); + if (ctxId == LmdbValue.UNKNOWN_ID) { + return false; + } + } + RecordIterator records = store.getTripleStore().getTriples(txn, subjId, predId, objId, ctxId, explicit); + if (records == null) { + return false; + } + try { + return records.next() != null; + } finally { + records.close(); + } + } catch (IOException e) { + throw new SailException(e); + } finally { + if (txn != null) { + txn.close(); + } + if (releaseSnapshot) { + releaseReadSnapshot(); + } + store.releaseCommitReadLock(); + } + } + + static final class Quad { + private static final ValueFactory VALUE_FACTORY = SimpleValueFactory.getInstance(); + + final Resource subj; + final IRI pred; + final Value obj; + final Resource ctx; + + Quad(Resource subj, IRI pred, Value obj, Resource ctx) { + this.subj = (Resource) normalizeValue(subj); + this.pred = (IRI) normalizeValue(pred); + this.obj = normalizeValue(obj); + this.ctx = (Resource) normalizeValue(ctx); + } + + private static Value normalizeValue(Value value) { + if (value == null) { + return null; + } + if (value instanceof LmdbValue) { + ((LmdbValue) value).init(); + } + if (value instanceof IRI) { + return VALUE_FACTORY.createIRI(value.stringValue()); + } + if (value instanceof BNode) { + return VALUE_FACTORY.createBNode(((BNode) value).getID()); + } + if (value instanceof Literal) { + Literal literal = (Literal) value; + return literal.getLanguage() + .map(language -> VALUE_FACTORY.createLiteral(literal.getLabel(), language)) + .orElseGet(() -> VALUE_FACTORY.createLiteral(literal.getLabel(), literal.getDatatype())); + } + if (value instanceof Triple) { + Triple triple = (Triple) value; + Resource subj = (Resource) normalizeValue(triple.getSubject()); + IRI pred = (IRI) normalizeValue(triple.getPredicate()); + Value obj = normalizeValue(triple.getObject()); + return VALUE_FACTORY.createTriple(subj, pred, obj); + } + return value; + } + + @Override + public boolean equals(Object other) { + if (this == 
other) {
+				return true;
+			}
+			if (!(other instanceof Quad)) {
+				return false;
+			}
+			Quad quad = (Quad) other;
+			return Objects.equals(subj, quad.subj) && Objects.equals(pred, quad.pred)
+					&& Objects.equals(obj, quad.obj) && Objects.equals(ctx, quad.ctx);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(subj, pred, obj, ctx);
+		}
+	}
+
+	static final class ChangeView {
+		final Set<Quad> added;
+		final Set<Quad> removed;
+
+		ChangeView(Set<Quad> added, Set<Quad> removed) {
+			this.added = added;
+			this.removed = removed;
+		}
+	}
+
+	private static final class PendingChanges {
+		private final Set<Quad> addedExplicit = new LinkedHashSet<>();
+		private final Set<Quad> removedExplicit = new LinkedHashSet<>();
+		private final Set<Quad> addedInferred = new LinkedHashSet<>();
+		private final Set<Quad> removedInferred = new LinkedHashSet<>();
+		private final Set<Quad> deprecatedExplicit = new LinkedHashSet<>();
+		private final Set<Quad> deprecatedInferred = new LinkedHashSet<>();
+
+		void recordAdd(Quad quad, boolean explicit) {
+			if (explicit) {
+				applyAdd(addedExplicit, removedExplicit, quad);
+			} else {
+				applyAdd(addedInferred, removedInferred, quad);
+			}
+		}
+
+		void recordRemove(Quad quad, boolean explicit) {
+			if (explicit) {
+				applyRemove(addedExplicit, removedExplicit, quad);
+				deprecatedExplicit.add(quad);
+			} else {
+				applyRemove(addedInferred, removedInferred, quad);
+				deprecatedInferred.add(quad);
+			}
+		}
+
+		void mergeFrom(PendingChanges other) {
+			other.applyTo(addedExplicit, removedExplicit, true);
+			other.applyTo(addedInferred, removedInferred, false);
+			other.applyDeprecatedTo(deprecatedExplicit, true);
+			other.applyDeprecatedTo(deprecatedInferred, false);
+		}
+
+		void applyTo(Set<Quad> added, Set<Quad> removed, boolean explicit) {
+			Set<Quad> removedSource = explicit ? removedExplicit : removedInferred;
+			for (Quad quad : removedSource) {
+				applyRemove(added, removed, quad);
+			}
+			Set<Quad> addedSource = explicit ?
addedExplicit : addedInferred; + for (Quad quad : addedSource) { + applyAdd(added, removed, quad); + } + } + + void applyDeprecatedTo(Set deprecated, boolean explicit) { + if (explicit) { + deprecated.addAll(deprecatedExplicit); + } else { + deprecated.addAll(deprecatedInferred); + } + } + } + + private static final class NamespaceTxn { + + private final Map additions = new LinkedHashMap<>(); + private final Set removals = new HashSet<>(); + private boolean cleared; + + void set(String prefix, String name) { + additions.put(prefix, name); + removals.remove(prefix); + } + + void remove(String prefix) { + additions.remove(prefix); + removals.add(prefix); + } + + void clearAll() { + cleared = true; + additions.clear(); + removals.clear(); + } + + void clear() { + cleared = false; + additions.clear(); + removals.clear(); + } + + boolean hasChanges() { + return cleared || !additions.isEmpty() || !removals.isEmpty(); + } + + String get(String prefix, NamespaceStore base) { + if (removals.contains(prefix)) { + return null; + } + String added = additions.get(prefix); + if (added != null) { + return added; + } + if (cleared) { + return null; + } + return base.getNamespace(prefix); + } + + List snapshot(NamespaceStore base) { + Map map = new LinkedHashMap<>(); + if (!cleared) { + for (SimpleNamespace ns : base) { + map.put(ns.getPrefix(), new SimpleNamespace(ns.getPrefix(), ns.getName())); + } + } + for (String removed : removals) { + map.remove(removed); + } + for (Map.Entry entry : additions.entrySet()) { + map.put(entry.getKey(), new SimpleNamespace(entry.getKey(), entry.getValue())); + } + return new ArrayList<>(map.values()); + } + + void apply(NamespaceStore base) { + if (!hasChanges()) { + return; + } + if (cleared) { + base.clear(); + } + for (String prefix : removals) { + base.removeNamespace(prefix); + } + for (Map.Entry entry : additions.entrySet()) { + base.setNamespace(entry.getKey(), entry.getValue()); + } + clear(); + } + } + + private void 
printWriteTxnLockDiagnostics(int waitedSeconds, boolean includeThreadDump) { + try { + System.err.println("LMDB writeTxnLock diagnostics after waiting " + waitedSeconds + " seconds"); + System.err.println(" lock=" + writeTxnLock); + System.err.println(" isLocked=" + writeTxnLock.isLocked() + + ", isHeldByCurrentThread=" + writeTxnLock.isHeldByCurrentThread() + + ", holdCount=" + writeTxnLock.getHoldCount() + + ", hasQueuedThreads=" + writeTxnLock.hasQueuedThreads() + + ", queueLength~=" + writeTxnLock.getQueueLength() + + ", isFair=" + writeTxnLock.isFair()); + + ThreadMXBean bean = ManagementFactory.getThreadMXBean(); + + // Deadlock detection can be very useful when a lock wait stretches out. + try { + long[] deadlocked = bean.findDeadlockedThreads(); + if (deadlocked != null && deadlocked.length > 0) { + System.err.println(" DEADLOCK DETECTED (thread ids): " + java.util.Arrays.toString(deadlocked)); + ThreadInfo[] infos = bean.getThreadInfo(deadlocked, true, true); + if (infos != null) { + for (ThreadInfo info : infos) { + if (info != null) { + printThreadInfo(info, 80); + } + } + } + } + } catch (Throwable t) { + System.err.println(" (Deadlock detection failed: " + t + ")"); + } + + // Try to identify and dump the owner thread stack if the lock can tell us. + String ownerThreadName = extractOwnerThreadName(writeTxnLock.toString()); + if (ownerThreadName != null) { + dumpThreadByName(ownerThreadName, 120); + } + + // Full-ish dump is intentionally throttled (e.g., first time and then every ~60s). 
+ if (includeThreadDump) { + System.err.println(" --- Threads waiting on / holding ReentrantLock synchronizers (best effort) ---"); + ThreadInfo[] all = bean.dumpAllThreads(true, true); + if (all != null) { + for (ThreadInfo info : all) { + if (info == null) { + continue; + } + if (isReentrantLockRelated(info) + || (ownerThreadName != null && ownerThreadName.equals(info.getThreadName()))) { + printThreadInfo(info, 60); + } + } + } + } + } catch (Throwable t) { + // Diagnostics must never prevent progress/interrupt handling. + System.err.println("LMDB writeTxnLock diagnostics failed: " + t); + } + } + + private static boolean isReentrantLockRelated(ThreadInfo info) { + LockInfo lock = info.getLockInfo(); + if (lock != null && lock.getClassName() != null + && lock.getClassName().startsWith("java.util.concurrent.locks.ReentrantLock")) { + return true; + } + LockInfo[] synchronizers = info.getLockedSynchronizers(); + if (synchronizers != null) { + for (LockInfo li : synchronizers) { + if (li != null && li.getClassName() != null + && li.getClassName().startsWith("java.util.concurrent.locks.ReentrantLock")) { + return true; + } + } + } + return false; + } + + private static void printThreadInfo(ThreadInfo info, int maxFrames) { + System.err.println("Thread \"" + info.getThreadName() + "\" id=" + info.getThreadId() + + " state=" + info.getThreadState()); + if (info.getLockInfo() != null) { + System.err.println(" waitingOn=" + info.getLockInfo()); + } + if (info.getLockOwnerName() != null) { + System.err.println(" lockOwner=" + info.getLockOwnerName() + " (id=" + info.getLockOwnerId() + ")"); + } + StackTraceElement[] stack = info.getStackTrace(); + if (stack != null) { + int limit = Math.min(stack.length, Math.max(0, maxFrames)); + for (int frame = 0; frame < limit; frame++) { + System.err.println("\tat " + stack[frame]); + } + if (stack.length > limit) { + System.err.println("\t... 
" + (stack.length - limit) + " more"); + } + } + System.err.println(); + } + + private static String extractOwnerThreadName(String lockString) { + if (lockString == null) { + return null; + } + int idx = lockString.indexOf("Locked by thread "); + if (idx < 0) { + return null; + } + int start = idx + "Locked by thread ".length(); + int end = lockString.indexOf(']', start); + if (end < 0) { + end = lockString.length(); + } + String name = lockString.substring(start, end).trim(); + return name.isEmpty() ? null : name; + } + + private static void dumpThreadByName(String threadName, int maxFrames) { + try { + for (Map.Entry entry : Thread.getAllStackTraces().entrySet()) { + Thread t = entry.getKey(); + if (t != null && threadName.equals(t.getName())) { + System.err.println(" --- Owner thread stack (" + t + ") ---"); + StackTraceElement[] stack = entry.getValue(); + if (stack != null) { + int limit = Math.min(stack.length, Math.max(0, maxFrames)); + for (int frame = 0; frame < limit; frame++) { + System.err.println("\tat " + stack[frame]); + } + if (stack.length > limit) { + System.err.println("\t... 
" + (stack.length - limit) + " more"); + } + } + System.err.println(); + return; + } + } + System.err.println(" (Could not find owner thread named: " + threadName + ")"); + + System.err.println("----------------------------------------------------------"); + System.err.println(" Available threads: "); + + for (Map.Entry entry : Thread.getAllStackTraces().entrySet()) { + Thread t = entry.getKey(); + if (t != null) { + System.err.println(" - " + t.getName() + " (id=" + t.getId() + ")"); + } + } + System.err.println("----------------------------------------------------------"); + System.err.println(); + + } catch (Throwable t) { + System.err.println(" (Owner thread dump failed: " + t + ")"); + } + } +} diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java index 15edbb09b4a..88847b91c41 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TripleStore.java @@ -8,6 +8,7 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ +// Some portions generated by Codex package org.eclipse.rdf4j.sail.lmdb; import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E; @@ -168,6 +169,7 @@ class TripleStore implements Closeable { private final boolean autoGrow; private long mapSize; private long writeTxn; + private long writeTxnOwnerThreadId = -1; private final TxnManager txnManager; private TxnRecordCache recordCache = null; @@ -312,6 +314,21 @@ TxnManager getTxnManager() { return txnManager; } + Txn createReadTxnForCurrentThread() throws IOException { + if (isWriteTxnActiveOnCurrentThread()) { + return txnManager.createTxn(writeTxn); + } + return txnManager.createReadTxn(); + } + + Txn createPinnedReadTxn() throws IOException { + return txnManager.createPinnedReadTxn(); + } + + boolean 
isWriteTxnActiveOnCurrentThread() { + return writeTxn != 0 && writeTxnOwnerThreadId == Thread.currentThread().getId(); + } + /** * Parses a comma/whitespace-separated list of index specifications. Index specifications are required to consists * of 4 characters: 's', 'p', 'o' and 'c'. @@ -1085,6 +1102,7 @@ public void startTransaction() throws IOException { E(mdb_txn_begin(env, NULL, 0, pp)); writeTxn = pp.get(0); + writeTxnOwnerThreadId = Thread.currentThread().getId(); } } @@ -1142,6 +1160,7 @@ void endTransaction(boolean commit) throws IOException { } } finally { writeTxn = 0; + writeTxnOwnerThreadId = -1; // ensure that record cache is always reset if (recordCache != null) { try { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java index 7b5ec043da3..c3dd51794d6 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java @@ -8,6 +8,7 @@ * * SPDX-License-Identifier: BSD-3-Clause *******************************************************************************/ +// Some portions generated by Codex package org.eclipse.rdf4j.sail.lmdb; import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E; @@ -86,6 +87,10 @@ Txn createReadTxn() throws IOException { return txnRef; } + Txn createPinnedReadTxn() throws IOException { + return new Txn(createReadTxnInternal()); + } + long createReadTxnInternal() throws IOException { long txn = 0; if (mode == Mode.RESET) { diff --git a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java index e6174ae9230..75bd07b2dee 100644 --- a/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java +++ b/core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/ValueStore.java @@ -8,6 +8,7 @@ * * SPDX-License-Identifier: BSD-3-Clause 
 *******************************************************************************/
+// Some portions generated by Codex
 package org.eclipse.rdf4j.sail.lmdb;
 
 import static org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E;
@@ -172,6 +173,7 @@ class ValueStore extends AbstractValueFactory {
 	// database with internal reference counts for IRIs and namespaces
 	private int refCountsDbi;
 	private long writeTxn;
+	private long writeTxnOwnerThreadId = -1;
 	private final boolean forceSync;
 	private final boolean autoGrow;
 	private boolean invalidateRevisionOnCommit = false;
@@ -212,6 +214,7 @@ class ValueStore extends AbstractValueFactory {
 		readTxn.cleaner(cleaner);
 		return readTxn;
 	});
+	private final ThreadLocal<PinnedReadTxn> pinnedReadTxn = new ThreadLocal<>();
 
 	@SuppressWarnings("unused")
 	private Object[] previousNamespaceEntry;
@@ -845,12 +848,38 @@ private long findId(byte[] data, boolean create) throws IOException {
 		return id != null ? id : LmdbValue.UNKNOWN_ID;
 	}
 
+	PinnedReadTxn pinReadTxn() throws IOException {
+		PinnedReadTxn pinned = pinnedReadTxn.get();
+		if (pinned == null) {
+			pinned = new PinnedReadTxn(startReadTxn());
+			pinnedReadTxn.set(pinned);
+		}
+		return pinned;
+	}
+
+	void clearPinnedReadTxn(PinnedReadTxn pinned) {
+		if (pinned == null) {
+			return;
+		}
+		PinnedReadTxn current = pinnedReadTxn.get();
+		if (current == null || current != pinned) {
+			pinned.close();
+			return;
+		}
+		pinnedReadTxn.remove();
+		pinned.close();
+	}
+
 	<T> T readTransaction(long env, Transaction<T> transaction) throws IOException {
 		txnLock.readLock().lock();
 		try {
-			if (writeTxn != 0) {
+			if (isWriteTxnOwnedByCurrentThread()) {
 				return LmdbUtil.readTransaction(env, writeTxn, transaction);
 			}
+			PinnedReadTxn pinned = pinnedReadTxn.get();
+			if (pinned != null) {
+				return LmdbUtil.readTransaction(env, pinned.get(), transaction);
+			}
 			return threadLocalReadTxn.get().execute(transaction, env);
 		} finally {
 			txnLock.readLock().unlock();
@@ -858,12 +887,26 @@ <T> T readTransaction(long env, Transaction<T> transaction) throws IOException {
 	}
 
 	<T> T writeTransaction(Transaction<T> transaction) throws IOException {
-		if (writeTxn != 0) {
+		if (isWriteTxnOwnedByCurrentThread()) {
 			try (MemoryStack stack = MemoryStack.stackPush()) {
 				return transaction.exec(stack, writeTxn);
 			}
-		} else {
-			return LmdbUtil.transaction(env, transaction);
+		}
+		if (writeTxn != 0) {
+			throw new IllegalStateException("Write transaction is active on a different thread");
+		}
+		return LmdbUtil.transaction(env, transaction);
+	}
+
+	private boolean isWriteTxnOwnedByCurrentThread() {
+		return writeTxn != 0 && writeTxnOwnerThreadId == Thread.currentThread().getId();
+	}
+
+	private long startReadTxn() throws IOException {
+		try (MemoryStack stack = MemoryStack.stackPush()) {
+			PointerBuffer pp = stack.mallocPointer(1);
+			E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
+			return pp.get(0);
 		}
 	}
@@ -886,6 +929,19 @@ public long getId(Value value) throws IOException {
 		return getId(value, false);
 	}
 
+	long getIdUnpinned(Value value) throws IOException {
+		PinnedReadTxn pinned = pinnedReadTxn.get();
+		if (pinned == null) {
+			return getId(value);
+		}
+		pinnedReadTxn.remove();
+		try {
+			return getId(value);
+		} finally {
+			pinnedReadTxn.set(pinned);
+		}
+	}
+
 	private final ConcurrentHashMap commonVocabulary = new ConcurrentHashMap<>();
 
 	/**
@@ -1043,7 +1099,7 @@ protected void deleteValueToIdMappings(MemoryStack stack, long txn, Collection rev
 	}
 
 	public void startTransaction(boolean resize) throws IOException {
+		if (writeTxn != 0) {
+			if (isWriteTxnOwnedByCurrentThread()) {
+				return;
+			}
+			throw new IllegalStateException("Write transaction is active on a different thread");
+		}
 		try (MemoryStack stack = stackPush()) {
 			PointerBuffer pp = stack.mallocPointer(1);
 			E(mdb_txn_begin(env, NULL, 0, pp));
 			writeTxn = pp.get(0);
+			writeTxnOwnerThreadId = Thread.currentThread().getId();
 
 			// delete unused IDs if required on a regular basis
 			// this is also run after opening the database
@@ -1242,6 +1305,7 @@ void endTransaction(boolean commit) throws IOException {
 				mdb_txn_abort(writeTxn);
 			}
 			writeTxn = 0;
+			writeTxnOwnerThreadId = -1;
 			invalidateRevisionOnCommit = false;
 		}
 	}
@@ -1315,6 +1379,11 @@ private void closeReadTransactions() {
 				readTxn.close();
 			}
 			threadLocalReadTxn.remove();
+			PinnedReadTxn pinned = pinnedReadTxn.get();
+			if (pinned != null) {
+				pinned.close();
+			}
+			pinnedReadTxn.remove();
 		} finally {
 			txnLock.writeLock().unlock();
 		}
@@ -1335,6 +1404,28 @@ public void close() throws IOException {
 		}
 	}
 
+	static final class PinnedReadTxn implements AutoCloseable {
+
+		private long txn;
+
+		PinnedReadTxn(long txn) {
+			this.txn = txn;
+		}
+
+		long get() {
+			return txn;
+		}
+
+		@Override
+		public void close() {
+			long toClose = txn;
+			txn = 0;
+			if (toClose != 0) {
+				mdb_txn_abort(toClose);
+			}
+		}
+	}
+
 	private static final class ReadTxn {
 
 		private final State state = new State(-1);
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStoreTest.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStoreTest.java
index b735074c00c..1212e3301df 100644
--- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStoreTest.java
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStoreTest.java
@@ -8,13 +8,26 @@
  *
  * SPDX-License-Identifier: BSD-3-Clause
  *******************************************************************************/
+// Some portions generated by Codex
 package org.eclipse.rdf4j.sail.lmdb;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.eclipse.rdf4j.common.iteration.CloseableIteration;
 import org.eclipse.rdf4j.common.iteration.EmptyIteration;
@@ -32,6 +45,8 @@
 import org.eclipse.rdf4j.repository.sail.SailRepository;
 import org.eclipse.rdf4j.sail.SailException;
 import org.eclipse.rdf4j.sail.base.SailDataset;
+import org.eclipse.rdf4j.sail.base.SailSink;
+import org.eclipse.rdf4j.sail.base.SailSource;
 import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -209,6 +224,237 @@ public void testInferredSourceHasEmptyIterationWithoutInferredStatements() throw
 		}
 	}
 
+	@Test
+	public void testLazyValueResolutionAfterConcurrentDelete() throws Exception {
+		LmdbStore sail = (LmdbStore) ((SailRepository) repo).getSail();
+		LmdbSailStore backingStore = sail.getBackingStore();
+		ValueStore valueStore = backingStore.getValueStore();
+		IRI subj = F.createIRI("http://example.org/temp");
+		IRI pred = F.createIRI("http://example.org/p");
+		var literal = F.createLiteral("temp");
+
+		try (RepositoryConnection conn = repo.getConnection()) {
+			conn.add(subj, pred, literal);
+		}
+		Statement stmt;
+		try (SailDataset dataset = backingStore.getExplicitSailSource().dataset(IsolationLevels.READ_COMMITTED)) {
+			try (CloseableIteration<? extends Statement> iter = dataset.getStatements(subj, pred, null)) {
+				assertTrue(iter.hasNext());
+				stmt = iter.next();
+			}
+
+			try (RepositoryConnection conn = repo.getConnection()) {
+				conn.begin();
+				conn.remove(subj, pred, literal);
+				conn.commit();
+			}
+
+			long objId = valueStore.getId(literal);
+			Set<Long> ids = new HashSet<>();
+			ids.add(objId);
+			valueStore.gcIds(ids, new HashSet<>());
+
+			assertNotNull(stmt.getObject().stringValue());
+		}
+	}
+
+	@Test
+	public void testCommitDoesNotExposePartialState(@TempDir File dataDir) throws Exception {
+		CountDownLatch commitReached = new CountDownLatch(1);
+		CountDownLatch allowCommit = new CountDownLatch(1);
+		BlockingCommitLmdbSailStore store = new BlockingCommitLmdbSailStore(dataDir,
+				new LmdbStoreConfig("spoc,posc"), commitReached, allowCommit);
+		AtomicReference<Throwable> commitFailure = new AtomicReference<>();
+		ExecutorService readerExecutor = Executors.newSingleThreadExecutor();
+		Future<Boolean> readerFuture = null;
+		Thread commitThread = null;
+		try {
+			LmdbSailStore.ConnectionSailStore readerStore = store.createConnectionStore();
+
+			IRI subj = F.createIRI("urn:commit");
+			IRI pred = F.createIRI("urn:predicate");
+			var literal = F.createLiteral("value");
+
+			commitThread = new Thread(() -> {
+				try {
+					LmdbSailStore.ConnectionSailStore writerStore = store.createConnectionStore();
+					SailSource writerSource = writerStore.getExplicitSailSource();
+					SailSink writerSink = writerSource.sink(IsolationLevels.SNAPSHOT_READ);
+					writerSink.approve(subj, pred, literal, null);
+					writerSink.flush();
+					writerSink.close();
+				} catch (Throwable t) {
+					commitFailure.set(t);
+				}
+			});
+			commitThread.start();
+
+			assertTrue("Commit did not reach value store phase in time", commitReached.await(10, TimeUnit.SECONDS));
+
+			readerFuture = readerExecutor.submit(() -> {
+				SailSource readerSource = readerStore.getExplicitSailSource();
+				try (SailDataset dataset = readerSource.dataset(IsolationLevels.SNAPSHOT_READ);
+						CloseableIteration<? extends Statement> iter = dataset.getStatements(subj, pred, null)) {
+					if (!iter.hasNext()) {
+						return false;
+					}
+					Statement statement = iter.next();
+					return statement.getObject().stringValue() != null;
+				}
+			});
+
+			try {
+				readerFuture.get(200, TimeUnit.MILLISECONDS);
+				fail("Reader should block while commit is in progress");
+			} catch (TimeoutException expected) {
+				// expected: commit lock should block readers
+			}
+
+			allowCommit.countDown();
+			assertTrue("Reader did not complete after commit", readerFuture.get(10, TimeUnit.SECONDS));
+		} finally {
+			allowCommit.countDown();
+			if (readerFuture != null) {
+				readerFuture.cancel(true);
+			}
+			readerExecutor.shutdownNow();
+			if (commitThread != null) {
+				commitThread.join(TimeUnit.SECONDS.toMillis(10));
+			}
+			if (commitFailure.get() != null) {
+				throw new AssertionError("Commit failed unexpectedly", commitFailure.get());
+			}
+			store.close();
+		}
+	}
+
+	@Test
+	public void testSnapshotBeginBlocksDuringCommit(@TempDir File dataDir) throws Exception {
+		CountDownLatch commitReached = new CountDownLatch(1);
+		CountDownLatch allowCommit = new CountDownLatch(1);
+		BlockingCommitLmdbSailStore store = new BlockingCommitLmdbSailStore(dataDir,
+				new LmdbStoreConfig("spoc,posc"), commitReached, allowCommit);
+		AtomicReference<Throwable> commitFailure = new AtomicReference<>();
+		ExecutorService beginExecutor = Executors.newSingleThreadExecutor();
+		Future<?> beginFuture = null;
+		Thread commitThread = null;
+		try {
+			IRI subj = F.createIRI("urn:begin");
+			IRI pred = F.createIRI("urn:predicate");
+			var literal = F.createLiteral("value");
+
+			commitThread = new Thread(() -> {
+				try {
+					LmdbSailStore.ConnectionSailStore writerStore = store.createConnectionStore();
+					SailSource writerSource = writerStore.getExplicitSailSource();
+					SailSink writerSink = writerSource.sink(IsolationLevels.SNAPSHOT_READ);
+					writerSink.approve(subj, pred, literal, null);
+					writerSink.flush();
+					writerSink.close();
+				} catch (Throwable t) {
+					commitFailure.set(t);
+				}
+			});
+			commitThread.start();
+
+			assertTrue("Commit did not reach value store phase in time", commitReached.await(10, TimeUnit.SECONDS));
+
+			beginFuture = beginExecutor.submit(() -> {
+				LmdbSailStore.ConnectionSailStore readerStore = store.createConnectionStore();
+				try {
+					readerStore.initTransaction(IsolationLevels.SNAPSHOT);
+				} finally {
+					readerStore.endTransaction(false);
+				}
+			});
+
+			try {
+				beginFuture.get(200, TimeUnit.MILLISECONDS);
+				fail("Transaction begin should block while commit is in progress");
+			} catch (TimeoutException expected) {
+				// expected: commit lock should block begin
+			}
+
+			allowCommit.countDown();
+			beginFuture.get(10, TimeUnit.SECONDS);
+		} finally {
+			allowCommit.countDown();
+			if (beginFuture != null) {
+				beginFuture.cancel(true);
+			}
+			beginExecutor.shutdownNow();
+			if (commitThread != null) {
+				commitThread.join(TimeUnit.SECONDS.toMillis(10));
+			}
+			if (commitFailure.get() != null) {
+				throw new AssertionError("Commit failed unexpectedly", commitFailure.get());
+			}
+			store.close();
+		}
+	}
+
+	@Test
+	public void testSnapshotConcurrentWritesDoNotDeadlock(@TempDir File dataDir) throws Exception {
+		ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
+			Thread thread = new Thread(runnable, "lmdb-snapshot-write-test");
+			thread.setDaemon(true);
+			return thread;
+		});
+		Future<?> future = executor.submit(() -> {
+			Repository localRepo = new SailRepository(new LmdbStore(dataDir, new LmdbStoreConfig("spoc,posc")));
+			localRepo.init();
+			try (RepositoryConnection first = localRepo.getConnection();
+					RepositoryConnection second = localRepo.getConnection()) {
+				first.begin(IsolationLevels.SNAPSHOT_READ);
+				second.begin(IsolationLevels.SNAPSHOT_READ);
+				IRI subj1 = F.createIRI("urn:first");
+				IRI subj2 = F.createIRI("urn:second");
+				first.add(subj1, RDFS.LABEL, F.createLiteral("one"));
+				second.add(subj2, RDFS.LABEL, F.createLiteral("two"));
+				first.commit();
+				second.commit();
+			} finally {
+				localRepo.shutDown();
+			}
+		});
+		try {
+			future.get(2, TimeUnit.SECONDS);
+		} catch (TimeoutException e) {
+			fail("Concurrent snapshot writes should not deadlock");
+		} finally {
+			future.cancel(true);
+			executor.shutdownNow();
+		}
+	}
+
+	private static final class BlockingCommitLmdbSailStore extends LmdbSailStore {
+
+		private final CountDownLatch commitReached;
+		private final CountDownLatch allowCommit;
+
+		BlockingCommitLmdbSailStore(File dataDir, LmdbStoreConfig config, CountDownLatch commitReached,
+				CountDownLatch allowCommit)
+				throws IOException, SailException {
+			super(dataDir, config);
+			this.commitReached = commitReached;
+			this.allowCommit = allowCommit;
+		}
+
+		@Override
+		void handleRemovedIdsInValueStore() throws IOException {
+			commitReached.countDown();
+			try {
+				if (!allowCommit.await(10, TimeUnit.SECONDS)) {
+					throw new IOException("Timed out waiting to resume commit");
+				}
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				throw new IOException("Interrupted while waiting to resume commit", e);
+			}
+			super.handleRemovedIdsInValueStore();
+		}
+	}
+
 	@AfterEach
 	public void after() {
 		repo.shutDown();
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/ThemeQueryBenchmark.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/ThemeQueryBenchmark.java
index 4359fa4ce92..d9214c6477b 100644
--- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/ThemeQueryBenchmark.java
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/ThemeQueryBenchmark.java
@@ -152,6 +152,7 @@ public void testQueryCounts() throws IOException {
 	}
 
 	@Test
+	@Disabled
 	public void testQueryExplanation() throws IOException {
 		String[] queryIndexes = paramValues("z_queryIndex");
 		String[] themeNames = paramValues("themeName");
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondBenchmark.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondBenchmark.java
index dd20a7ceac4..01c11c57b4f 100644
--- a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondBenchmark.java
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondBenchmark.java
@@ -140,6 +140,24 @@ public void mediumTransactionsLevelNone() {
 		connection.commit();
 	}
 
+	@Benchmark
+	public void mediumTransactionsLevelSnapshot() {
+		connection.begin(IsolationLevels.SNAPSHOT);
+		for (int k = 0; k < 10; k++) {
+			connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+		}
+		connection.commit();
+	}
+
+	@Benchmark
+	public void mediumTransactionsLevelSerializable() {
+		connection.begin(IsolationLevels.SERIALIZABLE);
+		for (int k = 0; k < 10; k++) {
+			connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+		}
+		connection.commit();
+	}
+
 	@Benchmark
 	public void largerTransaction() {
 		connection.begin();
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondMultithreadedBenchmark.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondMultithreadedBenchmark.java
new file mode 100644
index 00000000000..c37176ecd22
--- /dev/null
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/TransactionsPerSecondMultithreadedBenchmark.java
@@ -0,0 +1,206 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+
+package org.eclipse.rdf4j.sail.lmdb.benchmark;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.assertj.core.util.Files;
+import org.eclipse.rdf4j.common.transaction.IsolationLevels;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.lmdb.LmdbStore;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmarks insertion performance with synthetic data using multiple threads.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@BenchmarkMode({ Mode.Throughput })
+@Fork(value = 1, jvmArgs = { "-Xms2G", "-Xmx2G", "-XX:+UseG1GC" })
+@Measurement(iterations = 5)
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Threads(4)
+public class TransactionsPerSecondMultithreadedBenchmark {
+
+	RandomLiteralGenerator literalGenerator;
+	Random random;
+	int i;
+	List<IRI> resources;
+	List<IRI> predicates;
+	protected SailRepository repository;
+	protected File file;
+	protected boolean forceSync = false;
+
+	public static void main(String[] args) throws RunnerException {
+		Options opt = new OptionsBuilder()
+				.include("TransactionsPerSecondMultithreadedBenchmark\\.") // adapt to control which benchmarks to run
+				.forks(1)
+				.build();
+
+		new Runner(opt).run();
+	}
+
+	@Setup(Level.Iteration)
+	public void beforeClass() {
+		i = 0;
+		file = Files.newTemporaryFolder();
+
+		LmdbStore sail = new LmdbStore(file, ConfigUtil.createConfig().setForceSync(forceSync));
+		repository = new SailRepository(sail);
+		random = new Random(1337);
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			literalGenerator = new RandomLiteralGenerator(connection.getValueFactory(), random);
+			resources = new ArrayList<>();
+			for (int i = 0; i < 10; i++) {
+				resources.add(connection.getValueFactory().createIRI("some:resource-" + i));
+			}
+			predicates = new ArrayList<>();
+			for (int i = 0; i < 10; i++) {
+				predicates.add(connection.getValueFactory().createIRI("some:predicate-" + i));
+			}
+		}
+
+		System.gc();
+	}
+
+	IRI randomResource() {
+		return resources.get(random.nextInt(resources.size()));
+	}
+
+	IRI randomPredicate() {
+		return predicates.get(random.nextInt(predicates.size()));
+	}
+
+	@TearDown(Level.Iteration)
+	public void afterClass() throws IOException {
+		repository.shutDown();
+		FileUtils.deleteDirectory(file);
+
+	}
+
+	@Benchmark
+	public void transactions() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin();
+			connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void transactionsLevelNone() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin(IsolationLevels.NONE);
+			connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void mediumTransactionsLevelNone() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin(IsolationLevels.NONE);
+			for (int k = 0; k < 10; k++) {
+				connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			}
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void mediumTransactionsLevelSnapshot() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin(IsolationLevels.SNAPSHOT);
+			for (int k = 0; k < 10; k++) {
+				connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			}
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void mediumTransactionsLevelSerializable() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin(IsolationLevels.SERIALIZABLE);
+			for (int k = 0; k < 10; k++) {
+				connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			}
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void largerTransaction() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin();
+			for (int k = 0; k < 10000; k++) {
+				connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			}
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void largerTransactionLevelNone() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin(IsolationLevels.NONE);
+			for (int k = 0; k < 10000; k++) {
+				connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			}
+			connection.commit();
+		}
+	}
+
+	@Benchmark
+	public void veryLargerTransactionLevelNone() {
+		try (SailRepositoryConnection connection = repository.getConnection()) {
+
+			connection.begin(IsolationLevels.NONE);
+			for (int k = 0; k < 1000000; k++) {
+				connection.add(randomResource(), randomPredicate(), literalGenerator.createRandomLiteral());
+			}
+			connection.commit();
+		}
+	}
+}