Skip to content

Commit ffbe90f

Browse files
committed
mostly functional
1 parent 0886807 commit ffbe90f

1 file changed

Lines changed: 20 additions & 10 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbTxnContext.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,11 @@ void init(IsolationLevel level) throws SailException {
9292
this.level = level;
9393
initialized = true;
9494
snapshotWriteMode = IsolationLevels.SNAPSHOT.equals(level);
95-
snapshotReadMode = IsolationLevels.SNAPSHOT_READ.equals(level) || snapshotWriteMode
95+
snapshotReadMode = IsolationLevels.SNAPSHOT_READ.equals(level)
9696
|| IsolationLevels.SERIALIZABLE.equals(level);
97-
if (snapshotReadMode) {
97+
if (snapshotWriteMode) {
98+
ensureWriteTxnInitialized();
99+
} else if (snapshotReadMode) {
98100
ensurePinnedReadSnapshot();
99101
deferWrites = true;
100102
overlayChanges = true;
@@ -116,6 +118,10 @@ boolean hasStatementChanges() {
116118

117119
void markManagedTransaction() {
118120
managedTransaction = true;
121+
if (!snapshotWriteMode) {
122+
deferWrites = true;
123+
overlayChanges = true;
124+
}
119125
}
120126

121127
boolean isManagedTransaction() {
@@ -222,7 +228,8 @@ private void ensureWriteTxnInitialized() throws SailException {
222228
int i = 0;
223229
while (!b) {
224230
i++;
225-
System.err.println(Thread.currentThread() + " - LMDB writeTxnLock wait took more than " + i * 10 + " seconds");
231+
System.err.println(
232+
Thread.currentThread() + " - LMDB writeTxnLock wait took more than " + i * 10 + " seconds");
226233
// print lock diagnostics
227234
printWriteTxnLockDiagnostics(i * 10, i == 1 || i % 6 == 0);
228235

@@ -310,7 +317,7 @@ private void checkSerializableConflicts() throws SailException {
310317
return;
311318
}
312319
try (Txn snapshotTxn = acquirePinnedReadTxn();
313-
Txn currentTxn = store.getTripleStore().getTxnManager().createReadTxn()) {
320+
Txn currentTxn = store.getTripleStore().getTxnManager().createReadTxn()) {
314321
store.ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, observedExplicit, true);
315322
store.ensureObservedPatternsUnchanged(snapshotTxn, currentTxn, observedInferred, false);
316323
} catch (IOException e) {
@@ -363,7 +370,7 @@ void beginUpdate() throws SailException {
363370
}
364371
deferWrites = true;
365372
overlayChanges = true;
366-
if (!(snapshotReadMode || snapshotWriteMode) && level != null) {
373+
if (!(snapshotReadMode || snapshotWriteMode) && level != null && !shouldDeferWriteFlush()) {
367374
ensureWriteTxn(level);
368375
}
369376
pendingUpdates.push(new PendingChanges());
@@ -381,7 +388,7 @@ void endUpdate() throws SailException {
381388
}
382389
if (deferWrites) {
383390
mergeIntoMain(completed);
384-
if (snapshotReadMode) {
391+
if (shouldDeferWriteFlush()) {
385392
return;
386393
}
387394
try {
@@ -406,7 +413,7 @@ void abortUpdate() {
406413
pendingUpdates.pop();
407414
}
408415
if (pendingUpdates.isEmpty() && deferWrites) {
409-
if (snapshotReadMode) {
416+
if (shouldDeferWriteFlush()) {
410417
return;
411418
}
412419
clearStatementChanges();
@@ -415,6 +422,10 @@ void abortUpdate() {
415422
}
416423
}
417424

425+
private boolean shouldDeferWriteFlush() {
426+
return managedTransaction && !snapshotWriteMode;
427+
}
428+
418429
void recordAdd(Resource subj, IRI pred, Value obj, Resource ctx, boolean explicit) {
419430
if (!deferWrites) {
420431
return;
@@ -643,7 +654,7 @@ private void applyPendingTo(Set<Quad> added, Set<Quad> removed, boolean explicit
643654
if (pendingUpdates.isEmpty()) {
644655
return;
645656
}
646-
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext(); ) {
657+
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext();) {
647658
PendingChanges pending = iterator.next();
648659
pending.applyTo(added, removed, explicit);
649660
}
@@ -653,7 +664,7 @@ private void applyPendingDeprecatedTo(Set<Quad> deprecated, boolean explicit) {
653664
if (pendingUpdates.isEmpty()) {
654665
return;
655666
}
656-
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext(); ) {
667+
for (var iterator = pendingUpdates.descendingIterator(); iterator.hasNext();) {
657668
PendingChanges pending = iterator.next();
658669
pending.applyDeprecatedTo(deprecated, explicit);
659670
}
@@ -908,7 +919,6 @@ void apply(NamespaceStore base) {
908919
}
909920
}
910921

911-
912922
private void printWriteTxnLockDiagnostics(int waitedSeconds, boolean includeThreadDump) {
913923
try {
914924
System.err.println("LMDB writeTxnLock diagnostics after waiting " + waitedSeconds + " seconds");

0 commit comments

Comments
 (0)