Skip to content

Commit ca1abf3

Browse files
committed
GH-5291: Make it possible to disable transactions in LuceneSail
1 parent f92d108 commit ca1abf3

5 files changed

Lines changed: 271 additions & 10 deletions

File tree

core/sail/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,37 @@ public class LuceneSail extends NotifyingSailWrapper {
289289
*/
290290
public static final String LUCENE_RAMDIR_KEY = "useramdir";
291291

292+
/**
293+
* Whether the LuceneIndex should support transactions and rollbacks (true by default). Set the key
294+
* "transactional=false" as sail parameter to DISABLE rollback support. This will improve performance if you are
295+
* doing a lot of inserts/updates and don't need rollback support. It will also run fsync only at an interval (see
296+
* {@link #FSYNC_INTERVAL_KEY}), instead of after each transaction commit. Changes in the index will only be visible
297+
* to readers after the periodic fsync is done.
298+
* <p>
299+
* See <a href="https://github.com/eclipse-rdf4j/rdf4j/issues/5291">the original issue</a>.
300+
*/
301+
public static final String TRANSACTIONAL_KEY = "transactional";
302+
303+
/**
304+
* Default value for {@link #TRANSACTIONAL_KEY}, true (rollbacks enabled, fsync after each commit).
305+
*/
306+
public static final boolean DEFAULT_TRANSACTIONAL = true;
307+
308+
/**
309+
* Set the key "fsyncInterval=&lt;t&gt;" as sail parameter to configure the interval in milliseconds in which fsync
310+
* is called on the Lucene index, default is defined in {@link #DEFAULT_FSYNC_INTERVAL}. Changes in the index will
311+
* become visible to readers after at most one interval has elapsed. If set to 0 or a negative value, fsync will be
312+
* disabled completely and there are no guarantees as to when your data will be persisted. This setting is only used
313+
* when {@link #TRANSACTIONAL_KEY} is set to false (rollbacks disabled).
314+
*/
315+
public static final String FSYNC_INTERVAL_KEY = "fsyncInterval";
316+
317+
/**
318+
* Default fsync interval in milliseconds, used when {@link #FSYNC_INTERVAL_KEY} is not set and
319+
* {@link #TRANSACTIONAL_KEY} is set to false (rollbacks disabled).
320+
*/
321+
public static final long DEFAULT_FSYNC_INTERVAL = 10_000L;
322+
292323
/**
293324
* Set the key "defaultNumDocs=&lt;n&gt;" as sail parameter to limit the maximum number of documents to return from
294325
* a search query. The default is to return all documents. NB: this may involve extra cost for some SearchIndex

core/sail/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/impl/LuceneIndex.java

Lines changed: 105 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
import java.util.Map;
2828
import java.util.Properties;
2929
import java.util.Set;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledThreadPoolExecutor;
32+
import java.util.concurrent.TimeUnit;
3033
import java.util.concurrent.atomic.AtomicBoolean;
3134
import java.util.function.Supplier;
3235

@@ -149,6 +152,12 @@ public class LuceneIndex extends AbstractLuceneIndex {
149152

150153
private volatile int fuzzyPrefixLength;
151154

155+
private boolean transactionsEnabled = LuceneSail.DEFAULT_TRANSACTIONAL;
156+
157+
private long fsyncIntervalMillis = LuceneSail.DEFAULT_FSYNC_INTERVAL;
158+
159+
private ScheduledThreadPoolExecutor fsyncScheduler;
160+
152161
/**
153162
* The IndexWriter that can be used to alter the index' contents. Created lazily.
154163
*/
@@ -212,6 +221,16 @@ public synchronized void initialize(Properties parameters) throws Exception {
212221
this.fuzzyPrefixLength = NumberUtils.toInt(parameters.getProperty(FUZZY_PREFIX_LENGTH_KEY), 0);
213222
}
214223

224+
if (parameters.containsKey(LuceneSail.TRANSACTIONAL_KEY)) {
225+
this.transactionsEnabled = Boolean.parseBoolean(
226+
parameters.getProperty(LuceneSail.TRANSACTIONAL_KEY));
227+
}
228+
if (parameters.containsKey(LuceneSail.FSYNC_INTERVAL_KEY)) {
229+
this.fsyncIntervalMillis = NumberUtils.toLong(
230+
parameters.getProperty(LuceneSail.FSYNC_INTERVAL_KEY),
231+
LuceneSail.DEFAULT_FSYNC_INTERVAL);
232+
}
233+
215234
postInit();
216235
}
217236

@@ -269,6 +288,43 @@ private void postInit() throws IOException {
269288
IndexWriter writer = new IndexWriter(directory, indexWriterConfig);
270289
writer.close();
271290
}
291+
292+
setUpFsyncScheduler();
293+
}
294+
295+
private void setUpFsyncScheduler() {
296+
// If transactions are disabled, launch a background thread to fsync periodically.
297+
if (this.transactionsEnabled || this.fsyncIntervalMillis <= 0) {
298+
return;
299+
}
300+
301+
// Use a daemon thread so the scheduler does not prevent JVM shutdown.
302+
this.fsyncScheduler = new ScheduledThreadPoolExecutor(1, r -> {
303+
Thread t = Executors.defaultThreadFactory().newThread(r);
304+
t.setDaemon(true);
305+
t.setName("rdf4j-lucene-fsync-" + t.getId());
306+
return t;
307+
});
308+
// Help GC by removing cancelled tasks from the queue.
309+
this.fsyncScheduler.setRemoveOnCancelPolicy(true);
310+
this.fsyncScheduler.scheduleAtFixedRate(
311+
() -> {
312+
try {
313+
if (this.getIndexWriter().hasUncommittedChanges()) {
314+
this.getIndexWriter().commit();
315+
invalidateReaders();
316+
}
317+
} catch (Throwable e) {
318+
// We just log errors here, there's not much else we can do.
319+
// Rethrowing exceptions on the next commit would be confusing, especially if
320+
// the error is transient.
321+
logger.error("Exception during Lucene index fsync", e);
322+
}
323+
},
324+
this.fsyncIntervalMillis,
325+
this.fsyncIntervalMillis,
326+
TimeUnit.MILLISECONDS
327+
);
272328
}
273329

274330
protected Function<String, ? extends SpatialStrategy> createSpatialStrategyMapper(Map<String, String> parameters) {
@@ -383,15 +439,47 @@ public void shutDown() throws IOException {
383439
oldmonitors.clear();
384440
}
385441
} finally {
442+
// shutdown fsync scheduler
386443
try {
387-
IndexWriter toCloseIndexWriter = indexWriter;
388-
indexWriter = null;
389-
if (toCloseIndexWriter != null) {
390-
toCloseIndexWriter.close();
444+
if (fsyncScheduler != null) {
445+
fsyncScheduler.shutdown();
446+
if (!fsyncScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
447+
logger.error("Failed to shut down Lucene fsync scheduler within 10s");
448+
// Try interrupting the thread
449+
fsyncScheduler.shutdownNow();
450+
if (!fsyncScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
451+
logger.error("Failed to forcibly shut down Lucene fsync scheduler within 10s");
452+
}
453+
}
391454
}
455+
} catch (InterruptedException e) {
456+
logger.error("Interrupted while trying to shut down Lucene fsync scheduler", e);
457+
Thread.currentThread().interrupt();
458+
} catch (Throwable e) {
459+
exceptions.add(e);
392460
} finally {
393-
if (!exceptions.isEmpty()) {
394-
throw new UndeclaredThrowableException(exceptions.get(0));
461+
// Do a final sync of any uncommitted changes
462+
IndexWriter toCloseIndexWriter = indexWriter;
463+
try {
464+
if (!this.transactionsEnabled && toCloseIndexWriter != null && toCloseIndexWriter.isOpen()
465+
&& toCloseIndexWriter.hasUncommittedChanges()) {
466+
toCloseIndexWriter.commit();
467+
}
468+
} catch (Throwable e) {
469+
logger.error("Failed to commit Lucene IndexWriter while closing", e);
470+
exceptions.add(e);
471+
} finally {
472+
// shutdown the index writer
473+
try {
474+
indexWriter = null;
475+
if (toCloseIndexWriter != null) {
476+
toCloseIndexWriter.close();
477+
}
478+
} finally {
479+
if (!exceptions.isEmpty()) {
480+
throw new UndeclaredThrowableException(exceptions.get(0));
481+
}
482+
}
395483
}
396484
}
397485
}
@@ -707,14 +795,21 @@ public synchronized void begin() throws IOException {
707795
*/
708796
@Override
709797
public synchronized void commit() throws IOException {
710-
getIndexWriter().commit();
711-
// the old IndexReaders/Searchers are not outdated
712-
invalidateReaders();
798+
if (this.transactionsEnabled) {
799+
getIndexWriter().commit();
800+
// the old IndexReaders/Searchers are now outdated
801+
invalidateReaders();
802+
} else {
803+
getIndexWriter().flush();
804+
}
713805
}
714806

715807
@Override
716808
public synchronized void rollback() throws IOException {
717-
getIndexWriter().rollback();
809+
if (this.transactionsEnabled) {
810+
getIndexWriter().rollback();
811+
}
812+
// If transactions are disabled, we cannot roll back changes, so we just ignore the rollback request
718813
}
719814

720815
// //////////////////////////////// Methods for querying the index

core/sail/lucene/src/test/java/org/eclipse/rdf4j/sail/lucene/impl/AbstractGenericLuceneTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,26 @@ public abstract class AbstractGenericLuceneTest {
119119

120120
protected abstract void configure(LuceneSail sail) throws IOException;
121121

122+
/**
123+
* How long to wait after a commit() to ensure the index is updated.
124+
*
125+
* @return milliseconds to wait
126+
*/
127+
protected long waitAfterCommitMillis() {
128+
return 0;
129+
}
130+
131+
protected final void sleepAfterCommitIfNeeded() {
132+
long waitMillis = waitAfterCommitMillis();
133+
if (waitMillis > 0) {
134+
try {
135+
Thread.sleep(waitMillis);
136+
} catch (InterruptedException e) {
137+
Thread.currentThread().interrupt();
138+
}
139+
}
140+
}
141+
122142
@BeforeEach
123143
public void setUp() throws Exception {
124144
// set logging, uncomment this to get better logging for debugging
@@ -166,6 +186,7 @@ public void tearDown() throws RepositoryException {
166186

167187
@Test
168188
public void testComplexQueryTwo() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
189+
sleepAfterCommitIfNeeded();
169190
// prepare the query
170191
StringBuilder buffer = new StringBuilder();
171192
buffer.append("SELECT ?Resource ?Matching ?Score ");
@@ -275,6 +296,7 @@ private void evaluate(String[] queries, ArrayList<List<Map<String, String>>> exp
275296
@Test
276297
public void testPredicateLuceneQueries()
277298
throws MalformedQueryException, RepositoryException, QueryEvaluationException {
299+
sleepAfterCommitIfNeeded();
278300
// prepare the query
279301
String[] queries = new String[] {
280302
"SELECT ?Resource ?Score ?Snippet \n"
@@ -337,6 +359,7 @@ public void testPredicateLuceneQueries()
337359

338360
@Test
339361
public void testSnippetQueries() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
362+
sleepAfterCommitIfNeeded();
340363
// prepare the query
341364
// search for the term "one", but only in predicate 1
342365
StringBuilder buffer = new StringBuilder();
@@ -396,6 +419,7 @@ public void testSnippetLimitedToPredicate()
396419
localConnection.add(SUBJECT_1, PREDICATE_1, vf.createLiteral("but the unicorn charly said to goaway"));
397420
localConnection.add(SUBJECT_1, PREDICATE_2, vf.createLiteral("there was poor charly without a kidney"));
398421
localConnection.commit();
422+
sleepAfterCommitIfNeeded();
399423
}
400424

401425
// prepare the query
@@ -471,6 +495,7 @@ public void testCharlyTerm() {
471495
localConnection.add(SUBJECT_1, PREDICATE_1, vf.createLiteral("but the unicorn charly said to goaway"));
472496
localConnection.add(SUBJECT_1, PREDICATE_2, vf.createLiteral("there was poor charly without a kidney"));
473497
localConnection.commit();
498+
sleepAfterCommitIfNeeded();
474499
}
475500
// search for the term "charly" in all predicates
476501
StringBuilder buffer = new StringBuilder();
@@ -529,6 +554,7 @@ public void testCharlyTerm() {
529554

530555
@Test
531556
public void testGraphQuery() throws QueryEvaluationException, MalformedQueryException, RepositoryException {
557+
sleepAfterCommitIfNeeded();
532558
IRI score = vf.createIRI(LuceneSailSchema.NAMESPACE + "score");
533559
StringBuilder query = new StringBuilder();
534560

@@ -577,6 +603,7 @@ public void testGraphQuery() throws QueryEvaluationException, MalformedQueryExce
577603
@Test
578604
public void testQueryWithSpecifiedSubject()
579605
throws RepositoryException, MalformedQueryException, QueryEvaluationException {
606+
sleepAfterCommitIfNeeded();
580607
// fire a query with the subject pre-specified
581608
TupleQuery query = connection.prepareTupleQuery(QUERY_STRING);
582609
query.setBinding("Subject", SUBJECT_1);
@@ -594,6 +621,7 @@ public void testQueryWithSpecifiedSubject()
594621

595622
@Test
596623
public void testUnionQuery() throws RepositoryException, MalformedQueryException, QueryEvaluationException {
624+
sleepAfterCommitIfNeeded();
597625
String queryStr = "";
598626
queryStr += "PREFIX search: <http://www.openrdf.org/contrib/lucenesail#> ";
599627
queryStr += "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> ";
@@ -626,6 +654,7 @@ public void testContextHandling() {
626654
connection.add(SUBJECT_5, PREDICATE_1, vf.createLiteral("sfiveponectwo"), CONTEXT_2);
627655
connection.add(SUBJECT_5, PREDICATE_2, vf.createLiteral("sfiveptwoctwo"), CONTEXT_2);
628656
connection.commit();
657+
sleepAfterCommitIfNeeded();
629658
// connection.close();
630659
// connection = repository.getConnection();
631660
// connection.setAutoCommit(false);
@@ -640,6 +669,7 @@ public void testContextHandling() {
640669
// remove a context
641670
connection.clear(CONTEXT_1);
642671
connection.commit();
672+
sleepAfterCommitIfNeeded();
643673
assertNoQueryResult("sfourponecone");
644674
assertNoQueryResult("sfourptwocone");
645675
assertNoQueryResult("sfiveponecone");
@@ -659,6 +689,7 @@ public void testNullContextHandling() {
659689
connection.add(SUBJECT_5, PREDICATE_1, vf.createLiteral("sfiveponectwo"), CONTEXT_2);
660690
connection.add(SUBJECT_5, PREDICATE_2, vf.createLiteral("sfiveptwoctwo"), CONTEXT_2);
661691
connection.commit();
692+
sleepAfterCommitIfNeeded();
662693
// connection.close();
663694
// connection = repository.getConnection();
664695
// connection.setAutoCommit(false);
@@ -673,6 +704,7 @@ public void testNullContextHandling() {
673704
// remove a context
674705
connection.clear((Resource) null);
675706
connection.commit();
707+
sleepAfterCommitIfNeeded();
676708
assertNoQueryResult("sfourponecone");
677709
assertNoQueryResult("sfourptwocone");
678710
assertNoQueryResult("sfiveponecone");
@@ -682,6 +714,7 @@ public void testNullContextHandling() {
682714

683715
@Test
684716
public void testFuzzyQuery() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
717+
sleepAfterCommitIfNeeded();
685718
// prepare the query
686719
// search for the term "one" with 80% fuzzyness
687720
StringBuilder buffer = new StringBuilder();
@@ -729,11 +762,13 @@ public void testFuzzyQuery() throws MalformedQueryException, RepositoryException
729762
@Test
730763
public void testReindexing() {
731764
sail.reindex();
765+
sleepAfterCommitIfNeeded();
732766
testComplexQueryTwo();
733767
}
734768

735769
@Test
736770
public void testPropertyVar() throws MalformedQueryException, RepositoryException, QueryEvaluationException {
771+
sleepAfterCommitIfNeeded();
737772
StringBuilder buffer = new StringBuilder();
738773
buffer.append("SELECT ?Resource ?Property \n");
739774
buffer.append("WHERE { \n");

0 commit comments

Comments
 (0)