diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java
index 2283a3e1c96..2ed3634052a 100644
--- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java
+++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java
@@ -123,6 +123,55 @@ public abstract class Changeset implements SailSink, ModelFactory {
 
 	private boolean closed;
 
+	/**
+	 * Checks whether two changesets can be applied in either order without changing the result.
+	 * <p>
+	 * The check is deliberately conservative: it only returns {@code true} when one changeset modifies
+	 * nothing but namespaces and the other modifies nothing but statements. Every other combination is
+	 * reported as order dependent, even when the two changesets do not actually overlap.
+	 *
+	 * @param changeset1 the first changeset, must not be {@code null}
+	 * @param changeset2 the second changeset, must not be {@code null}
+	 * @return {@code true} if the changesets are known to be order independent, {@code false} otherwise
+	 * @throws NullPointerException if either changeset is {@code null}
+	 */
+	public static boolean isOrderIndependent(Changeset changeset1, Changeset changeset2) {
+		Objects.requireNonNull(changeset1, "changeset1");
+		Objects.requireNonNull(changeset2, "changeset2");
+
+		boolean c1StatementChanges = hasStatementChanges(changeset1);
+		boolean c2StatementChanges = hasStatementChanges(changeset2);
+		boolean c1NamespaceChanges = hasNamespaceChanges(changeset1);
+		boolean c2NamespaceChanges = hasNamespaceChanges(changeset2);
+
+		// Two changesets are order independent if neither adds or removes anything that the other one modifies.
+		// To keep the check simple we only accept the case where one changeset modifies nothing but namespaces
+		// and the other modifies nothing but statements.
+		return (c1NamespaceChanges && !c1StatementChanges && c2StatementChanges && !c2NamespaceChanges)
+				|| (c2NamespaceChanges && !c2StatementChanges && c1StatementChanges && !c1NamespaceChanges);
+	}
+
+	/**
+	 * Returns whether the changeset has its namespace-cleared flag set, or has any added namespaces or removed
+	 * prefixes.
+	 */
+	private static boolean hasNamespaceChanges(Changeset changeset) {
+		Map<?, ?> addedNamespaces = changeset.getAddedNamespaces();
+		Set<?> removedPrefixes = changeset.getRemovedPrefixes();
+		return changeset.namespaceCleared || (addedNamespaces != null && !addedNamespaces.isEmpty())
+				|| (removedPrefixes != null && !removedPrefixes.isEmpty());
+	}
+
+	/**
+	 * Returns whether the changeset has its statement-cleared flag set, has approved or deprecated statements, or
+	 * has any deprecated contexts.
+	 */
+	private static boolean hasStatementChanges(Changeset changeset) {
+		Set<?> deprecatedContexts = changeset.getDeprecatedContexts();
+		return changeset.statementCleared || changeset.hasApproved() || changeset.hasDeprecated()
+				|| (deprecatedContexts != null && !deprecatedContexts.isEmpty());
+	}
+
 	@Override
 	public void close() throws SailException {
 		closed = true;
diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java
index 95ed831dc6e..0f5950ce320 100644
--- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java
+++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java
@@ -399,11 +399,33 @@ void merge(Changeset change) {
 	void compressChanges() {
 		try {
 			semaphore.lock();
+
+			if (changes.size() == 2) {
+				Changeset[] array = changes.toArray(new Changeset[0]);
+				if (!(array[0].hasDeprecated() || array[0].hasApproved())
+						&& (array[1].hasDeprecated() || array[1].hasApproved())
+						&& Changeset.isOrderIndependent(array[0], array[1])) {
+					// The last change contains all the approved and deprecated statements, so it is faster to swap
+					// the two changes before merging. This is typically the case when parsing a file with
+					// namespaces at the beginning.
+					Changeset second = changes.removeLast();
+					// Restore the original order if the remaining change is a refback, as the loop below also does.
+					if (changes.peekLast().isRefback()) {
+						changes.addLast(second);
+						return;
+					}
+					Changeset first = changes.removeLast();
+
+					changes.addFirst(second);
+					changes.addLast(first);
+				}
+			}
+
 			while (changes.size() > 1) {
 				Changeset pop = changes.removeLast();
 				if (changes.peekLast().isRefback()) {
 					changes.addLast(pop);
-					break;
+					return;
 				}
 
 				try {
diff --git a/core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/ChangesetTest.java b/core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/ChangesetTest.java
index b20c87a4d39..86a39824a79 100644
--- a/core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/ChangesetTest.java
+++ b/core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/ChangesetTest.java
@@ -10,6 +10,9 @@
  *******************************************************************************/
 package org.eclipse.rdf4j.sail.base;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +36,90 @@ public class ChangesetTest {
 	SimpleValueFactory vf = SimpleValueFactory.getInstance();
 	Resource[] allGraph = {};
 
+	@Test
+	public void testOrderIndependentNamespacesVsStatements() {
+		Changeset namespacesOnly = getChangeset();
+		namespacesOnly.setNamespace("ex", "http://example.org/");
+
+		Changeset statementsOnly = getChangeset();
+		statementsOnly.approve(vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
+				vf.createLiteral("o")));
+
+		assertTrue(Changeset.isOrderIndependent(namespacesOnly, statementsOnly));
+		assertTrue(Changeset.isOrderIndependent(statementsOnly, namespacesOnly));
+	}
+
+	@Test
+	public void testOrderDependentOnOverlappingStatements() {
+		Changeset change1 = getChangeset();
+		Changeset change2 = getChangeset();
+
+		Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
+				vf.createLiteral("o"));
+
+		change1.approve(statement);
+		change2.deprecate(statement);
+
+		assertFalse(Changeset.isOrderIndependent(change1, change2));
+		assertFalse(Changeset.isOrderIndependent(change2, change1));
+	}
+
+	@Test
+	public void testOrderIndependentWhenStatementsCleared() {
+		Changeset statementCleared = getChangeset();
+		statementCleared.clear();
+
+		Changeset namespacesCleared = getChangeset();
+		namespacesCleared.clearNamespaces();
+
+		assertTrue(Changeset.isOrderIndependent(statementCleared, namespacesCleared));
+		assertTrue(Changeset.isOrderIndependent(namespacesCleared, statementCleared));
+	}
+
+	@Test
+	public void testOrderIndependentWhenDeprecatedStatements() {
+		Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
+				vf.createLiteral("o"));
+
+		Changeset deprecatedOnly = getChangeset();
+		deprecatedOnly.deprecate(statement);
+
+		Changeset namespacesCleared = getChangeset();
+		namespacesCleared.clearNamespaces();
+
+		assertTrue(Changeset.isOrderIndependent(deprecatedOnly, namespacesCleared));
+		assertTrue(Changeset.isOrderIndependent(namespacesCleared, deprecatedOnly));
+	}
+
+	@Test
+	public void testOrderIndependentWhenClearingContexts() {
+		Changeset contextCleared = getChangeset();
+		contextCleared.clear(vf.createIRI("urn:ctx"));
+
+		Changeset namespaceRemoved = getChangeset();
+		namespaceRemoved.removeNamespace("ex");
+
+		assertTrue(Changeset.isOrderIndependent(contextCleared, namespaceRemoved));
+		assertTrue(Changeset.isOrderIndependent(namespaceRemoved, contextCleared));
+	}
+
+	@Test
+	public void testOrderDependentWhenMixingNamespacesAndStatements() {
+		Statement statement = vf.createStatement(vf.createIRI("urn:s"), vf.createIRI("urn:p"),
+				vf.createLiteral("o"));
+
+		Changeset mixedChanges = getChangeset();
+		mixedChanges.approve(statement);
+		mixedChanges.setNamespace("ex", "http://example.org/");
+
+		Changeset statementsOnly = getChangeset();
+		statementsOnly.approve(vf.createStatement(vf.createIRI("urn:s2"), vf.createIRI("urn:p2"),
+				vf.createLiteral("o2")));
+
+		assertFalse(Changeset.isOrderIndependent(mixedChanges, statementsOnly));
+		assertFalse(Changeset.isOrderIndependent(statementsOnly, mixedChanges));
+	}
+
 	@Test
 	public void testConcurrency() {
 
@@ -47,68 +134,72 @@ public void testConcurrency() {
 			t.setDaemon(true);
 			return t;
 		});
+		try {
+
+			Runnable addingData = () -> {
+				countDownLatch.countDown();
+				try {
+					countDownLatch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+				for (int i = 0; i < 100000; i++) {
+					changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
+				}
 
-		Runnable addingData = () -> {
-			countDownLatch.countDown();
-			try {
-				countDownLatch.await();
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-			for (int i = 0; i < 100000; i++) {
-				changeset.approve(vf.createStatement(vf.createBNode(), RDF.TYPE, RDFS.RESOURCE));
-			}
-
-		};
-
-		Runnable readingData = () -> {
-			countDownLatch.countDown();
-			try {
-				countDownLatch.await();
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-			for (int i = 0; i < 100000; i++) {
-				changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
-			}
-		};
-
-		Runnable readingDataIterator = () -> {
-			countDownLatch.countDown();
-			try {
-				countDownLatch.await();
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-
-			while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
-				Thread.yield();
-			}
+			};
 
-			for (int i = 0; i < 100; i++) {
-				Iterator approvedStatements = changeset.getApprovedStatements(null, RDF.TYPE, null, allGraph)
-						.iterator();
+			Runnable readingData = () -> {
+				countDownLatch.countDown();
+				try {
+					countDownLatch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+				for (int i = 0; i < 100000; i++) {
+					changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph);
+				}
+			};
+
+			Runnable readingDataIterator = () -> {
+				countDownLatch.countDown();
+				try {
+					countDownLatch.await();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
 
-				for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
-					approvedStatements.next();
+				while (!changeset.hasApproved(null, RDF.TYPE, RDFS.RESOURCE, allGraph)) {
+					Thread.yield();
 				}
-			}
 
-		};
+				for (int i = 0; i < 100; i++) {
+					Iterator<?> approvedStatements = changeset
+							.getApprovedStatements(null, RDF.TYPE, null, allGraph)
+							.iterator();
 
-		Stream.of(addingData, readingData, readingDataIterator)
-				.map(executorService::submit)
-				.collect(Collectors.toList())
-				.forEach(future -> {
-					try {
-						future.get();
-					} catch (InterruptedException | ExecutionException e) {
-						throw new RuntimeException(e);
+					for (int j = 0; j < 100 && approvedStatements.hasNext(); j++) {
+						approvedStatements.next();
 					}
-				});
-		executorService.shutdown();
-		executorService.shutdownNow();
+				}
 
+			};
+
+			Stream.of(addingData, readingData, readingDataIterator)
+					.map(executorService::submit)
+					.collect(Collectors.toList())
+					.forEach(future -> {
+						try {
+							future.get();
+						} catch (InterruptedException | ExecutionException e) {
+							throw new RuntimeException(e);
+						}
+					});
+		} finally {
+			executorService.shutdown();
+			executorService.shutdownNow();
+		}
+
 	}
 
 	private Changeset getChangeset() {
diff --git a/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/DatagovLoadIsolationBenchmark.java b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/DatagovLoadIsolationBenchmark.java
new file mode 100644
index 00000000000..14195088935
--- /dev/null
+++ b/core/sail/lmdb/src/test/java/org/eclipse/rdf4j/sail/lmdb/benchmark/DatagovLoadIsolationBenchmark.java
@@ -0,0 +1,125 @@
+/*******************************************************************************
+ * Copyright (c) 2025 Eclipse RDF4J contributors.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Distribution License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ *******************************************************************************/
+package org.eclipse.rdf4j.sail.lmdb.benchmark;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.assertj.core.util.Files;
+import org.eclipse.rdf4j.common.transaction.IsolationLevels;
+import org.eclipse.rdf4j.model.Model;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.sail.lmdb.LmdbStore;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmarks loading the datagovbe-valid.ttl dataset in a single transaction across all supported LMDB isolation
+ * levels.
+ */
+@State(Scope.Benchmark)
+@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@BenchmarkMode(Mode.AverageTime)
+@Fork(value = 1, jvmArgs = { "-Xms2G", "-Xmx2G", "-XX:+UseG1GC" })
+@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class DatagovLoadIsolationBenchmark {
+
+	private static final String DATA_FILE = "benchmarkFiles/datagovbe-valid.ttl";
+
+	@Param({ "NONE", "READ_COMMITTED", "SNAPSHOT_READ", "SNAPSHOT", "SERIALIZABLE" })
+	public IsolationLevels isolationLevel;
+
+	private Model data;
+
+	/**
+	 * Parses the dataset from the classpath into memory once per trial, so that parsing is not part of the measured
+	 * load, and then requests garbage collection twice with a short pause after each request.
+	 */
+	@Setup(Level.Trial)
+	public void setup() throws IOException, InterruptedException {
+		try (InputStream resourceAsStream = Objects.requireNonNull(
+				DatagovLoadIsolationBenchmark.class.getClassLoader().getResourceAsStream(DATA_FILE),
+				"dataset resource not found: " + DATA_FILE)) {
+			this.data = Rio.parse(resourceAsStream, "", Rio.getParserFormatForFileName(DATA_FILE).orElseThrow());
+		}
+		System.gc();
+		Thread.sleep(100);
+		System.gc();
+		Thread.sleep(100);
+	}
+
+	/**
+	 * Runs this benchmark with the JMH runner. {@code forks(0)} makes JMH run it inside the current JVM instead of
+	 * forking, so the JVM arguments of the class-level {@code @Fork} annotation may not be applied here.
+	 */
+	public static void main(String[] args) throws RunnerException {
+		Options opt = new OptionsBuilder()
+				.include(DatagovLoadIsolationBenchmark.class.getSimpleName())
+				.forks(0)
+				.build();
+
+		new Runner(opt).run();
+	}
+
+	@Benchmark
+	public boolean loadDatagovFileSingleTransaction() throws IOException {
+		return loadOnce();
+	}
+
+	/**
+	 * Loads the parsed dataset into a fresh {@link LmdbStore} within a single transaction that uses the configured
+	 * isolation level, then shuts the repository down and deletes its temporary folder.
+	 *
+	 * @return the result of {@code hasStatement(null, null, null, true)} queried on the connection after the commit
+	 */
+	boolean loadOnce() throws IOException {
+		File temporaryFolder = Files.newTemporaryFolder();
+		SailRepository sailRepository = null;
+		try {
+			sailRepository = new SailRepository(new LmdbStore(temporaryFolder, ConfigUtil.createConfig()));
+			try (SailRepositoryConnection connection = sailRepository.getConnection()) {
+				connection.begin(isolationLevel);
+				connection.add(data);
+				connection.commit();
+				return connection.hasStatement(null, null, null, true);
+			}
+		} finally {
+			try {
+				if (sailRepository != null) {
+					sailRepository.shutDown();
+				}
+			} finally {
+				FileUtils.deleteDirectory(temporaryFolder);
+			}
+		}
+	}
+}